"""
The purpose of this library is to spread the configured flows
over the specified tables and switches.

The approach used to configure flows and to check the inventory operational data is taken from
../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
"""
17 class Counter(object):
18 def __init__(self, start=0):
19 self.lock = threading.Lock()
22 def increment(self, value=1):
_spreads = ['gauss', 'linear', 'first']  # spread methods accepted by _randomize; any other name raises there
33 _default_flow_template = '''{
34 "flow-node-inventory:flow": [
36 "flow-node-inventory:cookie": %d,
37 "flow-node-inventory:cookie_mask": 4294967295,
38 "flow-node-inventory:flow-name": "%s",
39 "flow-node-inventory:hard-timeout": %d,
40 "flow-node-inventory:id": "%s",
41 "flow-node-inventory:idle-timeout": %d,
42 "flow-node-inventory:installHw": false,
43 "flow-node-inventory:instructions": {
44 "flow-node-inventory:instruction": [
46 "flow-node-inventory:apply-actions": {
47 "flow-node-inventory:action": [
49 "flow-node-inventory:drop-action": {},
50 "flow-node-inventory:order": 0
54 "flow-node-inventory:order": 0
58 "flow-node-inventory:match": {
59 "flow-node-inventory:ipv4-destination": "%s/32",
60 "flow-node-inventory:ethernet-match": {
61 "flow-node-inventory:ethernet-type": {
62 "flow-node-inventory:type": 2048
66 "flow-node-inventory:priority": 2,
67 "flow-node-inventory:strict": false,
68 "flow-node-inventory:table_id": %d
74 def _get_notes(fldet=[]):
75 '''For given list of flow details it produces a dictionary with statistics
76 { swId1 : { tabId1 : flows_count1,
77 tabId2 : flows_count2,
79 'total' : switch count }
84 for (sw, tab, flow) in fldet:
86 notes[sw] = {'total': 0}
87 if tab not in notes[sw]:
90 notes[sw]['total'] += 1
94 def _randomize(spread, maxn):
95 '''Returns a randomized switch or table id'''
96 if spread not in _spreads:
97 raise Exception('Spread method {} not available'.format(spread))
100 ga = abs(random.gauss(0, 1))
101 rv = int(ga*float(maxn)/3)
104 elif spread == 'linear':
105 rv = int(random.random() * float(maxn))
109 raise ValueError('rv >= maxn')
110 elif spread == 'first':
def generate_new_flow_details(flows=10, switches=1, swspread='gauss', tables=250, tabspread='gauss'):
    """Generate a list of (switch_id, table_id, flow_id) tuples.

    Switch ids are drawn with the swspread method out of `switches`, table ids
    with the tabspread method out of `tables`; flow_id is the position of the
    flow in the generated list.

    Returns the list of tuples together with the statistics dictionary that
    _get_notes builds for that list.
    """
    # Draw all switch ids before any table id: the order in which the random
    # generator is consumed is part of the behaviour.
    switch_ids = []
    for _ in range(int(flows)):
        switch_ids.append(_randomize(swspread, switches))

    details = []
    for flow_id, switch_id in enumerate(switch_ids):
        details.append((switch_id, _randomize(tabspread, tables), flow_id))

    return details, _get_notes(details)
def _prepare_add(cntl, sw, tab, fl, ip, template=None):
    '''Create the PUT http request that configures a flow in the configuration datastore.

    cntl     - controller host name or ip address (string)
    sw       - switch number, used as openflow:<sw> in the url
    tab      - table id, used in the url and in the flow body
    fl       - flow number, used as flow id, cookie and in the flow name
    ip       - value accepted by netaddr.IPAddress, used as ipv4 destination
    template - flow body template with the same placeholders as
               _default_flow_template; None selects _default_flow_template

    Returns the unsent requests.Request; the caller prepares and sends it.
    '''
    if template is None:
        # "None % (...)" would raise TypeError, so fall back to the module default.
        template = _default_flow_template
    url = 'http://' + cntl + ':' + '8181'
    url += ('/restconf/config/opendaylight-inventory:nodes/node/openflow:' + str(sw) +
            '/table/' + str(tab) + '/flow/' + str(fl))
    # placeholder order: cookie, flow-name, hard-timeout, id, idle-timeout, ipv4-destination, table_id
    flow = template % (fl, 'TestFlow-%d' % fl, 65000, str(fl), 65000, str(netaddr.IPAddress(ip)), tab)
    req = requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=flow,
                           auth=('admin', 'admin'))
    return req
