Replace Bierman02 with RFC8040 for OpenFlow Plugin
[integration/test.git] / csit / libraries / ScaleClient.py
1 """
2 The purpose of this library is the ability to spread configured flows
3 over the specified tables and switches.
4
5 The idea how to configure and checks inventory operational data is taken from
6 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
7 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
8 """
9 import random
10 import threading
11 import netaddr
12 import queue
13 import requests
14 import json
15 import copy
16
17
18 class Counter(object):
19     def __init__(self, start=0):
20         self.lock = threading.Lock()
21         self.value = start
22
23     def increment(self, value=1):
24         self.lock.acquire()
25         val = self.value
26         try:
27             self.value += value
28         finally:
29             self.lock.release()
30         return val
31
32
33 _spreads = ["gauss", "linear", "first"]  # possible defined spreads at the moment
34 _default_flow_template_json = {  # templease used for config datastore
35     "flow": [
36         {
37             "hard-timeout": 65000,
38             "idle-timeout": 65000,
39             "cookie_mask": 4294967295,
40             "flow-name": "FLOW-NAME-TEMPLATE",
41             "priority": 2,
42             "strict": False,
43             "cookie": 0,
44             "table_id": 0,
45             "installHw": False,
46             "id": "FLOW-ID-TEMPLATE",
47             "match": {
48                 "ipv4-destination": "0.0.0.0/32",
49                 "ethernet-match": {"ethernet-type": {"type": 2048}},
50             },
51             "instructions": {
52                 "instruction": [
53                     {
54                         "order": 0,
55                         "apply-actions": {"action": [{"drop-action": {}, "order": 0}]},
56                     }
57                 ]
58             },
59         }
60     ]
61 }
62
63
64 _node_tmpl = '/opendaylight-inventory:nodes/opendaylight-inventory:node[opendaylight-inventory:id="openflow:{0}"]'
65
66
67 _default_operations_item_json = {  # template used for sal operations
68     "input": {
69         "bulk-flow-item": [
70             {
71                 "node": "to_be_replaced",
72                 "cookie": 0,
73                 "cookie_mask": 4294967295,
74                 "flags": "SEND_FLOW_REM",
75                 "hard-timeout": 65000,
76                 "idle-timeout": 65000,
77                 "instructions": {
78                     "instruction": [
79                         {
80                             "apply-actions": {
81                                 "action": [{"drop-action": {}, "order": 0}]
82                             },
83                             "order": 0,
84                         }
85                     ]
86                 },
87                 "match": {
88                     "ipv4-destination": "0.0.0.0/32",
89                     "ethernet-match": {"ethernet-type": {"type": 2048}},
90                 },
91                 "priority": 2,
92                 "table_id": 0,
93             }
94         ]
95     }
96 }
97
98
99 def _get_notes(fldet=[]):
100     """For given list of flow details it produces a dictionary with statistics
101     { swId1 : { tabId1 : flows_count1,
102                 tabId2 : flows_count2,
103                ...
104                 'total' : switch count }
105       swId2 ...
106     }
107     """
108     notes = {}
109     for (sw, tab, flow) in fldet:
110         if sw not in notes:
111             notes[sw] = {"total": 0}
112         if tab not in notes[sw]:
113             notes[sw][tab] = 0
114         notes[sw][tab] += 1
115         notes[sw]["total"] += 1
116     return notes
117
118
119 def _randomize(spread, maxn):
120     """Returns a randomized switch or table id"""
121     if spread not in _spreads:
122         raise Exception("Spread method {} not available".format(spread))
123     while True:
124         if spread == "gauss":
125             ga = abs(random.gauss(0, 1))
126             rv = int(ga * float(maxn) / 3)
127             if rv < maxn:
128                 return rv
129         elif spread == "linear":
130             rv = int(random.random() * float(maxn))
131             if rv < maxn:
132                 return rv
133             else:
134                 raise ValueError("rv >= maxn")
135         elif spread == "first":
136             return 0
137
138
139 def generate_new_flow_details(
140     flows=10, switches=1, swspread="gauss", tables=250, tabspread="gauss"
141 ):
142     """Generate a list of tupples (switch_id, table_id, flow_id) which are generated
143     according to the spread rules between swithces and tables.
144     It also returns a dictionary with statsistics."""
145     swflows = [_randomize(swspread, switches) for f in range(int(flows))]
146     # we have to increse the switch index because mininet start indexing switches from 1 (not 0)
147     fltables = [
148         (s + 1, _randomize(tabspread, tables), idx) for idx, s in enumerate(swflows)
149     ]
150     notes = _get_notes(fltables)
151     return fltables, notes
152
153
154 def _prepare_add(cntl, method, flows, template=None):
155     """Creates a PUT http requests to configure a flow in configuration datastore.
156
157     Args:
158         :param cntl: controller's ip address or hostname
159
160         :param method: determines http request method
161
162         :param flows: list of flow details
163
164         :param template: flow template to be to be filled
165
166     Returns:
167         :returns req: http request object
168     """
169     fl1 = flows[0]
170     sw, tab, fl, ip = fl1
171     url = "http://" + cntl + ":" + "8181"
172     url += "/rests/data/opendaylight-inventory:nodes/node=openflow%3A" + str(sw)
173     url += "/table=" + str(tab) + "/flow=" + str(fl)
174     flow = copy.deepcopy(template["flow"][0])
175     flow["cookie"] = fl
176     flow["flow-name"] = "TestFlow-%d" % fl
177     flow["id"] = str(fl)
178     flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
179     flow["table_id"] = tab
180     fmod = dict(template)
181     fmod["flow"] = flow
182     req_data = json.dumps(fmod)
183     req = requests.Request(
184         "PUT",
185         url,
186         headers={"Content-Type": "application/yang-data+json"},
187         data=req_data,
188         auth=("admin", "admin"),
189     )
190     return req
191
192
193 def _prepare_table_add(cntl, method, flows, template=None):
194     """Creates a POST http requests to configure a flow in configuration datastore.
195
196     Args:
197         :param cntl: controller's ip address or hostname
198
199         :param method: determines http request method
200
201         :param flows: list of flow details
202
203         :param template: flow template to be to be filled
204
205     Returns:
206         :returns req: http request object
207     """
208     fl1 = flows[0]
209     sw, tab, fl, ip = fl1
210     url = "http://" + cntl + ":" + "8181"
211     url += (
212         "/rests/data/opendaylight-inventory:nodes/node=openflow%3A"
213         + str(sw)
214         + "/table="
215         + str(tab)
216     )
217     fdets = []
218     for sw, tab, fl, ip in flows:
219         flow = copy.deepcopy(template["flow"][0])
220         flow["cookie"] = fl
221         flow["flow-name"] = "TestFlow-%d" % fl
222         flow["id"] = str(fl)
223         flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
224         flow["table_id"] = tab
225         fdets.append(flow)
226     fmod = copy.deepcopy(template)
227     fmod["flow"] = fdets
228     req_data = json.dumps(fmod)
229     req = requests.Request(
230         "POST",
231         url,
232         headers={"Content-Type": "application/yang-data+json"},
233         data=req_data,
234         auth=("admin", "admin"),
235     )
236     return req
237
238
239 def _prepare_delete(cntl, method, flows, template=None):
240     """Creates a DELETE http request to remove the flow from configuration datastore.
241
242     Args:
243         :param cntl: controller's ip address or hostname
244
245         :param method: determines http request method
246
247         :param flows: list of flow details
248
249         :param template: flow template to be to be filled
250
251     Returns:
252         :returns req: http request object
253     """
254     fl1 = flows[0]
255     sw, tab, fl, ip = fl1
256     url = "http://" + cntl + ":" + "8181"
257     url += "/rests/data/opendaylight-inventory:nodes/node=openflow%3A" + str(sw)
258     url += "/table=" + str(tab) + "/flow=" + str(fl)
259     req = requests.Request(
260         "DELETE",
261         url,
262         headers={"Content-Type": "application/yang-data+json"},
263         auth=("admin", "admin"),
264     )
265     return req
266
267
268 def _prepare_rpc_item(cntl, method, flows, template=None):
269     """Creates a POST http requests to add or remove a flow using openflowplugin rpc.
270
271     Args:
272         :param cntl: controller's ip address or hostname
273
274         :param method: determines http request method
275
276         :param flows: list of flow details
277
278         :param template: flow template to be to be filled
279
280     Returns:
281         :returns req: http request object
282     """
283     f1 = flows[0]
284     sw, tab, fl, ip = f1
285     url = "http://" + cntl + ":" + "8181/rests/operations/sal-bulk-flow:" + method
286     fdets = []
287     for sw, tab, fl, ip in flows:
288         flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
289         flow["node"] = _node_tmpl.format(sw)
290         flow["cookie"] = fl
291         flow["flow-name"] = "TestFlow-%d" % fl
292         flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
293         flow["table_id"] = tab
294         fdets.append(flow)
295     fmod = copy.deepcopy(template)
296     fmod["input"]["bulk-flow-item"] = fdets
297     req_data = json.dumps(fmod)
298     req = requests.Request(
299         "POST",
300         url,
301         headers={"Content-Type": "application/yang-data+json"},
302         data=req_data,
303         auth=("admin", "admin"),
304     )
305     return req
306
307
308 def _prepare_ds_item(cntl, method, flows, template=None):
309     """Creates a POST http requests to configure a flow in configuration datastore.
310
311     Ofp uses write operation, standrd POST to config datastore uses read-write operation (on java level)
312
313     Args:
314         :param cntl: controller's ip address or hostname
315
316         :param method: determines http request method
317
318         :param flows: list of flow details
319
320         :param template: flow template to be to be filled
321
322     Returns:
323         :returns req: http request object
324     """
325     f1 = flows[0]
326     sw, tab, fl, ip = f1
327     url = "http://" + cntl + ":" + "8181/rests/operations/sal-bulk-flow:" + method
328     fdets = []
329     for sw, tab, fl, ip in flows:
330         flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
331         flow["node"] = _node_tmpl.format(sw)
332         flow["cookie"] = fl
333         flow["flow-name"] = "TestFlow-%d" % fl
334         flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
335         flow["table_id"] = tab
336         flow["flow-id"] = fl
337         fdets.append(flow)
338     fmod = copy.deepcopy(template)
339     del fmod["input"]["bulk-flow-item"]
340     fmod["input"]["bulk-flow-ds-item"] = fdets
341     req_data = json.dumps(fmod)
342     req = requests.Request(
343         "POST",
344         url,
345         headers={"Content-Type": "application/yang-data+json"},
346         data=req_data,
347         auth=("admin", "admin"),
348     )
349     return req
350
351
352 def _wt_request_sender(
353     thread_id,
354     preparefnc,
355     inqueue=None,
356     exitevent=None,
357     controllers=[],
358     restport="",
359     template=None,
360     outqueue=None,
361     method=None,
362 ):
363     """The funcion sends http requests.
364
365     Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests
366     to the controller
367
368     Args:
369         :param thread_id: thread id
370
371         :param preparefnc: function to preparesthe http request
372
373         :param inqueue: input queue, flow details are comming from here
374
375         :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue
376
377         :param controllers: a list of controllers' ip addresses or hostnames
378
379         :param restport: restconf port
380
381         :param template: flow template used for creating flow content
382
383         :param outqueue: queue where the results should be put
384
385         :param method: method derermines the type of http request
386
387     Returns:
388         nothing, results must be put into the output queue
389     """
390     ses = requests.Session()
391     cntl = controllers[0]
392     counter = [0 for i in range(600)]
393     loop = True
394
395     while loop:
396         try:
397             flowlist = inqueue.get(timeout=1)
398         except queue.Empty:
399             if exitevent.is_set() and inqueue.empty():
400                 loop = False
401             continue
402         req = preparefnc(cntl, method, flowlist, template=template)
403         # prep = ses.prepare_request(req)
404         prep = req.prepare()
405         try:
406             rsp = ses.send(prep, timeout=5)
407         except requests.exceptions.Timeout:
408             print(f"*WARN* Timeout: {req.method} {req.url}")
409             counter[99] += 1
410             if counter[99] > 10:
411                 print("*ERROR* Too many timeouts.")
412                 break
413             continue
414         else:
415             if rsp.status_code not in [200, 201, 204]:
416                 print(
417                     f"*WARN* Status code {rsp.status_code}: {req.method} {req.url}\n{rsp.text}"
418                 )
419         counter[rsp.status_code] += 1
420     res = {}
421     for i, v in enumerate(counter):
422         if v > 0:
423             res[i] = v
424     outqueue.put(res)
425
426
427 def _task_executor(
428     method="",
429     flow_template=None,
430     flow_details=[],
431     controllers=["127.0.0.1"],
432     restport="8181",
433     nrthreads=1,
434     fpr=1,
435 ):
436     """The main function which drives sending of http requests.
437
438     Creates 2 queues and requested number of 'working threads'.  One queue is filled with flow details and working
439     threads read them out and send http requests. The other queue is for sending results from working threads back.
440     After the threads' join, it produces a summary result.
441
442     Args:
443         :param method: based on this the function which prepares http request is choosen
444
445         :param flow_template: template to generate a flow content
446
447         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
448
449         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
450
451         :param restport: restconf port (default='8181')
452
453         :param nrthreads: number of threads used to send http requests (default=1)
454
455         :param fpr: flow per request, number of flows sent in one http request
456
457     Returns:
458         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
459     """
460     # TODO: multi controllers support
461     ip_addr = Counter(int(netaddr.IPAddress("10.0.0.1")))
462
463     # choose message prepare function
464     if method == "PUT":
465         preparefnc = _prepare_add
466         # put can contain only 1 flow, lets overwrite any value of flows per request
467         fpr = 1
468     elif method == "POST":
469         preparefnc = _prepare_table_add
470     elif method == "DELETE":
471         preparefnc = _prepare_delete
472         # delete flow can contain only 1 flow, lets overwrite any value of flows per request
473         fpr = 1
474     elif method in ["add-flows-ds", "remove-flows-ds"]:
475         preparefnc = _prepare_ds_item
476     elif method in ["add-flows-rpc", "remove-flows-rpc"]:
477         preparefnc = _prepare_rpc_item
478     else:
479         raise NotImplementedError(
480             "Method {0} does not have it's prepeare function defined".format(method)
481         )
482
483     # lets enlarge the tupple of flow details with IP, to be used with the template
484     flows = [(sw, tab, flo, ip_addr.increment()) for sw, tab, flo in flow_details]
485     # lels divide flows into switches and tables - flow groups
486     flowgroups = {}
487     for flow in flows:
488         sw, tab, _, _ = flow
489         flowkey = (sw, tab)
490         if flowkey in flowgroups:
491             flowgroups[flowkey].append(flow)
492         else:
493             flowgroups[flowkey] = [flow]
494
495     # lets fill the queue with details needed for one http requests
496     # we have lists with flow details for particular (switch, table) tupples, now we need to split the lists
497     # according to the flows per request (fpr) paramer
498     sendqueue = queue.Queue()
499     for flowgroup, flow_list in flowgroups.items():
500         while len(flow_list) > 0:
501             sendqueue.put(flow_list[: int(fpr)])
502             flow_list = flow_list[int(fpr) :]
503
504     # result_gueue
505     resultqueue = queue.Queue()
506     # creaet exit event
507     exitevent = threading.Event()
508
509     # lets start threads whic will read flow details fro queues and send
510     threads = []
511     for i in range(int(nrthreads)):
512         thr = threading.Thread(
513             target=_wt_request_sender,
514             args=(i, preparefnc),
515             kwargs={
516                 "inqueue": sendqueue,
517                 "exitevent": exitevent,
518                 "controllers": controllers,
519                 "restport": restport,
520                 "template": flow_template,
521                 "outqueue": resultqueue,
522                 "method": method,
523             },
524         )
525         threads.append(thr)
526         thr.start()
527
528     exitevent.set()
529
530     result = {}
531     # waitng for reqults and sum them up
532     for t in threads:
533         t.join()
534         # reading partial resutls from sender thread
535         part_result = resultqueue.get()
536         for k, v in part_result.items():
537             if k not in result:
538                 result[k] = v
539             else:
540                 result[k] += v
541     return result
542
543
544 def configure_flows(*args, **kwargs):
545     """Configure flows based on given flow details.
546
547     Args:
548         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
549
550         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
551
552         :param restport: restconf port (default='8181')
553
554         :param nrthreads: number of threads used to send http requests (default=1)
555
556     Returns:
557         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
558     """
559     return _task_executor(
560         method="PUT", flow_template=_default_flow_template_json, **kwargs
561     )
562
563
564 def deconfigure_flows(*args, **kwargs):
565     """Deconfigure flows based on given flow details.
566
567     Args:
568         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
569
570         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
571
572         :param restport: restconf port (default='8181')
573
574         :param nrthreads: number of threads used to send http requests (default=1)
575
576     Returns:
577         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
578     """
579     return _task_executor(
580         method="DELETE", flow_template=_default_flow_template_json, **kwargs
581     )
582
583
584 def configure_flows_bulk(*args, **kwargs):
585     """Configure flows based on given flow details using a POST http request..
586
587     Args:
588         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
589
590         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
591
592         :param restport: restconf port (default='8181')
593
594         :param nrthreads: number of threads used to send http requests (default=1)
595
596     Returns:
597         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
598     """
599     return _task_executor(
600         method="POST", flow_template=_default_flow_template_json, **kwargs
601     )
602
603
604 def operations_add_flows_ds(*args, **kwargs):
605     """Configure flows based on given flow details.
606
607     Args:
608         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
609
610         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
611
612         :param restport: restconf port (default='8181')
613
614         :param nrthreads: number of threads used to send http requests (default=1)
615
616     Returns:
617         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
618     """
619     return _task_executor(
620         method="add-flows-ds", flow_template=_default_operations_item_json, **kwargs
621     )
622
623
624 def operations_remove_flows_ds(*args, **kwargs):
625     """Remove flows based on given flow details.
626
627     Args:
628         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
629
630         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
631
632         :param restport: restconf port (default='8181')
633
634         :param nrthreads: number of threads used to send http requests (default=1)
635
636     Returns:
637         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
638     """
639     return _task_executor(
640         method="remove-flows-ds", flow_template=_default_operations_item_json, **kwargs
641     )
642
643
644 def operations_add_flows_rpc(*args, **kwargs):
645     """Configure flows based on given flow details using rpc calls.
646
647     Args:
648         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
649
650         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
651
652         :param restport: restconf port (default='8181')
653
654         :param nrthreads: number of threads used to send http requests (default=1)
655
656     Returns:
657         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
658     """
659     return _task_executor(
660         method="add-flows-rpc", flow_template=_default_operations_item_json, **kwargs
661     )
662
663
664 def operations_remove_flows_rpc(*args, **kwargs):
665     """Remove flows based on given flow details using rpc calls.
666
667     Args:
668         :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
669
670         :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
671
672         :param restport: restconf port (default='8181')
673
674         :param nrthreads: number of threads used to send http requests (default=1)
675
676     Returns:
677         :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
678     """
679     return _task_executor(
680         method="remove-flows-rpc", flow_template=_default_operations_item_json, **kwargs
681     )
682
683
684 def _get_operational_inventory_of_switches(controller):
685     """Gets number of switches present in the operational inventory
686
687     Args:
688         :param controller: controller's ip or host name
689
690     Returns:
691         :returns switches: number of switches connected
692     """
693     url = (
694         "http://"
695         + controller
696         + ":8181/rests/data/opendaylight-inventory:nodes?content=nonconfig"
697     )
698     rsp = requests.get(
699         url,
700         headers={"Accept": "application/yang-data+json"},
701         stream=False,
702         auth=("admin", "admin"),
703     )
704     if rsp.status_code != 200:
705         return None
706     inv = json.loads(rsp.content)
707     if "opendaylight-inventory:nodes" not in inv:
708         return None
709     if "node" not in inv["opendaylight-inventory:nodes"]:
710         return []
711     inv = inv["opendaylight-inventory:nodes"]["node"]
712     switches = [sw for sw in inv if "openflow:" in sw["id"]]
713     return switches
714
715
716 def flow_stats_collected(controller=""):
717     """Provides the operational inventory counts counts of switches and flows.
718
719     Args:
720         :param controller: controller's ip address or host name
721
722     Returns:
723         :returns (switches, flows_reported, flows-found): tupple with counts of switches, reported and found flows
724     """
725     active_flows = 0
726     found_flows = 0
727     switches = _get_operational_inventory_of_switches(controller)
728     if switches is None:
729         return 0, 0, 0
730     for sw in switches:
731         tabs = sw["flow-node-inventory:table"]
732         for t in tabs:
733             active_flows += t[
734                 "opendaylight-flow-table-statistics:flow-table-statistics"
735             ]["active-flows"]
736             if "flow" in t:
737                 found_flows += len(t["flow"])
738     print(
739         (
740             "Switches,ActiveFlows(reported)/FlowsFound",
741             len(switches),
742             active_flows,
743             found_flows,
744         )
745     )
746     return len(switches), active_flows, found_flows
747
748
749 def get_switches_count(controller=""):
750     """Gives the count of the switches presnt in the operational inventory nodes datastore.
751
752     Args:
753         :param controller: controller's ip address or host name
754
755     Returns:
756         :returns switches: returns the number of connected switches
757     """
758     switches = _get_operational_inventory_of_switches(controller)
759     if switches is None:
760         return 0
761     return len(switches)
762
763
764 def validate_responses(received, expected):
765     """Compares given response summary with expected results.
766
767     Args:
768         :param received: dictionary returned from operations_* and (de)configure_flows*
769                          of this library
770                          e.g. received = { 200:41 } - this means that we 41x receives response with status code 200
771
772         :param expected: list of expected http result codes
773                          e.g. expected=[200] - we expect only http status 200 to be present
774
775     Returns:
776         :returns True: if list of http statuses from received responses is the same as exxpected
777         :returns False: elseware
778     """
779     return True if list(received.keys()) == expected else False