fixing pep8 problems for test verify tox job
[integration/test.git] / tools / odl-mdsal-clustering-tests / clustering-performance-test / onos_tester.py
1 import requests
2 import json
3 import argparse
4 import sys
5 import netaddr
6 import threading
7 import Queue
8 import random
9 import copy
10 import time
11
12
# Skeleton of one flow rule as sent to the ONOS flows endpoint.
# _prepare_post() deep-copies it for every flow and overwrites "deviceId"
# and the "ip" of the second selector criterion.
flow_template = {
    "appId": 10,
    "priority": 40000,
    "timeout": 0,
    "isPermanent": True,
    "deviceId": "of:0000000000000001",
    "treatment": {
        "instructions": [{"type": "NOACTION"}],
        "deferred": []
    },
    "selector": {
        "criteria": [
            {"type": "ETH_TYPE", "ethType": 2048},
            # Index 1: the destination address that is replaced per flow.
            {"type": "IPV4_DST", "ip": "10.0.0.0/32"}
        ]
    }
}
40
# Skeleton of one entry in a flow removal request; _prepare_delete()
# deep-copies it and overwrites both fields for every flow to remove.
flow_delete_template = {"deviceId": "of:0000000000000001",
                        "flowId": 21392098393151996}
45
46
class Timer(object):
    """Context manager measuring the wall-clock time spent in its ``with`` body.

    Once the body has finished, ``start`` and ``end`` hold the ``time.time()``
    stamps taken on entry and exit, ``secs`` the difference between them and
    ``msecs`` the same duration in milliseconds.
    """

    def __init__(self, verbose=False):
        # When true, the elapsed time is printed as the block exits.
        self.verbose = verbose

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()
        elapsed = self.end - self.start
        self.secs = elapsed
        self.msecs = elapsed * 1000  # millisecs
        if not self.verbose:
            return
        print("elapsed time: %f ms" % self.msecs)
61
62
class Counter(object):
    """Lock-protected running total."""

    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self, value=1):
        """Add *value* to the total and return the total as it was before."""
        with self.lock:
            previous = self.value
            self.value += value
        return previous
76
77
def _prepare_post(cntl, method, flows, template=None):
    """Builds the http request that submits a batch of flows to the ONOS flows REST endpoint.

    The request always targets port 8181 on the given controller.

    Args:
        :param cntl: controller's ip address or hostname

        :param method: http request method handed to requests.Request

        :param flows: iterable of (device id, destination ip) pairs; every ip is passed through
                      netaddr.IPAddress and written into the flow as a /32 prefix

        :param template: flow dictionary copied for every pair; when None the module-level
                         flow_template is used

    Returns:
        :returns req: unprepared requests.Request object
    """
    if template is None:
        # The declared default used to be deep-copied and subscripted, which raised TypeError.
        template = flow_template
    flow_list = []
    for dev_id, ip in flows:
        flow = copy.deepcopy(template)
        flow["deviceId"] = dev_id
        # criteria[1] is the IPV4_DST entry of the template
        flow["selector"]["criteria"][1]["ip"] = '%s/32' % str(netaddr.IPAddress(ip))
        flow_list.append(flow)
    url = 'http://' + cntl + ':' + '8181/onos/v1/flows/'
    req_data = json.dumps({"flows": flow_list})
    return requests.Request(method, url, headers={'Content-Type': 'application/json'},
                            data=req_data, auth=('onos', 'rocks'))
105
106
def _prepare_delete(cntl, method, flows, template=None):
    """Builds the http request that removes a batch of flows through the ONOS flows REST endpoint.

    The request always targets port 8181 on the given controller.

    Args:
        :param cntl: controller's ip address or hostname

        :param method: http request method handed to requests.Request

        :param flows: iterable of (device id, flow id) pairs identifying the flows to remove

        :param template: dictionary copied for every pair; when None the module-level
                         flow_delete_template is used

    Returns:
        :returns req: unprepared requests.Request object
    """
    if template is None:
        # The declared default used to be deep-copied and subscripted, which raised TypeError.
        template = flow_delete_template
    flow_list = []
    for dev_id, flow_id in flows:
        flow = copy.deepcopy(template)
        flow["deviceId"] = dev_id
        flow["flowId"] = flow_id
        flow_list.append(flow)
    url = 'http://' + cntl + ':' + '8181/onos/v1/flows/'
    req_data = json.dumps({"flows": flow_list})
    return requests.Request(method, url, headers={'Content-Type': 'application/json'},
                            data=req_data, auth=('onos', 'rocks'))
