"""
This script performs CRUD operations on the car-people data model.
"""
18 "id": "to be replaced",
19 "category": "my_category",
20 "model": "to be replaced",
21 "manufacturer": "my_manufacturer",
# Template of the input fields for adding one person; the "people:" prefix
# matches the people:add-person RPC used elsewhere in this file.
# NOTE(review): no function in the visible part of this file reads this
# template - _prepare_add_people_rpc assembles its {"input": {...}} payload by
# hand and never sets "people:gender". Confirm whether it is still needed.
27 _template_add_people_rpc = {
30 "people:id": "to be replaced",
31 "people:gender": "male",
33 "people:address": "to be replaced",
34 "people:contactNo": "to be replaced"
# Template of the input fields for one car purchase; the three keys are the
# same ones _prepare_add_car_people_rpc fills in for car-purchase:buy-car.
# NOTE(review): no function in the visible part of this file reads this
# template - confirm whether it is still needed.
39 _template_add_cp_rpc = {
41 "car-purchase:person": "to be replaced",
42 "car-purchase:person-id": "to be replaced",
43 "car-purchase:car-id": "to be replaced"
48 def _build_url(odl_ip, port, uri):
49 """Compose URL from generic IP, port and URI fragment.
52 :param odl_ip: controller's ip address or hostname
54 :param port: controller's restconf port
56 :param uri: URI without /restconf/ to complete URL
59 :returns url: full restconf url corresponding to params
62 url = "http://" + odl_ip + ":" + port + "/restconf/" + uri
def _build_post(odl_ip, port, uri, python_data, auth):
    """Create a POST http request for a generic URI and data.

    The request is only built here; sending it is left to the caller.

    Args:
        :param odl_ip: controller's ip address or hostname

        :param port: controller's restconf port

        :param uri: URI without /restconf/ to complete URL

        :param python_data: python object to serialize into textual (JSON) data

        :param auth: authentication credentials

    Returns:
        :returns req: http request object (requests.Request)
    """
    url = _build_url(odl_ip, port, uri)
    text_data = json.dumps(python_data)
    header = {"Content-Type": "application/json"}
    req = requests.Request("POST", url, headers=header, data=text_data, auth=auth)
    return req
def _prepare_add_car(odl_ip, port, item_list, auth):
    """Create a POST http request to configure car items in configuration datastore.

    One car entry is generated from the car template for every id in item_list.

    Args:
        :param odl_ip: controller's ip address or hostname

        :param port: controller's restconf port

        :param item_list: list of ids of the cars to be configured

        :param auth: authentication credentials

    Returns:
        :returns req: http request object
    """
    container = {"car-entry": []}
    for item in item_list:
        # Deep copy so that the shared module-level template is never modified.
        entry = copy.deepcopy(_template_add_car["car-entry"][0])
        # NOTE(review): the id assignment was not present in the extracted
        # source; the template marks "id" as "to be replaced" and item_list
        # holds the car ids - confirm the raw id (not str(id)) is expected.
        entry["id"] = item
        entry["model"] = "model" + str(item)
        container["car-entry"].append(entry)
    req = _build_post(odl_ip, port, "config/car:cars", container, auth)
    return req
def _prepare_add_people_rpc(odl_ip, port, item_list, auth):
    """Create a POST http request to add a person using the people:add-person rpc.

    Args:
        :param odl_ip: controller's ip address or hostname

        :param port: controller's restconf port

        :param item_list: list of ids of the people;
            only the first item is considered

        :param auth: authentication credentials

    Returns:
        :returns req: http request object
    """
    # One rpc call carries exactly one person, hence only the first id is used.
    item = item_list[0]
    container = {
        "input": {
            "people:id": str(item),
            "people:address": "address" + str(item),
            "people:contactNo": str(item),
        }
    }
    req = _build_post(odl_ip, port, "operations/people:add-person", container, auth)
    return req
def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
    """Create a POST http request to purchase a car using the car-purchase:buy-car rpc.

    The same id is used for the person and for the purchased car.

    Args:
        :param odl_ip: controller's ip address or hostname

        :param port: controller's restconf port

        :param item_list: list of ids of the people;
            only the first item is considered

        :param auth: authentication credentials

    Returns:
        :returns req: http request object
    """
    # One rpc call carries exactly one purchase, hence only the first id is used.
    item = item_list[0]
    container = {
        "input": {
            # instance-identifier pointing at the buying person
            "car-purchase:person": "/people:people/people:person[people:id='" + str(item) + "']",
            "car-purchase:person-id": str(item),
            "car-purchase:car-id": str(item),
        }
    }
    req = _build_post(odl_ip, port, "operations/car-purchase:buy-car", container, auth)
    return req
def _request_sender(thread_id, preparing_function, auth, in_queue=None,
                    exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None,
                    req_timeout=60, retry_timeout=15, retry_rcs=None):
    """Send http requests for the item lists found in the input queue.

    Runs in the working thread. It reads item lists from the input queue,
    builds an http request from each of them with preparing_function and
    sends it to the controller. A request is re-sent while its return code
    is listed in retry_rcs and retry_timeout has not elapsed; every attempt
    is counted in the result.

    Args:
        :param thread_id: thread id (not used by the function body)

        :param preparing_function: function to prepare the http request;
            called as preparing_function(odl_ip, port, item_list, auth)

        :param auth: authentication credentials

        :param in_queue: input queue, item lists are coming from here

        :param exit_event: event to notify working thread that the parent
            (task executor) stopped filling the input queue

        :param odl_ip: ip address of ODL; default="127.0.0.1"

        :param port: restconf port; default="8181"

        :param out_queue: queue where the results should be put

        :param req_timeout: http request timeout

        :param retry_timeout: timeout to give up retry attempts to send http requests

        :param retry_rcs: list of return codes when retry should be performed;
            default=None (never retry)

    Returns:
        None (a {return_code: count} dictionary is put into the output queue)
    """
    # A None default avoids sharing one mutable list object between calls.
    if retry_rcs is None:
        retry_rcs = []
    # NOTE(review): the pseudo return code recorded for a timed-out request was
    # not visible in the extracted source; 99 (outside the HTTP status range)
    # is an assumption to be confirmed.
    no_response_rc = 99
    ses = requests.Session()
    # A dictionary keyed by return code copes with any status value, unlike a
    # fixed-size list indexed by the code.
    responses = {}

    # NOTE(review): the loop/try scaffolding below was restored around the
    # surviving statements - verify it against the upstream script.
    while True:
        try:
            item_list = in_queue.get(timeout=1)
        except Queue.Empty:
            # Finish only when the producer is done and nothing is left to send.
            if exit_event.is_set() and in_queue.empty():
                break
            continue
        req = preparing_function(odl_ip, port, item_list, auth)
        prep = req.prepare()
        start_time = time_now = time.time()
        while start_time + retry_timeout > time_now:
            try:
                rsp = ses.send(prep, timeout=req_timeout)
            except requests.exceptions.Timeout:
                rc = no_response_rc
                logger.error("No response from %s", odl_ip)
            else:
                rc = rsp.status_code
                # Unsuccessful exchanges are always shown, successful ones only in debug.
                lvl = logging.INFO if rc > 299 else logging.DEBUG
                logger.log(lvl, "Request started at %s finished with following detais",
                           time.ctime(start_time))
                logger.log(lvl, "%s %s", rsp.request, rsp.request.url)
                logger.log(lvl, "Headers %s:", rsp.request.headers)
                logger.log(lvl, "Body: %s", rsp.request.body)
                logger.log(lvl, "Response: %s", rsp.text)
                logger.log(lvl, "%s %s", rsp, rsp.reason)
            responses[rc] = responses.get(rc, 0) + 1
            if rc not in retry_rcs:
                break
            time_now = time.time()

    out_queue.put(responses)
    logger.info("Response code(s) got per number of requests: %s", responses)
def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
                   thread_count=1, item_count=1, items_per_request=1,
                   auth=('admin', 'admin'), req_timeout=600, retry_timeout=15, retry_rcs=None):
    """The main function which drives sending of http requests.

    Creates 2 queues and requested number of "working threads".
    One queue is filled with groups of item ids and working
    threads read them out and send http requests.
    The other queue is for sending results from working threads back.
    After the threads' join, it produces a summary result.

    Args:
        :param preparing_function: function to prepare http request object

        :param odl_ip: ip address of ODL or comma separated addresses; default="127.0.0.1"

        :param port: restconf port; default="8181"

        :param thread_count: number of threads used to send http requests; default=1

        :param item_count: number of items to be sent in total

        :param items_per_request: items per request, number of items sent in one http request

        :param auth: authentication credentials

        :param req_timeout: http request timeout

        :param retry_timeout: timeout to give up retry attempts to send http requests

        :param retry_rcs: list of return codes when retry should be performed;
            default=None (never retry)

    Returns:
        :returns dict: dictionary of http response counts like
                       {"http_status_code1: "count1", etc.}
    """
    # A None default avoids sharing one mutable list object between calls.
    if retry_rcs is None:
        retry_rcs = []

    # worker threads are spread over the given hosts in round-robin fashion
    hosts = odl_ip.split(',')
    nrhosts = len(hosts)

    # item ids start at 1 and are cut into groups of items_per_request
    items = [i + 1 for i in range(item_count)]
    item_groups = [items[i:i + items_per_request]
                   for i in range(0, item_count, items_per_request)]

    # fill the queue with details needed for one http requests
    send_queue = Queue.Queue()
    for item_list in item_groups:
        send_queue.put(item_list)

    # create an empty result queue
    result_queue = Queue.Queue()
    # event telling the workers that the input queue will not grow any more
    exit_event = threading.Event()

    # start threads to read details from queues and to send http requests
    threads = []
    for i in range(int(thread_count)):
        thr = threading.Thread(target=_request_sender,
                               args=(i, preparing_function, auth),
                               kwargs={"in_queue": send_queue, "exit_event": exit_event,
                                       "odl_ip": hosts[i % nrhosts], "port": port,
                                       "out_queue": result_queue, "req_timeout": req_timeout,
                                       "retry_timeout": retry_timeout, "retry_rcs": retry_rcs})
        threads.append(thr)
        thr.start()

    # the queue was completely filled above, workers may stop once it is drained
    exit_event.set()

    # wait for results and sum them up
    result = {}
    for thr in threads:
        thr.join()
        # read partial results from sender thread
        part_result = result_queue.get()
        # items() (not iteritems()) works on both python 2 and python 3
        for k, v in part_result.items():
            result[k] = result.get(k, 0) + v
    return result
def _build_delete(odl_ip, port, uri, auth=None):
    """Send DELETE to generic URI, assert status code is 200.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param uri: URI without /restconf/ to complete URL

        :param auth: authentication credentials; when omitted, the module
            level "auth" set by the script entry point is used if it exists

    Returns:
        None

    Note:
         Raise AssertionError if response status code != 200
    """
    if auth is None:
        # Backward compatibility: callers passing three arguments used to
        # rely on the global credentials created when run as a script.
        auth = globals().get("auth")
    url = _build_url(odl_ip, port, uri)
    rsp = requests.delete(url, auth=auth)
    logger.debug("%s %s", rsp.request, rsp.request.url)
    logger.debug("Headers %s:", rsp.request.headers)
    logger.debug("Body: %s", rsp.request.body)
    logger.debug("Response: %s", rsp.text)
    logger.info("%s %s", rsp, rsp.reason)
    # Raised explicitly so that the check survives "python -O".
    if rsp.status_code != 200:
        raise AssertionError(rsp.text)


def delete_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Delete cars container from config datastore, assert success.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is deleted

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Delete all cars from %s:%s", odl_ip, port)
    _build_delete(odl_ip, port, "config/car:cars", auth=auth)


def delete_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Delete people container from config datastore.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is deleted

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Delete all people from %s:%s", odl_ip, port)
    _build_delete(odl_ip, port, "config/people:people", auth=auth)


def delete_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Delete car-people container from config datastore.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is deleted

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Delete all purchases from %s:%s", odl_ip, port)
    _build_delete(odl_ip, port, "config/car-people:car-people", auth=auth)
def _build_get(odl_ip, port, uri, auth=None):
    """Send GET to generic URI.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param uri: URI without /restconf/ to complete URL

        :param auth: authentication credentials; when omitted, the module
            level "auth" set by the script entry point is used if it exists

    Returns:
        :returns rsp: the http response object

    Note:
         Raise AssertionError if response status code != 200
    """
    if auth is None:
        # Backward compatibility: callers passing three arguments used to
        # rely on the global credentials created when run as a script.
        auth = globals().get("auth")
    url = _build_url(odl_ip, port, uri)
    rsp = requests.get(url, auth=auth)
    logger.debug("%s %s", rsp.request, rsp.request.url)
    logger.debug("Headers %s:", rsp.request.headers)
    logger.debug("Body: %s", rsp.request.body)
    logger.debug("Response: %s", rsp.text)
    logger.info("%s %s", rsp, rsp.reason)
    # Raised explicitly so that the check survives "python -O".
    if rsp.status_code != 200:
        raise AssertionError(rsp.text)
    return rsp


def get_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Reads car entries from config datastore.

    TODO: some needed logic to be added handle http response in the future,
          e.g. count items in response's content

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is read

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Get all cars from %s:%s", odl_ip, port)
    _build_get(odl_ip, port, "config/car:cars", auth=auth)


def get_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Reads people entries from config datastore.

    TODO: some needed logic to be added handle http response in the future,
          e.g. count items in response's content

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is read

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Get all people from %s:%s", odl_ip, port)
    _build_get(odl_ip, port, "config/people:people", auth=auth)


def get_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Reads car-people entries from config datastore.

    TODO: some needed logic to be added handle http response in the future,
          e.g. count items in response's content

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: ignored; only 1 thread needed

        :param item_count: ignored; whole container is read

        :param auth: authentication credentials

        :param items_per_request: ignored; only 1 request needed

    Returns:
        None
    """
    logger.info("Get all purchases from %s:%s", odl_ip, port)
    _build_get(odl_ip, port, "config/car-people:car-people", auth=auth)
def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Configure car entries to the config datastore.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: number of threads used to send http requests

        :param item_count: number of items to be configured

        :param auth: authentication credentials

        :param items_per_request: number of car entries sent in one http request

    Returns:
        None

    Raises:
        Exception: if any request ended with a return code other than 204
    """
    logger.info("Add %s car(s) to %s:%s (%s per request)",
                item_count, odl_ip, port, items_per_request)
    res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
                         thread_count=thread_count, item_count=item_count,
                         items_per_request=items_per_request, auth=auth)
    # list(res) compares correctly on python 3 as well, where keys() is a
    # view object that never equals a list.
    if list(res) != [204]:
        logger.error("Not all cars were configured: " + repr(res))
        raise Exception("Not all cars were configured: " + repr(res))
def add_car_with_retries(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Configure car entries to the config datastore, re-sending failed requests.

    Requests answered with 401, 404, 500 or 503 are retried by the senders;
    those codes are tolerated in the summary as well.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: number of threads used to send http requests

        :param item_count: number of items to be configured

        :param auth: authentication credentials

        :param items_per_request: number of car entries sent in one http request

    Returns:
        None

    Raises:
        Exception: if any return code other than 204 or a retried one was seen
    """
    logger.info("Add %s car(s) to %s:%s (%s per request)",
                item_count, odl_ip, port, items_per_request)
    retry_rcs = [401, 404, 500, 503]
    res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
                         thread_count=thread_count, item_count=item_count,
                         items_per_request=items_per_request, auth=auth,
                         req_timeout=15, retry_timeout=30, retry_rcs=retry_rcs)
    acceptable_rcs = [204] + retry_rcs
    if any(key not in acceptable_rcs for key in res):
        logger.error("Problems during cars' configuration appeared: " + repr(res))
        raise Exception("Problems during cars' configuration appeared: " + repr(res))
def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
    """Configure people entries to the config datastore one by one using rpc.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: number of threads used to send http requests

        :param item_count: number of items to be configured

        :param auth: authentication credentials

        :param items_per_request: items per request; only 1 is supported,
            the parameter exists just to keep the same api

    Returns:
        None

    Raises:
        NotImplementedError: if items_per_request is not 1
        Exception: if any request ended with a return code other than 200
    """
    logger.info("Add %s people to %s:%s (%s per request)",
                item_count, odl_ip, port, items_per_request)
    if items_per_request != 1:
        # Report the offending items_per_request value, not the item count.
        logger.error("Only 1 item per request is supported, " +
                     "you specified: {0}".format(items_per_request))
        raise NotImplementedError("Only 1 item per request is supported, " +
                                  "you specified: {0}".format(items_per_request))
    res = _task_executor(_prepare_add_people_rpc, odl_ip=odl_ip, port=port,
                         thread_count=thread_count, item_count=item_count,
                         items_per_request=items_per_request, auth=auth)
    # list(res) compares correctly on python 3 as well, where keys() is a
    # view object that never equals a list.
    if list(res) != [200]:
        logger.error("Not all people were configured: " + repr(res))
        raise Exception("Not all people were configured: " + repr(res))
def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
                       items_per_request):
    """Configure car-people entries to the config datastore one by one using rpc.

    Args:
        :param odl_ip: ip address of ODL

        :param port: restconf port

        :param thread_count: number of threads used to send http requests

        :param item_count: number of items to be configured

        :param auth: authentication credentials

        :param items_per_request: items per request; only 1 is supported,
            the parameter exists just to keep the same api

    Returns:
        None

    Raises:
        NotImplementedError: if items_per_request is not 1
        Exception: if any request ended with a return code other than 200
    """
    logger.info("Add %s purchase(s) to %s:%s (%s per request)",
                item_count, odl_ip, port, items_per_request)
    if items_per_request != 1:
        # Report the offending items_per_request value, not the item count.
        logger.error("Only 1 item per request is supported, " +
                     "you specified: {0}".format(items_per_request))
        raise NotImplementedError("Only 1 item per request is supported, " +
                                  "you specified: {0}".format(items_per_request))

    res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
                         thread_count=thread_count, item_count=item_count,
                         items_per_request=items_per_request, auth=auth)
    # list(res) compares correctly on python 3 as well, where keys() is a
    # view object that never equals a list.
    if list(res) != [200]:
        logger.error("Not all rpc calls passed: " + repr(res))
        raise Exception("Not all rpc calls passed: " + repr(res))
