Additional comments & spellcheck for the README file
[integration/test.git] / test / tools / odl-mdsal-clustering-tests / clustering-performance-test / flow_config_blaster.py
1 #!/usr/bin/python
2 __author__ = "Jan Medved"
3 __copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
4 __license__ = "New-style BSD"
5 __email__ = "jmedved@cisco.com"
6
7 from random import randrange
8 import json
9 import argparse
10 import requests
11 import time
12 import threading
13 import re
14 import netaddr
15
16
class Counter(object):
    """
    Lock-protected accumulator.

    'value' holds the running total and 'lock' guards every update made
    through increment().
    """

    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self, value=1):
        """
        Adds 'value' to the running total while holding the lock and returns
        the total as it was *before* the addition.
        """
        with self.lock:
            previous = self.value
            self.value += value
        return previous
30
31
class Timer(object):
    """
    Context manager that measures the wall-clock time spent inside its
    'with' block.

    After the block exits, 'secs' holds the elapsed time in seconds and
    'msecs' the same duration in milliseconds. With verbose=True the elapsed
    time is also printed.
    """

    def __init__(self, verbose=False):
        self.verbose = verbose

    def __enter__(self):
        # Record the start timestamp and hand back the timer itself so the
        # caller can read the results after the block.
        self.start = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()
        elapsed = self.end - self.start
        self.secs = elapsed
        self.msecs = elapsed * 1000  # millisecs
        if not self.verbose:
            return
        print("elapsed time: %f ms" % self.msecs)
