2 __author__ = "Jan Medved"
3 __copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
4 __license__ = "New-style BSD"
5 __email__ = "jmedved@cisco.com"
7 from random import randrange
19 class Counter(object):
20 def __init__(self, start=0):
21 self.lock = threading.Lock()
24 def increment(self, value=1):
35 def __init__(self, verbose=False):
36 self.verbose = verbose
39 self.start = time.time()
42 def __exit__(self, *args):
43 self.end = time.time()
44 self.secs = self.end - self.start
45 self.msecs = self.secs * 1000 # millisecs
47 print ("elapsed time: %f ms" % self.msecs)
50 class FlowConfigBlaster(object):
51 putheaders = {'content-type': 'application/json'}
52 getheaders = {'Accept': 'application/json'}
54 FLWURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0/flow/%d"
55 TBLURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0"
56 INVURL = 'restconf/operational/opendaylight-inventory:nodes'
60 # The "built-in" flow template
61 flow_mode_template = {
64 u'hard-timeout': 65000,
65 u'idle-timeout': 65000,
66 u'cookie_mask': 4294967295,
67 u'flow-name': u'FLOW-NAME-TEMPLATE',
73 u'id': u'FLOW-ID-TEMPLATE',
75 u'ipv4-destination': u'0.0.0.0/32',
101 class FcbStats(object):
103 FlowConfigBlaster Statistics: a class that stores and further processes
104 statistics collected by Blaster worker threads during their execution.
107 self.ok_rqst_rate = Counter(0.0)
108 self.total_rqst_rate = Counter(0.0)
109 self.ok_flow_rate = Counter(0.0)
110 self.total_flow_rate = Counter(0.0)
111 self.ok_rqsts = Counter(0)
112 self.total_rqsts = Counter(0)
113 self.ok_flows = Counter(0)
114 self.total_flows = Counter(0)
116 def process_stats(self, rqst_stats, flow_stats, elapsed_time):
118 Calculates the stats for RESTCONF request and flow programming
119 throughput, and aggregates statistics across all Blaster threads.
121 ok_rqsts = rqst_stats[200] + rqst_stats[204]
122 total_rqsts = sum(rqst_stats.values())
123 ok_flows = flow_stats[200] + flow_stats[204]
124 total_flows = sum(flow_stats.values())
126 ok_rqst_rate = ok_rqsts / elapsed_time
127 total_rqst_rate = total_rqsts / elapsed_time
128 ok_flow_rate = ok_flows / elapsed_time
129 total_flow_rate = total_flows / elapsed_time
131 self.ok_rqsts.increment(ok_rqsts)
132 self.total_rqsts.increment(total_rqsts)
133 self.ok_flows.increment(ok_flows)
134 self.total_flows.increment(total_flows)
136 self.ok_rqst_rate.increment(ok_rqst_rate)
137 self.total_rqst_rate.increment(total_rqst_rate)
138 self.ok_flow_rate.increment(ok_flow_rate)
139 self.total_flow_rate.increment(total_flow_rate)
141 return ok_rqst_rate, total_rqst_rate, ok_flow_rate, total_flow_rate
143 def get_ok_rqst_rate(self):
144 return self.ok_rqst_rate.value
146 def get_total_rqst_rate(self):
147 return self.total_rqst_rate.value
149 def get_ok_flow_rate(self):
150 return self.ok_flow_rate.value
152 def get_total_flow_rate(self):
153 return self.total_flow_rate.value
155 def get_ok_rqsts(self):
156 return self.ok_rqsts.value
158 def get_total_rqsts(self):
159 return self.total_rqsts.value
161 def get_ok_flows(self):
162 return self.ok_flows.value
164 def get_total_flows(self):
165 return self.total_flows.value
167 def __init__(self, host, port, ncycles, nthreads, fpr, nnodes, nflows, startflow, auth, flow_mod_template=None):
170 self.ncycles = ncycles
171 self.nthreads = nthreads
175 self.startflow = startflow
178 if flow_mod_template:
179 self.flow_mode_template = flow_mod_template
181 self.post_url_template = 'http://' + self.host + ":" + self.port + '/' + self.TBLURL
182 self.del_url_template = 'http://' + self.host + ":" + self.port + '/' + self.FLWURL
184 self.stats = self.FcbStats()
185 self.total_ok_flows = 0
186 self.total_ok_rqsts = 0
188 self.ip_addr = Counter(int(netaddr.IPAddress('10.0.0.1')) + startflow)
190 self.print_lock = threading.Lock()
191 self.cond = threading.Condition()
192 self.threads_done = 0
194 for i in range(self.nthreads):
197 def get_num_nodes(self, session):
199 Determines the number of OF nodes in the connected mininet network. If
200 mininet is not connected, the default number of flows is set to 16.
201 :param session: 'requests' session which to use to query the controller
205 inventory_url = 'http://' + self.host + ":" + self.port + '/' + self.INVURL
209 r = session.get(inventory_url, headers=self.getheaders, stream=False)
211 r = session.get(inventory_url, headers=self.getheaders, stream=False, auth=('admin', 'admin'))
213 if r.status_code == 200:
215 inv = json.loads(r.content)['nodes']['node']
217 for n in range(len(inv)):
218 if re.search('openflow', inv[n]['id']) is not None:
227 def create_flow_from_template(self, flow_id, ipaddr):
229 Create a new flow instance from the flow template specified during
230 FlowConfigBlaster instantiation. Flow templates are json-compatible
231 dictionaries that MUST contain elements for flow cookie, flow name,
232 flow id and the destination IPv4 address in the flow match field.
233 :param flow_id: Id for the new flow to create
234 :param ipaddr: IP Address to put into the flow's match
235 :return: The newly created flow instance
237 flow = copy.deepcopy(self.flow_mode_template['flow'][0])
238 flow['cookie'] = flow_id
239 flow['flow-name'] = 'TestFlow-%d' % flow_id
240 flow['id'] = str(flow_id)
241 flow['match']['ipv4-destination'] = '%s/32' % str(netaddr.IPAddress(ipaddr))
244 def post_flows(self, session, node, flow_list):
246 Performs a RESTCONF post of flows passed in the 'flow_list' parameters
247 :param session: 'requests' session on which to perform the POST
248 :param node: The ID of the openflow node to which to post the flows
249 :param flow_list: List of flows (in dictionary form) to POST
250 :return: status code from the POST operation
252 fmod = dict(self.flow_mode_template)
253 fmod['flow'] = flow_list
254 flow_data = json.dumps(fmod)
256 flow_url = self.post_url_template % node
260 r = session.post(flow_url, data=flow_data, headers=self.putheaders, stream=False)
262 r = session.post(flow_url, data=flow_data, headers=self.putheaders, stream=False, auth=('admin', 'admin'))
266 def add_flows(self, start_flow_id, tid):
268 Adds flows into the ODL config data store. This function is executed by
269 a worker thread (the Blaster thread). The number of flows created and
270 the batch size (i.e. how many flows will be put into a RESTCONF request)
271 are determined by control parameters initialized when FlowConfigBlaster
273 :param start_flow_id - the ID of the first flow. Each Blaster thread
274 programs a different set of flows
275 :param tid: Thread ID - used to id the Blaster thread when statistics
276 for the thread are printed out
279 rqst_stats = {200: 0, 204: 0}
280 flow_stats = {200: 0, 204: 0}
282 s = requests.Session()
284 n_nodes = self.get_num_nodes(s)
286 with self.print_lock:
287 print ' Thread %d:\n Adding %d flows on %d nodes' % (tid, self.nflows, n_nodes)
291 while nflows < self.nflows:
292 node_id = randrange(1, n_nodes + 1)
294 for i in range(self.fpr):
295 flow_id = tid * (self.ncycles * self.nflows) + nflows + start_flow_id + self.startflow
296 self.flows[tid][flow_id] = node_id
297 flow_list.append(self.create_flow_from_template(flow_id, self.ip_addr.increment()))
299 if nflows >= self.nflows:
301 sts = self.post_flows(s, node_id, flow_list)
304 flow_stats[sts] += len(flow_list)
307 flow_stats[sts] = len(flow_list)
309 ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(rqst_stats, flow_stats, t.secs)
311 with self.print_lock:
312 print '\n Thread %d results (ADD): ' % tid
313 print ' Elapsed time: %.2fs,' % t.secs
314 print ' Requests/s: %.2f OK, %.2f Total' % (ok_rps, total_rps)
315 print ' Flows/s: %.2f OK, %.2f Total' % (ok_fps, total_fps)
316 print ' Stats ({Requests}, {Flows}): ',
319 self.threads_done += 1
324 self.cond.notifyAll()
326 def delete_flow(self, session, node, flow_id):
328 Deletes a single flow from the ODL config data store using RESTCONF
329 :param session: 'requests' session on which to perform the POST
330 :param node: Id of the openflow node from which to delete the flow
331 :param flow_id: ID of the to-be-deleted flow
332 :return: status code from the DELETE operation
334 flow_url = self.del_url_template % (node, flow_id)
337 r = session.delete(flow_url, headers=self.getheaders)
339 r = session.delete(flow_url, headers=self.getheaders, auth=('admin', 'admin'))
343 def delete_flows(self, start_flow, tid):
345 Deletes flow from the ODL config space that have been added using the
346 'add_flows()' function. This function is executed by a worker thread
347 :param start_flow_id - the ID of the first flow. Each Blaster thread
348 deletes a different set of flows
349 :param tid: Thread ID - used to id the Blaster thread when statistics
350 for the thread are printed out
355 rqst_stats = {200: 0, 204: 0}
357 s = requests.Session()
358 n_nodes = self.get_num_nodes(s)
360 with self.print_lock:
361 print 'Thread %d: Deleting %d flows on %d nodes' % (tid, self.nflows, n_nodes)
364 for flow in range(self.nflows):
365 flow_id = tid * (self.ncycles * self.nflows) + flow + start_flow + self.startflow
366 sts = self.delete_flow(s, self.flows[tid][flow_id], flow_id)
372 ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(rqst_stats, rqst_stats, t.secs)
374 with self.print_lock:
375 print '\n Thread %d results (DELETE): ' % tid
376 print ' Elapsed time: %.2fs,' % t.secs
377 print ' Requests/s: %.2f OK, %.2f Total' % (ok_rps, total_rps)
378 print ' Flows/s: %.2f OK, %.2f Total' % (ok_fps, total_fps)
379 print ' Stats ({Requests})',
381 self.threads_done += 1
386 self.cond.notifyAll()
388 def run_cycle(self, function):
390 Runs a flow-add or flow-delete test cycle. Each test consists of a
391 <cycles> test cycles, where <threads> worker (Blaster) threads are
392 started in each test cycle. Each Blaster thread programs <flows>
393 OpenFlow flows into the controller using the controller's RESTCONF API.
394 :param function: Add or delete, determines what test will be executed.
397 self.total_ok_flows = 0
398 self.total_ok_rqsts = 0
400 for c in range(self.ncycles):
401 self.stats = self.FcbStats()
402 with self.print_lock:
403 print '\nCycle %d:' % c
406 for i in range(self.nthreads):
407 t = threading.Thread(target=function, args=(c * self.nflows, i))
411 # Wait for all threads to finish and measure the execution time
413 while self.threads_done < self.nthreads:
417 with self.print_lock:
418 print '\n*** Test summary:'
419 print ' Elapsed time: %.2fs' % t.secs
420 print ' Peak requests/s: %.2f OK, %.2f Total' % (
421 self.stats.get_ok_rqst_rate(), self.stats.get_total_rqst_rate())
422 print ' Peak flows/s: %.2f OK, %.2f Total' % (
423 self.stats.get_ok_flow_rate(), self.stats.get_total_flow_rate())
424 print ' Avg. requests/s: %.2f OK, %.2f Total (%.2f%% of peak total)' % (
425 self.stats.get_ok_rqsts() / t.secs,
426 self.stats.get_total_rqsts() / t.secs,
427 (self.stats.get_total_rqsts() / t.secs * 100) / self.stats.get_total_rqst_rate())
428 print ' Avg. flows/s: %.2f OK, %.2f Total (%.2f%% of peak total)' % (
429 self.stats.get_ok_flows() / t.secs,
430 self.stats.get_total_flows() / t.secs,
431 (self.stats.get_total_flows() / t.secs * 100) / self.stats.get_total_flow_rate())
433 self.total_ok_flows += self.stats.get_ok_flows()
434 self.total_ok_rqsts += self.stats.get_ok_rqsts()
435 self.threads_done = 0
437 def add_blaster(self):
438 self.run_cycle(self.add_flows)
440 def delete_blaster(self):
441 self.run_cycle(self.delete_flows)
443 def get_ok_flows(self):
444 return self.total_ok_flows
446 def get_ok_rqsts(self):
447 return self.total_ok_rqsts
450 def get_json_from_file(filename):
452 Get a flow programming template from a file
453 :param filename: File from which to get the template
454 :return: The json flow template (string)
456 with open(filename, 'r') as f:
459 keys = ft['flow'][0].keys()
460 if (u'cookie' in keys) and (u'flow-name' in keys) and (u'id' in keys) and (u'match' in keys):
461 if u'ipv4-destination' in ft[u'flow'][0]['match'].keys():
462 print 'File "%s" ok to use as flow template' % filename
465 print 'JSON parsing of file %s failed' % filename
470 ###############################################################################
471 # This is an example of what the content of a JSON flow mode template should
472 # look like. Cut & paste to create a custom template. "id" and "ipv4-destination"
473 # MUST be unique if multiple flows will be programmed in the same test. It's
474 # also beneficial to have unique "cookie" and "flow-name" attributes for easier
475 # identification of the flow.
476 ###############################################################################
477 example_flow_mod_json = '''{
497 "hard-timeout": 65000,
504 "ipv4-destination": "10.0.0.38/32"
506 "flow-name": "TestFlow-8",
508 "cookie_mask": 4294967295,
511 "idle-timeout": 65000,
518 if __name__ == "__main__":
519 ############################################################################
520 # This program executes the base performance test. The test adds flows into
521 # the controller's config space. This function is basically the CLI frontend
522 # to the FlowConfigBlaster class and drives its main functions: adding and
523 # deleting flows from the controller's config data store
524 ############################################################################
525 parser = argparse.ArgumentParser(description='Flow programming performance test: First adds and then deletes flows '
526 'into the config tree, as specified by optional parameters.')
528 parser.add_argument('--host', default='127.0.0.1',
529 help='Host where odl controller is running (default is 127.0.0.1)')
530 parser.add_argument('--port', default='8181',
531 help='Port on which odl\'s RESTCONF is listening (default is 8181)')
532 parser.add_argument('--cycles', type=int, default=1,
533 help='Number of flow add/delete cycles; default 1. Both Flow Adds and Flow Deletes are '
534 'performed in cycles. <THREADS> worker threads are started in each cycle and the cycle '
535 'ends when all threads finish. Another cycle is started when the previous cycle finished.')
536 parser.add_argument('--threads', type=int, default=1,
537 help='Number of request worker threads to start in each cycle; default=1. '
538 'Each thread will add/delete <FLOWS> flows.')
539 parser.add_argument('--flows', type=int, default=10,
540 help='Number of flows that will be added/deleted by each worker thread in each cycle; '
542 parser.add_argument('--fpr', type=int, default=1,
543 help='Flows-per-Request - number of flows (batch size) sent in each HTTP request; '
545 parser.add_argument('--nodes', type=int, default=16,
546 help='Number of nodes if mininet is not connected; default=16. If mininet is connected, '
547 'flows will be evenly distributed (programmed) into connected nodes.')
548 parser.add_argument('--delay', type=int, default=0,
549 help='Time (in seconds) to wait between the add and delete cycles; default=0')
550 parser.add_argument('--delete', dest='delete', action='store_true', default=True,
551 help='Delete all added flows one by one, benchmark delete '
553 parser.add_argument('--no-delete', dest='delete', action='store_false',
554 help='Do not perform the delete cycle.')
555 parser.add_argument('--auth', dest='auth', action='store_true', default=False,
556 help="Use the ODL default username/password 'admin'/'admin' to authenticate access to REST; "
557 'default: no authentication')
558 parser.add_argument('--startflow', type=int, default=0,
559 help='The starting Flow ID; default=0')
560 parser.add_argument('--file', default='',
561 help='File from which to read the JSON flow template; default: no file, use a built in '
564 in_args = parser.parse_args()
566 if in_args.file != '':
567 flow_template = get_json_from_file(in_args.file)
571 fct = FlowConfigBlaster(in_args.host, in_args.port, in_args.cycles, in_args.threads, in_args.fpr, in_args.nodes,
572 in_args.flows, in_args.startflow, in_args.auth)
574 # Run through <cycles>, where <threads> are started in each cycle and
575 # <flows> are added from each thread
578 print '\n*** Total flows added: %s' % fct.get_ok_flows()
579 print ' HTTP[OK] results: %d\n' % fct.get_ok_rqsts()
581 if in_args.delay > 0:
582 print '*** Waiting for %d seconds before the delete cycle ***\n' % in_args.delay
583 time.sleep(in_args.delay)
585 # Run through <cycles>, where <threads> are started in each cycle and
586 # <flows> previously added in an add cycle are deleted in each thread
589 print '\n*** Total flows deleted: %s' % fct.get_ok_flows()
590 print ' HTTP[OK] results: %d\n' % fct.get_ok_rqsts()