Waypoint redirection now takes into account how much each host contributed to a large... 52/2452/1
authorKatrina LaCurts <katrina.lacurts@plexxi.com>
Wed, 6 Nov 2013 17:23:58 +0000 (12:23 -0500)
committerKatrina LaCurts <katrina.lacurts@plexxi.com>
Wed, 6 Nov 2013 17:23:58 +0000 (12:23 -0500)
Signed-off-by: Katrina LaCurts <katrina.lacurts@plexxi.com>
analytics/api/src/main/java/org/opendaylight/affinity/analytics/IAnalyticsManager.java
analytics/implementation/src/main/java/org/opendaylight/affinity/analytics/internal/AnalyticsManager.java
analytics/northbound/src/main/java/org/opendaylight/affinity/analytics/northbound/AllHosts.java
analytics/northbound/src/main/java/org/opendaylight/affinity/analytics/northbound/AnalyticsNorthbound.java
scripts/demo.py
scripts/stats.py

index 6d1c04dc59922f360a2162c0f92f4e3f2a4166da..8b5baa858ea779b7fc41edc1ed235fa3f5f1e520 100644 (file)
@@ -8,7 +8,7 @@
 
 package org.opendaylight.affinity.analytics;
 
-import java.util.Set;
+import java.util.Map;
 
 import org.opendaylight.affinity.affinity.AffinityLink;
 import org.opendaylight.controller.sal.core.Host;
@@ -25,5 +25,5 @@ public interface IAnalyticsManager {
 
     long getByteCountIntoPrefix(String ipAndMask);
 
-    Set<Host> getIncomingHosts(String ipAndMask);
+    Map<Host, Long> getIncomingHosts(String ipAndMask);
 }
index 4248f4e97ffb677c8d3060ef2266b4f3bb03b735..b59ba38403cb61dda911839873b91e1258d217ec 100644 (file)
@@ -278,13 +278,13 @@ public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager
     }
 
     // Return the set of hosts that transfer data to the targetHost
