package org.opendaylight.affinity.analytics;
+import java.util.Set;
+
import org.opendaylight.affinity.affinity.AffinityLink;
import org.opendaylight.controller.sal.core.Host;
double getBitRateOnAffinityLink(AffinityLink al);
long getByteCountIntoPrefix(String ipAndMask);
+
+ Set<Host> getIncomingHosts(String ipAndMask);
}
return newIP;
}
+    // Return the set of hosts that transfer data to the targetHost
+    public Set<Host> getIncomingHosts(Host targetHost) {
+        Set<Host> incomingHosts = new HashSet<Host>();
+        // hostsToStats maps source host -> (destination host -> stats);
+        // a source counts as "incoming" if it has a stats entry for
+        // targetHost with a non-zero byte count.
+        for (Host sourceHost : this.hostsToStats.keySet()) {
+            if (this.hostsToStats.get(sourceHost).get(targetHost) != null) {
+                long bytes = this.hostsToStats.get(sourceHost).get(targetHost).getByteCount();
+                if (bytes > 0) {
+                    incomingHosts.add(sourceHost);
+                }
+            }
+        }
+        return incomingHosts;
+    }
+
+    // Return the set of hosts that transfer data into any host in this subnet
+    public Set<Host> getIncomingHosts(String prefixAndMask) {
+        InetAddress ip;
+        Short mask;
+        Set<Host> hosts = new HashSet<Host>();
+
+        // Split 1.2.3.4/5 format into the prefix (1.2.3.4) and the mask (5).
+        // A missing mask defaults to a full /32 host match.
+        try {
+            String[] splitPrefix = prefixAndMask.split("/");
+            ip = InetAddress.getByName(splitPrefix[0]);
+            mask = (splitPrefix.length == 2) ? Short.valueOf(splitPrefix[1]) : 32;
+        } catch (UnknownHostException e) {
+            log.debug("Incorrect prefix/mask format: " + prefixAndMask);
+            return hosts;
+        } catch (NumberFormatException e) {
+            // A non-numeric mask (e.g., "1.2.3.4/abc") would otherwise escape
+            // as an unchecked exception; treat it like any other bad input.
+            log.debug("Incorrect prefix/mask format: " + prefixAndMask);
+            return hosts;
+        }
+
+        // Match on prefixes: collect incoming hosts for every tracked host
+        // whose masked address falls inside the requested subnet.
+        InetAddress targetPrefix = getPrefix(ip, mask);
+        Set<HostNodeConnector> allHosts = this.hostTracker.getAllHosts();
+        for (HostNodeConnector host : allHosts) {
+            InetAddress hostPrefix = getPrefix(host.getNetworkAddress(), mask);
+            if (hostPrefix.equals(targetPrefix)) {
+                hosts.addAll(getIncomingHosts(host));
+            }
+        }
+
+        return hosts;
+    }
+
public long getByteCountIntoHost(Host targetHost) {
long totalBytes = 0;
// We're calculating bytes *into* the target host, not out of
InetAddress ip;
Short mask;
+ Set<Host> hosts = getIncomingHosts(prefixAndMask);
+
// Split 1.2.3.4/5 format into the prefix (1.2.3.4) and the mask (5)
try {
String[] splitPrefix = prefixAndMask.split("/");
--- /dev/null
+/*
+ * Copyright (c) 2013 Plexxi, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.affinity.analytics.northbound;
+
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * JAXB-serializable wrapper around a list of host IP strings, returned by
+ * the northbound "incoming hosts" query.
+ */
+@XmlRootElement(name = "list")
+@XmlAccessorType(XmlAccessType.NONE)
+public class AllHosts {
+    @XmlElement
+    List<String> hosts;
+
+    @SuppressWarnings("unused") // To satisfy JAXB
+    private AllHosts() {}
+
+    public AllHosts(List<String> hostIPs) {
+        this.hosts = hostIPs;
+    }
+
+    public List<String> getHosts() {
+        return this.hosts;
+    }
+}
import java.net.UnknownHostException;
import java.util.List;
import java.util.Set;
+import java.util.ArrayList;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
}
/**
- * Returns TODO:
+ * Returns PrefixStatistics
*
* @param containerName
* Name of the Container. The Container name for the base
* controller is "default".
- * @param TODO:
- * @return TODO:
+     * @param ip IP address portion of the prefix (e.g., "1.2.3.4")
+     * @param mask mask length of the prefix (e.g., "32")
+     * @return byte count statistics for traffic into the prefix
*/
@Path("/{containerName}/prefixstats/{ip}/{mask}/")
@GET
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
- // @TypeHint(Long.class)
+ @TypeHint(PrefixStatistics.class)
@StatusCodes({
@ResponseCode(code = 200, condition = "Operation successful"),
@ResponseCode(code = 404, condition = "The containerName is not found"),
return new PrefixStatistics(byteCount);
}
+    /**
+     * Returns the set of host IPs that send traffic into the given prefix
+     *
+     * @param containerName
+     *            Name of the Container. The Container name for the base
+     *            controller is "default".
+     * @param ip IP address portion of the prefix (e.g., "1.2.3.4")
+     * @param mask mask length of the prefix (e.g., "32")
+     * @return list of host IPs observed sending data into the prefix
+     */
+ @Path("/{containerName}/prefixstats/incoming/{ip}/{mask}/")
+ @GET
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ @TypeHint(AllHosts.class)
+ @StatusCodes({
+ @ResponseCode(code = 200, condition = "Operation successful"),
+ @ResponseCode(code = 404, condition = "The containerName is not found"),
+ @ResponseCode(code = 503, condition = "One or more of Controller Services are unavailable") })
+ public AllHosts getIncomingHosts(
+ @PathParam("containerName") String containerName,
+ @PathParam("ip") String ip,
+ @PathParam("mask") String mask) {
+ if (!NorthboundUtils.isAuthorized(getUserName(), containerName, Privilege.READ, this)) {
+ throw new UnauthorizedException("User is not authorized to perform this operation on container " + containerName);
+ }
+ handleDefaultDisabled(containerName);
+
+ IAnalyticsManager analyticsManager = getAnalyticsService(containerName);
+ if (analyticsManager == null) {
+ throw new ServiceUnavailableException("Analytics " + RestMessages.SERVICEUNAVAILABLE.toString());
+ }
+
+ Set<Host> hosts = analyticsManager.getIncomingHosts(ip + "/" + mask);
+ List<String> ips = new ArrayList<String>();
+ for (Host h : hosts) {
+ InetAddress i = h.getNetworkAddress();
+ ips.add(i.toString());
+ }
+ return new AllHosts(ips);
+ }
+
private void handleDefaultDisabled(String containerName) {
IContainerManager containerManager = (IContainerManager) ServiceHelper.getGlobalInstance(IContainerManager.class, this);
if (containerManager == null) {
--- /dev/null
+#!/usr/local/bin/python
+
+import httplib2
+import json
+
+class AffinityControl:
+    '''
+    Thin REST client for the affinity northbound API: creates affinity
+    groups, links between groups, and waypoints. All methods report
+    success/failure by printing; they do not raise or return a status.
+    '''
+
+    def __init__(self):
+        self.http = httplib2.Http(".cache")
+        self.http.add_credentials("admin", "admin")
+        self.url_prefix = "http://localhost:8080/affinity/nb/v2/affinity/default/"
+
+    # Add affinity group. Membership is given either as ips=[...] (a list
+    # of host IPs) or subnet="prefix/mask"; if neither is passed the group
+    # is created empty.
+    def add_affinity_group(self, group_name, **kwargs):
+        # Create the group
+        resp, content = self.http.request(self.url_prefix + "create/group/%s" % group_name, "PUT")
+        if (resp.status != 201):
+            print "AffinityGroup %s could not be created" % group_name
+            return
+        # If a list of IPs is passed, add each one
+        if "ips" in kwargs:
+            ips = kwargs['ips']
+            for ip in ips:
+                resp, content = self.http.request(self.url_prefix + "group/%s/add/ip/%s" % (group_name, ip), "PUT")
+                if (resp.status != 201):
+                    print "IP %s could not be added to AffinityGroup %s" % (ip, group_name)
+                    return
+            print "AffinityGroup %s added successfully. IPs are %s" % (group_name, ips)
+        # If a subnet is passed, add that
+        elif "subnet" in kwargs:
+            ip, mask = kwargs['subnet'].split("/")
+            resp, content = self.http.request(self.url_prefix + "group/%s/addsubnet/ipprefix/%s/mask/%s" % (group_name, ip, mask), "PUT")
+            if (resp.status != 201):
+                print "AffinityGroup could not be created for subnet %s/%s" % (ip, mask)
+                return
+            print "AffinityGroup %s added successfully. Subnet is %s/%s" % (group_name, ip, mask)
+
+    # Add affinity link between two existing groups (directional: src -> dst)
+    def add_affinity_link(self, link_name, src_group, dst_group):
+        resp, content = self.http.request(self.url_prefix + "create/link/%s/from/%s/to/%s" % (link_name, src_group, dst_group), "PUT")
+        if (resp.status != 201):
+            print "AffinityLink %s could not be added between %s and %s" % (link_name, src_group, dst_group)
+            return
+        print "AffinityLink %s added between %s and %s" % (link_name, src_group, dst_group)
+
+    # Add waypoint: redirect the link's traffic through the host at `ip`
+    def add_waypoint(self, link_name, ip):
+        resp, content = self.http.request(self.url_prefix + "link/%s/setwaypoint/%s" % (link_name, ip), "PUT")
+        if (resp.status != 201):
+            print "Waypoint %s could not be set for link %s" % (ip, link_name)
+            return
+        print "Waypoint %s successfully set for link %s" % (ip, link_name)
import sys
import time
+from stats import Stats
+from subnet import SubnetControl
+from affinity_control import AffinityControl
+
# 1. Start the controller
# 2. On the local machine (e.g., your laptop), start this script.
# > python analytics.py
# (There is a usage prompt that prints at the beginning of analytics.py)
# 5. Type 'quit' to exit analytics.py
-
-'''
-Class for keeping track of host stats or affinity link stats, depending.
-'''
-class Stats:
-
- # TODO: Each stat should probably be a thread, and handle its
- # own output and refreshing for the EWMA
-
- def __init__(self, stat_type, **kwargs):
- self.stat_type = stat_type
- if stat_type == "host":
- self.src = kwargs['src']
- self.dst = kwargs['dst']
- self.url_prefix = "http://localhost:8080/affinity/nb/v2/analytics/default/hoststats/"
- elif stat_type == "affinityLink":
- self.al = kwargs['al']
- self.url_prefix = "http://localhost:8080/affinity/nb/v2/analytics/default/affinitylinkstats/"
- else:
- print "incorrect stat type", stat_type
-
- self.stats = {}
- self.rate_ewma = None
-
- self.http = httplib2.Http(".cache")
- self.http.add_credentials('admin', 'admin')
- self.refresh()
-
- def __str__(self):
- if (self.stat_type == "host"):
- return "host pair %s -> %s" % (self.src, self.dst)
- elif (self.stat_type == "affinityLink"):
- return "AffinityLink %s" % self.al
- else:
- return "Unknown Stats type"
-
- # Refresh statistics
- def refresh(self):
- if (self.stat_type == "host"):
- resp, content = self.http.request(self.url_prefix + self.src + "/" + self.dst, "GET")
- elif (self.stat_type == "affinityLink"):
- resp, content = self.http.request(self.url_prefix + self.al, "GET")
- self.stats = json.loads(content)
- self.handle_rate_ewma()
- self.check_large_flow()
-
- # EWMA calculation for bit rate
- def handle_rate_ewma(self):
- alpha = .25
- anomaly_threshold = 2.0
- new_bitrate = self.get_bit_rate()
-
- if self.rate_ewma == None:
- self.rate_ewma = new_bitrate
- else:
- new_rate_ewma = alpha * new_bitrate + (1 - alpha) * self.rate_ewma
- if (self.rate_ewma > 0 and new_rate_ewma > anomaly_threshold * self.rate_ewma):
- print "!! Anomaly detected on %s" % self
- print "!! Rate rose from %1.1f Mbit/s to %1.1f Mbit/s" % ((self.rate_ewma/10**6), (new_rate_ewma/10**6))
- self.rate_ewma = new_rate_ewma
-
- def check_large_flow(self):
- if (self.get_bytes() > 5 * (10**6)):
- print "!! Large flow detected on %s" % self
-
- # Bytes
- def get_bytes(self):
- try:
- bytes = long(self.stats["byteCount"])
- except Exception as e:
- bytes = 0
- return bytes
-
- # Bit Rate
- def get_bit_rate(self):
- try:
- bitrate = float(self.stats["bitRate"])
- except Exception as e:
- bitrate = 0.0
- return bitrate
-
-
-class AffinityControl:
-
- def __init__(self):
- self.http = httplib2.Http(".cache")
- self.http.add_credentials("admin", "admin")
- self.url_prefix = "http://localhost:8080/affinity/nb/v2/affinity/default/"
- self.groups = []
- self.links = []
-
- def add_affinity_group(self, group_name, ips):
- resp, content = self.http.request(self.url_prefix + "create/group/%s" % group_name, "PUT")
- if (resp.status != 201):
- print "AffinityGroup %s could not be created" % group_name
- return
- for ip in ips:
- resp, content = self.http.request(self.url_prefix + "group/%s/add/ip/%s" % (group_name, ip), "PUT")
- if (resp.status != 201):
- print "IP %s could not be added to AffinityGroup %s" % (ip, group_name)
- return
- self.groups.append(group_name)
- print "AffinityGroup %s added successfully. IPs are %s" % (group_name, ips)
-
-
- def add_affinity_link(self, link_name, src_group, dst_group):
- resp, content = self.http.request(self.url_prefix + "create/link/%s/from/%s/to/%s" % (link_name, src_group, dst_group), "PUT")
- if (resp.status != 201):
- print "AffinityLink %s could not be added between %s and %s" % (link_name, src_group, dst_group)
- return
- self.links.append(link_name)
- print "AffinityLink %s added between %s and %s" % (link_name, src_group, dst_group)
-
-
-'''
-Class for controlling subnets. Right now, just adds subnets and
-checks whether they exist, because that's all we need.
-'''
-class SubnetControl:
-
- def __init__(self):
- self.http = httplib2.Http(".cache")
- self.http.add_credentials("admin", "admin")
- self.url_prefix = "http://localhost:8080/controller/nb/v2/subnetservice/default/"
-
- # Checks whether subnet exists. Checks against the actual subnet
- # string (e.g., "10.0.0.255/1"), not the subnet name. Will not
- # catch things like overlapping subnets.
- def exists(self, subnet):
- resp, content = self.http.request(self.url_prefix + "subnets", "GET")
- if (resp.status != 200):
- print "Fatal error - can't check for subnet existence"
- sys.exit(-1)
- data = json.loads(content)
-
- for key in data["subnetConfig"]:
- if (key["subnet"] == subnet):
- return True
- return False
-
- # Add a subnet if it doesn't already exist.
- def add_subnet(self, subnet_name, subnet):
- if (self.exists(subnet)):
- print "subnet", subnet, "already exists"
- return
- subnet_config = dict(name=subnet_name, subnet=subnet)
- json_data = json.dumps(subnet_config)
- resp, content = self.http.request(self.url_prefix + "subnet/" + subnet_name, "POST", json_data, {'Content-Type': 'application/json'})
- if (resp.status == 201):
- print "subnet", subnet, "added"
- else:
- print "subnet", subnet, "could not be added"
-
-
def run_interactive_mode():
print "Usage: [host | link] [bytes | rate] [src dst | link-name]"
--- /dev/null
+#!/usr/local/bin/python
+
+import httplib2
+import json
+import signal
+import sys
+import time
+
+from threading import Thread
+
+from affinity_control import AffinityControl
+from subnet import SubnetControl
+from stats import Stats
+
+# If True, SIG_INT has been captured
+global sigint
+sigint = False
+
+# Monitors statistics
+class Monitor(Thread):
+
+ def __init__(self, monitor_type, **kwargs):
+ Thread.__init__(self)
+ self.stat = Stats(monitor_type, **kwargs)
+ self.stat_type = monitor_type
+ self.did_waypoint = False
+
+ def run(self):
+ global sigint
+ while not sigint:
+ is_fast, is_big = self.stat.refresh()
+ self.stat.get_incoming_hosts()
+ if is_big and not self.did_waypoint:
+ print "Large flow; redirect here"
+ ac = AffinityControl()
+ # First AG: Sources sending data into this subnet
+ src_ips = self.stat.get_incoming_hosts()
+ ac.add_affinity_group("webservers", ips=src_ips)
+ # Second AG: This entity
+ if (self.stat_type == "prefix"):
+ ac.add_affinity_group("clients", subnet=self.stat.subnet)
+ else:
+ print "type", self.stat_type, "not supported for redirection"
+ # AL: Between them
+ ac.add_affinity_link("inflows", "webservers", "client")
+ # TODO: This IP should be an option
+ ac.add_waypoint("inflows", "10.0.0.2")
+ self.did_waypoint = True
+ time.sleep(1)
+
+# Handle SIG_INT
+def signal_handler(signal, frame):
+    # Flip the module-level flag; Monitor.run() polls it to exit its loop.
+    global sigint
+    sigint = True
+
+def main():
+
+    # Default subnet is required for the host tracker to work.
+    subnet_control = SubnetControl()
+    subnet_control.add_subnet("defaultSubnet", "10.0.0.254/8")
+
+    # Monitor traffic into this prefix. NOTE(review): /31 covers only two
+    # addresses -- confirm this is the intended demo subnet.
+    m = Monitor("prefix", subnet="10.0.0.0/31")
+    m.start()
+
+    # Register signal-handler to catch SIG_INT
+    signal.signal(signal.SIGINT, signal_handler)
+    signal.pause()
+
+    # join() won't return until SIG_INT has been captured
+    m.join()
+
+if __name__ == "__main__":
+    main()
--- /dev/null
+import httplib2
+import json
+import pprint
+
+h = httplib2.Http(".cache")
+h.add_credentials('admin', 'admin')
+
+# Flow statistics.
+def get_flow_stats():
+ global h
+ resp, content = h.request('http://localhost:8080/controller/nb/v2/statistics/default/flow', "GET")
+ allFlowStats = json.loads(content)
+
+ # raw dump
+ print content
+
+ # raw dumps
+ json.dumps(allFlowStats, indent=2, default=str)
+ s = pprint.pformat(allFlowStats, indent=4)
+ print s
+
+
+ for fs in allFlowStats["flowStatistics"]:
+ node = fs["node"]
+ flows = fs["flowStatistic"]
+
+ print "#### Switch = " + node["id"] + ", type = " + node["type"]
+ print "# flows = %d" % len(flows)
+ for f in flows:
+ print f["flow"]["match"], "priority = ", f["flow"]["priority"]
+ print "\t Actions:"
+ for a in f["flow"]["actions"]:
+ print "\t \t", a
+
+
+def get_all_nodes():
+    # Query the switch-manager northbound API and return the parsed JSON
+    # dict describing all known nodes (switches).
+    global h
+    resp, content = h.request('http://localhost:8080/controller/nb/v2/switchmanager/default/nodes', 'GET')
+    nodes = json.loads(content)
+    return nodes
+
+get_flow_stats()
+#get_all_nodes()
+#!/usr/bin/python
+
import httplib2
import json
-import pprint
-
-h = httplib2.Http(".cache")
-h.add_credentials('admin', 'admin')
-
-# Flow statistics.
-def get_flow_stats():
- global h
- resp, content = h.request('http://localhost:8080/controller/nb/v2/statistics/default/flow', "GET")
- allFlowStats = json.loads(content)
-
- # raw dump
- print content
-
- # raw dumps
- json.dumps(allFlowStats, indent=2, default=str)
- s = pprint.pformat(allFlowStats, indent=4)
- print s
-
-
- for fs in allFlowStats["flowStatistics"]:
- node = fs["node"]
- flows = fs["flowStatistic"]
-
- print "#### Switch = " + node["id"] + ", type = " + node["type"]
- print "# flows = %d" % len(flows)
- for f in flows:
- print f["flow"]["match"], "priority = ", f["flow"]["priority"]
- print "\t Actions:"
- for a in f["flow"]["actions"]:
- print "\t \t", a
-
-
-def get_all_nodes():
- global h
- resp, content = h.request('http://localhost:8080/controller/nb/v2/switchmanager/default/nodes', 'GET')
- nodes = json.loads(content)
- return nodes
-
-get_flow_stats()
-#get_all_nodes()
+
+'''
+Class for keeping track of host stats or affinity link stats, depending.
+'''
+class Stats:
+
+    # stat_type selects the northbound endpoint: "host" (needs src, dst),
+    # "affinityLink" (needs al), or "prefix" (needs subnet).
+    def __init__(self, stat_type, **kwargs):
+        self.stat_type = stat_type
+        if stat_type == "host":
+            self.src = kwargs['src']
+            self.dst = kwargs['dst']
+            self.url_prefix = "http://localhost:8080/affinity/nb/v2/analytics/default/hoststats/"
+        elif stat_type == "affinityLink":
+            self.al = kwargs['al']
+            self.url_prefix = "http://localhost:8080/affinity/nb/v2/analytics/default/affinitylinkstats/"
+        elif stat_type == "prefix":
+            self.subnet = kwargs['subnet']
+            self.url_prefix = "http://localhost:8080/affinity/nb/v2/analytics/default/prefixstats/"
+        else:
+            print "incorrect stat type", stat_type
+
+        self.stats = {}
+        self.rate_ewma = None
+        self.http = httplib2.Http(".cache")
+        self.http.add_credentials('admin', 'admin')
+
+    def __str__(self):
+        if (self.stat_type == "host"):
+            return "host pair %s -> %s" % (self.src, self.dst)
+        elif (self.stat_type == "affinityLink"):
+            return "AffinityLink %s" % self.al
+        elif (self.stat_type == "prefix"):
+            return "Prefix %s" % self.subnet
+        else:
+            return "Unknown Stats type"
+
+    # Refresh statistics. Returns [is_fast, is_big]: whether the EWMA bit
+    # rate jumped anomalously, and whether total bytes exceed the
+    # large-flow threshold. Returns [False, False] on any error (including
+    # an unknown stat_type, where `content` is never assigned).
+    def refresh(self):
+        if (self.stat_type == "host"):
+            resp, content = self.http.request(self.url_prefix + self.src + "/" + self.dst, "GET")
+        elif (self.stat_type == "affinityLink"):
+            resp, content = self.http.request(self.url_prefix + self.al, "GET")
+        elif (self.stat_type == "prefix"):
+            resp, content = self.http.request(self.url_prefix + self.subnet, "GET")
+        try:
+            self.stats = json.loads(content)
+            is_fast = self.handle_rate_ewma()
+            is_big = self.check_large_flow()
+            return [is_fast, is_big]
+        except Exception as e:
+            print "error: ", e
+            return [False, False]
+
+    # Return hosts that transferred data into this entity. Right now only supports prefixes.
+    def get_incoming_hosts(self):
+        if (self.stat_type == "prefix"):
+            resp, content = self.http.request(self.url_prefix + "incoming/" + self.subnet, "GET")
+            data = json.loads(content)
+            if (data != {}):
+                # IPs sometimes (always?) get returned as strings like /1.2.3.4; strip off the leading /
+                ips = [h.replace("/", "") for h in data['hosts']]
+                return ips
+        return []
+
+    # EWMA calculation for bit rate. Returns true if there is an anomaly.
+    def handle_rate_ewma(self):
+        # alpha: EWMA smoothing factor; anomaly when the new average jumps
+        # past anomaly_threshold times the previous one.
+        alpha = .25
+        anomaly_threshold = 2.0
+        new_bitrate = self.get_bit_rate()
+
+        return_val = False
+
+        if self.rate_ewma == None:
+            # First sample seeds the average; never flagged as an anomaly.
+            self.rate_ewma = new_bitrate
+        else:
+            new_rate_ewma = alpha * new_bitrate + (1 - alpha) * self.rate_ewma
+            if (self.rate_ewma > 0 and new_rate_ewma > anomaly_threshold * self.rate_ewma):
+                return_val = True
+            self.rate_ewma = new_rate_ewma
+        return return_val
+
+    # Returns true if this is a large flow (more than 5 MB transferred)
+    def check_large_flow(self):
+        if (self.get_bytes() > 5 * (10**6)):
+            return True
+        return False
+
+    # Bytes transferred so far; 0 if the last response had no byteCount
+    def get_bytes(self):
+        try:
+            bytes = long(self.stats["byteCount"])
+        except Exception as e:
+            bytes = 0
+        return bytes
+
+    # Bit Rate in bits/sec; 0.0 if the last response had no bitRate
+    def get_bit_rate(self):
+        try:
+            bitrate = float(self.stats["bitRate"])
+        except Exception as e:
+            bitrate = 0.0
+        return bitrate
--- /dev/null
+#!/usr/bin/python
+
+import httplib2
+import json
+import sys
+
+'''
+Class for controlling subnets. Right now, just adds subnets and
+checks whether they exist, because that's all we need.
+'''
+class SubnetControl:
+
+    def __init__(self):
+        self.http = httplib2.Http(".cache")
+        self.http.add_credentials("admin", "admin")
+        self.url_prefix = "http://localhost:8080/controller/nb/v2/subnetservice/default/"
+
+    # Checks whether subnet exists. Checks against the actual subnet
+    # string (e.g., "10.0.0.255/1"), not the subnet name. Will not
+    # catch things like overlapping subnets.
+    def exists(self, subnet):
+        resp, content = self.http.request(self.url_prefix + "subnets", "GET")
+        if (resp.status != 200):
+            # Cannot continue safely without knowing the current subnets
+            print "Fatal error - can't check for subnet existence"
+            sys.exit(-1)
+
+        data = json.loads(content)
+        for key in data["subnetConfig"]:
+            if (key["subnet"] == subnet):
+                return True
+        return False
+
+    # Add a subnet if it doesn't already exist.
+    def add_subnet(self, subnet_name, subnet):
+        if (self.exists(subnet)):
+            print "subnet", subnet, "already exists"
+            return
+        subnet_config = dict(name=subnet_name, subnet=subnet)
+        json_data = json.dumps(subnet_config)
+        resp, content = self.http.request(self.url_prefix + "subnet/" + subnet_name, "POST", json_data, {'Content-Type': 'application/json'})
+        if (resp.status == 201):
+            print "subnet", subnet, "added"
+        else:
+            print "subnet", subnet, "could not be added"