Step 2: Move test folder to root
[integration/test.git] / tools / odl-mdsal-clustering-tests / clustering-performance-test / flow_config_blaster.py
1 #!/usr/bin/python
2 __author__ = "Jan Medved"
3 __copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
4 __license__ = "New-style BSD"
5 __email__ = "jmedved@cisco.com"
6
7 from random import randrange
8 import json
9 import argparse
10 import time
11 import threading
12 import re
13 import copy
14
15 import requests
16 import netaddr
17
18
19 class Counter(object):
20     def __init__(self, start=0):
21         self.lock = threading.Lock()
22         self.value = start
23
24     def increment(self, value=1):
25         self.lock.acquire()
26         val = self.value
27         try:
28             self.value += value
29         finally:
30             self.lock.release()
31         return val
32
33
34 class Timer(object):
35     def __init__(self, verbose=False):
36         self.verbose = verbose
37
38     def __enter__(self):
39         self.start = time.time()
40         return self
41
42     def __exit__(self, *args):
43         self.end = time.time()
44         self.secs = self.end - self.start
45         self.msecs = self.secs * 1000  # millisecs
46         if self.verbose:
47             print ("elapsed time: %f ms" % self.msecs)
48
49
50 class FlowConfigBlaster(object):
51     putheaders = {'content-type': 'application/json'}
52     getheaders = {'Accept': 'application/json'}
53
54     FLWURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0/flow/%d"
55     TBLURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0"
56     INVURL = 'restconf/operational/opendaylight-inventory:nodes'
57     TIMEOUT = 10
58
59     flows = {}
60
61     # The "built-in" flow template
62     flow_mode_template = {
63         u'flow': [
64             {
65                 u'hard-timeout': 65000,
66                 u'idle-timeout': 65000,
67                 u'cookie_mask': 4294967295,
68                 u'flow-name': u'FLOW-NAME-TEMPLATE',
69                 u'priority': 2,
70                 u'strict': False,
71                 u'cookie': 0,
72                 u'table_id': 0,
73                 u'installHw': False,
74                 u'id': u'FLOW-ID-TEMPLATE',
75                 u'match': {
76                     u'ipv4-destination': u'0.0.0.0/32',
77                     u'ethernet-match': {
78                         u'ethernet-type': {
79                             u'type': 2048
80                         }
81                     }
82                 },
83                 u'instructions': {
84                     u'instruction': [
85                         {
86                             u'order': 0,
87                             u'apply-actions': {
88                                 u'action': [
89                                     {
90                                         u'drop-action': {},
91                                         u'order': 0
92                                     }
93                                 ]
94                             }
95                         }
96                     ]
97                 }
98             }
99         ]
100     }
101
102     class FcbStats(object):
103         """
104         FlowConfigBlaster Statistics: a class that stores and further processes
105         statistics collected by Blaster worker threads during their execution.
106         """
107         def __init__(self):
108             self.ok_rqst_rate = Counter(0.0)
109             self.total_rqst_rate = Counter(0.0)
110             self.ok_flow_rate = Counter(0.0)
111             self.total_flow_rate = Counter(0.0)
112             self.ok_rqsts = Counter(0)
113             self.total_rqsts = Counter(0)
114             self.ok_flows = Counter(0)
115             self.total_flows = Counter(0)
116
117         def process_stats(self, rqst_stats, flow_stats, elapsed_time):
118             """
119             Calculates the stats for RESTCONF request and flow programming
120             throughput, and aggregates statistics across all Blaster threads.
121             """
122             ok_rqsts = rqst_stats[200] + rqst_stats[204]
123             total_rqsts = sum(rqst_stats.values())
124             ok_flows = flow_stats[200] + flow_stats[204]
125             total_flows = sum(flow_stats.values())
126
127             ok_rqst_rate = ok_rqsts / elapsed_time
128             total_rqst_rate = total_rqsts / elapsed_time
129             ok_flow_rate = ok_flows / elapsed_time
130             total_flow_rate = total_flows / elapsed_time
131
132             self.ok_rqsts.increment(ok_rqsts)
133             self.total_rqsts.increment(total_rqsts)
134             self.ok_flows.increment(ok_flows)
135             self.total_flows.increment(total_flows)
136
137             self.ok_rqst_rate.increment(ok_rqst_rate)
138             self.total_rqst_rate.increment(total_rqst_rate)
139             self.ok_flow_rate.increment(ok_flow_rate)
140             self.total_flow_rate.increment(total_flow_rate)
141
142             return ok_rqst_rate, total_rqst_rate, ok_flow_rate, total_flow_rate
143
144         def get_ok_rqst_rate(self):
145             return self.ok_rqst_rate.value
146
147         def get_total_rqst_rate(self):
148             return self.total_rqst_rate.value
149
150         def get_ok_flow_rate(self):
151             return self.ok_flow_rate.value
152
153         def get_total_flow_rate(self):
154             return self.total_flow_rate.value
155
156         def get_ok_rqsts(self):
157             return self.ok_rqsts.value
158
159         def get_total_rqsts(self):
160             return self.total_rqsts.value
161
162         def get_ok_flows(self):
163             return self.ok_flows.value
164
165         def get_total_flows(self):
166             return self.total_flows.value
167
168     def __init__(self, host, port, ncycles, nthreads, fpr, nnodes, nflows, startflow, auth, flow_mod_template=None):
169         self.host = host
170         self.port = port
171         self.ncycles = ncycles
172         self.nthreads = nthreads
173         self.fpr = fpr
174         self.nnodes = nnodes
175         self.nflows = nflows
176         self.startflow = startflow
177         self.auth = auth
178
179         if flow_mod_template:
180             self.flow_mode_template = flow_mod_template
181
182         self.post_url_template = 'http://%s:' + self.port + '/' + self.TBLURL
183         self.del_url_template = 'http://%s:' + self.port + '/' + self.FLWURL
184
185         self.stats = self.FcbStats()
186         self.total_ok_flows = 0
187         self.total_ok_rqsts = 0
188
189         self.ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')) + startflow)
190
191         self.print_lock = threading.Lock()
192         self.cond = threading.Condition()
193         self.threads_done = 0
194
195         for i in range(self.nthreads):
196             self.flows[i] = {}
197
198     def get_num_nodes(self, session):
199         """
200         Determines the number of OF nodes in the connected mininet network. If
201         mininet is not connected, the default number of flows is set to 16.
202         :param session: 'requests' session which to use to query the controller
203                         for openflow nodes
204         :return: None
205         """
206         hosts = self.host.split(",")
207         host = hosts[0]
208         inventory_url = 'http://' + host + ":" + self.port + '/' + self.INVURL
209         nodes = self.nnodes
210
211         if not self.auth:
212             r = session.get(inventory_url, headers=self.getheaders, stream=False, timeout=self.TIMEOUT)
213         else:
214             r = session.get(inventory_url, headers=self.getheaders, stream=False, auth=('admin', 'admin'),
215                             timeout=self.TIMEOUT)
216
217         if r.status_code == 200:
218             try:
219                 inv = json.loads(r.content)['nodes']['node']
220                 nn = 0
221                 for n in range(len(inv)):
222                     if re.search('openflow', inv[n]['id']) is not None:
223                         nn += 1
224                 if nn != 0:
225                     nodes = nn
226             except KeyError:
227                 pass
228
229         return nodes
230
231     def create_flow_from_template(self, flow_id, ipaddr, node_id):
232         """
233         Create a new flow instance from the flow template specified during
234         FlowConfigBlaster instantiation. Flow templates are json-compatible
235         dictionaries that MUST contain elements for flow cookie, flow name,
236         flow id and the destination IPv4 address in the flow match field.
237         :param flow_id: Id for the new flow to create
238         :param ipaddr: IP Address to put into the flow's match
239         :return: The newly created flow instance
240         """
241         flow = copy.deepcopy(self.flow_mode_template['flow'][0])
242         flow['cookie'] = flow_id
243         flow['flow-name'] = 'TestFlow-%d' % flow_id
244         flow['id'] = str(flow_id)
245         flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ipaddr))
246         return flow
247
248     def post_flows(self, session, node, flow_list, flow_count):
249         """
250         Performs a RESTCONF post of flows passed in the 'flow_list' parameters
251         :param session: 'requests' session on which to perform the POST
252         :param node: The ID of the openflow node to which to post the flows
253         :param flow_list: List of flows (in dictionary form) to POST
254         :return: status code from the POST operation
255         """
256         flow_data = self.convert_to_json(flow_list, node)
257
258         hosts = self.host.split(",")
259         host = hosts[flow_count % len(hosts)]
260         flow_url = self.assemble_post_url(host, node)
261         # print flow_url
262
263         if not self.auth:
264             r = session.post(flow_url, data=flow_data, headers=self.putheaders, stream=False, timeout=self.TIMEOUT)
265         else:
266             r = session.post(flow_url, data=flow_data, headers=self.putheaders, stream=False, auth=('admin', 'admin'),
267                              timeout=self.TIMEOUT)
268
269         return r.status_code
270
271     def assemble_post_url(self, host, node):
272         """
273         Creates url pointing to config dataStore: /nodes/node/<node-id>/table/<table-id>
274         :param host: ip address or host name pointing to controller
275         :param node: id of node (without protocol prefix and colon)
276         :return: url suitable for sending a flow to controller via POST method
277         """
278         return self.post_url_template % (host, node)
279
280     def convert_to_json(self, flow_list, node_id=None):
281         """
282         Dumps flows to json form.
283         :param flow_list: list of flows in json friendly structure
284         :param node_id: node identifier of corresponding node
285         :return: string containing plain json
286         """
287         fmod = dict(self.flow_mode_template)
288         fmod['flow'] = flow_list
289         flow_data = json.dumps(fmod)
290         # print flow_data
291         return flow_data
292
293     def add_flows(self, start_flow_id, tid):
294         """
295         Adds flows into the ODL config data store. This function is executed by
296         a worker thread (the Blaster thread). The number of flows created and
297         the batch size (i.e. how many flows will be put into a RESTCONF request)
298         are determined by control parameters initialized when FlowConfigBlaster
299         is created.
300         :param start_flow_id - the ID of the first flow. Each Blaster thread
301                                programs a different set of flows
302         :param tid: Thread ID - used to id the Blaster thread when statistics
303                                 for the thread are printed out
304         :return: None
305         """
306         rqst_stats = {200: 0, 204: 0}
307         flow_stats = {200: 0, 204: 0}
308
309         s = requests.Session()
310
311         n_nodes = self.get_num_nodes(s)
312
313         with self.print_lock:
314             print '    Thread %d:\n        Adding %d flows on %d nodes' % (tid, self.nflows, n_nodes)
315
316         nflows = 0
317         with Timer() as t:
318             while nflows < self.nflows:
319                 node_id = randrange(1, n_nodes + 1)
320                 flow_list = []
321                 for i in range(self.fpr):
322                     flow_id = tid * (self.ncycles * self.nflows) + nflows + start_flow_id + self.startflow
323                     self.flows[tid][flow_id] = node_id
324                     flow_list.append(self.create_flow_from_template(flow_id, self.ip_addr.increment(), node_id))
325                     nflows += 1
326                     if nflows >= self.nflows:
327                         break
328                 sts = self.post_flows(s, node_id, flow_list, nflows)
329                 try:
330                     rqst_stats[sts] += 1
331                     flow_stats[sts] += len(flow_list)
332                 except KeyError:
333                     rqst_stats[sts] = 1
334                     flow_stats[sts] = len(flow_list)
335
336         ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(rqst_stats, flow_stats, t.secs)
337
338         with self.print_lock:
339             print '\n    Thread %d results (ADD): ' % tid
340             print '        Elapsed time: %.2fs,' % t.secs
341             print '        Requests/s: %.2f OK, %.2f Total' % (ok_rps, total_rps)
342             print '        Flows/s:    %.2f OK, %.2f Total' % (ok_fps, total_fps)
343             print '        Stats ({Requests}, {Flows}): ',
344             print rqst_stats,
345             print flow_stats
346             self.threads_done += 1
347
348         s.close()
349
350         with self.cond:
351             self.cond.notifyAll()
352
353     def delete_flow(self, session, node, flow_id, flow_count):
354         """
355         Deletes a single flow from the ODL config data store using RESTCONF
356         :param session: 'requests' session on which to perform the POST
357         :param node: Id of the openflow node from which to delete the flow
358         :param flow_id: ID of the to-be-deleted flow
359         :return: status code from the DELETE operation
360         """
361
362         hosts = self.host.split(",")
363         host = hosts[flow_count % len(hosts)]
364         flow_url = self.del_url_template % (host, node, flow_id)
365         # print flow_url
366
367         if not self.auth:
368             r = session.delete(flow_url, headers=self.getheaders, timeout=self.TIMEOUT)
369         else:
370             r = session.delete(flow_url, headers=self.getheaders, auth=('admin', 'admin'), timeout=self.TIMEOUT)
371
372         return r.status_code
373
374     def delete_flows(self, start_flow, tid):
375         """
376         Deletes flow from the ODL config space that have been added using the
377         'add_flows()' function. This function is executed by a worker thread
378         :param start_flow - the ID of the first flow. Each Blaster thread
379                                deletes a different set of flows
380         :param tid: Thread ID - used to id the Blaster thread when statistics
381                                 for the thread are printed out
382         :return:
383         """
384
385         rqst_stats = {200: 0, 204: 0}
386
387         s = requests.Session()
388         n_nodes = self.get_num_nodes(s)
389
390         with self.print_lock:
391             print 'Thread %d: Deleting %d flows on %d nodes' % (tid, self.nflows, n_nodes)
392
393         with Timer() as t:
394             for flow in range(self.nflows):
395                 flow_id = tid * (self.ncycles * self.nflows) + flow + start_flow + self.startflow
396                 sts = self.delete_flow(s, self.flows[tid][flow_id], flow_id, flow)
397                 try:
398                     rqst_stats[sts] += 1
399                 except KeyError:
400                     rqst_stats[sts] = 1
401
402         ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(rqst_stats, rqst_stats, t.secs)
403
404         with self.print_lock:
405             print '\n    Thread %d results (DELETE): ' % tid
406             print '        Elapsed time: %.2fs,' % t.secs
407             print '        Requests/s:  %.2f OK,  %.2f Total' % (ok_rps, total_rps)
408             print '        Flows/s:     %.2f OK,  %.2f Total' % (ok_fps, total_fps)
409             print '        Stats ({Requests})',
410             print rqst_stats
411             self.threads_done += 1
412
413         s.close()
414
415         with self.cond:
416             self.cond.notifyAll()
417
418     def run_cycle(self, function):
419         """
420         Runs a flow-add or flow-delete test cycle. Each test consists of a
421         <cycles> test cycles, where <threads> worker (Blaster) threads are
422         started in each test cycle. Each Blaster thread programs <flows>
423         OpenFlow flows into the controller using the controller's RESTCONF API.
424         :param function: Add or delete, determines what test will be executed.
425         :return: None
426         """
427         self.total_ok_flows = 0
428         self.total_ok_rqsts = 0
429
430         for c in range(self.ncycles):
431             self.stats = self.FcbStats()
432             with self.print_lock:
433                 print '\nCycle %d:' % c
434
435             threads = []
436             for i in range(self.nthreads):
437                 t = threading.Thread(target=function, args=(c * self.nflows, i))
438                 threads.append(t)
439                 t.start()
440
441             # Wait for all threads to finish and measure the execution time
442             with Timer() as t:
443                 for thread in threads:
444                     thread.join()
445
446             with self.print_lock:
447                 print '\n*** Test summary:'
448                 print '    Elapsed time:    %.2fs' % t.secs
449                 print '    Peak requests/s: %.2f OK, %.2f Total' % (
450                     self.stats.get_ok_rqst_rate(), self.stats.get_total_rqst_rate())
451                 print '    Peak flows/s:    %.2f OK, %.2f Total' % (
452                     self.stats.get_ok_flow_rate(), self.stats.get_total_flow_rate())
453                 print '    Avg. requests/s: %.2f OK, %.2f Total (%.2f%% of peak total)' % (
454                     self.stats.get_ok_rqsts() / t.secs,
455                     self.stats.get_total_rqsts() / t.secs,
456                     (self.stats.get_total_rqsts() / t.secs * 100) / self.stats.get_total_rqst_rate())
457                 print '    Avg. flows/s:    %.2f OK, %.2f Total (%.2f%% of peak total)' % (
458                     self.stats.get_ok_flows() / t.secs,
459                     self.stats.get_total_flows() / t.secs,
460                     (self.stats.get_total_flows() / t.secs * 100) / self.stats.get_total_flow_rate())
461
462                 self.total_ok_flows += self.stats.get_ok_flows()
463                 self.total_ok_rqsts += self.stats.get_ok_rqsts()
464                 self.threads_done = 0
465
466     def add_blaster(self):
467         self.run_cycle(self.add_flows)
468
469     def delete_blaster(self):
470         self.run_cycle(self.delete_flows)
471
472     def get_ok_flows(self):
473         return self.total_ok_flows
474
475     def get_ok_rqsts(self):
476         return self.total_ok_rqsts
477
478
479 def get_json_from_file(filename):
480     """
481     Get a flow programming template from a file
482     :param filename: File from which to get the template
483     :return: The json flow template (string)
484     """
485     with open(filename, 'r') as f:
486         try:
487             ft = json.load(f)
488             keys = ft['flow'][0].keys()
489             if (u'cookie' in keys) and (u'flow-name' in keys) and (u'id' in keys) and (u'match' in keys):
490                 if u'ipv4-destination' in ft[u'flow'][0]['match'].keys():
491                     print 'File "%s" ok to use as flow template' % filename
492                     return ft
493         except ValueError:
494             print 'JSON parsing of file %s failed' % filename
495             pass
496
497     return None
498
499 ###############################################################################
500 # This is an example of what the content of a JSON flow mode template should
501 # look like. Cut & paste to create a custom template. "id" and "ipv4-destination"
502 # MUST be unique if multiple flows will be programmed in the same test. It's
503 # also beneficial to have unique "cookie" and "flow-name" attributes for easier
504 # identification of the flow.
505 ###############################################################################
506 example_flow_mod_json = '''{
507     "flow": [
508         {
509             "id": "38",
510             "cookie": 38,
511             "instructions": {
512                 "instruction": [
513                     {
514                         "order": 0,
515                         "apply-actions": {
516                             "action": [
517                                 {
518                                     "order": 0,
519                                     "drop-action": { }
520                                 }
521                             ]
522                         }
523                     }
524                 ]
525             },
526             "hard-timeout": 65000,
527             "match": {
528                 "ethernet-match": {
529                     "ethernet-type": {
530                         "type": 2048
531                     }
532                 },
533                 "ipv4-destination": "10.0.0.38/32"
534             },
535             "flow-name": "TestFlow-8",
536             "strict": false,
537             "cookie_mask": 4294967295,
538             "priority": 2,
539             "table_id": 0,
540             "idle-timeout": 65000,
541             "installHw": false
542         }
543
544     ]
545 }'''
546
547
548 def create_arguments_parser():
549     """
550     Shorthand to arg parser on library level in order to access and eventually enhance in ancestors.
551     :return: argument parser supporting config blaster arguments and parameters
552     """
553     my_parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then'
554                                                     ' deletes flows into the config tree, as specified by'
555                                                     ' optional parameters.')
556
557     my_parser.add_argument('--host', default='127.0.0.1',
558                            help='Host where odl controller is running (default is 127.0.0.1).  '
559                                 'Specify a comma-separated list of hosts to perform round-robin load-balancing.')
560     my_parser.add_argument('--port', default='8181',
561                            help='Port on which odl\'s RESTCONF is listening (default is 8181)')
562     my_parser.add_argument('--cycles', type=int, default=1,
563                            help='Number of flow add/delete cycles; default 1. Both Flow Adds and Flow Deletes are '
564                                 'performed in cycles. <THREADS> worker threads are started in each cycle and the cycle '
565                                 'ends when all threads finish. Another cycle is started when the previous cycle '
566                                 'finished.')
567     my_parser.add_argument('--threads', type=int, default=1,
568                            help='Number of request worker threads to start in each cycle; default=1. '
569                                 'Each thread will add/delete <FLOWS> flows.')
570     my_parser.add_argument('--flows', type=int, default=10,
571                            help='Number of flows that will be added/deleted by each worker thread in each cycle; '
572                                 'default 10')
573     my_parser.add_argument('--fpr', type=int, default=1,
574                            help='Flows-per-Request - number of flows (batch size) sent in each HTTP request; '
575                                 'default 1')
576     my_parser.add_argument('--nodes', type=int, default=16,
577                            help='Number of nodes if mininet is not connected; default=16. If mininet is connected, '
578                                 'flows will be evenly distributed (programmed) into connected nodes.')
579     my_parser.add_argument('--delay', type=int, default=0,
580                            help='Time (in seconds) to wait between the add and delete cycles; default=0')
581     my_parser.add_argument('--delete', dest='delete', action='store_true', default=True,
582                            help='Delete all added flows one by one, benchmark delete '
583                                 'performance.')
584     my_parser.add_argument('--no-delete', dest='delete', action='store_false',
585                            help='Do not perform the delete cycle.')
586     my_parser.add_argument('--auth', dest='auth', action='store_true', default=False,
587                            help="Use the ODL default username/password 'admin'/'admin' to authenticate access to REST; "
588                                 'default: no authentication')
589     my_parser.add_argument('--startflow', type=int, default=0,
590                            help='The starting Flow ID; default=0')
591     my_parser.add_argument('--file', default='',
592                            help='File from which to read the JSON flow template; default: no file, use a built in '
593                                 'template.')
594     return my_parser
595
596
597 if __name__ == "__main__":
598     ############################################################################
599     # This program executes the base performance test. The test adds flows into
600     # the controller's config space. This function is basically the CLI frontend
601     # to the FlowConfigBlaster class and drives its main functions: adding and
602     # deleting flows from the controller's config data store
603     ############################################################################
604
605     parser = create_arguments_parser()
606     in_args = parser.parse_args()
607
608     if in_args.file != '':
609         flow_template = get_json_from_file(in_args.file)
610     else:
611         flow_template = None
612
613     fct = FlowConfigBlaster(in_args.host, in_args.port, in_args.cycles, in_args.threads, in_args.fpr, in_args.nodes,
614                             in_args.flows, in_args.startflow, in_args.auth)
615
616     # Run through <cycles>, where <threads> are started in each cycle and
617     # <flows> are added from each thread
618     fct.add_blaster()
619
620     print '\n*** Total flows added: %s' % fct.get_ok_flows()
621     print '    HTTP[OK] results:  %d\n' % fct.get_ok_rqsts()
622
623     if in_args.delay > 0:
624         print '*** Waiting for %d seconds before the delete cycle ***\n' % in_args.delay
625         time.sleep(in_args.delay)
626
627     # Run through <cycles>, where <threads> are started in each cycle and
628     # <flows> previously added in an add cycle are deleted in each thread
629     if in_args.delete:
630         fct.delete_blaster()
631         print '\n*** Total flows deleted: %s' % fct.get_ok_flows()
632         print '    HTTP[OK] results:    %d\n' % fct.get_ok_rqsts()