def _prepare_delete(cntl, sw, tab, fl, ip, template=None):
    '''Create the DELETE http request that removes a flow from the configuration datastore.

    cntl - controller host name or ip address (string)
    sw   - switch number, used as openflow:<sw> in the url
    tab  - table id used in the url
    fl   - flow number used as the flow id in the url
    ip, template - not used; accepted so that the signature matches _prepare_add

    Returns the unsent requests.Request; the caller prepares and sends it.
    '''
    url = 'http://' + cntl + ':' + '8181'
    url += ('/restconf/config/opendaylight-inventory:nodes/node/openflow:' + str(sw) +
            '/table/' + str(tab) + '/flow/' + str(fl))
    req = requests.Request('DELETE', url, headers={'Content-Type': 'application/json'}, auth=('admin', 'admin'))
    return req
141 def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='', template=None,
'''The function runs in a thread. It reads flow details from the queue and configures
the flow on the controller'''
145 ses = requests.Session()
146 cntl = controllers[0]
147 counter = [0 for i in range(600)]
151 (sw, tab, fl, ip) = inqueue.get(timeout=1)
152 sw, tab, fl, ip = sw+1, tab, fl+1, ip
154 if exitevent.is_set() and inqueue.empty():
157 req = preparefnc(cntl, sw, tab, fl, ip, template=template)
158 prep = ses.prepare_request(req)
160 rsp = ses.send(prep, timeout=5)
161 except requests.exceptions.Timeout:
164 counter[rsp.status_code] += 1
166 for i, v in enumerate(counter):
172 def _config_task_executor(preparefnc, flow_details=[], flow_template=None, controllers=['127.0.0.1'], restport='8181',
174 '''Function starts thread executors and put required information to the queue. Executors read the queue and send
175 http requests. After the thread's join, it produces a summary result.'''
176 # TODO: multi controllers support
177 ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
178 if flow_template is not None:
179 template = flow_template
181 template = _default_flow_template
# let's extend each flow details tuple with an IP address, to be used with the template
184 flows = [(s, t, f, ip_addr.increment()) for s, t, f in flow_details]
# let's fill the queue
194 ee = threading.Event()
# let's start the threads which will read flow details from the queue and send
198 for i in range(int(nrthreads)):
199 t = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
200 kwargs={"inqueue": q, "exitevent": ee, "controllers": controllers, "restport": restport,
201 "template": template, "outqueue": rq})
212 for k, v in res.iteritems():
def configure_flows(*args, **kwargs):
    '''Configure flows based on the given flow details.

    All arguments are forwarded unchanged to _config_task_executor, with
    _prepare_add supplied as its preparefnc.
    Input parameters with default values: flow_details=[], flow_template=None,
    controllers=['127.0.0.1'], restport='8181', nrthreads=1

    Returns whatever _config_task_executor returns.
    '''
    outcome = _config_task_executor(_prepare_add, *args, **kwargs)
    return outcome
def deconfigure_flows(*args, **kwargs):
    '''Deconfigure flows based on the given flow details.

    All arguments are forwarded unchanged to _config_task_executor, with
    _prepare_delete supplied as its preparefnc.
    Input parameters with default values: flow_details=[], flow_template=None,
    controllers=['127.0.0.1'], restport='8181', nrthreads=1

    Returns whatever _config_task_executor returns.
    '''
    outcome = _config_task_executor(_prepare_delete, *args, **kwargs)
    return outcome
234 def _get_operational_inventory_of_switches(controller):
235 '''GET requests to get operational inventory node details'''
236 url = 'http://'+controller+':8181/restconf/operational/opendaylight-inventory:nodes'
237 rsp = requests.get(url, headers={'Accept': 'application/json'}, stream=False, auth=('admin', 'admin'))
238 if rsp.status_code != 200:
240 inv = json.loads(rsp.content)['nodes']['node']
241 switches = [sw for sw in inv if 'openflow:' in sw['id']]
245 def flow_stats_collected(flow_details=[], controller=''):
'''Once flows are configured, this function is used to check if flows are present in the operational datastore'''
247 # print type(flow_details), flow_details
248 if type(flow_details) is not list:
249 raise Exception('List expected')
252 switches = _get_operational_inventory_of_switches(controller)
256 tabs = sw['flow-node-inventory:table']
258 active_flows += t['opendaylight-flow-table-statistics:flow-table-statistics']['active-flows']
260 found_flows += len(t['flow'])
261 print "ActiveFlows(reported)/FlowsFound/FlowsExpected", active_flows, found_flows, len(flow_details)
262 if found_flows == len(flow_details):
267 def get_switches_count(controller=''):
'''Count the switches present in the operational inventory nodes datastore'''
269 switches = _get_operational_inventory_of_switches(controller)