52b53eb1e6b93f34a315e385608f8c8b05fd9561
[integration/test.git] / test / tools / odl-mdsal-clustering-tests / clustering-performance-test / flow_config_blaster.py
1 #!/usr/bin/python
2 __author__ = "Jan Medved"
3 __copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
4 __license__ = "New-style BSD"
5 __email__ = "jmedved@cisco.com"
6
7 from random import randrange
8 import json
9 import argparse
10 import time
11 import threading
12 import re
13
14 import requests
15 import netaddr
16
17
class Counter(object):
    """
    A number that several threads can add to. All additions are serialized through a lock owned by the counter.
    """
    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self, value=1):
        """
        Adds 'value' to the counter while holding the counter's lock
        :param value: The amount to add (default 1)
        :return: The value the counter had just before the addition
        """
        with self.lock:
            previous = self.value
            self.value += value
        return previous
31
32
class Timer(object):
    """
    Context manager that measures the time spent inside its 'with' block using time.time(). After the block has
    been left, 'start' and 'end' hold the two timestamps, 'secs' the elapsed time in seconds and 'msecs' the same
    interval in milliseconds.
    """
    def __init__(self, verbose=False):
        # When set, the elapsed time is printed as soon as the 'with' block is left
        self.verbose = verbose

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *exc_info):
        # Nothing is returned, so an exception raised inside the 'with' block keeps propagating
        self.end = time.time()
        elapsed = self.end - self.start
        self.secs = elapsed
        self.msecs = elapsed * 1000  # millisecs
        if not self.verbose:
            return
        print ("elapsed time: %f ms" % self.msecs)