134
135
def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='',
                       template=None, outqueue=None, method=None):
    """The function sends http requests.

    Runs in the working thread. It reads batches of flow details from the input queue, turns each
    batch into one http request with preparefnc and sends it to the first controller in the list.

    Args:
        :param thread_id: thread id (not used by the function)

        :param preparefnc: function building the http request; called as
                           preparefnc(controller, method, flows, template=template)

        :param inqueue: input queue, batches of flow details are coming from here

        :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue

        :param controllers: a list of controllers' ip addresses or hostnames; only the first entry is used

        :param restport: rest port (accepted but not used by the function)

        :param template: flow template used for creating flow content

        :param outqueue: queue where the results should be put

        :param method: method determines the type of http request

    Returns:
        nothing; a dictionary {http status code: number of responses} is put into the output queue.
        Requests that timed out are counted under the pseudo status code 99.
    """
    counter = {}
    try:
        ses = requests.Session()
        cntl = controllers[0]
        while True:
            try:
                flowlist = inqueue.get(timeout=1)
            except Queue.Empty:
                # Stop only when the parent is done filling and nothing is left to send.
                if exitevent.is_set() and inqueue.empty():
                    break
                continue
            req = preparefnc(cntl, method, flowlist, template=template)
            prep = req.prepare()
            try:
                rsp = ses.send(prep, timeout=5)
            except requests.exceptions.Timeout:
                code = 99
            else:
                code = rsp.status_code
            # A dictionary copes with any status code; a fixed-size list does not.
            counter[code] = counter.get(code, 0) + 1
    finally:
        # Always report, even when an unexpected error ends the thread, so that the parent
        # waiting on the output queue is never left blocked.
        outqueue.put(counter)
191
192
def get_device_ids(controller='127.0.0.1', port=8181):
    """Returns the ids of the controller's devices whose id contains 'of:' (empty list on a non-200 reply)"""
    url = 'http://{0}:{1}/onos/v1/devices'.format(controller, port)
    rsp = requests.get(url=url, auth=('onos', 'rocks'))
    if rsp.status_code == 200:
        payload = json.loads(rsp.content)
        return [dev['id'] for dev in payload['devices'] if 'of:' in dev['id']]
    return []
201
202
def get_flow_ids(controller='127.0.0.1', port=8181):
    """Returns the ids of all flows reported by the controller (empty list on a non-200 reply)"""
    url = 'http://{0}:{1}/onos/v1/flows'.format(controller, port)
    rsp = requests.get(url=url, auth=('onos', 'rocks'))
    if rsp.status_code == 200:
        payload = json.loads(rsp.content)
        return [flow['id'] for flow in payload['flows']]
    return []
211
212
def get_flow_simple_stats(controller='127.0.0.1', port=8181):
    """Returns a dictionary {flow state: number of flows in that state}.

    An empty dictionary is returned when the controller does not answer with status 200, so the
    result can always be used as a mapping.
    """
    rsp = requests.get(url='http://{0}:{1}/onos/v1/flows'.format(controller, port), auth=('onos', 'rocks'))
    if rsp.status_code != 200:
        # Used to be a list, which made key lookups by the caller fail with TypeError.
        return {}
    flows = json.loads(rsp.content)['flows']
    res = {}
    for f in flows:
        state = f['state']
        res[state] = res.get(state, 0) + 1
    return res
226
227
def get_flow_device_pairs(controller='127.0.0.1', port=8181, flow_details=[]):
    """Pairs flows reported by the controller with the details used for their creation.

    Generator: for every (device id, ip) pair in flow_details it yields (device id, flow id) of the
    first reported flow whose first instruction type is "DROP", which belongs to that device and
    whose ip criterion equals "<ip>/32". Nothing is yielded on a non-200 reply.
    """
    url = 'http://{0}:{1}/onos/v1/flows'.format(controller, port)
    rsp = requests.get(url=url, auth=('onos', 'rocks'))
    if rsp.status_code != 200:
        return
    reported = json.loads(rsp.content)['flows']
    for dev_id, ip in flow_details:
        for f in reported:
            # lets identify if it is our flow
            if f["treatment"]["instructions"][0]["type"] != "DROP":
                continue
            if f["deviceId"] != dev_id:
                continue
            criteria = f["selector"]["criteria"]
            # the ip criterion is looked for in the first two entries only
            if "ip" in criteria[0]:
                ip_criterion = criteria[0]
            elif "ip" in criteria[1]:
                ip_criterion = criteria[1]
            else:
                continue
            wanted = '%s/32' % str(netaddr.IPAddress(ip))
            if ip_criterion["ip"] == wanted:
                yield dev_id, f["id"]
                break
