-'''
+"""
The purpose of this library is the ability to spread configured flows
over the specified tables and switches.
The idea how to configure and checks inventory operational data is taken from
../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
-'''
+"""
import random
import threading
import netaddr
return val
-_spreads = ['gauss', 'linear', 'first'] # possible defined spreads at the moment
+_spreads = ["gauss", "linear", "first"] # possible defined spreads at the moment
_default_flow_template_json = { # templease used for config datastore
- u'flow': [
+ "flow": [
{
- u'hard-timeout': 65000,
- u'idle-timeout': 65000,
- u'cookie_mask': 4294967295,
- u'flow-name': u'FLOW-NAME-TEMPLATE',
- u'priority': 2,
- u'strict': False,
- u'cookie': 0,
- u'table_id': 0,
- u'installHw': False,
- u'id': u'FLOW-ID-TEMPLATE',
- u'match': {
- u'ipv4-destination': u'0.0.0.0/32',
- u'ethernet-match': {
- u'ethernet-type': {
- u'type': 2048
- }
- }
+ "hard-timeout": 65000,
+ "idle-timeout": 65000,
+ "cookie_mask": 4294967295,
+ "flow-name": "FLOW-NAME-TEMPLATE",
+ "priority": 2,
+ "strict": False,
+ "cookie": 0,
+ "table_id": 0,
+ "installHw": False,
+ "id": "FLOW-ID-TEMPLATE",
+ "match": {
+ "ipv4-destination": "0.0.0.0/32",
+ "ethernet-match": {"ethernet-type": {"type": 2048}},
},
- u'instructions': {
- u'instruction': [
+ "instructions": {
+ "instruction": [
{
- u'order': 0,
- u'apply-actions': {
- u'action': [
- {
- u'drop-action': {},
- u'order': 0
- }
- ]
- }
+ "order": 0,
+ "apply-actions": {"action": [{"drop-action": {}, "order": 0}]},
}
]
- }
+ },
}
]
}
-_node_tmpl = "/opendaylight-inventory:nodes/opendaylight-inventory:node[opendaylight-inventory:id=\"openflow:{0}\"]"
+_node_tmpl = '/opendaylight-inventory:nodes/opendaylight-inventory:node[opendaylight-inventory:id="openflow:{0}"]'
_default_operations_item_json = { # template used for sal operations
"hard-timeout": 65000,
"idle-timeout": 65000,
"instructions": {
- "instruction": [{
- "apply-actions": {
- "action": [
- {
- "drop-action": {},
- "order": 0
- }
- ]
- },
- "order": 0
- }]
+ "instruction": [
+ {
+ "apply-actions": {
+ "action": [{"drop-action": {}, "order": 0}]
+ },
+ "order": 0,
+ }
+ ]
},
"match": {
"ipv4-destination": "0.0.0.0/32",
- "ethernet-match": {
- "ethernet-type": {
- "type": 2048
- }
- },
+ "ethernet-match": {"ethernet-type": {"type": 2048}},
},
"priority": 2,
- "table_id": 0
+ "table_id": 0,
}
]
}
notes = {}
for (sw, tab, flow) in fldet:
if sw not in notes:
- notes[sw] = {'total': 0}
+ notes[sw] = {"total": 0}
if tab not in notes[sw]:
notes[sw][tab] = 0
notes[sw][tab] += 1
- notes[sw]['total'] += 1
+ notes[sw]["total"] += 1
return notes
def _randomize(spread, maxn):
"""Returns a randomized switch or table id"""
if spread not in _spreads:
- raise Exception('Spread method {} not available'.format(spread))
+ raise Exception("Spread method {} not available".format(spread))
while True:
- if spread == 'gauss':
+ if spread == "gauss":
ga = abs(random.gauss(0, 1))
rv = int(ga * float(maxn) / 3)
if rv < maxn:
return rv
- elif spread == 'linear':
+ elif spread == "linear":
rv = int(random.random() * float(maxn))
if rv < maxn:
return rv
else:
- raise ValueError('rv >= maxn')
- elif spread == 'first':
+ raise ValueError("rv >= maxn")
+ elif spread == "first":
return 0
-def generate_new_flow_details(flows=10, switches=1, swspread='gauss', tables=250, tabspread='gauss'):
+def generate_new_flow_details(
+ flows=10, switches=1, swspread="gauss", tables=250, tabspread="gauss"
+):
"""Generate a list of tupples (switch_id, table_id, flow_id) which are generated
according to the spread rules between swithces and tables.
It also returns a dictionary with statsistics."""
swflows = [_randomize(swspread, switches) for f in range(int(flows))]
# we have to increse the switch index because mininet start indexing switches from 1 (not 0)
- fltables = [(s + 1, _randomize(tabspread, tables), idx) for idx, s in enumerate(swflows)]
+ fltables = [
+ (s + 1, _randomize(tabspread, tables), idx) for idx, s in enumerate(swflows)
+ ]
notes = _get_notes(fltables)
return fltables, notes
"""
fl1 = flows[0]
sw, tab, fl, ip = fl1
- url = 'http://' + cntl + ':' + '8181'
- url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:' + str(sw)
- url += '/table/' + str(tab) + '/flow/' + str(fl)
- flow = copy.deepcopy(template['flow'][0])
- flow['cookie'] = fl
- flow['flow-name'] = 'TestFlow-%d' % fl
- flow['id'] = str(fl)
- flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
- flow['table_id'] = tab
+ url = "http://" + cntl + ":" + "8181"
+ url += "/rests/data/opendaylight-inventory:nodes/node=openflow%3A" + str(sw)
+ url += "/flow-node-inventory:table=" + str(tab) + "/flow=" + str(fl)
+ flow = copy.deepcopy(template["flow"][0])
+ flow["cookie"] = fl
+ flow["flow-name"] = "TestFlow-%d" % fl
+ flow["id"] = str(fl)
+ flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
+ flow["table_id"] = tab
fmod = dict(template)
- fmod['flow'] = flow
+ fmod["flow"] = flow
req_data = json.dumps(fmod)
- req = requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=req_data,
- auth=('admin', 'admin'))
+ req = requests.Request(
+ "PUT",
+ url,
+ headers={"Content-Type": "application/yang-data+json"},
+ data=req_data,
+ auth=("admin", "admin"),
+ )
return req
"""
fl1 = flows[0]
sw, tab, fl, ip = fl1
- url = 'http://' + cntl + ':' + '8181'
- url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:' + str(sw) + '/table/' + str(tab)
+ url = "http://" + cntl + ":" + "8181"
+ url += (
+ "/rests/data/opendaylight-inventory:nodes/node=openflow%3A"
+ + str(sw)
+ + "/flow-node-inventory:table="
+ + str(tab)
+ )
fdets = []
for sw, tab, fl, ip in flows:
- flow = copy.deepcopy(template['flow'][0])
- flow['cookie'] = fl
- flow['flow-name'] = 'TestFlow-%d' % fl
- flow['id'] = str(fl)
- flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
- flow['table_id'] = tab
+ flow = copy.deepcopy(template["flow"][0])
+ flow["cookie"] = fl
+ flow["flow-name"] = "TestFlow-%d" % fl
+ flow["id"] = str(fl)
+ flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
+ flow["table_id"] = tab
fdets.append(flow)
fmod = copy.deepcopy(template)
- fmod['flow'] = fdets
+ fmod["flow"] = fdets
req_data = json.dumps(fmod)
- req = requests.Request('POST', url, headers={'Content-Type': 'application/json'}, data=req_data,
- auth=('admin', 'admin'))
+ req = requests.Request(
+ "POST",
+ url,
+ headers={"Content-Type": "application/yang-data+json"},
+ data=req_data,
+ auth=("admin", "admin"),
+ )
return req
"""
fl1 = flows[0]
sw, tab, fl, ip = fl1
- url = 'http://' + cntl + ':' + '8181'
- url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:' + str(sw)
- url += '/table/' + str(tab) + '/flow/' + str(fl)
- req = requests.Request('DELETE', url, headers={'Content-Type': 'application/json'}, auth=('admin', 'admin'))
+ url = "http://" + cntl + ":" + "8181"
+ url += "/rests/data/opendaylight-inventory:nodes/node=openflow%3A" + str(sw)
+ url += "/flow-node-inventory:table=" + str(tab) + "/flow=" + str(fl)
+ req = requests.Request(
+ "DELETE",
+ url,
+ headers={"Content-Type": "application/yang-data+json"},
+ auth=("admin", "admin"),
+ )
return req
"""
f1 = flows[0]
sw, tab, fl, ip = f1
- url = 'http://' + cntl + ':' + '8181/restconf/operations/sal-bulk-flow:' + method
+ url = "http://" + cntl + ":" + "8181/rests/operations/sal-bulk-flow:" + method
fdets = []
for sw, tab, fl, ip in flows:
- flow = copy.deepcopy(template['input']['bulk-flow-item'][0])
- flow['node'] = _node_tmpl.format(sw)
- flow['cookie'] = fl
- flow['flow-name'] = 'TestFlow-%d' % fl
- flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
- flow['table_id'] = tab
+ flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
+ flow["node"] = _node_tmpl.format(sw)
+ flow["cookie"] = fl
+ flow["flow-name"] = "TestFlow-%d" % fl
+ flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
+ flow["table_id"] = tab
fdets.append(flow)
fmod = copy.deepcopy(template)
- fmod['input']['bulk-flow-item'] = fdets
+ fmod["input"]["bulk-flow-item"] = fdets
req_data = json.dumps(fmod)
- req = requests.Request('POST', url, headers={'Content-Type': 'application/json'}, data=req_data,
- auth=('admin', 'admin'))
+ req = requests.Request(
+ "POST",
+ url,
+ headers={"Content-Type": "application/yang-data+json"},
+ data=req_data,
+ auth=("admin", "admin"),
+ )
return req
"""
f1 = flows[0]
sw, tab, fl, ip = f1
- url = 'http://' + cntl + ':' + '8181/restconf/operations/sal-bulk-flow:' + method
+ url = "http://" + cntl + ":" + "8181/rests/operations/sal-bulk-flow:" + method
fdets = []
for sw, tab, fl, ip in flows:
- flow = copy.deepcopy(template['input']['bulk-flow-item'][0])
- flow['node'] = _node_tmpl.format(sw)
- flow['cookie'] = fl
- flow['flow-name'] = 'TestFlow-%d' % fl
- flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
- flow['table_id'] = tab
- flow['flow-id'] = fl
+ flow = copy.deepcopy(template["input"]["bulk-flow-item"][0])
+ flow["node"] = _node_tmpl.format(sw)
+ flow["cookie"] = fl
+ flow["flow-name"] = "TestFlow-%d" % fl
+ flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ip))
+ flow["table_id"] = tab
+ flow["flow-id"] = fl
fdets.append(flow)
fmod = copy.deepcopy(template)
- del fmod['input']['bulk-flow-item']
- fmod['input']['bulk-flow-ds-item'] = fdets
+ del fmod["input"]["bulk-flow-item"]
+ fmod["input"]["bulk-flow-ds-item"] = fdets
req_data = json.dumps(fmod)
- req = requests.Request('POST', url, headers={'Content-Type': 'application/json'}, data=req_data,
- auth=('admin', 'admin'))
+ req = requests.Request(
+ "POST",
+ url,
+ headers={"Content-Type": "application/yang-data+json"},
+ data=req_data,
+ auth=("admin", "admin"),
+ )
return req
-def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='',
- template=None, outqueue=None, method=None):
+def _wt_request_sender(
+ thread_id,
+ preparefnc,
+ inqueue=None,
+ exitevent=None,
+ controllers=[],
+ restport="",
+ template=None,
+ outqueue=None,
+ method=None,
+):
"""The funcion sends http requests.
Runs in the working thread. It reads out flow details from the queue and sends apropriate http requests
cntl = controllers[0]
counter = [0 for i in range(600)]
loop = True
+ req_no = 0
+ num_errors = 0
while loop:
+ req_no += 1
try:
flowlist = inqueue.get(timeout=1)
except queue.Empty:
try:
rsp = ses.send(prep, timeout=5)
except requests.exceptions.Timeout:
+ print(f"*WARN* [{req_no}] Timeout: {req.method} {req.url}")
counter[99] += 1
+ if counter[99] > 10:
+ print("*ERROR* Too many timeouts.")
+ break
continue
+ else:
+ if rsp.status_code not in [200, 201, 204]:
+                    print(
+                        f"*WARN* [{req_no}] Status code {rsp.status_code}: "
+                        f"{req.method} {req.url}\n{rsp.text}"
+                    )
+ num_errors += 1
+ if num_errors > 10:
+ print("*ERROR* Too many errors.")
+ break
counter[rsp.status_code] += 1
res = {}
for i, v in enumerate(counter):
outqueue.put(res)
-def _task_executor(method='', flow_template=None, flow_details=[], controllers=['127.0.0.1'],
- restport='8181', nrthreads=1, fpr=1):
+def _task_executor(
+ method="",
+ flow_template=None,
+ flow_details=[],
+ controllers=["127.0.0.1"],
+ restport="8181",
+ nrthreads=1,
+ fpr=1,
+):
"""The main function which drives sending of http requests.
Creates 2 queues and requested number of 'working threads'. One queue is filled with flow details and working
:returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
"""
# TODO: multi controllers support
- ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
+ ip_addr = Counter(int(netaddr.IPAddress("10.0.0.1")))
# choose message prepare function
- if method == 'PUT':
+ if method == "PUT":
preparefnc = _prepare_add
# put can contain only 1 flow, lets overwrite any value of flows per request
fpr = 1
- elif method == 'POST':
+ elif method == "POST":
preparefnc = _prepare_table_add
- elif method == 'DELETE':
+ elif method == "DELETE":
preparefnc = _prepare_delete
# delete flow can contain only 1 flow, lets overwrite any value of flows per request
fpr = 1
- elif method in ['add-flows-ds', 'remove-flows-ds']:
+ elif method in ["add-flows-ds", "remove-flows-ds"]:
preparefnc = _prepare_ds_item
- elif method in ['add-flows-rpc', 'remove-flows-rpc']:
+ elif method in ["add-flows-rpc", "remove-flows-rpc"]:
preparefnc = _prepare_rpc_item
else:
- raise NotImplementedError('Method {0} does not have it\'s prepeare function defined'.format(method))
+    raise NotImplementedError(
+        "Method {0} does not have its prepare function defined".format(method)
+    )
# lets enlarge the tupple of flow details with IP, to be used with the template
flows = [(sw, tab, flo, ip_addr.increment()) for sw, tab, flo in flow_details]
sendqueue = queue.Queue()
for flowgroup, flow_list in flowgroups.items():
while len(flow_list) > 0:
- sendqueue.put(flow_list[:int(fpr)])
- flow_list = flow_list[int(fpr):]
+ sendqueue.put(flow_list[: int(fpr)])
+ flow_list = flow_list[int(fpr) :]
# result_gueue
resultqueue = queue.Queue()
# lets start threads whic will read flow details fro queues and send
threads = []
for i in range(int(nrthreads)):
- thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
- kwargs={"inqueue": sendqueue, "exitevent": exitevent,
- "controllers": controllers, "restport": restport,
- "template": flow_template, "outqueue": resultqueue, "method": method})
+ thr = threading.Thread(
+ target=_wt_request_sender,
+ args=(i, preparefnc),
+ kwargs={
+ "inqueue": sendqueue,
+ "exitevent": exitevent,
+ "controllers": controllers,
+ "restport": restport,
+ "template": flow_template,
+ "outqueue": resultqueue,
+ "method": method,
+ },
+ )
threads.append(thr)
thr.start()
Returns:
:returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
"""
- return _task_executor(method='PUT', flow_template=_default_flow_template_json, **kwargs)
+ return _task_executor(
+ method="PUT", flow_template=_default_flow_template_json, **kwargs
+ )
def deconfigure_flows(*args, **kwargs):
Returns:
:returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
"""
- return _task_executor(method='DELETE', flow_template=_default_flow_template_json, **kwargs)
+ return _task_executor(
+ method="DELETE", flow_template=_default_flow_template_json, **kwargs
+ )
def configure_flows_bulk(*args, **kwargs):
Returns:
:returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
"""
- return _task_executor(method='POST', flow_template=_default_flow_template_json, **kwargs)
+ return _task_executor(
+ method="POST", flow_template=_default_flow_template_json, **kwargs
+ )
def operations_add_flows_ds(*args, **kwargs):
Returns:
:returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
"""
- return _task_executor(method='add-flows-ds', flow_template=_default_operations_item_json, **kwargs)
+ return _task_executor(
+ method="add-flows-ds", flow_template=_default_operations_item_json, **kwargs
+ )
def operations_remove_flows_ds(*args, **kwargs):
Returns:
:returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
"""
- return _task_executor(method='remove-flows-ds', flow_template=_default_operations_item_json, **kwargs)
+ return _task_executor(
+ method="remove-flows-ds", flow_template=_default_operations_item_json, **kwargs
+ )
def operations_add_flows_rpc(*args, **kwargs):
Returns:
:returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
"""
- return _task_executor(method='add-flows-rpc', flow_template=_default_operations_item_json, **kwargs)
+ return _task_executor(
+ method="add-flows-rpc", flow_template=_default_operations_item_json, **kwargs
+ )
def operations_remove_flows_rpc(*args, **kwargs):
Returns:
:returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
"""
- return _task_executor(method='remove-flows-rpc', flow_template=_default_operations_item_json, **kwargs)
+ return _task_executor(
+ method="remove-flows-rpc", flow_template=_default_operations_item_json, **kwargs
+ )
def _get_operational_inventory_of_switches(controller):
Returns:
:returns switches: number of switches connected
"""
- url = 'http://' + controller + ':8181/restconf/operational/opendaylight-inventory:nodes'
- rsp = requests.get(url, headers={'Accept': 'application/json'}, stream=False, auth=('admin', 'admin'))
+ url = (
+ "http://"
+ + controller
+ + ":8181/rests/data/opendaylight-inventory:nodes?content=nonconfig"
+ )
+ rsp = requests.get(
+ url,
+ headers={"Accept": "application/yang-data+json"},
+ stream=False,
+ auth=("admin", "admin"),
+ )
if rsp.status_code != 200:
return None
inv = json.loads(rsp.content)
- if 'nodes' not in inv:
+ if "opendaylight-inventory:nodes" not in inv:
return None
- if 'node' not in inv['nodes']:
+ if "node" not in inv["opendaylight-inventory:nodes"]:
return []
- inv = inv['nodes']['node']
- switches = [sw for sw in inv if 'openflow:' in sw['id']]
+ inv = inv["opendaylight-inventory:nodes"]["node"]
+ switches = [sw for sw in inv if "openflow:" in sw["id"]]
return switches
-def flow_stats_collected(controller=''):
+def flow_stats_collected(controller=""):
"""Provides the operational inventory counts counts of switches and flows.
Args:
if switches is None:
return 0, 0, 0
for sw in switches:
- tabs = sw['flow-node-inventory:table']
+ tabs = sw["flow-node-inventory:table"]
for t in tabs:
- active_flows += t['opendaylight-flow-table-statistics:flow-table-statistics']['active-flows']
- if 'flow' in t:
- found_flows += len(t['flow'])
- print(("Switches,ActiveFlows(reported)/FlowsFound", len(switches), active_flows, found_flows))
+ active_flows += t[
+ "opendaylight-flow-table-statistics:flow-table-statistics"
+ ]["active-flows"]
+ if "flow" in t:
+ found_flows += len(t["flow"])
+ print(
+ (
+ "Switches,ActiveFlows(reported)/FlowsFound",
+ len(switches),
+ active_flows,
+ found_flows,
+ )
+ )
return len(switches), active_flows, found_flows
-def get_switches_count(controller=''):
+def get_switches_count(controller=""):
"""Gives the count of the switches presnt in the operational inventory nodes datastore.
Args: