The purpose of this library is the ability to spread configured flows
over the specified tables and switches.

The idea of how to configure and check inventory operational data is taken from
../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
class Counter(object):
    """Thread-safe integer counter.

    Used by the task executor to hand out unique, sequential values
    (e.g. ip addresses) to concurrently running worker threads.
    """

    def __init__(self, start=0):
        """Initialize the counter.

        :param start: initial counter value (default=0)
        """
        self.lock = threading.Lock()
        # current counter value; guarded by self.lock
        self.value = start

    def increment(self, value=1):
        """Atomically add *value* to the counter.

        :param value: amount to add (default=1)

        :returns val: the counter value as it was *before* the increment
        """
        with self.lock:
            val = self.value
            self.value += value
        return val
33 _spreads = ["gauss", "linear", "first"] # possible defined spreads at the moment
34 _default_flow_template_json = { # templease used for config datastore
37 u"hard-timeout": 65000,
38 u"idle-timeout": 65000,
39 u"cookie_mask": 4294967295,
40 u"flow-name": u"FLOW-NAME-TEMPLATE",
46 u"id": u"FLOW-ID-TEMPLATE",
48 u"ipv4-destination": u"0.0.0.0/32",
49 u"ethernet-match": {u"ethernet-type": {u"type": 2048}},
56 u"action": [{u"drop-action": {}, u"order": 0}]
66 _node_tmpl = '/opendaylight-inventory:nodes/opendaylight-inventory:node[opendaylight-inventory:id="openflow:{0}"]'
69 _default_operations_item_json = { # template used for sal operations
73 "node": "to_be_replaced",
75 "cookie_mask": 4294967295,
76 "flags": "SEND_FLOW_REM",
77 "hard-timeout": 65000,
78 "idle-timeout": 65000,
83 "action": [{"drop-action": {}, "order": 0}]
90 "ipv4-destination": "0.0.0.0/32",
91 "ethernet-match": {"ethernet-type": {"type": 2048}},
101 def _get_notes(fldet=[]):
102 """For given list of flow details it produces a dictionary with statistics
103 { swId1 : { tabId1 : flows_count1,
104 tabId2 : flows_count2,
106 'total' : switch count }
111 for (sw, tab, flow) in fldet:
113 notes[sw] = {"total": 0}
114 if tab not in notes[sw]:
117 notes[sw]["total"] += 1
def _randomize(spread, maxn):
    """Return a randomized switch or table id in the range [0, maxn).

    :param spread: spread method, one of _spreads ("gauss", "linear", "first")
    :param maxn: exclusive upper bound of the returned id

    :returns rv: randomized id

    :raises Exception: if the spread method is not known
    :raises ValueError: if a "linear" draw falls outside the range
    """
    if spread not in _spreads:
        raise Exception("Spread method {} not available".format(spread))
    while True:
        if spread == "gauss":
            # fold the gaussian onto positive values and scale it down;
            # out-of-range draws are simply retried
            ga = abs(random.gauss(0, 1))
            rv = int(ga * float(maxn) / 3)
            if rv < maxn:
                return rv
        elif spread == "linear":
            rv = int(random.random() * float(maxn))
            if rv < maxn:
                return rv
            else:
                # random.random() < 1.0, so this branch should be unreachable
                raise ValueError("rv >= maxn")
        elif spread == "first":
            # deterministic: always the first switch/table
            return 0
def generate_new_flow_details(
    flows=10, switches=1, swspread="gauss", tables=250, tabspread="gauss"
):
    """Generate a list of tuples (switch_id, table_id, flow_id).

    The flow ids are spread over the switches and tables according to the
    given spread rules.  It also returns a dictionary with statistics.

    :param flows: number of flows to generate (default=10)
    :param switches: number of switches (default=1)
    :param swspread: spread method over switches (default="gauss")
    :param tables: number of tables per switch (default=250)
    :param tabspread: spread method over tables (default="gauss")

    :returns (fltables, notes): the generated flow detail tuples and the
        statistics dictionary produced by _get_notes
    """
    swflows = [_randomize(swspread, switches) for f in range(int(flows))]
    # we have to increase the switch index because mininet starts indexing switches from 1 (not 0)
    fltables = [
        (s + 1, _randomize(tabspread, tables), idx) for idx, s in enumerate(swflows)
    ]
    notes = _get_notes(fltables)
    return fltables, notes
def _prepare_add(cntl, method, flows, template=None):
    """Create a PUT http request to configure a flow in the config datastore.

    :param cntl: controller's ip address or hostname
    :param method: http request method (expected "PUT")
    :param flows: list of flow details; a PUT can carry only one flow,
        so only the first item is used
    :param template: flow template to be filled in

    :returns req: requests.Request object (not yet prepared/sent)
    """
    fl1 = flows[0]
    sw, tab, fl, ip = fl1
    # TODO(review): restconf port is hard-coded to 8181 here although a
    # restport parameter exists elsewhere in this module - confirm and unify
    url = "http://" + cntl + ":" + "8181"
    url += "/restconf/config/opendaylight-inventory:nodes/node/openflow:" + str(sw)
    url += "/table/" + str(tab) + "/flow/" + str(fl)
    flow = copy.deepcopy(template["flow"][0])
    flow["id"] = fl
    flow["flow-name"] = "TestFlow-%d" % fl
    flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
    flow["table_id"] = tab
    # shallow copy is enough: only the top-level "flow" key is replaced
    fmod = dict(template)
    fmod["flow"] = flow
    req_data = json.dumps(fmod)
    req = requests.Request(
        method,
        url,
        headers={"Content-Type": "application/json"},
        data=req_data,
        auth=("admin", "admin"),
    )
    return req
def _prepare_table_add(cntl, method, flows, template=None):
    """Create a POST http request to configure flows in the config datastore.

    All flows of the group (same switch and table) are sent in one request
    addressed to their table node.

    :param cntl: controller's ip address or hostname
    :param method: http request method (expected "POST")
    :param flows: list of flow details
    :param template: flow template to be filled in

    :returns req: requests.Request object (not yet prepared/sent)
    """
    # the group shares one switch and table, taken from the first item
    fl1 = flows[0]
    sw, tab, fl, ip = fl1
    url = "http://" + cntl + ":" + "8181"
    url += (
        "/restconf/config/opendaylight-inventory:nodes/node/openflow:"
        + str(sw)
        + "/table/"
        + str(tab)
    )
    fdets = []
    for sw, tab, fl, ip in flows:
        flow = copy.deepcopy(template["flow"][0])
        flow["id"] = fl
        flow["flow-name"] = "TestFlow-%d" % fl
        flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
        flow["table_id"] = tab
        fdets.append(flow)
    fmod = copy.deepcopy(template)
    fmod["flow"] = fdets
    req_data = json.dumps(fmod)
    req = requests.Request(
        method,
        url,
        headers={"Content-Type": "application/json"},
        data=req_data,
        auth=("admin", "admin"),
    )
    return req
def _prepare_delete(cntl, method, flows, template=None):
    """Create a DELETE http request to remove a flow from the config datastore.

    :param cntl: controller's ip address or hostname
    :param method: http request method (expected "DELETE")
    :param flows: list of flow details; a DELETE can target only one flow,
        so only the first item is used
    :param template: unused here; kept for a uniform prepare-function signature

    :returns req: requests.Request object (not yet prepared/sent)
    """
    fl1 = flows[0]
    sw, tab, fl, ip = fl1
    url = "http://" + cntl + ":" + "8181"
    url += "/restconf/config/opendaylight-inventory:nodes/node/openflow:" + str(sw)
    url += "/table/" + str(tab) + "/flow/" + str(fl)
    # no body is needed for a delete
    req = requests.Request(
        method,
        url,
        headers={"Content-Type": "application/json"},
        auth=("admin", "admin"),
    )
    return req
def _prepare_rpc_item(cntl, method, flows, template=None):
    """Create a POST http request to add or remove flows via openflowplugin rpc.

    :param cntl: controller's ip address or hostname
    :param method: rpc name appended to the url
        ("add-flows-rpc" or "remove-flows-rpc")
    :param flows: list of flow details
    :param template: operations item template to be filled in

    :returns req: requests.Request object (not yet prepared/sent)
    """
    url = "http://" + cntl + ":" + "8181/restconf/operations/sal-bulk-flow:" + method
    fdets = []
    for sw, tab, fl, ip in flows:
        flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
        flow["node"] = _node_tmpl.format(sw)
        # NOTE(review): "flow-id" assignment reconstructed from a line missing
        # in the reviewed excerpt - confirm against VCS
        flow["flow-id"] = "TestFlow-%d" % fl
        flow["flow-name"] = "TestFlow-%d" % fl
        flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
        flow["table_id"] = tab
        fdets.append(flow)
    fmod = copy.deepcopy(template)
    fmod["input"]["bulk-flow-item"] = fdets
    req_data = json.dumps(fmod)
    req = requests.Request(
        "POST",
        url,
        headers={"Content-Type": "application/json"},
        data=req_data,
        auth=("admin", "admin"),
    )
    return req
def _prepare_ds_item(cntl, method, flows, template=None):
    """Create a POST http request to configure flows via the bulk-flow ds rpc.

    Ofp uses write operation; a standard POST to the config datastore uses a
    read-write operation (on the java level).

    :param cntl: controller's ip address or hostname
    :param method: rpc name appended to the url
        ("add-flows-ds" or "remove-flows-ds")
    :param flows: list of flow details
    :param template: operations item template to be filled in

    :returns req: requests.Request object (not yet prepared/sent)
    """
    url = "http://" + cntl + ":" + "8181/restconf/operations/sal-bulk-flow:" + method
    fdets = []
    for sw, tab, fl, ip in flows:
        flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
        flow["node"] = _node_tmpl.format(sw)
        flow["flow-name"] = "TestFlow-%d" % fl
        flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
        flow["table_id"] = tab
        fdets.append(flow)
    fmod = copy.deepcopy(template)
    # the ds variant of the rpc expects the items under a different key
    del fmod["input"]["bulk-flow-item"]
    fmod["input"]["bulk-flow-ds-item"] = fdets
    req_data = json.dumps(fmod)
    req = requests.Request(
        "POST",
        url,
        headers={"Content-Type": "application/json"},
        data=req_data,
        auth=("admin", "admin"),
    )
    return req
def _wt_request_sender(
    thread_id,
    preparefnc,
    inqueue=None,
    exitevent=None,
    controllers=[],
    restport="",
    template=None,
    outqueue=None,
    method=None,
):
    """Send http requests from a working thread.

    Runs in the working thread.  It reads flow details from the input queue
    and sends appropriate http requests to the controller.  When the input
    queue is drained, a per-status-code summary is put into the output queue.

    :param thread_id: thread id
    :param preparefnc: function that prepares the http request
    :param inqueue: input queue, flow details are coming from here
    :param exitevent: event used to notify the working thread that the parent
        (task executor) stopped filling the input queue
    :param controllers: list of controllers' ip addresses or hostnames
        (only the first one is used)
    :param restport: restconf port; currently unused, the prepare functions
        hard-code port 8181 (see TODO there)
    :param template: flow template used for creating the flow content
    :param outqueue: queue where the results should be put
    :param method: method determines the type of http request

    :returns: nothing, results are put into the output queue
    """
    ses = requests.Session()
    cntl = controllers[0]
    # index = http status code; request timeouts are counted under index 99
    counter = [0 for i in range(600)]
    loop = True

    while loop:
        try:
            flowlist = inqueue.get(timeout=1)
        except queue.Empty:
            # finish only when the producer signalled it is done AND the
            # queue is really drained
            if exitevent.is_set() and inqueue.empty():
                loop = False
            continue
        req = preparefnc(cntl, method, flowlist, template=template)
        # prep = ses.prepare_request(req)
        prep = req.prepare()
        try:
            rsp = ses.send(prep, timeout=5)
        except requests.exceptions.Timeout:
            counter[99] += 1
            continue
        counter[rsp.status_code] += 1
    # keep only the status codes that actually occurred
    res = {}
    for i, v in enumerate(counter):
        if v > 0:
            res[i] = v
    outqueue.put(res)
def _task_executor(
    method="",
    flow_template=None,
    flow_details=None,
    controllers=None,
    restport="8181",
    nrthreads=1,
    fpr=1,
):
    """The main function which drives sending of http requests.

    Creates 2 queues and the requested number of 'working threads'.  One queue
    is filled with flow details and working threads read them out and send http
    requests.  The other queue is for sending results from working threads back.
    After the threads' join, it produces a summary result.

    :param method: based on this the function which prepares the http request is chosen
    :param flow_template: template to generate a flow content
    :param flow_details: a list of tuples with flow details
        (switch_id, table_id, flow_id) (default=[])
    :param controllers: a list of controllers' host names or ip addresses
        (default=['127.0.0.1'])
    :param restport: restconf port (default='8181')
    :param nrthreads: number of threads used to send http requests (default=1)
    :param fpr: flows per request, number of flows sent in one http request

    :returns result: dictionary of http response counts like
        {http_status_code1: count1, ...}

    :raises NotImplementedError: if the method has no prepare function defined
    """
    # NOTE(review): defaults changed from shared mutable [] literals to None;
    # both arguments are only read, so behavior is unchanged
    flow_details = [] if flow_details is None else flow_details
    controllers = ["127.0.0.1"] if controllers is None else controllers
    # TODO: multi controllers support
    ip_addr = Counter(int(netaddr.IPAddress("10.0.0.1")))

    # choose message prepare function
    if method == "PUT":
        preparefnc = _prepare_add
        # put can contain only 1 flow, lets overwrite any value of flows per request
        fpr = 1
    elif method == "POST":
        preparefnc = _prepare_table_add
    elif method == "DELETE":
        preparefnc = _prepare_delete
        # delete flow can contain only 1 flow, lets overwrite any value of flows per request
        fpr = 1
    elif method in ["add-flows-ds", "remove-flows-ds"]:
        preparefnc = _prepare_ds_item
    elif method in ["add-flows-rpc", "remove-flows-rpc"]:
        preparefnc = _prepare_rpc_item
    else:
        raise NotImplementedError(
            "Method {0} does not have it's prepeare function defined".format(method)
        )

    # enlarge the tuple of flow details with an ip address, to be used with the template
    flows = [(sw, tab, flo, ip_addr.increment()) for sw, tab, flo in flow_details]
    # divide flows into (switch, table) flow groups
    flowgroups = {}
    for flow in flows:
        flowkey = "%d:%d" % (flow[0], flow[1])
        if flowkey in flowgroups:
            flowgroups[flowkey].append(flow)
        else:
            flowgroups[flowkey] = [flow]

    # fill the queue with details needed for one http request each;
    # the per-group lists are split according to the flows per request (fpr) parameter
    sendqueue = queue.Queue()
    for flowgroup, flow_list in flowgroups.items():
        while len(flow_list) > 0:
            sendqueue.put(flow_list[: int(fpr)])
            flow_list = flow_list[int(fpr):]

    # queue for the working threads' partial results
    resultqueue = queue.Queue()
    # event used to tell the workers the queue will not be filled any more
    exitevent = threading.Event()

    # start threads which will read flow details from the queue and send requests
    threads = []
    for i in range(int(nrthreads)):
        thr = threading.Thread(
            target=_wt_request_sender,
            args=(i, preparefnc),
            kwargs={
                "inqueue": sendqueue,
                "exitevent": exitevent,
                "controllers": controllers,
                "restport": restport,
                "template": flow_template,
                "outqueue": resultqueue,
                "method": method,
            },
        )
        threads.append(thr)
        thr.start()

    # the queue was fully filled before the workers started; signal completion
    exitevent.set()

    # wait for the results and sum them up
    result = {}
    for t in threads:
        t.join()
        # read the partial results from the sender thread
        part_result = resultqueue.get()
        for k, v in part_result.items():
            if k not in result:
                result[k] = v
            else:
                result[k] += v
    return result
def configure_flows(*args, **kwargs):
    """Configure flows based on given flow details (PUT, one flow per request).

    :param flow_details: list of flow detail tuples (switch_id, table_id, flow_id) (default=[])
    :param controllers: list of controller host names or ip addresses (default=['127.0.0.1'])
    :param restport: restconf port (default='8181')
    :param nrthreads: number of threads used to send http requests (default=1)

    :returns dict: dictionary of http response counts like {http_status_code1: count1, ...}
    """
    template = _default_flow_template_json
    return _task_executor(method="PUT", flow_template=template, **kwargs)
def deconfigure_flows(*args, **kwargs):
    """Deconfigure flows based on given flow details (DELETE, one flow per request).

    :param flow_details: list of flow detail tuples (switch_id, table_id, flow_id) (default=[])
    :param controllers: list of controller host names or ip addresses (default=['127.0.0.1'])
    :param restport: restconf port (default='8181')
    :param nrthreads: number of threads used to send http requests (default=1)

    :returns dict: dictionary of http response counts like {http_status_code1: count1, ...}
    """
    template = _default_flow_template_json
    return _task_executor(method="DELETE", flow_template=template, **kwargs)
def configure_flows_bulk(*args, **kwargs):
    """Configure flows based on given flow details using a POST http request.

    :param flow_details: list of flow detail tuples (switch_id, table_id, flow_id) (default=[])
    :param controllers: list of controller host names or ip addresses (default=['127.0.0.1'])
    :param restport: restconf port (default='8181')
    :param nrthreads: number of threads used to send http requests (default=1)

    :returns dict: dictionary of http response counts like {http_status_code1: count1, ...}
    """
    template = _default_flow_template_json
    return _task_executor(method="POST", flow_template=template, **kwargs)
def operations_add_flows_ds(*args, **kwargs):
    """Configure flows via the sal-bulk-flow add-flows-ds rpc.

    :param flow_details: list of flow detail tuples (switch_id, table_id, flow_id) (default=[])
    :param controllers: list of controller host names or ip addresses (default=['127.0.0.1'])
    :param restport: restconf port (default='8181')
    :param nrthreads: number of threads used to send http requests (default=1)

    :returns dict: dictionary of http response counts like {http_status_code1: count1, ...}
    """
    template = _default_operations_item_json
    return _task_executor(method="add-flows-ds", flow_template=template, **kwargs)
def operations_remove_flows_ds(*args, **kwargs):
    """Remove flows via the sal-bulk-flow remove-flows-ds rpc.

    :param flow_details: list of flow detail tuples (switch_id, table_id, flow_id) (default=[])
    :param controllers: list of controller host names or ip addresses (default=['127.0.0.1'])
    :param restport: restconf port (default='8181')
    :param nrthreads: number of threads used to send http requests (default=1)

    :returns dict: dictionary of http response counts like {http_status_code1: count1, ...}
    """
    template = _default_operations_item_json
    return _task_executor(method="remove-flows-ds", flow_template=template, **kwargs)
def operations_add_flows_rpc(*args, **kwargs):
    """Configure flows using openflowplugin rpc calls (add-flows-rpc).

    :param flow_details: list of flow detail tuples (switch_id, table_id, flow_id) (default=[])
    :param controllers: list of controller host names or ip addresses (default=['127.0.0.1'])
    :param restport: restconf port (default='8181')
    :param nrthreads: number of threads used to send http requests (default=1)

    :returns dict: dictionary of http response counts like {http_status_code1: count1, ...}
    """
    template = _default_operations_item_json
    return _task_executor(method="add-flows-rpc", flow_template=template, **kwargs)
def operations_remove_flows_rpc(*args, **kwargs):
    """Remove flows using openflowplugin rpc calls (remove-flows-rpc).

    :param flow_details: list of flow detail tuples (switch_id, table_id, flow_id) (default=[])
    :param controllers: list of controller host names or ip addresses (default=['127.0.0.1'])
    :param restport: restconf port (default='8181')
    :param nrthreads: number of threads used to send http requests (default=1)

    :returns dict: dictionary of http response counts like {http_status_code1: count1, ...}
    """
    template = _default_operations_item_json
    return _task_executor(method="remove-flows-rpc", flow_template=template, **kwargs)
def _get_operational_inventory_of_switches(controller):
    """Get the openflow switches present in the operational inventory.

    :param controller: controller's ip or host name

    :returns switches: list of switch node dictionaries, [] if the inventory
        contains no nodes, or None if the inventory could not be read
    """
    url = (
        "http://"
        + controller
        + ":8181/restconf/operational/opendaylight-inventory:nodes"
    )
    rsp = requests.get(
        url,
        headers={"Accept": "application/json"},
        stream=False,
        auth=("admin", "admin"),
    )
    if rsp.status_code != 200:
        return None
    inv = json.loads(rsp.content)
    if "nodes" not in inv:
        return None
    if "node" not in inv["nodes"]:
        return []
    inv = inv["nodes"]["node"]
    # keep only openflow nodes (the inventory may contain other node types)
    switches = [sw for sw in inv if "openflow:" in sw["id"]]
    return switches
def flow_stats_collected(controller=""):
    """Provide the operational inventory counts of switches and flows.

    :param controller: controller's ip address or host name

    :returns (switches, active_flows, found_flows): tuple with the counts of
        connected switches, flows reported by table statistics, and flows
        actually found in the tables; (0, 0, 0) if the inventory is unreadable
    """
    active_flows = 0
    found_flows = 0
    switches = _get_operational_inventory_of_switches(controller)
    if switches is None:
        return 0, 0, 0
    for sw in switches:
        tabs = sw["flow-node-inventory:table"]
        for t in tabs:
            active_flows += t[
                "opendaylight-flow-table-statistics:flow-table-statistics"
            ]["active-flows"]
            # a table with statistics may still contain no flow list
            if "flow" in t:
                found_flows += len(t["flow"])
    print(
        "Switches,ActiveFlows(reported)/FlowsFound",
        len(switches),
        active_flows,
        found_flows,
    )
    return len(switches), active_flows, found_flows
def get_switches_count(controller=""):
    """Give the count of switches present in the operational inventory nodes datastore.

    :param controller: controller's ip address or host name

    :returns: the number of connected switches (0 if the inventory is unreadable)
    """
    switches = _get_operational_inventory_of_switches(controller)
    if switches is None:
        return 0
    return len(switches)
def validate_responses(received, expected):
    """Compare a received response summary with the expected results.

    :param received: dictionary returned from operations_* and (de)configure_flows*,
        e.g. received = {200: 41} means 41 responses with http status 200 were received
    :param expected: list of expected http result codes,
        e.g. expected=[200] means only http status 200 may be present

    :returns bool: True if the list of http statuses from the received responses
        equals the expected list, False otherwise
    """
    # the comparison already yields a bool; the former
    # "True if ... else False" ternary was redundant
    return list(received.keys()) == expected