Merge "Reworked FlowConfigBlaster to be able to batch multiple flows in a single...
[integration/test.git] / test / tools / odl-mdsal-clustering-tests / clustering-performance-test / flow_config_blaster.py
1 #!/usr/bin/python
2 __author__ = "Jan Medved"
3 __copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
4 __license__ = "New-style BSD"
5 __email__ = "jmedved@cisco.com"
6
7 from random import randrange
8 import json
9 import argparse
10 import time
11 import threading
12 import re
13 import copy
14
15 import requests
16 import netaddr
17
18
19 class Counter(object):
20     def __init__(self, start=0):
21         self.lock = threading.Lock()
22         self.value = start
23
24     def increment(self, value=1):
25         self.lock.acquire()
26         val = self.value
27         try:
28             self.value += value
29         finally:
30             self.lock.release()
31         return val
32
33
34 class Timer(object):
35     def __init__(self, verbose=False):
36         self.verbose = verbose
37
38     def __enter__(self):
39         self.start = time.time()
40         return self
41
42     def __exit__(self, *args):
43         self.end = time.time()
44         self.secs = self.end - self.start
45         self.msecs = self.secs * 1000  # millisecs
46         if self.verbose:
47             print ("elapsed time: %f ms" % self.msecs)
48
49
50 class FlowConfigBlaster(object):
51     putheaders = {'content-type': 'application/json'}
52     getheaders = {'Accept': 'application/json'}
53
54     FLWURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0/flow/%d"
55     TBLURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0"
56     INVURL = 'restconf/operational/opendaylight-inventory:nodes'
57
58     flows = {}
59
60     # The "built-in" flow template
61     flow_mode_template = {
62         u'flow': [
63             {
64                 u'hard-timeout': 65000,
65                 u'idle-timeout': 65000,
66                 u'cookie_mask': 4294967295,
67                 u'flow-name': u'FLOW-NAME-TEMPLATE',
68                 u'priority': 2,
69                 u'strict': False,
70                 u'cookie': 0,
71                 u'table_id': 0,
72                 u'installHw': False,
73                 u'id': u'FLOW-ID-TEMPLATE',
74                 u'match': {
75                     u'ipv4-destination': u'0.0.0.0/32',
76                     u'ethernet-match': {
77                         u'ethernet-type': {
78                             u'type': 2048
79                         }
80                     }
81                 },
82                 u'instructions': {
83                     u'instruction': [
84                         {
85                             u'order': 0,
86                             u'apply-actions': {
87                                 u'action': [
88                                     {
89                                         u'drop-action': {},
90                                         u'order': 0
91                                     }
92                                 ]
93                             }
94                         }
95                     ]
96                 }
97             }
98         ]
99     }
100
101     class FcbStats(object):
102         """
103         FlowConfigBlaster Statistics: a class that stores and further processes
104         statistics collected by Blaster worker threads during their execution.
105         """
106         def __init__(self):
107             self.ok_rqst_rate = Counter(0.0)
108             self.total_rqst_rate = Counter(0.0)
109             self.ok_flow_rate = Counter(0.0)
110             self.total_flow_rate = Counter(0.0)
111             self.ok_rqsts = Counter(0)
112             self.total_rqsts = Counter(0)
113             self.ok_flows = Counter(0)
114             self.total_flows = Counter(0)
115
116         def process_stats(self, rqst_stats, flow_stats, elapsed_time):
117             """
118             Calculates the stats for RESTCONF request and flow programming
119             throughput, and aggregates statistics across all Blaster threads.
120             """
121             ok_rqsts = rqst_stats[200] + rqst_stats[204]
122             total_rqsts = sum(rqst_stats.values())
123             ok_flows = flow_stats[200] + flow_stats[204]
124             total_flows = sum(flow_stats.values())
125
126             ok_rqst_rate = ok_rqsts / elapsed_time
127             total_rqst_rate = total_rqsts / elapsed_time
128             ok_flow_rate = ok_flows / elapsed_time
129             total_flow_rate = total_flows / elapsed_time
130
131             self.ok_rqsts.increment(ok_rqsts)
132             self.total_rqsts.increment(total_rqsts)
133             self.ok_flows.increment(ok_flows)
134             self.total_flows.increment(total_flows)
135
136             self.ok_rqst_rate.increment(ok_rqst_rate)
137             self.total_rqst_rate.increment(total_rqst_rate)
138             self.ok_flow_rate.increment(ok_flow_rate)
139             self.total_flow_rate.increment(total_flow_rate)
140
141             return ok_rqst_rate, total_rqst_rate, ok_flow_rate, total_flow_rate
142
143         def get_ok_rqst_rate(self):
144             return self.ok_rqst_rate.value
145
146         def get_total_rqst_rate(self):
147             return self.total_rqst_rate.value
148
149         def get_ok_flow_rate(self):
150             return self.ok_flow_rate.value
151
152         def get_total_flow_rate(self):
153             return self.total_flow_rate.value
154
155         def get_ok_rqsts(self):
156             return self.ok_rqsts.value
157
158         def get_total_rqsts(self):
159             return self.total_rqsts.value
160
161         def get_ok_flows(self):
162             return self.ok_flows.value
163
164         def get_total_flows(self):
165             return self.total_flows.value
166
167     def __init__(self, host, port, ncycles, nthreads, fpr, nnodes, nflows, startflow, auth, flow_mod_template=None):
168         self.host = host
169         self.port = port
170         self.ncycles = ncycles
171         self.nthreads = nthreads
172         self.fpr = fpr
173         self.nnodes = nnodes
174         self.nflows = nflows
175         self.startflow = startflow
176         self.auth = auth
177
178         if flow_mod_template:
179             self.flow_mode_template = flow_mod_template
180
181         self.post_url_template = 'http://' + self.host + ":" + self.port + '/' + self.TBLURL
182         self.del_url_template = 'http://' + self.host + ":" + self.port + '/' + self.FLWURL
183
184         self.stats = self.FcbStats()
185         self.total_ok_flows = 0
186         self.total_ok_rqsts = 0
187
188         self.ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')) + startflow)
189
190         self.print_lock = threading.Lock()
191         self.cond = threading.Condition()
192         self.threads_done = 0
193
194         for i in range(self.nthreads):
195             self.flows[i] = {}
196
197     def get_num_nodes(self, session):
198         """
199         Determines the number of OF nodes in the connected mininet network. If
200         mininet is not connected, the default number of flows is set to 16.
201         :param session: 'requests' session which to use to query the controller
202                         for openflow nodes
203         :return: None
204         """
205         inventory_url = 'http://' + self.host + ":" + self.port + '/' + self.INVURL
206         nodes = self.nnodes
207
208         if not self.auth:
209             r = session.get(inventory_url, headers=self.getheaders, stream=False)
210         else:
211             r = session.get(inventory_url, headers=self.getheaders, stream=False, auth=('admin', 'admin'))
212
213         if r.status_code == 200:
214             try:
215                 inv = json.loads(r.content)['nodes']['node']
216                 nn = 0
217                 for n in range(len(inv)):
218                     if re.search('openflow', inv[n]['id']) is not None:
219                         nn += 1
220                 if nn != 0:
221                     nodes = nn
222             except KeyError:
223                 pass
224
225         return nodes
226
227     def create_flow_from_template(self, flow_id, ipaddr):
228         """
229         Create a new flow instance from the flow template specified during
230         FlowConfigBlaster instantiation. Flow templates are json-compatible
231         dictionaries that MUST contain elements for flow cookie, flow name,
232         flow id and the destination IPv4 address in the flow match field.
233         :param flow_id: Id for the new flow to create
234         :param ipaddr: IP Address to put into the flow's match
235         :return: The newly created flow instance
236         """
237         flow = copy.deepcopy(self.flow_mode_template['flow'][0])
238         flow['cookie'] = flow_id
239         flow['flow-name'] = 'TestFlow-%d' % flow_id
240         flow['id'] = str(flow_id)
241         flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ipaddr))
242         return flow
243
244     def post_flows(self, session, node, flow_list):
245         """
246         Performs a RESTCONF post of flows passed in the 'flow_list' parameters
247         :param session: 'requests' session on which to perform the POST
248         :param node: The ID of the openflow node to which to post the flows
249         :param flow_list: List of flows (in dictionary form) to POST
250         :return: status code from the POST operation
251         """
252         fmod = dict(self.flow_mode_template)
253         fmod['flow'] = flow_list
254         flow_data = json.dumps(fmod)
255         # print flow_data
256         flow_url = self.post_url_template % node
257         # print flow_url
258
259         if not self.auth:
260             r = session.post(flow_url, data=flow_data, headers=self.putheaders, stream=False)
261         else:
262             r = session.post(flow_url, data=flow_data, headers=self.putheaders, stream=False, auth=('admin', 'admin'))
263
264         return r.status_code
265
266     def add_flows(self, start_flow_id, tid):
267         """
268         Adds flows into the ODL config data store. This function is executed by
269         a worker thread (the Blaster thread). The number of flows created and
270         the batch size (i.e. how many flows will be put into a RESTCONF request)
271         are determined by control parameters initialized when FlowConfigBlaster
272         is created.
273         :param start_flow_id - the ID of the first flow. Each Blaster thread
274                                programs a different set of flows
275         :param tid: Thread ID - used to id the Blaster thread when statistics
276                                 for the thread are printed out
277         :return: None
278         """
279         rqst_stats = {200: 0, 204: 0}
280         flow_stats = {200: 0, 204: 0}
281
282         s = requests.Session()
283
284         n_nodes = self.get_num_nodes(s)
285
286         with self.print_lock:
287             print '    Thread %d:\n        Adding %d flows on %d nodes' % (tid, self.nflows, n_nodes)
288
289         nflows = 0
290         with Timer() as t:
291             while nflows < self.nflows:
292                 node_id = randrange(1, n_nodes + 1)
293                 flow_list = []
294                 for i in range(self.fpr):
295                     flow_id = tid * (self.ncycles * self.nflows) + nflows + start_flow_id + self.startflow
296                     self.flows[tid][flow_id] = node_id
297                     flow_list.append(self.create_flow_from_template(flow_id, self.ip_addr.increment()))
298                     nflows += 1
299                     if nflows >= self.nflows:
300                         break
301                 sts = self.post_flows(s, node_id, flow_list)
302                 try:
303                     rqst_stats[sts] += 1
304                     flow_stats[sts] += len(flow_list)
305                 except KeyError:
306                     rqst_stats[sts] = 1
307                     flow_stats[sts] = len(flow_list)
308
309         ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(rqst_stats, flow_stats, t.secs)
310
311         with self.print_lock:
312             print '\n    Thread %d results (ADD): ' % tid
313             print '        Elapsed time: %.2fs,' % t.secs
314             print '        Requests/s: %.2f OK, %.2f Total' % (ok_rps, total_rps)
315             print '        Flows/s:    %.2f OK, %.2f Total' % (ok_fps, total_fps)
316             print '        Stats ({Requests}, {Flows}): ',
317             print rqst_stats,
318             print flow_stats
319             self.threads_done += 1
320
321         s.close()
322
323         with self.cond:
324             self.cond.notifyAll()
325
326     def delete_flow(self, session, node, flow_id):
327         """
328         Deletes a single flow from the ODL config data store using RESTCONF
329         :param session: 'requests' session on which to perform the POST
330         :param node: Id of the openflow node from which to delete the flow
331         :param flow_id: ID of the to-be-deleted flow
332         :return: status code from the DELETE operation
333         """
334         flow_url = self.del_url_template % (node, flow_id)
335
336         if not self.auth:
337             r = session.delete(flow_url, headers=self.getheaders)
338         else:
339             r = session.delete(flow_url, headers=self.getheaders, auth=('admin', 'admin'))
340
341         return r.status_code
342
343     def delete_flows(self, start_flow, tid):
344         """
345         Deletes flow from the ODL config space that have been added using the
346         'add_flows()' function. This function is executed by a worker thread
347         :param start_flow_id - the ID of the first flow. Each Blaster thread
348                                deletes a different set of flows
349         :param tid: Thread ID - used to id the Blaster thread when statistics
350                                 for the thread are printed out
351         :return:
352         """
353         """
354         """
355         rqst_stats = {200: 0, 204: 0}
356
357         s = requests.Session()
358         n_nodes = self.get_num_nodes(s)
359
360         with self.print_lock:
361             print 'Thread %d: Deleting %d flows on %d nodes' % (tid, self.nflows, n_nodes)
362
363         with Timer() as t:
364             for flow in range(self.nflows):
365                 flow_id = tid * (self.ncycles * self.nflows) + flow + start_flow + self.startflow
366                 sts = self.delete_flow(s, self.flows[tid][flow_id], flow_id)
367                 try:
368                     rqst_stats[sts] += 1
369                 except KeyError:
370                     rqst_stats[sts] = 1
371
372         ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(rqst_stats, rqst_stats, t.secs)
373
374         with self.print_lock:
375             print '\n    Thread %d results (DELETE): ' % tid
376             print '        Elapsed time: %.2fs,' % t.secs
377             print '        Requests/s:  %.2f OK,  %.2f Total' % (ok_rps, total_rps)
378             print '        Flows/s:     %.2f OK,  %.2f Total' % (ok_fps, total_fps)
379             print '        Stats ({Requests})',
380             print rqst_stats
381             self.threads_done += 1
382
383         s.close()
384
385         with self.cond:
386             self.cond.notifyAll()
387
388     def run_cycle(self, function):
389         """
390         Runs a flow-add or flow-delete test cycle. Each test consists of a
391         <cycles> test cycles, where <threads> worker (Blaster) threads are
392         started in each test cycle. Each Blaster thread programs <flows>
393         OpenFlow flows into the controller using the controller's RESTCONF API.
394         :param function: Add or delete, determines what test will be executed.
395         :return: None
396         """
397         self.total_ok_flows = 0
398         self.total_ok_rqsts = 0
399
400         for c in range(self.ncycles):
401             self.stats = self.FcbStats()
402             with self.print_lock:
403                 print '\nCycle %d:' % c
404
405             threads = []
406             for i in range(self.nthreads):
407                 t = threading.Thread(target=function, args=(c * self.nflows, i))
408                 threads.append(t)
409                 t.start()
410
411             # Wait for all threads to finish and measure the execution time
412             with Timer() as t:
413                 while self.threads_done < self.nthreads:
414                     with self.cond:
415                         self.cond.wait()
416
417             with self.print_lock:
418                 print '\n*** Test summary:'
419                 print '    Elapsed time:    %.2fs' % t.secs
420                 print '    Peak requests/s: %.2f OK, %.2f Total' % (
421                     self.stats.get_ok_rqst_rate(), self.stats.get_total_rqst_rate())
422                 print '    Peak flows/s:    %.2f OK, %.2f Total' % (
423                     self.stats.get_ok_flow_rate(), self.stats.get_total_flow_rate())
424                 print '    Avg. requests/s: %.2f OK, %.2f Total (%.2f%% of peak total)' % (
425                     self.stats.get_ok_rqsts() / t.secs,
426                     self.stats.get_total_rqsts() / t.secs,
427                     (self.stats.get_total_rqsts() / t.secs * 100) / self.stats.get_total_rqst_rate())
428                 print '    Avg. flows/s:    %.2f OK, %.2f Total (%.2f%% of peak total)' % (
429                     self.stats.get_ok_flows() / t.secs,
430                     self.stats.get_total_flows() / t.secs,
431                     (self.stats.get_total_flows() / t.secs * 100) / self.stats.get_total_flow_rate())
432
433                 self.total_ok_flows += self.stats.get_ok_flows()
434                 self.total_ok_rqsts += self.stats.get_ok_rqsts()
435                 self.threads_done = 0
436
437     def add_blaster(self):
438         self.run_cycle(self.add_flows)
439
440     def delete_blaster(self):
441         self.run_cycle(self.delete_flows)
442
443     def get_ok_flows(self):
444         return self.total_ok_flows
445
446     def get_ok_rqsts(self):
447         return self.total_ok_rqsts
448
449
450 def get_json_from_file(filename):
451     """
452     Get a flow programming template from a file
453     :param filename: File from which to get the template
454     :return: The json flow template (string)
455     """
456     with open(filename, 'r') as f:
457         try:
458             ft = json.load(f)
459             keys = ft['flow'][0].keys()
460             if (u'cookie' in keys) and (u'flow-name' in keys) and (u'id' in keys) and (u'match' in keys):
461                 if u'ipv4-destination' in ft[u'flow'][0]['match'].keys():
462                     print 'File "%s" ok to use as flow template' % filename
463                     return ft
464         except ValueError:
465             print 'JSON parsing of file %s failed' % filename
466             pass
467
468     return None
469
470 ###############################################################################
471 # This is an example of what the content of a JSON flow mode template should
472 # look like. Cut & paste to create a custom template. "id" and "ipv4-destination"
473 # MUST be unique if multiple flows will be programmed in the same test. It's
474 # also beneficial to have unique "cookie" and "flow-name" attributes for easier
475 # identification of the flow.
476 ###############################################################################
477 example_flow_mod_json = '''{
478     "flow": [
479         {
480             "id": "38",
481             "cookie": 38,
482             "instructions": {
483                 "instruction": [
484                     {
485                         "order": 0,
486                         "apply-actions": {
487                             "action": [
488                                 {
489                                     "order": 0,
490                                     "drop-action": { }
491                                 }
492                             ]
493                         }
494                     }
495                 ]
496             },
497             "hard-timeout": 65000,
498             "match": {
499                 "ethernet-match": {
500                     "ethernet-type": {
501                         "type": 2048
502                     }
503                 },
504                 "ipv4-destination": "10.0.0.38/32"
505             },
506             "flow-name": "TestFlow-8",
507             "strict": false,
508             "cookie_mask": 4294967295,
509             "priority": 2,
510             "table_id": 0,
511             "idle-timeout": 65000,
512             "installHw": false
513         }
514
515     ]
516 }'''
517
518 if __name__ == "__main__":
519     ############################################################################
520     # This program executes the base performance test. The test adds flows into
521     # the controller's config space. This function is basically the CLI frontend
522     # to the FlowConfigBlaster class and drives its main functions: adding and
523     # deleting flows from the controller's config data store
524     ############################################################################
525     parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows '
526                                                  'into the config tree, as specified by optional parameters.')
527
528     parser.add_argument('--host', default='127.0.0.1',
529                         help='Host where odl controller is running (default is 127.0.0.1)')
530     parser.add_argument('--port', default='8181',
531                         help='Port on which odl\'s RESTCONF is listening (default is 8181)')
532     parser.add_argument('--cycles', type=int, default=1,
533                         help='Number of flow add/delete cycles; default 1. Both Flow Adds and Flow Deletes are '
534                              'performed in cycles. <THREADS> worker threads are started in each cycle and the cycle '
535                              'ends when all threads finish. Another cycle is started when the previous cycle finished.')
536     parser.add_argument('--threads', type=int, default=1,
537                         help='Number of request worker threads to start in each cycle; default=1. '
538                              'Each thread will add/delete <FLOWS> flows.')
539     parser.add_argument('--flows', type=int, default=10,
540                         help='Number of flows that will be added/deleted by each worker thread in each cycle; '
541                              'default 10')
542     parser.add_argument('--fpr', type=int, default=1,
543                         help='Flows-per-Request - number of flows (batch size) sent in each HTTP request; '
544                              'default 1')
545     parser.add_argument('--nodes', type=int, default=16,
546                         help='Number of nodes if mininet is not connected; default=16. If mininet is connected, '
547                              'flows will be evenly distributed (programmed) into connected nodes.')
548     parser.add_argument('--delay', type=int, default=0,
549                         help='Time (in seconds) to wait between the add and delete cycles; default=0')
550     parser.add_argument('--delete', dest='delete', action='store_true', default=True,
551                         help='Delete all added flows one by one, benchmark delete '
552                              'performance.')
553     parser.add_argument('--no-delete', dest='delete', action='store_false',
554                         help='Do not perform the delete cycle.')
555     parser.add_argument('--auth', dest='auth', action='store_true', default=False,
556                         help="Use the ODL default username/password 'admin'/'admin' to authenticate access to REST; "
557                              'default: no authentication')
558     parser.add_argument('--startflow', type=int, default=0,
559                         help='The starting Flow ID; default=0')
560     parser.add_argument('--file', default='',
561                         help='File from which to read the JSON flow template; default: no file, use a built in '
562                              'template.')
563
564     in_args = parser.parse_args()
565
566     if in_args.file != '':
567         flow_template = get_json_from_file(in_args.file)
568     else:
569         flow_template = None
570
571     fct = FlowConfigBlaster(in_args.host, in_args.port, in_args.cycles, in_args.threads, in_args.fpr, in_args.nodes,
572                             in_args.flows, in_args.startflow, in_args.auth)
573
574     # Run through <cycles>, where <threads> are started in each cycle and
575     # <flows> are added from each thread
576     fct.add_blaster()
577
578     print '\n*** Total flows added: %s' % fct.get_ok_flows()
579     print '    HTTP[OK] results:  %d\n' % fct.get_ok_rqsts()
580
581     if in_args.delay > 0:
582         print '*** Waiting for %d seconds before the delete cycle ***\n' % in_args.delay
583         time.sleep(in_args.delay)
584
585     # Run through <cycles>, where <threads> are started in each cycle and
586     # <flows> previously added in an add cycle are deleted in each thread
587     if in_args.delete:
588         fct.delete_blaster()
589         print '\n*** Total flows deleted: %s' % fct.get_ok_flows()
590         print '    HTTP[OK] results:    %d\n' % fct.get_ok_rqsts()