46
47
class FlowConfigBlaster(object):
    """
    Adds flows to (and deletes them from) the controller's config data store
    over RESTCONF, using a configurable number of worker threads and cycles,
    and reports the request rates that were achieved.
    """
    putheaders = {'content-type': 'application/json'}
    getheaders = {'Accept': 'application/json'}

    FLWURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0/flow/%d"
    INVURL = 'restconf/operational/opendaylight-inventory:nodes'

    # Credentials sent with every request when authenticated access is enabled
    AUTH = ('admin', 'admin')

    # Default kept at class level; __init__ sets the per-instance value
    ok_total = 0

    def __init__(self, host, port, ncycles, nthreads, nnodes, nflows, startflow, auth, json_template):
        """
        Stores the test parameters and creates the per-instance bookkeeping.

        host, port:    where the controller's RESTCONF interface is reachable
        ncycles:       number of cycles run by run_cycle()
        nthreads:      number of worker threads started in each cycle
        nnodes:        node count used when the inventory yields no openflow nodes
        nflows:        number of flows each thread handles in each cycle
        startflow:     offset added to every generated flow id (and to the first IP address)
        auth:          if true, requests are sent with the AUTH credentials
        json_template: '%'-style template for the flow body, filled in by add_flow()
        """
        self.host = host
        self.port = port
        self.ncycles = ncycles
        self.nthreads = nthreads
        self.nnodes = nnodes
        self.nflows = nflows
        self.startflow = startflow
        self.auth = auth
        self.json_template = json_template

        self.ok_rate = Counter(0.0)
        self.total_rate = Counter(0.0)
        self.ok_total = 0

        self.ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')) + startflow)

        self.print_lock = threading.Lock()
        self.cond = threading.Condition()
        self.threads_done = 0

        # Per-instance map: thread id -> {flow id -> node id}. This must not be a
        # class attribute, otherwise all instances would share one dictionary.
        self.flows = {tid: {} for tid in range(self.nthreads)}

    def _url(self, path):
        """
        Builds the full URL for 'path'. The port may be a string or an integer.
        """
        return 'http://%s:%s/%s' % (self.host, self.port, path)

    def _auth_kwargs(self):
        """
        Returns the extra keyword arguments for a request: the credentials when
        authenticated access is enabled, nothing otherwise.
        """
        return {'auth': self.AUTH} if self.auth else {}

    def _flow_id(self, tid, start_flow, flow):
        """
        Computes the id of the 'flow'-th flow handled by thread 'tid' in the
        cycle whose first flow offset is 'start_flow'.
        """
        return tid * (self.ncycles * self.nflows) + flow + start_flow + self.startflow

    @staticmethod
    def _rate(count, secs):
        """
        Returns count/secs, or 0.0 when no measurable time has elapsed.
        """
        return count / secs if secs > 0 else 0.0

    def _worker_done(self):
        """
        Records that a worker thread has finished and wakes up anybody waiting
        on the condition variable.
        """
        with self.cond:
            self.threads_done += 1
            self.cond.notify_all()

    def get_num_nodes(self, session):
        """
        Determines the number of OF nodes in the connected mininet network by counting
        the inventory nodes whose id contains 'openflow'. If the inventory can not be
        read or parsed, or lists no such node, the configured default number of nodes
        ('nnodes') is returned.
        """
        r = session.get(self._url(self.INVURL), headers=self.getheaders, stream=False, **self._auth_kwargs())

        if r.status_code != 200:
            return self.nnodes

        try:
            inventory = json.loads(r.content)['nodes']['node']
            found = sum(1 for node in inventory if 'openflow' in node['id'])
        except (KeyError, TypeError, ValueError):
            # Missing keys, unexpected structure or a body that is not JSON
            return self.nnodes

        return found or self.nnodes

    def add_flow(self, session, url_template, tid, node, flow_id, ipaddr):
        """
        Adds a single flow to the config data store via REST and returns the HTTP
        status code of the PUT request
        """
        flow_data = self.json_template % (tid + flow_id, 'TestFlow-%d' % flow_id, 65000,
                                          str(flow_id), 65000, str(netaddr.IPAddress(ipaddr)))
        flow_url = url_template % (node, flow_id)

        r = session.put(flow_url, data=flow_data, headers=self.putheaders, stream=False, **self._auth_kwargs())

        return r.status_code

    def add_flows(self, start_flow, tid):
        """
        Adds flows into the ODL config space. This function is executed by a worker thread
        """
        put_url = self._url(self.FLWURL)

        # HTTP status code -> number of responses; 200 is always present
        add_res = {200: 0}

        s = requests.Session()
        try:
            n_nodes = self.get_num_nodes(s)

            with self.print_lock:
                print('    Thread %d:\n        Adding %d flows on %d nodes' % (tid, self.nflows, n_nodes))

            with Timer() as t:
                for flow in range(self.nflows):
                    node_id = randrange(1, n_nodes + 1)
                    flow_id = self._flow_id(tid, start_flow, flow)
                    # Remember the node so that delete_flows() can find the flow again
                    self.flows[tid][flow_id] = node_id
                    sts = self.add_flow(s, put_url, tid, node_id, flow_id, self.ip_addr.increment())
                    add_res[sts] = add_res.get(sts, 0) + 1

            add_time = t.secs
            add_ok_rate = self._rate(add_res[200], add_time)
            add_total_rate = self._rate(sum(add_res.values()), add_time)

            self.ok_rate.increment(add_ok_rate)
            self.total_rate.increment(add_total_rate)

            with self.print_lock:
                print('    Thread %d: ' % tid)
                print('        Add time: %.2f,' % add_time)
                print('        Add success rate:  %.2f, Add total rate: %.2f' % (add_ok_rate, add_total_rate))
                print('        Add Results:  %s' % (add_res,))
                self.ok_total += add_res[200]
        finally:
            # Release the connection and report completion even if a request failed
            s.close()
            self._worker_done()

    def delete_flow(self, session, url_template, node, flow_id):
        """
        Deletes a single flow from the config data store via REST and returns the
        HTTP status code of the DELETE request
        """
        flow_url = url_template % (node, flow_id)

        r = session.delete(flow_url, headers=self.getheaders, **self._auth_kwargs())

        return r.status_code

    def delete_flows(self, start_flow, tid):
        """
        Deletes flow from the ODL config space that have been added using the 'add_flows()' function. This function is
        executed by a worker thread
        """
        del_url = self._url(self.FLWURL)

        # HTTP status code -> number of responses; 200 is always present
        del_res = {200: 0}

        s = requests.Session()
        try:
            n_nodes = self.get_num_nodes(s)

            with self.print_lock:
                print('Thread %d: Deleting %d flows on %d nodes' % (tid, self.nflows, n_nodes))

            with Timer() as t:
                for flow in range(self.nflows):
                    flow_id = self._flow_id(tid, start_flow, flow)
                    # The node is the one recorded by add_flows() for this flow id
                    sts = self.delete_flow(s, del_url, self.flows[tid][flow_id], flow_id)
                    del_res[sts] = del_res.get(sts, 0) + 1

            del_time = t.secs
            del_ok_rate = self._rate(del_res[200], del_time)
            del_total_rate = self._rate(sum(del_res.values()), del_time)

            self.ok_rate.increment(del_ok_rate)
            self.total_rate.increment(del_total_rate)

            with self.print_lock:
                print('    Thread %d: ' % tid)
                print('        Delete time: %.2f,' % del_time)
                print('        Delete success rate:  %.2f, Delete total rate: %.2f' % (del_ok_rate, del_total_rate))
                print('        Delete Results:  %s' % (del_res,))
        finally:
            # Release the connection and report completion even if a request failed
            s.close()
            self._worker_done()

    def run_cycle(self, function):
        """
        Runs an add or delete cycle. Starts a number of worker threads that each add a bunch of flows. Work is done
        in context of the worker threads. 'function' is called in each thread as function(start_flow, tid).
        """

        for c in range(self.ncycles):
            with self.print_lock:
                print('\nCycle %d:' % c)

            threads = [threading.Thread(target=function, args=(c * self.nflows, i))
                       for i in range(self.nthreads)]
            for t in threads:
                t.start()

            # Wait for all threads to finish. Joining (rather than polling 'threads_done' and
            # waiting on the condition) can not miss a wake-up and also returns when a worker
            # terminated with an exception.
            for t in threads:
                t.join()

            with self.print_lock:
                print('    Overall success rate:  %.2f, Overall rate: %.2f' % (
                    self.ok_rate.value, self.total_rate.value))
                self.threads_done = 0

            # Start the next cycle with fresh rate accumulators
            self.ok_rate.value = 0
            self.total_rate.value = 0

    def add_blaster(self):
        """
        Runs all add cycles
        """
        self.run_cycle(self.add_flows)

    def delete_blaster(self):
        """
        Runs all delete cycles
        """
        self.run_cycle(self.delete_flows)

    def get_total_flows(self):
        """
        Returns the number of flows recorded by add_flows() over all threads
        """
        return sum(len(per_thread) for per_thread in self.flows.values())

    def get_ok_flows(self):
        """
        Returns the number of add requests that were answered with HTTP status 200
        """
        return self.ok_total
