d7ef8ec82893e75e365eb43c452ecabd4f1ab606
[integration/test.git] / csit / libraries / ScaleClient.py
1 '''
2 The purpose of this library is the ability to spread configured flows
3 over the specified tables and switches.
4
5 The idea how to configure and checks inventory operational data is taken from
6 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
7 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
8 '''
9 import random
10 import threading
11 import netaddr
12 import Queue
13 import requests
14 import json
15 import copy
16
17
18 class Counter(object):
19     def __init__(self, start=0):
20         self.lock = threading.Lock()
21         self.value = start
22
23     def increment(self, value=1):
24         self.lock.acquire()
25         val = self.value
26         try:
27             self.value += value
28         finally:
29             self.lock.release()
30         return val
31
32
33 _spreads = ['gauss', 'linear', 'first']    # possible defined spreads at the moment
34 _default_flow_template_json = {  # templease used for config datastore
35     u'flow': [
36         {
37             u'hard-timeout': 65000,
38             u'idle-timeout': 65000,
39             u'cookie_mask': 4294967295,
40             u'flow-name': u'FLOW-NAME-TEMPLATE',
41             u'priority': 2,
42             u'strict': False,
43             u'cookie': 0,
44             u'table_id': 0,
45             u'installHw': False,
46             u'id': u'FLOW-ID-TEMPLATE',
47             u'match': {
48                 u'ipv4-destination': u'0.0.0.0/32',
49                 u'ethernet-match': {
50                     u'ethernet-type': {
51                         u'type': 2048
52                     }
53                 }
54             },
55             u'instructions': {
56                 u'instruction': [
57                     {
58                         u'order': 0,
59                         u'apply-actions': {
60                             u'action': [
61                                 {
62                                     u'drop-action': {},
63                                     u'order': 0
64                                 }
65                             ]
66                         }
67                     }
68                 ]
69             }
70         }
71     ]
72 }
73
74
75 _node_tmpl = "/opendaylight-inventory:nodes/opendaylight-inventory:node[opendaylight-inventory:id=\"openflow:{0}\"]"
76
77
78 _default_operations_item_json = {  # template used for sal operations
79     "input": {
80         "bulk-flow-item": [
81             {
82                 "node": "to_be_replaced",
83                 "cookie": 0,
84                 "cookie_mask": 4294967295,
85                 "flags": "SEND_FLOW_REM",
86                 "hard-timeout": 65000,
87                 "idle-timeout": 65000,
88                 "instructions": {
89                     "instruction": [{
90                         "apply-actions": {
91                             "action": [
92                                 {
93                                     "drop-action": {},
94                                     "order": 0
95                                 }
96                             ]
97                         },
98                         "order": 0
99                     }]
100                 },
101                 "match": {
102                     "ipv4-destination": "0.0.0.0/32",
103                     "ethernet-match": {
104                         "ethernet-type": {
105                             "type": 2048
106                         }
107                     },
108                 },
109                 "priority": 2,
110                 "table_id": 0
111             }
112         ]
113     }
114 }
115
116
117 def _get_notes(fldet=[]):
118     """For given list of flow details it produces a dictionary with statistics
119     { swId1 : { tabId1 : flows_count1,
120                 tabId2 : flows_count2,
121                ...
122                 'total' : switch count }
123       swId2 ...
124     }
125     """
126     notes = {}
127     for (sw, tab, flow) in fldet:
128         if sw not in notes:
129             notes[sw] = {'total': 0}
130         if tab not in notes[sw]:
131             notes[sw][tab] = 0
132         notes[sw][tab] += 1
133         notes[sw]['total'] += 1
134     return notes
135
136
137 def _randomize(spread, maxn):
138     """Returns a randomized switch or table id"""
139     if spread not in _spreads:
140         raise Exception('Spread method {} not available'.format(spread))
141     while True:
142         if spread == 'gauss':
143             ga = abs(random.gauss(0, 1))
144             rv = int(ga*float(maxn)/3)
145             if rv < maxn:
146                 return rv
147         elif spread == 'linear':
148             rv = int(random.random() * float(maxn))
149             if rv < maxn:
150                 return rv
151             else:
152                 raise ValueError('rv >= maxn')
153         elif spread == 'first':
154             return 0
155
156
157 def generate_new_flow_details(flows=10, switches=1, swspread='gauss', tables=250, tabspread='gauss'):
158     """Generate a list of tupples (switch_id, table_id, flow_id) which are generated
159     according to the spread rules between swithces and tables.
160     It also returns a dictionary with statsistics."""
161     swflows = [_randomize(swspread, switches) for f in range(int(flows))]
162     # we have to increse the switch index because mininet start indexing switches from 1 (not 0)
163     fltables = [(s+1, _randomize(tabspread, tables), idx) for idx, s in enumerate(swflows)]
164     notes = _get_notes(fltables)
165     return fltables, notes
166
167
168 def _prepare_add(cntl, method, flows, template=None):
169     """Creates a PUT http requests to configure a flow in configuration datastore.
170
171     Args:
172         :param cntl: controller's ip address or hostname
173
174         :param method: determines http request method
175
176         :param flows: list of flow details
177
178         :param template: flow template to be to be filled
179
180     Returns:
181         :returns req: http request object
182     """
183     fl1 = flows[0]
184     sw, tab, fl, ip = fl1
185     url = 'http://' + cntl + ':' + '8181'
186     url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:' + str(sw)
187     url += '/table/' + str(tab) + '/flow/' + str(fl)
188     flow = copy.deepcopy(template['flow'][0])
189     flow['cookie'] = fl
190     flow['flow-name'] = 'TestFlow-%d' % fl
191     flow['id'] = str(fl)
192     flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
193     flow['table_id'] = tab
194     fmod = dict(template)
195     fmod['flow'] = flow
196     req_data = json.dumps(fmod)
197     req = requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=req_data,
198                            auth=('admin', 'admin'))
199     return req
200
201
202 def _prepare_table_add(cntl, method, flows, template=None):
203     """Creates a POST http requests to configure a flow in configuration datastore.
204
205     Args:
206         :param cntl: controller's ip address or hostname
207
208         :param method: determines http request method
209
210         :param flows: list of flow details
211
212         :param template: flow template to be to be filled
213
214     Returns:
215         :returns req: http request object
216     """
217     fl1 = flows[0]
218     sw, tab, fl, ip = fl1
219     url = 'http://' + cntl + ':' + '8181'
220     url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:' + str(sw) + '/table/' + str(tab)
221     fdets = []
222     for sw, tab, fl, ip in flows:
223         flow = copy.deepcopy(template['flow'][0])
224         flow['cookie'] = fl
225         flow['flow-name'] = 'TestFlow-%d' % fl
226         flow['id'] = str(fl)
227         flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
228         flow['table_id'] = tab
229         fdets.append(flow)
230     fmod = copy.deepcopy(template)
231     fmod['flow'] = fdets
232     req_data = json.dumps(fmod)
233     req = requests.Request('POST', url, headers={'Content-Type': 'application/json'}, data=req_data,
234                            auth=('admin', 'admin'))
235     return req
236
237
238 def _prepare_delete(cntl, method, flows, template=None):
239     """Creates a DELETE http request to remove the flow from configuration datastore.
240
241     Args:
242         :param cntl: controller's ip address or hostname
243
244         :param method: determines http request method
245
246         :param flows: list of flow details
247
248         :param template: flow template to be to be filled
249
250     Returns:
251         :returns req: http request object
252     """
253     fl1 = flows[0]
254     sw, tab, fl, ip = fl1
255     url = 'http://' + cntl + ':' + '8181'
256     url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:' + str(sw)
257     url += '/table/' + str(tab) + '/flow/' + str(fl)
258     req = requests.Request('DELETE', url, headers={'Content-Type': 'application/json'}, auth=('admin', 'admin'))
259     return req
260
261
262 def _prepare_rpc_item(cntl, method, flows, template=None):
263     """Creates a POST http requests to add or remove a flow using openflowplugin rpc.
264
265     Args:
266         :param cntl: controller's ip address or hostname
267
268         :param method: determines http request method
269
270         :param flows: list of flow details
271
272         :param template: flow template to be to be filled
273
274     Returns:
275         :returns req: http request object
276     """
277     f1 = flows[0]
278     sw, tab, fl, ip = f1
279     url = 'http://' + cntl + ':' + '8181/restconf/operations/sal-bulk-flow:' + method
280     fdets = []
281     for sw, tab, fl, ip in flows:
282         flow = copy.deepcopy(template['input']['bulk-flow-item'][0])
283         flow['node'] = _node_tmpl.format(sw)
284         flow['cookie'] = fl
285         flow['flow-name'] = 'TestFlow-%d' % fl
286         flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
287         flow['table_id'] = tab
288         fdets.append(flow)
289     fmod = copy.deepcopy(template)
290     fmod['input']['bulk-flow-item'] = fdets
291     req_data = json.dumps(fmod)
292     req = requests.Request('POST', url, headers={'Content-Type': 'application/json'}, data=req_data,
293                            auth=('admin', 'admin'))
294     return req
295
296
297 def _prepare_ds_item(cntl, method, flows, template=None):
298     """Creates a POST http requests to configure a flow in configuration datastore.
299
300     Ofp uses write operation, standrd POST to config datastore uses read-write operation (on java level)
301
302     Args:
303         :param cntl: controller's ip address or hostname
304
305         :param method: determines http request method
306
307         :param flows: list of flow details
308
309         :param template: flow template to be to be filled
310
311     Returns:
312         :returns req: http request object
313     """
314     f1 = flows[0]
315     sw, tab, fl, ip = f1
316     url = 'http://' + cntl + ':' + '8181/restconf/operations/sal-bulk-flow:' + method
317     fdets = []
318     for sw, tab, fl, ip in flows:
319         flow = copy.deepcopy(template['input']['bulk-flow-item'][0])
320         flow['node'] = _node_tmpl.format(sw)
321         flow['cookie'] = fl
322         flow['flow-name'] = 'TestFlow-%d' % fl
323         flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
324         flow['table_id'] = tab
325         flow['flow-id'] = fl
326         fdets.append(flow)
327     fmod = copy.deepcopy(template)
328     del fmod['input']['bulk-flow-item']
329     fmod['input']['bulk-flow-ds-item'] = fdets
330     req_data = json.dumps(fmod)
331     req = requests.Request('POST', url, headers={'Content-Type': 'application/json'}, data=req_data,
332                            auth=('admin', 'admin'))
333     return req
334
335
336 def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='',
337                        template=None, outqueue=None, method=None):
338     """The funcion sends http requests.
339
340     Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests
341     to the controller
342
343     Args:
344         :param thread_id: thread id
345
346         :param preparefnc: function to preparesthe http request
347
348         :param inqueue: input queue, flow details are comming from here
349
350         :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue
351
352         :param controllers: a list of controllers' ip addresses or hostnames
353
354         :param restport: restconf port
355
356         :param template: flow template used for creating flow content
357
358         :param outqueue: queue where the results should be put
359
360         :param method: method derermines the type of http request
361
362     Returns:
363         nothing, results must be put into the output queue
364     """
365     ses = requests.Session()
366     cntl = controllers[0]
367     counter = [0 for i in range(600)]
368     loop = True
369
370     while loop:
371         try:
372             flowlist = inqueue.get(timeout=1)
373         except Queue.Empty:
374             if exitevent.is_set() and inqueue.empty():
375                 loop = False
376             continue
377         req = preparefnc(cntl, method, flowlist, template=template)
378         # prep = ses.prepare_request(req)
379         prep = req.prepare()
380         try:
381             rsp = ses.send(prep, timeout=5)
382         except requests.exceptions.Timeout:
383             counter[99] += 1
384             continue
385         counter[rsp.status_code] += 1
386     res = {}
387     for i, v in enumerate(counter):
388         if v > 0:
389             res[i] = v
390     outqueue.put(res)
391
392
393 def _task_executor(method='', flow_template=None, flow_details=[], controllers=['127.0.0.1'],
394                    restport='8181', nrthreads=1, fpr=1):
395     """The main function which drives sending of http requests.
396
397     Creates 2 queues and requested number of 'working threads'.  One queue is filled with flow details and working
398     threads read them out and send http requests. The other queue is for sending results from working threads back.
399     After the threads' join, it produces a summary result.
400
401     Args:
402         :param method: based on this the function which prepares http request is choosen
403
404         :param flow_template: template to generate a flow content
405
406         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
407
408         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
409
410         :param restport: restconf port (default='8181')
411
412         :param nrthreads: number of threads used to send http requests (default=1)
413
414         :param fpr: flow per request, number of flows sent in one http request
415
416     Returns:
417         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
418     """
419     # TODO: multi controllers support
420     ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
421
422     # choose message prepare function
423     if method == 'PUT':
424         preparefnc = _prepare_add
425         # put can contain only 1 flow, lets overwrite any value of flows per request
426         fpr = 1
427     elif method == 'POST':
428         preparefnc = _prepare_table_add
429     elif method == 'DELETE':
430         preparefnc = _prepare_delete
431         # delete flow can contain only 1 flow, lets overwrite any value of flows per request
432         fpr = 1
433     elif method in ['add-flows-ds', 'remove-flows-ds']:
434         preparefnc = _prepare_ds_item
435     elif method in ['add-flows-rpc', 'remove-flows-rpc']:
436         preparefnc = _prepare_rpc_item
437     else:
438         raise NotImplementedError('Method {0} does not have it\'s prepeare function defined'.format(method))
439
440     # lets enlarge the tupple of flow details with IP, to be used with the template
441     flows = [(sw, tab, flo, ip_addr.increment()) for sw, tab, flo in flow_details]
442     # lels divide flows into switches and tables - flow groups
443     flowgroups = {}
444     for flow in flows:
445         sw, tab, _, _ = flow
446         flowkey = (sw, tab)
447         if flowkey in flowgroups:
448             flowgroups[flowkey].append(flow)
449         else:
450             flowgroups[flowkey] = [flow]
451
452     # lets fill the queue with details needed for one http requests
453     # we have lists with flow details for particular (switch, table) tupples, now we need to split the lists
454     # according to the flows per request (fpr) paramer
455     sendqueue = Queue.Queue()
456     for flowgroup, flow_list in flowgroups.iteritems():
457         while len(flow_list) > 0:
458             sendqueue.put(flow_list[:int(fpr)])
459             flow_list = flow_list[int(fpr):]
460
461     # result_gueue
462     resultqueue = Queue.Queue()
463     # creaet exit event
464     exitevent = threading.Event()
465
466     # lets start threads whic will read flow details fro queues and send
467     threads = []
468     for i in range(int(nrthreads)):
469         thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
470                                kwargs={"inqueue": sendqueue, "exitevent": exitevent,
471                                        "controllers": controllers, "restport": restport,
472                                        "template": flow_template, "outqueue": resultqueue, "method": method})
473         threads.append(thr)
474         thr.start()
475
476     exitevent.set()
477
478     result = {}
479     # waitng for reqults and sum them up
480     for t in threads:
481         t.join()
482         # reading partial resutls from sender thread
483         part_result = resultqueue.get()
484         for k, v in part_result.iteritems():
485             if k not in result:
486                 result[k] = v
487             else:
488                 result[k] += v
489     return result
490
491
492 def configure_flows(*args, **kwargs):
493     """Configure flows based on given flow details.
494
495     Args:
496         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
497
498         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
499
500         :param restport: restconf port (default='8181')
501
502         :param nrthreads: number of threads used to send http requests (default=1)
503
504     Returns:
505         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
506     """
507     return _task_executor(method='PUT', flow_template=_default_flow_template_json, **kwargs)
508
509
510 def deconfigure_flows(*args, **kwargs):
511     """Deconfigure flows based on given flow details.
512
513     Args:
514         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
515
516         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
517
518         :param restport: restconf port (default='8181')
519
520         :param nrthreads: number of threads used to send http requests (default=1)
521
522     Returns:
523         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
524     """
525     return _task_executor(method='DELETE', flow_template=_default_flow_template_json, **kwargs)
526
527
528 def configure_flows_bulk(*args, **kwargs):
529     """Configure flows based on given flow details using a POST http request..
530
531     Args:
532         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
533
534         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
535
536         :param restport: restconf port (default='8181')
537
538         :param nrthreads: number of threads used to send http requests (default=1)
539
540     Returns:
541         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
542     """
543     return _task_executor(method='POST', flow_template=_default_flow_template_json, **kwargs)
544
545
546 def operations_add_flows_ds(*args, **kwargs):
547     """Configure flows based on given flow details.
548
549     Args:
550         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
551
552         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
553
554         :param restport: restconf port (default='8181')
555
556         :param nrthreads: number of threads used to send http requests (default=1)
557
558     Returns:
559         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
560     """
561     return _task_executor(method='add-flows-ds', flow_template=_default_operations_item_json, **kwargs)
562
563
564 def operations_remove_flows_ds(*args, **kwargs):
565     """Remove flows based on given flow details.
566
567     Args:
568         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
569
570         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
571
572         :param restport: restconf port (default='8181')
573
574         :param nrthreads: number of threads used to send http requests (default=1)
575
576     Returns:
577         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
578     """
579     return _task_executor(method='remove-flows-ds', flow_template=_default_operations_item_json, **kwargs)
580
581
582 def operations_add_flows_rpc(*args, **kwargs):
583     """Configure flows based on given flow details using rpc calls.
584
585     Args:
586         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
587
588         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
589
590         :param restport: restconf port (default='8181')
591
592         :param nrthreads: number of threads used to send http requests (default=1)
593
594     Returns:
595         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
596     """
597     return _task_executor(method='add-flows-rpc', flow_template=_default_operations_item_json, **kwargs)
598
599
600 def operations_remove_flows_rpc(*args, **kwargs):
601     """Remove flows based on given flow details using rpc calls.
602
603     Args:
604         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
605
606         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
607
608         :param restport: restconf port (default='8181')
609
610         :param nrthreads: number of threads used to send http requests (default=1)
611
612     Returns:
613         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
614     """
615     return _task_executor(method='remove-flows-rpc', flow_template=_default_operations_item_json, **kwargs)
616
617
618 def _get_operational_inventory_of_switches(controller):
619     """Gets number of switches present in the operational inventory
620
621     Args:
622         :param controller: controller's ip or host name
623
624     Returns:
625         :returns switches: number of switches connected
626     """
627     url = 'http://' + controller + ':8181/restconf/operational/opendaylight-inventory:nodes'
628     rsp = requests.get(url, headers={'Accept': 'application/json'}, stream=False, auth=('admin', 'admin'))
629     if rsp.status_code != 200:
630         return None
631     inv = json.loads(rsp.content)
632     if 'nodes' not in inv:
633         return None
634     if 'node' not in inv['nodes']:
635         return []
636     inv = inv['nodes']['node']
637     switches = [sw for sw in inv if 'openflow:' in sw['id']]
638     return switches
639
640
641 def flow_stats_collected(controller=''):
642     """Provides the operational inventory counts counts of switches and flows.
643
644     Args:
645         :param controller: controller's ip address or host name
646
647     Returns:
648         :returns (switches, flows_reported, flows-found): tupple with counts of switches, reported and found flows
649     """
650     # print type(flow_details), flow_details
651     active_flows = 0
652     found_flows = 0
653     switches = _get_operational_inventory_of_switches(controller)
654     if switches is None:
655         return 0, 0, 0
656     for sw in switches:
657         tabs = sw['flow-node-inventory:table']
658         for t in tabs:
659             active_flows += t['opendaylight-flow-table-statistics:flow-table-statistics']['active-flows']
660             if 'flow' in t:
661                 found_flows += len(t['flow'])
662     print "Switches,ActiveFlows(reported)/FlowsFound", len(switches), active_flows, found_flows
663     return len(switches), active_flows, found_flows
664
665
666 def get_switches_count(controller=''):
667     """Gives the count of the switches presnt in the operational inventory nodes datastore.
668
669     Args:
670         :param controller: controller's ip address or host name
671
672     Returns:
673         :returns switches: returns the number of connected switches
674     """
675     switches = _get_operational_inventory_of_switches(controller)
676     if switches is None:
677         return 0
678     return len(switches)
679
680
681 def validate_responses(received, expected):
682     """Compares given response summary with expected results.
683
684     Args:
685         :param received: dictionary returned from operations_* and (de)configure_flows*
686                          of this library
687                          e.g. received = { 200:41 } - this means that we 41x receives response with status code 200
688
689         :param expected: list of expected http result codes
690                          e.g. expected=[200] - we expect only http status 200 to be present
691
692     Returns:
693         :returns True: if list of http statuses from received responses is the same as exxpected
694         :returns False: elseware
695     """
696     return True if received.keys() == expected else False