18 "deviceId": "of:0000000000000001",
41 flow_delete_template = {
42 "deviceId": "of:0000000000000001",
43 "flowId": 21392098393151996
48 def __init__(self, verbose=False):
49 self.verbose = verbose
52 self.start = time.time()
55 def __exit__(self, *args):
56 self.end = time.time()
57 self.secs = self.end - self.start
58 self.msecs = self.secs * 1000 # millisecs
60 print("elapsed time: %f ms" % self.msecs)
63 class Counter(object):
64 def __init__(self, start=0):
65 self.lock = threading.Lock()
68 def increment(self, value=1):
78 def _prepare_post(cntl, method, flows, template=None):
79 """Creates a POST http requests to configure a flow in configuration datastore.
82 :param cntl: controller's ip address or hostname
84 :param method: determines http request method
86 :param flows: list of flow details
88 :param template: flow template to be to be filled
91 :returns req: http request object
94 for dev_id, ip in (flows):
95 flow = copy.deepcopy(template)
96 flow["deviceId"] = dev_id
97 flow["selector"]["criteria"][1]["ip"] = '%s/32' % str(netaddr.IPAddress(ip))
98 flow_list.append(flow)
99 body = {"flows": flow_list}
100 url = 'http://' + cntl + ':' + '8181/onos/v1/flows/'
101 req_data = json.dumps(body)
102 req = requests.Request(method, url, headers={'Content-Type': 'application/json'},
103 data=req_data, auth=('onos', 'rocks'))
107 def _prepare_delete(cntl, method, flows, template=None):
108 """Creates a DELETE http requests to configure a flow in configuration datastore.
111 :param cntl: controller's ip address or hostname
113 :param method: determines http request method
115 :param flows: list of flow details
117 :param template: flow template to be to be filled
120 :returns req: http request object
123 for dev_id, flow_id in (flows):
124 flow = copy.deepcopy(template)
125 flow["deviceId"] = dev_id
126 flow["flowId"] = flow_id
127 flow_list.append(flow)
128 body = {"flows": flow_list}
129 url = 'http://' + cntl + ':' + '8181/onos/v1/flows/'
130 req_data = json.dumps(body)
131 req = requests.Request(method, url, headers={'Content-Type': 'application/json'},
132 data=req_data, auth=('onos', 'rocks'))
136 def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='',
137 template=None, outqueue=None, method=None):
138 """The funcion sends http requests.
140 Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests
144 :param thread_id: thread id
146 :param preparefnc: function to preparesthe http request
148 :param inqueue: input queue, flow details are comming from here
150 :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue
152 :param controllers: a list of controllers' ip addresses or hostnames
154 :param restport: restconf port
156 :param template: flow template used for creating flow content
158 :param outqueue: queue where the results should be put
160 :param method: method derermines the type of http request
163 nothing, results must be put into the output queue
165 ses = requests.Session()
166 cntl = controllers[0]
167 counter = [0 for i in range(600)]
172 flowlist = inqueue.get(timeout=1)
174 if exitevent.is_set() and inqueue.empty():
177 req = preparefnc(cntl, method, flowlist, template=template)
178 # prep = ses.prepare_request(req)
181 rsp = ses.send(prep, timeout=5)
182 except requests.exceptions.Timeout:
185 counter[rsp.status_code] += 1
187 for i, v in enumerate(counter):
193 def get_device_ids(controller='127.0.0.1', port=8181):
194 """Returns a list of switch ids"""
195 rsp = requests.get(url='http://{0}:{1}/onos/v1/devices'.format(controller, port), auth=('onos', 'rocks'))
196 if rsp.status_code != 200:
198 devices = json.loads(rsp.content)['devices']
199 ids = [d['id'] for d in devices if 'of:' in d['id']]
203 def get_flow_ids(controller='127.0.0.1', port=8181):
204 """Returns a list of flow ids"""
205 rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
206 if rsp.status_code != 200:
208 flows = json.loads(rsp.content)['flows']
209 ids = [f['id'] for f in flows]
213 def get_flow_simple_stats(controller='127.0.0.1', port=8181):
214 """Returns a list of flow ids"""
215 rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
216 if rsp.status_code != 200:
218 flows = json.loads(rsp.content)['flows']
221 if f['state'] not in res:
228 def get_flow_device_pairs(controller='127.0.0.1', port=8181, flow_details=[]):
229 """Pairing flows from controller with deteils we used ofr creation"""
230 rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
231 if rsp.status_code != 200:
233 flows = json.loads(rsp.content)['flows']
234 for dev_id, ip in flow_details:
236 # lets identify if it is our flow
237 if f["treatment"]["instructions"][0]["type"] != "DROP":
239 if f["deviceId"] == dev_id:
240 if "ip" in f["selector"]["criteria"][0]:
242 elif "ip" in f["selector"]["criteria"][1]:
246 if f["selector"]["criteria"][item_idx]["ip"] == '%s/32' % str(netaddr.IPAddress(ip)):
247 yield dev_id, f["id"]
251 def get_flow_to_remove(controller='127.0.0.1', port=8181):
252 """Pairing flows from controller with deteils we used ofr creation"""
253 rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
254 if rsp.status_code != 200:
256 flows = json.loads(rsp.content)['flows']
259 # lets identify if it is our flow
260 if f["treatment"]["instructions"][0]["type"] != "NOACTION":
262 if "ip" in f["selector"]["criteria"][0]:
264 elif "ip" in f["selector"]["criteria"][1]:
268 ipstr = f["selector"]["criteria"][item_idx]["ip"]
269 if '10.' in ipstr and '/32' in ipstr:
270 yield (f["deviceId"], f["id"])
275 parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows '
276 'into the config tree, as specified by optional parameters.')
278 parser.add_argument('--host', default='127.0.0.1',
279 help='Host where onos controller is running (default is 127.0.0.1)')
280 parser.add_argument('--port', default='8181',
281 help='Port on which onos\'s RESTCONF is listening (default is 8181)')
282 parser.add_argument('--threads', type=int, default=1,
283 help='Number of request worker threads to start in each cycle; default=1. '
284 'Each thread will add/delete <FLOWS> flows.')
285 parser.add_argument('--flows', type=int, default=10,
286 help='Number of flows that will be added/deleted in total, default 10')
287 parser.add_argument('--fpr', type=int, default=1,
288 help='Number of flows per REST request, default 1')
289 parser.add_argument('--timeout', type=int, default=100,
290 help='The maximum time (seconds) to wait between the add and delete cycles; default=100')
291 parser.add_argument('--no-delete', dest='no_delete', action='store_true', default=False,
292 help='Delete all added flows one by one, benchmark delete '
294 parser.add_argument('--bulk-delete', dest='bulk_delete', action='store_true', default=False,
295 help='Delete all flows in bulk; default=False')
296 parser.add_argument('--outfile', default='', help='Stores add and delete flow rest api rate; default=""')
298 in_args = parser.parse_args(*argv)
302 base_dev_ids = get_device_ids(controller=in_args.host)
303 base_flow_ids = get_flow_ids(controller=in_args.host)
305 ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
307 preparefnc = _prepare_post
309 base_num_flows = len(base_flow_ids)
312 print(" devices:", len(base_dev_ids))
313 print(" flows :", base_num_flows)
315 # lets fill the queue for workers
319 sendqueue = Queue.Queue()
320 for i in range(in_args.flows):
321 dev_id = random.choice(base_dev_ids)
322 dst_ip = ip_addr.increment()
323 flow_list.append((dev_id, dst_ip))
324 flow_details.append((dev_id, dst_ip))
326 if nflows == in_args.fpr:
327 sendqueue.put(flow_list)
332 resultqueue = Queue.Queue()
334 exitevent = threading.Event()
339 for i in range(int(in_args.threads)):
340 thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
341 kwargs={"inqueue": sendqueue, "exitevent": exitevent,
342 "controllers": [in_args.host], "restport": in_args.port,
343 "template": flow_template, "outqueue": resultqueue, "method": "POST"})
350 # waitng for reqults and sum them up
353 # reading partial resutls from sender thread
354 part_result = resultqueue.get()
355 for k, v in part_result.iteritems():
361 print("Added", in_args.flows, "flows in", tmr.secs, "seconds", result)
362 add_details = {"duration": tmr.secs, "flows": len(flow_details)}
364 # lets print some stats
365 print("\n\nStats monitoring ...")
368 for i in range(rounds):
369 flow_stats = get_flow_simple_stats(controller=in_args.host)
372 pending_adds = int(flow_stats[u'PENDING_ADD']) # noqa # FIXME: Print this somewhere.
378 print("... monitoring finished in +%d seconds\n\n" % (t.secs))
380 print("... monitoring aborted after %d rounds, elapsed time %d\n\n" % ((rounds, t.secs)))
382 if in_args.no_delete:
386 time.sleep(in_args.timeout)
388 # lets delete flows, need to get ids to be deleted, becasue we dont have flow_id on config time
389 # we have to pair flows on out own
390 flows_remove_details = []
391 # for a in get_flow_device_pairs(controller=in_args.host, flow_details=flow_details):
392 for a in get_flow_to_remove(controller=in_args.host):
393 flows_remove_details.append(a)
394 print("Flows to be removed: ", len(flows_remove_details))
396 # lets fill the queue for workers
399 sendqueue = Queue.Queue()
400 for fld in flows_remove_details:
401 flow_list.append(fld)
403 if nflows == in_args.fpr:
404 sendqueue.put(flow_list)
409 resultqueue = Queue.Queue()
411 exitevent = threading.Event()
414 preparefnc = _prepare_delete
417 for i in range(int(in_args.threads)):
418 thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
419 kwargs={"inqueue": sendqueue, "exitevent": exitevent,
420 "controllers": [in_args.host], "restport": in_args.port,
421 "template": flow_delete_template, "outqueue": resultqueue,
429 # waitng for reqults and sum them up
432 # reading partial resutls from sender thread
433 part_result = resultqueue.get()
434 for k, v in part_result.iteritems():
440 print("Removed", len(flows_remove_details), "flows in", tmr.secs, "seconds", result)
441 del_details = {"duration": tmr.secs, "flows": len(flows_remove_details)}
443 print("\n\nStats monitoring ...")
446 for i in range(rounds):
447 flow_stats = get_flow_simple_stats(controller=in_args.host)
450 pending_rems = int(flow_stats[u'PENDING_REMOVE']) # noqa # FIXME: Print this somewhere.
456 print("... monitoring finished in +%d seconds\n\n" % (t.secs))
458 print("... monitoring aborted after %d rounds, elapsed time %d\n\n" % ((rounds, t.secs)))
460 if in_args.outfile != "":
461 addrate = add_details['flows'] / add_details['duration']
462 delrate = del_details['flows'] / del_details['duration']
463 print("addrate", addrate)
464 print("delrate", delrate)
466 with open(in_args.outfile, "wt") as fd:
467 fd.write("AddRate,DeleteRate\n")
468 fd.write("{0},{1}\n".format(addrate, delrate))
471 if __name__ == "__main__":