Migrate Get Requests invocations(libraries)
[integration/test.git] / tools / odl-mdsal-clustering-tests / clustering-performance-test / odl_tester.py
1 import requests
2 import json
3 import argparse
4 import sys
5 import netaddr
6 import threading
7 import Queue
8 import random
9 import copy
10 import time
11
12
# Template of a single OpenFlow flow in the shape accepted by the ODL
# restconf config API.  _prepare_post() deep-copies this template and
# overwrites "id" and "match"/"ipv4-destination" for each generated flow.
flow_template = {
    "id": "2",
    "match": {
        "ethernet-match": {"ethernet-type": {"type": 2048}},  # 2048 = 0x0800 (IPv4)
        "ipv4-destination": "10.0.20.0/24",
    },
    "priority": 2,
    "table_id": 0,
}
# Common restconf path prefix for per-node (switch) flow configuration.
odl_node_url = "/restconf/config/opendaylight-inventory:nodes/node/"
23
24
class Timer(object):
    """Context manager that measures the wall-clock time of a with-block.

    After the block exits, the elapsed time is available as ``secs``
    (seconds) and ``msecs`` (milliseconds).
    """

    def __init__(self, verbose=False):
        # When verbose, the elapsed time is printed on exit.
        self.verbose = verbose

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *exc_info):
        self.end = time.time()
        self.secs = self.end - self.start
        self.msecs = 1000 * self.secs  # milliseconds
        if self.verbose:
            print("elapsed time: %f ms" % self.msecs)
39
40
class Counter(object):
    """Thread-safe incrementing counter.

    ``increment()`` returns the value *before* the addition, so concurrent
    callers each observe a distinct value (used here to hand out unique
    IP addresses to worker threads).
    """

    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self, value=1):
        # The lock guards the read-modify-write; the pre-increment value
        # is returned, mirroring a post-increment operator.
        with self.lock:
            previous = self.value
            self.value += value
        return previous
54
55
def _prepare_post(cntl, method, flows, template=None):
    """Creates a POST http request to configure a batch of flows in the config datastore.

    Args:
        :param cntl: controller's ip address or hostname

        :param method: determines http request method

        :param flows: non-empty list of (device_id, ip) tuples; all entries
            are expected to target the same switch

        :param template: flow template to be filled

    Returns:
        :returns req: unprepared requests.Request object

    Raises:
        ValueError: if flows is empty.
    """
    if not flows:
        # The original code relied on the for-loop variable leaking out of
        # the loop, which raised NameError for an empty batch; fail clearly.
        raise ValueError("flows must not be empty")
    flow_list = []
    for dev_id, ip in flows:
        flow = copy.deepcopy(template)
        flow["id"] = ip
        # Each flow matches exactly one destination host (/32 prefix).
        flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
        flow_list.append(flow)
    # URL targets the switch of the last entry, preserving the original
    # leaked-loop-variable behavior (callers batch per single switch anyway).
    dev_id = flows[-1][0]
    body = {"flow": flow_list}
    url = "http://" + cntl + ":8181" + odl_node_url + dev_id + "/table/0"
    req_data = json.dumps(body)
    req = requests.Request(
        method,
        url,
        headers={"Content-Type": "application/json"},
        data=req_data,
        auth=("admin", "admin"),
    )
    return req
88
89
def _prepare_delete(cntl, method, flows, template=None):
    """Creates a DELETE http request removing one flow from the config datastore.

    Args:
        :param cntl: controller's ip address or hostname

        :param method: determines http request method

        :param flows: list of flow details; only the first (device_id, flow_id)
            entry is used

        :param template: unused, kept for signature parity with _prepare_post

    Returns:
        :returns req: unprepared requests.Request object
    """
    device, flow = flows[0]
    target = "http://{0}:8181{1}{2}/table/0/flow/{3}".format(
        cntl, odl_node_url, device, str(flow)
    )
    return requests.Request(
        method,
        target,
        headers={"Content-Type": "application/json"},
        data=None,
        auth=("admin", "admin"),
    )
