* Add a demo script (demo3) to test rate detection on affinity link.
* Miscellaneous debug print statements.
Signed-off-by: Suchi Raman <suchi.raman@plexxi.com>
try {
for (AffinityIdentifier h : ag.getAllElements()) {
- log.debug("host = {}", h);
if (hostTracker != null) {
Host host1 = hostTracker.hostFind((InetAddress) h.get());
+ // log.debug("host = {}", host1);
hostList.add(host1);
}
}
* per-protocol (across all protocols if protocol is null).*/
protected long getByteCount(Set<Host> srcSet, Set<Host> dstSet, Byte protocol) {
long byteCount = 0;
+ /**
+ log.info("Printing all sources and destinations.");
+ for (Host h: srcSet) {
+ log.info("src: {}, DL {}, inet {}", h, h.getDataLayerAddress(), h.getNetworkAddress());
+ }
+ for (Host h: dstSet) {
+ log.info("dst: {}, DL {}, inet {}", h, h.getDataLayerAddress(), h.getNetworkAddress());
+ }
+ **/
for (Host src : srcSet) {
for (Host dst : dstSet) {
if (this.hostsToStats.get(src) != null &&
this.hostsToStats.get(src).get(dst) != null) {
- if (protocol == null)
+ if (protocol == null) {
byteCount += this.hostsToStats.get(src).get(dst).getByteCount();
- else
+ // log.info("Source and destination: {} and {}.", src, dst);
+ } else
byteCount += this.hostsToStats.get(src).get(dst).getByteCount(protocol);
}
}
/* AffinityLink -> Set of source Hosts */
private Set<Host> getSrcSet(AffinityLink al) {
- return new HashSet<Host>(this.affinityManager.getAllElementsByHost(al.getToGroup()));
+ return new HashSet<Host>(this.affinityManager.getAllElementsByHost(al.getFromGroup()));
}
/* AffinityLink -> Set of destination Hosts */
put_url = 'http://localhost:8080/affinity/nb/v2/affinity/default/create/link/inflows/from/clients/to/webservers'
rest_method(put_url, "PUT")
+# print "add subnet to webservers"
+# put_url = 'http://localhost:8080/affinity/nb/v2/affinity/default/group/webservers/addsubnet/ipprefix/10.0.0.1/mask/31'
+# rest_method(put_url, "PUT")
+
print "add subnet to webservers"
- put_url = 'http://localhost:8080/affinity/nb/v2/affinity/default/group/webservers/addsubnet/ipprefix/10.0.0.1/mask/31'
+ put_url = 'http://localhost:8080/affinity/nb/v2/affinity/default/group/webservers/add/ip/10.0.0.1'
rest_method(put_url, "PUT")
print "add ip to external"
put_url = 'http://localhost:8080/affinity/nb/v2/affinity/default/link/' + al + '/settap/' + ipaddr
rest_method(put_url, "PUT")
-#if __name__ == "__main__":
-# main()
#opener = {}
#init_urllib()
h = httplib2.Http(".cache")
h.add_credentials('admin', 'admin')
+#if __name__ == "__main__":
+# main()
url = "http://localhost:8080/affinity/nb/v2/analytics/default/affinitylinkstats/%s" % al
data = rest_method(url, "GET")
if (do_print):
- print("%s bytes (%s packets) over %s seconds (%s bit/s)" % (data["byteCount"], data["packetCount"], data["duration"], data["bitRate"]))
+ print("stats_link: %s %s bytes (%s packets) over %s seconds (%s bit/s)" % (url, data["byteCount"], data["packetCount"], data["duration"], data["bitRate"]))
return data
def stats_link_protocol(al, protocol, do_print=True):
--- /dev/null
+#!/usr/local/bin/python
+
+'''
+Copyright (c) 2013 Plexxi, Inc. All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+'''
+
+import httplib2
+import json
+import signal
+import time
+
+from threading import Thread
+
+from affinity_control import AffinityControl
+from subnet import SubnetControl
+from stats import Stats
+
+import analytics
+
+global link_name
+global rate_low
+global rate_high
+global prefix
+
+link_name = "inflows"
+rate_low = (100 * 10**3) # 100 kbps
+rate_high = (1 * 10**6) # 1 Mbps
+prefix = "10.0.0.254/8"
+
+# If True, SIG_INT has been captured
+global sigint
+sigint = False
+
+# Handle SIG_INT
+def signal_handler(signal, frame):
+ global sigint
+ sigint = True
+
+# Monitors statistics
+class WaypointMonitor(Thread):
+
+ def __init__(self, monitor_type, **kwargs):
+ Thread.__init__(self)
+ self.stat = Stats(monitor_type, **kwargs)
+ self.stat_type = monitor_type
+ self.waypoint_address = None
+ print "Created waypoint monitor for %s" % self.stat
+
+ def set_waypoint(self, waypoint_ip):
+ self.waypoint_address = waypoint_ip
+ print "Registered waypoint for %s. Any large flows will be redirected to %s." % (self.stat, waypoint_ip)
+
+ def set_high_threshold(self, s):
+ self.stat.set_high_threshold(s)
+ print "Set high threshold for flow rate to %d bps" % s
+ print("-------------------------")
+
+ def set_low_threshold(self, s):
+ self.stat.set_low_threshold(s)
+ print "Set low threshold for flow rate to %d bps" % s
+ print("-------------------------")
+
+ def run(self):
+ global sigint
+ high_rate_set = False
+ while not sigint:
+ is_high, is_low = self.stat.refresh()
+ print "Rate is: %f" % (self.stat.get_ewma_rate())
+ if is_high and not high_rate_set:
+ print "Fast flow detected (%3.3f bit/s)" % (self.stat.get_ewma_rate())
+ ac = AffinityControl()
+# ac.add_waypoint(link_name, self.waypoint_address)
+## ac.add_isolate(link_name)
+## ac.enable_affinity()
+ high_rate_set = True
+# time.sleep(3)
+ elif is_low and high_rate_set:
+ print "Flow rate below threshold (%3.3f bit/s)" % (self.stat.get_ewma_rate())
+## ac.remove_isolate(link_name)
+## ac.disable_affinity()
+ high_rate_set = False
+ time.sleep(1)
+
+def main():
+
+ # Default subnet is required for the host tracker to work.
+ subnet_control = SubnetControl()
+ subnet_control.list()
+ subnet_control.add_subnet("defaultSubnet", prefix)
+
+ raw_input("[Press enter when mininet is ready] ")
+ print("-------------------------")
+
+ m = WaypointMonitor(Stats.TYPE_AL, al="inflows")
+# m.set_waypoint("10.0.0.2")
+ m.set_high_threshold(rate_high)
+ m.set_low_threshold(rate_low)
+ m.start()
+
+ # Register signal-handler to catch SIG_INT
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.pause()
+
+ # join() won't return until SIG_INT has been captured
+ m.join()
+
+if __name__ == "__main__":
+ main()
self.large_flow_threshold = 5 * 10**6 # in bytes
self.rate_threshold = 1 * 10**6 # bits/sec
+ print "Created stats"
def __str__(self):
if (self.stat_type == Stats.TYPE_HOST):
self.stats = analytics.stats_hosts(self.src, self.dst, False)
self.protocol_stats = analytics.all_stats_hosts(self.src, self.dst, False)
elif (self.stat_type == Stats.TYPE_AL):
- self.stats = analytics.stats_link(self.al, False)
- self.protocol_stats = analytics.all_stats_link(self.al, False)
+ self.stats = analytics.stats_link(self.al, True)
+ self.protocol_stats = analytics.all_stats_link(self.al, True)
elif (self.stat_type == Stats.TYPE_SUBNET):
self.stats = analytics.stats_subnet("null/null", self.subnet, True)
self.protocol_stats = analytics.all_stats_subnet("null/null", self.subnet, True)
try:
+ # Flag this as high if above high threshold, and low if below low threshold
+ is_high, is_low = self.calc_ewma_rate()
# is_fast = self.handle_rate_ewma()
- is_fast = self.calc_ewma_rate()
- is_big = self.check_large_flow()
- return [is_fast, is_big]
+# is_big = self.check_large_flow()
+ return [is_high, is_low]
except:
return [False, False]
+ def set_high_threshold(self, s):
+ self.rate_high_threshold = s;
+
+ def set_low_threshold(self, s):
+ self.rate_low_threshold = s;
+
def set_large_flow_threshold(self, s):
self.large_flow_threshold = s;
def calc_ewma_rate(self):
alpha = .75
- return_val = False
+ is_high = False
+ is_low = False
+
if (self.prev_t == None or self.prev_bytes == None):
new_bitrate = 0
else:
self.rate_ewma = 0
else:
self.rate_ewma = alpha * new_bitrate + (1 - alpha) * self.rate_ewma
- if (self.rate_ewma > self.rate_threshold):
- print "Rate exceeded %f" % (self.rate_ewma)
- return_val = True
+ if (self.rate_ewma > self.rate_high_threshold):
+ print "Rate exceeded %f" % (self.rate_ewma)
+ is_high = True
+
+ if (self.rate_ewma < self.rate_high_threshold):
+ print "Rate dropped %f" % (self.rate_ewma)
+ is_low = True
+
# Update the time and bytes snapshots
- self.prev_t = self.get_duration();
+ self.prev_t = self.get_duration()
self.prev_bytes = self.get_bytes()
- return return_val
+# print "calc_ewma_rate: [high=%s:low=%s] rate=%f" % (is_high, is_low, self.rate_ewma)
+ print "calc_ewma_rate: rate=%f" % (self.rate_ewma)
+ return [is_high, is_low]
# Returns true if this is a large flow