47
48
class FlowConfigBlaster(object):
    """
    Flow programming performance tester: adds flows to, and deletes them from, the ODL config data store over
    RESTCONF from a number of worker threads, and prints the request rates that were achieved.

    Work is done in cycles. In each cycle <nthreads> worker threads are started, each worker adds (or deletes)
    <nflows> flows, and the cycle ends when all of its workers have finished.
    """
    putheaders = {'content-type': 'application/json'}
    getheaders = {'Accept': 'application/json'}

    FLWURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0/flow/%d"
    INVURL = 'restconf/operational/opendaylight-inventory:nodes'

    # Credentials sent with every request when authentication is enabled
    CREDENTIALS = ('admin', 'admin')

    # Number of flow adds answered with HTTP 200; every instance keeps its own count (see __init__)
    ok_total = 0

    def __init__(self, host, port, ncycles, nthreads, nnodes, nflows, startflow, auth, json_template):
        """
        :param host: Host name or address used to build the request URLs
        :param port: RESTCONF port used to build the request URLs (string or int)
        :param ncycles: Number of add/delete cycles
        :param nthreads: Number of worker threads started in each cycle
        :param nnodes: Number of nodes to assume when no openflow node is found in the inventory
        :param nflows: Number of flows handled by each worker thread in each cycle
        :param startflow: The starting flow id
        :param auth: If true, CREDENTIALS are sent with every request
        :param json_template: Flow template with the conversion specifiers that add_flow() fills in
        """
        self.host = host
        # Only ever used to build URLs, so an int port is accepted as well as a string
        self.port = str(port)
        self.ncycles = ncycles
        self.nthreads = nthreads
        self.nnodes = nnodes
        self.nflows = nflows
        self.startflow = startflow
        self.auth = auth

        self.json_template = json_template
        self.url_template = self._base_url() + self.FLWURL

        self.ok_rate = Counter(0.0)
        self.total_rate = Counter(0.0)

        self.ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')) + startflow)

        self.print_lock = threading.Lock()
        self.cond = threading.Condition()
        self.threads_done = 0
        self.ok_total = 0

        # Per worker thread: flow id -> node the flow was programmed on. This must be an instance attribute; a
        # dictionary stored on the class would be shared (and overwritten) by every FlowConfigBlaster instance.
        self.flows = {tid: {} for tid in range(self.nthreads)}

    def _base_url(self):
        """
        Returns the 'http://<host>:<port>/' prefix shared by all request URLs
        """
        return 'http://' + self.host + ":" + str(self.port) + '/'

    def _auth_kwargs(self):
        """
        Returns the extra keyword arguments for a request: the credentials if authentication is enabled,
        nothing otherwise
        """
        if self.auth:
            return {'auth': self.CREDENTIALS}
        return {}

    def _flow_id(self, start_flow, tid, flow):
        """
        Returns the id of flow number 'flow' of worker 'tid'. Every worker owns a block of ncycles * nflows
        consecutive ids (shifted by the starting flow id); 'start_flow' is the offset inside that block, which
        run_cycle() sets to <cycle> * nflows.
        """
        return tid * (self.ncycles * self.nflows) + flow + start_flow + self.startflow

    @staticmethod
    def _rate(count, secs):
        """
        Returns count / secs, or 0.0 if no measurable time has elapsed (e.g. when there was nothing to do)
        """
        if secs > 0:
            return count / secs
        return 0.0

    def _report(self, tid, label, secs, results):
        """
        Adds the rates achieved by one worker to the per-cycle totals and prints the worker's statistics

        :param tid: Index of the worker thread
        :param label: 'Add' or 'Delete'; prefixed to the printed statistics
        :param secs: Time the worker spent sending its requests
        :param results: Dictionary HTTP status code -> number of requests answered with that code
        """
        ok_rate = self._rate(results[200], secs)
        total_rate = self._rate(sum(results.values()), secs)

        self.ok_rate.increment(ok_rate)
        self.total_rate.increment(total_rate)

        with self.print_lock:
            print('    Thread %d: ' % tid)
            print('        %s time: %.2f,' % (label, secs))
            print('        %s success rate:  %.2f, %s total rate: %.2f' % (label, ok_rate, label, total_rate))
            print('        %s Results:  %s' % (label, results))

    def _worker_done(self):
        """
        Marks one worker thread as finished. The counter is updated and the notification is sent while holding
        the condition's lock, so a thread that checks 'threads_done' under that lock cannot miss the wakeup.
        """
        with self.cond:
            self.threads_done += 1
            self.cond.notify_all()

    def get_num_nodes(self, session):
        """
        Determines the number of OF nodes from the operational inventory: the inventory entries whose id contains
        'openflow' are counted. If the inventory cannot be read or contains no such entry, the number of nodes
        passed to the constructor is returned instead.

        :param session: 'requests' session used to send the request
        :return: The number of nodes that flows will be spread over
        """
        inventory_url = self._base_url() + self.INVURL
        r = session.get(inventory_url, headers=self.getheaders, stream=False, **self._auth_kwargs())

        if r.status_code != 200:
            return self.nnodes

        try:
            inventory = json.loads(r.content)['nodes']['node']
            found = sum(1 for node in inventory if 'openflow' in node['id'])
        except (KeyError, TypeError, ValueError):
            # The reply is not JSON or does not look like an inventory; falling back to the configured number
            # of nodes is deliberate, the test is still meaningful without the exact node count
            return self.nnodes

        return found if found else self.nnodes

    def add_flow(self, session, node, flow_id, ipaddr):
        """
        Adds a single flow to the config data store via REST

        :param session: 'requests' session used to send the request
        :param node: Number of the openflow node to program the flow on
        :param flow_id: Id of the flow; also used as the flow's cookie and in the flow's name
        :param ipaddr: Destination IPv4 address to match on, as an integer
        :return: The HTTP status code of the reply
        """
        flow_data = self.json_template % (flow_id, 'TestFlow-%d' % flow_id, 65000, str(flow_id), 65000,
                                          str(netaddr.IPAddress(ipaddr)))
        flow_url = self.url_template % (node, flow_id)

        r = session.put(flow_url, data=flow_data, headers=self.putheaders, stream=False, **self._auth_kwargs())
        return r.status_code

    def add_flows(self, start_flow, tid):
        """
        Adds flows into the ODL config space. This function is executed by a worker thread

        :param start_flow: Offset of the current cycle within the worker's block of flow ids
        :param tid: Index of the worker thread
        """
        add_res = {200: 0}

        s = requests.Session()
        try:
            n_nodes = self.get_num_nodes(s)

            with self.print_lock:
                print('    Thread %d:\n        Adding %d flows on %d nodes' % (tid, self.nflows, n_nodes))

            with Timer() as t:
                for flow in range(self.nflows):
                    node_id = randrange(1, n_nodes + 1)
                    flow_id = self._flow_id(start_flow, tid, flow)
                    # Remember where the flow went, delete_flows() needs the node to address the flow
                    self.flows[tid][flow_id] = node_id
                    sts = self.add_flow(s, node_id, flow_id, self.ip_addr.increment())
                    add_res[sts] = add_res.get(sts, 0) + 1

            self._report(tid, 'Add', t.secs, add_res)

            with self.print_lock:
                self.ok_total += add_res[200]
        finally:
            # Also done when a request fails, so that the session is not leaked and nobody waits forever
            s.close()
            self._worker_done()

    def delete_flow(self, session, node, flow_id):
        """
        Deletes a single flow from the ODL config data store via REST

        :param session: 'requests' session used to send the request
        :param node: Number of the openflow node the flow was programmed on
        :param flow_id: Id of the flow to delete
        :return: The HTTP status code of the reply
        """
        flow_url = self.url_template % (node, flow_id)

        r = session.delete(flow_url, headers=self.getheaders, **self._auth_kwargs())
        return r.status_code

    def delete_flows(self, start_flow, tid):
        """
        Deletes flow from the ODL config space that have been added using the 'add_flows()' function. This function is
        executed by a worker thread

        :param start_flow: Offset of the current cycle within the worker's block of flow ids
        :param tid: Index of the worker thread
        """
        del_res = {200: 0}

        s = requests.Session()
        try:
            n_nodes = self.get_num_nodes(s)

            with self.print_lock:
                print('Thread %d: Deleting %d flows on %d nodes' % (tid, self.nflows, n_nodes))

            with Timer() as t:
                for flow in range(self.nflows):
                    flow_id = self._flow_id(start_flow, tid, flow)
                    sts = self.delete_flow(s, self.flows[tid][flow_id], flow_id)
                    del_res[sts] = del_res.get(sts, 0) + 1

            self._report(tid, 'Delete', t.secs, del_res)
        finally:
            # Also done when a request fails, so that the session is not leaked and nobody waits forever
            s.close()
            self._worker_done()

    def run_cycle(self, function):
        """
        Runs an add or delete cycle. Starts a number of worker threads that each add a bunch of flows. Work is done
        in context of the worker threads.

        :param function: The worker function, called in each worker thread as function(<cycle> * nflows, <tid>)
        """

        for c in range(self.ncycles):
            with self.print_lock:
                print('\nCycle %d:' % c)

            workers = [threading.Thread(target=function, args=(c * self.nflows, i)) for i in range(self.nthreads)]

            # Measure the execution time of the whole cycle. The workers are joined rather than counted: join()
            # cannot miss a notification and it also returns when a worker has died with an exception.
            with Timer() as timer:
                for worker in workers:
                    worker.start()
                for worker in workers:
                    worker.join()

            with self.print_lock:
                total_rate = self.total_rate.value
                print('    Total success rate: %.2f, Total rate: %.2f' % (self.ok_rate.value, total_rate))
                measured_rate = self._rate(self.nthreads * self.nflows, timer.secs)
                # NOTE(review): the percentage is relative to the total rate although the text says
                # 'Total success rate'; the text is left alone because the output may be parsed by other tools
                percentage = measured_rate / total_rate * 100 if total_rate else 0.0
                print('    Measured rate:      %.2f (%.2f%% of Total success rate)' % (measured_rate, percentage))
                print('    Measured time:      %.2fs' % timer.secs)

            with self.cond:
                self.threads_done = 0

            self.ok_rate.value = 0
            self.total_rate.value = 0

    def add_blaster(self):
        """
        Runs all add cycles
        """
        self.run_cycle(self.add_flows)

    def delete_blaster(self):
        """
        Runs all delete cycles
        """
        self.run_cycle(self.delete_flows)

    def get_total_flows(self):
        """
        Returns the number of flows recorded by add_flows() so far, over all worker threads
        """
        return sum(len(per_thread) for per_thread in self.flows.values())

    def get_ok_flows(self):
        """
        Returns the number of flow adds that were answered with HTTP 200
        """
        return self.ok_total
