a07748150cbe768f743b9e537069725ef9f71f87
[integration/test.git] / tools / odl-mdsal-clustering-tests / clustering-performance-test / flow_config_blaster.py
1 #!/usr/bin/python
2
3 from random import randrange
4 import json
5 import argparse
6 import time
7 import threading
8 import re
9 import copy
10
11 import requests
12 import netaddr
13
14
15 __author__ = "Jan Medved"
16 __copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
17 __license__ = "New-style BSD"
18 __email__ = "jmedved@cisco.com"
19
20
21 class Counter(object):
22     def __init__(self, start=0):
23         self.lock = threading.Lock()
24         self.value = start
25
26     def increment(self, value=1):
27         self.lock.acquire()
28         val = self.value
29         try:
30             self.value += value
31         finally:
32             self.lock.release()
33         return val
34
35
36 class Timer(object):
37     def __init__(self, verbose=False):
38         self.verbose = verbose
39
40     def __enter__(self):
41         self.start = time.time()
42         return self
43
44     def __exit__(self, *args):
45         self.end = time.time()
46         self.secs = self.end - self.start
47         self.msecs = self.secs * 1000  # millisecs
48         if self.verbose:
49             print("elapsed time: %f ms" % self.msecs)
50
51
52 class FlowConfigBlaster(object):
53     putheaders = {'content-type': 'application/json'}
54     getheaders = {'Accept': 'application/json'}
55
56     FLWURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0/flow/%d"
57     TBLURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0"
58     INVURL = 'restconf/operational/opendaylight-inventory:nodes'
59     TIMEOUT = 10
60
61     flows = {}
62
63     # The "built-in" flow template
64     flow_mode_template = {
65         u'flow': [
66             {
67                 u'hard-timeout': 65000,
68                 u'idle-timeout': 65000,
69                 u'cookie_mask': 4294967295,
70                 u'flow-name': u'FLOW-NAME-TEMPLATE',
71                 u'priority': 2,
72                 u'strict': False,
73                 u'cookie': 0,
74                 u'table_id': 0,
75                 u'installHw': False,
76                 u'id': u'FLOW-ID-TEMPLATE',
77                 u'match': {
78                     u'ipv4-destination': u'0.0.0.0/32',
79                     u'ethernet-match': {
80                         u'ethernet-type': {
81                             u'type': 2048
82                         }
83                     }
84                 },
85                 u'instructions': {
86                     u'instruction': [
87                         {
88                             u'order': 0,
89                             u'apply-actions': {
90                                 u'action': [
91                                     {
92                                         u'drop-action': {},
93                                         u'order': 0
94                                     }
95                                 ]
96                             }
97                         }
98                     ]
99                 }
100             }
101         ]
102     }
103
104     class FcbStats(object):
105         """
106         FlowConfigBlaster Statistics: a class that stores and further processes
107         statistics collected by Blaster worker threads during their execution.
108         """
109
110         def __init__(self):
111             self.ok_rqst_rate = Counter(0.0)
112             self.total_rqst_rate = Counter(0.0)
113             self.ok_flow_rate = Counter(0.0)
114             self.total_flow_rate = Counter(0.0)
115             self.ok_rqsts = Counter(0)
116             self.total_rqsts = Counter(0)
117             self.ok_flows = Counter(0)
118             self.total_flows = Counter(0)
119
120         def process_stats(self, rqst_stats, flow_stats, elapsed_time):
121             """
122             Calculates the stats for RESTCONF request and flow programming
123             throughput, and aggregates statistics across all Blaster threads.
124
125             Args:
126                 rqst_stats: Request statistics dictionary
127                 flow_stats: Flow statistcis dictionary
128                 elapsed_time: Elapsed time for the test
129
130             Returns: Rates (requests/sec) for successfully finished requests,
131                      the total number of requests, sucessfully installed flow and
132                      the total number of flows
133             """
134             ok_rqsts = rqst_stats[200] + rqst_stats[204]
135             total_rqsts = sum(rqst_stats.values())
136             ok_flows = flow_stats[200] + flow_stats[204]
137             total_flows = sum(flow_stats.values())
138
139             ok_rqst_rate = ok_rqsts / elapsed_time
140             total_rqst_rate = total_rqsts / elapsed_time
141             ok_flow_rate = ok_flows / elapsed_time
142             total_flow_rate = total_flows / elapsed_time
143
144             self.ok_rqsts.increment(ok_rqsts)
145             self.total_rqsts.increment(total_rqsts)
146             self.ok_flows.increment(ok_flows)
147             self.total_flows.increment(total_flows)
148
149             self.ok_rqst_rate.increment(ok_rqst_rate)
150             self.total_rqst_rate.increment(total_rqst_rate)
151             self.ok_flow_rate.increment(ok_flow_rate)
152             self.total_flow_rate.increment(total_flow_rate)
153
154             return ok_rqst_rate, total_rqst_rate, ok_flow_rate, total_flow_rate
155
156         def get_ok_rqst_rate(self):
157             return self.ok_rqst_rate.value
158
159         def get_total_rqst_rate(self):
160             return self.total_rqst_rate.value
161
162         def get_ok_flow_rate(self):
163             return self.ok_flow_rate.value
164
165         def get_total_flow_rate(self):
166             return self.total_flow_rate.value
167
168         def get_ok_rqsts(self):
169             return self.ok_rqsts.value
170
171         def get_total_rqsts(self):
172             return self.total_rqsts.value
173
174         def get_ok_flows(self):
175             return self.ok_flows.value
176
177         def get_total_flows(self):
178             return self.total_flows.value
179
180     def __init__(self, host, port, ncycles, nthreads, fpr, nnodes, nflows, startflow, auth, flow_mod_template=None):
181         self.host = host
182         self.port = port
183         self.ncycles = ncycles
184         self.nthreads = nthreads
185         self.fpr = fpr
186         self.nnodes = nnodes
187         self.nflows = nflows
188         self.startflow = startflow
189         self.auth = auth
190
191         if flow_mod_template:
192             self.flow_mode_template = flow_mod_template
193
194         self.post_url_template = 'http://%s:' + self.port + '/' + self.TBLURL
195         self.del_url_template = 'http://%s:' + self.port + '/' + self.FLWURL
196
197         self.stats = self.FcbStats()
198         self.total_ok_flows = 0
199         self.total_ok_rqsts = 0
200
201         self.ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')) + startflow)
202
203         self.print_lock = threading.Lock()
204         self.cond = threading.Condition()
205         self.threads_done = 0
206
207         for i in range(self.nthreads):
208             self.flows[i] = {}
209
210     def get_num_nodes(self, session):
211         """
212         Determines the number of OF nodes in the connected mininet network. If
213         mininet is not connected, the default number of flows is set to 16.
214         :param session: 'requests' session which to use to query the controller
215                         for openflow nodes
216         :return: None
217         """
218         hosts = self.host.split(",")
219         host = hosts[0]
220         inventory_url = 'http://' + host + ":" + self.port + '/' + self.INVURL
221         nodes = self.nnodes
222
223         if not self.auth:
224             r = session.get(inventory_url, headers=self.getheaders, stream=False, timeout=self.TIMEOUT)
225         else:
226             r = session.get(inventory_url, headers=self.getheaders, stream=False, auth=('admin', 'admin'),
227                             timeout=self.TIMEOUT)
228
229         if r.status_code == 200:
230             try:
231                 inv = json.loads(r.content)['nodes']['node']
232                 nn = 0
233                 for n in range(len(inv)):
234                     if re.search('openflow', inv[n]['id']) is not None:
235                         nn += 1
236                 if nn != 0:
237                     nodes = nn
238             except KeyError:
239                 pass
240
241         return nodes
242
243     def create_flow_from_template(self, flow_id, ipaddr, node_id):
244         """
245         Create a new flow instance from the flow template specified during
246         FlowConfigBlaster instantiation. Flow templates are json-compatible
247         dictionaries that MUST contain elements for flow cookie, flow name,
248         flow id and the destination IPv4 address in the flow match field.
249
250         Args:
251             flow_id: Id for the new flow to create
252             ipaddr: IP Address to put into the flow's match
253             node_id: ID of the node where to create the flow
254
255         Returns: The flow that gas been created from the template
256
257         """
258         flow = copy.deepcopy(self.flow_mode_template['flow'][0])
259         flow['cookie'] = flow_id
260         flow['flow-name'] = self.create_flow_name(flow_id)
261         flow['id'] = str(flow_id)
262         flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ipaddr))
263         return flow
264
265     def post_flows(self, session, node, flow_list, flow_count):
266         """
267         Performs a RESTCONF post of flows passed in the 'flow_list' parameters
268         :param session: 'requests' session on which to perform the POST
269         :param node: The ID of the openflow node to which to post the flows
270         :param flow_list: List of flows (in dictionary form) to POST
271         :param flow_count: Flow counter for round-robin host load balancing
272
273         :return: status code from the POST operation
274         """
275         flow_data = self.convert_to_json(flow_list, node)
276
277         hosts = self.host.split(",")
278         host = hosts[flow_count % len(hosts)]
279         flow_url = self.assemble_post_url(host, node)
280
281         if not self.auth:
282             r = session.post(flow_url, data=flow_data, headers=self.putheaders, stream=False, timeout=self.TIMEOUT)
283         else:
284             r = session.post(flow_url, data=flow_data, headers=self.putheaders, stream=False, auth=('admin', 'admin'),
285                              timeout=self.TIMEOUT)
286
287         return r.status_code
288
289     def assemble_post_url(self, host, node):
290         """
291         Creates url pointing to config dataStore: /nodes/node/<node-id>/table/<table-id>
292         :param host: ip address or host name pointing to controller
293         :param node: id of node (without protocol prefix and colon)
294         :return: url suitable for sending a flow to controller via POST method
295         """
296         return self.post_url_template % (host, node)
297
298     def convert_to_json(self, flow_list, node_id=None):
299         """
300         Dumps flows to json form.
301         :param flow_list: list of flows in json friendly structure
302         :param node_id: node identifier of corresponding node
303         :return: string containing plain json
304         """
305         fmod = dict(self.flow_mode_template)
306         fmod['flow'] = flow_list
307         flow_data = json.dumps(fmod)
308         return flow_data
309
310     def add_flows(self, start_flow_id, tid):
311         """
312         Adds flows into the ODL config data store. This function is executed by
313         a worker thread (the Blaster thread). The number of flows created and
314         the batch size (i.e. how many flows will be put into a RESTCONF request)
315         are determined by control parameters initialized when FlowConfigBlaster
316         is created.
317         :param start_flow_id - the ID of the first flow. Each Blaster thread
318                                programs a different set of flows
319         :param tid: Thread ID - used to id the Blaster thread when statistics
320                                 for the thread are printed out
321         :return: None
322         """
323         rqst_stats = {200: 0, 204: 0}
324         flow_stats = {200: 0, 204: 0}
325
326         s = requests.Session()
327
328         n_nodes = self.get_num_nodes(s)
329
330         with self.print_lock:
331             print('    Thread %d:\n        Adding %d flows on %d nodes' % (tid, self.nflows, n_nodes))
332
333         nflows = 0
334         nb_actions = []
335         while nflows < self.nflows:
336             node_id = randrange(1, n_nodes + 1)
337             flow_list = []
338             for i in range(self.fpr):
339                 flow_id = tid * (self.ncycles * self.nflows) + nflows + start_flow_id + self.startflow
340                 self.flows[tid][flow_id] = node_id
341                 flow_list.append(self.create_flow_from_template(flow_id, self.ip_addr.increment(), node_id))
342                 nflows += 1
343                 if nflows >= self.nflows:
344                     break
345             nb_actions.append((s, node_id, flow_list, nflows))
346
347         with Timer() as t:
348             for nb_action in nb_actions:
349                 sts = self.post_flows(*nb_action)
350                 try:
351                     rqst_stats[sts] += 1
352                     flow_stats[sts] += len(nb_action[2])
353                 except KeyError:
354                     rqst_stats[sts] = 1
355                     flow_stats[sts] = len(nb_action[2])
356
357         ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(rqst_stats, flow_stats, t.secs)
358
359         with self.print_lock:
360             print('\n    Thread %d results (ADD): ' % tid)
361             print('        Elapsed time: %.2fs,' % t.secs)
362             print('        Requests/s: %.2f OK, %.2f Total' % (ok_rps, total_rps))
363             print('        Flows/s:    %.2f OK, %.2f Total' % (ok_fps, total_fps))
364             print('        Stats ({Requests}, {Flows}): ')
365             print(rqst_stats,)
366             print(flow_stats)
367             self.threads_done += 1
368
369         s.close()
370
371         with self.cond:
372             self.cond.notifyAll()
373
374     def delete_flow(self, session, node, flow_id, flow_count):
375         """
376         Deletes a single flow from the ODL config data store using RESTCONF
377         Args:
378             session: 'requests' session on which to perform the POST
379             node:  Id of the openflow node from which to delete the flow
380             flow_id: ID of the to-be-deleted flow
381             flow_count: Index of the flow being processed (for round-robin LB)
382
383         Returns: status code from the DELETE operation
384
385         """
386
387         hosts = self.host.split(",")
388         host = hosts[flow_count % len(hosts)]
389         flow_url = self.del_url_template % (host, node, flow_id)
390
391         if not self.auth:
392             r = session.delete(flow_url, headers=self.getheaders, timeout=self.TIMEOUT)
393         else:
394             r = session.delete(flow_url, headers=self.getheaders, auth=('admin', 'admin'), timeout=self.TIMEOUT)
395
396         return r.status_code
397
398     def delete_flows(self, start_flow, tid):
399         """
400         Deletes flow from the ODL config space that have been added using the
401         'add_flows()' function. This function is executed by a worker thread
402         :param start_flow - the ID of the first flow. Each Blaster thread
403                                deletes a different set of flows
404         :param tid: Thread ID - used to id the Blaster thread when statistics
405                                 for the thread are printed out
406         :return:
407         """
408
409         rqst_stats = {200: 0, 204: 0}
410
411         s = requests.Session()
412         n_nodes = self.get_num_nodes(s)
413
414         with self.print_lock:
415             print('Thread %d: Deleting %d flows on %d nodes' % (tid, self.nflows, n_nodes))
416
417         with Timer() as t:
418             for flow in range(self.nflows):
419                 flow_id = tid * (self.ncycles * self.nflows) + flow + start_flow + self.startflow
420                 sts = self.delete_flow(s, self.flows[tid][flow_id], flow_id, flow)
421                 try:
422                     rqst_stats[sts] += 1
423                 except KeyError:
424                     rqst_stats[sts] = 1
425
426         ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(rqst_stats, rqst_stats, t.secs)
427
428         with self.print_lock:
429             print('\n    Thread %d results (DELETE): ' % tid)
430             print('        Elapsed time: %.2fs,' % t.secs)
431             print('        Requests/s:  %.2f OK,  %.2f Total' % (ok_rps, total_rps))
432             print('        Flows/s:     %.2f OK,  %.2f Total' % (ok_fps, total_fps))
433             print('        Stats ({Requests})',)
434             print(rqst_stats)
435             self.threads_done += 1
436
437         s.close()
438
439         with self.cond:
440             self.cond.notifyAll()
441
442     def run_cycle(self, function):
443         """
444         Runs a flow-add or flow-delete test cycle. Each test consists of a
445         <cycles> test cycles, where <threads> worker (Blaster) threads are
446         started in each test cycle. Each Blaster thread programs <flows>
447         OpenFlow flows into the controller using the controller's RESTCONF API.
448         :param function: Add or delete, determines what test will be executed.
449         :return: None
450         """
451         self.total_ok_flows = 0
452         self.total_ok_rqsts = 0
453
454         for c in range(self.ncycles):
455             self.stats = self.FcbStats()
456             with self.print_lock:
457                 print('\nCycle %d:' % c)
458
459             threads = []
460             for i in range(self.nthreads):
461                 t = threading.Thread(target=function, args=(c * self.nflows, i))
462                 threads.append(t)
463                 t.start()
464
465             # Wait for all threads to finish and measure the execution time
466             with Timer() as t:
467                 for thread in threads:
468                     thread.join()
469
470             with self.print_lock:
471                 print('\n*** Test summary:')
472                 print('    Elapsed time:    %.2fs' % t.secs)
473                 print('    Peak requests/s: %.2f OK, %.2f Total' % (
474                     self.stats.get_ok_rqst_rate(), self.stats.get_total_rqst_rate()))
475                 print('    Peak flows/s:    %.2f OK, %.2f Total' % (
476                     self.stats.get_ok_flow_rate(), self.stats.get_total_flow_rate()))
477                 print('    Avg. requests/s: %.2f OK, %.2f Total (%.2f%% of peak total)' % (
478                     self.stats.get_ok_rqsts() / t.secs,
479                     self.stats.get_total_rqsts() / t.secs,
480                     (self.stats.get_total_rqsts() / t.secs * 100) / self.stats.get_total_rqst_rate()))
481                 print('    Avg. flows/s:    %.2f OK, %.2f Total (%.2f%% of peak total)' % (
482                       self.stats.get_ok_flows() / t.secs,
483                       self.stats.get_total_flows() / t.secs,
484                       (self.stats.get_total_flows() / t.secs * 100) / self.stats.get_total_flow_rate()))
485
486                 self.total_ok_flows += self.stats.get_ok_flows()
487                 self.total_ok_rqsts += self.stats.get_ok_rqsts()
488                 self.threads_done = 0
489
490     def add_blaster(self):
491         self.run_cycle(self.add_flows)
492
493     def delete_blaster(self):
494         self.run_cycle(self.delete_flows)
495
496     def get_ok_flows(self):
497         return self.total_ok_flows
498
499     def get_ok_rqsts(self):
500         return self.total_ok_rqsts
501
502     def create_flow_name(self, flow_id):
503         return 'TestFlow-%d' % flow_id
504
505
506 def get_json_from_file(filename):
507     """
508     Get a flow programming template from a file
509     :param filename: File from which to get the template
510     :return: The json flow template (string)
511     """
512     with open(filename, 'r') as f:
513         try:
514             ft = json.load(f)
515             keys = ft['flow'][0].keys()
516             if (u'cookie' in keys) and (u'flow-name' in keys) and (u'id' in keys) and (u'match' in keys):
517                 if u'ipv4-destination' in ft[u'flow'][0]['match'].keys():
518                     print('File "%s" ok to use as flow template' % filename)
519                     return ft
520         except ValueError:
521             print('JSON parsing of file %s failed' % filename)
522             pass
523
524     return None
525
526
527 ###############################################################################
528 # This is an example of what the content of a JSON flow mode template should
529 # look like. Cut & paste to create a custom template. "id" and "ipv4-destination"
530 # MUST be unique if multiple flows will be programmed in the same test. It's
531 # also beneficial to have unique "cookie" and "flow-name" attributes for easier
532 # identification of the flow.
533 ###############################################################################
534 example_flow_mod_json = '''{
535     "flow": [
536         {
537             "id": "38",
538             "cookie": 38,
539             "instructions": {
540                 "instruction": [
541                     {
542                         "order": 0,
543                         "apply-actions": {
544                             "action": [
545                                 {
546                                     "order": 0,
547                                     "drop-action": { }
548                                 }
549                             ]
550                         }
551                     }
552                 ]
553             },
554             "hard-timeout": 65000,
555             "match": {
556                 "ethernet-match": {
557                     "ethernet-type": {
558                         "type": 2048
559                     }
560                 },
561                 "ipv4-destination": "10.0.0.38/32"
562             },
563             "flow-name": "TestFlow-8",
564             "strict": false,
565             "cookie_mask": 4294967295,
566             "priority": 2,
567             "table_id": 0,
568             "idle-timeout": 65000,
569             "installHw": false
570         }
571
572     ]
573 }'''
574
575
576 def create_arguments_parser():
577     """
578     Shorthand to arg parser on library level in order to access and eventually enhance in ancestors.
579     :return: argument parser supporting config blaster arguments and parameters
580     """
581     my_parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then'
582                                                     ' deletes flows into the config tree, as specified by'
583                                                     ' optional parameters.')
584
585     my_parser.add_argument('--host', default='127.0.0.1',
586                            help='Host where odl controller is running (default is 127.0.0.1).  '
587                                 'Specify a comma-separated list of hosts to perform round-robin load-balancing.')
588     my_parser.add_argument('--port', default='8181',
589                            help='Port on which odl\'s RESTCONF is listening (default is 8181)')
590     my_parser.add_argument('--cycles', type=int, default=1,
591                            help='Number of flow add/delete cycles; default 1. Both Flow Adds and Flow Deletes are '
592                                 'performed in cycles. <THREADS> worker threads are started in each cycle and the cycle '
593                                 'ends when all threads finish. Another cycle is started when the previous cycle '
594                                 'finished.')
595     my_parser.add_argument('--threads', type=int, default=1,
596                            help='Number of request worker threads to start in each cycle; default=1. '
597                                 'Each thread will add/delete <FLOWS> flows.')
598     my_parser.add_argument('--flows', type=int, default=10,
599                            help='Number of flows that will be added/deleted by each worker thread in each cycle; '
600                                 'default 10')
601     my_parser.add_argument('--fpr', type=int, default=1,
602                            help='Flows-per-Request - number of flows (batch size) sent in each HTTP request; '
603                                 'default 1')
604     my_parser.add_argument('--nodes', type=int, default=16,
605                            help='Number of nodes if mininet is not connected; default=16. If mininet is connected, '
606                                 'flows will be evenly distributed (programmed) into connected nodes.')
607     my_parser.add_argument('--delay', type=int, default=0,
608                            help='Time (in seconds) to wait between the add and delete cycles; default=0')
609     my_parser.add_argument('--delete', dest='delete', action='store_true', default=True,
610                            help='Delete all added flows one by one, benchmark delete '
611                                 'performance.')
612     my_parser.add_argument('--no-delete', dest='delete', action='store_false',
613                            help='Do not perform the delete cycle.')
614     my_parser.add_argument('--auth', dest='auth', action='store_true', default=False,
615                            help="Use the ODL default username/password 'admin'/'admin' to authenticate access to REST; "
616                                 'default: no authentication')
617     my_parser.add_argument('--startflow', type=int, default=0,
618                            help='The starting Flow ID; default=0')
619     my_parser.add_argument('--file', default='',
620                            help='File from which to read the JSON flow template; default: no file, use a built in '
621                                 'template.')
622     return my_parser
623
624
625 if __name__ == "__main__":
626     ############################################################################
627     # This program executes the base performance test. The test adds flows into
628     # the controller's config space. This function is basically the CLI frontend
629     # to the FlowConfigBlaster class and drives its main functions: adding and
630     # deleting flows from the controller's config data store
631     ############################################################################
632
633     parser = create_arguments_parser()
634     in_args = parser.parse_args()
635
636     if in_args.file != '':
637         flow_template = get_json_from_file(in_args.file)
638     else:
639         flow_template = None
640
641     fct = FlowConfigBlaster(in_args.host, in_args.port, in_args.cycles, in_args.threads, in_args.fpr, in_args.nodes,
642                             in_args.flows, in_args.startflow, in_args.auth)
643
644     # Run through <cycles>, where <threads> are started in each cycle and
645     # <flows> are added from each thread
646     fct.add_blaster()
647
648     print('\n*** Total flows added: %s' % fct.get_ok_flows())
649     print('    HTTP[OK] results:  %d\n' % fct.get_ok_rqsts())
650
651     if in_args.delay > 0:
652         print('*** Waiting for %d seconds before the delete cycle ***\n' % in_args.delay)
653         time.sleep(in_args.delay)
654
655     # Run through <cycles>, where <threads> are started in each cycle and
656     # <flows> previously added in an add cycle are deleted in each thread
657     if in_args.delete:
658         fct.delete_blaster()
659         print('\n*** Total flows deleted: %s' % fct.get_ok_flows())
660         print('    HTTP[OK] results:    %d\n' % fct.get_ok_rqsts())