Bump pre-commit black to 22.1.0
[integration/test.git] / tools / odl-mdsal-clustering-tests / clustering-performance-test / onos_tester.py
1 import requests
2 import json
3 import argparse
4 import sys
5 import netaddr
6 import threading
7 import Queue
8 import random
9 import copy
10 import time
11
12
# Skeleton of a single flow rule as sent to the /onos/v1/flows endpoint.
# _prepare_post() deep-copies it for every flow and overwrites "deviceId" and
# the "ip" of the second selector criterion; the values below are placeholders.
# The "NOACTION" instruction type is what get_flow_to_remove() later uses to
# recognise flows for deletion.
flow_template = {
    "appId": 10,
    "priority": 40000,
    "timeout": 0,
    "isPermanent": True,
    "deviceId": "of:0000000000000001",
    "treatment": {"instructions": [{"type": "NOACTION"}], "deferred": []},
    "selector": {
        "criteria": [
            {"type": "ETH_TYPE", "ethType": 2048},  # 2048 == 0x0800
            {"type": "IPV4_DST", "ip": "10.0.0.0/32"},
        ]
    },
}

# Skeleton of one entry of a flow removal request. _prepare_delete() deep-copies
# it and overwrites both "deviceId" and "flowId", so both values are placeholders.
flow_delete_template = {"deviceId": "of:0000000000000001", "flowId": 21392098393151996}
29
30
class Timer(object):
    """Context manager measuring the wall-clock time spent in its ``with`` body.

    After the block is left, ``start`` and ``end`` hold the time.time() stamps
    taken on entry and exit, ``secs`` the difference between them and ``msecs``
    the same duration in milliseconds. When ``verbose`` is true the elapsed
    time is also printed on exit.
    """

    def __init__(self, verbose=False):
        self.verbose = verbose

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        stopped = time.time()
        self.end = stopped
        self.secs = stopped - self.start
        self.msecs = self.secs * 1000  # seconds -> milliseconds
        if not self.verbose:
            return
        print("elapsed time: %f ms" % self.msecs)
45
46
class Counter(object):
    """Lock-protected counter.

    ``value`` holds the current count and ``lock`` the threading.Lock guarding
    every update made through increment().
    """

    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self, value=1):
        """Add ``value`` to the counter and return the count as it was BEFORE the addition."""
        with self.lock:
            previous = self.value
            self.value += value
        return previous
60
61
def _build_flows_request(cntl, port, method, flow_list):
    """Wrap a list of flow dicts into a request for the /onos/v1/flows/ endpoint."""
    return requests.Request(
        method,
        "http://{0}:{1}/onos/v1/flows/".format(cntl, port),
        headers={"Content-Type": "application/json"},
        data=json.dumps({"flows": flow_list}),
        auth=("onos", "rocks"),
    )


def _prepare_post(cntl, method, flows, template=None, port=8181):
    """Creates an http request which installs a batch of flows.

    Args:
        :param cntl: controller's ip address or hostname

        :param method: determines http request method

        :param flows: iterable of (device id, ip) pairs; ip is converted with netaddr.IPAddress

        :param template: flow template to be filled; flow_template is used when None

        :param port: port the request is addressed to (default 8181)

    Returns:
        :returns req: http request object (neither prepared nor sent yet)
    """
    if template is None:
        # Without a template there is nothing to fill in; fall back to the module default.
        template = flow_template
    flow_list = []
    for dev_id, ip in flows:
        flow = copy.deepcopy(template)
        flow["deviceId"] = dev_id
        flow["selector"]["criteria"][1]["ip"] = "%s/32" % str(netaddr.IPAddress(ip))
        flow_list.append(flow)
    return _build_flows_request(cntl, port, method, flow_list)


def _prepare_delete(cntl, method, flows, template=None, port=8181):
    """Creates an http request which removes a batch of flows.

    Args:
        :param cntl: controller's ip address or hostname

        :param method: determines http request method

        :param flows: iterable of (device id, flow id) pairs

        :param template: removal template to be filled; flow_delete_template is used when None

        :param port: port the request is addressed to (default 8181)

    Returns:
        :returns req: http request object (neither prepared nor sent yet)
    """
    if template is None:
        # Without a template there is nothing to fill in; fall back to the module default.
        template = flow_delete_template
    flow_list = []
    for dev_id, flow_id in flows:
        flow = copy.deepcopy(template)
        flow["deviceId"] = dev_id
        flow["flowId"] = flow_id
        flow_list.append(flow)
    return _build_flows_request(cntl, port, method, flow_list)


def _wt_request_sender(
    thread_id,
    preparefnc,
    inqueue=None,
    exitevent=None,
    controllers=[],
    restport="",
    template=None,
    outqueue=None,
    method=None,
):
    """The function sends http requests.

    Runs in the working thread. It reads out batches of flow details from the
    input queue, turns each batch into one http request and sends it to the
    first controller of the list.

    Args:
        :param thread_id: thread id (not used by the function itself)

        :param preparefnc: function which prepares the http request

        :param inqueue: input queue, batches of flow details are coming from here

        :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue

        :param controllers: a list of controllers' ip addresses or hostnames; only the first one is used

        :param restport: port of the REST API; forwarded to preparefnc as ``port`` when not empty

        :param template: flow template used for creating flow content

        :param outqueue: queue where the results should be put

        :param method: method determines the type of http request

    Returns:
        nothing; a dict {status code: number of responses} is put into the
        output queue. Requests which timed out are counted under the key 99.
        The dict is published even when the worker is ended by an unexpected
        exception, so that a parent waiting on the output queue does not block
        forever; the exception itself still propagates.
    """
    # Only forward the port when one was given, so prepare functions without a
    # ``port`` parameter keep working with the default (empty) restport.
    prepare_kwargs = {"template": template}
    if restport:
        prepare_kwargs["port"] = restport

    ses = requests.Session()
    counter = {}
    try:
        cntl = controllers[0]
        while True:
            try:
                flowlist = inqueue.get(timeout=1)
            except Queue.Empty:
                # Stop only once the parent finished filling AND the queue is drained.
                if exitevent.is_set() and inqueue.empty():
                    break
                continue
            req = preparefnc(cntl, method, flowlist, **prepare_kwargs)
            prep = req.prepare()
            try:
                rsp = ses.send(prep, timeout=5)
            except requests.exceptions.Timeout:
                key = 99
            else:
                key = rsp.status_code
            counter[key] = counter.get(key, 0) + 1
    finally:
        ses.close()
        outqueue.put({code: counter[code] for code in sorted(counter)})
194
195
def get_device_ids(controller="127.0.0.1", port=8181):
    """Return the ids of all devices known to the controller whose id contains "of:".

    An empty list is returned when the controller does not answer with HTTP 200.
    """
    devices_url = "http://{0}:{1}/onos/v1/devices".format(controller, port)
    response = requests.get(url=devices_url, auth=("onos", "rocks"))
    if response.status_code == 200:
        payload = json.loads(response.content)
        return [dev["id"] for dev in payload["devices"] if "of:" in dev["id"]]
    return []
207
208
def get_flow_ids(controller="127.0.0.1", port=8181):
    """Return the ids of all flows reported by the controller.

    An empty list is returned when the controller does not answer with HTTP 200.
    """
    flows_url = "http://{0}:{1}/onos/v1/flows".format(controller, port)
    response = requests.get(url=flows_url, auth=("onos", "rocks"))
    if response.status_code == 200:
        payload = json.loads(response.content)
        return [flow["id"] for flow in payload["flows"]]
    return []
220
221
def get_flow_simple_stats(controller="127.0.0.1", port=8181):
    """Return how many flows the controller reports in each flow state.

    The result maps the "state" value of the reported flows to the number of
    flows in that state. An empty dict is returned when the controller does
    not answer with HTTP 200, so the result can always be indexed by state
    name (a missing state raises KeyError, as for a state with no flows).
    """
    rsp = requests.get(
        url="http://{0}:{1}/onos/v1/flows".format(controller, port),
        auth=("onos", "rocks"),
    )
    if rsp.status_code != 200:
        return {}
    res = {}
    for f in json.loads(rsp.content)["flows"]:
        res[f["state"]] = res.get(f["state"], 0) + 1
    return res
238
239
def get_flow_device_pairs(controller="127.0.0.1", port=8181, flow_details=[]):
    """Pair the flow details used for creation with the flows reported by the controller.

    For every (device id, ip) pair of ``flow_details`` the first reported flow
    on that device whose destination is "<ip>/32" is looked up and
    (device id, flow id) is yielded for it. Details without a matching flow
    are skipped. Nothing is yielded when the controller does not answer with
    HTTP 200.
    """
    rsp = requests.get(
        url="http://{0}:{1}/onos/v1/flows".format(controller, port),
        auth=("onos", "rocks"),
    )
    if rsp.status_code != 200:
        return
    flows = json.loads(rsp.content)["flows"]
    for dev_id, ip in flow_details:
        # The wanted address does not depend on the inspected flow; build it once.
        wanted_ip = "%s/32" % str(netaddr.IPAddress(ip))
        for f in flows:
            # lets identify if it is our flow
            # NOTE(review): this filters on "DROP" while flow_template installs
            # "NOACTION" and get_flow_to_remove() filters on "NOACTION" - confirm
            # which type the controller reports before relying on this function.
            instructions = f["treatment"]["instructions"]
            if not instructions or instructions[0]["type"] != "DROP":
                continue
            if f["deviceId"] != dev_id:
                continue
            # The address is expected in one of the first two criteria; flows
            # with fewer criteria are simply not ours.
            flow_ip = next(
                (c["ip"] for c in f["selector"]["criteria"][:2] if "ip" in c), None
            )
            if flow_ip == wanted_ip:
                yield dev_id, f["id"]
                break
