Improvements to demo.py script. 77/5177/1
authorSuchi Raman <suchi.raman@plexxi.com>
Thu, 6 Feb 2014 17:01:49 +0000 (12:01 -0500)
committerSuchi Raman <suchi.raman@plexxi.com>
Thu, 6 Feb 2014 17:01:49 +0000 (12:01 -0500)
Signed-off-by: Suchi Raman <suchi.raman@plexxi.com>
scripts/demo.py
scripts/stats.py

index 60f239f66c18a1fa8fbdf8d7086240616ce79ec0..514990621bf7aa8c4a40361f3d524bef457d42ee 100644 (file)
@@ -63,11 +63,11 @@ class WaypointMonitor(Thread):
         print "Set threshold for large flows to %d bytes" % s
         print("-------------------------")
 
-    def run(self):
+    def run2(self):
         global sigint
         did_waypoint = False
         while not sigint:
-            _, is_big = self.stat.refresh()
+            is_fast, is_big = self.stat.refresh()
             if is_big and not did_waypoint:
                 print "Large flow detected (%d bytes, %d packets, %3.3f bit/s)" % (self.stat.get_bytes(), self.stat.get_packets(), self.stat.get_bit_rate())
                 print "   ICMP: %d bytes, %d packets" % (self.stat.get_bytes(1), self.stat.get_packets(1))
@@ -103,6 +103,51 @@ class WaypointMonitor(Thread):
 #                ac.disable_waypoint(link_name)
             time.sleep(1)
 
+
+
+    def run(self):
+        global sigint
+        did_waypoint = False
+        while not sigint:
+            is_fast, is_big = self.stat.refresh()
+            if is_fast and not did_waypoint:
+                print "Fast flow detected (%3.3f bit/s)" % (self.stat.get_ewma_rate())
+                ac = AffinityControl()
+                # First AG: Sources sending data into this subnet
+                src_ag_name = "sources"
+                # Hosts that transmit > 10% of link capacity
+                src_ips = self.stat.get_large_incoming_hosts()
+                print "LFD src list = ", src_ips
+                print "Discard waypoint address"
+                if (self.waypoint_address in src_ips):
+                    src_ips.remove(self.waypoint_address)
+                ac.add_affinity_group(src_ag_name, ips=src_ips)
+
+                # Second AG: This entity
+                dst_ag_name = "client"
+                if (self.stat_type == Stats.TYPE_SUBNET):
+                    ac.add_affinity_group(dst_ag_name, subnet=self.stat.subnet)
+                elif (self.stat_type == Stats.TYPE_HOST):
+                    pass
+                else:
+                    print "type", self.stat_type, "not supported for redirection"
+
+                # AL: Between them
+                link_name = "inflows"
+                ac.add_affinity_link(link_name, src_ag_name, dst_ag_name)
+                ac.add_waypoint(link_name, self.waypoint_address)
+                ac.add_isolate(link_name)
+                ac.enable_affinity()
+                did_waypoint = True
+                time.sleep(10)
+            
+            # Flow now gone, disable affinity.
+            elif did_waypoint and not is_fast: 
+                ac.disable_affinity()
+                did_waypoint = False
+            time.sleep(1)
+
 def main():
 
     # Default subnet is required for the host tracker to work.
index 096da966255752967a054b7fa0acd76171d2be16..6d80f9480b3352a6b736a3ca8b4e617cbfad5421 100644 (file)
@@ -26,7 +26,11 @@ class Stats:
         self.stats = {}
         self.protocol_stats = {}
         self.rate_ewma = None
+        self.prev_t = None
+        self.prev_bytes = None
+
         self.large_flow_threshold = 5 * 10**6 # in bytes
+        self.rate_threshold = 1 * 10**6 # bits/sec
 
     def __str__(self):
         if (self.stat_type == Stats.TYPE_HOST):
@@ -50,7 +54,8 @@ class Stats:
             self.stats = analytics.stats_subnet("null/null", self.subnet, True)
             self.protocol_stats = analytics.all_stats_subnet("null/null", self.subnet, True)
         try:
-            is_fast = self.handle_rate_ewma()
+#            is_fast = self.handle_rate_ewma()
+            is_fast = self.calc_ewma_rate()
             is_big = self.check_large_flow()
             return [is_fast, is_big]
         except:
@@ -79,21 +84,52 @@ class Stats:
 
     # EWMA calculation for bit rate.  Returns true if there is an anomaly.
     def handle_rate_ewma(self):
-        alpha = .25
+        alpha = .75
         anomaly_threshold = 2.0
         new_bitrate = self.get_bit_rate()
+        print "bitrate = %f" % (new_bitrate)
 
         return_val = False
 
         if self.rate_ewma == None:
             self.rate_ewma = new_bitrate
         else:
-            new_rate_ewma = alpha * new_bitrate + (1 - alpha) * self.rate_ewma
-            if (self.rate_ewma > 0 and new_rate_ewma > anomaly_threshold * self.rate_ewma):
-                return_val = True
-            self.rate_ewma = new_rate_ewma
+            self.rate_ewma = alpha * new_bitrate + (1 - alpha) * self.rate_ewma
+
+        if (new_rate_ewma > self.rate_threshold):
+            return_val = True
         return return_val
 
+    def get_ewma_rate(self):
+        alpha = .75
+        return self.rate_ewma
+
+    # EWMA calculation for bit rate.  Returns true if there is an anomaly.
+    def calc_ewma_rate(self):
+        alpha = .75
+
+        return_val = False
+        if (self.prev_t == None or self.prev_bytes == None): 
+            new_bitrate = 0
+        else:
+            new_bitrate = (self.get_bytes() - self.prev_bytes)/(self.get_duration() - self.prev_t)
+            print "Rate is now %f" % (new_bitrate)
+
+        # Calculate ewma rate from instantaneous rate and check if it crosses threshold.
+        if (self.rate_ewma == None): 
+            self.rate_ewma = 0
+        else: 
+            self.rate_ewma = alpha * new_bitrate + (1 - alpha) * self.rate_ewma
+        if (self.rate_ewma > self.rate_threshold): 
+            print "Rate exceeded %f" % (self.rate_ewma)
+            return_val = True
+
+        # Update the time and bytes snapshots
+        self.prev_t = self.get_duration();
+        self.prev_bytes = self.get_bytes()
+        return return_val
+
+
     # Returns true if this is a large flow
     def check_large_flow(self):
         if (self.get_bytes() > self.large_flow_threshold):
@@ -111,6 +147,17 @@ class Stats:
             bytes = 0
         return bytes
 
+    # Duration of longest flow in group
+    def get_duration(self, protocol=None):
+        try:
+            if (protocol == None):
+                seconds = long(self.stats["duration"])
+            else:
+                seconds = long(self.protocol_stats[protocol]["duration"])
+        except Exception as e:
+            seconds = 0
+        return seconds
+
     # Packets
     def get_packets(self, protocol=None):
         try: