3 from random import randrange
15 __author__ = "Jan Medved"
16 __copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
17 __license__ = "New-style BSD"
18 __email__ = "jmedved@cisco.com"
21 class Counter(object):
22 def __init__(self, start=0):
23 self.lock = threading.Lock()
26 def increment(self, value=1):
37 def __init__(self, verbose=False):
38 self.verbose = verbose
41 self.start = time.time()
44 def __exit__(self, *args):
45 self.end = time.time()
46 self.secs = self.end - self.start
47 self.msecs = self.secs * 1000 # millisecs
49 print("elapsed time: %f ms" % self.msecs)
52 class FlowConfigBlaster(object):
53 putheaders = {'content-type': 'application/json'}
54 getheaders = {'Accept': 'application/json'}
56 FLWURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0/flow/%d"
57 TBLURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0"
58 INVURL = 'restconf/operational/opendaylight-inventory:nodes'
63 # The "built-in" flow template
64 flow_mode_template = {
67 u'hard-timeout': 65000,
68 u'idle-timeout': 65000,
69 u'cookie_mask': 4294967295,
70 u'flow-name': u'FLOW-NAME-TEMPLATE',
76 u'id': u'FLOW-ID-TEMPLATE',
78 u'ipv4-destination': u'0.0.0.0/32',
104 class FcbStats(object):
106 FlowConfigBlaster Statistics: a class that stores and further processes
107 statistics collected by Blaster worker threads during their execution.
111 self.ok_rqst_rate = Counter(0.0)
112 self.total_rqst_rate = Counter(0.0)
113 self.ok_flow_rate = Counter(0.0)
114 self.total_flow_rate = Counter(0.0)
115 self.ok_rqsts = Counter(0)
116 self.total_rqsts = Counter(0)
117 self.ok_flows = Counter(0)
118 self.total_flows = Counter(0)
def process_stats(self, rqst_stats, flow_stats, elapsed_time):
    """
    Calculates the stats for RESTCONF request and flow programming
    throughput, and aggregates statistics across all Blaster threads.

    Args:
        rqst_stats: Request statistics dictionary (HTTP status code ->
            number of requests that finished with that status)
        flow_stats: Flow statistics dictionary (HTTP status code ->
            number of flows carried by requests with that status)
        elapsed_time: Elapsed time for the test, in seconds

    Returns: Rates (per second) for successfully finished requests,
        the total number of requests, successfully installed flows and
        the total number of flows. All rates are 0.0 when elapsed_time
        is zero.
    """
    # Only HTTP 200 and 204 count as success. A caller that never saw a
    # successful response may not have these keys, so default them to zero
    # instead of failing with KeyError.
    ok_rqsts = rqst_stats.get(200, 0) + rqst_stats.get(204, 0)
    total_rqsts = sum(rqst_stats.values())
    ok_flows = flow_stats.get(200, 0) + flow_stats.get(204, 0)
    total_flows = sum(flow_stats.values())

    def per_second(count):
        # A zero elapsed time (e.g. coarse clock resolution) would raise
        # ZeroDivisionError; report a zero rate for it instead.
        if not elapsed_time:
            return 0.0
        return count / float(elapsed_time)

    ok_rqst_rate = per_second(ok_rqsts)
    total_rqst_rate = per_second(total_rqsts)
    ok_flow_rate = per_second(ok_flows)
    total_flow_rate = per_second(total_flows)

    # Fold this caller's absolute counts into the shared counters ...
    self.ok_rqsts.increment(ok_rqsts)
    self.total_rqsts.increment(total_rqsts)
    self.ok_flows.increment(ok_flows)
    self.total_flows.increment(total_flows)

    # ... and add its rates to the shared rate counters.
    self.ok_rqst_rate.increment(ok_rqst_rate)
    self.total_rqst_rate.increment(total_rqst_rate)
    self.ok_flow_rate.increment(ok_flow_rate)
    self.total_flow_rate.increment(total_flow_rate)

    return ok_rqst_rate, total_rqst_rate, ok_flow_rate, total_flow_rate
def get_ok_rqst_rate(self):
    """Returns the successful-request rate accumulated by process_stats()."""
    counter = self.ok_rqst_rate
    return counter.value
def get_total_rqst_rate(self):
    """Returns the total request rate accumulated by process_stats()."""
    counter = self.total_rqst_rate
    return counter.value
def get_ok_flow_rate(self):
    """Returns the successful-flow rate accumulated by process_stats()."""
    counter = self.ok_flow_rate
    return counter.value
def get_total_flow_rate(self):
    """Returns the total flow rate accumulated by process_stats()."""
    counter = self.total_flow_rate
    return counter.value
def get_ok_rqsts(self):
    """Returns the number of successful requests accumulated by process_stats()."""
    counter = self.ok_rqsts
    return counter.value
def get_total_rqsts(self):
    """Returns the total number of requests accumulated by process_stats()."""
    counter = self.total_rqsts
    return counter.value
def get_ok_flows(self):
    """Returns the number of successful flows accumulated by process_stats()."""
    counter = self.ok_flows
    return counter.value
def get_total_flows(self):
    """Returns the total number of flows accumulated by process_stats()."""
    counter = self.total_flows
    return counter.value
180 def __init__(self, host, port, ncycles, nthreads, fpr, nnodes, nflows, startflow, auth, flow_mod_template=None):
183 self.ncycles = ncycles
184 self.nthreads = nthreads
188 self.startflow = startflow
191 if flow_mod_template:
192 self.flow_mode_template = flow_mod_template
194 self.post_url_template = 'http://%s:' + self.port + '/' + self.TBLURL
195 self.del_url_template = 'http://%s:' + self.port + '/' + self.FLWURL
197 self.stats = self.FcbStats()
198 self.total_ok_flows = 0
199 self.total_ok_rqsts = 0
201 self.ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')) + startflow)
203 self.print_lock = threading.Lock()
204 self.cond = threading.Condition()
205 self.threads_done = 0
207 for i in range(self.nthreads):
210 def get_num_nodes(self, session):
212 Determines the number of OF nodes in the connected mininet network. If
213 mininet is not connected, the default number of nodes is set to 16.
214 :param session: 'requests' session which to use to query the controller
218 hosts = self.host.split(",")
220 inventory_url = 'http://' + host + ":" + self.port + '/' + self.INVURL
224 r = session.get(inventory_url, headers=self.getheaders, stream=False, timeout=self.TIMEOUT)
226 r = session.get(inventory_url, headers=self.getheaders, stream=False, auth=('admin', 'admin'),
227 timeout=self.TIMEOUT)
229 if r.status_code == 200:
231 inv = json.loads(r.content)['nodes']['node']
233 for n in range(len(inv)):
234 if re.search('openflow', inv[n]['id']) is not None:
243 def create_flow_from_template(self, flow_id, ipaddr, node_id):
245 Create a new flow instance from the flow template specified during
246 FlowConfigBlaster instantiation. Flow templates are json-compatible
247 dictionaries that MUST contain elements for flow cookie, flow name,
248 flow id and the destination IPv4 address in the flow match field.
251 flow_id: Id for the new flow to create
252 ipaddr: IP Address to put into the flow's match
253 node_id: ID of the node where to create the flow
255 Returns: The flow that has been created from the template
258 flow = copy.deepcopy(self.flow_mode_template['flow'][0])
259 flow['cookie'] = flow_id
260 flow['flow-name'] = self.create_flow_name(flow_id)
261 flow['id'] = str(flow_id)
262 flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ipaddr))
265 def post_flows(self, session, node, flow_list, flow_count):
267 Performs a RESTCONF post of flows passed in the 'flow_list' parameter
268 :param session: 'requests' session on which to perform the POST
269 :param node: The ID of the openflow node to which to post the flows
270 :param flow_list: List of flows (in dictionary form) to POST
271 :param flow_count: Flow counter for round-robin host load balancing
273 :return: status code from the POST operation
275 flow_data = self.convert_to_json(flow_list, node)
277 hosts = self.host.split(",")
278 host = hosts[flow_count % len(hosts)]
279 flow_url = self.assemble_post_url(host, node)
282 r = session.post(flow_url, data=flow_data, headers=self.putheaders, stream=False, timeout=self.TIMEOUT)
284 r = session.post(flow_url, data=flow_data, headers=self.putheaders, stream=False, auth=('admin', 'admin'),
285 timeout=self.TIMEOUT)
def assemble_post_url(self, host, node):
    """
    Builds the url of a node's flow table in the config dataStore:
    /nodes/node/<node-id>/table/<table-id>
    :param host: ip address or host name pointing to controller
    :param node: id of node (without protocol prefix and colon)
    :return: url suitable for sending a flow to controller via POST method
    """
    # The template has one slot for the host and one for the node id.
    url_args = (host, node)
    return self.post_url_template % url_args