266
267
def get_flow_to_remove(controller="127.0.0.1", port=8181):
    """Yield (device id, flow id) of the reported flows which look like flows of this tester.

    A flow qualifies when its first instruction is of type "NOACTION" and one
    of its first two selector criteria carries an "ip" which is a /32 prefix
    inside 10.0.0.0/8 - the shape of the flows built from flow_template.
    Nothing is yielded when the controller does not answer with HTTP 200.
    """
    rsp = requests.get(
        url="http://{0}:{1}/onos/v1/flows".format(controller, port),
        auth=("onos", "rocks"),
    )
    if rsp.status_code != 200:
        return
    flows = json.loads(rsp.content)["flows"]

    for f in flows:
        # lets identify if it is our flow
        instructions = f["treatment"]["instructions"]
        if not instructions or instructions[0]["type"] != "NOACTION":
            continue
        # Flows with fewer than two criteria (or without any address) are not ours.
        ipstr = next(
            (c["ip"] for c in f["selector"]["criteria"][:2] if "ip" in c), None
        )
        if ipstr is None:
            continue
        # Anchor both tests: a plain substring check would also select foreign
        # flows such as "192.168.10.5/32".
        if ipstr.startswith("10.") and ipstr.endswith("/32"):
            yield (f["deviceId"], f["id"])
291
292
def _fill_send_queue(items, per_request):
    """Return a queue holding ``items`` split into lists of at most ``per_request`` entries.

    The last list may be shorter, so no item is lost when the number of items
    is not a multiple of ``per_request``. A value below 1 results in one item
    per list.
    """
    sendqueue = Queue.Queue()
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) >= per_request:
            sendqueue.put(batch)
            batch = []
    if batch:
        sendqueue.put(batch)
    return sendqueue


def _run_workers(nthreads, host, port, preparefnc, sendqueue, template, method):
    """Drain ``sendqueue`` with ``nthreads`` sender threads.

    Returns (elapsed seconds, {status code: number of responses}) summed over all workers.
    """
    resultqueue = Queue.Queue()
    exitevent = threading.Event()

    with Timer() as tmr:
        threads = []
        for i in range(int(nthreads)):
            thr = threading.Thread(
                target=_wt_request_sender,
                args=(i, preparefnc),
                kwargs={
                    "inqueue": sendqueue,
                    "exitevent": exitevent,
                    "controllers": [host],
                    "restport": port,
                    "template": template,
                    "outqueue": resultqueue,
                    "method": method,
                },
            )
            threads.append(thr)
            thr.start()

        # The queue was completely filled before the workers started.
        exitevent.set()

        result = {}
        for thr in threads:
            thr.join()
            # A joined worker has either published its result already or died
            # without doing so; never block waiting for a result that cannot come.
            try:
                part_result = resultqueue.get_nowait()
            except Queue.Empty:
                continue
            for k, v in part_result.items():
                result[k] = result.get(k, 0) + v
    return tmr.secs, result


def _monitor_flow_state(host, port, state, rounds=200):
    """Poll the flow statistics once a second until no flow is reported in ``state``.

    Gives up after ``rounds`` polls and reports which of the two happened.
    """
    print("\n\nStats monitoring ...")
    with Timer() as tmr:
        for _ in range(rounds):
            flow_stats = get_flow_simple_stats(controller=host, port=port)
            print(flow_stats)
            if state not in flow_stats:
                finished = True
                break
            time.sleep(1)
        else:
            # All rounds were used up and flows were still in the watched state.
            finished = False

    if finished:
        print("... monitoring finished in +%d seconds\n\n" % (tmr.secs))
    else:
        print(
            "... monitoring aborted after %d rounds, elapsed time %d\n\n"
            % ((rounds, tmr.secs))
        )


