2 The purpose of this library is the ability to spread configured flows
3 over the specified tables and switches.
5 The idea of how to configure and check inventory operational data is taken from
6 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/flow_config_blaster.py
7 ../../../../tools/odl-mdsal-clustering-tests/clustering-performance-test/inventory_crawler.py
class Counter(object):
    """Thread-safe integer counter used to hand out consecutive numbers.

    NOTE(review): only the method signatures and the lock creation are visible
    in this excerpt. Storing the number in ``self.value`` and returning the
    value held *before* the increment are reconstructed -- confirm against
    the complete file.
    """

    def __init__(self, start=0):
        """Create the counter.

        Args:
            :param start: initial value of the counter (default=0)
        """
        self.lock = threading.Lock()
        self.value = start

    def increment(self, value=1):
        """Atomically add ``value`` to the counter.

        Args:
            :param value: amount to add (default=1)

        Returns:
            :returns val: the counter value before the addition
        """
        # the context manager releases the lock even if the addition raises
        with self.lock:
            previous = self.value
            self.value += value
        return previous
33 _spreads = ['gauss', 'linear', 'first'] # possible defined spreads at the moment
34 _default_flow_template_json = { # template used for config datastore
37 u'hard-timeout': 65000,
38 u'idle-timeout': 65000,
39 u'cookie_mask': 4294967295,
40 u'flow-name': u'FLOW-NAME-TEMPLATE',
46 u'id': u'FLOW-ID-TEMPLATE',
48 u'ipv4-destination': u'0.0.0.0/32',
75 _node_tmpl = "/opendaylight-inventory:nodes/opendaylight-inventory:node[opendaylight-inventory:id=\"openflow:{0}\"]"
78 _default_operations_item_json = { # template used for sal operations
82 "node": "to_be_replaced",
84 "cookie_mask": 4294967295,
85 "flags": "SEND_FLOW_REM",
86 "hard-timeout": 65000,
87 "idle-timeout": 65000,
102 "ipv4-destination": "0.0.0.0/32",
def _get_notes(fldet=None):
    """Summarise how many flows were assigned to every switch and table.

    Args:
        :param fldet: iterable of (switch_id, table_id, flow_id) tuples;
                      ``None`` (the default) is treated as an empty list

    Returns:
        :returns notes: dictionary with statistics in the form
            { swId1 : { tabId1 : flows_count1,
                        tabId2 : flows_count2,
                        'total' : number of flows on the switch },
              swId2 : { ... } }
    """
    notes = {}
    for sw, tab, _flow in (fldet or ()):
        per_switch = notes.setdefault(sw, {'total': 0})
        per_switch[tab] = per_switch.get(tab, 0) + 1
        per_switch['total'] += 1
    return notes
def _randomize(spread, maxn):
    """Returns a randomized switch or table id.

    Args:
        :param spread: name of the distribution, one of the values listed in ``_spreads``

        :param maxn: exclusive upper bound of the generated id; numeric strings are accepted

    Returns:
        :returns rv: integer lower than ``maxn``; always 0 for the 'first' spread

    Raises:
        Exception: if ``spread`` is not listed in ``_spreads``
        ValueError: if the 'linear' spread produced a value outside of the range
    """
    if spread not in _spreads:
        raise Exception('Spread method {} not available'.format(spread))

    # Coerce once so the bound checks below compare numbers with numbers even
    # when the caller hands over a numeric string.
    limit = int(maxn)

    if spread == 'first':
        return 0
    if spread == 'gauss':
        ga = abs(random.gauss(0, 1))
        rv = int(ga * float(limit) / 3)
        # NOTE(review): the handling of rv >= maxn for 'gauss' is not visible in
        # this excerpt; clamping to the last valid id is assumed -- confirm.
        return min(rv, limit - 1)
    if spread == 'linear':
        rv = int(random.random() * float(limit))
        if rv >= limit:
            raise ValueError('rv >= maxn')
        return rv
def generate_new_flow_details(flows=10, switches=1, swspread='gauss', tables=250, tabspread='gauss'):
    """Generate a list of tuples (switch_id, table_id, flow_id) spread over switches and tables.

    Args:
        :param flows: number of flow details to generate (default=10)

        :param switches: number of switches the flows are spread over (default=1)

        :param swspread: spread method used to pick the switch (default='gauss')

        :param tables: number of tables the flows are spread over (default=250)

        :param tabspread: spread method used to pick the table (default='gauss')

    Returns:
        :returns (fltables, notes): list of (switch_id, table_id, flow_id) tuples and
                                    a dictionary with statistics built by ``_get_notes``
    """
    # all three counts are coerced the same way, so numeric strings work for each of them
    flows, switches, tables = int(flows), int(switches), int(tables)
    # switches are drawn first and tables afterwards, in two separate passes
    swflows = [_randomize(swspread, switches) for _ in range(flows)]
    # we have to increase the switch index because mininet starts indexing switches from 1 (not 0)
    fltables = [(sw + 1, _randomize(tabspread, tables), flow_id) for flow_id, sw in enumerate(swflows)]
    return fltables, _get_notes(fltables)
def _prepare_add(cntl, method, flows, template=None):
    """Build a PUT http request which configures one flow in the configuration datastore.

    Only the first item of ``flows`` is used.

    Args:
        :param cntl: controller's ip address or hostname

        :param method: determines http request method (not used here, the request is always PUT)

        :param flows: list of flow details (switch_id, table_id, flow_id, ip_addr)

        :param template: flow template to be filled

    Returns:
        :returns req: http request object
    """
    sw, tab, fl, ip = flows[0]
    url = ''.join(['http://', cntl, ':', '8181',
                   '/restconf/config/opendaylight-inventory:nodes/node/openflow:', str(sw),
                   '/table/', str(tab), '/flow/', str(fl)])

    flow = copy.deepcopy(template['flow'][0])
    # NOTE(review): the 'cookie' and 'id' assignments and the final
    # fmod['flow'] assignment are not visible in this excerpt and were
    # reconstructed -- confirm against the complete file.
    flow['cookie'] = fl
    flow['flow-name'] = 'TestFlow-%d' % fl
    flow['id'] = str(fl)
    flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
    flow['table_id'] = tab

    # a shallow copy is enough, only the top level 'flow' entry is replaced
    fmod = dict(template)
    fmod['flow'] = flow
    return requests.Request('PUT', url, headers={'Content-Type': 'application/json'},
                            data=json.dumps(fmod), auth=('admin', 'admin'))
def _prepare_table_add(cntl, method, flows, template=None):
    """Build a POST http request which configures several flows of one table in the configuration datastore.

    The target switch and table are taken from the first item of ``flows``.

    Args:
        :param cntl: controller's ip address or hostname

        :param method: determines http request method (not used here, the request is always POST)

        :param flows: list of flow details (switch_id, table_id, flow_id, ip_addr)

        :param template: flow template to be filled

    Returns:
        :returns req: http request object
    """
    first_sw, first_tab = flows[0][0], flows[0][1]
    url = ''.join(['http://', cntl, ':', '8181',
                   '/restconf/config/opendaylight-inventory:nodes/node/openflow:', str(first_sw),
                   '/table/', str(first_tab)])

    fdets = []
    for _sw, tab, fl, ip in flows:
        flow = copy.deepcopy(template['flow'][0])
        # NOTE(review): the 'cookie' and 'id' assignments are not visible in
        # this excerpt and were reconstructed -- confirm.
        flow['cookie'] = fl
        flow['flow-name'] = 'TestFlow-%d' % fl
        flow['id'] = str(fl)
        flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
        flow['table_id'] = tab
        fdets.append(flow)

    fmod = copy.deepcopy(template)
    # NOTE(review): assignment reconstructed; the visible code builds the
    # flows but the line storing them into the payload is missing.
    fmod['flow'] = fdets
    return requests.Request('POST', url, headers={'Content-Type': 'application/json'},
                            data=json.dumps(fmod), auth=('admin', 'admin'))
def _prepare_delete(cntl, method, flows, template=None):
    """Build a DELETE http request which removes one flow from the configuration datastore.

    Only the first item of ``flows`` is used.

    Args:
        :param cntl: controller's ip address or hostname

        :param method: determines http request method (not used here, the request is always DELETE)

        :param flows: list of flow details (switch_id, table_id, flow_id, ip_addr)

        :param template: flow template (not used, a delete request carries no body)

    Returns:
        :returns req: http request object
    """
    sw, tab, fl, _ip = flows[0]
    url = ''.join(['http://', cntl, ':', '8181',
                   '/restconf/config/opendaylight-inventory:nodes/node/openflow:', str(sw),
                   '/table/', str(tab), '/flow/', str(fl)])
    return requests.Request('DELETE', url, headers={'Content-Type': 'application/json'},
                            auth=('admin', 'admin'))
def _prepare_rpc_item(cntl, method, flows, template=None):
    """Build a POST http request which adds or removes flows using an openflowplugin rpc.

    Args:
        :param cntl: controller's ip address or hostname

        :param method: name of the sal-bulk-flow operation, appended to the url

        :param flows: list of flow details (switch_id, table_id, flow_id, ip_addr)

        :param template: flow template to be filled

    Returns:
        :returns req: http request object
    """
    url = 'http://' + cntl + ':' + '8181/restconf/operations/sal-bulk-flow:' + method

    fdets = []
    for sw, tab, fl, ip in flows:
        flow = copy.deepcopy(template['input']['bulk-flow-item'][0])
        flow['node'] = _node_tmpl.format(sw)
        # NOTE(review): the 'cookie' assignment is not visible in this excerpt
        # and was reconstructed -- confirm.
        flow['cookie'] = fl
        flow['flow-name'] = 'TestFlow-%d' % fl
        flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
        flow['table_id'] = tab
        fdets.append(flow)

    fmod = copy.deepcopy(template)
    fmod['input']['bulk-flow-item'] = fdets
    return requests.Request('POST', url, headers={'Content-Type': 'application/json'},
                            data=json.dumps(fmod), auth=('admin', 'admin'))
def _prepare_ds_item(cntl, method, flows, template=None):
    """Build a POST http request which configures flows in the configuration datastore via sal-bulk-flow.

    Ofp uses write operation, standard POST to config datastore uses read-write operation (on java level)

    Args:
        :param cntl: controller's ip address or hostname

        :param method: name of the sal-bulk-flow operation, appended to the url

        :param flows: list of flow details (switch_id, table_id, flow_id, ip_addr)

        :param template: flow template to be filled

    Returns:
        :returns req: http request object
    """
    url = 'http://' + cntl + ':' + '8181/restconf/operations/sal-bulk-flow:' + method

    fdets = []
    for sw, tab, fl, ip in flows:
        flow = copy.deepcopy(template['input']['bulk-flow-item'][0])
        flow['node'] = _node_tmpl.format(sw)
        # NOTE(review): the 'cookie' and 'flow-id' assignments are not visible
        # in this excerpt and were reconstructed -- confirm both the keys and
        # the value types against the complete file.
        flow['cookie'] = fl
        flow['flow-name'] = 'TestFlow-%d' % fl
        flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ip))
        flow['table_id'] = tab
        flow['flow-id'] = str(fl)
        fdets.append(flow)

    fmod = copy.deepcopy(template)
    # the datastore variant of the operation expects the items under a different key
    del fmod['input']['bulk-flow-item']
    fmod['input']['bulk-flow-ds-item'] = fdets
    return requests.Request('POST', url, headers={'Content-Type': 'application/json'},
                            data=json.dumps(fmod), auth=('admin', 'admin'))
def _wt_request_sender(thread_id, preparefnc, inqueue=None, exitevent=None, controllers=(), restport='',
                       template=None, outqueue=None, method=None):
    """The function sends http requests.

    Runs in the working thread. It reads out flow details from the queue and sends appropriate http requests
    to the first controller of ``controllers``.

    Args:
        :param thread_id: thread id

        :param preparefnc: function which prepares the http request

        :param inqueue: input queue, flow details are coming from here

        :param exitevent: event to notify working thread that parent (task executor) stopped filling the input queue

        :param controllers: a list of controllers' ip addresses or hostnames

        :param restport: restconf port (not used by this function)

        :param template: flow template used for creating flow content

        :param outqueue: queue where the results should be put

        :param method: method determines the type of http request

    Returns:
        nothing, results must be put into the output queue as a {status_code: count} dictionary
    """
    ses = requests.Session()
    cntl = controllers[0]
    # keyed by status code; a dictionary copes with any code the server may return
    res = {}
    try:
        while True:
            try:
                flowlist = inqueue.get(timeout=1)
            except Queue.Empty:
                # stop only when the parent finished filling the queue and nothing is left
                if exitevent.is_set() and inqueue.empty():
                    break
                continue
            req = preparefnc(cntl, method, flowlist, template=template)
            # the request is prepared on its own, not via ses.prepare_request(req)
            prep = req.prepare()
            try:
                rsp = ses.send(prep, timeout=5)
            except requests.exceptions.Timeout:
                # NOTE(review): the bookkeeping of timeouts is not visible in this
                # excerpt; pseudo status code 99 is assumed -- confirm.
                code = 99
            else:
                code = rsp.status_code
            res[code] = res.get(code, 0) + 1
    finally:
        # always report, the parent waits for one result per worker
        ses.close()
        outqueue.put(res)
def _task_executor(method='', flow_template=None, flow_details=None, controllers=None,
                   restport='8181', nrthreads=1, fpr=1):
    """The main function which drives sending of http requests.

    Creates 2 queues and requested number of 'working threads'. One queue is filled with flow details and working
    threads read them out and send http requests. The other queue is for sending results from working threads back.
    After the threads' join, it produces a summary result.

    Args:
        :param method: based on this the function which prepares http request is chosen

        :param flow_template: template to generate a flow content

        :param flow_details: a list of tuples with flow details (switch_id, table_id, flow_id) (default=[])

        :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])

        :param restport: restconf port (default='8181'), handed over to the working threads

        :param nrthreads: number of threads used to send http requests (default=1)

        :param fpr: flow per request, number of flows sent in one http request;
                    forced to 1 for 'PUT' and 'DELETE'

    Returns:
        :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}

    Raises:
        NotImplementedError: if there is no prepare function for ``method``
        ValueError: if ``fpr`` is lower than 1
    """
    flow_details = [] if flow_details is None else flow_details
    controllers = ['127.0.0.1'] if controllers is None else controllers

    # TODO: multi controllers support
    ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')))

    # choose message prepare function; the flag tells whether a request can carry only 1 flow
    prepare_functions = {
        'PUT': (_prepare_add, True),
        'POST': (_prepare_table_add, False),
        'DELETE': (_prepare_delete, True),
        'add-flows-ds': (_prepare_ds_item, False),
        'remove-flows-ds': (_prepare_ds_item, False),
        'add-flows-rpc': (_prepare_rpc_item, False),
        'remove-flows-rpc': (_prepare_rpc_item, False),
    }
    if method not in prepare_functions:
        raise NotImplementedError('Method {0} does not have it\'s prepeare function defined'.format(method))
    preparefnc, single_flow_only = prepare_functions[method]
    # put and delete can contain only 1 flow, lets overwrite any value of flows per request
    fpr = 1 if single_flow_only else int(fpr)
    if fpr < 1:
        # a chunk size of 0 would never consume the flow lists below
        raise ValueError('fpr must be at least 1')

    # lets enlarge the tuple of flow details with IP, to be used with the template
    flows = [(sw, tab, flo, ip_addr.increment()) for sw, tab, flo in flow_details]
    # lets divide flows into switches and tables - flow groups
    flowgroups = {}
    for flow in flows:
        flowgroups.setdefault((flow[0], flow[1]), []).append(flow)

    # lets fill the queue with details needed for one http request
    # we have lists with flow details for particular (switch, table) tuples, now we need to split the lists
    # according to the flows per request (fpr) parameter
    sendqueue = Queue.Queue()
    for flow_list in flowgroups.values():
        for start in range(0, len(flow_list), fpr):
            sendqueue.put(flow_list[start:start + fpr])

    resultqueue = Queue.Queue()
    exitevent = threading.Event()

    # lets start threads which will read flow details from queues and send
    threads = []
    for i in range(int(nrthreads)):
        thr = threading.Thread(target=_wt_request_sender, args=(i, preparefnc),
                               kwargs={"inqueue": sendqueue, "exitevent": exitevent,
                                       "controllers": controllers, "restport": restport,
                                       "template": flow_template, "outqueue": resultqueue, "method": method})
        threads.append(thr)
        thr.start()

    # the send queue is completely filled already, workers may finish once it is empty
    exitevent.set()

    for thr in threads:
        thr.join()

    # Summing up the partial results. All workers are finished, so everything
    # they reported is in the queue already; draining without blocking cannot
    # hang even if a worker ended without reporting.
    result = {}
    while True:
        try:
            part_result = resultqueue.get_nowait()
        except Queue.Empty:
            break
        for code, count in part_result.items():
            result[code] = result.get(code, 0) + count
    return result
def configure_flows(*args, **kwargs):
    """Configure flows one by one in the configuration datastore using PUT http requests.

    Positional arguments are accepted but ignored, only keyword arguments are passed on.

    Args:
        :param flow_details: a list of tuples with flow details (switch_id, table_id, flow_id) (default=[])

        :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])

        :param restport: restconf port (default='8181')

        :param nrthreads: number of threads used to send http requests (default=1)

    Returns:
        :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
    """
    return _task_executor('PUT', _default_flow_template_json, **kwargs)
def deconfigure_flows(*args, **kwargs):
    """Remove flows one by one from the configuration datastore using DELETE http requests.

    Positional arguments are accepted but ignored, only keyword arguments are passed on.

    Args:
        :param flow_details: a list of tuples with flow details (switch_id, table_id, flow_id) (default=[])

        :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])

        :param restport: restconf port (default='8181')

        :param nrthreads: number of threads used to send http requests (default=1)

    Returns:
        :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
    """
    return _task_executor('DELETE', _default_flow_template_json, **kwargs)
def configure_flows_bulk(*args, **kwargs):
    """Configure flows in the configuration datastore using POST http requests.

    Positional arguments are accepted but ignored, only keyword arguments are passed on.

    Args:
        :param flow_details: a list of tuples with flow details (switch_id, table_id, flow_id) (default=[])

        :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])

        :param restport: restconf port (default='8181')

        :param nrthreads: number of threads used to send http requests (default=1)

        :param fpr: flow per request, number of flows sent in one http request (default=1)

    Returns:
        :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
    """
    return _task_executor('POST', _default_flow_template_json, **kwargs)
