18 "deviceId": "of:0000000000000001",
19 "treatment": {"instructions": [{"type": "NOACTION"}], "deferred": []},
22 {"type": "ETH_TYPE", "ethType": 2048},
23 {"type": "IPV4_DST", "ip": "10.0.0.0/32"},
# Skeleton of one entry in a bulk flow-delete request body. _prepare_delete
# deep-copies it and overwrites both "deviceId" and "flowId" for every flow,
# so the literal values below serve only as placeholders.
28 flow_delete_template = {"deviceId": "of:0000000000000001", "flowId": 21392098393151996}
32 def __init__(self, verbose=False):
# Only stores the flag; nothing is measured at construction time.
# NOTE(review): the enclosing class header is not visible in this view.
33 self.verbose = verbose
36 self.start = time.time()
39 def __exit__(self, *args):
# Context-manager exit hook: records the end time and derives the elapsed
# duration from self.start, which another method of this object sets.
40 self.end = time.time()
41 self.secs = self.end - self.start
42 self.msecs = self.secs * 1000 # elapsed time in milliseconds
# NOTE(review): the statement guarding this print is not visible in this view;
# presumably it is conditional on self.verbose -- confirm.
44 print("elapsed time: %f ms" % self.msecs)
# Counter that owns a threading.Lock. The flow-adding code seeds it with the
# integer form of 10.0.0.1 and calls increment() once per generated flow to
# obtain the destination address of that flow.
47 class Counter(object):
48 def __init__(self, start=0):
49 self.lock = threading.Lock()
52 def increment(self, value=1):
# NOTE(review): the body of increment() is not visible in this view; presumably
# it advances the counter by `value` while holding self.lock and returns a
# counter value -- confirm.
62 def _prepare_post(cntl, method, flows, template=None):
63 """Creates an http request that adds a batch of flows through the ONOS flows REST endpoint.
66 :param cntl: controller's ip address or hostname
68 :param method: determines http request method
70 :param flows: list of (device id, destination ip) pairs, one pair per flow
72 :param template: flow template to be filled in for every flow
75 :returns req: http request object
# Every flow is built from a private deep copy, so the shared template is never mutated.
# NOTE(review): template defaults to None, but a None template cannot be filled in
# below (item assignment on None raises TypeError); callers must always pass one.
78 for dev_id, ip in flows:
79 flow = copy.deepcopy(template)
80 flow["deviceId"] = dev_id
# criteria[1] is the IPV4_DST entry in flow_template; the address is matched as a /32 host route.
81 flow["selector"]["criteria"][1]["ip"] = "%s/32" % str(netaddr.IPAddress(ip))
82 flow_list.append(flow)
83 body = {"flows": flow_list}
# NOTE(review): port 8181 is hard-coded here, so the `restport` value handed to the
# worker threads (taken from the --port option) has no effect on this URL.
84 url = "http://" + cntl + ":" + "8181/onos/v1/flows/"
85 req_data = json.dumps(body)
86 req = requests.Request(
89 headers={"Content-Type": "application/json"},
91 auth=("onos", "rocks"),
96 def _prepare_delete(cntl, method, flows, template=None):
97 """Creates an http request that removes a batch of flows through the ONOS flows REST endpoint.
100 :param cntl: controller's ip address or hostname
102 :param method: determines http request method
104 :param flows: list of (device id, flow id) pairs, one pair per flow to remove
106 :param template: delete-entry template to be filled in for every flow
109 :returns req: http request object
# Every entry is built from a private deep copy, so the shared template is never mutated.
# NOTE(review): template defaults to None, but a None template cannot be filled in
# below (item assignment on None raises TypeError); callers must always pass one.
112 for dev_id, flow_id in flows:
113 flow = copy.deepcopy(template)
114 flow["deviceId"] = dev_id
115 flow["flowId"] = flow_id
116 flow_list.append(flow)
117 body = {"flows": flow_list}
# NOTE(review): port 8181 is hard-coded here, so the `restport` value handed to the
# worker threads (taken from the --port option) has no effect on this URL.
118 url = "http://" + cntl + ":" + "8181/onos/v1/flows/"
119 req_data = json.dumps(body)
120 req = requests.Request(
123 headers={"Content-Type": "application/json"},
125 auth=("onos", "rocks"),
130 def _wt_request_sender(
141 """The function sends http requests.
143 Runs in the working thread. It reads out flow details from the queue and sends appropriate http requests
147 :param thread_id: thread id
149 :param preparefnc: function to prepare the http request
151 :param inqueue: input queue, flow details are coming from here
153 :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue
155 :param controllers: a list of controllers' ip addresses or hostnames
157 :param restport: REST API port (not referenced in the visible part of this function)
159 :param template: flow template used for creating flow content
161 :param outqueue: queue where the results should be put
163 :param method: method determines the type of http request
166 nothing, results must be put into the output queue
168 ses = requests.Session()
# Only the first controller of the list is ever used.
169 cntl = controllers[0]
# Tally of responses, indexed by HTTP status code.
170 counter = [0 for i in range(600)]
# Wait at most one second for the next batch of flow details.
175 flowlist = inqueue.get(timeout=1)
# True once the producer has signalled completion and the queue is drained;
# presumably this ends the worker loop -- the consequent is not visible here.
177 if exitevent.is_set() and inqueue.empty():
# NOTE(review): restport is not forwarded to preparefnc; both prepare functions in
# this file hard-code port 8181 in the URL.
180 req = preparefnc(cntl, method, flowlist, template=template)
181 # prep = ses.prepare_request(req)
# NOTE(review): the statement assigning `prep` is not visible in this view; confirm
# it exists, because the only visible candidate above is commented out.
184 rsp = ses.send(prep, timeout=5)
185 except requests.exceptions.Timeout:
188 counter[rsp.status_code] += 1
190 for i, v in enumerate(counter):
196 def get_device_ids(controller="127.0.0.1", port=8181):
197 """Returns a list of ids of the controller's devices whose id contains 'of:'"""
199 url="http://{0}:{1}/onos/v1/devices".format(controller, port),
200 auth=("onos", "rocks"),
# NOTE(review): what is returned for a non-200 response is not visible in this view.
202 if rsp.status_code != 200:
204 devices = json.loads(rsp.content)["devices"]
# Substring test, not a prefix test: any id containing "of:" anywhere is kept.
205 ids = [d["id"] for d in devices if "of:" in d["id"]]
209 def get_flow_ids(controller="127.0.0.1", port=8181):
210 """Returns a list of the ids of all flows reported by the controller"""
212 url="http://{0}:{1}/onos/v1/flows".format(controller, port),
213 auth=("onos", "rocks"),
# NOTE(review): what is returned for a non-200 response is not visible in this view.
215 if rsp.status_code != 200:
217 flows = json.loads(rsp.content)["flows"]
218 ids = [f["id"] for f in flows]
222 def get_flow_simple_stats(controller="127.0.0.1", port=8181):
223 """Returns simple statistics of the controller's flows, grouped by flow state"""
225 url="http://{0}:{1}/onos/v1/flows".format(controller, port),
226 auth=("onos", "rocks"),
228 if rsp.status_code != 200:
230 flows = json.loads(rsp.content)["flows"]
# NOTE(review): presumably `res` maps each state name to the number of flows in that
# state -- the update statements are not visible here. The monitoring loops read the
# "PENDING_ADD" and "PENDING_REMOVE" entries of the result.
233 if f["state"] not in res:
# NOTE(review): flow_details=[] is a mutable default argument. The visible code only
# iterates it, so nothing leaks between calls today, but None would be the safer default.
240 def get_flow_device_pairs(controller="127.0.0.1", port=8181, flow_details=[]):
241 """Pairing flows from controller with details we used for creation"""
243 url="http://{0}:{1}/onos/v1/flows".format(controller, port),
244 auth=("onos", "rocks"),
246 if rsp.status_code != 200:
248 flows = json.loads(rsp.content)["flows"]
# flow_details holds the (device id, ip) pairs that were used when the flows were added.
249 for dev_id, ip in flow_details:
251 # let's identify whether it is our flow
# NOTE(review): this tests for "DROP", whereas flow_template installs "NOACTION" and
# get_flow_to_remove tests for "NOACTION"; the only visible call to this generator
# is commented out.
252 if f["treatment"]["instructions"][0]["type"] != "DROP":
254 if f["deviceId"] == dev_id:
# The ip criterion may sit at index 0 or 1 of the selector criteria.
255 if "ip" in f["selector"]["criteria"][0]:
257 elif "ip" in f["selector"]["criteria"][1]:
# Match on the same "<address>/32" string that _prepare_post writes into the flow.
261 if f["selector"]["criteria"][item_idx]["ip"] == "%s/32" % str(
262 netaddr.IPAddress(ip)
264 yield dev_id, f["id"]
268 def get_flow_to_remove(controller="127.0.0.1", port=8181):
269 """Yields (device id, flow id) pairs of controller flows that look like the ones this script adds"""
271 url="http://{0}:{1}/onos/v1/flows".format(controller, port),
272 auth=("onos", "rocks"),
274 if rsp.status_code != 200:
276 flows = json.loads(rsp.content)["flows"]
279 # let's identify whether it is our flow
# flow_template installs a single "NOACTION" instruction, so that is the marker checked here.
280 if f["treatment"]["instructions"][0]["type"] != "NOACTION":
# The ip criterion may sit at index 0 or 1 of the selector criteria.
282 if "ip" in f["selector"]["criteria"][0]:
284 elif "ip" in f["selector"]["criteria"][1]:
288 ipstr = f["selector"]["criteria"][item_idx]["ip"]
# NOTE(review): substring tests -- "10." also matches addresses such as 110.0.0.1 or
# 192.10.0.1; a prefix/suffix check would be stricter.
289 if "10." in ipstr and "/32" in ipstr:
290 yield (f["deviceId"], f["id"])
295 parser = argparse.ArgumentParser(
296 description="Flow programming performance test: First adds and then deletes flows "
297 "into the config tree, as specified by optional parameters."
303 help="Host where onos controller is running (default is 127.0.0.1)",
308 help="Port on which onos's RESTCONF is listening (default is 8181)",
314 help="Number of request worker threads to start in each cycle; default=1. "
315 "Each thread will add/delete <FLOWS> flows.",
321 help="Number of flows that will be added/deleted in total, default 10",
324 "--fpr", type=int, default=1, help="Number of flows per REST request, default 1"
330 help="The maximum time (seconds) to wait between the add and delete cycles; default=100",
337 help="Delete all added flows one by one, benchmark delete " "performance.",
344 help="Delete all flows in bulk; default=False",
349 help='Stores add and delete flow rest api rate; default=""',
352 in_args = parser.parse_args(*argv)
# NOTE(review): in_args.port is never passed to the get_* helpers called in this
# function, so they always query their default port 8181.
356 base_dev_ids = get_device_ids(controller=in_args.host)
357 base_flow_ids = get_flow_ids(controller=in_args.host)
# Destination addresses are handed out sequentially, starting at 10.0.0.1.
359 ip_addr = Counter(int(netaddr.IPAddress("10.0.0.1")))
361 preparefnc = _prepare_post
363 base_num_flows = len(base_flow_ids)
366 print(" devices:", len(base_dev_ids))
367 print(" flows :", base_num_flows)
369 # let's fill the queue for workers
# NOTE(review): Queue.Queue here and dict.iteritems() further down exist only on
# Python 2 (queue.Queue and dict.items() on Python 3).
373 sendqueue = Queue.Queue()
374 for i in range(in_args.flows):
375 dev_id = random.choice(base_dev_ids)
376 dst_ip = ip_addr.increment()
377 flow_list.append((dev_id, dst_ip))
378 flow_details.append((dev_id, dst_ip))
# A batch is queued once it holds in_args.fpr flows (one REST request per batch).
380 if nflows == in_args.fpr:
381 sendqueue.put(flow_list)
386 resultqueue = Queue.Queue()
388 exitevent = threading.Event()
393 for i in range(int(in_args.threads)):
394 thr = threading.Thread(
395 target=_wt_request_sender,
396 args=(i, preparefnc),
398 "inqueue": sendqueue,
399 "exitevent": exitevent,
400 "controllers": [in_args.host],
401 "restport": in_args.port,
402 "template": flow_template,
403 "outqueue": resultqueue,
413 # waiting for results and sum them up
416 # reading partial results from sender thread
417 part_result = resultqueue.get()
418 for k, v in part_result.iteritems():
# NOTE(review): reports the requested number of flows (in_args.flows), not the
# number the controller actually accepted; `result` carries the per-status tallies.
424 print("Added", in_args.flows, "flows in", tmr.secs, "seconds", result)
425 add_details = {"duration": tmr.secs, "flows": len(flow_details)}
427 # let's print some stats
428 print("\n\nStats monitoring ...")
431 for i in range(rounds):
432 flow_stats = get_flow_simple_stats(controller=in_args.host)
436 flow_stats["PENDING_ADD"]
437 ) # noqa # FIXME: Print this somewhere.
443 print("... monitoring finished in +%d seconds\n\n" % (t.secs))
446 "... monitoring aborted after %d rounds, elapsed time %d\n\n"
450 if in_args.no_delete:
454 time.sleep(in_args.timeout)
456 # let's delete flows; we need to get the ids to be deleted, because we don't have flow_id at config time
457 # we have to pair flows on our own
458 flows_remove_details = []
459 # for a in get_flow_device_pairs(controller=in_args.host, flow_details=flow_details):
# NOTE(review): get_flow_to_remove selects flows by pattern rather than by the
# flow_details recorded above, so matching flows that existed before this run are
# removed as well.
460 for a in get_flow_to_remove(controller=in_args.host):
461 flows_remove_details.append(a)
462 print("Flows to be removed: ", len(flows_remove_details))
464 # let's fill the queue for workers
467 sendqueue = Queue.Queue()
468 for fld in flows_remove_details:
469 flow_list.append(fld)
471 if nflows == in_args.fpr:
472 sendqueue.put(flow_list)
477 resultqueue = Queue.Queue()
479 exitevent = threading.Event()
482 preparefnc = _prepare_delete
485 for i in range(int(in_args.threads)):
486 thr = threading.Thread(
487 target=_wt_request_sender,
488 args=(i, preparefnc),
490 "inqueue": sendqueue,
491 "exitevent": exitevent,
492 "controllers": [in_args.host],
493 "restport": in_args.port,
494 "template": flow_delete_template,
495 "outqueue": resultqueue,
505 # waiting for results and sum them up
508 # reading partial results from sender thread
509 part_result = resultqueue.get()
510 for k, v in part_result.iteritems():
516 print("Removed", len(flows_remove_details), "flows in", tmr.secs, "seconds", result)
517 del_details = {"duration": tmr.secs, "flows": len(flows_remove_details)}
519 print("\n\nStats monitoring ...")
522 for i in range(rounds):
523 flow_stats = get_flow_simple_stats(controller=in_args.host)
527 flow_stats["PENDING_REMOVE"]
528 ) # noqa # FIXME: Print this somewhere.
534 print("... monitoring finished in +%d seconds\n\n" % (t.secs))
537 "... monitoring aborted after %d rounds, elapsed time %d\n\n"
541 if in_args.outfile != "":
# Rates are flows per second of the measured add / delete phase.
# NOTE(review): a measured duration of 0 would raise ZeroDivisionError here.
542 addrate = add_details["flows"] / add_details["duration"]
543 delrate = del_details["flows"] / del_details["duration"]
544 print("addrate", addrate)
545 print("delrate", delrate)
# Two-line CSV: a header row followed by the two rates.
547 with open(in_args.outfile, "wt") as fd:
548 fd.write("AddRate,DeleteRate\n")
549 fd.write("{0},{1}\n".format(addrate, delrate))
552 if __name__ == "__main__":