def main(*argv):
    """Run the flow programming performance test against an ONOS controller.

    Adds the requested number of flows through the REST API, waits until no
    flow is pending addition, and - unless --no-delete is given - sleeps for
    --timeout seconds, removes the flows matched by get_flow_to_remove() and
    waits until no flow is pending removal. When --outfile is given the
    add/delete rates (flows per second) are written to it as CSV.

    Args:
        :param argv: optional single argument: the list of command-line
            arguments to parse; sys.argv is parsed when omitted
    """
    parser = argparse.ArgumentParser(
        description="Flow programming performance test: First adds and then deletes flows "
        "into the config tree, as specified by optional parameters."
    )

    parser.add_argument(
        "--host",
        default="127.0.0.1",
        help="Host where onos controller is running (default is 127.0.0.1)",
    )
    parser.add_argument(
        "--port",
        default="8181",
        help="Port on which onos's RESTCONF is listening (default is 8181)",
    )
    parser.add_argument(
        "--threads",
        type=int,
        default=1,
        help="Number of request worker threads to start in each cycle; default=1. "
        "Each thread will add/delete <FLOWS> flows.",
    )
    parser.add_argument(
        "--flows",
        type=int,
        default=10,
        help="Number of flows that will be added/deleted in total, default 10",
    )
    parser.add_argument(
        "--fpr", type=int, default=1, help="Number of flows per REST request, default 1"
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=100,
        help="The maximum time (seconds) to wait between the add and delete cycles; default=100",
    )
    parser.add_argument(
        "--no-delete",
        dest="no_delete",
        action="store_true",
        default=False,
        help="Do not delete the added flows after the add cycle; default=False",
    )
    # NOTE(review): --bulk-delete is parsed but nothing below reads it.
    parser.add_argument(
        "--bulk-delete",
        dest="bulk_delete",
        action="store_true",
        default=False,
        help="Delete all flows in bulk; default=False",
    )
    parser.add_argument(
        "--outfile",
        default="",
        help='Stores add and delete flow rest api rate; default=""',
    )

    in_args = parser.parse_args(*argv)
    print(in_args)

    # baseline: devices and flows known to the controller before the test
    base_dev_ids = get_device_ids(controller=in_args.host, port=in_args.port)
    base_flow_ids = get_flow_ids(controller=in_args.host, port=in_args.port)
    base_num_flows = len(base_flow_ids)

    print("BASELINE:")
    print("    devices:", len(base_dev_ids))
    print("    flows  :", base_num_flows)

    if in_args.flows > 0 and not base_dev_ids:
        # Flows are assigned to randomly chosen devices; without any device
        # there is nothing to program.
        sys.exit("No devices reported by the controller, cannot add flows")

    # every flow gets a random device and the next destination address
    ip_addr = Counter(int(netaddr.IPAddress("10.0.0.1")))
    flow_details = [
        (random.choice(base_dev_ids), ip_addr.increment())
        for _ in range(in_args.flows)
    ]

    # add cycle
    add_secs, result = _run_workers(
        in_args.threads,
        in_args.host,
        in_args.port,
        _prepare_post,
        _fill_send_queue(flow_details, in_args.fpr),
        flow_template,
        "POST",
    )
    print("Added", in_args.flows, "flows in", add_secs, "seconds", result)
    add_details = {"duration": add_secs, "flows": len(flow_details)}

    _monitor_flow_state(in_args.host, in_args.port, "PENDING_ADD")

    if in_args.no_delete:
        return

    # sleep in between
    time.sleep(in_args.timeout)

    # The flow ids are not known at configuration time, so the flows to delete
    # are looked up on the controller.
    flows_remove_details = list(
        get_flow_to_remove(controller=in_args.host, port=in_args.port)
    )
    print("Flows to be removed: ", len(flows_remove_details))

    # delete cycle
    del_secs, result = _run_workers(
        in_args.threads,
        in_args.host,
        in_args.port,
        _prepare_delete,
        _fill_send_queue(flows_remove_details, in_args.fpr),
        flow_delete_template,
        "DELETE",
    )
    print("Removed", len(flows_remove_details), "flows in", del_secs, "seconds", result)
    del_details = {"duration": del_secs, "flows": len(flows_remove_details)}

    _monitor_flow_state(in_args.host, in_args.port, "PENDING_REMOVE")

    if in_args.outfile != "":
        addrate = add_details["flows"] / add_details["duration"]
        delrate = del_details["flows"] / del_details["duration"]
        print("addrate", addrate)
        print("delrate", delrate)

        with open(in_args.outfile, "wt") as fd:
            fd.write("AddRate,DeleteRate\n")
            fd.write("{0},{1}\n".format(addrate, delrate))
550
551
# Script entry point: hand the command-line arguments (without the program
# name) to main() as a single list.
if __name__ == "__main__":
    main(sys.argv[1:])