276
277
if __name__ == "__main__":

    # Template for the body of a flow PUT request. The '%' placeholders are filled in by
    # FlowConfigBlaster.add_flow(), in this order: cookie, flow name, hard timeout, flow id,
    # idle timeout, destination IPv4 address.
    JSON_FLOW_MOD1 = '''{
        "flow-node-inventory:flow": [
            {
                "flow-node-inventory:cookie": %d,
                "flow-node-inventory:cookie_mask": 65535,
                "flow-node-inventory:flow-name": "%s",
                "flow-node-inventory:hard-timeout": %d,
                "flow-node-inventory:id": "%s",
                "flow-node-inventory:idle-timeout": %d,
                "flow-node-inventory:installHw": false,
                "flow-node-inventory:instructions": {
                    "flow-node-inventory:instruction": [
                        {
                            "flow-node-inventory:apply-actions": {
                                "flow-node-inventory:action": [
                                    {
                                        "flow-node-inventory:drop-action": {},
                                        "flow-node-inventory:order": 0
                                    }
                                ]
                            },
                            "flow-node-inventory:order": 0
                        }
                    ]
                },
                "flow-node-inventory:match": {
                    "flow-node-inventory:ipv4-destination": "%s/32",
                    "flow-node-inventory:ethernet-match": {
                        "flow-node-inventory:ethernet-type": {
                            "flow-node-inventory:type": 2048
                        }
                    }
                },
                "flow-node-inventory:priority": 2,
                "flow-node-inventory:strict": false,
                "flow-node-inventory:table_id": 0
            }
        ]
    }'''


    parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows '
                                                 'into the config tree, as specified by optional parameters.')

    parser.add_argument('--host', default='127.0.0.1',
                        help='Host where odl controller is running (default is 127.0.0.1)')
    parser.add_argument('--port', default='8181',
                        help='Port on which odl\'s RESTCONF is listening (default is 8181)')
    parser.add_argument('--flows', type=int, default=10,
                        help='Number of flow add/delete requests to send in each cycle; default 10')
    parser.add_argument('--cycles', type=int, default=1,
                        help='Number of flow add/delete cycles to send in each thread; default 1')
    parser.add_argument('--threads', type=int, default=1,
                        help='Number of request worker threads, default=1. '
                             'Each thread will add/delete nflows.')
    parser.add_argument('--nodes', type=int, default=16,
                        help='Number of nodes if mininet is not connected; default=16. If mininet is connected, '
                             'flows will be evenly distributed (programmed) into connected nodes.')
    parser.add_argument('--delay', type=int, default=0,
                        help='Time to wait between the add and delete cycles; default=0')
    # '--delete' and '--no-delete' write to the same destination; deleting is the default
    parser.add_argument('--delete', dest='delete', action='store_true', default=True,
                        help='Delete all added flows one by one, benchmark delete '
                             'performance.')
    parser.add_argument('--no-delete', dest='delete', action='store_false',
                        help='Do not perform the delete cycle.')
    # '--no-auth' and '--auth' write to the same destination; unauthenticated access is the default
    parser.add_argument('--no-auth', dest='auth', action='store_false', default=False,
                        help="Do not use authenticated access to REST (default)")
    parser.add_argument('--auth', dest='auth', action='store_true',
                        help="Use authenticated access to REST (username: 'admin', password: 'admin').")
    parser.add_argument('--startflow', type=int, default=0,
                        help='The starting Flow ID; default=0')

    in_args = parser.parse_args()

    fct = FlowConfigBlaster(in_args.host, in_args.port, in_args.cycles, in_args.threads, in_args.nodes,
                            in_args.flows, in_args.startflow, in_args.auth, JSON_FLOW_MOD1)

    # Run through <cycles>, where <threads> are started in each cycle and <flows> are added from each thread
    fct.add_blaster()

    # Single-argument print(...) calls produce the same output under Python 2 and Python 3
    print('\n*** Total flows added: %s' % fct.get_total_flows())
    print('    HTTP[OK] results:  %d\n' % fct.get_ok_flows())

    if in_args.delay > 0:
        print('*** Waiting for %d seconds before the delete cycle ***\n' % in_args.delay)
        time.sleep(in_args.delay)

    # Run through <cycles>, where <threads> are started in each cycle and <flows> previously added in an add cycle are
    # deleted in each thread
    if in_args.delete:
        fct.delete_blaster()