The purpose of this library is to spread configured flows
over the specified tables and switches.
The idea of how to configure flows and check inventory operational data is taken from
6 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
7 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
class Counter(object):
    """Thread-safe integer counter used to hand out unique IP addresses
    to the generated flows.

    NOTE(review): part of this class is elided from this view; the body of
    increment() is not visible -- confirm against the full file.
    """
    def __init__(self, start=0):
        # Lock serializing increment() calls from concurrent sender threads.
        self.lock = threading.Lock()
    def increment(self, value=1):
        """Atomically add *value* (default 1) to the counter; presumably
        returns the pre- or post-increment value -- body elided, confirm."""
_spreads = ['gauss', 'linear', 'first']  # possible defined spreads at the moment
# Default flow body used when the caller does not supply a template.
# NOTE(review): the literal is truncated in this view (the closing braces and
# the surrounding {u'flow': [{...}]} structure are elided); the code below
# accesses template['flow'][0], so the full file presumably nests these keys
# inside a one-element 'flow' list -- confirm there.
_default_flow_template_json = {
    u'hard-timeout': 65000,
    u'idle-timeout': 65000,
    u'cookie_mask': 4294967295,
    u'flow-name': u'FLOW-NAME-TEMPLATE',    # replaced per-flow by the prepare functions
    u'id': u'FLOW-ID-TEMPLATE',             # replaced per-flow by the prepare functions
    u'ipv4-destination': u'0.0.0.0/32',     # replaced per-flow by the prepare functions
def _get_notes(fldet=[]):
    '''For a given list of (switch, table, flow) details, produce a
    dictionary with statistics:
        { swId1 : { tabId1 : flows_count1,
                    tabId2 : flows_count2,
                    'total' : flows per switch },
          ... }

    NOTE(review): mutable default argument (fldet=[]) -- harmless as long as
    it is never mutated, but fldet=None would be safer.
    NOTE(review): the initialization of `notes`, the per-switch/per-table
    guard lines and the return statement are elided from this view.
    '''
    for (sw, tab, flow) in fldet:
            # first flow seen on this switch initializes its entry
            notes[sw] = {'total': 0}
        if tab not in notes[sw]:
        # every flow contributes to the per-switch total
        notes[sw]['total'] += 1
def _randomize(spread, maxn):
    '''Return a randomized switch or table id in [0, maxn) chosen according
    to *spread* (one of _spreads: 'gauss', 'linear' or 'first').

    Raises:
        Exception: if *spread* is not a known spread method.
        ValueError: (linear branch) if the computed value reaches maxn.

    NOTE(review): the return statements and the gauss-branch clamping are
    elided from this view -- confirm against the full file.
    '''
    if spread not in _spreads:
        raise Exception('Spread method {} not available'.format(spread))
    if spread == 'gauss':
        # |N(0,1)| scaled so ~3 sigma maps onto maxn
        ga = abs(random.gauss(0, 1))
        rv = int(ga*float(maxn)/3)
    elif spread == 'linear':
        # uniform over [0, maxn)
        rv = int(random.random() * float(maxn))
            raise ValueError('rv >= maxn')
    elif spread == 'first':
def generate_new_flow_details(flows=10, switches=1, swspread='gauss', tables=250, tabspread='gauss'):
    """Generate a list of (switch_id, table_id, flow_id) tuples, spreading
    the flows over switches and tables according to the given spread rules.

    Also returns a statistics dictionary as produced by _get_notes().
    """
    # pick a switch for every flow first (keeps the RNG call order stable)
    picked_switches = []
    for _ in range(int(flows)):
        picked_switches.append(_randomize(swspread, switches))
    # mininet numbers switches from 1 (not 0), hence the +1 on the switch id
    details = [(sw + 1, _randomize(tabspread, tables), flow_id)
               for flow_id, sw in enumerate(picked_switches)]
    stats = _get_notes(details)
    return details, stats
def _prepare_add(cntl, sw, tab, fl, ip, template=None):
    '''Create a PUT http request to configure one flow in the configuration
    datastore.

    Args:
        cntl: controller hostname/ip (port 8181 is hard-coded below)
        sw, tab, fl: switch, table and flow ids used in the restconf url
        ip: ipv4 destination written into the flow match
        template: flow template dict; the code reads template['flow'][0]

    NOTE(review): the lines setting the flow 'id', attaching the modified
    flow back into `fmod` and the final `return req` are elided from this
    view -- confirm against the full file.
    '''
    url = 'http://'+cntl+':'+'8181'
    url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab)+'/flow/'+str(fl)
    # never mutate the shared template -- work on a deep copy
    flow = copy.deepcopy(template['flow'][0])
    flow['flow-name'] = 'TestFlow-%d' % fl
    flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
    flow['table_id'] = tab
    fmod = dict(template)
    req_data = json.dumps(fmod)
    # un-prepared request; the sender thread prepares and sends it
    req = requests.Request('PUT', url, headers={'Content-Type': 'application/json'}, data=req_data,
                           auth=('admin', 'admin'))
def _prepare_table_add(cntl, flows, template=None):
    '''Create a POST http request to configure a bulk of flows (one switch
    table) in the configuration datastore.

    Args:
        cntl: controller hostname/ip (port 8181 is hard-coded below)
        flows: iterable of (switch, table, flow_id, ip) tuples
        template: flow template dict; the code reads template['flow'][0]

    NOTE(review): several lines are elided from this view; as shown, `sw`
    and `tab` are used in the url *before* the for-loop defines them -- in
    the full file they are presumably taken from the first flow of the bulk,
    and the per-flow bodies are presumably collected into fmod['flow'].
    Confirm against the full file.
    '''
    url = 'http://'+cntl+':'+'8181'
    url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab)
    for sw, tab, fl, ip in flows:
        # never mutate the shared template -- work on a deep copy
        flow = copy.deepcopy(template['flow'][0])
        flow['flow-name'] = 'TestFlow-%d' % fl
        flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
        flow['table_id'] = tab
    fmod = dict(template)
    req_data = json.dumps(fmod)
    # un-prepared request; the sender thread prepares and sends it
    req = requests.Request('POST', url, headers={'Content-Type': 'application/json'}, data=req_data,
                           auth=('admin', 'admin'))
def _prepare_delete(cntl, sw, tab, fl, ip, template=None):
    '''Create a DELETE http request to remove the flow from the
    configuration datastore.

    Args:
        cntl: controller hostname/ip (port 8181 is hard-coded below)
        sw, tab, fl: switch, table and flow ids used in the restconf url
        ip, template: unused; kept so the signature matches _prepare_add,
            since both are passed interchangeably as `preparefnc`

    Returns:
        an un-prepared requests.Request; the sender thread prepares/sends it
    '''
    url = 'http://'+cntl+':'+'8181'
    url += '/restconf/config/opendaylight-inventory:nodes/node/openflow:'+str(sw)+'/table/'+str(tab)+'/flow/'+str(fl)
    req = requests.Request('DELETE', url, headers={'Content-Type': 'application/json'}, auth=('admin', 'admin'))
    # Fix: the request was built but never returned, so the worker thread's
    # preparefnc(...) call would receive None and ses.send() would fail.
    return req
def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='', template=None,
    '''Worker thread body: read single-flow details from the queue and
    configure the flow on the controller via `preparefnc`.

    NOTE(review): the signature continues on a line elided from this view
    (presumably outqueue=None), and the while-loop, try blocks, queue.Empty
    handling and `prep = ses.prepare_request(req)` line are elided as well
    -- confirm against the full file.
    '''
    ses = requests.Session()
    # TODO: multi-controller support -- only the first controller is used
    cntl = controllers[0]
    # histogram of http responses, indexed by status code (0..599)
    counter = [0 for i in range(600)]
        (sw, tab, fl, ip) = inqueue.get(timeout=1)
        # shift switch and flow ids by one (mininet-style 1-based numbering)
        # -- confirm against the producer of the queue items
        sw, tab, fl, ip = sw+1, tab, fl+1, ip
            # drain-and-exit: stop once signalled and no work is left
            if exitevent.is_set() and inqueue.empty():
        req = preparefnc(cntl, sw, tab, fl, ip, template=template)
        # prep = ses.prepare_request(req)
            rsp = ses.send(prep, timeout=5)
        except requests.exceptions.Timeout:
        counter[rsp.status_code] += 1
    # report non-zero status-code counts (reporting lines elided below)
    for i, v in enumerate(counter):
def _wt_bulk_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=[], restport='',
                            template=None, outqueue=None):
    '''Worker thread body: read a whole list of flow details from the queue
    and configure them on the controller in one bulk request.

    NOTE(review): the while-loop, try blocks, queue.Empty handling and the
    `prep = ses.prepare_request(req)` line are elided from this view --
    confirm against the full file.
    '''
    ses = requests.Session()
    # TODO: multi-controller support -- only the first controller is used
    cntl = controllers[0]
    # histogram of http responses, indexed by status code (0..599)
    counter = [0 for i in range(600)]
        flowlist = inqueue.get(timeout=1)
        # drain-and-exit: stop once signalled and no work is left
        if exitevent.is_set() and inqueue.empty():
        req = preparefnc(cntl, flowlist, template=template)
        # prep = ses.prepare_request(req)
            rsp = ses.send(prep, timeout=5)
        except requests.exceptions.Timeout:
        counter[rsp.status_code] += 1
    # report non-zero status-code counts (reporting lines elided below)
    for i, v in enumerate(counter):
def _config_task_executor(preparefnc, flow_details=[], flow_template=None, controllers=['127.0.0.1'], restport='8181',
    '''Start thread executors and put the required flow information onto a
    queue; the executors read the queue and send http requests.  After the
    threads are joined, a summary result is produced.

    NOTE(review): the signature continues on an elided line (presumably
    nrthreads=1), and the creation of the work/result queues (`q`, `rq`),
    the queue filling, the thread start/join bookkeeping and the summary
    aggregation (`res`) are elided from this view.
    NOTE(review): `res.iteritems()` below is Python 2 only.
    '''
    # TODO: multi controllers support
    # counter handing out unique destination IPs, starting at 10.0.0.1
    ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
    if flow_template is not None:
        template = flow_template
        # (else branch -- line elided above)
        template = _default_flow_template_json
    # enlarge the flow-detail tuples with an IP, to be used with the template
    flows = [(s, t, f, ip_addr.increment()) for s, t, f in flow_details]
    # fill the work queue (filling lines elided)
    ee = threading.Event()
    # start threads which will read flow details from the queue and send
    for i in range(int(nrthreads)):
        t = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
                             kwargs={"inqueue": q, "exitevent": ee, "controllers": controllers, "restport": restport,
                                     "template": template, "outqueue": rq})
    # aggregate per-thread status-code counts into the summary
    for k, v in res.iteritems():
def configure_flows(*args, **kwargs):
    '''Configure flows in the controller, one PUT request per flow.

    Accepted parameters (with defaults): flow_details=[], flow_template=None,
    controllers=['127.0.0.1'], restport='8181', nrthreads=1
    '''
    summary = _config_task_executor(_prepare_add, *args, **kwargs)
    return summary
def deconfigure_flows(*args, **kwargs):
    '''Remove configured flows from the controller, one DELETE per flow.

    Accepted parameters (with defaults): flow_details=[], flow_template=None,
    controllers=['127.0.0.1'], restport='8181', nrthreads=1
    '''
    summary = _config_task_executor(_prepare_delete, *args, **kwargs)
    return summary
