2 The purpose of this library is the ability to spread configured flows
3 over the specified tables and switches.
5 The idea how to configure and checks inventory operational data is taken from
6 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
7 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
18 class Counter(object):
19 def __init__(self, start=0):
20 self.lock = threading.Lock()
23 def increment(self, value=1):
33 _spreads = ["gauss", "linear", "first"] # possible defined spreads at the moment
34 _default_flow_template_json = { # templease used for config datastore
37 "hard-timeout": 65000,
38 "idle-timeout": 65000,
39 "cookie_mask": 4294967295,
40 "flow-name": "FLOW-NAME-TEMPLATE",
46 "id": "FLOW-ID-TEMPLATE",
48 "ipv4-destination": "0.0.0.0/32",
49 "ethernet-match": {"ethernet-type": {"type": 2048}},
55 "apply-actions": {"action": [{"drop-action": {}, "order": 0}]},
64 _node_tmpl = '/opendaylight-inventory:nodes/opendaylight-inventory:node[opendaylight-inventory:id="openflow:{0}"]'
67 _default_operations_item_json = { # template used for sal operations
71 "node": "to_be_replaced",
73 "cookie_mask": 4294967295,
74 "flags": "SEND_FLOW_REM",
75 "hard-timeout": 65000,
76 "idle-timeout": 65000,
81 "action": [{"drop-action": {}, "order": 0}]
88 "ipv4-destination": "0.0.0.0/32",
89 "ethernet-match": {"ethernet-type": {"type": 2048}},
99 def _get_notes(fldet=[]):
100 """For given list of flow details it produces a dictionary with statistics
101 { swId1 : { tabId1 : flows_count1,
102 tabId2 : flows_count2,
104 'total' : switch count }
109 for (sw, tab, flow) in fldet:
111 notes[sw] = {"total": 0}
112 if tab not in notes[sw]:
115 notes[sw]["total"] += 1
119 def _randomize(spread, maxn):
120 """Returns a randomized switch or table id"""
121 if spread not in _spreads:
122 raise Exception("Spread method {} not available".format(spread))
124 if spread == "gauss":
125 ga = abs(random.gauss(0, 1))
126 rv = int(ga * float(maxn) / 3)
129 elif spread == "linear":
130 rv = int(random.random() * float(maxn))
134 raise ValueError("rv >= maxn")
135 elif spread == "first":
139 def generate_new_flow_details(
140 flows=10, switches=1, swspread="gauss", tables=250, tabspread="gauss"
142 """Generate a list of tupples (switch_id, table_id, flow_id) which are generated
143 according to the spread rules between swithces and tables.
144 It also returns a dictionary with statsistics."""
145 swflows = [_randomize(swspread, switches) for f in range(int(flows))]
146 # we have to increse the switch index because mininet start indexing switches from 1 (not 0)
148 (s + 1, _randomize(tabspread, tables), idx) for idx, s in enumerate(swflows)
150 notes = _get_notes(fltables)
151 return fltables, notes
154 def _prepare_add(cntl, method, flows, template=None):
155 """Creates a PUT http requests to configure a flow in configuration datastore.
158 :param cntl: controller's ip address or hostname
160 :param method: determines http request method
162 :param flows: list of flow details
164 :param template: flow template to be to be filled
167 :returns req: http request object
170 sw, tab, fl, ip = fl1
171 url = "http://" + cntl + ":" + "8181"
172 url += "/rests/data/opendaylight-inventory:nodes/node=openflow%3A" + str(sw)
173 url += "/table=" + str(tab) + "/flow=" + str(fl)
174 flow = copy.deepcopy(template["flow"][0])
176 flow["flow-name"] = "TestFlow-%d" % fl
178 flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
179 flow["table_id"] = tab
180 fmod = dict(template)
182 req_data = json.dumps(fmod)
183 req = requests.Request(
186 headers={"Content-Type": "application/yang-data+json"},
188 auth=("admin", "admin"),
193 def _prepare_table_add(cntl, method, flows, template=None):
194 """Creates a POST http requests to configure a flow in configuration datastore.
197 :param cntl: controller's ip address or hostname
199 :param method: determines http request method
201 :param flows: list of flow details
203 :param template: flow template to be to be filled
206 :returns req: http request object
209 sw, tab, fl, ip = fl1
210 url = "http://" + cntl + ":" + "8181"
212 "/rests/data/opendaylight-inventory:nodes/node=openflow%3A"
218 for sw, tab, fl, ip in flows:
219 flow = copy.deepcopy(template["flow"][0])
221 flow["flow-name"] = "TestFlow-%d" % fl
223 flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
224 flow["table_id"] = tab
226 fmod = copy.deepcopy(template)
228 req_data = json.dumps(fmod)
229 req = requests.Request(
232 headers={"Content-Type": "application/yang-data+json"},
234 auth=("admin", "admin"),
239 def _prepare_delete(cntl, method, flows, template=None):
240 """Creates a DELETE http request to remove the flow from configuration datastore.
243 :param cntl: controller's ip address or hostname
245 :param method: determines http request method
247 :param flows: list of flow details
249 :param template: flow template to be to be filled
252 :returns req: http request object
255 sw, tab, fl, ip = fl1
256 url = "http://" + cntl + ":" + "8181"
257 url += "/rests/data/opendaylight-inventory:nodes/node=openflow%3A" + str(sw)
258 url += "/table=" + str(tab) + "/flow=" + str(fl)
259 req = requests.Request(
262 headers={"Content-Type": "application/yang-data+json"},
263 auth=("admin", "admin"),
268 def _prepare_rpc_item(cntl, method, flows, template=None):
269 """Creates a POST http requests to add or remove a flow using openflowplugin rpc.
272 :param cntl: controller's ip address or hostname
274 :param method: determines http request method
276 :param flows: list of flow details
278 :param template: flow template to be to be filled
281 :returns req: http request object
285 url = "http://" + cntl + ":" + "8181/rests/operations/sal-bulk-flow:" + method
287 for sw, tab, fl, ip in flows:
288 flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
289 flow["node"] = _node_tmpl.format(sw)
291 flow["flow-name"] = "TestFlow-%d" % fl
292 flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
293 flow["table_id"] = tab
295 fmod = copy.deepcopy(template)
296 fmod["input"]["bulk-flow-item"] = fdets
297 req_data = json.dumps(fmod)
298 req = requests.Request(
301 headers={"Content-Type": "application/yang-data+json"},
303 auth=("admin", "admin"),
308 def _prepare_ds_item(cntl, method, flows, template=None):
309 """Creates a POST http requests to configure a flow in configuration datastore.
311 Ofp uses write operation, standrd POST to config datastore uses read-write operation (on java level)
314 :param cntl: controller's ip address or hostname
316 :param method: determines http request method
318 :param flows: list of flow details
320 :param template: flow template to be to be filled
323 :returns req: http request object
327 url = "http://" + cntl + ":" + "8181/rests/operations/sal-bulk-flow:" + method
329 for sw, tab, fl, ip in flows:
330 flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
331 flow["node"] = _node_tmpl.format(sw)
333 flow["flow-name"] = "TestFlow-%d" % fl
334 flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
335 flow["table_id"] = tab
338 fmod = copy.deepcopy(template)
339 del fmod["input"]["bulk-flow-item"]
340 fmod["input"]["bulk-flow-ds-item"] = fdets
341 req_data = json.dumps(fmod)
342 req = requests.Request(
345 headers={"Content-Type": "application/yang-data+json"},
347 auth=("admin", "admin"),
352 def _wt_request_sender(
363 """The funcion sends http requests.
365 Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests
369 :param thread_id: thread id
371 :param preparefnc: function to preparesthe http request
373 :param inqueue: input queue, flow details are comming from here
375 :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue
377 :param controllers: a list of controllers' ip addresses or hostnames
379 :param restport: restconf port
381 :param template: flow template used for creating flow content
383 :param outqueue: queue where the results should be put
385 :param method: method derermines the type of http request
388 nothing, results must be put into the output queue
390 ses = requests.Session()
391 cntl = controllers[0]
392 counter = [0 for i in range(600)]
397 flowlist = inqueue.get(timeout=1)
399 if exitevent.is_set() and inqueue.empty():
402 req = preparefnc(cntl, method, flowlist, template=template)
403 # prep = ses.prepare_request(req)
406 rsp = ses.send(prep, timeout=5)
407 except requests.exceptions.Timeout:
408 print(f"*WARN* Timeout: {req.method} {req.url}")
411 print("*ERROR* Too many timeouts.")
415 if rsp.status_code not in [200, 201, 204]:
417 f"*WARN* Status code {rsp.status_code}: {req.method} {req.url}\n{rsp.text}"
419 counter[rsp.status_code] += 1
421 for i, v in enumerate(counter):
431 controllers=["127.0.0.1"],
436 """The main function which drives sending of http requests.
438 Creates 2 queues and requested number of 'working threads'. One queue is filled with flow details and working
439 threads read them out and send http requests. The other queue is for sending results from working threads back.
440 After the threads' join, it produces a summary result.
443 :param method: based on this the function which prepares http request is choosen
445 :param flow_template: template to generate a flow content
447 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
449 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
451 :param restport: restconf port (default='8181')
453 :param nrthreads: number of threads used to send http requests (default=1)
455 :param fpr: flow per request, number of flows sent in one http request
458 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
460 # TODO: multi controllers support
461 ip_addr = Counter(int(netaddr.IPAddress("10.0.0.1")))
463 # choose message prepare function
465 preparefnc = _prepare_add
466 # put can contain only 1 flow, lets overwrite any value of flows per request
468 elif method == "POST":
469 preparefnc = _prepare_table_add
470 elif method == "DELETE":
471 preparefnc = _prepare_delete
472 # delete flow can contain only 1 flow, lets overwrite any value of flows per request
474 elif method in ["add-flows-ds", "remove-flows-ds"]:
475 preparefnc = _prepare_ds_item
476 elif method in ["add-flows-rpc", "remove-flows-rpc"]:
477 preparefnc = _prepare_rpc_item
479 raise NotImplementedError(
480 "Method {0} does not have it's prepeare function defined".format(method)
483 # lets enlarge the tupple of flow details with IP, to be used with the template
484 flows = [(sw, tab, flo, ip_addr.increment()) for sw, tab, flo in flow_details]
485 # lels divide flows into switches and tables - flow groups
490 if flowkey in flowgroups:
491 flowgroups[flowkey].append(flow)
493 flowgroups[flowkey] = [flow]
495 # lets fill the queue with details needed for one http requests
496 # we have lists with flow details for particular (switch, table) tupples, now we need to split the lists
497 # according to the flows per request (fpr) paramer
498 sendqueue = queue.Queue()
499 for flowgroup, flow_list in flowgroups.items():
500 while len(flow_list) > 0:
501 sendqueue.put(flow_list[: int(fpr)])
502 flow_list = flow_list[int(fpr) :]
505 resultqueue = queue.Queue()
507 exitevent = threading.Event()
509 # lets start threads whic will read flow details fro queues and send
511 for i in range(int(nrthreads)):
512 thr = threading.Thread(
513 target=_wt_request_sender,
514 args=(i, preparefnc),
516 "inqueue": sendqueue,
517 "exitevent": exitevent,
518 "controllers": controllers,
519 "restport": restport,
520 "template": flow_template,
521 "outqueue": resultqueue,
531 # waitng for reqults and sum them up
534 # reading partial resutls from sender thread
535 part_result = resultqueue.get()
536 for k, v in part_result.items():
544 def configure_flows(*args, **kwargs):
545 """Configure flows based on given flow details.
548 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
550 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
552 :param restport: restconf port (default='8181')
554 :param nrthreads: number of threads used to send http requests (default=1)
557 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
559 return _task_executor(
560 method="PUT", flow_template=_default_flow_template_json, **kwargs
564 def deconfigure_flows(*args, **kwargs):
565 """Deconfigure flows based on given flow details.
568 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
570 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
572 :param restport: restconf port (default='8181')
574 :param nrthreads: number of threads used to send http requests (default=1)
577 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
579 return _task_executor(
580 method="DELETE", flow_template=_default_flow_template_json, **kwargs
584 def configure_flows_bulk(*args, **kwargs):
585 """Configure flows based on given flow details using a POST http request..
588 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
590 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
592 :param restport: restconf port (default='8181')
594 :param nrthreads: number of threads used to send http requests (default=1)
597 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
599 return _task_executor(
600 method="POST", flow_template=_default_flow_template_json, **kwargs
604 def operations_add_flows_ds(*args, **kwargs):
605 """Configure flows based on given flow details.
608 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
610 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
612 :param restport: restconf port (default='8181')
614 :param nrthreads: number of threads used to send http requests (default=1)
617 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
619 return _task_executor(
620 method="add-flows-ds", flow_template=_default_operations_item_json, **kwargs
624 def operations_remove_flows_ds(*args, **kwargs):
625 """Remove flows based on given flow details.
628 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
630 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
632 :param restport: restconf port (default='8181')
634 :param nrthreads: number of threads used to send http requests (default=1)
637 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
639 return _task_executor(
640 method="remove-flows-ds", flow_template=_default_operations_item_json, **kwargs
644 def operations_add_flows_rpc(*args, **kwargs):
645 """Configure flows based on given flow details using rpc calls.
648 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
650 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
652 :param restport: restconf port (default='8181')
654 :param nrthreads: number of threads used to send http requests (default=1)
657 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
659 return _task_executor(
660 method="add-flows-rpc", flow_template=_default_operations_item_json, **kwargs
664 def operations_remove_flows_rpc(*args, **kwargs):
665 """Remove flows based on given flow details using rpc calls.
668 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
670 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
672 :param restport: restconf port (default='8181')
674 :param nrthreads: number of threads used to send http requests (default=1)
677 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
679 return _task_executor(
680 method="remove-flows-rpc", flow_template=_default_operations_item_json, **kwargs
684 def _get_operational_inventory_of_switches(controller):
685 """Gets number of switches present in the operational inventory
688 :param controller: controller's ip or host name
691 :returns switches: number of switches connected
696 + ":8181/rests/data/opendaylight-inventory:nodes?content=nonconfig"
700 headers={"Accept": "application/yang-data+json"},
702 auth=("admin", "admin"),
704 if rsp.status_code != 200:
706 inv = json.loads(rsp.content)
707 if "opendaylight-inventory:nodes" not in inv:
709 if "node" not in inv["opendaylight-inventory:nodes"]:
711 inv = inv["opendaylight-inventory:nodes"]["node"]
712 switches = [sw for sw in inv if "openflow:" in sw["id"]]
716 def flow_stats_collected(controller=""):
717 """Provides the operational inventory counts counts of switches and flows.
720 :param controller: controller's ip address or host name
723 :returns (switches, flows_reported, flows-found): tupple with counts of switches, reported and found flows
727 switches = _get_operational_inventory_of_switches(controller)
731 tabs = sw["flow-node-inventory:table"]
734 "opendaylight-flow-table-statistics:flow-table-statistics"
737 found_flows += len(t["flow"])
740 "Switches,ActiveFlows(reported)/FlowsFound",
746 return len(switches), active_flows, found_flows
749 def get_switches_count(controller=""):
750 """Gives the count of the switches presnt in the operational inventory nodes datastore.
753 :param controller: controller's ip address or host name
756 :returns switches: returns the number of connected switches
758 switches = _get_operational_inventory_of_switches(controller)
764 def validate_responses(received, expected):
765 """Compares given response summary with expected results.
768 :param received: dictionary returned from operations_* and (de)configure_flows*
770 e.g. received = { 200:41 } - this means that we 41x receives response with status code 200
772 :param expected: list of expected http result codes
773 e.g. expected=[200] - we expect only http status 200 to be present
776 :returns True: if list of http statuses from received responses is the same as exxpected
777 :returns False: elseware
779 return True if list(received.keys()) == expected else False