Support OVSDB entity ownership ID format
[integration/test.git] / csit / libraries / ScaleClient.py
1 """
2 The purpose of this library is the ability to spread configured flows
3 over the specified tables and switches.
4
5 The idea how to configure and checks inventory operational data is taken from
6 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
7 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
8 """
9 import random
10 import threading
11 import netaddr
12 import queue
13 import requests
14 import json
15 import copy
16
17
18 class Counter(object):
19     def __init__(self, start=0):
20         self.lock = threading.Lock()
21         self.value = start
22
23     def increment(self, value=1):
24         self.lock.acquire()
25         val = self.value
26         try:
27             self.value += value
28         finally:
29             self.lock.release()
30         return val
31
32
33 _spreads = ["gauss", "linear", "first"]  # possible defined spreads at the moment
34 _default_flow_template_json = {  # templease used for config datastore
35     u"flow": [
36         {
37             u"hard-timeout": 65000,
38             u"idle-timeout": 65000,
39             u"cookie_mask": 4294967295,
40             u"flow-name": u"FLOW-NAME-TEMPLATE",
41             u"priority": 2,
42             u"strict": False,
43             u"cookie": 0,
44             u"table_id": 0,
45             u"installHw": False,
46             u"id": u"FLOW-ID-TEMPLATE",
47             u"match": {
48                 u"ipv4-destination": u"0.0.0.0/32",
49                 u"ethernet-match": {u"ethernet-type": {u"type": 2048}},
50             },
51             u"instructions": {
52                 u"instruction": [
53                     {
54                         u"order": 0,
55                         u"apply-actions": {
56                             u"action": [{u"drop-action": {}, u"order": 0}]
57                         },
58                     }
59                 ]
60             },
61         }
62     ]
63 }
64
65
66 _node_tmpl = '/opendaylight-inventory:nodes/opendaylight-inventory:node[opendaylight-inventory:id="openflow:{0}"]'
67
68
69 _default_operations_item_json = {  # template used for sal operations
70     "input": {
71         "bulk-flow-item": [
72             {
73                 "node": "to_be_replaced",
74                 "cookie": 0,
75                 "cookie_mask": 4294967295,
76                 "flags": "SEND_FLOW_REM",
77                 "hard-timeout": 65000,
78                 "idle-timeout": 65000,
79                 "instructions": {
80                     "instruction": [
81                         {
82                             "apply-actions": {
83                                 "action": [{"drop-action": {}, "order": 0}]
84                             },
85                             "order": 0,
86                         }
87                     ]
88                 },
89                 "match": {
90                     "ipv4-destination": "0.0.0.0/32",
91                     "ethernet-match": {"ethernet-type": {"type": 2048}},
92                 },
93                 "priority": 2,
94                 "table_id": 0,
95             }
96         ]
97     }
98 }
99
100
101 def _get_notes(fldet=[]):
102     """For given list of flow details it produces a dictionary with statistics
103     { swId1 : { tabId1 : flows_count1,
104                 tabId2 : flows_count2,
105                ...
106                 'total' : switch count }
107       swId2 ...
108     }
109     """
110     notes = {}
111     for (sw, tab, flow) in fldet:
112         if sw not in notes:
113             notes[sw] = {"total": 0}
114         if tab not in notes[sw]:
115             notes[sw][tab] = 0
116         notes[sw][tab] += 1
117         notes[sw]["total"] += 1
118     return notes
119
120
121 def _randomize(spread, maxn):
122     """Returns a randomized switch or table id"""
123     if spread not in _spreads:
124         raise Exception("Spread method {} not available".format(spread))
125     while True:
126         if spread == "gauss":
127             ga = abs(random.gauss(0, 1))
128             rv = int(ga * float(maxn) / 3)
129             if rv < maxn:
130                 return rv
131         elif spread == "linear":
132             rv = int(random.random() * float(maxn))
133             if rv < maxn:
134                 return rv
135             else:
136                 raise ValueError("rv >= maxn")
137         elif spread == "first":
138             return 0
139
140
141 def generate_new_flow_details(
142     flows=10, switches=1, swspread="gauss", tables=250, tabspread="gauss"
143 ):
144     """Generate a list of tupples (switch_id, table_id, flow_id) which are generated
145     according to the spread rules between swithces and tables.
146     It also returns a dictionary with statsistics."""
147     swflows = [_randomize(swspread, switches) for f in range(int(flows))]
148     # we have to increse the switch index because mininet start indexing switches from 1 (not 0)
149     fltables = [
150         (s + 1, _randomize(tabspread, tables), idx) for idx, s in enumerate(swflows)
151     ]
152     notes = _get_notes(fltables)
153     return fltables, notes
154
155
156 def _prepare_add(cntl, method, flows, template=None):
157     """Creates a PUT http requests to configure a flow in configuration datastore.
158
159     Args:
160         :param cntl: controller's ip address or hostname
161
162         :param method: determines http request method
163
164         :param flows: list of flow details
165
166         :param template: flow template to be to be filled
167
168     Returns:
169         :returns req: http request object
170     """
171     fl1 = flows[0]
172     sw, tab, fl, ip = fl1
173     url = "http://" + cntl + ":" + "8181"
174     url += "/restconf/config/opendaylight-inventory:nodes/node/openflow:" + str(sw)
175     url += "/table/" + str(tab) + "/flow/" + str(fl)
176     flow = copy.deepcopy(template["flow"][0])
177     flow["cookie"] = fl
178     flow["flow-name"] = "TestFlow-%d" % fl
179     flow["id"] = str(fl)
180     flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
181     flow["table_id"] = tab
182     fmod = dict(template)
183     fmod["flow"] = flow
184     req_data = json.dumps(fmod)
185     req = requests.Request(
186         "PUT",
187         url,
188         headers={"Content-Type": "application/json"},
189         data=req_data,
190         auth=("admin", "admin"),
191     )
192     return req
193
194
195 def _prepare_table_add(cntl, method, flows, template=None):
196     """Creates a POST http requests to configure a flow in configuration datastore.
197
198     Args:
199         :param cntl: controller's ip address or hostname
200
201         :param method: determines http request method
202
203         :param flows: list of flow details
204
205         :param template: flow template to be to be filled
206
207     Returns:
208         :returns req: http request object
209     """
210     fl1 = flows[0]
211     sw, tab, fl, ip = fl1
212     url = "http://" + cntl + ":" + "8181"
213     url += (
214         "/restconf/config/opendaylight-inventory:nodes/node/openflow:"
215         + str(sw)
216         + "/table/"
217         + str(tab)
218     )
219     fdets = []
220     for sw, tab, fl, ip in flows:
221         flow = copy.deepcopy(template["flow"][0])
222         flow["cookie"] = fl
223         flow["flow-name"] = "TestFlow-%d" % fl
224         flow["id"] = str(fl)
225         flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
226         flow["table_id"] = tab
227         fdets.append(flow)
228     fmod = copy.deepcopy(template)
229     fmod["flow"] = fdets
230     req_data = json.dumps(fmod)
231     req = requests.Request(
232         "POST",
233         url,
234         headers={"Content-Type": "application/json"},
235         data=req_data,
236         auth=("admin", "admin"),
237     )
238     return req
239
240
241 def _prepare_delete(cntl, method, flows, template=None):
242     """Creates a DELETE http request to remove the flow from configuration datastore.
243
244     Args:
245         :param cntl: controller's ip address or hostname
246
247         :param method: determines http request method
248
249         :param flows: list of flow details
250
251         :param template: flow template to be to be filled
252
253     Returns:
254         :returns req: http request object
255     """
256     fl1 = flows[0]
257     sw, tab, fl, ip = fl1
258     url = "http://" + cntl + ":" + "8181"
259     url += "/restconf/config/opendaylight-inventory:nodes/node/openflow:" + str(sw)
260     url += "/table/" + str(tab) + "/flow/" + str(fl)
261     req = requests.Request(
262         "DELETE",
263         url,
264         headers={"Content-Type": "application/json"},
265         auth=("admin", "admin"),
266     )
267     return req
268
269
270 def _prepare_rpc_item(cntl, method, flows, template=None):
271     """Creates a POST http requests to add or remove a flow using openflowplugin rpc.
272
273     Args:
274         :param cntl: controller's ip address or hostname
275
276         :param method: determines http request method
277
278         :param flows: list of flow details
279
280         :param template: flow template to be to be filled
281
282     Returns:
283         :returns req: http request object
284     """
285     f1 = flows[0]
286     sw, tab, fl, ip = f1
287     url = "http://" + cntl + ":" + "8181/restconf/operations/sal-bulk-flow:" + method
288     fdets = []
289     for sw, tab, fl, ip in flows:
290         flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
291         flow["node"] = _node_tmpl.format(sw)
292         flow["cookie"] = fl
293         flow["flow-name"] = "TestFlow-%d" % fl
294         flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
295         flow["table_id"] = tab
296         fdets.append(flow)
297     fmod = copy.deepcopy(template)
298     fmod["input"]["bulk-flow-item"] = fdets
299     req_data = json.dumps(fmod)
300     req = requests.Request(
301         "POST",
302         url,
303         headers={"Content-Type": "application/json"},
304         data=req_data,
305         auth=("admin", "admin"),
306     )
307     return req
308
309
310 def _prepare_ds_item(cntl, method, flows, template=None):
311     """Creates a POST http requests to configure a flow in configuration datastore.
312
313     Ofp uses write operation, standrd POST to config datastore uses read-write operation (on java level)
314
315     Args:
316         :param cntl: controller's ip address or hostname
317
318         :param method: determines http request method
319
320         :param flows: list of flow details
321
322         :param template: flow template to be to be filled
323
324     Returns:
325         :returns req: http request object
326     """
327     f1 = flows[0]
328     sw, tab, fl, ip = f1
329     url = "http://" + cntl + ":" + "8181/restconf/operations/sal-bulk-flow:" + method
330     fdets = []
331     for sw, tab, fl, ip in flows:
332         flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
333         flow["node"] = _node_tmpl.format(sw)
334         flow["cookie"] = fl
335         flow["flow-name"] = "TestFlow-%d" % fl
336         flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
337         flow["table_id"] = tab
338         flow["flow-id"] = fl
339         fdets.append(flow)
340     fmod = copy.deepcopy(template)
341     del fmod["input"]["bulk-flow-item"]
342     fmod["input"]["bulk-flow-ds-item"] = fdets
343     req_data = json.dumps(fmod)
344     req = requests.Request(
345         "POST",
346         url,
347         headers={"Content-Type": "application/json"},
348         data=req_data,
349         auth=("admin", "admin"),
350     )
351     return req
352
353
354 def _wt_request_sender(
355     thread_id,
356     preparefnc,
357     inqueue=None,
358     exitevent=None,
359     controllers=[],
360     restport="",
361     template=None,
362     outqueue=None,
363     method=None,
364 ):
365     """The funcion sends http requests.
366
367     Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests
368     to the controller
369
370     Args:
371         :param thread_id: thread id
372
373         :param preparefnc: function to preparesthe http request
374
375         :param inqueue: input queue, flow details are comming from here
376
377         :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue
378
379         :param controllers: a list of controllers' ip addresses or hostnames
380
381         :param restport: restconf port
382
383         :param template: flow template used for creating flow content
384
385         :param outqueue: queue where the results should be put
386
387         :param method: method derermines the type of http request
388
389     Returns:
390         nothing, results must be put into the output queue
391     """
392     ses = requests.Session()
393     cntl = controllers[0]
394     counter = [0 for i in range(600)]
395     loop = True
396
397     while loop:
398         try:
399             flowlist = inqueue.get(timeout=1)
400         except queue.Empty:
401             if exitevent.is_set() and inqueue.empty():
402                 loop = False
403             continue
404         req = preparefnc(cntl, method, flowlist, template=template)
405         # prep = ses.prepare_request(req)
406         prep = req.prepare()
407         try:
408             rsp = ses.send(prep, timeout=5)
409         except requests.exceptions.Timeout:
410             counter[99] += 1
411             continue
412         counter[rsp.status_code] += 1
413     res = {}
414     for i, v in enumerate(counter):
415         if v > 0:
416             res[i] = v
417     outqueue.put(res)
418
419
420 def _task_executor(
421     method="",
422     flow_template=None,
423     flow_details=[],
424     controllers=["127.0.0.1"],
425     restport="8181",
426     nrthreads=1,
427     fpr=1,
428 ):
429     """The main function which drives sending of http requests.
430
431     Creates 2 queues and requested number of 'working threads'.  One queue is filled with flow details and working
432     threads read them out and send http requests. The other queue is for sending results from working threads back.
433     After the threads' join, it produces a summary result.
434
435     Args:
436         :param method: based on this the function which prepares http request is choosen
437
438         :param flow_template: template to generate a flow content
439
440         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
441
442         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
443
444         :param restport: restconf port (default='8181')
445
446         :param nrthreads: number of threads used to send http requests (default=1)
447
448         :param fpr: flow per request, number of flows sent in one http request
449
450     Returns:
451         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
452     """
453     # TODO: multi controllers support
454     ip_addr = Counter(int(netaddr.IPAddress("10.0.0.1")))
455
456     # choose message prepare function
457     if method == "PUT":
458         preparefnc = _prepare_add
459         # put can contain only 1 flow, lets overwrite any value of flows per request
460         fpr = 1
461     elif method == "POST":
462         preparefnc = _prepare_table_add
463     elif method == "DELETE":
464         preparefnc = _prepare_delete
465         # delete flow can contain only 1 flow, lets overwrite any value of flows per request
466         fpr = 1
467     elif method in ["add-flows-ds", "remove-flows-ds"]:
468         preparefnc = _prepare_ds_item
469     elif method in ["add-flows-rpc", "remove-flows-rpc"]:
470         preparefnc = _prepare_rpc_item
471     else:
472         raise NotImplementedError(
473             "Method {0} does not have it's prepeare function defined".format(method)
474         )
475
476     # lets enlarge the tupple of flow details with IP, to be used with the template
477     flows = [(sw, tab, flo, ip_addr.increment()) for sw, tab, flo in flow_details]
478     # lels divide flows into switches and tables - flow groups
479     flowgroups = {}
480     for flow in flows:
481         sw, tab, _, _ = flow
482         flowkey = (sw, tab)
483         if flowkey in flowgroups:
484             flowgroups[flowkey].append(flow)
485         else:
486             flowgroups[flowkey] = [flow]
487
488     # lets fill the queue with details needed for one http requests
489     # we have lists with flow details for particular (switch, table) tupples, now we need to split the lists
490     # according to the flows per request (fpr) paramer
491     sendqueue = queue.Queue()
492     for flowgroup, flow_list in flowgroups.items():
493         while len(flow_list) > 0:
494             sendqueue.put(flow_list[: int(fpr)])
495             flow_list = flow_list[int(fpr) :]
496
497     # result_gueue
498     resultqueue = queue.Queue()
499     # creaet exit event
500     exitevent = threading.Event()
501
502     # lets start threads whic will read flow details fro queues and send
503     threads = []
504     for i in range(int(nrthreads)):
505         thr = threading.Thread(
506             target=_wt_request_sender,
507             args=(i, preparefnc),
508             kwargs={
509                 "inqueue": sendqueue,
510                 "exitevent": exitevent,
511                 "controllers": controllers,
512                 "restport": restport,
513                 "template": flow_template,
514                 "outqueue": resultqueue,
515                 "method": method,
516             },
517         )
518         threads.append(thr)
519         thr.start()
520
521     exitevent.set()
522
523     result = {}
524     # waitng for reqults and sum them up
525     for t in threads:
526         t.join()
527         # reading partial resutls from sender thread
528         part_result = resultqueue.get()
529         for k, v in part_result.items():
530             if k not in result:
531                 result[k] = v
532             else:
533                 result[k] += v
534     return result
535
536
537 def configure_flows(*args, **kwargs):
538     """Configure flows based on given flow details.
539
540     Args:
541         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
542
543         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
544
545         :param restport: restconf port (default='8181')
546
547         :param nrthreads: number of threads used to send http requests (default=1)
548
549     Returns:
550         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
551     """
552     return _task_executor(
553         method="PUT", flow_template=_default_flow_template_json, **kwargs
554     )
555
556
557 def deconfigure_flows(*args, **kwargs):
558     """Deconfigure flows based on given flow details.
559
560     Args:
561         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
562
563         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
564
565         :param restport: restconf port (default='8181')
566
567         :param nrthreads: number of threads used to send http requests (default=1)
568
569     Returns:
570         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
571     """
572     return _task_executor(
573         method="DELETE", flow_template=_default_flow_template_json, **kwargs
574     )
575
576
577 def configure_flows_bulk(*args, **kwargs):
578     """Configure flows based on given flow details using a POST http request..
579
580     Args:
581         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
582
583         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
584
585         :param restport: restconf port (default='8181')
586
587         :param nrthreads: number of threads used to send http requests (default=1)
588
589     Returns:
590         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
591     """
592     return _task_executor(
593         method="POST", flow_template=_default_flow_template_json, **kwargs
594     )
595
596
597 def operations_add_flows_ds(*args, **kwargs):
598     """Configure flows based on given flow details.
599
600     Args:
601         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
602
603         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
604
605         :param restport: restconf port (default='8181')
606
607         :param nrthreads: number of threads used to send http requests (default=1)
608
609     Returns:
610         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
611     """
612     return _task_executor(
613         method="add-flows-ds", flow_template=_default_operations_item_json, **kwargs
614     )
615
616
617 def operations_remove_flows_ds(*args, **kwargs):
618     """Remove flows based on given flow details.
619
620     Args:
621         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
622
623         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
624
625         :param restport: restconf port (default='8181')
626
627         :param nrthreads: number of threads used to send http requests (default=1)
628
629     Returns:
630         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
631     """
632     return _task_executor(
633         method="remove-flows-ds", flow_template=_default_operations_item_json, **kwargs
634     )
635
636
637 def operations_add_flows_rpc(*args, **kwargs):
638     """Configure flows based on given flow details using rpc calls.
639
640     Args:
641         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
642
643         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
644
645         :param restport: restconf port (default='8181')
646
647         :param nrthreads: number of threads used to send http requests (default=1)
648
649     Returns:
650         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
651     """
652     return _task_executor(
653         method="add-flows-rpc", flow_template=_default_operations_item_json, **kwargs
654     )
655
656
657 def operations_remove_flows_rpc(*args, **kwargs):
658     """Remove flows based on given flow details using rpc calls.
659
660     Args:
661         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
662
663         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
664
665         :param restport: restconf port (default='8181')
666
667         :param nrthreads: number of threads used to send http requests (default=1)
668
669     Returns:
670         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
671     """
672     return _task_executor(
673         method="remove-flows-rpc", flow_template=_default_operations_item_json, **kwargs
674     )
675
676
677 def _get_operational_inventory_of_switches(controller):
678     """Gets number of switches present in the operational inventory
679
680     Args:
681         :param controller: controller's ip or host name
682
683     Returns:
684         :returns switches: number of switches connected
685     """
686     url = (
687         "http://"
688         + controller
689         + ":8181/restconf/operational/opendaylight-inventory:nodes"
690     )
691     rsp = requests.get(
692         url,
693         headers={"Accept": "application/json"},
694         stream=False,
695         auth=("admin", "admin"),
696     )
697     if rsp.status_code != 200:
698         return None
699     inv = json.loads(rsp.content)
700     if "nodes" not in inv:
701         return None
702     if "node" not in inv["nodes"]:
703         return []
704     inv = inv["nodes"]["node"]
705     switches = [sw for sw in inv if "openflow:" in sw["id"]]
706     return switches
707
708
709 def flow_stats_collected(controller=""):
710     """Provides the operational inventory counts counts of switches and flows.
711
712     Args:
713         :param controller: controller's ip address or host name
714
715     Returns:
716         :returns (switches, flows_reported, flows-found): tupple with counts of switches, reported and found flows
717     """
718     active_flows = 0
719     found_flows = 0
720     switches = _get_operational_inventory_of_switches(controller)
721     if switches is None:
722         return 0, 0, 0
723     for sw in switches:
724         tabs = sw["flow-node-inventory:table"]
725         for t in tabs:
726             active_flows += t[
727                 "opendaylight-flow-table-statistics:flow-table-statistics"
728             ]["active-flows"]
729             if "flow" in t:
730                 found_flows += len(t["flow"])
731     print(
732         (
733             "Switches,ActiveFlows(reported)/FlowsFound",
734             len(switches),
735             active_flows,
736             found_flows,
737         )
738     )
739     return len(switches), active_flows, found_flows
740
741
742 def get_switches_count(controller=""):
743     """Gives the count of the switches presnt in the operational inventory nodes datastore.
744
745     Args:
746         :param controller: controller's ip address or host name
747
748     Returns:
749         :returns switches: returns the number of connected switches
750     """
751     switches = _get_operational_inventory_of_switches(controller)
752     if switches is None:
753         return 0
754     return len(switches)
755
756
757 def validate_responses(received, expected):
758     """Compares given response summary with expected results.
759
760     Args:
761         :param received: dictionary returned from operations_* and (de)configure_flows*
762                          of this library
763                          e.g. received = { 200:41 } - this means that we 41x receives response with status code 200
764
765         :param expected: list of expected http result codes
766                          e.g. expected=[200] - we expect only http status 200 to be present
767
768     Returns:
769         :returns True: if list of http statuses from received responses is the same as exxpected
770         :returns False: elseware
771     """
772     return True if list(received.keys()) == expected else False