123
124
def _wt_request_sender(
    thread_id,
    preparefnc,
    inqueue=None,
    exitevent=None,
    controllers=(),
    restport="",
    template=None,
    outqueue=None,
    method=None,
):
    """The function sends http requests.

    Runs in a worker thread.  It reads flow-detail batches from the input
    queue and sends one http request per batch to the first controller,
    counting responses per status code.

    Args:
        :param thread_id: thread id (currently unused)

        :param preparefnc: function that prepares the http request

        :param inqueue: input queue, flow details are coming from here

        :param exitevent: event notifying this worker that the producer
            stopped filling the input queue

        :param controllers: sequence of controllers' ip addresses or
            hostnames; only the first entry is used
            (fixed: default was a mutable list, now an immutable tuple)

        :param restport: restconf port (unused: port 8181 is hard-coded in
            the prepare functions)

        :param template: flow template used for creating flow content

        :param outqueue: queue where the {status_code: count} result is put

        :param method: http method name ("POST", "DELETE", ...)

    Returns:
        nothing, results must be put into the output queue
    """
    ses = requests.Session()
    cntl = controllers[0]
    # Histogram indexed by http status code (0-599); slot 99 is repurposed
    # to count request timeouts (99 is not a valid http status code).
    counter = [0] * 600
    loop = True

    while loop:
        try:
            flowlist = inqueue.get(timeout=1)
        except Queue.Empty:
            # Stop only once the producer is done AND the queue is drained.
            if exitevent.is_set() and inqueue.empty():
                loop = False
            continue
        req = preparefnc(cntl, method, flowlist, template=template)
        prep = req.prepare()
        try:
            rsp = ses.send(prep, timeout=5)
        except requests.exceptions.Timeout:
            counter[99] += 1
            continue
        counter[rsp.status_code] += 1
    # Report only status codes that actually occurred.
    outqueue.put({code: count for code, count in enumerate(counter) if count > 0})
189
190
def get_device_ids(controller="127.0.0.1", port=8181):
    """Returns a list of switch ids read from the operational datastore.

    An empty list is returned on a non-200 response or when the expected
    keys are missing from the payload.
    """
    url = "http://{0}:{1}/restconf/operational/opendaylight-inventory:nodes".format(
        controller, port
    )
    rsp = requests.get(url=url, auth=("admin", "admin"))
    if rsp.status_code != 200:
        return []
    try:
        nodes = json.loads(rsp.content)["nodes"]["node"]
        return [node["id"] for node in nodes]
    except KeyError:
        # Payload did not have the expected structure.
        return []
208
209
def get_flow_ids(controller="127.0.0.1", port=8181):
    """Returns a list of flow ids found in table 0 of every known switch.

    Note: returns an empty list if ANY per-device request fails
    (status != 200), discarding ids already collected from earlier devices.

    Args:
        :param controller: controller's ip address or hostname
        :param port: restconf port
    """
    ids = []
    device_ids = get_device_ids(controller, port)
    for device_id in device_ids:
        # Fixed: the URL was previously built by mixing str.format() with
        # %-interpolation in one expression; one format() call is safer.
        url = (
            "http://{0}:{1}/restconf/operational/opendaylight-inventory:"
            "nodes/node/{2}/table/0".format(controller, port, device_id)
        )
        rsp = requests.get(url=url, auth=("admin", "admin"))
        if rsp.status_code != 200:
            return []
        try:
            flows = json.loads(rsp.content)["flow-node-inventory:table"][0]["flow"]
            for f in flows:
                ids.append(f["id"])
        except KeyError:
            # Table exists but holds no flows (or unexpected payload shape).
            pass
    return ids