-    public Set<Host> getIncomingHosts(Host targetHost) {
-        Set<Host> incomingHosts = new HashSet<Host>();
+    public Map<Host, Long> getIncomingHosts(Host targetHost) {
+        Map<Host, Long> incomingHosts = new HashMap<Host, Long>();
         for (Host sourceHost : this.hostsToStats.keySet()) {
             if (this.hostsToStats.get(sourceHost).get(targetHost) != null) {
                 long bytes = this.hostsToStats.get(sourceHost).get(targetHost).getByteCount();
                 if (bytes > 0) {
-                    incomingHosts.add(sourceHost);
+                    incomingHosts.put(sourceHost, bytes);
                 }
             }
         }
@@ -292,10 +292,10 @@ public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager
     }
 
     // Return the set of hosts that transfer data into any host in this subnet
-    public Set<Host> getIncomingHosts(String prefixAndMask) {
+    public Map<Host, Long> getIncomingHosts(String prefixAndMask) {
         InetAddress ip;
         Short mask;
-        Set<Host> hosts = new HashSet<Host>();
+        Map<Host, Long> hosts = new HashMap<Host, Long>();
 
         // Split 1.2.3.4/5 format into the prefix (1.2.3.4) and the mask (5)
         try {
@@ -313,51 +313,27 @@ public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager
         for (HostNodeConnector host : allHosts) {
             InetAddress hostPrefix = getPrefix(host.getNetworkAddress(), mask);
             if (hostPrefix.equals(targetPrefix)) {
-                hosts.addAll(getIncomingHosts(host));
+                Map<Host, Long> these_hosts = getIncomingHosts(host);
+                // Merge the two maps by summing bytes between them if necessary
+                for (Host h : these_hosts.keySet()) {
+                    if (hosts.get(h) == null) {
+                        hosts.put(h, these_hosts.get(h));
+                    } else {
+                        hosts.put(h, these_hosts.get(h) + hosts.get(h));
+                    }
+                }
             }
         }
 
         return hosts;
     }
 
-    public long getByteCountIntoHost(Host targetHost) {
-        long totalBytes = 0;
-        // We're calculating bytes *into* the target host, not out of
-        for (Host sourceHost : this.hostsToStats.keySet()) {
-            if (this.hostsToStats.get(sourceHost).get(targetHost) != null) {
-                totalBytes += this.hostsToStats.get(sourceHost).get(targetHost).getByteCount();
-            }
-        }
-        return totalBytes;
-    }
-
     public long getByteCountIntoPrefix(String prefixAndMask) {
         long totalBytes = 0;
-        InetAddress ip;
-        Short mask;
-
-        Set<Host> hosts = getIncomingHosts(prefixAndMask);
-
-        // Split 1.2.3.4/5 format into the prefix (1.2.3.4) and the mask (5)
-        try {
-            String[] splitPrefix = prefixAndMask.split("/");
-            ip = InetAddress.getByName(splitPrefix[0]);
-            mask = (splitPrefix.length == 2) ? Short.valueOf(splitPrefix[1]) : 32;
-        } catch (UnknownHostException e) {
-            log.debug("Incorrect prefix/mask format: " + prefixAndMask);
-            return 0;
+        Map<Host, Long> hostData = getIncomingHosts(prefixAndMask);
+        for (Host h : hostData.keySet()) {
+            totalBytes += hostData.get(h);
         }
-
-        // Match on prefixes
-        InetAddress targetPrefix = getPrefix(ip, mask);
-        Set<HostNodeConnector> allHosts = this.hostTracker.getAllHosts();
-        for (HostNodeConnector host : allHosts) {
-            InetAddress hostPrefix = getPrefix(host.getNetworkAddress(), mask);
-            if (hostPrefix.equals(targetPrefix)) {
-                totalBytes += getByteCountIntoHost(host);
-            }
-        }
-
         return totalBytes;
     }
 
index 0769e9d283df070332e2a5f7d5e0abc8b6be5658..6198698d021809ea3af86a23c3c6112bd9799431 100644 (file)
@@ -8,27 +8,28 @@
 
 package org.opendaylight.affinity.analytics.northbound;
 
-import java.util.List;
+import java.util.Map;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 
-@XmlRootElement(name = "list")
+@XmlRootElement(name = "map")
 @XmlAccessorType(XmlAccessType.NONE)
 public class AllHosts {
     @XmlElement
-    List<String> hosts;
+    Map<String, Long> hosts;
+    // TODO: There is a better way to serialize a map
 
     @SuppressWarnings("unused") // To satisfy JAXB
     private AllHosts() {}
 
-    public AllHosts(List<String> hostIPs) {
-        this.hosts = hostIPs;
+    public AllHosts(Map<String, Long> hostData) {
+        this.hosts = hostData;
     }
 
-    public List<String> getHosts() {
+    public Map<String, Long> getHosts() {
         return this.hosts;
     }
 }
index 307d421476880eb5530e8277956090b1b5d6a676..8ee947ae4c74ab266c78454d763dcc78da413bc2 100644 (file)
@@ -13,6 +13,8 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.ArrayList;
 
 import javax.ws.rs.GET;
@@ -244,13 +246,13 @@ public class AnalyticsNorthbound {
             throw new ServiceUnavailableException("Analytics " + RestMessages.SERVICEUNAVAILABLE.toString());
         }
 
-        Set<Host> hosts = analyticsManager.getIncomingHosts(ip + "/" + mask);
-        List<String> ips = new ArrayList<String>();
-        for (Host h : hosts) {
+        Map<Host, Long> hostData = analyticsManager.getIncomingHosts(ip + "/" + mask);
+        Map<String, Long> hostDataWithStrings = new HashMap<String, Long>();
+        for (Host h : hostData.keySet()) {
             InetAddress i = h.getNetworkAddress();
-            ips.add(i.toString());
+            hostDataWithStrings.put(i.toString(), hostData.get(h));
         }
-        return new AllHosts(ips);
+        return new AllHosts(hostDataWithStrings);
     }
 
     private void handleDefaultDisabled(String containerName) {
index eb254c319c6aa7f73cd303f8635f86ac6683ae95..78c4d6dd81d24f1be3af40d37c5b89e08e31bea6 100644 (file)
@@ -3,7 +3,6 @@
 import httplib2
 import json
 import signal
-import sys
 import time
 
 from threading import Thread
@@ -29,22 +28,29 @@ class WaypointMonitor(Thread):
         self.stat = Stats(monitor_type, **kwargs)
         self.stat_type = monitor_type
         self.waypoint_address = None
+        print "Created waypoint monitor for %s" % self.stat
 
     def set_waypoint(self, waypoint_ip):
         self.waypoint_address = waypoint_ip
         print "Registered waypoint for %s.  Any large flows will be redirected to %s." % (self.stat, waypoint_ip)
 
+    def set_large_flow_threshold(self, s):
+        self.stat.set_large_flow_threshold(s)
+        print "Set threshold for large flows to %d bytes" % s
+
     def run(self):
         global sigint
         did_waypoint = False
         while not sigint:
             _, is_big = self.stat.refresh()
             if is_big and not did_waypoint:
-                print "Large flow detected"
+                print "Large flow detected (%d bytes)" % self.stat.get_bytes()
                 ac = AffinityControl()
                 # First AG: Sources sending data into this subnet
                 src_ag_name = "sources"
-                src_ips = self.stat.get_incoming_hosts()
+                src_ips = self.stat.get_large_incoming_hosts()
+                if (self.waypoint_address in src_ips):
+                    src_ips.remove(self.waypoint_address)
                 ac.add_affinity_group(src_ag_name, ips=src_ips)
                 # Second AG: This entity
                 dst_ag_name = "client"
@@ -70,6 +76,7 @@ def main():
 
     m = WaypointMonitor(Stats.TYPE_PREFIX, subnet="10.0.0.0/31")
     m.set_waypoint("10.0.0.2")
+    m.set_large_flow_threshold(2000) # 2000 bytes
     m.start()
 
     # Register signal-handler to catch SIG_INT
index bc12bee22d97fd9c5daa4bd3d36947fe592f47a8..b48ee7f2bea11c9f4776781fe1388db2511a1901 100644 (file)
@@ -29,6 +29,7 @@ class Stats:
 
         self.stats = {}
         self.rate_ewma = None
+        self.large_flow_threshold = 5 * 10**6 # in bytes
         self.http = httplib2.Http(".cache")
         self.http.add_credentials('admin', 'admin')
 
@@ -59,19 +60,30 @@ class Stats:
             print "error: ", e
             return [False, False]
 
-    # Return hosts that transferred data into this entity.  Right now only supports prefixes.
-    def get_incoming_hosts(self):
+    def set_large_flow_threshold(self, s):
+        self.large_flow_threshold = s;
+
+    # Return all hosts that transferred a particular percentage of data into this entity.  Right now only supports prefixes.
+    def get_large_incoming_hosts(self):
         if (self.stat_type == Stats.TYPE_PREFIX):
             resp, content = self.http.request(self.url_prefix + "incoming/" + self.subnet, "GET")
             data = json.loads(content)
-            if (data != {}):
-                # IPs sometimes (always?) get returned as strings like /1.2.3.4; strip off the leading /
-                ips = [h.replace("/", "") for h in data['hosts']]
-                return ips
+            if (data == {}): return []
+            host_data = data['hosts']['entry']
+            ips = []
+            total_bytes_in = self.get_bytes()
+            n = len(host_data)
+            for d in host_data:
+                bytes_from_ip = int(d['value'])
+                ip = d['key'].replace("/", "") # IPs sometimes (always?) get returned as strings like /1.2.3.4
+                if (bytes_from_ip >= total_bytes_in / float(n)):
+                    ips.append(ip)
+            return ips
         else:
             print "Stat type not supported for incoming hosts"
         return []
 
+
     # EWMA calculation for bit rate.  Returns true if there is an anomaly.
     def handle_rate_ewma(self):
         alpha = .25
@@ -91,7 +103,7 @@ class Stats:
 
     # Returns true if this is a large flow
     def check_large_flow(self):
-        if (self.get_bytes() > 5 * (10**6)):
+        if (self.get_bytes() > self.large_flow_threshold):
             return True
         return False
 
@@ -100,6 +112,7 @@ class Stats:
         try:
             bytes = long(self.stats["byteCount"])
         except Exception as e:
+            print "exception: ", e
             bytes = 0
         return bytes