18 "deviceId": "of:0000000000000001",
41 flow_delete_template = {
42 "deviceId": "of:0000000000000001",
43 "flowId": 21392098393151996
48 def __init__(self, verbose=False):
49 self.verbose = verbose
52 self.start = time.time()
55 def __exit__(self, *args):
56 self.end = time.time()
57 self.secs = self.end - self.start
58 self.msecs = self.secs * 1000 # millisecs
60 print("elapsed time: %f ms" % self.msecs)
63 class Counter(object):
64 def __init__(self, start=0):
65 self.lock = threading.Lock()
68 def increment(self, value=1):
78 def _prepare_post(cntl, method, flows, template=None):
79 """Creates a POST http requests to configure a flow in configuration datastore.
82 :param cntl: controller's ip address or hostname
84 :param method: determines http request method
86 :param flows: list of flow details
88 :param template: flow template to be to be filled
91 :returns req: http request object
94 for dev_id, ip in (flows):
95 flow = copy.deepcopy(template)
96 flow["deviceId"] = dev_id
97 flow["selector"]["criteria"][1]["ip"] = '%s/32' % str(netaddr.IPAddress(ip))
98 flow_list.append(flow)
99 body = {"flows": flow_list}
100 url = 'http://' + cntl + ':' + '8181/onos/v1/flows/'
101 req_data = json.dumps(body)
102 req = requests.Request(method, url, headers={'Content-Type': 'application/json'},
103 data=req_data, auth=('onos', 'rocks'))
107 def _prepare_delete(cntl, method, flows, template=None):
108 """Creates a DELETE http requests to configure a flow in configuration datastore.
111 :param cntl: controller's ip address or hostname
113 :param method: determines http request method
115 :param flows: list of flow details
117 :param template: flow template to be to be filled
120 :returns req: http request object
123 for dev_id, flow_id in (flows):
124 flow = copy.deepcopy(template)
125 flow["deviceId"] = dev_id
126 flow["flowId"] = flow_id
127 flow_list.append(flow)
128 body = {"flows": flow_list}
129 url = 'http://' + cntl + ':' + '8181/onos/v1/flows/'
130 req_data = json.dumps(body)
131 req = requests.Request(method, url, headers={'Content-Type': 'application/json'},
132 data=req_data, auth=('onos', 'rocks'))
136 def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='',
137 template=None, outqueue=None, method=None):
138 """The funcion sends http requests.
140 Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests
144 :param thread_id: thread id
146 :param preparefnc: function to preparesthe http request
148 :param inqueue: input queue, flow details are comming from here
150 :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue
152 :param controllers: a list of controllers' ip addresses or hostnames
154 :param restport: restconf port
156 :param template: flow template used for creating flow content
158 :param outqueue: queue where the results should be put
160 :param method: method derermines the type of http request
163 nothing, results must be put into the output queue
165 ses = requests.Session()
166 cntl = controllers[0]
167 counter = [0 for i in range(600)]
172 flowlist = inqueue.get(timeout=1)
174 if exitevent.is_set() and inqueue.empty():
177 req = preparefnc(cntl, method, flowlist, template=template)
178 # prep = ses.prepare_request(req)
181 rsp = ses.send(prep, timeout=5)
182 except requests.exceptions.Timeout:
185 counter[rsp.status_code] += 1
187 for i, v in enumerate(counter):
193 def get_device_ids(controller='127.0.0.1', port=8181):
194 """Returns a list of switch ids"""
195 rsp = requests.get(url='http://{0}:{1}/onos/v1/devices'.format(controller, port), auth=('onos', 'rocks'))
196 if rsp.status_code != 200:
198 devices = json.loads(rsp.content)['devices']
199 ids = [d['id'] for d in devices if 'of:' in d['id']]
203 def get_flow_ids(controller='127.0.0.1', port=8181):
204 """Returns a list of flow ids"""
205 rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
206 if rsp.status_code != 200:
208 flows = json.loads(rsp.content)['flows']
209 ids = [f['id'] for f in flows]
213 def get_flow_simple_stats(controller='127.0.0.1', port=8181):
214 """Returns a list of flow ids"""
215 rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
216 if rsp.status_code != 200:
218 flows = json.loads(rsp.content)['flows']
221 if f['state'] not in res:
228 def get_flow_device_pairs(controller='127.0.0.1', port=8181, flow_details=[]):
229 """Pairing flows from controller with deteils we used ofr creation"""
230 rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
231 if rsp.status_code != 200:
233 flows = json.loads(rsp.content)['flows']
234 # print "Flows", flows
235 # print "Details", flow_details
236 for dev_id, ip in flow_details:
237 # print "looking for details", dev_id, ip
239 # lets identify if it is our flow
240 if f["treatment"]["instructions"][0]["type"] != "DROP":
243 if f["deviceId"] == dev_id:
244 if "ip" in f["selector"]["criteria"][0]:
246 elif "ip" in f["selector"]["criteria"][1]:
250 # print "Comparing", '%s/32' % str(netaddr.IPAddress(ip))
251 if f["selector"]["criteria"][item_idx]["ip"] == '%s/32' % str(netaddr.IPAddress(ip)):
252 # print dev_id, ip, f
253 yield dev_id, f["id"]
257 def get_flow_to_remove(controller='127.0.0.1', port=8181):
258 """Pairing flows from controller with deteils we used ofr creation"""
259 rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
260 if rsp.status_code != 200:
262 flows = json.loads(rsp.content)['flows']
263 # print "Flows", flows
264 # print "Details", flow_details
267 # lets identify if it is our flow
268 if f["treatment"]["instructions"][0]["type"] != "NOACTION":
271 if "ip" in f["selector"]["criteria"][0]:
273 elif "ip" in f["selector"]["criteria"][1]:
277 # print "Comparing", '%s/32' % str(netaddr.IPAddress(ip))
278 ipstr = f["selector"]["criteria"][item_idx]["ip"]
279 if '10.' in ipstr and '/32' in ipstr:
280 # print dev_id, ip, f
281 yield (f["deviceId"], f["id"])
286 parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows '
287 'into the config tree, as specified by optional parameters.')
289 parser.add_argument('--host', default='127.0.0.1',
290 help='Host where onos controller is running (default is 127.0.0.1)')
291 parser.add_argument('--port', default='8181',
292 help='Port on which onos\'s RESTCONF is listening (default is 8181)')
293 parser.add_argument('--threads', type=int, default=1,
294 help='Number of request worker threads to start in each cycle; default=1. '
295 'Each thread will add/delete <FLOWS> flows.')
296 parser.add_argument('--flows', type=int, default=10,
297 help='Number of flows that will be added/deleted in total, default 10')
298 parser.add_argument('--fpr', type=int, default=1,
299 help='Number of flows per REST request, default 1')
300 parser.add_argument('--timeout', type=int, default=100,
301 help='The maximum time (seconds) to wait between the add and delete cycles; default=100')
302 parser.add_argument('--no-delete', dest='no_delete', action='store_true', default=False,
303 help='Delete all added flows one by one, benchmark delete '
305 parser.add_argument('--bulk-delete', dest='bulk_delete', action='store_true', default=False,
306 help='Delete all flows in bulk; default=False')
307 parser.add_argument('--outfile', default='', help='Stores add and delete flow rest api rate; default=""')
309 in_args = parser.parse_args(*argv)
313 base_dev_ids = get_device_ids(controller=in_args.host)
314 base_flow_ids = get_flow_ids(controller=in_args.host)
316 ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
318 preparefnc = _prepare_post
320 base_num_flows = len(base_flow_ids)
323 print " devices:", len(base_dev_ids)
324 print " flows :", base_num_flows
326 # lets fill the queue for workers
330 sendqueue = Queue.Queue()
331 for i in range(in_args.flows):
332 dev_id = random.choice(base_dev_ids)
333 dst_ip = ip_addr.increment()
334 flow_list.append((dev_id, dst_ip))
335 flow_details.append((dev_id, dst_ip))
337 if nflows == in_args.fpr:
338 sendqueue.put(flow_list)
343 resultqueue = Queue.Queue()
345 exitevent = threading.Event()
350 for i in range(int(in_args.threads)):
351 thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
352 kwargs={"inqueue": sendqueue, "exitevent": exitevent,
353 "controllers": [in_args.host], "restport": in_args.port,
354 "template": flow_template, "outqueue": resultqueue, "method": "POST"})
361 # waitng for reqults and sum them up
364 # reading partial resutls from sender thread
365 part_result = resultqueue.get()
366 for k, v in part_result.iteritems():
372 print "Added", in_args.flows, "flows in", tmr.secs, "seconds", result
373 add_details = {"duration": tmr.secs, "flows": len(flow_details)}
375 # lets print some stats
376 print "\n\nStats monitoring ..."
379 for i in range(rounds):
380 flow_stats = get_flow_simple_stats(controller=in_args.host)
383 pending_adds = int(flow_stats[u'PENDING_ADD']) # noqa # FIXME: Print this somewhere.
389 print "... monitoring finished in +%d seconds\n\n" % t.secs
391 print "... monitoring aborted after %d rounds, elapsed time %d\n\n" % (rounds, t.secs)
393 if in_args.no_delete:
397 time.sleep(in_args.timeout)
399 # lets delete flows, need to get ids to be deleted, becasue we dont have flow_id on config time
400 # we have to pair flows on out own
401 flows_remove_details = []
402 # for a in get_flow_device_pairs(controller=in_args.host, flow_details=flow_details):
403 for a in get_flow_to_remove(controller=in_args.host):
404 flows_remove_details.append(a)
405 print "Flows to be removed: ", len(flows_remove_details)
407 # lets fill the queue for workers
410 sendqueue = Queue.Queue()
411 for fld in flows_remove_details:
412 flow_list.append(fld)
414 if nflows == in_args.fpr:
415 sendqueue.put(flow_list)
420 resultqueue = Queue.Queue()
422 exitevent = threading.Event()
425 preparefnc = _prepare_delete
428 for i in range(int(in_args.threads)):
429 thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
430 kwargs={"inqueue": sendqueue, "exitevent": exitevent,
431 "controllers": [in_args.host], "restport": in_args.port,
432 "template": flow_delete_template, "outqueue": resultqueue,
440 # waitng for reqults and sum them up
443 # reading partial resutls from sender thread
444 part_result = resultqueue.get()
445 for k, v in part_result.iteritems():
451 print "Removed", len(flows_remove_details), "flows in", tmr.secs, "seconds", result
452 del_details = {"duration": tmr.secs, "flows": len(flows_remove_details)}
454 # # lets print some stats
455 # print "\n\nSome stats monitoring ...."
456 # for i in range(100):
457 # print get_flow_simple_stats(controller=in_args.host)
459 # print "... monitoring finished\n\n"
460 # lets print some stats
461 print "\n\nStats monitoring ..."
464 for i in range(rounds):
465 flow_stats = get_flow_simple_stats(controller=in_args.host)
468 pending_rems = int(flow_stats[u'PENDING_REMOVE']) # noqa # FIXME: Print this somewhere.
474 print "... monitoring finished in +%d seconds\n\n" % t.secs
476 print "... monitoring aborted after %d rounds, elapsed time %d\n\n" % (rounds, t.secs)
478 if in_args.outfile != "":
479 addrate = add_details['flows'] / add_details['duration']
480 delrate = del_details['flows'] / del_details['duration']
481 print "addrate", addrate
482 print "delrate", delrate
484 with open(in_args.outfile, "wt") as fd:
485 fd.write("AddRate,DeleteRate\n")
486 fd.write("{0},{1}\n".format(addrate, delrate))
489 if __name__ == "__main__":