def operations_add_flows_ds(*args, **kwargs):
    """Configure flows using the 'add-flows-ds' operation of sal-bulk-flow.

    Positional arguments are accepted but ignored, only keyword arguments are passed on.

    Args:
        :param flow_details: a list of tuples with flow details (switch_id, table_id, flow_id) (default=[])

        :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])

        :param restport: restconf port (default='8181')

        :param nrthreads: number of threads used to send http requests (default=1)

        :param fpr: flow per request, number of flows sent in one http request (default=1)

    Returns:
        :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
    """
    return _task_executor('add-flows-ds', _default_operations_item_json, **kwargs)
def operations_remove_flows_ds(*args, **kwargs):
    """Remove flows using the 'remove-flows-ds' operation of sal-bulk-flow.

    Positional arguments are accepted but ignored, only keyword arguments are passed on.

    Args:
        :param flow_details: a list of tuples with flow details (switch_id, table_id, flow_id) (default=[])

        :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])

        :param restport: restconf port (default='8181')

        :param nrthreads: number of threads used to send http requests (default=1)

        :param fpr: flow per request, number of flows sent in one http request (default=1)

    Returns:
        :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
    """
    return _task_executor('remove-flows-ds', _default_operations_item_json, **kwargs)
def operations_add_flows_rpc(*args, **kwargs):
    """Configure flows using the 'add-flows-rpc' operation of sal-bulk-flow.

    Positional arguments are accepted but ignored, only keyword arguments are passed on.

    Args:
        :param flow_details: a list of tuples with flow details (switch_id, table_id, flow_id) (default=[])

        :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])

        :param restport: restconf port (default='8181')

        :param nrthreads: number of threads used to send http requests (default=1)

        :param fpr: flow per request, number of flows sent in one http request (default=1)

    Returns:
        :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
    """
    return _task_executor('add-flows-rpc', _default_operations_item_json, **kwargs)
