From 04ea3dcb6593603e19b2a1aee5bd1808eda469b8 Mon Sep 17 00:00:00 2001 From: Peter Gubka Date: Tue, 8 Mar 2016 15:44:01 +0100 Subject: [PATCH] adding onos benchmarking scripts Change-Id: I505ba2f1085392fa5031816c6fc1c7e2f50a0fd2 Signed-off-by: Peter Gubka --- .../clustering-performance-test/onos_stats.py | 300 ++++++++++++ .../onos_tester.py | 454 ++++++++++++++++++ 2 files changed, 754 insertions(+) create mode 100644 tools/odl-mdsal-clustering-tests/clustering-performance-test/onos_stats.py create mode 100644 tools/odl-mdsal-clustering-tests/clustering-performance-test/onos_tester.py diff --git a/tools/odl-mdsal-clustering-tests/clustering-performance-test/onos_stats.py b/tools/odl-mdsal-clustering-tests/clustering-performance-test/onos_stats.py new file mode 100644 index 0000000000..216610f4e2 --- /dev/null +++ b/tools/odl-mdsal-clustering-tests/clustering-performance-test/onos_stats.py @@ -0,0 +1,300 @@ +import requests +import json +import argparse +import sys +import netaddr +import threading +import Queue +import random +import copy +import time + + +flow_template = { + "appId": 10, + "priority": 40000, + "timeout": 0, + "isPermanent": True, + "deviceId": "of:0000000000000001", + "treatment": { + "instructions": [ + { + "type": "NOACTION" + } + ], + "deferred": [] + }, + "selector": { + "criteria": [ + { + "type": "ETH_TYPE", + "ethType": 2048 + }, + { + "type": "IPV4_DST", + "ip": "10.0.0.0/32" + } + ] + } +} + + +class Timer(object): + def __init__(self, verbose=False): + self.verbose = verbose + + def __enter__(self): + self.start = time.time() + return self + + def __exit__(self, *args): + self.end = time.time() + self.secs = self.end - self.start + self.msecs = self.secs * 1000 # millisecs + if self.verbose: + print ("elapsed time: %f ms" % self.msecs) + + +class Counter(object): + def __init__(self, start=0): + self.lock = threading.Lock() + self.value = start + + def increment(self, value=1): + self.lock.acquire() + val = self.value + try: + self.value += value + finally: + self.lock.release() + return val + + +def _prepare_post(cntl, method, flows, template=None): + """Creates a POST http requests to configure a flow in configuration datastore. + + Args: + :param cntl: controller's ip address or hostname + + :param method: determines http request method + + :param flows: list of flow details + + :param template: flow template to be to be filled + + Returns: + :returns req: http request object + """ + fl1 = flows[0] + dev_id, ip = fl1 + url = 'http://' + cntl + ':' + '8181/onos/v1/flows/' + dev_id + flow = copy.deepcopy(template) + flow["deviceId"] = dev_id + flow["selector"]["criteria"][1]["ip"] = '%s/32' % str(netaddr.IPAddress(ip)) + req_data = json.dumps(flow) + req = requests.Request(method, url, headers={'Content-Type': 'application/json'}, + data=req_data, auth=('onos', 'rocks')) + return req + + +def _prepare_delete(cntl, method, flows, template=None): + """Creates a DELETE http requests to configure a flow in configuration datastore. + + Args: + :param cntl: controller's ip address or hostname + + :param method: determines http request method + + :param flows: list of flow details + + :param template: flow template to be to be filled + + Returns: + :returns req: http request object + """ + fl1 = flows[0] + dev_id, flow_id = fl1 + url = 'http://' + cntl + ':' + '8181/onos/v1/flows/' + dev_id + '/' + flow_id + req = requests.Request(method, url, auth=('onos', 'rocks')) + return req + + +def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='', + template=None, outqueue=None, method=None): + """The funcion sends http requests. + + Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests + to the controller + + Args: + :param thread_id: thread id + + :param preparefnc: function to preparesthe http request + + :param inqueue: input queue, flow details are comming from here + + :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue + + :param controllers: a list of controllers' ip addresses or hostnames + + :param restport: restconf port + + :param template: flow template used for creating flow content + + :param outqueue: queue where the results should be put + + :param method: method derermines the type of http request + + Returns: + nothing, results must be put into the output queue + """ + ses = requests.Session() + cntl = controllers[0] + counter = [0 for i in range(600)] + loop = True + + while loop: + try: + flowlist = inqueue.get(timeout=1) + except Queue.Empty: + if exitevent.is_set() and inqueue.empty(): + loop = False + continue + req = preparefnc(cntl, method, flowlist, template=template) + # prep = ses.prepare_request(req) + prep = req.prepare() + try: + rsp = ses.send(prep, timeout=5) + except requests.exceptions.Timeout: + counter[99] += 1 + continue + counter[rsp.status_code] += 1 + res = {} + for i, v in enumerate(counter): + if v > 0: + res[i] = v + outqueue.put(res) + + +def get_device_ids(controller='127.0.0.1', port=8181): + """Returns a list of switch ids""" + rsp = requests.get(url='http://{0}:{1}/onos/v1/devices'.format(controller, port), auth=('onos', 'rocks')) + if rsp.status_code != 200: + return [] + devices = json.loads(rsp.content)['devices'] + ids = [d['id'] for d in devices if 'of:' in d['id']] + return ids + + +def get_flow_ids(controller='127.0.0.1', port=8181): + """Returns a list of flow ids""" + rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks')) + if rsp.status_code != 200: + return [] + flows = json.loads(rsp.content)['flows'] + ids = [f['id'] for f in flows] + return ids + + +def get_flow_simple_stats(controller='127.0.0.1', port=8181): + """Returns a list of flow ids""" + rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks')) + if rsp.status_code != 200: + return [] + flows = json.loads(rsp.content)['flows'] + res = {} + for f in flows: + if f['state'] not in res: + res[f['state']] = 1 + else: + res[f['state']] += 1 + return res + + +def get_flow_device_pairs(controller='127.0.0.1', port=8181, flow_details=[]): + """Pairing flows from controller with deteils we used ofr creation""" + rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks')) + if rsp.status_code != 200: + return + flows = json.loads(rsp.content)['flows'] + # print "Flows", flows + # print "Details", flow_details + for dev_id, ip in flow_details: + # print "looking for details", dev_id, ip + for f in flows: + # lets identify if it is our flow + if f["treatment"]["instructions"][0]["type"] != "DROP": + # print "NOT DROP" + continue + if f["deviceId"] == dev_id: + if "ip" in f["selector"]["criteria"][0]: + item_idx = 0 + elif "ip" in f["selector"]["criteria"][1]: + item_idx = 1 + else: + continue + # print "Comparing", '%s/32' % str(netaddr.IPAddress(ip)) + if f["selector"]["criteria"][item_idx]["ip"] == '%s/32' % str(netaddr.IPAddress(ip)): + # print dev_id, ip, f + yield dev_id, f["id"] + break + + +def get_flow_to_remove(controller='127.0.0.1', port=8181): + """Pairing flows from controller with deteils we used ofr creation""" + rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks')) + if rsp.status_code != 200: + return + flows = json.loads(rsp.content)['flows'] + # print "Flows", flows + # print "Details", flow_details + + for f in flows: + # lets identify if it is our flow + if f["treatment"]["instructions"][0]["type"] != "NOACTION": + # print "NOT DROP" + continue + if "ip" in f["selector"]["criteria"][0]: + item_idx = 0 + elif "ip" in f["selector"]["criteria"][1]: + item_idx = 1 + else: + continue + # print "Comparing", '%s/32' % str(netaddr.IPAddress(ip)) + ipstr = f["selector"]["criteria"][item_idx]["ip"] + if '10.' in ipstr and '/32' in ipstr: + # print dev_id, ip, f + yield (f["deviceId"], f["id"]) + + +def main(*argv): + + parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows ' + 'into the config tree, as specified by optional parameters.') + + parser.add_argument('--host', default='127.0.0.1', + help='Host where onos controller is running (default is 127.0.0.1)') + parser.add_argument('--port', default='8181', + help='Port on which onos\'s RESTCONF is listening (default is 8181)') + + in_args = parser.parse_args(*argv) + print in_args + + # get device ids + base_dev_ids = get_device_ids(controller=in_args.host) + base_flow_ids = get_flow_ids(controller=in_args.host) + # ip + ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1'))) + # prepare func + preparefnc = _prepare_post + + print "BASELINE:" + print " devices:", len(base_dev_ids) + print " flows :", len(base_flow_ids) + + # lets print some stats + print "\n\nSome stats monitoring ...." + print get_flow_simple_stats(controller=in_args.host) + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/tools/odl-mdsal-clustering-tests/clustering-performance-test/onos_tester.py b/tools/odl-mdsal-clustering-tests/clustering-performance-test/onos_tester.py new file mode 100644 index 0000000000..ee81c0380f --- /dev/null +++ b/tools/odl-mdsal-clustering-tests/clustering-performance-test/onos_tester.py @@ -0,0 +1,454 @@ +import requests +import json +import argparse +import sys +import netaddr +import threading +import Queue +import random +import copy +import time + + +flow_template = { + "appId": 10, + "priority": 40000, + "timeout": 0, + "isPermanent": True, + "deviceId": "of:0000000000000001", + "treatment": { + "instructions": [ + { + "type": "NOACTION" + } + ], + "deferred": [] + }, + "selector": { + "criteria": [ + { + "type": "ETH_TYPE", + "ethType": 2048 + }, + { + "type": "IPV4_DST", + "ip": "10.0.0.0/32" + } + ] + } +} + + +class Timer(object): + def __init__(self, verbose=False): + self.verbose = verbose + + def __enter__(self): + self.start = time.time() + return self + + def __exit__(self, *args): + self.end = time.time() + self.secs = self.end - self.start + self.msecs = self.secs * 1000 # millisecs + if self.verbose: + print ("elapsed time: %f ms" % self.msecs) + + +class Counter(object): + def __init__(self, start=0): + self.lock = threading.Lock() + self.value = start + + def increment(self, value=1): + self.lock.acquire() + val = self.value + try: + self.value += value + finally: + self.lock.release() + return val + + +def _prepare_post(cntl, method, flows, template=None): + """Creates a POST http requests to configure a flow in configuration datastore. + + Args: + :param cntl: controller's ip address or hostname + + :param method: determines http request method + + :param flows: list of flow details + + :param template: flow template to be to be filled + + Returns: + :returns req: http request object + """ + fl1 = flows[0] + dev_id, ip = fl1 + url = 'http://' + cntl + ':' + '8181/onos/v1/flows/' + dev_id + flow = copy.deepcopy(template) + flow["deviceId"] = dev_id + flow["selector"]["criteria"][1]["ip"] = '%s/32' % str(netaddr.IPAddress(ip)) + req_data = json.dumps(flow) + req = requests.Request(method, url, headers={'Content-Type': 'application/json'}, + data=req_data, auth=('onos', 'rocks')) + return req + + +def _prepare_delete(cntl, method, flows, template=None): + """Creates a DELETE http requests to configure a flow in configuration datastore. + + Args: + :param cntl: controller's ip address or hostname + + :param method: determines http request method + + :param flows: list of flow details + + :param template: flow template to be to be filled + + Returns: + :returns req: http request object + """ + fl1 = flows[0] + dev_id, flow_id = fl1 + url = 'http://' + cntl + ':' + '8181/onos/v1/flows/' + dev_id + '/' + flow_id + req = requests.Request(method, url, auth=('onos', 'rocks')) + return req + + +def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='', + template=None, outqueue=None, method=None): + """The funcion sends http requests. + + Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests + to the controller + + Args: + :param thread_id: thread id + + :param preparefnc: function to preparesthe http request + + :param inqueue: input queue, flow details are comming from here + + :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue + + :param controllers: a list of controllers' ip addresses or hostnames + + :param restport: restconf port + + :param template: flow template used for creating flow content + + :param outqueue: queue where the results should be put + + :param method: method derermines the type of http request + + Returns: + nothing, results must be put into the output queue + """ + ses = requests.Session() + cntl = controllers[0] + counter = [0 for i in range(600)] + loop = True + + while loop: + try: + flowlist = inqueue.get(timeout=1) + except Queue.Empty: + if exitevent.is_set() and inqueue.empty(): + loop = False + continue + req = preparefnc(cntl, method, flowlist, template=template) + # prep = ses.prepare_request(req) + prep = req.prepare() + try: + rsp = ses.send(prep, timeout=5) + except requests.exceptions.Timeout: + counter[99] += 1 + continue + counter[rsp.status_code] += 1 + res = {} + for i, v in enumerate(counter): + if v > 0: + res[i] = v + outqueue.put(res) + + +def get_device_ids(controller='127.0.0.1', port=8181): + """Returns a list of switch ids""" + rsp = requests.get(url='http://{0}:{1}/onos/v1/devices'.format(controller, port), auth=('onos', 'rocks')) + if rsp.status_code != 200: + return [] + devices = json.loads(rsp.content)['devices'] + ids = [d['id'] for d in devices if 'of:' in d['id']] + return ids + + +def get_flow_ids(controller='127.0.0.1', port=8181): + """Returns a list of flow ids""" + rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks')) + if rsp.status_code != 200: + return [] + flows = json.loads(rsp.content)['flows'] + ids = [f['id'] for f in flows] + return ids + + +def get_flow_simple_stats(controller='127.0.0.1', port=8181): + """Returns a list of flow ids""" + rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks')) + if rsp.status_code != 200: + return [] + flows = json.loads(rsp.content)['flows'] + res = {} + for f in flows: + if f['state'] not in res: + res[f['state']] = 1 + else: + res[f['state']] += 1 + return res + + +def get_flow_device_pairs(controller='127.0.0.1', port=8181, flow_details=[]): + """Pairing flows from controller with deteils we used ofr creation""" + rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks')) + if rsp.status_code != 200: + return + flows = json.loads(rsp.content)['flows'] + # print "Flows", flows + # print "Details", flow_details + for dev_id, ip in flow_details: + # print "looking for details", dev_id, ip + for f in flows: + # lets identify if it is our flow + if f["treatment"]["instructions"][0]["type"] != "DROP": + # print "NOT DROP" + continue + if f["deviceId"] == dev_id: + if "ip" in f["selector"]["criteria"][0]: + item_idx = 0 + elif "ip" in f["selector"]["criteria"][1]: + item_idx = 1 + else: + continue + # print "Comparing", '%s/32' % str(netaddr.IPAddress(ip)) + if f["selector"]["criteria"][item_idx]["ip"] == '%s/32' % str(netaddr.IPAddress(ip)): + # print dev_id, ip, f + yield dev_id, f["id"] + break + + +def get_flow_to_remove(controller='127.0.0.1', port=8181): + """Pairing flows from controller with deteils we used ofr creation""" + rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks')) + if rsp.status_code != 200: + return + flows = json.loads(rsp.content)['flows'] + # print "Flows", flows + # print "Details", flow_details + + for f in flows: + # lets identify if it is our flow + if f["treatment"]["instructions"][0]["type"] != "NOACTION": + # print "NOT DROP" + continue + if "ip" in f["selector"]["criteria"][0]: + item_idx = 0 + elif "ip" in f["selector"]["criteria"][1]: + item_idx = 1 + else: + continue + # print "Comparing", '%s/32' % str(netaddr.IPAddress(ip)) + ipstr = f["selector"]["criteria"][item_idx]["ip"] + if '10.' in ipstr and '/32' in ipstr: + # print dev_id, ip, f + yield (f["deviceId"], f["id"]) + + +def main(*argv): + + parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows ' + 'into the config tree, as specified by optional parameters.') + + parser.add_argument('--host', default='127.0.0.1', + help='Host where onos controller is running (default is 127.0.0.1)') + parser.add_argument('--port', default='8181', + help='Port on which onos\'s RESTCONF is listening (default is 8181)') + parser.add_argument('--threads', type=int, default=1, + help='Number of request worker threads to start in each cycle; default=1. ' + 'Each thread will add/delete flows.') + parser.add_argument('--flows', type=int, default=10, + help='Number of flows that will be added/deleted in total, default 10') + parser.add_argument('--timeout', type=int, default=100, + help='The maximum time (seconds) to wait between the add and delete cycles; default=100') + parser.add_argument('--no-delete', dest='no_delete', action='store_true', default=False, + help='Delete all added flows one by one, benchmark delete ' + 'performance.') + parser.add_argument('--bulk-delete', dest='bulk_delete', action='store_true', default=False, + help='Delete all flows in bulk; default=False') + parser.add_argument('--outfile', default='', help='Stores add and delete flow rest api rate; default=""') + + in_args = parser.parse_args(*argv) + print in_args + + # get device ids + base_dev_ids = get_device_ids(controller=in_args.host) + base_flow_ids = get_flow_ids(controller=in_args.host) + # ip + ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1'))) + # prepare func + preparefnc = _prepare_post + + base_num_flows = len(base_flow_ids) + + print "BASELINE:" + print " devices:", len(base_dev_ids) + print " flows :", base_num_flows + + # lets fill the queue for workers + flow_details = [] + sendqueue = Queue.Queue() + for i in range(in_args.flows): + dev_id = random.choice(base_dev_ids) + dst_ip = ip_addr.increment() + sendqueue.put([(dev_id, dst_ip)]) + flow_details.append((dev_id, dst_ip)) + + # result_gueue + resultqueue = Queue.Queue() + # creaet exit event + exitevent = threading.Event() + + # run workers + with Timer() as tmr: + threads = [] + for i in range(int(in_args.threads)): + thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc), + kwargs={"inqueue": sendqueue, "exitevent": exitevent, + "controllers": [in_args.host], "restport": in_args.port, + "template": flow_template, "outqueue": resultqueue, "method": "POST"}) + threads.append(thr) + thr.start() + + exitevent.set() + + result = {} + # waitng for reqults and sum them up + for t in threads: + t.join() + # reading partial resutls from sender thread + part_result = resultqueue.get() + for k, v in part_result.iteritems(): + if k not in result: + result[k] = v + else: + result[k] += v + + print "Added", in_args.flows, "flows in", tmr.secs, "seconds", result + add_details = {"duration": tmr.secs, "flows": len(flow_details)} + + # lets print some stats + print "\n\nStats monitoring ..." + rounds = 200 + with Timer() as t: + for i in range(rounds): + flow_stats = get_flow_simple_stats(controller=in_args.host) + print flow_stats + try: + pending_adds = int(flow_stats[u'PENDING_ADD']) + except KeyError: + break + time.sleep(1) + + if i < rounds: + print "... monitoring finished in +%d seconds\n\n" % t.secs + else: + print "... monitoring aborted after %d rounds, elapsed time %d\n\n" % (rounds, t.secs) + + if in_args.no_delete: + return + # lets delete flows, need to get ids to be deleted, becasue we dont have flow_id on config time + # we have to pair flows on out own + flows_remove_details = [] + # for a in get_flow_device_pairs(controller=in_args.host, flow_details=flow_details): + for a in get_flow_to_remove(controller=in_args.host): + flows_remove_details.append(a) + print "Flows to be removed: ", len(flows_remove_details) + + # lets fill the queue for workers + sendqueue = Queue.Queue() + for fld in flows_remove_details: + sendqueue.put([fld]) + + # result_gueue + resultqueue = Queue.Queue() + # creaet exit event + exitevent = threading.Event() + + # run workers + preparefnc = _prepare_delete + with Timer() as tmr: + threads = [] + for i in range(int(in_args.threads)): + thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc), + kwargs={"inqueue": sendqueue, "exitevent": exitevent, + "controllers": [in_args.host], "restport": in_args.port, + "template": flow_template, "outqueue": resultqueue, "method": "DELETE"}) + threads.append(thr) + thr.start() + + exitevent.set() + + result = {} + # waitng for reqults and sum them up + for t in threads: + t.join() + # reading partial resutls from sender thread + part_result = resultqueue.get() + for k, v in part_result.iteritems(): + if k not in result: + result[k] = v + else: + result[k] += v + + print "Removed", len(flows_remove_details), "flows in", tmr.secs, "seconds", result + del_details = {"duration": tmr.secs, "flows": len(flows_remove_details)} + +# # lets print some stats +# print "\n\nSome stats monitoring ...." +# for i in range(100): +# print get_flow_simple_stats(controller=in_args.host) +# time.sleep(5) +# print "... monitoring finished\n\n" + # lets print some stats + print "\n\nStats monitoring ..." + rounds = 200 + with Timer() as t: + for i in range(rounds): + flow_stats = get_flow_simple_stats(controller=in_args.host) + print flow_stats + try: + pending_rems = int(flow_stats[u'PENDING_REMOVE']) + except KeyError: + break + time.sleep(1) + + if i < rounds: + print "... monitoring finished in +%d seconds\n\n" % t.secs + else: + print "... monitoring aborted after %d rounds, elapsed time %d\n\n" % (rounds, t.secs) + + if in_args.outfile != "": + addrate = add_details['flows'] / add_details['duration'] + delrate = del_details['flows'] / del_details['duration'] + print "addrate", addrate + print "delrate", delrate + + with open(in_args.outfile, "wt") as fd: + fd.write("AddRate,DeleteRate\n") + fd.write("{0},{1}\n".format(addrate, delrate)) + +if __name__ == "__main__": + main(sys.argv[1:]) -- 2.36.6