* Fix bug in analytics manager where source of affinity link was wrong. 71/5371/1
authorSuchi Raman <suchi.raman@plexxi.com>
Mon, 17 Feb 2014 16:51:45 +0000 (11:51 -0500)
committerSuchi Raman <suchi.raman@plexxi.com>
Mon, 17 Feb 2014 16:51:45 +0000 (11:51 -0500)
* Add a demo script (demo3) to test rate detection on affinity link.
* Miscellaneous debug print statements.

Signed-off-by: Suchi Raman <suchi.raman@plexxi.com>
affinity/implementation/src/main/java/org/opendaylight/affinity/affinity/internal/AffinityManagerImpl.java
analytics/implementation/src/main/java/org/opendaylight/affinity/analytics/internal/AnalyticsManager.java
scripts/affinity.py
scripts/analytics.py
scripts/demo3.py [new file with mode: 0644]
scripts/stats.py

index 33d6b058194d01aca6e53cf13d627b53b9c88d4f..c1c1210e6c1a243715ab8b9a9df03cc1ffdfdc34 100644 (file)
@@ -371,9 +371,9 @@ public class AffinityManagerImpl implements IAffinityManager,
 
         try {
             for (AffinityIdentifier h : ag.getAllElements()) {
-                log.debug("host = {}", h);
                 if (hostTracker != null) {
                     Host host1 = hostTracker.hostFind((InetAddress) h.get());
+                    //                    log.debug("host = {}", host1);
                     hostList.add(host1);
                 }
             }
index 93ec5de7a84d4111977ca1b1fd435a9a6dc6dd1a..a3e65f447edc052f7dc6d559da4d4746594351f7 100644 (file)
@@ -191,13 +191,23 @@ public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager
      * per-protocol (across all protocols if protocol is null).*/
     protected long getByteCount(Set<Host> srcSet, Set<Host> dstSet, Byte protocol) {
         long byteCount = 0;
+        /**
+        log.info("Printing all sources and destinations.");
+        for (Host h: srcSet) {
+            log.info("src: {}, DL {}, inet {}", h, h.getDataLayerAddress(), h.getNetworkAddress());
+        }
+        for (Host h: dstSet) {
+            log.info("dst: {}, DL {}, inet {}", h, h.getDataLayerAddress(), h.getNetworkAddress());
+        }
+        **/
         for (Host src : srcSet) {
             for (Host dst : dstSet) {
                 if (this.hostsToStats.get(src) != null &&
                     this.hostsToStats.get(src).get(dst) != null) {
-                    if (protocol == null)
+                    if (protocol == null) {
                         byteCount += this.hostsToStats.get(src).get(dst).getByteCount();
-                    else
+                        //                        log.info("Source and destination: {} and {}.", src, dst);
+                    } else
                         byteCount += this.hostsToStats.get(src).get(dst).getByteCount(protocol);
                 }
             }
@@ -302,7 +312,7 @@ public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager
 
     /* AffinityLink -> Set of source Hosts */
     private Set<Host> getSrcSet(AffinityLink al) {
-        return new HashSet<Host>(this.affinityManager.getAllElementsByHost(al.getToGroup()));
+        return new HashSet<Host>(this.affinityManager.getAllElementsByHost(al.getFromGroup()));
     }
 
     /* AffinityLink -> Set of destination Hosts */
index 095253ce0dbbbcd2fe79d1133174b483843ef9e6..d96a16593e79df2591570a5f6451ea17a32aa50e 100644 (file)
@@ -98,8 +98,12 @@ def client_ws_example():
     put_url = 'http://localhost:8080/affinity/nb/v2/affinity/default/create/link/inflows/from/clients/to/webservers'
     rest_method(put_url, "PUT")
 
+#    print "add subnet to webservers"
+#    put_url = 'http://localhost:8080/affinity/nb/v2/affinity/default/group/webservers/addsubnet/ipprefix/10.0.0.1/mask/31'
+#    rest_method(put_url, "PUT")
+
     print "add subnet to webservers"
-    put_url = 'http://localhost:8080/affinity/nb/v2/affinity/default/group/webservers/addsubnet/ipprefix/10.0.0.1/mask/31'
+    put_url = 'http://localhost:8080/affinity/nb/v2/affinity/default/group/webservers/add/ip/10.0.0.1'
     rest_method(put_url, "PUT")
 
     print "add ip to external"    
@@ -267,13 +271,13 @@ def add_static_host_tap(al, ipaddr):
     put_url = 'http://localhost:8080/affinity/nb/v2/affinity/default/link/' + al + '/settap/' + ipaddr
     rest_method(put_url, "PUT")
     
-#if __name__ == "__main__":
-#    main()
 
 #opener = {}
 #init_urllib()
 h = httplib2.Http(".cache")
 h.add_credentials('admin', 'admin')
+#if __name__ == "__main__":
+#    main()
 
 
     
index 3b1003819894d7ee2cfd43ddfb1c8b6e57b6f732..757be9f1067dbb75d38014bc2bf607ef79bf807f 100644 (file)
@@ -97,7 +97,7 @@ def stats_link(al, do_print=True):
     url = "http://localhost:8080/affinity/nb/v2/analytics/default/affinitylinkstats/%s" % al
     data = rest_method(url, "GET")
     if (do_print):
-        print("%s bytes (%s packets) over %s seconds (%s bit/s)" % (data["byteCount"], data["packetCount"], data["duration"], data["bitRate"]))
+        print("stats_link: %s %s bytes (%s packets) over %s seconds (%s bit/s)" % (url, data["byteCount"], data["packetCount"], data["duration"], data["bitRate"]))
     return data
 
 def stats_link_protocol(al, protocol, do_print=True):
diff --git a/scripts/demo3.py b/scripts/demo3.py
new file mode 100644 (file)
index 0000000..829fa10
--- /dev/null
@@ -0,0 +1,112 @@
+#!/usr/local/bin/python
+
+'''
+Copyright (c) 2013 Plexxi, Inc.  All rights reserved.
+
+This program and the accompanying materials are made available under the
+terms of the Eclipse Public License v1.0 which accompanies this distribution,
+and is available at http://www.eclipse.org/legal/epl-v10.html
+'''
+
+import httplib2
+import json
+import signal
+import time
+
+from threading import Thread
+
+from affinity_control import AffinityControl
+from subnet import SubnetControl
+from stats import Stats
+
+import analytics
+
+global link_name
+global rate_low
+global rate_high
+global prefix
+
+link_name = "inflows"
+rate_low = (100 * 10**3)  # 100 kbps
+rate_high = (1 * 10**6) # 1 Mbps
+prefix = "10.0.0.254/8"
+
+# If True, SIG_INT has been captured
+global sigint
+sigint = False
+
+# Handle SIG_INT
+def signal_handler(signal, frame):
+    global sigint
+    sigint = True
+
+# Monitors statistics
+class WaypointMonitor(Thread):
+
+    def __init__(self, monitor_type, **kwargs):
+        Thread.__init__(self)
+        self.stat = Stats(monitor_type, **kwargs)
+        self.stat_type = monitor_type
+        self.waypoint_address = None
+        print "Created waypoint monitor for %s" % self.stat
+
+    def set_waypoint(self, waypoint_ip):
+        self.waypoint_address = waypoint_ip
+        print "Registered waypoint for %s.  Any large flows will be redirected to %s." % (self.stat, waypoint_ip)
+
+    def set_high_threshold(self, s):
+        self.stat.set_high_threshold(s)
+        print "Set high threshold for flow rate to %d bps" % s
+        print("-------------------------")
+
+    def set_low_threshold(self, s):
+        self.stat.set_low_threshold(s)
+        print "Set low threshold for flow rate to %d bps" % s
+        print("-------------------------")
+
+    def run(self):
+        global sigint
+        high_rate_set = False
+        while not sigint:
+            is_high, is_low = self.stat.refresh()
+            print "Rate is: %f" % (self.stat.get_ewma_rate())
+            if is_high and not high_rate_set:
+                print "Fast flow detected (%3.3f bit/s)" % (self.stat.get_ewma_rate())
+                ac = AffinityControl()
+#                ac.add_waypoint(link_name, self.waypoint_address)
+##                ac.add_isolate(link_name)
+##                ac.enable_affinity()
+                high_rate_set = True
+#                time.sleep(3)            
+            elif is_low and high_rate_set: 
+                print "Flow rate below threshold (%3.3f bit/s)" % (self.stat.get_ewma_rate())
+##               ac.remove_isolate(link_name)
+##               ac.disable_affinity()
+                high_rate_set = False
+            time.sleep(1)
+
+def main():
+
+    # Default subnet is required for the host tracker to work.
+    subnet_control = SubnetControl()
+    subnet_control.list()
+    subnet_control.add_subnet("defaultSubnet", prefix)
+
+    raw_input("[Press enter when mininet is ready] ")
+    print("-------------------------")
+
+    m = WaypointMonitor(Stats.TYPE_AL, al="inflows")
+#    m.set_waypoint("10.0.0.2")
+    m.set_high_threshold(rate_high) 
+    m.set_low_threshold(rate_low)
+    m.start()
+
+    # Register signal-handler to catch SIG_INT
+    signal.signal(signal.SIGINT, signal_handler)
+    signal.pause()
+
+    # join() won't return until SIG_INT has been captured
+    m.join()
+
+if __name__ == "__main__":
+    main()
index 6d80f9480b3352a6b736a3ca8b4e617cbfad5421..4bf6fbdd97606866e7302f624892f6393dc0b64d 100644 (file)
@@ -31,6 +31,7 @@ class Stats:
 
         self.large_flow_threshold = 5 * 10**6 # in bytes
         self.rate_threshold = 1 * 10**6 # bits/sec
+        print "Created stats"
 
     def __str__(self):
         if (self.stat_type == Stats.TYPE_HOST):
@@ -48,19 +49,26 @@ class Stats:
             self.stats = analytics.stats_hosts(self.src, self.dst, False)
             self.protocol_stats = analytics.all_stats_hosts(self.src, self.dst, False)
         elif (self.stat_type == Stats.TYPE_AL):
-            self.stats = analytics.stats_link(self.al, False)
-            self.protocol_stats = analytics.all_stats_link(self.al, False)
+            self.stats = analytics.stats_link(self.al, True)
+            self.protocol_stats = analytics.all_stats_link(self.al, True)
         elif (self.stat_type == Stats.TYPE_SUBNET):
             self.stats = analytics.stats_subnet("null/null", self.subnet, True)
             self.protocol_stats = analytics.all_stats_subnet("null/null", self.subnet, True)
         try:
+            # Flag this as high if above high threshold, and low if below low threshold
+            is_high, is_low = self.calc_ewma_rate()
 #            is_fast = self.handle_rate_ewma()
-            is_fast = self.calc_ewma_rate()
-            is_big = self.check_large_flow()
-            return [is_fast, is_big]
+#            is_big = self.check_large_flow()
+            return [is_high, is_low]
         except:
             return [False, False]
 
+    def set_high_threshold(self, s):
+        self.rate_high_threshold = s;
+
+    def set_low_threshold(self, s):
+        self.rate_low_threshold = s;
+
     def set_large_flow_threshold(self, s):
         self.large_flow_threshold = s;
 
@@ -108,7 +116,9 @@ class Stats:
     def calc_ewma_rate(self):
         alpha = .75
 
-        return_val = False
+        is_high = False
+        is_low = False
+
         if (self.prev_t == None or self.prev_bytes == None): 
             new_bitrate = 0
         else:
@@ -120,14 +130,21 @@ class Stats:
             self.rate_ewma = 0
         else: 
             self.rate_ewma = alpha * new_bitrate + (1 - alpha) * self.rate_ewma
-        if (self.rate_ewma > self.rate_threshold): 
-            print "Rate exceeded %f" % (self.rate_ewma)
-            return_val = True
 
+        if (self.rate_ewma > self.rate_high_threshold): 
+            print "Rate exceeded %f" % (self.rate_ewma)
+            is_high = True
+        
+        if (self.rate_ewma < self.rate_high_threshold): 
+            print "Rate dropped %f" % (self.rate_ewma)
+            is_low = True
+        
         # Update the time and bytes snapshots
-        self.prev_t = self.get_duration();
+        self.prev_t = self.get_duration()
         self.prev_bytes = self.get_bytes()
-        return return_val
+#        print "calc_ewma_rate: [high=%s:low=%s] rate=%f" % (is_high, is_low, self.rate_ewma)
+        print "calc_ewma_rate: rate=%f" % (self.rate_ewma)
+        return [is_high, is_low]
 
 
     # Returns true if this is a large flow