# Values accepted on the command line for the action and for the item type.
_actions = ["add", "get", "delete", "add-rpc", "add-with-retries"]
_items = ["car", "people", "car-people"]

# Dispatch table: action -> item type -> handler function. A combination that
# is missing here is rejected by the script entry point as unsupported.
_handler_matrix = {
    "add": {"car": add_car},
    "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
    "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
    "add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
    "add-with-retries": {"car": add_car_with_retries},
}
# Created at module level so that the functions of this file can log even
# when the file is imported; handlers are attached only when run as a script.
logger = logging.getLogger("logger")

if __name__ == "__main__":
    # This program executes the requested action based on the given parameters.
    # It provides "car", "people" and "car-people" crud operations.

    # Adjacent string literals are joined without any separator, hence the
    # explicit trailing spaces inside the help texts below.
    parser = argparse.ArgumentParser(description="Cluster datastore "
                                                 "performance test script")
    parser.add_argument("--host", default="127.0.0.1",
                        help="Host where odl controller is running. "
                             "Or comma separated list of hosts. "
                             "(default is 127.0.0.1)")
    parser.add_argument("--port", default="8181",
                        help="Port on which odl's RESTCONF is listening "
                             "(default is 8181)")
    parser.add_argument("--threads", type=int, default=1,
                        help="Number of request worker threads to start in "
                             "each cycle (default=1)")
    parser.add_argument("action", choices=_actions, metavar="action",
                        help="Action to be performed.")
    parser.add_argument("--itemtype", choices=_items, default="car",
                        help="Type of the items the action is performed on "
                             "(default is car)")
    # NOTE(review): the default of --itemcount was not visible in the extracted
    # source; 1 mirrors the item_count default of _task_executor - confirm.
    parser.add_argument("--itemcount", type=int,
                        help="Number of items to be processed in total (default 1)",
                        default=1)
    parser.add_argument("--user", help="Restconf user name", default="admin")
    parser.add_argument("--password", help="Restconf password", default="admin")
    parser.add_argument("--ipr", type=int, help="Items per request", default=1)
    parser.add_argument("--debug", dest="loglevel", action="store_const",
                        const=logging.DEBUG, default=logging.INFO,
                        help="Set log level to debug (default is info)")

    args = parser.parse_args()

    # log both to the console and to a file that is rewritten on every run
    log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
    console_handler.setFormatter(log_formatter)
    file_handler.setFormatter(log_formatter)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.setLevel(args.loglevel)

    auth = (args.user, args.password)

    if (args.action not in _handler_matrix or
            args.itemtype not in _handler_matrix[args.action]):
        msg = "Unsupported combination of action: " + str(args.action)
        msg += " and item: " + str(args.itemtype)
        logger.error(msg)
        raise NotImplementedError(msg)

    # TODO: need to filter out situations when we cannot use more items
    # in one rest request (rpc or delete?)
    # this should be done inside handler functions

    handler_function = _handler_matrix[args.action][args.itemtype]
    handler_function(args.host, args.port, args.threads,
                     args.itemcount, auth, args.ipr)