#!/usr/local/bin/python
"""Watch flow statistics and redirect large flows through a waypoint.

A ``WaypointMonitor`` thread polls a ``Stats`` object once per second.  The
first time the statistics report a large flow, the monitor creates two
affinity groups (the sending hosts and the monitored entity), links them,
and attaches and enables a waypoint on that link via ``AffinityControl``.
The script runs until SIGINT (Ctrl-C) is received.

All ``print`` calls take a single parenthesised argument so that they
produce the same output under the Python 2 print statement and the
Python 3 print function.
"""

import httplib2
import json
import signal
import sys
import time
from threading import Thread

from affinity_control import AffinityControl
from subnet import SubnetControl
from stats import Stats

# If True, SIG_INT has been captured
sigint = False


def signal_handler(signal, frame):
    """Handle SIG_INT by raising the module-level ``sigint`` flag.

    The parameters are the signal number and current stack frame supplied by
    the ``signal`` module; neither is used.  Note that the first parameter
    shadows the ``signal`` module inside this function.
    """
    global sigint
    sigint = True


class WaypointMonitor(Thread):
    """Thread that monitors statistics and installs a waypoint redirect."""

    def __init__(self, monitor_type, **kwargs):
        """Create the monitor.

        Args:
            monitor_type: Statistics type, compared against
                ``Stats.TYPE_PREFIX`` / ``Stats.TYPE_HOST`` in ``run``.
            **kwargs: Forwarded unchanged to ``Stats(monitor_type, ...)``.
        """
        Thread.__init__(self)
        self.stat = Stats(monitor_type, **kwargs)
        self.stat_type = monitor_type
        self.waypoint_address = None

    def set_waypoint(self, waypoint_ip):
        """Record the waypoint address used when a large flow is seen."""
        self.waypoint_address = waypoint_ip
        print("Registered waypoint for %s. Any large flows will be redirected to %s."
              % (self.stat, waypoint_ip))

    def run(self):
        """Poll the statistics once per second until SIG_INT is captured.

        The redirect is attempted at most once per monitor: after the first
        large-flow report, later reports are ignored.
        """
        did_waypoint = False
        while not sigint:
            _, is_big = self.stat.refresh()
            if is_big and not did_waypoint:
                print("Large flow detected")
                self._redirect_through_waypoint()
                did_waypoint = True
            time.sleep(1)

    def _redirect_through_waypoint(self):
        """Create the affinity groups and link, then enable the waypoint."""
        ac = AffinityControl()
        src_ag_name = "sources"
        dst_ag_name = "client"
        link_name = "inflows"

        # First AG: Sources sending data into this subnet
        src_ips = self.stat.get_incoming_hosts()
        ac.add_affinity_group(src_ag_name, ips=src_ips)

        # Second AG: This entity
        if self.stat_type == Stats.TYPE_PREFIX:
            ac.add_affinity_group(dst_ag_name, subnet=self.stat.subnet)
        elif self.stat_type == Stats.TYPE_HOST:
            # NOTE(review): no destination group is created for host
            # monitors, yet the link below still names "client" -- confirm
            # whether that group is expected to exist already or whether
            # this branch is unfinished.
            pass
        else:
            print("type %s not supported for redirection" % (self.stat_type,))
            # Without a destination group there is nothing to link to, so
            # do not install a link or waypoint for an unsupported type.
            return

        # AL: Between them
        ac.add_affinity_link(link_name, src_ag_name, dst_ag_name)
        ac.add_waypoint(link_name, self.waypoint_address)
        ac.enable_waypoint(link_name)


def main():
    """Set up the default subnet, start the monitor and wait for SIG_INT."""
    # Default subnet is required for the host tracker to work.
    # NOTE(review): the subnet and waypoint strings below are empty --
    # presumably placeholders to be filled in for the deployment.
    subnet_control = SubnetControl()
    subnet_control.add_subnet("defaultSubnet", "")

    m = WaypointMonitor(Stats.TYPE_PREFIX, subnet="")
    m.set_waypoint("")

    # Register signal-handler to catch SIG_INT *before* starting the thread,
    # so an early Ctrl-C sets the flag instead of raising KeyboardInterrupt
    # here and leaving the monitor loop running.
    signal.signal(signal.SIGINT, signal_handler)
    m.start()

    # Sleep until a handled signal arrives; re-check the flag so that main
    # only proceeds to join() once SIG_INT has actually been captured.
    while not sigint:
        signal.pause()

    # join() won't return until SIG_INT has been captured
    m.join()


if __name__ == "__main__":
    main()