Resolve E722 do not use bare except
[integration/test.git] / tools / odl-mdsal-clustering-tests / clustering-performance-test / onos_tester.py
1 import requests
2 import json
3 import argparse
4 import sys
5 import netaddr
6 import threading
7 import Queue
8 import random
9 import copy
10 import time
11
12
13 flow_template = {
14     "appId": 10,
15     "priority": 40000,
16     "timeout": 0,
17     "isPermanent": True,
18     "deviceId": "of:0000000000000001",
19     "treatment": {
20         "instructions": [
21             {
22                 "type": "NOACTION"
23             }
24         ],
25         "deferred": []
26     },
27     "selector": {
28         "criteria": [
29             {
30                 "type": "ETH_TYPE",
31                 "ethType": 2048
32             },
33             {
34                 "type": "IPV4_DST",
35                 "ip": "10.0.0.0/32"
36             }
37         ]
38     }
39 }
40
41 flow_delete_template = {
42     "deviceId": "of:0000000000000001",
43     "flowId": 21392098393151996
44 }
45
46
47 class Timer(object):
48     def __init__(self, verbose=False):
49         self.verbose = verbose
50
51     def __enter__(self):
52         self.start = time.time()
53         return self
54
55     def __exit__(self, *args):
56         self.end = time.time()
57         self.secs = self.end - self.start
58         self.msecs = self.secs * 1000  # millisecs
59         if self.verbose:
60             print("elapsed time: %f ms" % self.msecs)
61
62
63 class Counter(object):
64     def __init__(self, start=0):
65         self.lock = threading.Lock()
66         self.value = start
67
68     def increment(self, value=1):
69         self.lock.acquire()
70         val = self.value
71         try:
72             self.value += value
73         finally:
74             self.lock.release()
75         return val
76
77
78 def _prepare_post(cntl, method, flows, template=None):
79     """Creates a POST http requests to configure a flow in configuration datastore.
80
81     Args:
82         :param cntl: controller's ip address or hostname
83
84         :param method: determines http request method
85
86         :param flows: list of flow details
87
88         :param template: flow template to be to be filled
89
90     Returns:
91         :returns req: http request object
92     """
93     flow_list = []
94     for dev_id, ip in (flows):
95         flow = copy.deepcopy(template)
96         flow["deviceId"] = dev_id
97         flow["selector"]["criteria"][1]["ip"] = '%s/32' % str(netaddr.IPAddress(ip))
98         flow_list.append(flow)
99     body = {"flows": flow_list}
100     url = 'http://' + cntl + ':' + '8181/onos/v1/flows/'
101     req_data = json.dumps(body)
102     req = requests.Request(method, url, headers={'Content-Type': 'application/json'},
103                            data=req_data, auth=('onos', 'rocks'))
104     return req
105
106
107 def _prepare_delete(cntl, method, flows, template=None):
108     """Creates a DELETE http requests to configure a flow in configuration datastore.
109
110     Args:
111         :param cntl: controller's ip address or hostname
112
113         :param method: determines http request method
114
115         :param flows: list of flow details
116
117         :param template: flow template to be to be filled
118
119     Returns:
120         :returns req: http request object
121     """
122     flow_list = []
123     for dev_id, flow_id in (flows):
124         flow = copy.deepcopy(template)
125         flow["deviceId"] = dev_id
126         flow["flowId"] = flow_id
127         flow_list.append(flow)
128     body = {"flows": flow_list}
129     url = 'http://' + cntl + ':' + '8181/onos/v1/flows/'
130     req_data = json.dumps(body)
131     req = requests.Request(method, url, headers={'Content-Type': 'application/json'},
132                            data=req_data, auth=('onos', 'rocks'))
133     return req
134
135
136 def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='',
137                        template=None, outqueue=None, method=None):
138     """The funcion sends http requests.
139
140     Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests
141     to the controller
142
143     Args:
144         :param thread_id: thread id
145
146         :param preparefnc: function to preparesthe http request
147
148         :param inqueue: input queue, flow details are comming from here
149
150         :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue
151
152         :param controllers: a list of controllers' ip addresses or hostnames
153
154         :param restport: restconf port
155
156         :param template: flow template used for creating flow content
157
158         :param outqueue: queue where the results should be put
159
160         :param method: method derermines the type of http request
161
162     Returns:
163         nothing, results must be put into the output queue
164     """
165     ses = requests.Session()
166     cntl = controllers[0]
167     counter = [0 for i in range(600)]
168     loop = True
169
170     while loop:
171         try:
172             flowlist = inqueue.get(timeout=1)
173         except Queue.Empty:
174             if exitevent.is_set() and inqueue.empty():
175                 loop = False
176             continue
177         req = preparefnc(cntl, method, flowlist, template=template)
178         # prep = ses.prepare_request(req)
179         prep = req.prepare()
180         try:
181             rsp = ses.send(prep, timeout=5)
182         except requests.exceptions.Timeout:
183             counter[99] += 1
184             continue
185         counter[rsp.status_code] += 1
186     res = {}
187     for i, v in enumerate(counter):
188         if v > 0:
189             res[i] = v
190     outqueue.put(res)
191
192
193 def get_device_ids(controller='127.0.0.1', port=8181):
194     """Returns a list of switch ids"""
195     rsp = requests.get(url='http://{0}:{1}/onos/v1/devices'.format(controller, port), auth=('onos', 'rocks'))
196     if rsp.status_code != 200:
197         return []
198     devices = json.loads(rsp.content)['devices']
199     ids = [d['id'] for d in devices if 'of:' in d['id']]
200     return ids
201
202
203 def get_flow_ids(controller='127.0.0.1', port=8181):
204     """Returns a list of flow ids"""
205     rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
206     if rsp.status_code != 200:
207         return []
208     flows = json.loads(rsp.content)['flows']
209     ids = [f['id'] for f in flows]
210     return ids
211
212
213 def get_flow_simple_stats(controller='127.0.0.1', port=8181):
214     """Returns a list of flow ids"""
215     rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
216     if rsp.status_code != 200:
217         return []
218     flows = json.loads(rsp.content)['flows']
219     res = {}
220     for f in flows:
221         if f['state'] not in res:
222             res[f['state']] = 1
223         else:
224             res[f['state']] += 1
225     return res
226
227
228 def get_flow_device_pairs(controller='127.0.0.1', port=8181, flow_details=[]):
229     """Pairing flows from controller with deteils we used ofr creation"""
230     rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
231     if rsp.status_code != 200:
232         return
233     flows = json.loads(rsp.content)['flows']
234     for dev_id, ip in flow_details:
235         for f in flows:
236             # lets identify if it is our flow
237             if f["treatment"]["instructions"][0]["type"] != "DROP":
238                 continue
239             if f["deviceId"] == dev_id:
240                 if "ip" in f["selector"]["criteria"][0]:
241                     item_idx = 0
242                 elif "ip" in f["selector"]["criteria"][1]:
243                     item_idx = 1
244                 else:
245                     continue
246                 if f["selector"]["criteria"][item_idx]["ip"] == '%s/32' % str(netaddr.IPAddress(ip)):
247                     yield dev_id, f["id"]
248                     break
249
250
251 def get_flow_to_remove(controller='127.0.0.1', port=8181):
252     """Pairing flows from controller with deteils we used ofr creation"""
253     rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
254     if rsp.status_code != 200:
255         return
256     flows = json.loads(rsp.content)['flows']
257
258     for f in flows:
259         # lets identify if it is our flow
260         if f["treatment"]["instructions"][0]["type"] != "NOACTION":
261             continue
262         if "ip" in f["selector"]["criteria"][0]:
263             item_idx = 0
264         elif "ip" in f["selector"]["criteria"][1]:
265             item_idx = 1
266         else:
267             continue
268         ipstr = f["selector"]["criteria"][item_idx]["ip"]
269         if '10.' in ipstr and '/32' in ipstr:
270             yield (f["deviceId"], f["id"])
271
272
273 def main(*argv):
274
275     parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows '
276                                                  'into the config tree, as specified by optional parameters.')
277
278     parser.add_argument('--host', default='127.0.0.1',
279                         help='Host where onos controller is running (default is 127.0.0.1)')
280     parser.add_argument('--port', default='8181',
281                         help='Port on which onos\'s RESTCONF is listening (default is 8181)')
282     parser.add_argument('--threads', type=int, default=1,
283                         help='Number of request worker threads to start in each cycle; default=1. '
284                              'Each thread will add/delete <FLOWS> flows.')
285     parser.add_argument('--flows', type=int, default=10,
286                         help='Number of flows that will be added/deleted in total, default 10')
287     parser.add_argument('--fpr', type=int, default=1,
288                         help='Number of flows per REST request, default 1')
289     parser.add_argument('--timeout', type=int, default=100,
290                         help='The maximum time (seconds) to wait between the add and delete cycles; default=100')
291     parser.add_argument('--no-delete', dest='no_delete', action='store_true', default=False,
292                         help='Delete all added flows one by one, benchmark delete '
293                              'performance.')
294     parser.add_argument('--bulk-delete', dest='bulk_delete', action='store_true', default=False,
295                         help='Delete all flows in bulk; default=False')
296     parser.add_argument('--outfile', default='', help='Stores add and delete flow rest api rate; default=""')
297
298     in_args = parser.parse_args(*argv)
299     print(in_args)
300
301     # get device ids
302     base_dev_ids = get_device_ids(controller=in_args.host)
303     base_flow_ids = get_flow_ids(controller=in_args.host)
304     # ip
305     ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
306     # prepare func
307     preparefnc = _prepare_post
308
309     base_num_flows = len(base_flow_ids)
310
311     print("BASELINE:")
312     print("    devices:", len(base_dev_ids))
313     print("    flows  :", base_num_flows)
314
315     # lets fill the queue for workers
316     nflows = 0
317     flow_list = []
318     flow_details = []
319     sendqueue = Queue.Queue()
320     for i in range(in_args.flows):
321         dev_id = random.choice(base_dev_ids)
322         dst_ip = ip_addr.increment()
323         flow_list.append((dev_id, dst_ip))
324         flow_details.append((dev_id, dst_ip))
325         nflows += 1
326         if nflows == in_args.fpr:
327             sendqueue.put(flow_list)
328             nflows = 0
329             flow_list = []
330
331     # result_gueue
332     resultqueue = Queue.Queue()
333     # creaet exit event
334     exitevent = threading.Event()
335
336     # run workers
337     with Timer() as tmr:
338         threads = []
339         for i in range(int(in_args.threads)):
340             thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
341                                    kwargs={"inqueue": sendqueue, "exitevent": exitevent,
342                                            "controllers": [in_args.host], "restport": in_args.port,
343                                            "template": flow_template, "outqueue": resultqueue, "method": "POST"})
344             threads.append(thr)
345             thr.start()
346
347         exitevent.set()
348
349         result = {}
350         # waitng for reqults and sum them up
351         for t in threads:
352             t.join()
353             # reading partial resutls from sender thread
354             part_result = resultqueue.get()
355             for k, v in part_result.iteritems():
356                 if k not in result:
357                     result[k] = v
358                 else:
359                     result[k] += v
360
361     print("Added", in_args.flows, "flows in", tmr.secs, "seconds", result)
362     add_details = {"duration": tmr.secs, "flows": len(flow_details)}
363
364     # lets print some stats
365     print("\n\nStats monitoring ...")
366     rounds = 200
367     with Timer() as t:
368         for i in range(rounds):
369             flow_stats = get_flow_simple_stats(controller=in_args.host)
370             print(flow_stats)
371             try:
372                 pending_adds = int(flow_stats[u'PENDING_ADD'])  # noqa  # FIXME: Print this somewhere.
373             except KeyError:
374                 break
375             time.sleep(1)
376
377     if i < rounds:
378         print("... monitoring finished in +%d seconds\n\n" % (t.secs))
379     else:
380         print("... monitoring aborted after %d rounds, elapsed time %d\n\n" % ((rounds, t.secs)))
381
382     if in_args.no_delete:
383         return
384
385     # sleep in between
386     time.sleep(in_args.timeout)
387
388     # lets delete flows, need to get ids to be deleted, becasue we dont have flow_id on config time
389     # we have to pair flows on out own
390     flows_remove_details = []
391     # for a in get_flow_device_pairs(controller=in_args.host, flow_details=flow_details):
392     for a in get_flow_to_remove(controller=in_args.host):
393         flows_remove_details.append(a)
394     print("Flows to be removed: ", len(flows_remove_details))
395
396     # lets fill the queue for workers
397     nflows = 0
398     flow_list = []
399     sendqueue = Queue.Queue()
400     for fld in flows_remove_details:
401         flow_list.append(fld)
402         nflows += 1
403         if nflows == in_args.fpr:
404             sendqueue.put(flow_list)
405             nflows = 0
406             flow_list = []
407
408     # result_gueue
409     resultqueue = Queue.Queue()
410     # creaet exit event
411     exitevent = threading.Event()
412
413     # run workers
414     preparefnc = _prepare_delete
415     with Timer() as tmr:
416         threads = []
417         for i in range(int(in_args.threads)):
418             thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
419                                    kwargs={"inqueue": sendqueue, "exitevent": exitevent,
420                                            "controllers": [in_args.host], "restport": in_args.port,
421                                            "template": flow_delete_template, "outqueue": resultqueue,
422                                            "method": "DELETE"})
423             threads.append(thr)
424             thr.start()
425
426         exitevent.set()
427
428         result = {}
429         # waitng for reqults and sum them up
430         for t in threads:
431             t.join()
432             # reading partial resutls from sender thread
433             part_result = resultqueue.get()
434             for k, v in part_result.iteritems():
435                 if k not in result:
436                     result[k] = v
437                 else:
438                     result[k] += v
439
440     print("Removed", len(flows_remove_details), "flows in", tmr.secs, "seconds", result)
441     del_details = {"duration": tmr.secs, "flows": len(flows_remove_details)}
442
443     print("\n\nStats monitoring ...")
444     rounds = 200
445     with Timer() as t:
446         for i in range(rounds):
447             flow_stats = get_flow_simple_stats(controller=in_args.host)
448             print(flow_stats)
449             try:
450                 pending_rems = int(flow_stats[u'PENDING_REMOVE'])  # noqa  # FIXME: Print this somewhere.
451             except KeyError:
452                 break
453             time.sleep(1)
454
455     if i < rounds:
456         print("... monitoring finished in +%d seconds\n\n" % (t.secs))
457     else:
458         print("... monitoring aborted after %d rounds, elapsed time %d\n\n" % ((rounds, t.secs)))
459
460     if in_args.outfile != "":
461         addrate = add_details['flows'] / add_details['duration']
462         delrate = del_details['flows'] / del_details['duration']
463         print("addrate", addrate)
464         print("delrate", delrate)
465
466         with open(in_args.outfile, "wt") as fd:
467             fd.write("AddRate,DeleteRate\n")
468             fd.write("{0},{1}\n".format(addrate, delrate))
469
470
471 if __name__ == "__main__":
472     main(sys.argv[1:])