231
232
def main(*argv):
    """Entry point: benchmark flow add (and optionally delete) rates against ODL.

    Adds --flows flows via --threads worker threads, polls the operational
    datastore until they appear, then (unless --no-delete) removes them and
    optionally writes the add/delete rates to --outfile.

    NOTE(review): this module is Python 2 only (Queue module, dict.iteritems).
    """

    parser = argparse.ArgumentParser(
        description="Flow programming performance test: First adds and then deletes flows "
        "into the config tree, as specified by optional parameters."
    )

    parser.add_argument(
        "--host",
        default="127.0.0.1",
        help="Host where ODL controller is running (default is 127.0.0.1)",
    )
    parser.add_argument(
        "--port",
        default="8181",
        help="Port on which ODL's RESTCONF is listening (default is 8181)",
    )
    parser.add_argument(
        "--threads",
        type=int,
        default=1,
        help="Number of request worker threads to start in each cycle; default=1. "
        "Each thread will add/delete <FLOWS> flows.",
    )
    parser.add_argument(
        "--flows",
        type=int,
        default=10,
        help="Number of flows that will be added/deleted in total, default 10",
    )
    parser.add_argument(
        "--fpr", type=int, default=1, help="Number of flows per REST request, default 1"
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=100,
        help="The maximum time (seconds) to wait between the add and delete cycles; default=100",
    )
    parser.add_argument(
        "--no-delete",
        dest="no_delete",
        action="store_true",
        default=False,
        help="Delete all added flows one by one, benchmark delete " "performance.",
    )
    parser.add_argument(
        "--bulk-delete",
        dest="bulk_delete",
        action="store_true",
        default=False,
        help="Delete all flows in bulk; default=False",
    )
    parser.add_argument(
        "--outfile",
        default="",
        help='Stores add and delete flow rest api rate; default=""',
    )

    in_args = parser.parse_args(*argv)
    print(in_args)

    # get device ids and pre-existing flows to establish a baseline
    base_dev_ids = get_device_ids(controller=in_args.host)
    base_flow_ids = get_flow_ids(controller=in_args.host)
    # ip addresses are handed out sequentially starting at 10.0.0.1;
    # Counter.increment() is thread-safe and returns unique values
    ip_addr = Counter(int(netaddr.IPAddress("10.0.0.1")))
    # prepare func
    preparefnc = _prepare_post

    base_num_flows = len(base_flow_ids)

    print("BASELINE:")
    print("    devices:", len(base_dev_ids))
    print("    flows  :", base_num_flows)

    # lets fill the queue for workers: each queue item is a batch of
    # <fpr> (device_id, ip) tuples, all targeting the same random switch
    nflows = 0
    flow_list = []
    flow_details = []
    sendqueue = Queue.Queue()
    dev_id = random.choice(base_dev_ids)
    for i in range(in_args.flows):
        dst_ip = ip_addr.increment()
        flow_list.append((dev_id, dst_ip))
        flow_details.append((dev_id, dst_ip))
        nflows += 1
        if nflows == in_args.fpr:
            sendqueue.put(flow_list)
            nflows = 0
            flow_list = []
            dev_id = random.choice(base_dev_ids)
    # NOTE(review): if flows is not a multiple of fpr, the final partial
    # batch left in flow_list is never queued (yet it is still counted in
    # flow_details) -- confirm whether that is intended.

    # result_gueue
    resultqueue = Queue.Queue()
    # creaet exit event
    exitevent = threading.Event()

    # run workers and time the whole add cycle
    with Timer() as tmr:
        threads = []
        for i in range(int(in_args.threads)):
            thr = threading.Thread(
                target=_wt_request_sender,
                args=(i, preparefnc),
                kwargs={
                    "inqueue": sendqueue,
                    "exitevent": exitevent,
                    "controllers": [in_args.host],
                    "restport": in_args.port,
                    "template": flow_template,
                    "outqueue": resultqueue,
                    "method": "POST",
                },
            )
            threads.append(thr)
            thr.start()

        # the queue is fully populated up front, so workers may exit as
        # soon as they have drained it
        exitevent.set()

        result = {}
        # waitng for reqults and sum them up
        for t in threads:
            t.join()
            # reading partial resutls from sender thread
            part_result = resultqueue.get()
            for k, v in part_result.iteritems():
                if k not in result:
                    result[k] = v
                else:
                    result[k] += v

    print("Added", in_args.flows, "flows in", tmr.secs, "seconds", result)
    add_details = {"duration": tmr.secs, "flows": len(flow_details)}

    # lets print some stats: poll until the operational datastore reports
    # all expected flows, at most `rounds` seconds
    print("\n\nStats monitoring ...")
    rounds = 200
    with Timer() as t:
        for i in range(rounds):
            reported_flows = len(get_flow_ids(controller=in_args.host))
            expected_flows = base_num_flows + in_args.flows
            print("Reported Flows: %d/%d" % ((reported_flows, expected_flows)))
            if reported_flows >= expected_flows:
                break
            time.sleep(1)

    # NOTE(review): i ends at rounds-1 even when the loop above exhausted
    # every round without break, so `i < rounds` is always true and the
    # "aborted" branch below is unreachable -- confirm and fix.
    if i < rounds:
        print("... monitoring finished in +%d seconds\n\n" % (t.secs))
    else:
        print(
            "... monitoring aborted after %d rounds, elapsed time %d\n\n"
            % ((rounds, t.secs))
        )

    if in_args.no_delete:
        return

    # sleep in between the add and delete cycles
    time.sleep(in_args.timeout)

    print("Flows to be removed: %d" % (len(flow_details)))
    # lets fill the queue for workers: deletes go one flow per request
    sendqueue = Queue.Queue()
    for fld in flow_details:
        sendqueue.put([fld])

    # result_gueue
    resultqueue = Queue.Queue()
    # creaet exit event
    exitevent = threading.Event()

    # run workers (or one bulk DELETE) and time the whole delete cycle
    preparefnc = _prepare_delete
    with Timer() as tmr:
        if in_args.bulk_delete:
            # a single DELETE on the inventory root removes all nodes/flows
            url = "http://" + in_args.host + ":" + "8181"
            url += "/restconf/config/opendaylight-inventory:nodes"
            rsp = requests.delete(
                url,
                headers={"Content-Type": "application/json"},
                auth=("admin", "admin"),
            )
            result = {rsp.status_code: 1}
        else:
            threads = []
            for i in range(int(in_args.threads)):
                thr = threading.Thread(
                    target=_wt_request_sender,
                    args=(i, preparefnc),
                    kwargs={
                        "inqueue": sendqueue,
                        "exitevent": exitevent,
                        "controllers": [in_args.host],
                        "restport": in_args.port,
                        "template": None,
                        "outqueue": resultqueue,
                        "method": "DELETE",
                    },
                )
                threads.append(thr)
                thr.start()

            exitevent.set()

            result = {}
            # waitng for reqults and sum them up
            for t in threads:
                t.join()
                # reading partial resutls from sender thread
                part_result = resultqueue.get()
                for k, v in part_result.iteritems():
                    if k not in result:
                        result[k] = v
                    else:
                        result[k] += v

    print("Removed", len(flow_details), "flows in", tmr.secs, "seconds", result)
    del_details = {"duration": tmr.secs, "flows": len(flow_details)}

    # poll until the datastore is back down to the baseline flow count
    print("\n\nStats monitoring ...")
    rounds = 200
    with Timer() as t:
        for i in range(rounds):
            reported_flows = len(get_flow_ids(controller=in_args.host))
            expected_flows = base_num_flows
            print("Reported Flows: %d/%d" % ((reported_flows, expected_flows)))
            if reported_flows <= expected_flows:
                break
            time.sleep(1)

    # NOTE(review): same always-true condition as the add-cycle monitor above.
    if i < rounds:
        print("... monitoring finished in +%d seconds\n\n" % (t.secs))
    else:
        print(
            "... monitoring aborted after %d rounds, elapsed time %d\n\n"
            % ((rounds, t.secs))
        )

    if in_args.outfile != "":
        # rates in flows per second
        addrate = add_details["flows"] / add_details["duration"]
        delrate = del_details["flows"] / del_details["duration"]
        print("addrate", addrate)
        print("delrate", delrate)

        with open(in_args.outfile, "wt") as fd:
            fd.write("AddRate,DeleteRate\n")
            fd.write("{0},{1}\n".format(addrate, delrate))
481
482
if __name__ == "__main__":
    # Forward the CLI arguments (minus the program name) to main().
    main(sys.argv[1:])