276
277
def get_json_from_file(filename):
    """
    Reads a flow programming template from a file. The content is returned as it is found in the file; it is
    neither parsed nor validated.
    :param filename: Path of the file that holds the template
    :return: The whole content of the file (string)
    """
    with open(filename, 'r') as template_file:
        return template_file.read()
287
288
if __name__ == "__main__":
    # Built-in flow template, used unless a template file is named on the command line. Its conversion specifiers
    # are filled in by FlowConfigBlaster.add_flow() with, in this order: cookie, flow name, hard timeout, flow id,
    # idle timeout and destination IPv4 address.
    JSON_FLOW_MOD1 = '''{
        "flow-node-inventory:flow": [
            {
                "flow-node-inventory:cookie": %d,
                "flow-node-inventory:cookie_mask": 4294967295,
                "flow-node-inventory:flow-name": "%s",
                "flow-node-inventory:hard-timeout": %d,
                "flow-node-inventory:id": "%s",
                "flow-node-inventory:idle-timeout": %d,
                "flow-node-inventory:installHw": false,
                "flow-node-inventory:instructions": {
                    "flow-node-inventory:instruction": [
                        {
                            "flow-node-inventory:apply-actions": {
                                "flow-node-inventory:action": [
                                    {
                                        "flow-node-inventory:drop-action": {},
                                        "flow-node-inventory:order": 0
                                    }
                                ]
                            },
                            "flow-node-inventory:order": 0
                        }
                    ]
                },
                "flow-node-inventory:match": {
                    "flow-node-inventory:ipv4-destination": "%s/32",
                    "flow-node-inventory:ethernet-match": {
                        "flow-node-inventory:ethernet-type": {
                            "flow-node-inventory:type": 2048
                        }
                    }
                },
                "flow-node-inventory:priority": 2,
                "flow-node-inventory:strict": false,
                "flow-node-inventory:table_id": 0
            }
        ]
    }'''

    # Command line definition
    cli = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows '
                                              'into the config tree, as specified by optional parameters.')

    cli.add_argument('--host', default='127.0.0.1',
                     help='Host where odl controller is running (default is 127.0.0.1)')
    cli.add_argument('--port', default='8181',
                     help="Port on which odl's RESTCONF is listening (default is 8181)")
    cli.add_argument('--cycles', type=int, default=1,
                     help='Number of flow add/delete cycles; default 1. Both Flow Adds and Flow Deletes are '
                          'performed in cycles. <THREADS> worker threads are started in each cycle and the cycle '
                          'ends when all threads finish. Another cycle is started when the previous cycle finished.')
    cli.add_argument('--threads', type=int, default=1,
                     help='Number of request worker threads to start in each cycle; default=1. '
                          'Each thread will add/delete <FLOWS> flows.')
    cli.add_argument('--flows', type=int, default=10,
                     help='Number of flows that will be added/deleted by each worker thread in each cycle; '
                          'default 10')
    cli.add_argument('--nodes', type=int, default=16,
                     help='Number of nodes if mininet is not connected; default=16. If mininet is connected, '
                          'flows will be evenly distributed (programmed) into connected nodes.')
    cli.add_argument('--delay', type=int, default=0,
                     help='Time (in seconds) to wait between the add and delete cycles; default=0')
    # '--delete' and '--no-delete' write to the same destination; deleting is the default
    cli.add_argument('--delete', dest='delete', action='store_true', default=True,
                     help='Delete all added flows one by one, benchmark delete '
                          'performance.')
    cli.add_argument('--no-delete', dest='delete', action='store_false',
                     help='Do not perform the delete cycle.')
    cli.add_argument('--auth', dest='auth', action='store_true', default=False,
                     help="Use the ODL default username/password 'admin'/'admin' to authenticate access to REST; "
                          'default: no authentication')
    cli.add_argument('--startflow', type=int, default=0,
                     help='The starting Flow ID; default=0')
    cli.add_argument('--file', default='',
                     help='File from which to read the JSON flow template; default: no file, use a built in '
                          'template.')

    options = cli.parse_args()

    # A template file given on the command line takes precedence over the built-in template
    template = get_json_from_file(options.file) if options.file != '' else JSON_FLOW_MOD1

    blaster = FlowConfigBlaster(options.host, options.port, options.cycles, options.threads, options.nodes,
                                options.flows, options.startflow, options.auth, template)

    # Add phase: <cycles> cycles, <threads> worker threads per cycle, <flows> flows added by each worker
    blaster.add_blaster()

    print('\n*** Total flows added: %s' % blaster.get_total_flows())
    print('    HTTP[OK] results:  %d\n' % blaster.get_ok_flows())

    # Optional pause between the add phase and the delete phase
    if options.delay > 0:
        print('*** Waiting for %d seconds before the delete cycle ***\n' % options.delay)
        time.sleep(options.delay)

    # Delete phase: the same cycles again, this time every worker deletes the flows it added in the add phase
    if options.delete:
        blaster.delete_blaster()