255
256
def get_flow_to_remove(controller='127.0.0.1', port=8181):
    """Picks the flows on the controller that look like flows created by this tool.

    Generator yielding (device id, flow id) for every reported flow whose first instruction type is
    "NOACTION" and whose ip criterion contains both '10.' and '/32'. Nothing is yielded on a
    non-200 reply.
    """
    url = 'http://{0}:{1}/onos/v1/flows'.format(controller, port)
    rsp = requests.get(url=url, auth=('onos', 'rocks'))
    if rsp.status_code != 200:
        return
    reported = json.loads(rsp.content)['flows']

    for f in reported:
        # lets identify if it is our flow
        if f["treatment"]["instructions"][0]["type"] != "NOACTION":
            continue
        criteria = f["selector"]["criteria"]
        # the ip criterion is looked for in the first two entries only
        if "ip" in criteria[0]:
            ip_criterion = criteria[0]
        elif "ip" in criteria[1]:
            ip_criterion = criteria[1]
        else:
            continue
        ipstr = ip_criterion["ip"]
        if '10.' not in ipstr or '/32' not in ipstr:
            continue
        yield (f["deviceId"], f["id"])
282
283
def _fill_send_queue(items, batch_size):
    """Returns a queue holding items split into lists of at most batch_size entries."""
    sendqueue = Queue.Queue()
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            sendqueue.put(batch)
            batch = []
    if batch:
        # The last batch may be shorter than batch_size; it must not be dropped.
        sendqueue.put(batch)
    return sendqueue


def _run_workers(nthreads, preparefnc, sendqueue, host, restport, template, method):
    """Runs nthreads sender threads over sendqueue; returns (elapsed seconds, merged status-code counts)."""
    resultqueue = Queue.Queue()
    exitevent = threading.Event()
    result = {}
    with Timer() as tmr:
        threads = []
        for i in range(nthreads):
            thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
                                   kwargs={"inqueue": sendqueue, "exitevent": exitevent,
                                           "controllers": [host], "restport": restport,
                                           "template": template, "outqueue": resultqueue,
                                           "method": method})
            threads.append(thr)
            thr.start()

        # The queue was completely filled before the workers started.
        exitevent.set()

        for thr in threads:
            thr.join()
        # All workers have finished, so the queue content is final. Draining it (instead of
        # reading one result per thread) cannot block when a worker died without reporting.
        while not resultqueue.empty():
            for code, count in resultqueue.get().items():
                result[code] = result.get(code, 0) + count
    return tmr.secs, result


def _monitor_flow_state(host, state, rounds=200):
    """Prints flow statistics once per second until no flow is in the given state or rounds polls were made."""
    print("\n\nStats monitoring ...")
    finished = False
    with Timer() as tmr:
        for _ in range(rounds):
            flow_stats = get_flow_simple_stats(controller=host)
            print(flow_stats)
            if state not in flow_stats:
                finished = True
                break
            time.sleep(1)

    if finished:
        print("... monitoring finished in +%d seconds\n\n" % tmr.secs)
    else:
        print("... monitoring aborted after %d rounds, elapsed time %d\n\n" % (rounds, tmr.secs))