def operations_remove_flows_rpc(*args, **kwargs):
    """Remove flows using the 'remove-flows-rpc' operation of sal-bulk-flow.

    Positional arguments are accepted but ignored, only keyword arguments are passed on.

    Args:
        :param flow_details: a list of tuples with flow details (switch_id, table_id, flow_id) (default=[])

        :param controllers: a list of controllers host names or ip addresses (default=['127.0.0.1'])

        :param restport: restconf port (default='8181')

        :param nrthreads: number of threads used to send http requests (default=1)

        :param fpr: flow per request, number of flows sent in one http request (default=1)

    Returns:
        :returns dict: dictionary of http response counts like {'http_status_code1: 'count1', etc.}
    """
    return _task_executor('remove-flows-rpc', _default_operations_item_json, **kwargs)
def _get_operational_inventory_of_switches(controller):
    """Gets the switches present in the operational inventory.

    Args:
        :param controller: controller's ip or host name

    Returns:
        :returns switches: list of inventory nodes whose id contains 'openflow:'.

        NOTE(review): the values returned by the early exits are not visible in
        this excerpt. ``None`` is assumed when the inventory cannot be read and
        an empty list when it holds no nodes -- confirm.
    """
    url = 'http://' + controller + ':8181/restconf/operational/opendaylight-inventory:nodes'
    rsp = requests.get(url, headers={'Accept': 'application/json'}, stream=False, auth=('admin', 'admin'))
    if rsp.status_code != 200:
        return None
    inv = json.loads(rsp.content)
    if 'nodes' not in inv:
        return None
    if 'node' not in inv['nodes']:
        return []
    # the inventory may hold other kinds of nodes too, keep the openflow ones only
    return [node for node in inv['nodes']['node'] if 'openflow:' in node['id']]
def flow_stats_collected(controller=''):
    """Provides the operational inventory counts of switches and flows.

    Args:
        :param controller: controller's ip address or host name

    Returns:
        :returns (switches, flows_reported, flows-found): tuple with counts of switches, reported and found flows
    """
    switches = _get_operational_inventory_of_switches(controller)
    # NOTE(review): this guard is not visible in the excerpt; the helper is
    # assumed to give None when the inventory cannot be read -- confirm.
    if switches is None:
        return 0, 0, 0

    active_flows = 0
    found_flows = 0
    for sw in switches:
        for table in sw['flow-node-inventory:table']:
            active_flows += table['opendaylight-flow-table-statistics:flow-table-statistics']['active-flows']
            # a table without any flow has no 'flow' list at all
            found_flows += len(table.get('flow', ()))

    # one pre-formatted argument keeps the output the same on python 2 and python 3
    print("Switches,ActiveFlows(reported)/FlowsFound {0} {1} {2}".format(len(switches), active_flows, found_flows))
    return len(switches), active_flows, found_flows
def get_switches_count(controller=''):
    """Gives the count of the switches present in the operational inventory nodes datastore.

    Args:
        :param controller: controller's ip address or host name

    Returns:
        :returns switches: returns the number of connected switches
    """
    switches = _get_operational_inventory_of_switches(controller)
    # NOTE(review): the lines following the lookup are not visible in this
    # excerpt; treating an unreadable inventory (None) as 0 switches is
    # assumed -- confirm.
    if switches is None:
        return 0
    return len(switches)
def validate_responses(received, expected):
    """Compares given response summary with expected results.

    Only the status codes are compared, their order and the counts do not matter.

    Args:
        :param received: dictionary returned from operations_* and (de)configure_flows*
                         functions
                         e.g. received = { 200:41 } - this means that we 41x received response with status code 200

        :param expected: list of expected http result codes
                         e.g. expected=[200] - we expect only http status 200 to be present

    Returns:
        :returns True: if list of http statuses from received responses is the same as expected
        :returns False: otherwise
    """
    # Sorting both sides makes the check independent of dictionary ordering
    # and of the python version.
    return sorted(received.keys()) == sorted(expected)