2 The purpose of this script is to perform CRUD operations on
3 the car-people data model.
18 "id": "to be replaced",
19 "category": "my_category",
20 "model": "to be replaced",
21 "manufacturer": "my_manufacturer",
27 _template_add_people_rpc = {
30 "people:id": "to be replaced",
31 "people:gender": "male",
33 "people:address": "to be replaced",
34 "people:contactNo": "to be replaced",
39 _template_add_cp_rpc = {
41 "car-purchase:person": "to be replaced",
42 "car-purchase:person-id": "to be replaced",
43 "car-purchase:car-id": "to be replaced",
48 def _build_url(odl_ip, port, uri):
49 """Compose URL from generic IP, port and URI fragment.
52 :param odl_ip: controller's ip address or hostname
54 :param port: controller's restconf port
56 :param uri: URI without /restconf/ to complete URL
59 :returns url: full restconf url corresponding to params
62 url = "http://" + odl_ip + ":" + port + "/restconf/" + uri
def _build_post(odl_ip, port, uri, python_data, auth):
    """Create a POST http request for a generic URI and data.

    The request is only built here; it is not sent.

    Args:
        :param odl_ip: controller's ip address or hostname

        :param port: controller's restconf port

        :param uri: URI without /restconf/ to complete URL

        :param python_data: python object to serialize into textual (JSON) data

        :param auth: authentication credentials

    Returns:
        :returns req: http request object (``requests.Request``)
    """
    return requests.Request(
        "POST",
        _build_url(odl_ip, port, uri),
        headers={"Content-Type": "application/json"},
        data=json.dumps(python_data),
        auth=auth,
    )
def _prepare_add_car(odl_ip, port, item_list, auth):
    """Create a POST http request to configure car items in configuration datastore.

    One "car-entry" is generated per id in item_list, based on the
    module-level _template_add_car.

    Args:
        :param odl_ip: controller's ip address or hostname

        :param port: controller's restconf port

        :param item_list: list of ids of the cars to be configured

        :param auth: authentication credentials

    Returns:
        :returns req: http request object
    """
    car_template = _template_add_car["car-entry"][0]
    container = {"car-entry": []}
    for item in item_list:
        # Deep copy so the shared module-level template is never mutated.
        entry = copy.deepcopy(car_template)
        entry["id"] = item
        entry["model"] = "model" + str(item)
        container["car-entry"].append(entry)
    return _build_post(odl_ip, port, "config/car:cars", container, auth)
def _prepare_add_people_rpc(odl_ip, port, item_list, auth):
    """Create a POST http request to add a person using the add-person rpc.

    Args:
        :param odl_ip: controller's ip address or hostname

        :param port: controller's restconf port

        :param item_list: list of ids of the people;
            only the first item is considered

        :param auth: authentication credentials

    Returns:
        :returns req: http request object
    """
    item = item_list[0]
    container = {
        "input": {
            "people:id": str(item),
            "people:address": "address" + str(item),
            "people:contactNo": str(item),
        }
    }
    return _build_post(odl_ip, port, "operations/people:add-person", container, auth)
def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
    """Create a POST http request to purchase a car using the buy-car rpc.

    The same id is used for the person reference, the person id and the
    car id of the purchase.

    Args:
        :param odl_ip: controller's ip address or hostname

        :param port: controller's restconf port

        :param item_list: list of ids of the people;
            only the first item is considered

        :param auth: authentication credentials

    Returns:
        :returns req: http request object
    """
    item = item_list[0]
    container = {
        "input": {
            "car-purchase:person": (
                "/people:people/people:person[people:id='" + str(item) + "']"
            ),
            "car-purchase:person-id": str(item),
            "car-purchase:car-id": str(item),
        }
    }
    return _build_post(odl_ip, port, "operations/car-purchase:buy-car", container, auth)
187 """The function sends http requests.
189 Runs in the working thread. It reads out flow details from the queue and
190 sends appropriate http requests to the controller.
193 :param thread_id: thread id
195 :param preparing_function: function to prepare the http request
197 :param in_queue: input queue, flow details are coming from here
199 :param exit_event: event to notify working thread that the parent
200 (task executor) stopped filling the input queue
202 :param odl_ip: ip address of ODL; default="127.0.0.1"
204 :param port: restconf port; default="8181"
206 :param out_queue: queue where the results should be put
208 :param req_timeout: http request timeout
210 :param retry_timeout: timeout to give up retry attempts to send http requests
212 :param retry_rcs: list of return codes when retry should be performed
215 None (results is put into the output queue)
218 ses = requests.Session()
219 counter = [0 for i in range(600)]
223 item_list = in_queue.get(timeout=1)
225 if exit_event.is_set() and in_queue.empty():
228 req = preparing_function(odl_ip, port, item_list, auth)
230 start_time = time_now = time.time()
231 while start_time + retry_timeout > time_now:
233 rsp = ses.send(prep, timeout=req_timeout)
234 except requests.exceptions.Timeout:
236 logger.error("No response from %s", odl_ip)
239 counter[rsp.status_code] += 1
241 lvl = logging.INFO if rc > 299 else logging.DEBUG
244 "Request started at {} finished with following detais".format(
245 time.ctime(start_time)
248 logger.log(lvl, "%s %s", rsp.request, rsp.request.url)
249 logger.log(lvl, "Headers %s:", rsp.request.headers)
250 logger.log(lvl, "Body: %s", rsp.request.body)
251 logger.log(lvl, "Response: %s", rsp.text)
252 logger.log(lvl, "%s %s", rsp, rsp.reason)
253 if rc not in retry_rcs:
255 time_now = time.time()
257 for response_code, count in enumerate(counter):
259 responses[response_code] = count
260 out_queue.put(responses)
261 logger.info("Response code(s) got per number of requests: %s", responses)
271 auth=("admin", "admin"),
276 """The main function which drives sending of http requests.
278 Creates 2 queues and requested number of "working threads".
279 One queue is filled with flow details and working
280 threads read them out and send http requests.
281 The other queue is for sending results from working threads back.
282 After the threads' join, it produces a summary result.
285 :param preparing_function: function to prepare http request object
287 :param odl_ip: ip address of ODL or comma separated addresses; default="127.0.0.1"
289 :param port: restconf port; default="8181"
291 :param thread_count: number of threads used to send http requests; default=1
293 :param items_per_request: items per request, number of items sent in one http request
295 :param item_count: number of items to be sent in total
297 :param auth: authentication credentials
299 :param req_timeout: http request timeout
301 :param retry_timeout: timeout to give up retry attempts to send http requests
303 :param retry_rcs: list of return codes when retry should be performed
306 :returns dict: dictionary of http response counts like
307 {"http_status_code1": "count1", etc.}
311 hosts = odl_ip.split(",")
314 items = [i + 1 for i in range(item_count)]
316 for i in range(0, item_count, items_per_request):
317 item_groups.append(items[i : i + items_per_request])
319 # fill the queue with details needed for one http requests
320 send_queue = Queue.Queue()
321 for item_list in item_groups:
322 send_queue.put(item_list)
324 # create an empty result queue
325 result_queue = Queue.Queue()
327 exit_event = threading.Event()
329 # start threads to read details from queues and to send http requests
331 for i in range(int(thread_count)):
332 thr = threading.Thread(
333 target=_request_sender,
334 args=(i, preparing_function, auth),
336 "in_queue": send_queue,
337 "exit_event": exit_event,
338 "odl_ip": hosts[i % nrhosts],
340 "out_queue": result_queue,
341 "req_timeout": req_timeout,
342 "retry_timeout": retry_timeout,
343 "retry_rcs": retry_rcs,
352 # wait for results and sum them up
355 # read partial results from sender thread
356 part_result = result_queue.get()
357 for k, v in part_result.iteritems():
def _build_delete(odl_ip, port, uri, auth=None):
    """Send DELETE to generic URI and verify the status code is 200.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param uri: URI without /restconf/ to complete URL

        :param auth: authentication credentials; when omitted, the
            module-level ``auth`` is used if one is defined, which is what
            three-argument callers implicitly relied on

    Returns:
        None

    Note:
        Raise AssertionError if response status code != 200
    """
    if auth is None:
        # Backward compatibility for callers that do not pass credentials.
        auth = globals().get("auth")
    rsp = requests.delete(_build_url(odl_ip, port, uri), auth=auth)
    logger.debug("%s %s", rsp.request, rsp.request.url)
    logger.debug("Headers %s:", rsp.request.headers)
    logger.debug("Body: %s", rsp.request.body)
    logger.debug("Response: %s", rsp.text)
    logger.info("%s %s", rsp, rsp.reason)
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if rsp.status_code != 200:
        raise AssertionError(rsp.text)


def delete_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Delete cars container from config datastore, assert success.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is deleted

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Delete all cars from %s:%s", odl_ip, port)
    _build_delete(odl_ip, port, "config/car:cars", auth)


def delete_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Delete people container from config datastore, assert success.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is deleted

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Delete all people from %s:%s", odl_ip, port)
    _build_delete(odl_ip, port, "config/people:people", auth)


def delete_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Delete car-people container from config datastore, assert success.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is deleted

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Delete all purchases from %s:%s", odl_ip, port)
    _build_delete(odl_ip, port, "config/car-people:car-people", auth)
def _build_get(odl_ip, port, uri, auth=None):
    """Send GET to generic URI and verify the status code is 200.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param uri: URI without /restconf/ to complete URL

        :param auth: authentication credentials; when omitted, the
            module-level ``auth`` is used if one is defined, which is what
            three-argument callers implicitly relied on

    Returns:
        None

    Note:
        Raise AssertionError if response status code != 200
    """
    if auth is None:
        # Backward compatibility for callers that do not pass credentials.
        auth = globals().get("auth")
    rsp = requests.get(_build_url(odl_ip, port, uri), auth=auth)
    logger.debug("%s %s", rsp.request, rsp.request.url)
    logger.debug("Headers %s:", rsp.request.headers)
    logger.debug("Body: %s", rsp.request.body)
    logger.debug("Response: %s", rsp.text)
    logger.info("%s %s", rsp, rsp.reason)
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if rsp.status_code != 200:
        raise AssertionError(rsp.text)


def get_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Read car entries from config datastore.

    TODO: some needed logic to be added to handle http response in the future,
          e.g. count items in response's content

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is read

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Get all cars from %s:%s", odl_ip, port)
    _build_get(odl_ip, port, "config/car:cars", auth)


def get_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Read people entries from config datastore.

    TODO: some needed logic to be added to handle http response in the future,
          e.g. count items in response's content

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is read

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Get all people from %s:%s", odl_ip, port)
    _build_get(odl_ip, port, "config/people:people", auth)


def get_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Read car-people entries from config datastore.

    TODO: some needed logic to be added to handle http response in the future,
          e.g. count items in response's content

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is read

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Get all purchases from %s:%s", odl_ip, port)
    _build_get(odl_ip, port, "config/car-people:car-people", auth)
def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Configure car entries to the config datastore.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: number of threads used to send http requests

        :param item_count: number of items to be configured

        :param auth: authentication credentials

        :param items_per_request: number of cars sent in one http request

    Returns:
        None

    Note:
        Raise Exception if any request finished with a status other than 204
    """
    logger.info(
        "Add %s car(s) to %s:%s (%s per request)",
        item_count,
        odl_ip,
        port,
        items_per_request,
    )
    res = _task_executor(
        _prepare_add_car,
        odl_ip=odl_ip,
        port=port,
        thread_count=thread_count,
        item_count=item_count,
        items_per_request=items_per_request,
        auth=auth,
    )
    # list(res) yields the collected status codes on Python 2 and 3 alike;
    # on Python 3 ``res.keys() != [204]`` is always true (view vs. list).
    if list(res) != [204]:
        msg = "Not all cars were configured: " + repr(res)
        logger.error(msg)
        raise Exception(msg)
614 def add_car_with_retries(
615 odl_ip, port, thread_count, item_count, auth, items_per_request
617 """Configure car entries to the config datastore.
620 :param odl_ip: ip address of ODL
622 :param port: restconf port
624 :param thread_count: number of threads used to send http requests; default=1
626 :param item_count: number of items to be configured
628 :param auth: authentication credentials
630 :param items_per_request: items per request, number of items sent
631 in one http request
638 "Add %s car(s) to %s:%s (%s per request)",
644 retry_rcs = [401, 404, 500, 503]
645 res = _task_executor(
649 thread_count=thread_count,
650 item_count=item_count,
651 items_per_request=items_per_request,
657 acceptable_rcs = [204] + retry_rcs
658 for key in res.keys():
659 if key not in acceptable_rcs:
660 logger.error("Problems during cars' configuration appeared: " + repr(res))
662 "Problems during cars' configuration appeared: " + repr(res)
def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Configure people entries to the config datastore one by one using rpc.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: number of threads used to send http requests

        :param item_count: number of items to be configured

        :param auth: authentication credentials

        :param items_per_request: items per request; must be 1 because the
            rpc adds a single person per call

    Returns:
        None

    Note:
        Raise NotImplementedError if items_per_request != 1
        Raise Exception if any request finished with a status other than 200
    """
    logger.info(
        "Add %s people to %s:%s (%s per request)",
        item_count,
        odl_ip,
        port,
        items_per_request,
    )
    if items_per_request != 1:
        # Report the offending value (items_per_request), not item_count.
        msg = "Only 1 item per request is supported, you specified: {0}".format(
            items_per_request
        )
        logger.error(msg)
        raise NotImplementedError(msg)
    res = _task_executor(
        _prepare_add_people_rpc,
        odl_ip=odl_ip,
        port=port,
        thread_count=thread_count,
        item_count=item_count,
        items_per_request=items_per_request,
        auth=auth,
    )
    # list(res) yields the collected status codes on Python 2 and 3 alike;
    # on Python 3 ``res.keys() != [200]`` is always true (view vs. list).
    if list(res) != [200]:
        msg = "Not all people were configured: " + repr(res)
        logger.error(msg)
        raise Exception(msg)
def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Configure car-people entries to the config datastore one by one using rpc.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: number of threads used to send http requests

        :param item_count: number of items to be configured

        :param auth: authentication credentials

        :param items_per_request: items per request; must be 1 because the
            rpc registers a single purchase per call

    Returns:
        None

    Note:
        Raise NotImplementedError if items_per_request != 1
        Raise Exception if any request finished with a status other than 200
    """
    logger.info(
        "Add %s purchase(s) to %s:%s (%s per request)",
        item_count,
        odl_ip,
        port,
        items_per_request,
    )
    if items_per_request != 1:
        # Report the offending value (items_per_request), not item_count.
        msg = "Only 1 item per request is supported, you specified: {0}".format(
            items_per_request
        )
        logger.error(msg)
        raise NotImplementedError(msg)

    res = _task_executor(
        _prepare_add_car_people_rpc,
        odl_ip=odl_ip,
        port=port,
        thread_count=thread_count,
        item_count=item_count,
        items_per_request=items_per_request,
        auth=auth,
    )
    # list(res) yields the collected status codes on Python 2 and 3 alike;
    # on Python 3 ``res.keys() != [200]`` is always true (view vs. list).
    if list(res) != [200]:
        msg = "Not all rpc calls passed: " + repr(res)
        logger.error(msg)
        raise Exception(msg)
# Command line vocabulary: supported actions and item types.
_actions = ["add", "get", "delete", "add-rpc", "add-with-retries"]
_items = ["car", "people", "car-people"]

# action -> item type -> handler; combinations absent here are unsupported.
_handler_matrix = {
    "add": {"car": add_car},
    "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
    "delete": {
        "car": delete_car,
        "people": delete_people,
        "car-people": delete_car_people,
    },
    "add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
    "add-with-retries": {"car": add_car_with_retries},
}
785 if __name__ == "__main__":
787 This program executes the requested action based on the given parameters.
789 It provides "car", "people" and "car-people" crud operations.
792 parser = argparse.ArgumentParser(
793 description="Cluster datastore" "performance test script"
798 help="Host where odl controller is running."
799 "Or comma separated list of hosts."
800 "(default is 127.0.0.1)",
805 help="Port on which odl's RESTCONF is listening" "(default is 8181)",
811 help="Number of request worker threads to start in" "each cycle (default=1)",
814 "action", choices=_actions, metavar="action", help="Action to be performed."
820 help="Flows-per-Request - number of flows (batch size)"
821 "sent in each HTTP request (default 1)",
823 parser.add_argument("--itemcount", type=int, help="Items per request", default=1)
824 parser.add_argument("--user", help="Restconf user name", default="admin")
825 parser.add_argument("--password", help="Restconf password", default="admin")
826 parser.add_argument("--ipr", type=int, help="Items per request", default=1)
830 action="store_const",
832 default=logging.INFO,
833 help="Set log level to debug (default is error)",
836 args = parser.parse_args()
838 logger = logging.getLogger("logger")
839 log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
840 console_handler = logging.StreamHandler()
841 file_handler = logging.FileHandler("cluster_rest_script.log", mode="w")
842 console_handler.setFormatter(log_formatter)
843 file_handler.setFormatter(log_formatter)
844 logger.addHandler(console_handler)
845 logger.addHandler(file_handler)
846 logger.setLevel(args.loglevel)
848 auth = (args.user, args.password)
851 args.action not in _handler_matrix
852 or args.itemtype not in _handler_matrix[args.action]
854 msg = "Unsupported combination of action: " + str(args.action)
855 msg += " and item: " + str(args.itemtype)
857 raise NotImplementedError(msg)
859 # TODO: need to filter out situations when we cannot use more items
860 # in one rest request (rpc or delete?)
861 # this should be done inside handler functions
863 handler_function = _handler_matrix[args.action][args.itemtype]
864 handler_function(args.host, args.port, args.threads, args.itemcount, auth, args.ipr)