import Queue
import requests
import json
+import copy
class Counter(object):
_spreads = ['gauss', 'linear', 'first'] # possible defined spreads at the moment
-_default_flow_template = '''{
- "flow-node-inventory:flow": [
- {
- "flow-node-inventory:cookie": %d,
- "flow-node-inventory:cookie_mask": 4294967295,
- "flow-node-inventory:flow-name": "%s",
- "flow-node-inventory:hard-timeout": %d,
- "flow-node-inventory:id": "%s",
- "flow-node-inventory:idle-timeout": %d,
- "flow-node-inventory:installHw": false,
- "flow-node-inventory:instructions": {
- "flow-node-inventory:instruction": [
- {
- "flow-node-inventory:apply-actions": {
- "flow-node-inventory:action": [
- {
- "flow-node-inventory:drop-action": {},
- "flow-node-inventory:order": 0
- }
- ]
- },
- "flow-node-inventory:order": 0
- }
- ]
- },
- "flow-node-inventory:match": {
- "flow-node-inventory:ipv4-destination": "%s/32",
- "flow-node-inventory:ethernet-match": {
- "flow-node-inventory:ethernet-type": {
- "flow-node-inventory:type": 2048
- }
+_default_flow_template_json = {
+ u'flow': [
+ {
+ u'hard-timeout': 65000,
+ u'idle-timeout': 65000,
+ u'cookie_mask': 4294967295,
+ u'flow-name': u'FLOW-NAME-TEMPLATE',
+ u'priority': 2,
+ u'strict': False,
+ u'cookie': 0,
+ u'table_id': 0,
+ u'installHw': False,
+ u'id': u'FLOW-ID-TEMPLATE',
+ u'match': {
+ u'ipv4-destination': u'0.0.0.0/32',
+ u'ethernet-match': {
+ u'ethernet-type': {
+ u'type': 2048
+ }
+ }
+ },
+ u'instructions': {
+ u'instruction': [
+ {
+ u'order': 0,
+ u'apply-actions': {
+ u'action': [
+ {
+ u'drop-action': {},
+ u'order': 0
+ }
+ ]
+ }
+ }
+ ]
+ }
}
- },
- "flow-node-inventory:priority": 2,
- "flow-node-inventory:strict": false,
- "flow-node-inventory:table_id": %d
- }
- ]
-}'''
+ ]
+}
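+# only the per-flow fields of this template (cookie, flow-name, id, match/ipv4-destination,
+# table_id) are overwritten before each request is built; the rest is sent as-is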
def _get_notes(fldet=[]):
'''Creates a PUT http request to configure a flow in the configuration datastore'''
url = 'http://'+cntl+':'+'8181'
url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab)+'/flow/'+str(fl)
- flow = template % (fl, 'TestFlow-%d' % fl, 65000, str(fl), 65000, str(netaddr.IPAddress(ip)), tab)
- req = requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=flow, auth=('admin', 'admin'))
+ flow = copy.deepcopy(template['flow'][0])
+ flow['cookie'] = fl
+ flow['flow-name'] = 'TestFlow-%d' % fl
+ flow['id'] = str(fl)
+ flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
+ flow['table_id'] = tab
+ fmod = dict(template)
+ fmod['flow'] = flow
+ req_data = json.dumps(fmod)
+ req = requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=req_data,
+ auth=('admin', 'admin'))
+ return req
+
+
+def _prepare_table_add(cntl, flows, template=None):
+ '''Creates a POST http request to configure a batch of flows in the configuration datastore'''
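+ # the caller groups flows by (switch, table), so every entry in this batch shares the
+ # switch and table of the first flow, which is used to build the URL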
+ f1 = flows[0]
+ sw, tab, fl, ip = f1
+ url = 'http://'+cntl+':'+'8181'
+ url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab)
+ fdets = []
+ for sw, tab, fl, ip in flows:
+ flow = copy.deepcopy(template['flow'][0])
+ flow['cookie'] = fl
+ flow['flow-name'] = 'TestFlow-%d' % fl
+ flow['id'] = str(fl)
+ flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
+ flow['table_id'] = tab
+ fdets.append(flow)
+ fmod = dict(template)
+ fmod['flow'] = fdets
+ req_data = json.dumps(fmod)
+ req = requests.Request('POST', url, headers={'Content-Type': 'application/json'}, data=req_data,
+ auth=('admin', 'admin'))
return req
outqueue.put(res)
+def _wt_bulk_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='',
+ template=None, outqueue=None):
+ '''The function runs in a thread. It reads flow details from the queue and configures
+ the flows on the controller'''
+ ses = requests.Session()
+ cntl = controllers[0]
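+ # histogram of http status codes returned by the controller; index 99 counts request timeouts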
+ counter = [0 for i in range(600)]
+ loop = True
+
+ while loop:
+ try:
+ flowlist = inqueue.get(timeout=1)
+ except Queue.Empty:
+ if exitevent.is_set() and inqueue.empty():
+ loop = False
+ continue
+ req = preparefnc(cntl, flowlist, template=template)
+ prep = ses.prepare_request(req)
+ try:
+ rsp = ses.send(prep, timeout=5)
+ except requests.exceptions.Timeout:
+ counter[99] += 1
+ continue
+ counter[rsp.status_code] += 1
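+ # report only the status codes that actually occurred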
+ res = {}
+ for i, v in enumerate(counter):
+ if v > 0:
+ res[i] = v
+ outqueue.put(res)
+
+
def _config_task_executor(preparefnc, flow_details=[], flow_template=None, controllers=['127.0.0.1'], restport='8181',
nrthreads=1):
'''Function starts thread executors and puts the required information into the queue. Executors read the queue and send
if flow_template is not None:
template = flow_template
else:
- template = _default_flow_template
+ template = _default_flow_template_json
# let's enlarge the tuple of flow details with an IP, to be used with the template
flows = [(s, t, f, ip_addr.increment()) for s, t, f in flow_details]
return _config_task_executor(_prepare_delete, *args, **kwargs)
+def _bulk_config_task_executor(preparefnc, flow_details=[], flow_template=None, controllers=['127.0.0.1'],
+ restport='8181', nrthreads=1, fpr=1):
+ '''Function starts thread executors and puts the required information into the queue. Executors read the queue and
+ send http requests. After the threads join, it produces a summary result.'''
+ # TODO: multi controllers support
+ ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
+ if flow_template is not None:
+ template = flow_template
+ else:
+ template = _default_flow_template_json
+
+ # let's enlarge the tuple of flow details with an IP, to be used with the template
+ flows = [(s, t, f, ip_addr.increment()) for s, t, f in flow_details]
+ # let's divide the flows by switch and table
+ fg = {}
+ for fl in flows:
+ s, t, f, ip = fl
+ fk = (s, t)
+ if (s, t) in fg:
+ fg[fk].append(fl)
+ else:
+ fg[fk] = [fl]
+
+ # let's fill the queue; each item is a chunk of at most fpr flows sharing a switch and table
+ q = Queue.Queue()
+ for k, v in fg.iteritems():
+ while len(v) > 0:
+ q.put(v[:int(fpr)])
+ v = v[int(fpr):]
+
+ # result queue
+ rq = Queue.Queue()
+ # create the exit event
+ ee = threading.Event()
+
+ # let's start threads which will read flow details from the queue and send requests
+ threads = []
+ for i in range(int(nrthreads)):
+ t = threading.Thread(target=_wt_bulk_request_sender, args=(i, preparefnc),
+ kwargs={"inqueue": q, "exitevent": ee, "controllers": controllers, "restport": restport,
+ "template": template, "outqueue": rq})
+ threads.append(t)
+ t.start()
+
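+ # all the work is already queued, so signal the workers right away; they exit once the queue is drained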
+ ee.set()
+
+ result = {}
+ # waiting for the threads to finish
+ for t in threads:
+ t.join()
+ res = rq.get()
+ for k, v in res.iteritems():
+ if k not in result:
+ result[k] = v
+ else:
+ result[k] += v
+ return result
+
+
+def configure_flows_bulk(*args, **kwargs):
+ '''Configure flows based on given flow details
+ Input parameters with default values: flow_details=[], flow_template=None,
+ controllers=['127.0.0.1'], restport='8181', nrthreads=1, fpr=1'''
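+ # usage sketch (hypothetical values): add flows 1 and 2 to table 0 of switch 1 in a single POST
+ #   configure_flows_bulk(flow_details=[(1, 0, 1), (1, 0, 2)], nrthreads=1, fpr=2)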
+ return _bulk_config_task_executor(_prepare_table_add, *args, **kwargs)
+
+
def _get_operational_inventory_of_switches(controller):
'''GET requests to get operational inventory node details'''
url = 'http://'+controller+':8181/restconf/operational/opendaylight-inventory:nodes'
rsp = requests.get(url, headers={'Accept': 'application/json'}, stream=False, auth=('admin', 'admin'))
if rsp.status_code != 200:
return None
- inv = json.loads(rsp.content)['nodes']['node']
+ inv = json.loads(rsp.content)
+ if 'nodes' not in inv:
+ return None
+ if 'node' not in inv['nodes']:
+ return []
+ inv = inv['nodes']['node']
switches = [sw for sw in inv if 'openflow:' in sw['id']]
return switches
-def flow_stats_collected(flow_details=[], controller=''):
+def flow_stats_collected(controller=''):
'''Once flows are configured, this function is used to check if flows are present in the operational datastore'''
# print type(flow_details), flow_details
- if type(flow_details) is not list:
- raise Exception('List expected')
active_flows = 0
found_flows = 0
switches = _get_operational_inventory_of_switches(controller)
if switches is None:
- return False
+ return 0, 0, 0
for sw in switches:
tabs = sw['flow-node-inventory:table']
for t in tabs:
active_flows += t['opendaylight-flow-table-statistics:flow-table-statistics']['active-flows']
if 'flow' in t:
found_flows += len(t['flow'])
- print "ActiveFlows(reported)/FlowsFound/FlowsExpected", active_flows, found_flows, len(flow_details)
- if found_flows == len(flow_details):
- return True
- return False
+ print "Switches,ActiveFlows(reported)/FlowsFound", len(switches), active_flows, found_flows
+ return len(switches), active_flows, found_flows
def get_switches_count(controller=''):
--- /dev/null
+*** Settings ***
+Documentation Suite checks if StatMngr is able to collect flows correctly
+Suite Setup Create Http Session
+Suite Teardown Delete Http Session
+Library OperatingSystem
+Library Collections
+Library XML
+Library SSHLibrary
+Variables ../../../variables/Variables.py
+Library RequestsLibrary
+Library ../../../libraries/Common.py
+Library ../../../libraries/ScaleClient.py
+Resource ../../../libraries/WaitForFailure.robot
+
+*** Variables ***
+${swnr} 63
+${flnr} 100000
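+# ${fpr} - number of flows sent in a single POST request (flows per request)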
+${fpr} 25
+${swspread} linear
+${tabspread} first
+@{cntls} ${CONTROLLER}
+${linux_prompt} >
+${start_cmd} sudo mn --controller=remote,ip=${CONTROLLER} --topo linear,${swnr} --switch ovsk,protocols=OpenFlow13
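+# ${iperiod} - retry period for inventory checks; ${ichange} - max time for the inventory to reach
+# the expected state; ${imonitor} - how long the stable state is monitored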
+${iperiod} 30s
+${imonitor} 600s
+${ichange} 120s
+
+*** Test Cases ***
+Connect Mininet
+ Connect Switches
+
+Configure Flows
+ [Documentation] Configuration of ${flnr} flows into config datastore
+ ${flows} ${notes}= Generate New Flow Details flows=${flnr} switches=${swnr} swspread=${swspread} tabspread=${tabspread}
+ Log ${notes}
+ ${res}= Configure Flows Bulk flow_details=${flows} controllers=@{cntls} nrthreads=5 fpr=${fpr}
+ Log ${res}
+ Set Suite Variable ${flows}
+
+Wait Stats Collected
+ [Documentation] Waits till ${flnr} flows are initially collected
+ Inventory Change Reached ${swnr} ${flnr}
+
+Stable State Monitoring
+ [Documentation] Checks that all ${flnr} flows stay present in the inventory for the specified time frame
+ Monitor Stable State ${swnr} ${flnr}
+
+Stop Mininet
+ [Documentation] Disconnect/Stop mininet
+ Stop Switches
+
+Check No Flows In Operational After Disconnect
+ [Documentation] With mininet stopped, no switches should be found in the operational datastore
+ Inventory Change Reached 0 0
+
+Connect Mininet Again
+ [Documentation] Reconnection of the mininet
+ Connect Switches
+
+Check Flows Are Operational Again
+ [Documentation] All ${flnr} flows should be present in the operational datastore after mininet reconnection
+ Inventory Change Reached ${swnr} ${flnr}
+
+Deconfigure Flows
+ [Documentation] Flows deconfiguration
+ ${resp}= Delete session ${CONFIG_NODES_API}
+ Should Be Equal As Numbers ${resp.status_code} 200
+
+Check No Flows In Operational Last
+ [Documentation] The operational datastore should contain no flows
+ Inventory Change Reached ${swnr} 0
+
+Stop Mininet End
+ Stop Switches
+
+*** Keywords ***
+Connect Switches
+ [Documentation] Starts mininet with requested number of switches (${swnr})
+ Log Starting mininet with ${swnr} switches
+ Open Connection ${MININET} prompt=${linux_prompt} timeout=600
+ Login With Public Key ${MININET_USER} ${USER_HOME}/.ssh/id_rsa any
+ Execute Command sudo ovs-vsctl set-manager ptcp:6644
+ Execute Command sudo mn -c
+ Write ${start_cmd}
+ Read Until mininet>
+ Wait Until Keyword Succeeds 10s 1s Are Switches Connected Topo
+
+Create Http Session
+ Create Session session http://${CONTROLLER}:${RESTCONFPORT} auth=${AUTH} headers=${HEADERS_XML}
+
+Stop Switches
+ [Documentation] Stops mininet
+ Log Stopping mininet
+ Read
+ Write exit
+ Read Until ${linux_prompt}
+ Close Connection
+
+Delete Http Session
+ Delete All Sessions
+
+Are Switches Connected Topo
+ [Documentation] Checks whether switches are connected to the controller
+ ${resp}= Get session ${OPERATIONAL_TOPO_API}/topology/flow:1 headers=${ACCEPT_XML}
+ Log ${resp.content}
+ ${count}= Get Element Count ${resp.content} xpath=node
+ Should Be Equal As Numbers ${count} ${swnr}
+
+Check Flows Inventory
+ [Arguments] ${rswitches} ${rflows}
+ [Documentation] Checks if the inventory has the required state
+ ${sw} ${repf} ${foundf}= Flow Stats Collected controller=${CONTROLLER}
+ Should Be Equal As Numbers ${rswitches} ${sw}
+ Should Be Equal As Numbers ${rflows} ${foundf}
+
+Inventory Change Reached
+ [Arguments] ${rswitches} ${rflows}
+ [Documentation] This keyword waits till the inventory reaches the required state
+ Wait Until Keyword Succeeds ${ichange} ${iperiod} Check Flows Inventory ${rswitches} ${rflows}
+
+Monitor Stable State
+ [Arguments] ${rswitches} ${rflows}
+ [Documentation] This keyword verifies that the inventory stays in the required state for the monitored time frame
+ Verify Keyword Does Not Fail Within Timeout ${imonitor} ${iperiod} Check Flows Inventory ${rswitches} ${rflows}