Prototype southbound actions using flow rules. 27/1627/1
authorSuchi Raman <suchi.raman@plexxi.com>
Thu, 3 Oct 2013 00:55:50 +0000 (20:55 -0400)
committerSuchi Raman <suchi.raman@plexxi.com>
Thu, 3 Oct 2013 00:57:19 +0000 (20:57 -0400)
Signed-off-by: Suchi Raman <suchi.raman@plexxi.com>
affinity/api/src/main/java/org/opendaylight/affinity/affinity/AffinityLink.java
affinity/api/src/main/java/org/opendaylight/affinity/affinity/IAffinityManager.java
affinity/implementation/src/main/java/org/opendaylight/affinity/affinity/internal/AffinityManagerImpl.java
l2agent/src/main/java/org/opendaylight/l2agent/L2Agent.java
scripts/analytics.py

index 65c98b4c0f0db8a89a4b18c8199e7496035405a7..aa95f58ac5f4d6c9f123b14c7a21d88e30bcd5b1 100644 (file)
@@ -78,7 +78,9 @@ public class AffinityLink implements Cloneable, Serializable {
     public void setWaypoint(String wpaddr) {
        this.affinityWaypoint = wpaddr;
     }
-
+    public String getWaypoint() {
+       return this.affinityWaypoint;
+    }
     public String getAttribute() {
        return this.affinityAttribute;
     }
index c0c9c6166b94b28afca4b74eb04019da21409ce0..9ae2718804828bd6952f10830431689fa13c1191 100644 (file)
@@ -54,5 +54,5 @@ public interface IAffinityManager {
     public List<Entry<AffinityIdentifier, AffinityIdentifier>> getAllFlowsByAffinityIdentifier(AffinityLink al);
 
     public Status addFlowRulesForRedirect(AffinityLink al) throws Exception;
-    public Status pushFlowRule(Flow flow);
+    public Status pushFlowRule(Flow flow, byte [] mac);
 }
index 16773a148d32910fa3b4acbdb1c57bcf0c4616c5..ce0325d0727114de303dacd3e5f610dce848d8b1 100644 (file)
@@ -298,9 +298,11 @@ public class AffinityManagerImpl implements IAffinityManager, IConfigurationCont
 
 
     /** 
-     * Fetch all node connectors. Each switch port will receive a flow rule. Do not stop on error.
+     * Fetch all node connectors. Each switch port will receive a flow
+     * rule. Do not stop on error. Pass in the waypointMAC address so
+     * that the correct output port can be determined.
      */
-    public Status pushFlowRule(Flow flow) {
+    public Status pushFlowRule(Flow flow, byte [] waypointMAC) {
         /* Get all node connectors. */
         Set<Node> nodes = switchManager.getNodes();
         Status success = new Status(StatusCode.SUCCESS);
@@ -311,10 +313,11 @@ public class AffinityManagerImpl implements IAffinityManager, IConfigurationCont
             return success;
         } 
         for (Node node: nodes) {
-            Set<NodeConnector> ncs = switchManager.getNodeConnectors(node);
-            if (ncs == null) {
-                continue;
-            }
+            /* Look up the output port leading to the waypoint. */
+            NodeConnector dst_connector = l2agent.lookup(node, waypointMAC);
+            Action action = new Output(dst_connector);
+            flow.addAction(action);
+
             Status status = fps.addFlow(node, flow);
             if (!status.isSuccess()) {
                 log.debug("Error during addFlow: {} on {}. The failure is: {}",
@@ -336,6 +339,7 @@ public class AffinityManagerImpl implements IAffinityManager, IConfigurationCont
         mask = InetAddress.getByName("255.255.255.255");
 
         Flow f = new Flow(match, actions);
+        String waypoint = al.getWaypoint();
 
         List<Entry<Host,Host>> hostPairList= getAllFlowsByHost(al);
         for (Entry<Host,Host> hostPair : hostPairList) {
@@ -349,19 +353,20 @@ public class AffinityManagerImpl implements IAffinityManager, IConfigurationCont
             match.setField(MatchType.NW_SRC, address1, mask);
             match.setField(MatchType.NW_DST, address2, mask);
             
-
-            /* For each end point, discover the mac address of the
-             * host. Then lookup the L2 table to find the port to send
-             * this flow along. Program the flow. */
-
-            byte [] mac = ((HostNodeConnector) host1).getDataLayerAddressBytes();
-            /* Tbd: use hosttracker for this. */
-            //            NodeConnector dst_connector = l2agent.lookupMacAddress(mac);
-            //            actions.add(new Output(dst_connector));
+            /* Send this flow rule to all nodes in the network. */
+            byte [] dstMAC = InetAddressToMAC(waypoint);
+            pushFlowRule(f, dstMAC);
         }
        return new Status(StatusCode.SUCCESS);
     }
 
+    public byte [] InetAddressToMAC(String ipaddress) {
+        InetAddress inetAddr = NetUtils.parseInetAddress(ipaddress);
+        HostNodeConnector host = hostTracker.hostFind(inetAddr);
+        byte [] dst_mac = host.getDataLayerAddressBytes();
+        return dst_mac;
+    }
+
     public Status removeAffinityLink(String name) {
        affinityLinkList.remove(name);
        return new Status(StatusCode.SUCCESS);
index 0146f5345609317e1a6271c3c2b8e61f1f05ead7..172b562a76475ff22c18dc619ee92bad1afdeda9 100644 (file)
@@ -241,4 +241,8 @@ public class L2Agent implements IListenDataPacket {
         }
         return PacketResult.IGNORED;
     }
+    
+    public NodeConnector lookup(Node node, byte [] dstMAC) {
+        return this.mac_to_ports.get(node).get(dstMAC);
+    }
 }
index 0492df7d2e1d60d78ff5a390cf0891054e63e016..7f31baecfe97555b938ab11e971613ed8a28b3e8 100644 (file)
@@ -3,6 +3,7 @@
 import httplib2
 import json
 import sys
+import time
 
 # 1. Start the controller
 # 2. On the local machine (e.g., your laptop), start this script.
@@ -15,44 +16,78 @@ import sys
 #   (There is a usage prompt that prints at the beginning of analytics.py)
 # 5. Type 'quit' to exit analytics.py
 
+
 '''
-Class to keep track of host statistics (byte count, bit rate)
+Class for keeping track of host stats or affinity link stats, depending.
 '''
-class HostStats:
+class Stats:
+
+    # TODO: Each stat should probably be a thread, and handle its
+    # own output and refreshing for the EWMA
+
+    def __init__(self, stat_type, **kwargs):
+        self.stat_type = stat_type
+        if stat_type == "host":
+            self.src = kwargs['src']
+            self.dst = kwargs['dst']
+            self.url_prefix = "http://localhost:8080/affinity/nb/v2/analytics/default/hoststats/" 
+        elif stat_type == "affinityLink":
+            self.al = kwargs['al']
+            self.url_prefix = "http://localhost:8080/affinity/nb/v2/analytics/default/affinitylinkstats/"
+        else:
+            print "incorrect stat type", stat_type
+
+        self.stats = {}
+        self.rate_ewma = None
 
-    def __init__(self, src, dst):
-        self.src = src
-        self.dst = dst
         self.http = httplib2.Http(".cache")
         self.http.add_credentials('admin', 'admin')
         self.refresh()
 
+    # Refresh statistics
     def refresh(self):
-        resp, content = self.http.request("http://localhost:8080/affinity/nb/v2/analytics/default/hoststats/" + self.src + "/" + self.dst, "GET")
-        if (resp.status == 404):
-            print "404 error"
-            return
-        if (resp.status == 503):
-            print "503 error"
-            return
-        self.host_stats = json.loads(content)
-
+        if (self.stat_type == "host"):
+            resp, content = self.http.request(self.url_prefix + self.src + "/" + self.dst, "GET")
+        elif (self.stat_type == "affinityLink"):
+            resp, content = self.http.request(self.url_prefix + self.al, "GET")
+        self.stats = json.loads(content)
+        self.handle_rate_ewma()
+
+    # EWMA calculation for bit rate
+    def handle_rate_ewma(self):
+        alpha = .25
+        anomaly_threshold = 2.0
+        new_bitrate = self.get_bit_rate()
+
+        if self.rate_ewma == None:
+            self.rate_ewma = new_bitrate
+        else:
+            new_rate_ewma = alpha * new_bitrate + (1 - alpha) * self.rate_ewma
+            if (self.rate_ewma > 0 and new_rate_ewma > anomaly_threshold * self.rate_ewma):
+                if (self.stat_type == "host"):
+                    print "Anomaly detected between %s and %s" % (self.src, self.dst)
+                elif (self.stat_type == "affinityLink"):
+                    print "Anomaly detected on AffinityLink %s" % (self.al)
+                print "Rate rose from %1.1f mbit/s to %1.1f mbit/s" % ((self.rate_ewma/10**6), (new_rate_ewma/10**6))
+            self.rate_ewma = new_rate_ewma
+
+    # Bytes
     def get_bytes(self):
         try:
-            bytes = long(self.host_stats["byteCount"])
+            bytes = long(self.stats["byteCount"])
         except Exception as e:
-            print "exception:", e
             bytes = 0
         return bytes
 
+    # Bit Rate
     def get_bit_rate(self):
-
         try:
-            bitrate = float(self.host_stats["bitRate"])
+            bitrate = float(self.stats["bitRate"])
         except Exception as e:
             bitrate = 0.0
         return bitrate
 
+
 '''
 Class for controlling subnets.  Right now, just adds subnets and
 checks whether they exist, because that's all we need.
@@ -93,26 +128,7 @@ class SubnetControl:
             print "subnet", subnet, "could not be added"
 
 
-def main():
-
-    # Default subnet is required for the host tracker to work.  Run
-    # this script once *before* you start mininet.
-    subnet_control = SubnetControl()
-    subnet_control.add_subnet("defaultSubnet", "10.0.0.254/8")
-
-    demo_mode = True
-
-    # Test mode
-    if (not demo_mode):
-
-        src = "10.0.0.1"
-        dst = "10.0.0.2"
-        host_stat = HostStats(src, dst)
-
-        # These counts should be nonzero
-        print("%d bytes between %s and %s" % (host_stat.get_bytes(), src, dst))
-        print("%f mbit/s between %s and %s" % (host_stat.get_bit_rate(), src, dst))
-        sys.exit()
+def run_interactive_mode():
 
     print "Usage: [host | link] [bytes | rate] [src dst | link-name]"
 
@@ -131,19 +147,20 @@ def main():
             if (request_type == "host"):
                 src, dst = request[2:4]
                 if (action == "bytes"):
-                    host_stat = HostStats(src, dst)
+                    host_stat = Stats("host", src=src, dst=dst)
                     print("%d bytes between %s and %s" % (host_stat.get_bytes(), src, dst))
                 elif (action == "rate"):
-                    host_stat = HostStats(src, dst)
+                    host_stat = Stats("host", src=src, dst=dst)
                     print("%f bit/s between %s and %s" % (host_stat.get_bit_rate(), src, dst))
                 else:
                     raise Exception
 
+            # TODO: Change this to use AffinityLinkStats
             elif (request_type == "link"):
                 link = request[2]
                 h = httplib2.Http(".cache")
                 h.add_credentials("admin", "admin")
-                resp, content = h.request("http://localhost:8080/controller/nb/v2/analytics/default/affinitylinkstats/" + link, "GET")
+                resp, content = h.request("http://localhost:8080/affinity/nb/v2/analytics/default/affinitylinkstats/" + link, "GET")
                 al_stats = json.loads(content)
 
                 if (action == "bytes"):
@@ -159,5 +176,51 @@ def main():
             print "Error"
 
 
+def get_all_hosts():
+
+    h = httplib2.Http(".cache")
+    h.add_credentials("admin", "admin")
+
+    resp, content = h.request("http://localhost:8080/controller/nb/v2/hosttracker/default/hosts/active", "GET")
+    host_content = json.loads(content)
+
+    # Even if there are no active hosts, host_content['hostConfig']
+    # still exists (and is empty)
+    active_hosts = []
+    for host_data in host_content['hostConfig']:
+        active_hosts.append(host_data['networkAddress'])
+    return active_hosts
+
+
+def run_passive_mode():
+
+    affinity_link_stats = {}
+    affinity_links = set(["testAL"]) # TODO: Get these automatically
+
+    while True:
+        # Go through all affinity link stats
+        for al in affinity_links:
+            if al not in affinity_link_stats:
+                affinity_link_stats[al] = Stats("affinityLink", al=al)
+            stat = affinity_link_stats[al]
+            stat.refresh()
+            print "%d bytes (%1.1f mbit/s) on %s" % (stat.get_bytes(), (stat.get_bit_rate() / (10**6)), al)
+
+        time.sleep(2)
+
+def main():
+
+    # Default subnet is required for the host tracker to work.  Run
+    # this script once *before* you start mininet.
+    subnet_control = SubnetControl()
+    subnet_control.add_subnet("defaultSubnet", "10.0.0.254/8")
+
+    interactive_mode = False
+
+    if interactive_mode:
+        run_interactive_mode()
+    else:
+        run_passive_mode()
+
 if __name__ == "__main__":
     main()