2 The purpose of this script is the ability to perform crud operations over
3 the car-people data model.
18 "id": "to be replaced",
19 "category": "my_category",
20 "model": "to be replaced",
21 "manufacturer": "my_manufacturer",
27 _template_add_people_rpc = {
30 "people:id": "to be replaced",
31 "people:gender": "male",
33 "people:address": "to be replaced",
34 "people:contactNo": "to be replaced"
39 _template_add_cp_rpc = {
41 "car-purchase:person": "to be replaced",
42 "car-purchase:person-id": "to be replaced",
43 "car-purchase:car-id": "to be replaced"
48 def _build_url(odl_ip, port, uri):
49 """Compose URL from generic IP, port and URI fragment.
52 :param odl_ip: controller's ip address or hostname
54 :param port: controller's restconf port
56 :param uri: URI without /restconf/ to complete URL
59 :returns url: full restconf url corresponding to params
62 url = "http://" + odl_ip + ":" + port + "/restconf/" + uri
66 def _build_post(odl_ip, port, uri, python_data, auth):
67 """Create a POST http request with generic on URI and data.
70 :param odl_ip: controller's ip address or hostname
72 :param port: controller's restconf port
74 :param uri: URI without /restconf/ to complete URL
76 :param python_data: python object to serialize into textual data
78 :param auth: authentication credentials
81 :returns http request object
84 url = _build_url(odl_ip, port, uri)
85 text_data = json.dumps(python_data)
86 header = {"Content-Type": "application/json"}
87 req = requests.Request("POST", url, headers=header, data=text_data, auth=auth)
91 def _prepare_add_car(odl_ip, port, item_list, auth):
92 """Creates a POST http requests to configure a car item in configuration datastore.
95 :param odl_ip: controller's ip address or hostname
97 :param port: controller's restconf port
99 :param item_list: controller item's list contains a list of ids of the cars
101 :param auth: authentication credentials
104 :returns req: http request object
107 container = {"car-entry": []}
108 for item in item_list:
109 entry = copy.deepcopy(_template_add_car["car-entry"][0])
111 entry["model"] = "model" + str(item)
112 container["car-entry"].append(entry)
113 req = _build_post(odl_ip, port, "config/car:cars", container, auth)
117 def _prepare_add_people_rpc(odl_ip, port, item_list, auth):
118 """Creates a POST http requests to configure people in configuration datastore.
121 :param odl_ip: controller's ip address or hostname
123 :param port: controller's restconf port
125 :param item_list: controller item's list contains a list of ids of the people
127 :param auth: authentication credentials
130 :returns req: http request object
133 container = {"input": {}}
135 entry = container["input"]
136 entry["people:id"] = str(item)
137 entry["people:address"] = "address" + str(item)
138 entry["people:contactNo"] = str(item)
139 container["input"] = entry
140 req = _build_post(odl_ip, port, "operations/people:add-person", container, auth)
144 def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
145 """Creates a POST http requests to purchase cars using an rpc.
148 :param odl_ip: controller's ip address or hostname
150 :param port: controller's restconf port
152 :param item_list: controller item's list contains a list of ids of the people
153 only the first item is considered
155 :param auth: authentication credentials
158 :returns req: http request object
161 container = {"input": {}}
163 entry = container["input"]
164 entry["car-purchase:person"] = "/people:people/people:person[people:id='" + str(item) + "']"
165 entry["car-purchase:person-id"] = str(item)
166 entry["car-purchase:car-id"] = str(item)
167 container["input"] = entry
168 req = _build_post(odl_ip, port, "operations/car-purchase:buy-car", container, auth)
172 def _request_sender(thread_id, preparing_function, auth, in_queue=None,
173 exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None,
174 req_timeout=60, retry_timeout=15, retry_rcs=[]):
175 """The funcion sends http requests.
177 Runs in the working thread. It reads out flow details from the queue and
178 sends apropriate http requests to the controller
181 :param thread_id: thread id
183 :param preparing_function: function to prepare the http request
185 :param in_queue: input queue, flow details are comming from here
187 :param exit_event: event to notify working thread that the parent
188 (task executor) stopped filling the input queue
190 :param odl_ip: ip address of ODL; default="127.0.0.1"
192 :param port: restconf port; default="8181"
194 :param out_queue: queue where the results should be put
196 :param req_timeout: http request timeout
198 :param retry_timeout: timout to give up retry attempts to send http requests
200 :param retry_rcs: list of return codes when retry should be performed
203 None (results is put into the output queue)
206 ses = requests.Session()
207 counter = [0 for i in range(600)]
211 item_list = in_queue.get(timeout=1)
213 if exit_event.is_set() and in_queue.empty():
216 req = preparing_function(odl_ip, port, item_list, auth)
218 start_time = time_now = time.time()
219 while start_time + retry_timeout > time_now:
221 rsp = ses.send(prep, timeout=req_timeout)
222 except requests.exceptions.Timeout:
224 logger.error("No response from %s", odl_ip)
227 logger.debug("%s %s", rsp.request, rsp.request.url)
228 logger.debug("Headers %s:", rsp.request.headers)
229 logger.debug("Body: %s", rsp.request.body)
230 logger.debug("Response: %s", rsp.text)
231 logger.debug("%s %s", rsp, rsp.reason)
232 counter[rsp.status_code] += 1
234 if rc not in retry_rcs:
236 time_now = time.time()
238 for response_code, count in enumerate(counter):
240 responses[response_code] = count
241 out_queue.put(responses)
242 logger.info("Response code(s) got per number of requests: %s", responses)
245 def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
246 thread_count=1, item_count=1, items_per_request=1,
247 auth=('admin', 'admin'), req_timeout=600, retry_timeout=15, retry_rcs=[]):
248 """The main function which drives sending of http requests.
250 Creates 2 queues and requested number of "working threads".
251 One queue is filled with flow details and working
252 threads read them out and send http requests.
253 The other queue is for sending results from working threads back.
254 After the threads' join, it produces a summary result.
257 :param preparing_function: function to prepare http request object
259 :param odl_ip: ip address of ODL or comma separated addesses; default="127.0.0.1"
261 :param port: restconf port; default="8181"
263 :param thread_count: number of threads used to send http requests; default=1
265 :param items_per_request: items per request, number of items sent in one http request
267 :param item_countpr: number of items to be sent in total
269 :param auth: authentication credentials
271 :param req_timeout: http request timeout
273 :param retry_timeout: timout to give up retry attempts to send http requests
275 :param retry_rcs: list of return codes when retry should be performed
278 :returns dict: dictionary of http response counts like
279 {"http_status_code1: "count1", etc.}
283 hosts = odl_ip.split(',')
286 items = [i + 1 for i in range(item_count)]
288 for i in range(0, item_count, items_per_request):
289 item_groups.append(items[i:i + items_per_request])
291 # fill the queue with details needed for one http requests
292 send_queue = Queue.Queue()
293 for item_list in item_groups:
294 send_queue.put(item_list)
296 # create an empty result queue
297 result_queue = Queue.Queue()
299 exit_event = threading.Event()
301 # start threads to read details from queues and to send http requests
303 for i in range(int(thread_count)):
304 thr = threading.Thread(target=_request_sender,
305 args=(i, preparing_function, auth),
306 kwargs={"in_queue": send_queue, "exit_event": exit_event,
307 "odl_ip": hosts[i % nrhosts], "port": port,
308 "out_queue": result_queue, "req_timeout": req_timeout,
309 "retry_timeout": retry_timeout, "retry_rcs": retry_rcs})
316 # wait for reqults and sum them up
319 # read partial resutls from sender thread
320 part_result = result_queue.get()
321 for k, v in part_result.iteritems():
329 def _build_delete(odl_ip, port, uri):
330 """Send DELETE to generic URI, assert status code is 200.
333 :param odl_ip: ip address of ODL
335 :param port: restconf port
337 :param uri: URI without /restconf/ to complete URL
343 Raise AssertionError if response status code != 200
346 url = _build_url(odl_ip, port, uri)
347 rsp = requests.delete(url, auth=auth)
348 logger.debug("%s %s", rsp.request, rsp.request.url)
349 logger.debug("Headers %s:", rsp.request.headers)
350 logger.debug("Body: %s", rsp.request.body)
351 logger.debug("Response: %s", rsp.text)
352 logger.info("%s %s", rsp, rsp.reason)
353 assert rsp.status_code == 200, rsp.text
356 def delete_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
357 """Delete cars container from config datastore, assert success.
360 :param odl_ip: ip address of ODL
362 :param port: restconf port
364 :param thread_count: ignored; only 1 thread needed
366 :param item_count: ignored; whole container is deleted
368 :param auth: authentication credentials
370 :param items_per_request: ignored; only 1 request needed
376 logger.info("Delete all cars from %s:%s", odl_ip, port)
377 _build_delete(odl_ip, port, "config/car:cars")
380 def delete_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
381 """Delete people container from config datastore.
384 :param odl_ip: ip address of ODL
386 :param port: restconf port
388 :param thread_count: ignored; only 1 thread needed
390 :param item_count: ignored; whole container is deleted
392 :param auth: authentication credentials
394 :param items_per_request: ignored; only 1 request needed
400 logger.info("Delete all people from %s:%s", odl_ip, port)
401 _build_delete(odl_ip, port, "config/people:people")
404 def delete_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
405 """Delete car-people container from config datastore.
408 :param odl_ip: ip address of ODL
410 :param port: restconf port
412 :param thread_count: ignored; only 1 thread needed
414 :param item_count: ignored; whole container is deleted
416 :param auth: authentication credentials
418 :param items_per_request: ignored; only 1 request needed
424 logger.info("Delete all purchases from %s:%s", odl_ip, port)
425 _build_delete(odl_ip, port, "config/car-people:car-people")
428 def _build_get(odl_ip, port, uri):
429 """Send GET to generic URI.
432 :param odl_ip: ip address of ODL
434 :param port: restconf port
436 :param uri: URI without /restconf/ to complete URL
442 Raise AssertionError if response status code != 200
445 url = _build_url(odl_ip, port, uri)
446 rsp = requests.get(url, auth=auth)
447 logger.debug("%s %s", rsp.request, rsp.request.url)
448 logger.debug("Headers %s:", rsp.request.headers)
449 logger.debug("Body: %s", rsp.request.body)
450 logger.debug("Response: %s", rsp.text)
451 logger.info("%s %s", rsp, rsp.reason)
452 assert rsp.status_code == 200, rsp.text
455 def get_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
456 """Reads car entries from config datastore.
458 TODO: some needed logic to be added handle http response in the future,
459 e.g. count items in response's content
462 :param odl_ip: ip address of ODL
464 :param port: restconf port
466 :param thread_count: ignored; only 1 thread needed
468 :param item_count: ignored; whole container is deleted
470 :param auth: authentication credentials
472 :param items_per_request: ignored; only 1 request needed
478 logger.info("Get all cars from %s:%s", odl_ip, port)
479 _build_get(odl_ip, port, "config/car:cars")
482 def get_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
483 """Reads people entries from config datastore.
485 TODO: some needed logic to be added handle http response in the future,
486 e.g. count items in response's content
489 :param odl_ip: ip address of ODL
491 :param port: restconf port
493 :param thread_count: ignored; only 1 thread needed
495 :param item_count: ignored; whole container is deleted
497 :param auth: authentication credentials
499 :param items_per_request: ignored; only 1 request needed
505 logger.info("Get all people from %s:%s", odl_ip, port)
506 _build_get(odl_ip, port, "config/people:people")
509 def get_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
510 """Reads car-people entries from config datastore.
512 TODO: some needed logic to be added handle http response in the future,
513 e.g. count items in response's content
516 :param odl_ip: ip address of ODL
518 :param port: restconf port
520 :param thread_count: ignored; only 1 thread needed
522 :param item_count: ignored; whole container is deleted
524 :param auth: authentication credentials
526 :param items_per_request: ignored; only 1 request needed
532 logger.info("Get all purchases from %s:%s", odl_ip, port)
533 _build_get(odl_ip, port, "config/car-people:car-people")
536 def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
537 """Configure car entries to the config datastore.
540 :param odl_ip: ip address of ODL
542 :param port: restconf port
544 :param thread_count: number of threads used to send http requests; default=1
546 :param item_count: number of items to be configured
548 :param auth: authentication credentials
550 :param items_per_request: items per request, not used here,
551 just to keep the same api
557 logger.info("Add %s car(s) to %s:%s (%s per request)",
558 item_count, odl_ip, port, items_per_request)
559 res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
560 thread_count=thread_count, item_count=item_count,
561 items_per_request=items_per_request, auth=auth)
562 if res.keys() != [204]:
563 logger.error("Not all cars were configured: " + repr(res))
564 raise Exception("Not all cars were configured: " + repr(res))
567 def add_car_with_retries(odl_ip, port, thread_count, item_count, auth, items_per_request):
568 """Configure car entries to the config datastore.
571 :param odl_ip: ip address of ODL
573 :param port: restconf port
575 :param thread_count: number of threads used to send http requests; default=1
577 :param item_count: number of items to be configured
579 :param auth: authentication credentials
581 :param items_per_request: items per request, not used here,
582 just to keep the same api
588 logger.info("Add %s car(s) to %s:%s (%s per request)",
589 item_count, odl_ip, port, items_per_request)
590 retry_rcs = [401, 404, 503]
591 res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
592 thread_count=thread_count, item_count=item_count,
593 items_per_request=items_per_request, auth=auth,
594 req_timeout=15, retry_timeout=30, retry_rcs=retry_rcs)
595 acceptable_rcs = [204] + retry_rcs
596 for key in res.keys():
597 if key not in acceptable_rcs:
598 logger.error("Problems during cars' configuration appeared: " + repr(res))
599 raise Exception("Problems during cars' configuration appeared: " + repr(res))
602 def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
603 """Configure people entries to the config datastore.
606 :param odl_ip: ip address of ODL; default="127.0.0.1"
608 :param port: restconf port; default="8181"
610 :param thread_count: number of threads used to send http requests; default=1
612 :param item_count: number of items to be condigured
614 :param auth: authentication credentials
616 :param items_per_request: items per request, not used here,
617 just to keep the same api
623 logger.info("Add %s people to %s:%s (%s per request)",
624 item_count, odl_ip, port, items_per_request)
625 if items_per_request != 1:
626 logger.error("Only 1 item per request is supported, " +
627 "you specified: {0}".format(item_count))
628 raise NotImplementedError("Only 1 item per request is supported, " +
629 "you specified: {0}".format(item_count))
630 res = _task_executor(_prepare_add_people_rpc, odl_ip=odl_ip, port=port,
631 thread_count=thread_count, item_count=item_count,
632 items_per_request=items_per_request, auth=auth)
633 if res.keys() != [200]:
634 logger.error("Not all people were configured: " + repr(res))
635 raise Exception("Not all people were configured: " + repr(res))
638 def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
640 """Configure car-people entries to the config datastore one by one using rpc
643 :param odl_ip: ip address of ODL; default="127.0.0.1"
645 :param port: restconf port; default="8181"
647 :param thread_count: number of threads used to send http requests; default=1
649 :param item_count: number of items to be condigured
651 :param auth: authentication credentials
653 :param items_per_request: items per request, not used here,
654 just to keep the same api
660 logger.info("Add %s purchase(s) to %s:%s (%s per request)",
661 item_count, odl_ip, port, items_per_request)
662 if items_per_request != 1:
663 logger.error("Only 1 item per request is supported, " +
664 "you specified: {0}".format(item_count))
665 raise NotImplementedError("Only 1 item per request is supported, " +
666 "you specified: {0}".format(item_count))
668 res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
669 thread_count=thread_count, item_count=item_count,
670 items_per_request=items_per_request, auth=auth)
671 if res.keys() != [200]:
672 logger.error("Not all rpc calls passed: " + repr(res))
673 raise Exception("Not all rpc calls passed: " + repr(res))
676 _actions = ["add", "get", "delete", "add-rpc", "add-with-retries"]
677 _items = ["car", "people", "car-people"]
680 "add": {"car": add_car},
681 "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
682 "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
683 "add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
684 "add-with-retries": {"car": add_car_with_retries},
688 if __name__ == "__main__":
690 This program executes requested action based in given parameters
692 It provides "car", "people" and "car-people" crud operations.
695 parser = argparse.ArgumentParser(description="Cluster datastore"
696 "performance test script")
697 parser.add_argument("--host", default="127.0.0.1",
698 help="Host where odl controller is running."
699 "Or comma separated list of hosts."
700 "(default is 127.0.0.1)")
701 parser.add_argument("--port", default="8181",
702 help="Port on which odl's RESTCONF is listening"
704 parser.add_argument("--threads", type=int, default=1,
705 help="Number of request worker threads to start in"
706 "each cycle (default=1)")
707 parser.add_argument("action", choices=_actions, metavar="action",
708 help="Action to be performed.")
709 parser.add_argument("--itemtype", choices=_items, default="car",
710 help="Flows-per-Request - number of flows (batch size)"
711 "sent in each HTTP request (default 1)")
712 parser.add_argument("--itemcount", type=int, help="Items per request",
714 parser.add_argument("--user", help="Restconf user name", default="admin")
715 parser.add_argument("--password", help="Restconf password", default="admin")
716 parser.add_argument("--ipr", type=int, help="Items per request", default=1)
717 parser.add_argument("--debug", dest="loglevel", action="store_const",
718 const=logging.DEBUG, default=logging.INFO,
719 help="Set log level to debug (default is error)")
721 args = parser.parse_args()
723 logger = logging.getLogger("logger")
724 log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
725 console_handler = logging.StreamHandler()
726 file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
727 console_handler.setFormatter(log_formatter)
728 file_handler.setFormatter(log_formatter)
729 logger.addHandler(console_handler)
730 logger.addHandler(file_handler)
731 logger.setLevel(args.loglevel)
733 auth = (args.user, args.password)
735 if (args.action not in _handler_matrix or
736 args.itemtype not in _handler_matrix[args.action]):
737 msg = "Unsupported combination of action: " + str(args.action)
738 msg += " and item: " + str(args.itemtype)
740 raise NotImplementedError(msg)
742 # TODO: need to filter out situations when we cannot use more items
743 # in one rest request (rpc or delete?)
744 # this should be done inside handler functions
746 handler_function = _handler_matrix[args.action][args.itemtype]
747 handler_function(args.host, args.port, args.threads,
748 args.itemcount, auth, args.ipr)