""" The purpose of this library is the ability to spread configured flows over the specified tables and switches. The idea how to configure and checks inventory operational data is taken from ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py """ import random import threading import netaddr import queue import requests import json import copy class Counter(object): def __init__(self, start=0): self.lock = threading.Lock() self.value = start def increment(self, value=1): self.lock.acquire() val = self.value try: self.value += value finally: self.lock.release() return val _spreads = ["gauss", "linear", "first"] # possible defined spreads at the moment _default_flow_template_json = { # templease used for config datastore "flow": [ { "hard-timeout": 65000, "idle-timeout": 65000, "cookie_mask": 4294967295, "flow-name": "FLOW-NAME-TEMPLATE", "priority": 2, "strict": False, "cookie": 0, "table_id": 0, "installHw": False, "id": "FLOW-ID-TEMPLATE", "match": { "ipv4-destination": "0.0.0.0/32", "ethernet-match": {"ethernet-type": {"type": 2048}}, }, "instructions": { "instruction": [ { "order": 0, "apply-actions": {"action": [{"drop-action": {}, "order": 0}]}, } ] }, } ] } _node_tmpl = '/opendaylight-inventory:nodes/opendaylight-inventory:node[opendaylight-inventory:id="openflow:{0}"]' _default_operations_item_json = { # template used for sal operations "input": { "bulk-flow-item": [ { "node": "to_be_replaced", "cookie": 0, "cookie_mask": 4294967295, "flags": "SEND_FLOW_REM", "hard-timeout": 65000, "idle-timeout": 65000, "instructions": { "instruction": [ { "apply-actions": { "action": [{"drop-action": {}, "order": 0}] }, "order": 0, } ] }, "match": { "ipv4-destination": "0.0.0.0/32", "ethernet-match": {"ethernet-type": {"type": 2048}}, }, "priority": 2, "table_id": 0, } ] } } def _get_notes(fldet=[]): """For given list of flow details it produces a dictionary with statistics { swId1 : { tabId1 : flows_count1, tabId2 : flows_count2, ... 'total' : switch count } swId2 ... } """ notes = {} for (sw, tab, flow) in fldet: if sw not in notes: notes[sw] = {"total": 0} if tab not in notes[sw]: notes[sw][tab] = 0 notes[sw][tab] += 1 notes[sw]["total"] += 1 return notes def _randomize(spread, maxn): """Returns a randomized switch or table id""" if spread not in _spreads: raise Exception("Spread method {} not available".format(spread)) while True: if spread == "gauss": ga = abs(random.gauss(0, 1)) rv = int(ga * float(maxn) / 3) if rv < maxn: return rv elif spread == "linear": rv = int(random.random() * float(maxn)) if rv < maxn: return rv else: raise ValueError("rv >= maxn") elif spread == "first": return 0 def generate_new_flow_details( flows=10, switches=1, swspread="gauss", tables=250, tabspread="gauss" ): """Generate a list of tupples (switch_id, table_id, flow_id) which are generated according to the spread rules between swithces and tables. It also returns a dictionary with statsistics.""" swflows = [_randomize(swspread, switches) for f in range(int(flows))] # we have to increse the switch index because mininet start indexing switches from 1 (not 0) fltables = [ (s + 1, _randomize(tabspread, tables), idx) for idx, s in enumerate(swflows) ] notes = _get_notes(fltables) return fltables, notes def _prepare_add(cntl, method, flows, template=None): """Creates a PUT http requests to configure a flow in configuration datastore. Args: :param cntl: controller's ip address or hostname :param method: determines http request method :param flows: list of flow details :param template: flow template to be to be filled Returns: :returns req: http request object """ fl1 = flows[0] sw, tab, fl, ip = fl1 url = "http://" + cntl + ":" + "8181" url += "/rests/data/opendaylight-inventory:nodes/node=openflow%3A" + str(sw) url += "/flow-node-inventory:table=" + str(tab) + "/flow=" + str(fl) flow = copy.deepcopy(template["flow"][0]) flow["cookie"] = fl flow["flow-name"] = "TestFlow-%d" % fl flow["id"] = str(fl) flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip)) flow["table_id"] = tab fmod = dict(template) fmod["flow"] = flow req_data = json.dumps(fmod) req = requests.Request( "PUT", url, headers={"Content-Type": "application/yang-data+json"}, data=req_data, auth=("admin", "admin"), ) return req def _prepare_table_add(cntl, method, flows, template=None): """Creates a POST http requests to configure a flow in configuration datastore. Args: :param cntl: controller's ip address or hostname :param method: determines http request method :param flows: list of flow details :param template: flow template to be to be filled Returns: :returns req: http request object """ fl1 = flows[0] sw, tab, fl, ip = fl1 url = "http://" + cntl + ":" + "8181" url += ( "/rests/data/opendaylight-inventory:nodes/node=openflow%3A" + str(sw) + "/flow-node-inventory:table=" + str(tab) ) fdets = [] for sw, tab, fl, ip in flows: flow = copy.deepcopy(template["flow"][0]) flow["cookie"] = fl flow["flow-name"] = "TestFlow-%d" % fl flow["id"] = str(fl) flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip)) flow["table_id"] = tab fdets.append(flow) fmod = copy.deepcopy(template) fmod["flow"] = fdets req_data = json.dumps(fmod) req = requests.Request( "POST", url, headers={"Content-Type": "application/yang-data+json"}, data=req_data, auth=("admin", "admin"), ) return req def _prepare_delete(cntl, method, flows, template=None): """Creates a DELETE http request to remove the flow from configuration datastore. Args: :param cntl: controller's ip address or hostname :param method: determines http request method :param flows: list of flow details :param template: flow template to be to be filled Returns: :returns req: http request object """ fl1 = flows[0] sw, tab, fl, ip = fl1 url = "http://" + cntl + ":" + "8181" url += "/rests/data/opendaylight-inventory:nodes/node=openflow%3A" + str(sw) url += "/flow-node-inventory:table=" + str(tab) + "/flow=" + str(fl) req = requests.Request( "DELETE", url, headers={"Content-Type": "application/yang-data+json"}, auth=("admin", "admin"), ) return req def _prepare_rpc_item(cntl, method, flows, template=None): """Creates a POST http requests to add or remove a flow using openflowplugin rpc. Args: :param cntl: controller's ip address or hostname :param method: determines http request method :param flows: list of flow details :param template: flow template to be to be filled Returns: :returns req: http request object """ f1 = flows[0] sw, tab, fl, ip = f1 url = "http://" + cntl + ":" + "8181/rests/operations/sal-bulk-flow:" + method fdets = [] for sw, tab, fl, ip in flows: flow = copy.deepcopy(template["input"]["bulk-flow-item"][0]) flow["node"] = _node_tmpl.format(sw) flow["cookie"] = fl flow["flow-name"] = "TestFlow-%d" % fl flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip)) flow["table_id"] = tab fdets.append(flow) fmod = copy.deepcopy(template) fmod["input"]["bulk-flow-item"] = fdets req_data = json.dumps(fmod) req = requests.Request( "POST", url, headers={"Content-Type": "application/yang-data+json"}, data=req_data, auth=("admin", "admin"), ) return req def _prepare_ds_item(cntl, method, flows, template=None): """Creates a POST http requests to configure a flow in configuration datastore. Ofp uses write operation, standrd POST to config datastore uses read-write operation (on java level) Args: :param cntl: controller's ip address or hostname :param method: determines http request method :param flows: list of flow details :param template: flow template to be to be filled Returns: :returns req: http request object """ f1 = flows[0] sw, tab, fl, ip = f1 url = "http://" + cntl + ":" + "8181/rests/operations/sal-bulk-flow:" + method fdets = [] for sw, tab, fl, ip in flows: flow = copy.deepcopy(template["input"]["bulk-flow-item"][0]) flow["node"] = _node_tmpl.format(sw) flow["cookie"] = fl flow["flow-name"] = "TestFlow-%d" % fl flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip)) flow["table_id"] = tab flow["flow-id"] = fl fdets.append(flow) fmod = copy.deepcopy(template) del fmod["input"]["bulk-flow-item"] fmod["input"]["bulk-flow-ds-item"] = fdets req_data = json.dumps(fmod) req = requests.Request( "POST", url, headers={"Content-Type": "application/yang-data+json"}, data=req_data, auth=("admin", "admin"), ) return req def _wt_request_sender( thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport="", template=None, outqueue=None, method=None, ): """The funcion sends http requests. Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests to the controller Args: :param thread_id: thread id :param preparefnc: function to preparesthe http request :param inqueue: input queue, flow details are comming from here :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue :param controllers: a list of controllers' ip addresses or hostnames :param restport: restconf port :param template: flow template used for creating flow content :param outqueue: queue where the results should be put :param method: method derermines the type of http request Returns: nothing, results must be put into the output queue """ ses = requests.Session() cntl = controllers[0] counter = [0 for i in range(600)] loop = True req_no = 0 num_errors = 0 while loop: req_no += 1 try: flowlist = inqueue.get(timeout=1) except queue.Empty: if exitevent.is_set() and inqueue.empty(): loop = False continue req = preparefnc(cntl, method, flowlist, template=template) # prep = ses.prepare_request(req) prep = req.prepare() try: rsp = ses.send(prep, timeout=5) except requests.exceptions.Timeout: print(f"*WARN* [{req_no}] Timeout: {req.method} {req.url}") counter[99] += 1 if counter[99] > 10: print("*ERROR* Too many timeouts.") break continue else: if rsp.status_code not in [200, 201, 204]: print( f"*WARN* [{req_no}] Status code {rsp.status_code}:" f" {req.method} {req.url}\n{rsp.text}" ) num_errors += 1 if num_errors > 10: print("*ERROR* Too many errors.") break counter[rsp.status_code] += 1 res = {} for i, v in enumerate(counter): if v > 0: res[i] = v outqueue.put(res) def _task_executor( method="", flow_template=None, flow_details=[], controllers=["127.0.0.1"], restport="8181", nrthreads=1, fpr=1, ): """The main function which drives sending of http requests. Creates 2 queues and requested number of 'working threads'. One queue is filled with flow details and working threads read them out and send http requests. The other queue is for sending results from working threads back. After the threads' join, it produces a summary result. Args: :param method: based on this the function which prepares http request is choosen :param flow_template: template to generate a flow content :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[]) :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1']) :param restport: restconf port (default='8181') :param nrthreads: number of threads used to send http requests (default=1) :param fpr: flow per request, number of flows sent in one http request Returns: :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.} """ # TODO: multi controllers support ip_addr = Counter(int(netaddr.IPAddress("10.0.0.1"))) # choose message prepare function if method == "PUT": preparefnc = _prepare_add # put can contain only 1 flow, lets overwrite any value of flows per request fpr = 1 elif method == "POST": preparefnc = _prepare_table_add elif method == "DELETE": preparefnc = _prepare_delete # delete flow can contain only 1 flow, lets overwrite any value of flows per request fpr = 1 elif method in ["add-flows-ds", "remove-flows-ds"]: preparefnc = _prepare_ds_item elif method in ["add-flows-rpc", "remove-flows-rpc"]: preparefnc = _prepare_rpc_item else: raise NotImplementedError( "Method {0} does not have it's prepeare function defined".format(method) ) # lets enlarge the tupple of flow details with IP, to be used with the template flows = [(sw, tab, flo, ip_addr.increment()) for sw, tab, flo in flow_details] # lels divide flows into switches and tables - flow groups flowgroups = {} for flow in flows: sw, tab, _, _ = flow flowkey = (sw, tab) if flowkey in flowgroups: flowgroups[flowkey].append(flow) else: flowgroups[flowkey] = [flow] # lets fill the queue with details needed for one http requests # we have lists with flow details for particular (switch, table) tupples, now we need to split the lists # according to the flows per request (fpr) paramer sendqueue = queue.Queue() for flowgroup, flow_list in flowgroups.items(): while len(flow_list) > 0: sendqueue.put(flow_list[: int(fpr)]) flow_list = flow_list[int(fpr) :] # result_gueue resultqueue = queue.Queue() # creaet exit event exitevent = threading.Event() # lets start threads whic will read flow details fro queues and send threads = [] for i in range(int(nrthreads)): thr = threading.Thread( target=_wt_request_sender, args=(i, preparefnc), kwargs={ "inqueue": sendqueue, "exitevent": exitevent, "controllers": controllers, "restport": restport, "template": flow_template, "outqueue": resultqueue, "method": method, }, ) threads.append(thr) thr.start() exitevent.set() result = {} # waitng for reqults and sum them up for t in threads: t.join() # reading partial resutls from sender thread part_result = resultqueue.get() for k, v in part_result.items(): if k not in result: result[k] = v else: result[k] += v return result def configure_flows(*args, **kwargs): """Configure flows based on given flow details. Args: :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[]) :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1']) :param restport: restconf port (default='8181') :param nrthreads: number of threads used to send http requests (default=1) Returns: :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.} """ return _task_executor( method="PUT", flow_template=_default_flow_template_json, **kwargs ) def deconfigure_flows(*args, **kwargs): """Deconfigure flows based on given flow details. Args: :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[]) :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1']) :param restport: restconf port (default='8181') :param nrthreads: number of threads used to send http requests (default=1) Returns: :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.} """ return _task_executor( method="DELETE", flow_template=_default_flow_template_json, **kwargs ) def configure_flows_bulk(*args, **kwargs): """Configure flows based on given flow details using a POST http request.. Args: :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[]) :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1']) :param restport: restconf port (default='8181') :param nrthreads: number of threads used to send http requests (default=1) Returns: :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.} """ return _task_executor( method="POST", flow_template=_default_flow_template_json, **kwargs ) def operations_add_flows_ds(*args, **kwargs): """Configure flows based on given flow details. Args: :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[]) :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1']) :param restport: restconf port (default='8181') :param nrthreads: number of threads used to send http requests (default=1) Returns: :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.} """ return _task_executor( method="add-flows-ds", flow_template=_default_operations_item_json, **kwargs ) def operations_remove_flows_ds(*args, **kwargs): """Remove flows based on given flow details. Args: :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[]) :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1']) :param restport: restconf port (default='8181') :param nrthreads: number of threads used to send http requests (default=1) Returns: :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.} """ return _task_executor( method="remove-flows-ds", flow_template=_default_operations_item_json, **kwargs ) def operations_add_flows_rpc(*args, **kwargs): """Configure flows based on given flow details using rpc calls. Args: :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[]) :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1']) :param restport: restconf port (default='8181') :param nrthreads: number of threads used to send http requests (default=1) Returns: :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.} """ return _task_executor( method="add-flows-rpc", flow_template=_default_operations_item_json, **kwargs ) def operations_remove_flows_rpc(*args, **kwargs): """Remove flows based on given flow details using rpc calls. Args: :param flow_details: a list of tupples with flow details (switch_id, table_id, flow_id, ip_addr) (default=[]) :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1']) :param restport: restconf port (default='8181') :param nrthreads: number of threads used to send http requests (default=1) Returns: :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.} """ return _task_executor( method="remove-flows-rpc", flow_template=_default_operations_item_json, **kwargs ) def _get_operational_inventory_of_switches(controller): """Gets number of switches present in the operational inventory Args: :param controller: controller's ip or host name Returns: :returns switches: number of switches connected """ url = ( "http://" + controller + ":8181/rests/data/opendaylight-inventory:nodes?content=nonconfig" ) rsp = requests.get( url, headers={"Accept": "application/yang-data+json"}, stream=False, auth=("admin", "admin"), ) if rsp.status_code != 200: return None inv = json.loads(rsp.content) if "opendaylight-inventory:nodes" not in inv: return None if "node" not in inv["opendaylight-inventory:nodes"]: return [] inv = inv["opendaylight-inventory:nodes"]["node"] switches = [sw for sw in inv if "openflow:" in sw["id"]] return switches def flow_stats_collected(controller=""): """Provides the operational inventory counts counts of switches and flows. Args: :param controller: controller's ip address or host name Returns: :returns (switches, flows_reported, flows-found): tupple with counts of switches, reported and found flows """ active_flows = 0 found_flows = 0 switches = _get_operational_inventory_of_switches(controller) if switches is None: return 0, 0, 0 for sw in switches: tabs = sw["flow-node-inventory:table"] for t in tabs: active_flows += t[ "opendaylight-flow-table-statistics:flow-table-statistics" ]["active-flows"] if "flow" in t: found_flows += len(t["flow"]) print( ( "Switches,ActiveFlows(reported)/FlowsFound", len(switches), active_flows, found_flows, ) ) return len(switches), active_flows, found_flows def get_switches_count(controller=""): """Gives the count of the switches presnt in the operational inventory nodes datastore. Args: :param controller: controller's ip address or host name Returns: :returns switches: returns the number of connected switches """ switches = _get_operational_inventory_of_switches(controller) if switches is None: return 0 return len(switches) def validate_responses(received, expected): """Compares given response summary with expected results. Args: :param received: dictionary returned from operations_* and (de)configure_flows* of this library e.g. received = { 200:41 } - this means that we 41x receives response with status code 200 :param expected: list of expected http result codes e.g. expected=[200] - we expect only http status 200 to be present Returns: :returns True: if list of http statuses from received responses is the same as exxpected :returns False: elseware """ return True if list(received.keys()) == expected else False