1 #!/usr/local/bin/python
4 Copyright (c) 2013 Plexxi, Inc. All rights reserved.
6 This program and the accompanying materials are made available under the
7 terms of the Eclipse Public License v1.0 which accompanies this distribution,
8 and is available at http://www.eclipse.org/legal/epl-v10.html
16 from threading import Thread
18 from affinity_control import AffinityControl
19 from subnet import SubnetControl
20 from stats import Stats
25 The instructions for running this demo are located at:
26 https://wiki.opendaylight.org/view/Project_Proposals:Affinity_Metadata_Service#Current_Status
29 1. In config.ini, make sure affinity jars are loaded and that of.flowStatsPollInterval = 1
30 2. Start the controller
31 3. Start mininet with a 2-level tree topology
35 You will see an anomaly detected in demo.py's output, and the pings between h3 and h1 will halt.
38 # If True, SIGINT has been captured
43 def signal_handler(signal, frame):
48 class WaypointMonitor(Thread):
50 def __init__(self, monitor_type, **kwargs):
52 self.stat = Stats(monitor_type, **kwargs)
53 self.stat_type = monitor_type
54 self.waypoint_address = None
55 print "Created waypoint monitor for %s" % self.stat
def set_waypoint(self, waypoint_ip):
    """Record the waypoint address and announce it on stdout.

    Stores ``waypoint_ip`` in ``self.waypoint_address`` and prints a
    confirmation line that names ``self.stat`` and the waypoint.

    :param waypoint_ip: address to register as the waypoint (formatted
        with ``%s``, so any printable value is accepted).
    """
    self.waypoint_address = waypoint_ip
    # Single-argument call form: same output under the Python 2 print
    # statement and the Python 3 print function, and consistent with the
    # other print(...) calls in this file.
    print("Registered waypoint for %s. Any large flows will be redirected to %s." % (self.stat, waypoint_ip))
def set_large_flow_threshold(self, s):
    """Forward the large-flow threshold to the stats object and report it.

    Calls ``self.stat.set_large_flow_threshold(s)``, then prints a
    confirmation line followed by a separator line.

    :param s: threshold value; it is formatted with ``%d`` in the
        confirmation message, so it must be numeric. The message labels
        the value as bytes.
    """
    self.stat.set_large_flow_threshold(s)
    # Both prints use the single-argument call form so the method is
    # internally consistent and parses on Python 2 and Python 3 alike.
    print("Set threshold for large flows to %d bytes" % s)
    print("-------------------------")
70 _, is_big = self.stat.refresh()
71 if is_big and not did_waypoint:
72 print "Large flow detected (%d bytes, %d packets, %3.3f bit/s)" % (self.stat.get_bytes(), self.stat.get_packets(), self.stat.get_bit_rate())
73 print " ICMP: %d bytes, %d packets" % (self.stat.get_bytes(1), self.stat.get_packets(1))
74 print " UDP: %d bytes, %d packets" % (self.stat.get_bytes(17), self.stat.get_packets(17))
75 print " TCP: %d bytes, %d packets" % (self.stat.get_bytes(6), self.stat.get_packets(6))
76 print " other: %d bytes, %d packets" % (self.stat.get_bytes(-1), self.stat.get_packets(-1))
77 print("-------------------------")
78 ac = AffinityControl()
79 # First AG: Sources sending data into this subnet
80 src_ag_name = "sources"
81 src_ips = self.stat.get_large_incoming_hosts()
82 if (self.waypoint_address in src_ips):
83 src_ips.remove(self.waypoint_address)
84 ac.add_affinity_group(src_ag_name, ips=src_ips)
85 # Second AG: This entity
86 dst_ag_name = "client"
87 if (self.stat_type == Stats.TYPE_SUBNET):
88 ac.add_affinity_group(dst_ag_name, subnet=self.stat.subnet)
89 elif (self.stat_type == Stats.TYPE_HOST):
92 print "type", self.stat_type, "not supported for redirection"
95 ac.add_affinity_link(link_name, src_ag_name, dst_ag_name)
96 ac.add_waypoint(link_name, self.waypoint_address)
97 # ac.enable_waypoint(link_name)
100 raw_input("[Press Enter to disable affinity rules] ")
101 ac.disable_affinity()
102 # ac.disable_waypoint(link_name)
107 # Default subnet is required for the host tracker to work.
108 subnet_control = SubnetControl()
109 subnet_control.add_subnet("defaultSubnet", "10.0.0.254/8")
111 raw_input("[Press enter when mininet is ready] ")
112 print("-------------------------")
114 # Add per-protocol flows so we can monitor stats that way
115 x = analytics.add_protocol_flows()
117 print "Unable to add per-protocol flows"
119 m = WaypointMonitor(Stats.TYPE_SUBNET, subnet="10.0.0.0/31")
120 m.set_waypoint("10.0.0.2")
121 m.set_large_flow_threshold(2000) # 2000 bytes
124 # Register signal handler to catch SIGINT
125 signal.signal(signal.SIGINT, signal_handler)
128 # join() won't return until SIGINT has been captured
131 if __name__ == "__main__":