298 def convert_to_json(self, flow_list, node_id=None):
300 Dumps flows to json form.
301 :param flow_list: list of flows in json friendly structure
302 :param node_id: node identifier of corresponding node
303 :return: string containing plain json
305 fmod = dict(self.flow_mode_template)
306 fmod['flow'] = flow_list
307 flow_data = json.dumps(fmod)
310 def add_flows(self, start_flow_id, tid):
312 Adds flows into the ODL config data store. This function is executed by
313 a worker thread (the Blaster thread). The number of flows created and
314 the batch size (i.e. how many flows will be put into a RESTCONF request)
315 are determined by control parameters initialized when FlowConfigBlaster
317 :param start_flow_id - the ID of the first flow. Each Blaster thread
318 programs a different set of flows
319 :param tid: Thread ID - used to id the Blaster thread when statistics
320 for the thread are printed out
323 rqst_stats = {200: 0, 204: 0}
324 flow_stats = {200: 0, 204: 0}
326 s = requests.Session()
328 n_nodes = self.get_num_nodes(s)
330 with self.print_lock:
331 print(' Thread %d:\n Adding %d flows on %d nodes' % (tid, self.nflows, n_nodes))
335 while nflows < self.nflows:
336 node_id = randrange(1, n_nodes + 1)
338 for i in range(self.fpr):
339 flow_id = tid * (self.ncycles * self.nflows) + nflows + start_flow_id + self.startflow
340 self.flows[tid][flow_id] = node_id
341 flow_list.append(self.create_flow_from_template(flow_id, self.ip_addr.increment(), node_id))
343 if nflows >= self.nflows:
345 nb_actions.append((s, node_id, flow_list, nflows))
348 for nb_action in nb_actions:
349 sts = self.post_flows(*nb_action)
352 flow_stats[sts] += len(nb_action[2])
355 flow_stats[sts] = len(nb_action[2])
357 ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(rqst_stats, flow_stats, t.secs)
359 with self.print_lock:
360 print('\n Thread %d results (ADD): ' % tid)
361 print(' Elapsed time: %.2fs,' % t.secs)
362 print(' Requests/s: %.2f OK, %.2f Total' % (ok_rps, total_rps))
363 print(' Flows/s: %.2f OK, %.2f Total' % (ok_fps, total_fps))
364 print(' Stats ({Requests}, {Flows}): ')
367 self.threads_done += 1
372 self.cond.notifyAll()
374 def delete_flow(self, session, node, flow_id, flow_count):
376 Deletes a single flow from the ODL config data store using RESTCONF
378 session: 'requests' session on which to perform the DELETE
379 node: Id of the openflow node from which to delete the flow
380 flow_id: ID of the to-be-deleted flow
381 flow_count: Index of the flow being processed (for round-robin LB)
383 Returns: status code from the DELETE operation
387 hosts = self.host.split(",")
388 host = hosts[flow_count % len(hosts)]
389 flow_url = self.del_url_template % (host, node, flow_id)
392 r = session.delete(flow_url, headers=self.getheaders, timeout=self.TIMEOUT)
394 r = session.delete(flow_url, headers=self.getheaders, auth=('admin', 'admin'), timeout=self.TIMEOUT)
398 def delete_flows(self, start_flow, tid):
400 Deletes flows from the ODL config space that have been added using the
401 'add_flows()' function. This function is executed by a worker thread
402 :param start_flow - the ID of the first flow. Each Blaster thread
403 deletes a different set of flows
404 :param tid: Thread ID - used to id the Blaster thread when statistics
405 for the thread are printed out
409 rqst_stats = {200: 0, 204: 0}
411 s = requests.Session()
412 n_nodes = self.get_num_nodes(s)
414 with self.print_lock:
415 print('Thread %d: Deleting %d flows on %d nodes' % (tid, self.nflows, n_nodes))
418 for flow in range(self.nflows):
419 flow_id = tid * (self.ncycles * self.nflows) + flow + start_flow + self.startflow
420 sts = self.delete_flow(s, self.flows[tid][flow_id], flow_id, flow)
426 ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(rqst_stats, rqst_stats, t.secs)
428 with self.print_lock:
429 print('\n Thread %d results (DELETE): ' % tid)
430 print(' Elapsed time: %.2fs,' % t.secs)
431 print(' Requests/s: %.2f OK, %.2f Total' % (ok_rps, total_rps))
432 print(' Flows/s: %.2f OK, %.2f Total' % (ok_fps, total_fps))
433 print(' Stats ({Requests})',)
435 self.threads_done += 1
440 self.cond.notifyAll()
442 def run_cycle(self, function):
444 Runs a flow-add or flow-delete test cycle. Each test consists of
445 <cycles> test cycles, where <threads> worker (Blaster) threads are
446 started in each test cycle. Each Blaster thread programs <flows>
447 OpenFlow flows into the controller using the controller's RESTCONF API.
448 :param function: Add or delete, determines what test will be executed.
451 self.total_ok_flows = 0
452 self.total_ok_rqsts = 0
454 for c in range(self.ncycles):
455 self.stats = self.FcbStats()
456 with self.print_lock:
457 print('\nCycle %d:' % c)
460 for i in range(self.nthreads):
461 t = threading.Thread(target=function, args=(c * self.nflows, i))
465 # Wait for all threads to finish and measure the execution time
467 for thread in threads:
470 with self.print_lock:
471 print('\n*** Test summary:')
472 print(' Elapsed time: %.2fs' % t.secs)
473 print(' Peak requests/s: %.2f OK, %.2f Total' % (
474 self.stats.get_ok_rqst_rate(), self.stats.get_total_rqst_rate()))
475 print(' Peak flows/s: %.2f OK, %.2f Total' % (
476 self.stats.get_ok_flow_rate(), self.stats.get_total_flow_rate()))
477 print(' Avg. requests/s: %.2f OK, %.2f Total (%.2f%% of peak total)' % (
478 self.stats.get_ok_rqsts() / t.secs,
479 self.stats.get_total_rqsts() / t.secs,
480 (self.stats.get_total_rqsts() / t.secs * 100) / self.stats.get_total_rqst_rate()))
481 print(' Avg. flows/s: %.2f OK, %.2f Total (%.2f%% of peak total)' % (
482 self.stats.get_ok_flows() / t.secs,
483 self.stats.get_total_flows() / t.secs,
484 (self.stats.get_total_flows() / t.secs * 100) / self.stats.get_total_flow_rate()))
486 self.total_ok_flows += self.stats.get_ok_flows()
487 self.total_ok_rqsts += self.stats.get_ok_rqsts()
488 self.threads_done = 0
def add_blaster(self):
    """Runs the test cycles with add_flows() as the per-thread worker."""
    worker = self.add_flows
    self.run_cycle(worker)