def main(*argv):
    """Flow programming performance test against an ONOS controller.

    Steps:
        1. read the baseline (device ids and flow ids) from the controller
        2. add --flows flows, --fpr flows per request, using --threads worker threads
        3. poll the flow statistics until no flow is reported as PENDING_ADD
        4. unless --no-delete is given: wait --timeout seconds, remove the flows selected by
           get_flow_to_remove() and poll until no flow is reported as PENDING_REMOVE
        5. if --outfile is given, write the add and delete rates (flows per second) into it

    NOTE: --port is only handed to the worker threads as "restport"; the baseline and statistics
    queries are called without a port and use their own default. --bulk-delete is parsed but not
    used.

    Args:
        :param argv: optionally one list of command line arguments; sys.argv is parsed when omitted
    """
    parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows '
                                                 'into the config tree, as specified by optional parameters.')

    parser.add_argument('--host', default='127.0.0.1',
                        help='Host where onos controller is running (default is 127.0.0.1)')
    parser.add_argument('--port', default='8181',
                        help='Port on which onos\'s RESTCONF is listening (default is 8181)')
    parser.add_argument('--threads', type=int, default=1,
                        help='Number of request worker threads to start in each cycle; default=1. '
                             'Each thread will add/delete <FLOWS> flows.')
    parser.add_argument('--flows', type=int, default=10,
                        help='Number of flows that will be added/deleted in total, default 10')
    parser.add_argument('--fpr', type=int, default=1,
                        help='Number of flows per REST request, default 1')
    parser.add_argument('--timeout', type=int, default=100,
                        help='The maximum time (seconds) to wait between the add and delete cycles; default=100')
    parser.add_argument('--no-delete', dest='no_delete', action='store_true', default=False,
                        help='Do not delete the added flows; the delete benchmark is skipped. '
                             'default=False')
    parser.add_argument('--bulk-delete', dest='bulk_delete', action='store_true', default=False,
                        help='Delete all flows in bulk; default=False')
    parser.add_argument('--outfile', default='', help='Stores add and delete flow rest api rate; default=""')

    in_args = parser.parse_args(*argv)
    print(in_args)

    # get device ids
    base_dev_ids = get_device_ids(controller=in_args.host)
    base_flow_ids = get_flow_ids(controller=in_args.host)

    print("BASELINE:")
    print("    devices: %s" % len(base_dev_ids))
    print("    flows  : %s" % len(base_flow_ids))

    if in_args.flows > 0 and not base_dev_ids:
        # random.choice() on an empty list would fail with a bare IndexError
        sys.exit("No 'of:' devices reported by controller %s, flows cannot be added" % in_args.host)

    # every flow gets a random device and the next destination ip, starting at 10.0.0.1
    ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
    flow_details = []
    for _ in range(in_args.flows):
        dev_id = random.choice(base_dev_ids)
        dst_ip = ip_addr.increment()
        flow_details.append((dev_id, dst_ip))

    # lets fill the queue for workers and run them
    sendqueue = _fill_send_queue(flow_details, in_args.fpr)
    add_secs, result = _run_workers(in_args.threads, _prepare_post, sendqueue, in_args.host, in_args.port,
                                    flow_template, "POST")

    print("Added %s flows in %s seconds %s" % (in_args.flows, add_secs, result))
    add_details = {"duration": add_secs, "flows": len(flow_details)}

    # lets print some stats
    _monitor_flow_state(in_args.host, u'PENDING_ADD')

    if in_args.no_delete:
        return

    # sleep in between
    time.sleep(in_args.timeout)

    # lets delete flows, need to get ids to be deleted, because we dont have flow_id on config time
    # we have to pair flows on our own
    flows_remove_details = list(get_flow_to_remove(controller=in_args.host))
    print("Flows to be removed:  %s" % len(flows_remove_details))

    # lets fill the queue for workers and run them
    sendqueue = _fill_send_queue(flows_remove_details, in_args.fpr)
    del_secs, result = _run_workers(in_args.threads, _prepare_delete, sendqueue, in_args.host, in_args.port,
                                    flow_delete_template, "DELETE")

    print("Removed %s flows in %s seconds %s" % (len(flows_remove_details), del_secs, result))
    del_details = {"duration": del_secs, "flows": len(flows_remove_details)}

    # lets print some stats
    _monitor_flow_state(in_args.host, u'PENDING_REMOVE')

    if in_args.outfile != "":
        addrate = add_details['flows'] / add_details['duration']
        delrate = del_details['flows'] / del_details['duration']
        print("addrate %s" % addrate)
        print("delrate %s" % delrate)

        with open(in_args.outfile, "wt") as fd:
            fd.write("AddRate,DeleteRate\n")
            fd.write("{0},{1}\n".format(addrate, delrate))
487
488
# Script entry point: run the test with the command line arguments (program name stripped).
if __name__ == "__main__":
    main(sys.argv[1:])