Fixed bugs while getting demo.py to work.
[affinity.git] / scripts / demo.py
1 #!/usr/local/bin/python
2
3 '''
4 Copyright (c) 2013 Plexxi, Inc.  All rights reserved.
5
6 This program and the accompanying materials are made available under the
7 terms of the Eclipse Public License v1.0 which accompanies this distribution,
8 and is available at http://www.eclipse.org/legal/epl-v10.html
9 '''
10
11 import httplib2
12 import json
13 import signal
14 import time
15
16 from threading import Thread
17
18 from affinity_control import AffinityControl
19 from subnet import SubnetControl
20 from stats import Stats
21
22 import analytics
23
24 '''
25 The instructions for running this demo are located at:
26 https://wiki.opendaylight.org/view/Project_Proposals:Affinity_Metadata_Service#Current_Status
27
28 Briefly:
29 1.  In config.ini, make sure affinity jars are loaded and that of.flowStatsPollInterval = 1
30 2.  Start the controller
31 3.  Start mininet with a 2-level tree topology
32 4.  In mininet:
33      > pingall
34      > h3 ping h1
35 You will see an anomaly detected in demo.py's output, and the pings between h3 and h1 will halt.
36 '''
37
38 # If True, SIG_INT has been captured
39 global sigint
40 sigint = False
41
42 # Handle SIG_INT
43 def signal_handler(signal, frame):
44     global sigint
45     sigint = True
46
47 # Monitors statistics
48 class WaypointMonitor(Thread):
49
50     def __init__(self, monitor_type, **kwargs):
51         Thread.__init__(self)
52         self.stat = Stats(monitor_type, **kwargs)
53         self.stat_type = monitor_type
54         self.waypoint_address = None
55         print "Created waypoint monitor for %s" % self.stat
56
57     def set_waypoint(self, waypoint_ip):
58         self.waypoint_address = waypoint_ip
59         print "Registered waypoint for %s.  Any large flows will be redirected to %s." % (self.stat, waypoint_ip)
60
61     def set_large_flow_threshold(self, s):
62         self.stat.set_large_flow_threshold(s)
63         print "Set threshold for large flows to %d bytes" % s
64         print("-------------------------")
65
66     def run(self):
67         global sigint
68         did_waypoint = False
69         while not sigint:
70             _, is_big = self.stat.refresh()
71             if is_big and not did_waypoint:
72                 print "Large flow detected (%d bytes, %d packets, %3.3f bit/s)" % (self.stat.get_bytes(), self.stat.get_packets(), self.stat.get_bit_rate())
73                 print "   ICMP: %d bytes, %d packets" % (self.stat.get_bytes(1), self.stat.get_packets(1))
74                 print "   UDP: %d bytes, %d packets" % (self.stat.get_bytes(17), self.stat.get_packets(17))
75                 print "   TCP: %d bytes, %d packets" % (self.stat.get_bytes(6), self.stat.get_packets(6))
76                 print "   other: %d bytes, %d packets" % (self.stat.get_bytes(-1), self.stat.get_packets(-1))
77                 print("-------------------------")
78                 ac = AffinityControl()
79                 # First AG: Sources sending data into this subnet
80                 src_ag_name = "sources"
81                 src_ips = self.stat.get_large_incoming_hosts()
82                 if (self.waypoint_address in src_ips):
83                     src_ips.remove(self.waypoint_address)
84                 ac.add_affinity_group(src_ag_name, ips=src_ips)
85                 # Second AG: This entity
86                 dst_ag_name = "client"
87                 if (self.stat_type == Stats.TYPE_SUBNET):
88                     ac.add_affinity_group(dst_ag_name, subnet=self.stat.subnet)
89                 elif (self.stat_type == Stats.TYPE_HOST):
90                     pass
91                 else:
92                     print "type", self.stat_type, "not supported for redirection"
93                 # AL: Between them
94                 link_name = "inflows"
95                 ac.add_affinity_link(link_name, src_ag_name, dst_ag_name)
96                 ac.add_waypoint(link_name, self.waypoint_address)
97 #                ac.enable_waypoint(link_name)
98                 ac.enable_affinity()
99                 did_waypoint = True
100                 raw_input("[Press Enter to disable affinity rules] ")
101                 ac.disable_affinity()
102 #                ac.disable_waypoint(link_name)
103             time.sleep(1)
104
105 def main():
106
107     # Default subnet is required for the host tracker to work.
108     subnet_control = SubnetControl()
109     subnet_control.add_subnet("defaultSubnet", "10.0.0.254/8")
110
111     raw_input("[Press enter when mininet is ready] ")
112     print("-------------------------")
113
114     # Add per-protocol flows so we can monitor stats that way
115     x = analytics.add_protocol_flows()
116     if (not x):
117         print "Unable to add per-protocol flows"
118
119     m = WaypointMonitor(Stats.TYPE_SUBNET, subnet="10.0.0.0/31")
120     m.set_waypoint("10.0.0.2")
121     m.set_large_flow_threshold(2000) # 2000 bytes
122     m.start()
123
124     # Register signal-handler to catch SIG_INT
125     signal.signal(signal.SIGINT, signal_handler)
126     signal.pause()
127
128     # join() won't return until SIG_INT has been captured
129     m.join()
130
131 if __name__ == "__main__":
132     main()