fixing pep8 problems for test verify tox job
[integration/test.git] / tools / odl-mdsal-clustering-tests / clustering-performance-test / odl_tester.py
1 import requests
2 import json
3 import argparse
4 import sys
5 import netaddr
6 import threading
7 import Queue
8 import random
9 import copy
10 import time
11
12
13 flow_template = {
14     "id": "2",
15     "match": {
16         "ethernet-match": {
17             "ethernet-type": {
18                 "type": 2048
19             }
20         },
21         "ipv4-destination": "10.0.20.0/24"
22     },
23     "priority": 2,
24     "table_id": 0
25 }
26 odl_node_url = '/restconf/config/opendaylight-inventory:nodes/node/'
27
28
29 class Timer(object):
30     def __init__(self, verbose=False):
31         self.verbose = verbose
32
33     def __enter__(self):
34         self.start = time.time()
35         return self
36
37     def __exit__(self, *args):
38         self.end = time.time()
39         self.secs = self.end - self.start
40         self.msecs = self.secs * 1000  # millisecs
41         if self.verbose:
42             print("elapsed time: %f ms" % self.msecs)
43
44
45 class Counter(object):
46     def __init__(self, start=0):
47         self.lock = threading.Lock()
48         self.value = start
49
50     def increment(self, value=1):
51         self.lock.acquire()
52         val = self.value
53         try:
54             self.value += value
55         finally:
56             self.lock.release()
57         return val
58
59
60 def _prepare_post(cntl, method, flows, template=None):
61     """Creates a POST http requests to configure a flow in configuration datastore.
62
63     Args:
64         :param cntl: controller's ip address or hostname
65
66         :param method: determines http request method
67
68         :param flows: list of flow details
69
70         :param template: flow template to be to be filled
71
72     Returns:
73         :returns req: http request object
74     """
75     flow_list = []
76     for dev_id, ip in (flows):
77         flow = copy.deepcopy(template)
78         flow["id"] = ip
79         flow["match"]["ipv4-destination"] = '%s/32' % str(netaddr.IPAddress(ip))
80         flow_list.append(flow)
81     body = {"flow": flow_list}
82     url = 'http://' + cntl + ':8181' + odl_node_url + dev_id + '/table/0'
83     req_data = json.dumps(body)
84     req = requests.Request(method, url, headers={'Content-Type': 'application/json'},
85                            data=req_data, auth=('admin', 'admin'))
86     return req
87
88
89 def _prepare_delete(cntl, method, flows, template=None):
90     """Creates a DELETE http requests to configure a flow in configuration datastore.
91
92     Args:
93         :param cntl: controller's ip address or hostname
94
95         :param method: determines http request method
96
97         :param flows: list of flow details
98
99         :param template: flow template to be to be filled
100
101     Returns:
102         :returns req: http request object
103     """
104     dev_id, flow_id = flows[0]
105     url = 'http://' + cntl + ':8181' + odl_node_url + dev_id + '/table/0/flow/' + str(flow_id)
106     req = requests.Request(method, url, headers={'Content-Type': 'application/json'},
107                            data=None, auth=('admin', 'admin'))
108     return req
109
110
111 def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='',
112                        template=None, outqueue=None, method=None):
113     """The funcion sends http requests.
114
115     Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests
116     to the controller
117
118     Args:
119         :param thread_id: thread id
120
121         :param preparefnc: function to preparesthe http request
122
123         :param inqueue: input queue, flow details are comming from here
124
125         :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue
126
127         :param controllers: a list of controllers' ip addresses or hostnames
128
129         :param restport: restconf port
130
131         :param template: flow template used for creating flow content
132
133         :param outqueue: queue where the results should be put
134
135         :param method: method derermines the type of http request
136
137     Returns:
138         nothing, results must be put into the output queue
139     """
140     ses = requests.Session()
141     cntl = controllers[0]
142     counter = [0 for i in range(600)]
143     loop = True
144
145     while loop:
146         try:
147             flowlist = inqueue.get(timeout=1)
148         except Queue.Empty:
149             if exitevent.is_set() and inqueue.empty():
150                 loop = False
151             continue
152         req = preparefnc(cntl, method, flowlist, template=template)
153         # prep = ses.prepare_request(req)
154         prep = req.prepare()
155         try:
156             rsp = ses.send(prep, timeout=5)
157         except requests.exceptions.Timeout:
158             counter[99] += 1
159             continue
160         counter[rsp.status_code] += 1
161     res = {}
162     for i, v in enumerate(counter):
163         if v > 0:
164             res[i] = v
165     outqueue.put(res)
166
167
168 def get_device_ids(controller='127.0.0.1', port=8181):
169     """Returns a list of switch ids"""
170     ids = []
171     rsp = requests.get(url='http://{0}:{1}/restconf/operational/opendaylight-inventory:nodes'
172                        .format(controller, port), auth=('admin', 'admin'))
173     if rsp.status_code != 200:
174         return []
175     try:
176         devices = json.loads(rsp.content)['nodes']['node']
177         ids = [d['id'] for d in devices]
178     except KeyError:
179         pass
180     return ids
181
182
183 def get_flow_ids(controller='127.0.0.1', port=8181):
184     """Returns a list of flow ids"""
185     ids = []
186     device_ids = get_device_ids(controller, port)
187     for device_id in device_ids:
188         rsp = requests.get(url='http://{0}:{1}/restconf/operational/opendaylight-inventory:nodes/node/%s/table/0'
189                            .format(controller, port) % device_id, auth=('admin', 'admin'))
190         if rsp.status_code != 200:
191             return []
192         try:
193             flows = json.loads(rsp.content)['flow-node-inventory:table'][0]['flow']
194             for f in flows:
195                 ids.append(f['id'])
196         except KeyError:
197             pass
198     return ids
199
200
201 def main(*argv):
202
203     parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows '
204                                                  'into the config tree, as specified by optional parameters.')
205
206     parser.add_argument('--host', default='127.0.0.1',
207                         help='Host where ODL controller is running (default is 127.0.0.1)')
208     parser.add_argument('--port', default='8181',
209                         help='Port on which ODL\'s RESTCONF is listening (default is 8181)')
210     parser.add_argument('--threads', type=int, default=1,
211                         help='Number of request worker threads to start in each cycle; default=1. '
212                              'Each thread will add/delete <FLOWS> flows.')
213     parser.add_argument('--flows', type=int, default=10,
214                         help='Number of flows that will be added/deleted in total, default 10')
215     parser.add_argument('--fpr', type=int, default=1,
216                         help='Number of flows per REST request, default 1')
217     parser.add_argument('--timeout', type=int, default=100,
218                         help='The maximum time (seconds) to wait between the add and delete cycles; default=100')
219     parser.add_argument('--no-delete', dest='no_delete', action='store_true', default=False,
220                         help='Delete all added flows one by one, benchmark delete '
221                              'performance.')
222     parser.add_argument('--bulk-delete', dest='bulk_delete', action='store_true', default=False,
223                         help='Delete all flows in bulk; default=False')
224     parser.add_argument('--outfile', default='', help='Stores add and delete flow rest api rate; default=""')
225
226     in_args = parser.parse_args(*argv)
227     print in_args
228
229     # get device ids
230     base_dev_ids = get_device_ids(controller=in_args.host)
231     base_flow_ids = get_flow_ids(controller=in_args.host)
232     # ip
233     ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
234     # prepare func
235     preparefnc = _prepare_post
236
237     base_num_flows = len(base_flow_ids)
238
239     print "BASELINE:"
240     print "    devices:", len(base_dev_ids)
241     print "    flows  :", base_num_flows
242
243     # lets fill the queue for workers
244     nflows = 0
245     flow_list = []
246     flow_details = []
247     sendqueue = Queue.Queue()
248     dev_id = random.choice(base_dev_ids)
249     for i in range(in_args.flows):
250         dst_ip = ip_addr.increment()
251         flow_list.append((dev_id, dst_ip))
252         flow_details.append((dev_id, dst_ip))
253         nflows += 1
254         if nflows == in_args.fpr:
255             sendqueue.put(flow_list)
256             nflows = 0
257             flow_list = []
258             dev_id = random.choice(base_dev_ids)
259
260     # result_gueue
261     resultqueue = Queue.Queue()
262     # creaet exit event
263     exitevent = threading.Event()
264
265     # run workers
266     with Timer() as tmr:
267         threads = []
268         for i in range(int(in_args.threads)):
269             thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
270                                    kwargs={"inqueue": sendqueue, "exitevent": exitevent,
271                                            "controllers": [in_args.host], "restport": in_args.port,
272                                            "template": flow_template, "outqueue": resultqueue, "method": "POST"})
273             threads.append(thr)
274             thr.start()
275
276         exitevent.set()
277
278         result = {}
279         # waitng for reqults and sum them up
280         for t in threads:
281             t.join()
282             # reading partial resutls from sender thread
283             part_result = resultqueue.get()
284             for k, v in part_result.iteritems():
285                 if k not in result:
286                     result[k] = v
287                 else:
288                     result[k] += v
289
290     print "Added", in_args.flows, "flows in", tmr.secs, "seconds", result
291     add_details = {"duration": tmr.secs, "flows": len(flow_details)}
292
293     # lets print some stats
294     print "\n\nStats monitoring ..."
295     rounds = 200
296     with Timer() as t:
297         for i in range(rounds):
298             reported_flows = len(get_flow_ids(controller=in_args.host))
299             expected_flows = base_num_flows + in_args.flows
300             print "Reported Flows: %d/%d" % (reported_flows, expected_flows)
301             if reported_flows >= expected_flows:
302                 break
303             time.sleep(1)
304
305     if i < rounds:
306         print "... monitoring finished in +%d seconds\n\n" % t.secs
307     else:
308         print "... monitoring aborted after %d rounds, elapsed time %d\n\n" % (rounds, t.secs)
309
310     if in_args.no_delete:
311         return
312
313     # sleep in between
314     time.sleep(in_args.timeout)
315
316     print "Flows to be removed: %d" % len(flow_details)
317     # lets fill the queue for workers
318     sendqueue = Queue.Queue()
319     for fld in flow_details:
320         sendqueue.put([fld])
321
322     # result_gueue
323     resultqueue = Queue.Queue()
324     # creaet exit event
325     exitevent = threading.Event()
326
327     # run workers
328     preparefnc = _prepare_delete
329     with Timer() as tmr:
330         if in_args.bulk_delete:
331             url = 'http://' + in_args.host + ':' + '8181'
332             url += '/restconf/config/opendaylight-inventory:nodes'
333             rsp = requests.delete(url, headers={'Content-Type': 'application/json'}, auth=('admin', 'admin'))
334             result = {rsp.status_code: 1}
335         else:
336             threads = []
337             for i in range(int(in_args.threads)):
338                 thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
339                                        kwargs={"inqueue": sendqueue, "exitevent": exitevent,
340                                                "controllers": [in_args.host], "restport": in_args.port,
341                                                "template": None, "outqueue": resultqueue, "method": "DELETE"})
342                 threads.append(thr)
343                 thr.start()
344
345             exitevent.set()
346
347             result = {}
348             # waitng for reqults and sum them up
349             for t in threads:
350                 t.join()
351                 # reading partial resutls from sender thread
352                 part_result = resultqueue.get()
353                 for k, v in part_result.iteritems():
354                     if k not in result:
355                         result[k] = v
356                     else:
357                         result[k] += v
358
359     print "Removed", len(flow_details), "flows in", tmr.secs, "seconds", result
360     del_details = {"duration": tmr.secs, "flows": len(flow_details)}
361
362 #    # lets print some stats
363 #    print "\n\nSome stats monitoring ...."
364 #    for i in range(100):
365 #        print get_flow_simple_stats(controller=in_args.host)
366 #        time.sleep(5)
367 #    print "... monitoring finished\n\n"
368     # lets print some stats
369     print "\n\nStats monitoring ..."
370     rounds = 200
371     with Timer() as t:
372         for i in range(rounds):
373             reported_flows = len(get_flow_ids(controller=in_args.host))
374             expected_flows = base_num_flows
375             print "Reported Flows: %d/%d" % (reported_flows, expected_flows)
376             if reported_flows <= expected_flows:
377                 break
378             time.sleep(1)
379
380     if i < rounds:
381         print "... monitoring finished in +%d seconds\n\n" % t.secs
382     else:
383         print "... monitoring aborted after %d rounds, elapsed time %d\n\n" % (rounds, t.secs)
384
385     if in_args.outfile != "":
386         addrate = add_details['flows'] / add_details['duration']
387         delrate = del_details['flows'] / del_details['duration']
388         print "addrate", addrate
389         print "delrate", delrate
390
391         with open(in_args.outfile, "wt") as fd:
392             fd.write("AddRate,DeleteRate\n")
393             fd.write("{0},{1}\n".format(addrate, delrate))
394
395
396 if __name__ == "__main__":
397     main(sys.argv[1:])