print "Waypoint %s could not be set for link %s" % (ip, link_name)
return
print "Waypoint %s successfully set for link %s" % (ip, link_name)
+
+ # Enable waypoint
+ def enable_waypoint(self, link_name):
+ resp, content = self.http.request(self.url_prefix + "link/%s/enable" % link_name, "PUT")
+ if (resp.status != 201):
+ print "Waypoint could not be enabled for link %s" % link_name
+ print resp.status
+ return
+ print "Waypoint enabled for link %s" % link_name
if (request_type == "host"):
action = request[1]
src, dst = request[2:4]
- host_stat = Stats("host", src=src, dst=dst)
+ host_stat = Stats(Stats.TYPE_HOST, src=src, dst=dst)
if (action == "bytes"):
print("%d bytes between %s and %s" % (host_stat.get_bytes(), src, dst))
elif (action == "rate"):
print("%f bit/s between %s and %s" % (host_stat.get_bit_rate(), src, dst))
else:
- print "wrong action"
raise Exception
elif (request_type == "link"):
action = request[1]
link = request[2]
- link_stat = Stats("affinityLink", al=link)
+ link_stat = Stats(Stats.TYPE_AL, al=link)
if (action == "bytes"):
print("%d bytes on %s" % (link_stat.get_bytes(), link))
elif (action == "rate"):
print("%f bit/s on %s" % (link_stat.get_bit_rate(), link))
else:
- print "wrong action 2"
raise Exception
elif (request_type == "prefix"):
if (resp.status == 200):
data = json.loads(content)
print data['byteCount'], "bytes"
-
else:
- print "something else"
raise Exception
except Exception as e:
print "Error"
print e
-
-def get_all_hosts():
-
- h = httplib2.Http(".cache")
- h.add_credentials("admin", "admin")
-
- resp, content = h.request("http://localhost:8080/controller/nb/v2/hosttracker/default/hosts/active", "GET")
- host_content = json.loads(content)
-
- # Even if there are no active hosts, host_content['hostConfig']
- # still exists (and is empty)
- active_hosts = []
- for host_data in host_content['hostConfig']:
- active_hosts.append(host_data['networkAddress'])
- return active_hosts
-
-
-def run_passive_mode(affinity_links):
- # TODO: Get affinity_links automatically
- affinity_link_stats = {}
-
- # Go through all affinity link stats
- while True:
- for al in affinity_links:
- if al not in affinity_link_stats:
- affinity_link_stats[al] = Stats("affinityLink", al=al)
- stat = affinity_link_stats[al]
- stat.refresh()
- print "%d bytes (%1.1f Mbit/s) on %s" % (stat.get_bytes(), (stat.get_bit_rate() / (10**6)), al)
- time.sleep(2)
-
def main():
# Default subnet is required for the host tracker to work.
# Set up an affinity link
affinity_control = AffinityControl()
- affinity_control.add_affinity_group("testAG1", ["10.0.0.1", "10.0.0.2"])
- affinity_control.add_affinity_group("testAG2", ["10.0.0.3", "10.0.0.4"])
+ affinity_control.add_affinity_group("testAG1", ips=["10.0.0.1", "10.0.0.2"])
+ affinity_control.add_affinity_group("testAG2", ips=["10.0.0.3", "10.0.0.4"])
affinity_control.add_affinity_link("testAL", "testAG1", "testAG2")
raw_input("[Press enter to continue]" )
- interactive_mode = True
-
- if interactive_mode:
- run_interactive_mode()
- else:
- run_passive_mode(["testAL"])
+ run_interactive_mode()
if __name__ == "__main__":
main()
global sigint
sigint = False
+# Handle SIG_INT
+def signal_handler(signal, frame):
+ global sigint
+ sigint = True
+
# Monitors statistics
-class Monitor(Thread):
+class WaypointMonitor(Thread):
def __init__(self, monitor_type, **kwargs):
Thread.__init__(self)
self.stat = Stats(monitor_type, **kwargs)
self.stat_type = monitor_type
- self.did_waypoint = False
+ self.waypoint_address = None
+
+ def set_waypoint(self, waypoint_ip):
+ self.waypoint_address = waypoint_ip
+ print "Registered waypoint for %s. Any large flows will be redirected to %s." % (self.stat, waypoint_ip)
def run(self):
global sigint
+ did_waypoint = False
while not sigint:
- is_fast, is_big = self.stat.refresh()
- self.stat.get_incoming_hosts()
- if is_big and not self.did_waypoint:
- print "Large flow; redirect here"
+ _, is_big = self.stat.refresh()
+ if is_big and not did_waypoint:
+ print "Large flow detected"
ac = AffinityControl()
# First AG: Sources sending data into this subnet
+ src_ag_name = "sources"
src_ips = self.stat.get_incoming_hosts()
- ac.add_affinity_group("webservers", ips=src_ips)
+ ac.add_affinity_group(src_ag_name, ips=src_ips)
# Second AG: This entity
- if (self.stat_type == "prefix"):
- ac.add_affinity_group("clients", subnet=self.stat.subnet)
+ dst_ag_name = "client"
+ if (self.stat_type == Stats.TYPE_PREFIX):
+ ac.add_affinity_group(dst_ag_name, subnet=self.stat.subnet)
+ elif (self.stat_type == Stats.TYPE_HOST):
+ pass
else:
print "type", self.stat_type, "not supported for redirection"
# AL: Between them
- ac.add_affinity_link("inflows", "webservers", "client")
- # TODO: This IP should be an option
- ac.add_waypoint("inflows", "10.0.0.2")
- self.did_waypoint = True
+ link_name = "inflows"
+ ac.add_affinity_link(link_name, src_ag_name, dst_ag_name)
+ ac.add_waypoint(link_name, self.waypoint_address)
+ ac.enable_waypoint(link_name)
+ did_waypoint = True
time.sleep(1)
-# Handle SIG_INT
-def signal_handler(signal, frame):
- global sigint
- sigint = True
-
def main():
# Default subnet is required for the host tracker to work.
subnet_control = SubnetControl()
subnet_control.add_subnet("defaultSubnet", "10.0.0.254/8")
- m = Monitor("prefix", subnet="10.0.0.0/31")
+ m = WaypointMonitor(Stats.TYPE_PREFIX, subnet="10.0.0.0/31")
+ m.set_waypoint("10.0.0.2")
m.start()
# Register signal-handler to catch SIG_INT
import json
'''
-Class for keeping track of host stats or affinity link stats, depending.
+Class for keeping track of host statistics
'''
class Stats:
+ TYPE_HOST = 0
+ TYPE_AL = 1 # AffinityLink
+ TYPE_PREFIX = 2
+
def __init__(self, stat_type, **kwargs):
self.stat_type = stat_type
- if stat_type == "host":
+ if stat_type == Stats.TYPE_HOST:
self.src = kwargs['src']
self.dst = kwargs['dst']
self.url_prefix = "http://localhost:8080/affinity/nb/v2/analytics/default/hoststats/"
- elif stat_type == "affinityLink":
+ elif stat_type == Stats.TYPE_AL:
self.al = kwargs['al']
self.url_prefix = "http://localhost:8080/affinity/nb/v2/analytics/default/affinitylinkstats/"
- elif stat_type == "prefix":
+ elif stat_type == Stats.TYPE_PREFIX:
self.subnet = kwargs['subnet']
self.url_prefix = "http://localhost:8080/affinity/nb/v2/analytics/default/prefixstats/"
else:
self.http.add_credentials('admin', 'admin')
def __str__(self):
- if (self.stat_type == "host"):
+ if (self.stat_type == Stats.TYPE_HOST):
return "host pair %s -> %s" % (self.src, self.dst)
- elif (self.stat_type == "affinityLink"):
- return "AffinityLink %s" % self.al
- elif (self.stat_type == "prefix"):
- return "Prefix %s" % self.subnet
+ elif (self.stat_type == Stats.TYPE_AL):
+ return "affinity link %s" % self.al
+ elif (self.stat_type == Stats.TYPE_PREFIX):
+ return "prefix %s" % self.subnet
else:
- return "Unknown Stats type"
+ return "unknown Stats type"
# Refresh statistics
def refresh(self):
- if (self.stat_type == "host"):
+ if (self.stat_type == Stats.TYPE_HOST):
resp, content = self.http.request(self.url_prefix + self.src + "/" + self.dst, "GET")
- elif (self.stat_type == "affinityLink"):
+ elif (self.stat_type == Stats.TYPE_AL):
resp, content = self.http.request(self.url_prefix + self.al, "GET")
- elif (self.stat_type == "prefix"):
+ elif (self.stat_type == Stats.TYPE_PREFIX):
resp, content = self.http.request(self.url_prefix + self.subnet, "GET")
try:
self.stats = json.loads(content)
# Return hosts that transferred data into this entity. Right now only supports prefixes.
def get_incoming_hosts(self):
- if (self.stat_type == "prefix"):
+ if (self.stat_type == Stats.TYPE_PREFIX):
resp, content = self.http.request(self.url_prefix + "incoming/" + self.subnet, "GET")
data = json.loads(content)
if (data != {}):
# IPs sometimes (always?) get returned as strings like /1.2.3.4; strip off the leading /
ips = [h.replace("/", "") for h in data['hosts']]
return ips
+ else:
+ print "Stat type not supported for incoming hosts"
return []
# EWMA calculation for bit rate. Returns true if there is an anomaly.