def _bulk_config_task_executor(preparefnc, flow_details=[], flow_template=None, controllers=['127.0.0.1'],
                               restport='8181', nrthreads=1, fpr=1):
    '''Start thread executors and put the required flow information onto a
    queue in per-table bulks; the executors read the queue and send http
    requests.  After the threads are joined, a summary result is produced.

    Args (visible): fpr -- flows per request (bulk size).

    NOTE(review): the grouping of flows into `fg`, the creation of the
    work/result queues (`q`, `rq`), the thread start/join bookkeeping and
    the summary aggregation (`res`) are elided from this view.
    NOTE(review): `.iteritems()` below is Python 2 only.
    '''
    # TODO: multi controllers support
    # counter handing out unique destination IPs, starting at 10.0.0.1
    ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))
    if flow_template is not None:
        template = flow_template
        # (else branch -- line elided above)
        template = _default_flow_template_json
    # enlarge the flow-detail tuples with an IP, to be used with the template
    flows = [(s, t, f, ip_addr.increment()) for s, t, f in flow_details]
    # divide flows into switches and tables (grouping lines elided)
    # fill the work queue from the grouped flows
    for k, v in fg.iteritems():
    ee = threading.Event()
    # start threads which will read flow details from the queue and send
    for i in range(int(nrthreads)):
        t = threading.Thread(target=_wt_bulk_request_sender, args=(i, preparefnc),
                             kwargs={"inqueue": q, "exitevent": ee, "controllers": controllers, "restport": restport,
                                     "template": template, "outqueue": rq})
    # aggregate per-thread status-code counts into the summary
    for k, v in res.iteritems():
def configure_flows_bulk(*args, **kwargs):
    '''Configure flows in the controller using bulk (per-table POST) requests.

    Accepted parameters (with defaults): flow_details=[], flow_template=None,
    controllers=['127.0.0.1'], restport='8181', nrthreads=1
    '''
    summary = _bulk_config_task_executor(_prepare_table_add, *args, **kwargs)
    return summary
def _get_operational_inventory_of_switches(controller):
    '''GET request for the operational inventory; returns the list of
    openflow switch nodes.

    NOTE(review): the bodies of the three early-exit branches and the final
    return statement are elided from this view -- they presumably return
    None / the `switches` list; confirm against the full file.
    '''
    url = 'http://'+controller+':8181/restconf/operational/opendaylight-inventory:nodes'
    rsp = requests.get(url, headers={'Accept': 'application/json'}, stream=False, auth=('admin', 'admin'))
    if rsp.status_code != 200:
    inv = json.loads(rsp.content)
    if 'nodes' not in inv:
    if 'node' not in inv['nodes']:
    inv = inv['nodes']['node']
    # keep only openflow-capable nodes (ids like 'openflow:<n>')
    switches = [sw for sw in inv if 'openflow:' in sw['id']]
def flow_stats_collected(controller=''):
    '''Once flows are configured, this function checks whether the flows are
    present in the operational datastore.

    Returns:
        (switch_count, active_flows, found_flows)

    NOTE(review): the loop headers over switches/tables and the counter
    initializations are elided from this view.  Python 2 print statement
    below.
    '''
    # print type(flow_details), flow_details
    switches = _get_operational_inventory_of_switches(controller)
        tabs = sw['flow-node-inventory:table']
            # controller-reported statistic vs. flows actually listed
            active_flows += t['opendaylight-flow-table-statistics:flow-table-statistics']['active-flows']
            found_flows += len(t['flow'])
    print "Switches,ActiveFlows(reported)/FlowsFound", len(switches), active_flows, found_flows
    return len(switches), active_flows, found_flows
def get_switches_count(controller=''):
    '''Count the switches present in the operational inventory nodes
    datastore.  (The return statement is past the end of this view;
    presumably len(switches) -- confirm against the full file.)'''
    switches = _get_operational_inventory_of_switches(controller)