Merge "Added FlowConfigBlaster 'Floodlight Edition' - flow config blaster that uses...
[integration/test.git] / test / tools / odl-mdsal-clustering-tests / clustering-performance-test / flow_config_blaster.py
1 #!/usr/bin/python
2 __author__ = "Jan Medved"
3 __copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
4 __license__ = "New-style BSD"
5 __email__ = "jmedved@cisco.com"
6
7 from random import randrange
8 import json
9 import argparse
10 import time
11 import threading
12 import re
13
14 import requests
15 import netaddr
16
17
18 class Counter(object):
19     def __init__(self, start=0):
20         self.lock = threading.Lock()
21         self.value = start
22
23     def increment(self, value=1):
24         self.lock.acquire()
25         val = self.value
26         try:
27             self.value += value
28         finally:
29             self.lock.release()
30         return val
31
32
33 class Timer(object):
34     def __init__(self, verbose=False):
35         self.verbose = verbose
36
37     def __enter__(self):
38         self.start = time.time()
39         return self
40
41     def __exit__(self, *args):
42         self.end = time.time()
43         self.secs = self.end - self.start
44         self.msecs = self.secs * 1000  # millisecs
45         if self.verbose:
46             print ("elapsed time: %f ms" % self.msecs)
47
48
49 class FlowConfigBlaster(object):
50     putheaders = {'content-type': 'application/json'}
51     getheaders = {'Accept': 'application/json'}
52
53     FLWURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0/flow/%d"
54     INVURL = 'restconf/operational/opendaylight-inventory:nodes'
55
56     ok_total = 0
57
58     flows = {}
59
60     def __init__(self, host, port, ncycles, nthreads, nnodes, nflows, startflow, auth, json_template):
61         self.host = host
62         self.port = port
63         self.ncycles = ncycles
64         self.nthreads = nthreads
65         self.nnodes = nnodes
66         self.nflows = nflows
67         self.startflow = startflow
68         self.auth = auth
69
70         self.json_template = json_template
71         self.url_template = 'http://' + self.host + ":" + self.port + '/' + self.FLWURL
72
73         self.ok_rate = Counter(0.0)
74         self.total_rate = Counter(0.0)
75
76         self.ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')) + startflow)
77
78         self.print_lock = threading.Lock()
79         self.cond = threading.Condition()
80         self.threads_done = 0
81
82         for i in range(self.nthreads):
83             self.flows[i] = {}
84
85
86     def get_num_nodes(self, session):
87         """
88         Determines the number of OF nodes in the connected mininet network. If mininet is not connected, the default
89         number of flows is 16
90         """
91         inventory_url = 'http://' + self.host + ":" + self.port + '/' + self.INVURL
92         nodes = self.nnodes
93
94         if not self.auth:
95             r = session.get(inventory_url, headers=self.getheaders, stream=False)
96         else:
97             r = session.get(inventory_url, headers=self.getheaders, stream=False, auth=('admin', 'admin'))
98
99         if r.status_code == 200:
100             try:
101                 inv = json.loads(r.content)['nodes']['node']
102                 nn = 0
103                 for n in range(len(inv)):
104                     if re.search('openflow', inv[n]['id']) is not None:
105                         nn += 1
106                 if nn != 0:
107                     nodes = nn
108             except KeyError:
109                 pass
110
111         return nodes
112
113
114     def add_flow(self, session, node, flow_id, ipaddr):
115         """
116         Adds a single flow to the config data store via REST
117         """
118         flow_data = self.json_template % (flow_id, 'TestFlow-%d' % flow_id, 65000, str(flow_id), 65000,
119                                           str(netaddr.IPAddress(ipaddr)))
120         # print flow_data
121         flow_url = self.url_template % (node, flow_id)
122         # print flow_url
123
124         if not self.auth:
125             r = session.put(flow_url, data=flow_data, headers=self.putheaders, stream=False)
126         else:
127             r = session.put(flow_url, data=flow_data, headers=self.putheaders, stream=False, auth=('admin', 'admin'))
128
129         return r.status_code
130
131
132     def add_flows(self, start_flow, tid):
133         """
134         Adds flows into the ODL config space. This function is executed by a worker thread
135         """
136
137         add_res = {200: 0}
138
139         s = requests.Session()
140
141         n_nodes = self.get_num_nodes(s)
142
143         with self.print_lock:
144             print '    Thread %d:\n        Adding %d flows on %d nodes' % (tid, self.nflows, n_nodes)
145
146         with Timer() as t:
147             for flow in range(self.nflows):
148                 node_id = randrange(1, n_nodes + 1)
149                 flow_id = tid * (self.ncycles * self.nflows) + flow + start_flow + self.startflow
150                 self.flows[tid][flow_id] = node_id
151                 sts = self.add_flow(s, node_id, flow_id, self.ip_addr.increment())
152                 try:
153                     add_res[sts] += 1
154                 except KeyError:
155                     add_res[sts] = 1
156
157         add_time = t.secs
158         add_ok_rate = add_res[200] / add_time
159         add_total_rate = sum(add_res.values()) / add_time
160
161         self.ok_rate.increment(add_ok_rate)
162         self.total_rate.increment(add_total_rate)
163
164         with self.print_lock:
165             print '    Thread %d: ' % tid
166             print '        Add time: %.2f,' % add_time
167             print '        Add success rate:  %.2f, Add total rate: %.2f' % (add_ok_rate, add_total_rate)
168             print '        Add Results: ',
169             print add_res
170             self.ok_total += add_res[200]
171             self.threads_done += 1
172
173         s.close()
174
175         with self.cond:
176             self.cond.notifyAll()
177
178
179     def delete_flow(self, session, node, flow_id):
180         """
181         Deletes a single flow from the ODL config data store via REST
182         """
183         flow_url = self.url_template % (node, flow_id)
184
185         if not self.auth:
186             r = session.delete(flow_url, headers=self.getheaders)
187         else:
188             r = session.delete(flow_url, headers=self.getheaders, auth=('admin', 'admin'))
189
190         return r.status_code
191
192
193     def delete_flows(self, start_flow, tid):
194         """
195         Deletes flow from the ODL config space that have been added using the 'add_flows()' function. This function is
196         executed by a worker thread
197         """
198         del_res = {200: 0}
199
200         s = requests.Session()
201         n_nodes = self.get_num_nodes(s)
202
203         with self.print_lock:
204             print 'Thread %d: Deleting %d flows on %d nodes' % (tid, self.nflows, n_nodes)
205
206         with Timer() as t:
207             for flow in range(self.nflows):
208                 flow_id = tid * (self.ncycles * self.nflows) + flow + start_flow + self.startflow
209                 sts = self.delete_flow(s, self.flows[tid][flow_id], flow_id)
210                 try:
211                     del_res[sts] += 1
212                 except KeyError:
213                     del_res[sts] = 1
214
215         del_time = t.secs
216
217         del_ok_rate = del_res[200] / del_time
218         del_total_rate = sum(del_res.values()) / del_time
219
220         self.ok_rate.increment(del_ok_rate)
221         self.total_rate.increment(del_total_rate)
222
223         with self.print_lock:
224             print '    Thread %d: ' % tid
225             print '        Delete time: %.2f,' % del_time
226             print '        Delete success rate:  %.2f, Delete total rate: %.2f' % (del_ok_rate, del_total_rate)
227             print '        Delete Results: ',
228             print del_res
229             self.threads_done += 1
230
231         s.close()
232
233         with self.cond:
234             self.cond.notifyAll()
235
236
237     def run_cycle(self, function):
238         """
239         Runs an add or delete cycle. Starts a number of worker threads that each add a bunch of flows. Work is done
240         in context of the worker threads
241         """
242
243         for c in range(self.ncycles):
244             with self.print_lock:
245                 print '\nCycle %d:' % c
246
247             threads = []
248             for i in range(self.nthreads):
249                 t = threading.Thread(target=function, args=(c * self.nflows, i))
250                 threads.append(t)
251                 t.start()
252
253             # Wait for all threads to finish and measure the execution time
254             with Timer() as t:
255                 while self.threads_done < self.nthreads:
256                     with self.cond:
257                         self.cond.wait()
258
259             with self.print_lock:
260                 print '    Total success rate: %.2f, Total rate: %.2f' % (
261                     self.ok_rate.value, self.total_rate.value)
262                 measured_rate = self.nthreads * self.nflows * self.ncycles / t.secs
263                 print '    Measured rate:      %.2f (%.2f%% of Total success rate)' % \
264                       (measured_rate, measured_rate / self.total_rate.value * 100)
265                 self.threads_done = 0
266
267             self.ok_rate.value = 0
268             self.total_rate.value = 0
269
270
271     def add_blaster(self):
272         self.run_cycle(self.add_flows)
273
274     def delete_blaster(self):
275         self.run_cycle(self.delete_flows)
276
277     def get_total_flows(self):
278         return sum(len(self.flows[key]) for key in self.flows.keys())
279
280     def get_ok_flows(self):
281         return self.ok_total
282
283
284 def get_json_from_file(filename):
285     with open(filename, 'r') as f:
286         read_data = f.read()
287     return read_data
288
289
290 if __name__ == "__main__":
291
292     JSON_FLOW_MOD1 = '''{
293         "flow-node-inventory:flow": [
294             {
295                 "flow-node-inventory:cookie": %d,
296                 "flow-node-inventory:cookie_mask": 4294967295,
297                 "flow-node-inventory:flow-name": "%s",
298                 "flow-node-inventory:hard-timeout": %d,
299                 "flow-node-inventory:id": "%s",
300                 "flow-node-inventory:idle-timeout": %d,
301                 "flow-node-inventory:installHw": false,
302                 "flow-node-inventory:instructions": {
303                     "flow-node-inventory:instruction": [
304                         {
305                             "flow-node-inventory:apply-actions": {
306                                 "flow-node-inventory:action": [
307                                     {
308                                         "flow-node-inventory:drop-action": {},
309                                         "flow-node-inventory:order": 0
310                                     }
311                                 ]
312                             },
313                             "flow-node-inventory:order": 0
314                         }
315                     ]
316                 },
317                 "flow-node-inventory:match": {
318                     "flow-node-inventory:ipv4-destination": "%s/32",
319                     "flow-node-inventory:ethernet-match": {
320                         "flow-node-inventory:ethernet-type": {
321                             "flow-node-inventory:type": 2048
322                         }
323                     }
324                 },
325                 "flow-node-inventory:priority": 2,
326                 "flow-node-inventory:strict": false,
327                 "flow-node-inventory:table_id": 0
328             }
329         ]
330     }'''
331
332     parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows '
333                                                  'into the config tree, as specified by optional parameters.')
334
335     parser.add_argument('--host', default='127.0.0.1',
336                         help='Host where odl controller is running (default is 127.0.0.1)')
337     parser.add_argument('--port', default='8181',
338                         help='Port on which odl\'s RESTCONF is listening (default is 8181)')
339     parser.add_argument('--cycles', type=int, default=1,
340                         help='Number of flow add/delete cycles; default 1. Both Flow Adds and Flow Deletes are '
341                              'performed in cycles. <THREADS> worker threads are started in each cycle and the cycle '
342                              'ends when all threads finish. Another cycle is started when the previous cycle finished.')
343     parser.add_argument('--threads', type=int, default=1,
344                         help='Number of request worker threads to start in each cycle; default=1. '
345                              'Each thread will add/delete <FLOWS> flows.')
346     parser.add_argument('--flows', type=int, default=10,
347                         help='Number of flows that will be added/deleted by each worker thread in each cycle; '
348                              'default 10')
349     parser.add_argument('--nodes', type=int, default=16,
350                         help='Number of nodes if mininet is not connected; default=16. If mininet is connected, '
351                              'flows will be evenly distributed (programmed) into connected nodes.')
352     parser.add_argument('--delay', type=int, default=0,
353                         help='Time (in seconds) to wait between the add and delete cycles; default=0')
354     parser.add_argument('--delete', dest='delete', action='store_true', default=True,
355                         help='Delete all added flows one by one, benchmark delete '
356                              'performance.')
357     parser.add_argument('--no-delete', dest='delete', action='store_false',
358                         help='Do not perform the delete cycle.')
359     parser.add_argument('--auth', dest='auth', action='store_true', default=False,
360                         help="Use the ODL default username/password 'admin'/'admin' to authenticate access to REST; "
361                              'default: no authentication')
362     parser.add_argument('--startflow', type=int, default=0,
363                         help='The starting Flow ID; default=0')
364     parser.add_argument('--file', default='',
365                         help='File from which to read the JSON flow template; default: no file, use a built in '
366                              'template.')
367
368     in_args = parser.parse_args()
369
370     if in_args.file != '':
371         flow_template = get_json_from_file(in_args.file)
372     else:
373         flow_template = JSON_FLOW_MOD1
374
375     fct = FlowConfigBlaster(in_args.host, in_args.port, in_args.cycles, in_args.threads, in_args.nodes,
376                             in_args.flows, in_args.startflow, in_args.auth, flow_template)
377
378     # Run through <cycles>, where <threads> are started in each cycle and <flows> are added from each thread
379     fct.add_blaster()
380
381     print '\n*** Total flows added: %s' % fct.get_total_flows()
382     print '    HTTP[OK] results:  %d\n' % fct.get_ok_flows()
383
384     if in_args.delay > 0:
385         print '*** Waiting for %d seconds before the delete cycle ***\n' % in_args.delay
386         time.sleep(in_args.delay)
387
388     # Run through <cycles>, where <threads> are started in each cycle and <flows> previously added in an add cycle are
389     # deleted in each thread
390     if in_args.delete:
391         fct.delete_blaster()