package org.opendaylight.affinity.analytics;
-import java.util.Set;
+import java.util.Map;
import org.opendaylight.affinity.affinity.AffinityLink;
import org.opendaylight.controller.sal.core.Host;
long getByteCountIntoPrefix(String ipAndMask);
- Set<Host> getIncomingHosts(String ipAndMask);
+ Map<Host, Long> getIncomingHosts(String ipAndMask);
}
}
// Return the set of hosts that transfer data to the targetHost
- public Set<Host> getIncomingHosts(Host targetHost) {
- Set<Host> incomingHosts = new HashSet<Host>();
+ public Map<Host, Long> getIncomingHosts(Host targetHost) {
+ Map<Host, Long> incomingHosts = new HashMap<Host, Long>();
for (Host sourceHost : this.hostsToStats.keySet()) {
if (this.hostsToStats.get(sourceHost).get(targetHost) != null) {
long bytes = this.hostsToStats.get(sourceHost).get(targetHost).getByteCount();
if (bytes > 0) {
- incomingHosts.add(sourceHost);
+ incomingHosts.put(sourceHost, bytes);
}
}
}
}
// Return the set of hosts that transfer data into any host in this subnet
- public Set<Host> getIncomingHosts(String prefixAndMask) {
+ public Map<Host, Long> getIncomingHosts(String prefixAndMask) {
InetAddress ip;
Short mask;
- Set<Host> hosts = new HashSet<Host>();
+ Map<Host, Long> hosts = new HashMap<Host, Long>();
// Split 1.2.3.4/5 format into the prefix (1.2.3.4) and the mask (5)
try {
for (HostNodeConnector host : allHosts) {
InetAddress hostPrefix = getPrefix(host.getNetworkAddress(), mask);
if (hostPrefix.equals(targetPrefix)) {
- hosts.addAll(getIncomingHosts(host));
+ Map<Host, Long> these_hosts = getIncomingHosts(host);
+ // Merge the two maps by summing bytes between them if necessary
+ for (Host h : these_hosts.keySet()) {
+ if (hosts.get(h) == null) {
+ hosts.put(h, these_hosts.get(h));
+ } else {
+ hosts.put(h, these_hosts.get(h) + hosts.get(h));
+ }
+ }
}
}
return hosts;
}
- public long getByteCountIntoHost(Host targetHost) {
- long totalBytes = 0;
- // We're calculating bytes *into* the target host, not out of
- for (Host sourceHost : this.hostsToStats.keySet()) {
- if (this.hostsToStats.get(sourceHost).get(targetHost) != null) {
- totalBytes += this.hostsToStats.get(sourceHost).get(targetHost).getByteCount();
- }
- }
- return totalBytes;
- }
-
public long getByteCountIntoPrefix(String prefixAndMask) {
long totalBytes = 0;
- InetAddress ip;
- Short mask;
-
- Set<Host> hosts = getIncomingHosts(prefixAndMask);
-
- // Split 1.2.3.4/5 format into the prefix (1.2.3.4) and the mask (5)
- try {
- String[] splitPrefix = prefixAndMask.split("/");
- ip = InetAddress.getByName(splitPrefix[0]);
- mask = (splitPrefix.length == 2) ? Short.valueOf(splitPrefix[1]) : 32;
- } catch (UnknownHostException e) {
- log.debug("Incorrect prefix/mask format: " + prefixAndMask);
- return 0;
+ Map<Host, Long> hostData = getIncomingHosts(prefixAndMask);
+ for (Host h : hostData.keySet()) {
+ totalBytes += hostData.get(h);
}
-
- // Match on prefixes
- InetAddress targetPrefix = getPrefix(ip, mask);
- Set<HostNodeConnector> allHosts = this.hostTracker.getAllHosts();
- for (HostNodeConnector host : allHosts) {
- InetAddress hostPrefix = getPrefix(host.getNetworkAddress(), mask);
- if (hostPrefix.equals(targetPrefix)) {
- totalBytes += getByteCountIntoHost(host);
- }
- }
-
return totalBytes;
}
package org.opendaylight.affinity.analytics.northbound;
-import java.util.List;
+import java.util.Map;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
-@XmlRootElement(name = "list")
+@XmlRootElement(name = "map")
@XmlAccessorType(XmlAccessType.NONE)
public class AllHosts {
@XmlElement
- List<String> hosts;
+ Map<String, Long> hosts;
+ // TODO: There is a better way to serialize a map
@SuppressWarnings("unused") // To satisfy JAXB
private AllHosts() {}
- public AllHosts(List<String> hostIPs) {
- this.hosts = hostIPs;
+ public AllHosts(Map<String, Long> hostData) {
+ this.hosts = hostData;
}
- public List<String> getHosts() {
+ public Map<String, Long> getHosts() {
return this.hosts;
}
}
import java.net.UnknownHostException;
import java.util.List;
import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
import java.util.ArrayList;
import javax.ws.rs.GET;
throw new ServiceUnavailableException("Analytics " + RestMessages.SERVICEUNAVAILABLE.toString());
}
- Set<Host> hosts = analyticsManager.getIncomingHosts(ip + "/" + mask);
- List<String> ips = new ArrayList<String>();
- for (Host h : hosts) {
+ Map<Host, Long> hostData = analyticsManager.getIncomingHosts(ip + "/" + mask);
+ Map<String, Long> hostDataWithStrings = new HashMap<String, Long>();
+ for (Host h : hostData.keySet()) {
InetAddress i = h.getNetworkAddress();
- ips.add(i.toString());
+ hostDataWithStrings.put(i.toString(), hostData.get(h));
}
- return new AllHosts(ips);
+ return new AllHosts(hostDataWithStrings);
}
private void handleDefaultDisabled(String containerName) {
import httplib2
import json
import signal
-import sys
import time
from threading import Thread
self.stat = Stats(monitor_type, **kwargs)
self.stat_type = monitor_type
self.waypoint_address = None
+ print "Created waypoint monitor for %s" % self.stat
def set_waypoint(self, waypoint_ip):
self.waypoint_address = waypoint_ip
print "Registered waypoint for %s. Any large flows will be redirected to %s." % (self.stat, waypoint_ip)
+ def set_large_flow_threshold(self, s):
+ self.stat.set_large_flow_threshold(s)
+ print "Set threshold for large flows to %d bytes" % s
+
def run(self):
global sigint
did_waypoint = False
while not sigint:
_, is_big = self.stat.refresh()
if is_big and not did_waypoint:
- print "Large flow detected"
+ print "Large flow detected (%d bytes)" % self.stat.get_bytes()
ac = AffinityControl()
# First AG: Sources sending data into this subnet
src_ag_name = "sources"
- src_ips = self.stat.get_incoming_hosts()
+ src_ips = self.stat.get_large_incoming_hosts()
+ if (self.waypoint_address in src_ips):
+ src_ips.remove(self.waypoint_address)
ac.add_affinity_group(src_ag_name, ips=src_ips)
# Second AG: This entity
dst_ag_name = "client"
m = WaypointMonitor(Stats.TYPE_PREFIX, subnet="10.0.0.0/31")
m.set_waypoint("10.0.0.2")
+ m.set_large_flow_threshold(2000) # 2000 bytes
m.start()
# Register signal-handler to catch SIG_INT
self.stats = {}
self.rate_ewma = None
+ self.large_flow_threshold = 5 * 10**6 # in bytes
self.http = httplib2.Http(".cache")
self.http.add_credentials('admin', 'admin')
print "error: ", e
return [False, False]
- # Return hosts that transferred data into this entity. Right now only supports prefixes.
- def get_incoming_hosts(self):
+ def set_large_flow_threshold(self, s):
+ self.large_flow_threshold = s;
+
+ # Return all hosts that transferred a particular percentage of data into this entity. Right now only supports prefixes.
+ def get_large_incoming_hosts(self):
if (self.stat_type == Stats.TYPE_PREFIX):
resp, content = self.http.request(self.url_prefix + "incoming/" + self.subnet, "GET")
data = json.loads(content)
- if (data != {}):
- # IPs sometimes (always?) get returned as strings like /1.2.3.4; strip off the leading /
- ips = [h.replace("/", "") for h in data['hosts']]
- return ips
+ if (data == {}): return []
+ host_data = data['hosts']['entry']
+ ips = []
+ total_bytes_in = self.get_bytes()
+ n = len(host_data)
+ for d in host_data:
+ bytes_from_ip = int(d['value'])
+ ip = d['key'].replace("/", "") # IPs sometimes (always?) get returned as strings like /1.2.3.4
+ if (bytes_from_ip >= total_bytes_in / float(n)):
+ ips.append(ip)
+ return ips
else:
print "Stat type not supported for incoming hosts"
return []
+
# EWMA calculation for bit rate. Returns true if there is an anomaly.
def handle_rate_ewma(self):
alpha = .25
# Returns true if this is a large flow
def check_large_flow(self):
- if (self.get_bytes() > 5 * (10**6)):
+ if (self.get_bytes() > self.large_flow_threshold):
return True
return False
try:
bytes = long(self.stats["byteCount"])
except Exception as e:
+ print "exception: ", e
bytes = 0
return bytes