3 from random import randrange
15 __author__ = "Jan Medved"
16 __copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
17 __license__ = "New-style BSD"
18 __email__ = "jmedved@cisco.com"
21 class Counter(object):
22 def __init__(self, start=0):
23 self.lock = threading.Lock()
26 def increment(self, value=1):
37 def __init__(self, verbose=False):
38 self.verbose = verbose
41 self.start = time.time()
44 def __exit__(self, *args):
45 self.end = time.time()
46 self.secs = self.end - self.start
47 self.msecs = self.secs * 1000 # millisecs
49 print("elapsed time: %f ms" % self.msecs)
52 class FlowConfigBlaster(object):
53 putheaders = {"content-type": "application/json"}
54 getheaders = {"Accept": "application/json"}
57 "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0/flow/%d"
59 TBLURL = "restconf/config/opendaylight-inventory:nodes/node/openflow:%d/table/0"
60 INVURL = "restconf/operational/opendaylight-inventory:nodes"
65 # The "built-in" flow template
66 flow_mode_template = {
69 "hard-timeout": 65000,
70 "idle-timeout": 65000,
71 "cookie_mask": 4294967295,
72 "flow-name": "FLOW-NAME-TEMPLATE",
78 "id": "FLOW-ID-TEMPLATE",
80 "ipv4-destination": "0.0.0.0/32",
81 "ethernet-match": {"ethernet-type": {"type": 2048}},
88 "action": [{"drop-action": {}, "order": 0}]
97 class FcbStats(object):
99 FlowConfigBlaster Statistics: a class that stores and further processes
100 statistics collected by Blaster worker threads during their execution.
104 self.ok_rqst_rate = Counter(0.0)
105 self.total_rqst_rate = Counter(0.0)
106 self.ok_flow_rate = Counter(0.0)
107 self.total_flow_rate = Counter(0.0)
108 self.ok_rqsts = Counter(0)
109 self.total_rqsts = Counter(0)
110 self.ok_flows = Counter(0)
111 self.total_flows = Counter(0)
def process_stats(self, rqst_stats, flow_stats, elapsed_time):
    """
    Calculates the stats for RESTCONF request and flow programming
    throughput, and aggregates statistics across all Blaster threads.

    Args:
        rqst_stats: Request statistics dictionary (HTTP status code ->
            number of requests that finished with that status)
        flow_stats: Flow statistics dictionary (HTTP status code ->
            number of flows carried by requests with that status)
        elapsed_time: Elapsed time for the test, in seconds

    Returns: Rates (per second) for successfully finished requests,
        the total number of requests, successfully installed flows and
        the total number of flows, in that order
    """
    # Only these two HTTP status codes count as success. A thread that never
    # saw one of them may hand in a dictionary without that key, so a missing
    # entry is treated as zero instead of raising KeyError.
    ok_codes = (200, 204)

    ok_rqsts = sum(rqst_stats.get(code, 0) for code in ok_codes)
    total_rqsts = sum(rqst_stats.values())
    ok_flows = sum(flow_stats.get(code, 0) for code in ok_codes)
    total_flows = sum(flow_stats.values())

    ok_rqst_rate = ok_rqsts / elapsed_time
    total_rqst_rate = total_rqsts / elapsed_time
    ok_flow_rate = ok_flows / elapsed_time
    total_flow_rate = total_flows / elapsed_time

    # Fold this thread's absolute numbers into the shared counters ...
    self.ok_rqsts.increment(ok_rqsts)
    self.total_rqsts.increment(total_rqsts)
    self.ok_flows.increment(ok_flows)
    self.total_flows.increment(total_flows)

    # ... and add its rates to the shared rate accumulators.
    self.ok_rqst_rate.increment(ok_rqst_rate)
    self.total_rqst_rate.increment(total_rqst_rate)
    self.ok_flow_rate.increment(ok_flow_rate)
    self.total_flow_rate.increment(total_flow_rate)

    return ok_rqst_rate, total_rqst_rate, ok_flow_rate, total_flow_rate
def get_ok_rqst_rate(self):
    """Returns the rate of successful requests (requests/sec), summed over
    all threads that reported through process_stats()."""
    counter = self.ok_rqst_rate
    return counter.value
def get_total_rqst_rate(self):
    """Returns the rate of all requests (requests/sec), summed over all
    threads that reported through process_stats()."""
    counter = self.total_rqst_rate
    return counter.value
def get_ok_flow_rate(self):
    """Returns the rate of successfully programmed flows (flows/sec), summed
    over all threads that reported through process_stats()."""
    counter = self.ok_flow_rate
    return counter.value
def get_total_flow_rate(self):
    """Returns the rate of all flows (flows/sec), summed over all threads
    that reported through process_stats()."""
    counter = self.total_flow_rate
    return counter.value
def get_ok_rqsts(self):
    """Returns the number of successful requests accumulated from all
    threads that reported through process_stats()."""
    counter = self.ok_rqsts
    return counter.value
def get_total_rqsts(self):
    """Returns the number of all requests accumulated from all threads that
    reported through process_stats()."""
    counter = self.total_rqsts
    return counter.value
def get_ok_flows(self):
    """Returns the number of successfully programmed flows accumulated from
    all threads that reported through process_stats()."""
    counter = self.ok_flows
    return counter.value
def get_total_flows(self):
    """Returns the number of all flows accumulated from all threads that
    reported through process_stats()."""
    counter = self.total_flows
    return counter.value
184 flow_mod_template=None,
188 self.ncycles = ncycles
189 self.nthreads = nthreads
193 self.startflow = startflow
196 if flow_mod_template:
197 self.flow_mode_template = flow_mod_template
199 self.post_url_template = "http://%s:" + self.port + "/" + self.TBLURL
200 self.del_url_template = "http://%s:" + self.port + "/" + self.FLWURL
202 self.stats = self.FcbStats()
203 self.total_ok_flows = 0
204 self.total_ok_rqsts = 0
206 self.ip_addr = Counter(int(netaddr.IPAddress("10.0.0.1")) + startflow)
208 self.print_lock = threading.Lock()
209 self.cond = threading.Condition()
210 self.threads_done = 0
212 for i in range(self.nthreads):
215 def get_num_nodes(self, session):
217 Determines the number of OF nodes in the connected mininet network. If
mininet is not connected, the default number of nodes is set to 16.
219 :param session: 'requests' session which to use to query the controller
223 hosts = self.host.split(",")
225 inventory_url = "http://" + host + ":" + self.port + "/" + self.INVURL
231 headers=self.getheaders,
233 timeout=self.TIMEOUT,
238 headers=self.getheaders,
240 auth=("admin", "admin"),
241 timeout=self.TIMEOUT,
244 if r.status_code == 200:
246 inv = json.loads(r.content)["nodes"]["node"]
248 for n in range(len(inv)):
249 if re.search("openflow", inv[n]["id"]) is not None:
def create_flow_from_template(self, flow_id, ipaddr, node_id):
    """
    Create a new flow instance from the flow template specified during
    FlowConfigBlaster instantiation. Flow templates are json-compatible
    dictionaries that MUST contain elements for flow cookie, flow name,
    flow id and the destination IPv4 address in the flow match field.

    Args:
        flow_id: Id for the new flow to create
        ipaddr: IP Address to put into the flow's match
        node_id: ID of the node where to create the flow

    Returns: The flow that has been created from the template
    """
    # Deep copy: the nested "match" dictionary is modified below and must
    # not be shared with the template or with previously created flows.
    flow = copy.deepcopy(self.flow_mode_template["flow"][0])
    flow["cookie"] = flow_id
    flow["flow-name"] = self.create_flow_name(flow_id)
    flow["id"] = str(flow_id)
    flow["match"]["ipv4-destination"] = "%s/32" % str(netaddr.IPAddress(ipaddr))
    # Hand the populated flow back to the caller, as the docstring promises.
    return flow
280 def post_flows(self, session, node, flow_list, flow_count):
282 Performs a RESTCONF post of flows passed in the 'flow_list' parameters
283 :param session: 'requests' session on which to perform the POST
284 :param node: The ID of the openflow node to which to post the flows
285 :param flow_list: List of flows (in dictionary form) to POST
286 :param flow_count: Flow counter for round-robin host load balancing
288 :return: status code from the POST operation
290 flow_data = self.convert_to_json(flow_list, node)
292 hosts = self.host.split(",")
293 host = hosts[flow_count % len(hosts)]
294 flow_url = self.assemble_post_url(host, node)
300 headers=self.putheaders,
302 timeout=self.TIMEOUT,
308 headers=self.putheaders,
310 auth=("admin", "admin"),
311 timeout=self.TIMEOUT,
def assemble_post_url(self, host, node):
    """
    Builds the url of a node's table in the config dataStore:
    /nodes/node/<node-id>/table/<table-id>
    :param host: ip address or host name pointing to controller
    :param node: id of node (without protocol prefix and colon)
    :return: url suitable for sending a flow to controller via POST method
    """
    # The template has one placeholder for the host and one for the node id.
    substitutions = (host, node)
    url = self.post_url_template % substitutions
    return url
def convert_to_json(self, flow_list, node_id=None):
    """
    Dumps flows to json form.
    :param flow_list: list of flows in json friendly structure
    :param node_id: node identifier of corresponding node (not used by
        this implementation)
    :return: string containing plain json
    """
    # Shallow-copy the template so that swapping in this request's flows
    # leaves the template itself untouched.
    fmod = dict(self.flow_mode_template)
    fmod["flow"] = flow_list
    # Return the serialized payload; callers send it as the request body.
    return json.dumps(fmod)
337 def add_flows(self, start_flow_id, tid):
339 Adds flows into the ODL config data store. This function is executed by
340 a worker thread (the Blaster thread). The number of flows created and
341 the batch size (i.e. how many flows will be put into a RESTCONF request)
342 are determined by control parameters initialized when FlowConfigBlaster
344 :param start_flow_id - the ID of the first flow. Each Blaster thread
345 programs a different set of flows
346 :param tid: Thread ID - used to id the Blaster thread when statistics
347 for the thread are printed out
350 rqst_stats = {200: 0, 204: 0}
351 flow_stats = {200: 0, 204: 0}
353 s = requests.Session()
355 n_nodes = self.get_num_nodes(s)
357 with self.print_lock:
359 " Thread %d:\n Adding %d flows on %d nodes"
360 % (tid, self.nflows, n_nodes)
365 while nflows < self.nflows:
366 node_id = randrange(1, n_nodes + 1)
368 for i in range(self.fpr):
370 tid * (self.ncycles * self.nflows)
375 self.flows[tid][flow_id] = node_id
377 self.create_flow_from_template(
378 flow_id, self.ip_addr.increment(), node_id
382 if nflows >= self.nflows:
384 nb_actions.append((s, node_id, flow_list, nflows))
387 for nb_action in nb_actions:
388 sts = self.post_flows(*nb_action)
391 flow_stats[sts] += len(nb_action[2])
394 flow_stats[sts] = len(nb_action[2])
396 ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(
397 rqst_stats, flow_stats, t.secs
400 with self.print_lock:
401 print("\n Thread %d results (ADD): " % tid)
402 print(" Elapsed time: %.2fs," % t.secs)
403 print(" Requests/s: %.2f OK, %.2f Total" % (ok_rps, total_rps))
404 print(" Flows/s: %.2f OK, %.2f Total" % (ok_fps, total_fps))
405 print(" Stats ({Requests}, {Flows}): ")
408 self.threads_done += 1
413 self.cond.notifyAll()
415 def delete_flow(self, session, node, flow_id, flow_count):
417 Deletes a single flow from the ODL config data store using RESTCONF
session: 'requests' session on which to perform the DELETE
420 node: Id of the openflow node from which to delete the flow
421 flow_id: ID of the to-be-deleted flow
422 flow_count: Index of the flow being processed (for round-robin LB)
424 Returns: status code from the DELETE operation
428 hosts = self.host.split(",")
429 host = hosts[flow_count % len(hosts)]
430 flow_url = self.del_url_template % (host, node, flow_id)
433 r = session.delete(flow_url, headers=self.getheaders, timeout=self.TIMEOUT)
437 headers=self.getheaders,
438 auth=("admin", "admin"),
439 timeout=self.TIMEOUT,
444 def delete_flows(self, start_flow, tid):
Deletes flows from the ODL config space that have been added using the
447 'add_flows()' function. This function is executed by a worker thread
448 :param start_flow - the ID of the first flow. Each Blaster thread
449 deletes a different set of flows
450 :param tid: Thread ID - used to id the Blaster thread when statistics
451 for the thread are printed out
455 rqst_stats = {200: 0, 204: 0}
457 s = requests.Session()
458 n_nodes = self.get_num_nodes(s)
460 with self.print_lock:
462 "Thread %d: Deleting %d flows on %d nodes" % (tid, self.nflows, n_nodes)
466 for flow in range(self.nflows):
468 tid * (self.ncycles * self.nflows)
473 sts = self.delete_flow(s, self.flows[tid][flow_id], flow_id, flow)
479 ok_rps, total_rps, ok_fps, total_fps = self.stats.process_stats(
480 rqst_stats, rqst_stats, t.secs
483 with self.print_lock:
484 print("\n Thread %d results (DELETE): " % tid)
485 print(" Elapsed time: %.2fs," % t.secs)
486 print(" Requests/s: %.2f OK, %.2f Total" % (ok_rps, total_rps))
487 print(" Flows/s: %.2f OK, %.2f Total" % (ok_fps, total_fps))
488 print(" Stats ({Requests})")
490 self.threads_done += 1
495 self.cond.notifyAll()
497 def run_cycle(self, function):
Runs a flow-add or flow-delete test cycle. Each test consists of
500 <cycles> test cycles, where <threads> worker (Blaster) threads are
501 started in each test cycle. Each Blaster thread programs <flows>
502 OpenFlow flows into the controller using the controller's RESTCONF API.
503 :param function: Add or delete, determines what test will be executed.
506 self.total_ok_flows = 0
507 self.total_ok_rqsts = 0
509 for c in range(self.ncycles):
510 self.stats = self.FcbStats()
511 with self.print_lock:
512 print("\nCycle %d:" % c)
515 for i in range(self.nthreads):
516 t = threading.Thread(target=function, args=(c * self.nflows, i))
520 # Wait for all threads to finish and measure the execution time
522 for thread in threads:
525 with self.print_lock:
526 print("\n*** Test summary:")
527 print(" Elapsed time: %.2fs" % t.secs)
529 " Peak requests/s: %.2f OK, %.2f Total"
530 % (self.stats.get_ok_rqst_rate(), self.stats.get_total_rqst_rate())
533 " Peak flows/s: %.2f OK, %.2f Total"
534 % (self.stats.get_ok_flow_rate(), self.stats.get_total_flow_rate())
537 " Avg. requests/s: %.2f OK, %.2f Total (%.2f%% of peak total)"
539 self.stats.get_ok_rqsts() / t.secs,
540 self.stats.get_total_rqsts() / t.secs,
541 (self.stats.get_total_rqsts() / t.secs * 100)
542 / self.stats.get_total_rqst_rate(),
546 " Avg. flows/s: %.2f OK, %.2f Total (%.2f%% of peak total)"
548 self.stats.get_ok_flows() / t.secs,
549 self.stats.get_total_flows() / t.secs,
550 (self.stats.get_total_flows() / t.secs * 100)
551 / self.stats.get_total_flow_rate(),
555 self.total_ok_flows += self.stats.get_ok_flows()
556 self.total_ok_rqsts += self.stats.get_ok_rqsts()
557 self.threads_done = 0
def add_blaster(self):
    """Runs the test cycles with add_flows as the worker thread function."""
    worker = self.add_flows
    self.run_cycle(worker)
def delete_blaster(self):
    """Runs the test cycles with delete_flows as the worker thread function."""
    worker = self.delete_flows
    self.run_cycle(worker)
def get_ok_flows(self):
    """Returns the count of successfully processed flows kept in
    total_ok_flows (reset and re-accumulated by run_cycle())."""
    total = self.total_ok_flows
    return total
def get_ok_rqsts(self):
    """Returns the count of successful requests kept in total_ok_rqsts
    (reset and re-accumulated by run_cycle())."""
    total = self.total_ok_rqsts
    return total
def create_flow_name(self, flow_id):
    """Builds the flow name for the given numeric flow id."""
    flow_name = "TestFlow-%d" % flow_id
    return flow_name
575 def get_json_from_file(filename):
577 Get a flow programming template from a file
578 :param filename: File from which to get the template
579 :return: The json flow template (string)
581 with open(filename, "r") as f:
584 keys = ft["flow"][0].keys()
587 and ("flow-name" in keys)
589 and ("match" in keys)
591 if "ipv4-destination" in ft["flow"][0]["match"].keys():
592 print('File "%s" ok to use as flow template' % filename)
595 print("JSON parsing of file %s failed" % filename)
601 ###############################################################################
602 # This is an example of what the content of a JSON flow mode template should
603 # look like. Cut & paste to create a custom template. "id" and "ipv4-destination"
604 # MUST be unique if multiple flows will be programmed in the same test. It's
605 # also beneficial to have unique "cookie" and "flow-name" attributes for easier
606 # identification of the flow.
607 ###############################################################################
608 example_flow_mod_json = """{
628 "hard-timeout": 65000,
635 "ipv4-destination": "10.0.0.38/32"
637 "flow-name": "TestFlow-8",
639 "cookie_mask": 4294967295,
642 "idle-timeout": 65000,
650 def create_arguments_parser():
652 Shorthand to arg parser on library level in order to access and eventually enhance in ancestors.
653 :return: argument parser supporting config blaster arguments and parameters
655 my_parser = argparse.ArgumentParser(
656 description="Flow programming performance test: First adds and then"
657 " deletes flows into the config tree, as specified by"
658 " optional parameters."
661 my_parser.add_argument(
664 help="Host where odl controller is running (default is 127.0.0.1). "
665 "Specify a comma-separated list of hosts to perform round-robin load-balancing.",
667 my_parser.add_argument(
670 help="Port on which odl's RESTCONF is listening (default is 8181)",
672 my_parser.add_argument(
676 help="Number of flow add/delete cycles; default 1. Both Flow Adds and Flow Deletes are "
677 "performed in cycles. <THREADS> worker threads are started in each cycle and the cycle "
678 "ends when all threads finish. Another cycle is started when the previous cycle "
681 my_parser.add_argument(
685 help="Number of request worker threads to start in each cycle; default=1. "
686 "Each thread will add/delete <FLOWS> flows.",
688 my_parser.add_argument(
692 help="Number of flows that will be added/deleted by each worker thread in each cycle; "
695 my_parser.add_argument(
699 help="Flows-per-Request - number of flows (batch size) sent in each HTTP request; "
702 my_parser.add_argument(
706 help="Number of nodes if mininet is not connected; default=16. If mininet is connected, "
707 "flows will be evenly distributed (programmed) into connected nodes.",
709 my_parser.add_argument(
713 help="Time (in seconds) to wait between the add and delete cycles; default=0",
715 my_parser.add_argument(
720 help="Delete all added flows one by one, benchmark delete " "performance.",
722 my_parser.add_argument(
725 action="store_false",
726 help="Do not perform the delete cycle.",
728 my_parser.add_argument(
733 help="Use the ODL default username/password 'admin'/'admin' to authenticate access to REST; "
734 "default: no authentication",
736 my_parser.add_argument(
737 "--startflow", type=int, default=0, help="The starting Flow ID; default=0"
739 my_parser.add_argument(
742 help="File from which to read the JSON flow template; default: no file, use a built in "
748 if __name__ == "__main__":
749 ############################################################################
750 # This program executes the base performance test. The test adds flows into
751 # the controller's config space. This function is basically the CLI frontend
752 # to the FlowConfigBlaster class and drives its main functions: adding and
753 # deleting flows from the controller's config data store
754 ############################################################################
756 parser = create_arguments_parser()
757 in_args = parser.parse_args()
759 if in_args.file != "":
760 flow_template = get_json_from_file(in_args.file)
764 fct = FlowConfigBlaster(
776 # Run through <cycles>, where <threads> are started in each cycle and
777 # <flows> are added from each thread
780 print("\n*** Total flows added: %s" % fct.get_ok_flows())
781 print(" HTTP[OK] results: %d\n" % fct.get_ok_rqsts())
783 if in_args.delay > 0:
785 "*** Waiting for %d seconds before the delete cycle ***\n" % in_args.delay
787 time.sleep(in_args.delay)
789 # Run through <cycles>, where <threads> are started in each cycle and
790 # <flows> previously added in an add cycle are deleted in each thread
793 print("\n*** Total flows deleted: %s" % fct.get_ok_flows())
794 print(" HTTP[OK] results: %d\n" % fct.get_ok_rqsts())