def delete_blaster(self):
    """Runs the test cycles with delete_flows() as the per-thread worker."""
    worker = self.delete_flows
    self.run_cycle(worker)
def get_ok_flows(self):
    """Returns the successful-flow total that run_cycle() accumulates."""
    ok_flows = self.total_ok_flows
    return ok_flows
def get_ok_rqsts(self):
    """Returns the successful-request total that run_cycle() accumulates."""
    ok_rqsts = self.total_ok_rqsts
    return ok_rqsts
def create_flow_name(self, flow_id):
    """Returns the flow name for the given numeric flow id."""
    name_args = (flow_id,)
    return 'TestFlow-%d' % name_args
506 def get_json_from_file(filename):
508 Get a flow programming template from a file
509 :param filename: File from which to get the template
510 :return: The json flow template (string)
512 with open(filename, 'r') as f:
515 keys = ft['flow'][0].keys()
516 if (u'cookie' in keys) and (u'flow-name' in keys) and (u'id' in keys) and (u'match' in keys):
517 if u'ipv4-destination' in ft[u'flow'][0]['match'].keys():
518 print('File "%s" ok to use as flow template' % filename)
521 print('JSON parsing of file %s failed' % filename)
527 ###############################################################################
528 # This is an example of what the content of a JSON flow mode template should
529 # look like. Cut & paste to create a custom template. "id" and "ipv4-destination"
530 # MUST be unique if multiple flows will be programmed in the same test. It's
531 # also beneficial to have unique "cookie" and "flow-name" attributes for easier
532 # identification of the flow.
533 ###############################################################################
534 example_flow_mod_json = '''{
554 "hard-timeout": 65000,
561 "ipv4-destination": "10.0.0.38/32"
563 "flow-name": "TestFlow-8",
565 "cookie_mask": 4294967295,
568 "idle-timeout": 65000,
576 def create_arguments_parser():
578 Shorthand to arg parser on library level in order to access and eventually enhance in ancestors.
579 :return: argument parser supporting config blaster arguments and parameters
581 my_parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then'
582 ' deletes flows into the config tree, as specified by'
583 ' optional parameters.')
585 my_parser.add_argument('--host', default='127.0.0.1',
586 help='Host where odl controller is running (default is 127.0.0.1). '
587 'Specify a comma-separated list of hosts to perform round-robin load-balancing.')
588 my_parser.add_argument('--port', default='8181',
589 help='Port on which odl\'s RESTCONF is listening (default is 8181)')
590 my_parser.add_argument('--cycles', type=int, default=1,
591 help='Number of flow add/delete cycles; default 1. Both Flow Adds and Flow Deletes are '
592 'performed in cycles. <THREADS> worker threads are started in each cycle and the cycle '
593 'ends when all threads finish. Another cycle is started when the previous cycle '
595 my_parser.add_argument('--threads', type=int, default=1,
596 help='Number of request worker threads to start in each cycle; default=1. '
597 'Each thread will add/delete <FLOWS> flows.')
598 my_parser.add_argument('--flows', type=int, default=10,
599 help='Number of flows that will be added/deleted by each worker thread in each cycle; '
601 my_parser.add_argument('--fpr', type=int, default=1,
602 help='Flows-per-Request - number of flows (batch size) sent in each HTTP request; '
604 my_parser.add_argument('--nodes', type=int, default=16,
605 help='Number of nodes if mininet is not connected; default=16. If mininet is connected, '
606 'flows will be evenly distributed (programmed) into connected nodes.')
607 my_parser.add_argument('--delay', type=int, default=0,
608 help='Time (in seconds) to wait between the add and delete cycles; default=0')
609 my_parser.add_argument('--delete', dest='delete', action='store_true', default=True,
610 help='Delete all added flows one by one, benchmark delete '
612 my_parser.add_argument('--no-delete', dest='delete', action='store_false',
613 help='Do not perform the delete cycle.')
614 my_parser.add_argument('--auth', dest='auth', action='store_true', default=False,
615 help="Use the ODL default username/password 'admin'/'admin' to authenticate access to REST; "
616 'default: no authentication')
617 my_parser.add_argument('--startflow', type=int, default=0,
618 help='The starting Flow ID; default=0')
619 my_parser.add_argument('--file', default='',
620 help='File from which to read the JSON flow template; default: no file, use a built in '
625 if __name__ == "__main__":
626 ############################################################################
627 # This program executes the base performance test. The test adds flows into
628 # the controller's config space. This function is basically the CLI frontend
629 # to the FlowConfigBlaster class and drives its main functions: adding and
630 # deleting flows from the controller's config data store
631 ############################################################################
633 parser = create_arguments_parser()
634 in_args = parser.parse_args()
636 if in_args.file != '':
637 flow_template = get_json_from_file(in_args.file)
641 fct = FlowConfigBlaster(in_args.host, in_args.port, in_args.cycles, in_args.threads, in_args.fpr, in_args.nodes,
642 in_args.flows, in_args.startflow, in_args.auth)
644 # Run through <cycles>, where <threads> are started in each cycle and
645 # <flows> are added from each thread
648 print('\n*** Total flows added: %s' % fct.get_ok_flows())
649 print(' HTTP[OK] results: %d\n' % fct.get_ok_rqsts())
651 if in_args.delay > 0:
652 print('*** Waiting for %d seconds before the delete cycle ***\n' % in_args.delay)
653 time.sleep(in_args.delay)
655 # Run through <cycles>, where <threads> are started in each cycle and
656 # <flows> previously added in an add cycle are deleted in each thread
659 print('\n*** Total flows deleted: %s' % fct.get_ok_flows())
660 print(' HTTP[OK] results: %d\n' % fct.get_ok_rqsts())