2 The purpose of this library is the ability to spread configured flows
3 over the specified tables and switches.
5 The idea how to configure and checks inventory operational data is taken from
6 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
7 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
18 class Counter(object):
19 def __init__(self, start=0):
20 self.lock = threading.Lock()
23 def increment(self, value=1):
33 _spreads = ["gauss", "linear", "first"] # possible defined spreads at the moment
34 _default_flow_template_json = { # templease used for config datastore
37 "hard-timeout": 65000,
38 "idle-timeout": 65000,
39 "cookie_mask": 4294967295,
40 "flow-name": "FLOW-NAME-TEMPLATE",
46 "id": "FLOW-ID-TEMPLATE",
48 "ipv4-destination": "0.0.0.0/32",
49 "ethernet-match": {"ethernet-type": {"type": 2048}},
55 "apply-actions": {"action": [{"drop-action": {}, "order": 0}]},
64 _node_tmpl = '/opendaylight-inventory:nodes/opendaylight-inventory:node[opendaylight-inventory:id="openflow:{0}"]'
67 _default_operations_item_json = { # template used for sal operations
71 "node": "to_be_replaced",
73 "cookie_mask": 4294967295,
74 "flags": "SEND_FLOW_REM",
75 "hard-timeout": 65000,
76 "idle-timeout": 65000,
81 "action": [{"drop-action": {}, "order": 0}]
88 "ipv4-destination": "0.0.0.0/32",
89 "ethernet-match": {"ethernet-type": {"type": 2048}},
99 def _get_notes(fldet=[]):
100 """For given list of flow details it produces a dictionary with statistics
101 { swId1 : { tabId1 : flows_count1,
102 tabId2 : flows_count2,
104 'total' : switch count }
109 for (sw, tab, flow) in fldet:
111 notes[sw] = {"total": 0}
112 if tab not in notes[sw]:
115 notes[sw]["total"] += 1
119 def _randomize(spread, maxn):
120 """Returns a randomized switch or table id"""
121 if spread not in _spreads:
122 raise Exception("Spread method {} not available".format(spread))
124 if spread == "gauss":
125 ga = abs(random.gauss(0, 1))
126 rv = int(ga * float(maxn) / 3)
129 elif spread == "linear":
130 rv = int(random.random() * float(maxn))
134 raise ValueError("rv >= maxn")
135 elif spread == "first":
139 def generate_new_flow_details(
140 flows=10, switches=1, swspread="gauss", tables=250, tabspread="gauss"
142 """Generate a list of tupples (switch_id, table_id, flow_id) which are generated
143 according to the spread rules between swithces and tables.
144 It also returns a dictionary with statsistics."""
145 swflows = [_randomize(swspread, switches) for f in range(int(flows))]
146 # we have to increse the switch index because mininet start indexing switches from 1 (not 0)
148 (s + 1, _randomize(tabspread, tables), idx) for idx, s in enumerate(swflows)
150 notes = _get_notes(fltables)
151 return fltables, notes
154 def _prepare_add(cntl, method, flows, template=None):
155 """Creates a PUT http requests to configure a flow in configuration datastore.
158 :param cntl: controller's ip address or hostname
160 :param method: determines http request method
162 :param flows: list of flow details
164 :param template: flow template to be to be filled
167 :returns req: http request object
170 sw, tab, fl, ip = fl1
171 url = "http://" + cntl + ":" + "8181"
172 url += "/rests/data/opendaylight-inventory:nodes/node=openflow%3A" + str(sw)
173 url += "/flow-node-inventory:table=" + str(tab) + "/flow=" + str(fl)
174 flow = copy.deepcopy(template["flow"][0])
176 flow["flow-name"] = "TestFlow-%d" % fl
178 flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
179 flow["table_id"] = tab
180 fmod = dict(template)
182 req_data = json.dumps(fmod)
183 req = requests.Request(
186 headers={"Content-Type": "application/yang-data+json"},
188 auth=("admin", "admin"),
193 def _prepare_table_add(cntl, method, flows, template=None):
194 """Creates a POST http requests to configure a flow in configuration datastore.
197 :param cntl: controller's ip address or hostname
199 :param method: determines http request method
201 :param flows: list of flow details
203 :param template: flow template to be to be filled
206 :returns req: http request object
209 sw, tab, fl, ip = fl1
210 url = "http://" + cntl + ":" + "8181"
212 "/rests/data/opendaylight-inventory:nodes/node=openflow%3A"
214 + "/flow-node-inventory:table="
218 for sw, tab, fl, ip in flows:
219 flow = copy.deepcopy(template["flow"][0])
221 flow["flow-name"] = "TestFlow-%d" % fl
223 flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
224 flow["table_id"] = tab
226 fmod = copy.deepcopy(template)
228 req_data = json.dumps(fmod)
229 req = requests.Request(
232 headers={"Content-Type": "application/yang-data+json"},
234 auth=("admin", "admin"),
239 def _prepare_delete(cntl, method, flows, template=None):
240 """Creates a DELETE http request to remove the flow from configuration datastore.
243 :param cntl: controller's ip address or hostname
245 :param method: determines http request method
247 :param flows: list of flow details
249 :param template: flow template to be to be filled
252 :returns req: http request object
255 sw, tab, fl, ip = fl1
256 url = "http://" + cntl + ":" + "8181"
257 url += "/rests/data/opendaylight-inventory:nodes/node=openflow%3A" + str(sw)
258 url += "/flow-node-inventory:table=" + str(tab) + "/flow=" + str(fl)
259 req = requests.Request(
262 headers={"Content-Type": "application/yang-data+json"},
263 auth=("admin", "admin"),
268 def _prepare_rpc_item(cntl, method, flows, template=None):
269 """Creates a POST http requests to add or remove a flow using openflowplugin rpc.
272 :param cntl: controller's ip address or hostname
274 :param method: determines http request method
276 :param flows: list of flow details
278 :param template: flow template to be to be filled
281 :returns req: http request object
285 url = "http://" + cntl + ":" + "8181/rests/operations/sal-bulk-flow:" + method
287 for sw, tab, fl, ip in flows:
288 flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
289 flow["node"] = _node_tmpl.format(sw)
291 flow["flow-name"] = "TestFlow-%d" % fl
292 flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
293 flow["table_id"] = tab
295 fmod = copy.deepcopy(template)
296 fmod["input"]["bulk-flow-item"] = fdets
297 req_data = json.dumps(fmod)
298 req = requests.Request(
301 headers={"Content-Type": "application/yang-data+json"},
303 auth=("admin", "admin"),
308 def _prepare_ds_item(cntl, method, flows, template=None):
309 """Creates a POST http requests to configure a flow in configuration datastore.
311 Ofp uses write operation, standrd POST to config datastore uses read-write operation (on java level)
314 :param cntl: controller's ip address or hostname
316 :param method: determines http request method
318 :param flows: list of flow details
320 :param template: flow template to be to be filled
323 :returns req: http request object
327 url = "http://" + cntl + ":" + "8181/rests/operations/sal-bulk-flow:" + method
329 for sw, tab, fl, ip in flows:
330 flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
331 flow["node"] = _node_tmpl.format(sw)
333 flow["flow-name"] = "TestFlow-%d" % fl
334 flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
335 flow["table_id"] = tab
338 fmod = copy.deepcopy(template)
339 del fmod["input"]["bulk-flow-item"]
340 fmod["input"]["bulk-flow-ds-item"] = fdets
341 req_data = json.dumps(fmod)
342 req = requests.Request(
345 headers={"Content-Type": "application/yang-data+json"},
347 auth=("admin", "admin"),
352 def _wt_request_sender(
363 """The funcion sends http requests.
365 Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests
369 :param thread_id: thread id
371 :param preparefnc: function to preparesthe http request
373 :param inqueue: input queue, flow details are comming from here
375 :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue
377 :param controllers: a list of controllers' ip addresses or hostnames
379 :param restport: restconf port
381 :param template: flow template used for creating flow content
383 :param outqueue: queue where the results should be put
385 :param method: method derermines the type of http request
388 nothing, results must be put into the output queue
390 ses = requests.Session()
391 cntl = controllers[0]
392 counter = [0 for i in range(600)]
400 flowlist = inqueue.get(timeout=1)
402 if exitevent.is_set() and inqueue.empty():
405 req = preparefnc(cntl, method, flowlist, template=template)
406 # prep = ses.prepare_request(req)
409 rsp = ses.send(prep, timeout=5)
410 except requests.exceptions.Timeout:
411 print(f"*WARN* [{req_no}] Timeout: {req.method} {req.url}")
414 print("*ERROR* Too many timeouts.")
418 if rsp.status_code not in [200, 201, 204]:
420 f"*WARN* [{req_no}] Status code {rsp.status_code}:"
421 f" {req.method} {req.url}\n{rsp.text}"
425 print("*ERROR* Too many errors.")
427 counter[rsp.status_code] += 1
429 for i, v in enumerate(counter):
439 controllers=["127.0.0.1"],
444 """The main function which drives sending of http requests.
446 Creates 2 queues and requested number of 'working threads'. One queue is filled with flow details and working
447 threads read them out and send http requests. The other queue is for sending results from working threads back.
448 After the threads' join, it produces a summary result.
451 :param method: based on this the function which prepares http request is choosen
453 :param flow_template: template to generate a flow content
455 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
457 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
459 :param restport: restconf port (default='8181')
461 :param nrthreads: number of threads used to send http requests (default=1)
463 :param fpr: flow per request, number of flows sent in one http request
466 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
468 # TODO: multi controllers support
469 ip_addr = Counter(int(netaddr.IPAddress("10.0.0.1")))
471 # choose message prepare function
473 preparefnc = _prepare_add
474 # put can contain only 1 flow, lets overwrite any value of flows per request
476 elif method == "POST":
477 preparefnc = _prepare_table_add
478 elif method == "DELETE":
479 preparefnc = _prepare_delete
480 # delete flow can contain only 1 flow, lets overwrite any value of flows per request
482 elif method in ["add-flows-ds", "remove-flows-ds"]:
483 preparefnc = _prepare_ds_item
484 elif method in ["add-flows-rpc", "remove-flows-rpc"]:
485 preparefnc = _prepare_rpc_item
487 raise NotImplementedError(
488 "Method {0} does not have it's prepeare function defined".format(method)
491 # lets enlarge the tupple of flow details with IP, to be used with the template
492 flows = [(sw, tab, flo, ip_addr.increment()) for sw, tab, flo in flow_details]
493 # lels divide flows into switches and tables - flow groups
498 if flowkey in flowgroups:
499 flowgroups[flowkey].append(flow)
501 flowgroups[flowkey] = [flow]
503 # lets fill the queue with details needed for one http requests
504 # we have lists with flow details for particular (switch, table) tupples, now we need to split the lists
505 # according to the flows per request (fpr) paramer
506 sendqueue = queue.Queue()
507 for flowgroup, flow_list in flowgroups.items():
508 while len(flow_list) > 0:
509 sendqueue.put(flow_list[: int(fpr)])
510 flow_list = flow_list[int(fpr) :]
513 resultqueue = queue.Queue()
515 exitevent = threading.Event()
517 # lets start threads whic will read flow details fro queues and send
519 for i in range(int(nrthreads)):
520 thr = threading.Thread(
521 target=_wt_request_sender,
522 args=(i, preparefnc),
524 "inqueue": sendqueue,
525 "exitevent": exitevent,
526 "controllers": controllers,
527 "restport": restport,
528 "template": flow_template,
529 "outqueue": resultqueue,
539 # waitng for reqults and sum them up
542 # reading partial resutls from sender thread
543 part_result = resultqueue.get()
544 for k, v in part_result.items():
552 def configure_flows(*args, **kwargs):
553 """Configure flows based on given flow details.
556 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
558 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
560 :param restport: restconf port (default='8181')
562 :param nrthreads: number of threads used to send http requests (default=1)
565 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
567 return _task_executor(
568 method="PUT", flow_template=_default_flow_template_json, **kwargs
572 def deconfigure_flows(*args, **kwargs):
573 """Deconfigure flows based on given flow details.
576 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
578 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
580 :param restport: restconf port (default='8181')
582 :param nrthreads: number of threads used to send http requests (default=1)
585 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
587 return _task_executor(
588 method="DELETE", flow_template=_default_flow_template_json, **kwargs
592 def configure_flows_bulk(*args, **kwargs):
593 """Configure flows based on given flow details using a POST http request..
596 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
598 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
600 :param restport: restconf port (default='8181')
602 :param nrthreads: number of threads used to send http requests (default=1)
605 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
607 return _task_executor(
608 method="POST", flow_template=_default_flow_template_json, **kwargs
612 def operations_add_flows_ds(*args, **kwargs):
613 """Configure flows based on given flow details.
616 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
618 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
620 :param restport: restconf port (default='8181')
622 :param nrthreads: number of threads used to send http requests (default=1)
625 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
627 return _task_executor(
628 method="add-flows-ds", flow_template=_default_operations_item_json, **kwargs
632 def operations_remove_flows_ds(*args, **kwargs):
633 """Remove flows based on given flow details.
636 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
638 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
640 :param restport: restconf port (default='8181')
642 :param nrthreads: number of threads used to send http requests (default=1)
645 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
647 return _task_executor(
648 method="remove-flows-ds", flow_template=_default_operations_item_json, **kwargs
652 def operations_add_flows_rpc(*args, **kwargs):
653 """Configure flows based on given flow details using rpc calls.
656 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
658 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
660 :param restport: restconf port (default='8181')
662 :param nrthreads: number of threads used to send http requests (default=1)
665 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
667 return _task_executor(
668 method="add-flows-rpc", flow_template=_default_operations_item_json, **kwargs
672 def operations_remove_flows_rpc(*args, **kwargs):
673 """Remove flows based on given flow details using rpc calls.
676 :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[])
678 :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])
680 :param restport: restconf port (default='8181')
682 :param nrthreads: number of threads used to send http requests (default=1)
685 :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
687 return _task_executor(
688 method="remove-flows-rpc", flow_template=_default_operations_item_json, **kwargs
692 def _get_operational_inventory_of_switches(controller):
693 """Gets number of switches present in the operational inventory
696 :param controller: controller's ip or host name
699 :returns switches: number of switches connected
704 + ":8181/rests/data/opendaylight-inventory:nodes?content=nonconfig"
708 headers={"Accept": "application/yang-data+json"},
710 auth=("admin", "admin"),
712 if rsp.status_code != 200:
714 inv = json.loads(rsp.content)
715 if "opendaylight-inventory:nodes" not in inv:
717 if "node" not in inv["opendaylight-inventory:nodes"]:
719 inv = inv["opendaylight-inventory:nodes"]["node"]
720 switches = [sw for sw in inv if "openflow:" in sw["id"]]
724 def flow_stats_collected(controller=""):
725 """Provides the operational inventory counts counts of switches and flows.
728 :param controller: controller's ip address or host name
731 :returns (switches, flows_reported, flows-found): tupple with counts of switches, reported and found flows
735 switches = _get_operational_inventory_of_switches(controller)
739 tabs = sw["flow-node-inventory:table"]
742 "opendaylight-flow-table-statistics:flow-table-statistics"
745 found_flows += len(t["flow"])
748 "Switches,ActiveFlows(reported)/FlowsFound",
754 return len(switches), active_flows, found_flows
757 def get_switches_count(controller=""):
758 """Gives the count of the switches presnt in the operational inventory nodes datastore.
761 :param controller: controller's ip address or host name
764 :returns switches: returns the number of connected switches
766 switches = _get_operational_inventory_of_switches(controller)
772 def validate_responses(received, expected):
773 """Compares given response summary with expected results.
776 :param received: dictionary returned from operations_* and (de)configure_flows*
778 e.g. received = { 200:41 } - this means that we 41x receives response with status code 200
780 :param expected: list of expected http result codes
781 e.g. expected=[200] - we expect only http status 200 to be present
784 :returns True: if list of http statuses from received responses is the same as exxpected
785 :returns False: elseware
787 return True if list(received.keys()) == expected else False