From: Katrina LaCurts Date: Wed, 6 Nov 2013 17:23:58 +0000 (-0500) Subject: Waypoint redirection now takes into account how much each host contributed to a large... X-Git-Tag: jenkins-affinity-bulk-release-prepare-only-1~33 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=e51c127a773ce23130cfc919212d7a51eefff928;p=affinity.git Waypoint redirection now takes into account how much each host contributed to a large flow, and creates an AG containing only the hosts responsible for at least 1/nth of the traffic. Signed-off-by: Katrina LaCurts --- diff --git a/analytics/api/src/main/java/org/opendaylight/affinity/analytics/IAnalyticsManager.java b/analytics/api/src/main/java/org/opendaylight/affinity/analytics/IAnalyticsManager.java index 6d1c04d..8b5baa8 100644 --- a/analytics/api/src/main/java/org/opendaylight/affinity/analytics/IAnalyticsManager.java +++ b/analytics/api/src/main/java/org/opendaylight/affinity/analytics/IAnalyticsManager.java @@ -8,7 +8,7 @@ package org.opendaylight.affinity.analytics; -import java.util.Set; +import java.util.Map; import org.opendaylight.affinity.affinity.AffinityLink; import org.opendaylight.controller.sal.core.Host; @@ -25,5 +25,5 @@ public interface IAnalyticsManager { long getByteCountIntoPrefix(String ipAndMask); - Set getIncomingHosts(String ipAndMask); + Map getIncomingHosts(String ipAndMask); } diff --git a/analytics/implementation/src/main/java/org/opendaylight/affinity/analytics/internal/AnalyticsManager.java b/analytics/implementation/src/main/java/org/opendaylight/affinity/analytics/internal/AnalyticsManager.java index 4248f4e..b59ba38 100644 --- a/analytics/implementation/src/main/java/org/opendaylight/affinity/analytics/internal/AnalyticsManager.java +++ b/analytics/implementation/src/main/java/org/opendaylight/affinity/analytics/internal/AnalyticsManager.java @@ -278,13 +278,13 @@ public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager } // Return the set of hosts that transfer data to the targetHost - public Set getIncomingHosts(Host targetHost) { - Set incomingHosts = new HashSet(); + public Map getIncomingHosts(Host targetHost) { + Map incomingHosts = new HashMap(); for (Host sourceHost : this.hostsToStats.keySet()) { if (this.hostsToStats.get(sourceHost).get(targetHost) != null) { long bytes = this.hostsToStats.get(sourceHost).get(targetHost).getByteCount(); if (bytes > 0) { - incomingHosts.add(sourceHost); + incomingHosts.put(sourceHost, bytes); } } } @@ -292,10 +292,10 @@ public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager } // Return the set of hosts that transfer data into any host in this subnet - public Set getIncomingHosts(String prefixAndMask) { + public Map getIncomingHosts(String prefixAndMask) { InetAddress ip; Short mask; - Set hosts = new HashSet(); + Map hosts = new HashMap(); // Split 1.2.3.4/5 format into the prefix (1.2.3.4) and the mask (5) try { @@ -313,51 +313,27 @@ public class AnalyticsManager implements IReadServiceListener, IAnalyticsManager for (HostNodeConnector host : allHosts) { InetAddress hostPrefix = getPrefix(host.getNetworkAddress(), mask); if (hostPrefix.equals(targetPrefix)) { - hosts.addAll(getIncomingHosts(host)); + Map these_hosts = getIncomingHosts(host); + // Merge the two maps by summing bytes between them if necessary + for (Host h : these_hosts.keySet()) { + if (hosts.get(h) == null) { + hosts.put(h, these_hosts.get(h)); + } else { + hosts.put(h, these_hosts.get(h) + hosts.get(h)); + } + } } } return hosts; } - public long getByteCountIntoHost(Host targetHost) { - long totalBytes = 0; - // We're calculating bytes *into* the target host, not out of - for (Host sourceHost : this.hostsToStats.keySet()) { - if (this.hostsToStats.get(sourceHost).get(targetHost) != null) { - totalBytes += this.hostsToStats.get(sourceHost).get(targetHost).getByteCount(); - } - } - return totalBytes; - } - public long getByteCountIntoPrefix(String prefixAndMask) { long totalBytes = 0; - InetAddress ip; - Short mask; - - Set hosts = getIncomingHosts(prefixAndMask); - - // Split 1.2.3.4/5 format into the prefix (1.2.3.4) and the mask (5) - try { - String[] splitPrefix = prefixAndMask.split("/"); - ip = InetAddress.getByName(splitPrefix[0]); - mask = (splitPrefix.length == 2) ? Short.valueOf(splitPrefix[1]) : 32; - } catch (UnknownHostException e) { - log.debug("Incorrect prefix/mask format: " + prefixAndMask); - return 0; + Map hostData = getIncomingHosts(prefixAndMask); + for (Host h : hostData.keySet()) { + totalBytes += hostData.get(h); } - - // Match on prefixes - InetAddress targetPrefix = getPrefix(ip, mask); - Set allHosts = this.hostTracker.getAllHosts(); - for (HostNodeConnector host : allHosts) { - InetAddress hostPrefix = getPrefix(host.getNetworkAddress(), mask); - if (hostPrefix.equals(targetPrefix)) { - totalBytes += getByteCountIntoHost(host); - } - } - return totalBytes; } diff --git a/analytics/northbound/src/main/java/org/opendaylight/affinity/analytics/northbound/AllHosts.java b/analytics/northbound/src/main/java/org/opendaylight/affinity/analytics/northbound/AllHosts.java index 0769e9d..6198698 100644 --- a/analytics/northbound/src/main/java/org/opendaylight/affinity/analytics/northbound/AllHosts.java +++ b/analytics/northbound/src/main/java/org/opendaylight/affinity/analytics/northbound/AllHosts.java @@ -8,27 +8,28 @@ package org.opendaylight.affinity.analytics.northbound; -import java.util.List; +import java.util.Map; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; -@XmlRootElement(name = "list") +@XmlRootElement(name = "map") @XmlAccessorType(XmlAccessType.NONE) public class AllHosts { @XmlElement - List hosts; + Map hosts; + // TODO: There is a better way to serialize a map @SuppressWarnings("unused") // To satisfy JAXB private AllHosts() {} - public AllHosts(List hostIPs) { - this.hosts = hostIPs; + public AllHosts(Map hostData) { + this.hosts = hostData; } - public List getHosts() { + public Map getHosts() { return this.hosts; } } diff --git a/analytics/northbound/src/main/java/org/opendaylight/affinity/analytics/northbound/AnalyticsNorthbound.java b/analytics/northbound/src/main/java/org/opendaylight/affinity/analytics/northbound/AnalyticsNorthbound.java index 307d421..8ee947a 100644 --- a/analytics/northbound/src/main/java/org/opendaylight/affinity/analytics/northbound/AnalyticsNorthbound.java +++ b/analytics/northbound/src/main/java/org/opendaylight/affinity/analytics/northbound/AnalyticsNorthbound.java @@ -13,6 +13,8 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; import java.util.Set; +import java.util.Map; +import java.util.HashMap; import java.util.ArrayList; import javax.ws.rs.GET; @@ -244,13 +246,13 @@ public class AnalyticsNorthbound { throw new ServiceUnavailableException("Analytics " + RestMessages.SERVICEUNAVAILABLE.toString()); } - Set hosts = analyticsManager.getIncomingHosts(ip + "/" + mask); - List ips = new ArrayList(); - for (Host h : hosts) { + Map hostData = analyticsManager.getIncomingHosts(ip + "/" + mask); + Map hostDataWithStrings = new HashMap(); + for (Host h : hostData.keySet()) { InetAddress i = h.getNetworkAddress(); - ips.add(i.toString()); + hostDataWithStrings.put(i.toString(), hostData.get(h)); } - return new AllHosts(ips); + return new AllHosts(hostDataWithStrings); } private void handleDefaultDisabled(String containerName) { diff --git a/scripts/demo.py b/scripts/demo.py index eb254c3..78c4d6d 100644 --- a/scripts/demo.py +++ b/scripts/demo.py @@ -3,7 +3,6 @@ import httplib2 import json import signal -import sys import time from threading import Thread @@ -29,22 +28,29 @@ class WaypointMonitor(Thread): self.stat = Stats(monitor_type, **kwargs) self.stat_type = monitor_type self.waypoint_address = None + print "Created waypoint monitor for %s" % self.stat def set_waypoint(self, waypoint_ip): self.waypoint_address = waypoint_ip print "Registered waypoint for %s. Any large flows will be redirected to %s." % (self.stat, waypoint_ip) + def set_large_flow_threshold(self, s): + self.stat.set_large_flow_threshold(s) + print "Set threshold for large flows to %d bytes" % s + def run(self): global sigint did_waypoint = False while not sigint: _, is_big = self.stat.refresh() if is_big and not did_waypoint: - print "Large flow detected" + print "Large flow detected (%d bytes)" % self.stat.get_bytes() ac = AffinityControl() # First AG: Sources sending data into this subnet src_ag_name = "sources" - src_ips = self.stat.get_incoming_hosts() + src_ips = self.stat.get_large_incoming_hosts() + if (self.waypoint_address in src_ips): + src_ips.remove(self.waypoint_address) ac.add_affinity_group(src_ag_name, ips=src_ips) # Second AG: This entity dst_ag_name = "client" @@ -70,6 +76,7 @@ def main(): m = WaypointMonitor(Stats.TYPE_PREFIX, subnet="10.0.0.0/31") m.set_waypoint("10.0.0.2") + m.set_large_flow_threshold(2000) # 2000 bytes m.start() # Register signal-handler to catch SIG_INT diff --git a/scripts/stats.py b/scripts/stats.py index bc12bee..b48ee7f 100644 --- a/scripts/stats.py +++ b/scripts/stats.py @@ -29,6 +29,7 @@ class Stats: self.stats = {} self.rate_ewma = None + self.large_flow_threshold = 5 * 10**6 # in bytes self.http = httplib2.Http(".cache") self.http.add_credentials('admin', 'admin') @@ -59,19 +60,30 @@ class Stats: print "error: ", e return [False, False] - # Return hosts that transferred data into this entity. Right now only supports prefixes. - def get_incoming_hosts(self): + def set_large_flow_threshold(self, s): + self.large_flow_threshold = s; + + # Return all hosts that transferred a particular percentage of data into this entity. Right now only supports prefixes. + def get_large_incoming_hosts(self): if (self.stat_type == Stats.TYPE_PREFIX): resp, content = self.http.request(self.url_prefix + "incoming/" + self.subnet, "GET") data = json.loads(content) - if (data != {}): - # IPs sometimes (always?) get returned as strings like /1.2.3.4; strip off the leading / - ips = [h.replace("/", "") for h in data['hosts']] - return ips + if (data == {}): return [] + host_data = data['hosts']['entry'] + ips = [] + total_bytes_in = self.get_bytes() + n = len(host_data) + for d in host_data: + bytes_from_ip = int(d['value']) + ip = d['key'].replace("/", "") # IPs sometimes (always?) get returned as strings like /1.2.3.4 + if (bytes_from_ip >= total_bytes_in / float(n)): + ips.append(ip) + return ips else: print "Stat type not supported for incoming hosts" return [] + # EWMA calculation for bit rate. Returns true if there is an anomaly. def handle_rate_ewma(self): alpha = .25 @@ -91,7 +103,7 @@ class Stats: # Returns true if this is a large flow def check_large_flow(self): - if (self.get_bytes() > 5 * (10**6)): + if (self.get_bytes() > self.large_flow_threshold): return True return False @@ -100,6 +112,7 @@ class Stats: try: bytes = long(self.stats["byteCount"]) except Exception as e: + print "exception: ", e bytes = 0 return bytes