2 The purpose of this script is the ability to perform crud operations over
3 the car-people data model.
17 "id": "to be replaced",
18 "category": "my_category",
19 "model": "to be replaced",
20 "manufacturer": "my_manufacturer",
26 _template_add_people = {
29 "id": "to be replaced",
32 "address": "to be replaced",
33 "contactNo": "to be replaced"
38 _template_add_cp_rpc = {
40 "car-purchase:person": "to be replaced",
41 "car-purchase:person-id": "to be replaced",
42 "car-purchase:car-id": "to be replaced"
47 def _build_url(odl_ip, port, uri):
48 """Compose URL from generic IP, port and URI fragment.
51 :param odl_ip: controller's ip address or hostname
53 :param port: controller's restconf port
55 :param uri: URI without /restconf/ to complete URL
58 :returns url: full restconf url corresponding to params
61 url = "http://" + odl_ip + ":" + port + "/restconf/" + uri
65 def _build_post(odl_ip, port, uri, python_data, auth):
66 """Create a POST http request with generic on URI and data.
69 :param odl_ip: controller's ip address or hostname
71 :param port: controller's restconf port
73 :param uri: URI without /restconf/ to complete URL
75 :param python_data: python object to serialize into textual data
77 :param auth: authentication credentials
80 :returns http request object
83 url = _build_url(odl_ip, port, uri)
84 text_data = json.dumps(python_data)
85 header = {"Content-Type": "application/json"}
86 req = requests.Request("POST", url, headers=header, data=text_data, auth=auth)
90 def _prepare_add_car(odl_ip, port, item_list, auth):
91 """Creates a POST http requests to configure a car item in configuration datastore.
94 :param odl_ip: controller's ip address or hostname
96 :param port: controller's restconf port
98 :param item_list: controller item's list contains a list of ids of the cars
100 :param auth: authentication credentials
103 :returns req: http request object
106 container = {"car-entry": []}
107 for item in item_list:
108 entry = copy.deepcopy(_template_add_car["car-entry"][0])
110 entry["model"] = "model" + str(item)
111 container["car-entry"].append(entry)
112 req = _build_post(odl_ip, port, "config/car:cars", container, auth)
116 def _prepare_add_people(odl_ip, port, item_list, auth):
117 """Creates a POST http requests to configure people in configuration datastore.
120 :param odl_ip: controller's ip address or hostname
122 :param port: controller's restconf port
124 :param item_list: controller item's list contains a list of ids of the people
126 :param auth: authentication credentials
129 :returns req: http request object
132 container = {"person": []}
133 for item in item_list:
134 entry = copy.deepcopy(_template_add_people["person"][0])
135 entry["id"] = str(item)
136 entry["address"] = "address" + str(item)
137 entry["contactNo"] = str(item)
138 container["person"].append(entry)
139 req = _build_post(odl_ip, port, "config/people:people", container, auth)
143 def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
144 """Creates a POST http requests to purchase cars using an rpc.
147 :param odl_ip: controller's ip address or hostname
149 :param port: controller's restconf port
151 :param item_list: controller item's list contains a list of ids of the people
152 only the first item is considered
154 :param auth: authentication credentials
157 :returns req: http request object
160 container = {"input": {}}
162 entry = container["input"]
163 entry["car-purchase:person"] = "/people:people/people:person[people:id='" + str(item) + "']"
164 entry["car-purchase:person-id"] = str(item)
165 entry["car-purchase:car-id"] = str(item)
166 container["input"] = entry
167 req = _build_post(odl_ip, port, "operations/car-purchase:buy-car", container, auth)
171 def _request_sender(thread_id, preparing_function, auth, in_queue=None,
172 exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None):
173 """The funcion sends http requests.
175 Runs in the working thread. It reads out flow details from the queue and
176 sends apropriate http requests to the controller
179 :param thread_id: thread id
181 :param preparing_function: function to prepare the http request
183 :param in_queue: input queue, flow details are comming from here
185 :param exit_event: event to notify working thread that the parent
186 (task executor) stopped filling the input queue
188 :param odl_ip: ip address of ODL; default="127.0.0.1"
190 :param port: restconf port; default="8181"
192 :param out_queue: queue where the results should be put
195 None (results is put into the output queue)
198 ses = requests.Session()
199 counter = [0 for i in range(600)]
203 item_list = in_queue.get(timeout=1)
205 if exit_event.is_set() and in_queue.empty():
208 req = preparing_function(odl_ip, port, item_list, auth)
211 rsp = ses.send(prep, timeout=60)
212 except requests.exceptions.Timeout:
214 logger.error("No response from %s", odl_ip)
216 logger.debug("%s %s", rsp.request, rsp.request.url)
217 logger.debug("Headers %s:", rsp.request.headers)
218 logger.debug("Body: %s", rsp.request.body)
219 logger.debug("Response: %s", rsp.text)
220 logger.debug("%s %s", rsp, rsp.reason)
221 counter[rsp.status_code] += 1
223 for response_code, count in enumerate(counter):
225 responses[response_code] = count
226 out_queue.put(responses)
227 logger.info("Response code(s) got per number of requests: %s", responses)
230 def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
231 thread_count=1, item_count=1, items_per_request=1,
232 auth=('admin', 'admin')):
233 """The main function which drives sending of http requests.
235 Creates 2 queues and requested number of "working threads".
236 One queue is filled with flow details and working
237 threads read them out and send http requests.
238 The other queue is for sending results from working threads back.
239 After the threads' join, it produces a summary result.
242 :param preparing_function: function to prepare http request object
244 :param odl_ip: ip address of ODL; default="127.0.0.1"
246 :param port: restconf port; default="8181"
248 :param thread_count: number of threads used to send http requests; default=1
250 :param items_per_request: items per request, number of items sent in one http request
252 :param item_countpr: number of items to be sent in total
254 :param auth: authentication credentials
257 :returns dict: dictionary of http response counts like
258 {"http_status_code1: "count1", etc.}
261 items = [i+1 for i in range(item_count)]
263 for i in range(0, item_count, items_per_request):
264 item_groups.append(items[i:i+items_per_request])
266 # fill the queue with details needed for one http requests
267 send_queue = Queue.Queue()
268 for item_list in item_groups:
269 send_queue.put(item_list)
271 # create an empty result queue
272 result_queue = Queue.Queue()
274 exit_event = threading.Event()
276 # start threads to read details from queues and to send http requests
278 for i in range(int(thread_count)):
279 thr = threading.Thread(target=_request_sender,
280 args=(i, preparing_function, auth),
281 kwargs={"in_queue": send_queue, "exit_event": exit_event,
282 "odl_ip": odl_ip, "port": port,
283 "out_queue": result_queue})
290 # wait for reqults and sum them up
293 # read partial resutls from sender thread
294 part_result = result_queue.get()
295 for k, v in part_result.iteritems():
303 def _build_delete(odl_ip, port, uri):
304 """Send DELETE to generic URI, assert status code is 200.
307 :param odl_ip: ip address of ODL
309 :param port: restconf port
311 :param uri: URI without /restconf/ to complete URL
317 Raise AssertionError if response status code != 200
320 url = _build_url(odl_ip, port, uri)
321 rsp = requests.delete(url, auth=auth)
322 logger.debug("%s %s", rsp.request, rsp.request.url)
323 logger.debug("Headers %s:", rsp.request.headers)
324 logger.debug("Body: %s", rsp.request.body)
325 logger.debug("Response: %s", rsp.text)
326 logger.info("%s %s", rsp, rsp.reason)
327 assert rsp.status_code == 200, rsp.text
330 def delete_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
331 """Delete cars container from config datastore, assert success.
334 :param odl_ip: ip address of ODL
336 :param port: restconf port
338 :param thread_count: ignored; only 1 thread needed
340 :param item_count: ignored; whole container is deleted
342 :param auth: authentication credentials
344 :param items_per_request: ignored; only 1 request needed
350 logger.info("Delete all cars from %s:%s", odl_ip, port)
351 _build_delete(odl_ip, port, "config/car:cars")
354 def delete_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
355 """Delete people container from config datastore.
358 :param odl_ip: ip address of ODL
360 :param port: restconf port
362 :param thread_count: ignored; only 1 thread needed
364 :param item_count: ignored; whole container is deleted
366 :param auth: authentication credentials
368 :param items_per_request: ignored; only 1 request needed
374 logger.info("Delete all people from %s:%s", odl_ip, port)
375 _build_delete(odl_ip, port, "config/people:people")
378 def delete_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
379 """Delete car-people container from config datastore.
382 :param odl_ip: ip address of ODL
384 :param port: restconf port
386 :param thread_count: ignored; only 1 thread needed
388 :param item_count: ignored; whole container is deleted
390 :param auth: authentication credentials
392 :param items_per_request: ignored; only 1 request needed
398 logger.info("Delete all purchases from %s:%s", odl_ip, port)
399 _build_delete(odl_ip, port, "config/car-people:car-people")
402 def _build_get(odl_ip, port, uri):
403 """Send GET to generic URI.
406 :param odl_ip: ip address of ODL
408 :param port: restconf port
410 :param uri: URI without /restconf/ to complete URL
416 Raise AssertionError if response status code != 200
419 url = _build_url(odl_ip, port, uri)
420 rsp = requests.get(url, auth=auth)
421 logger.debug("%s %s", rsp.request, rsp.request.url)
422 logger.debug("Headers %s:", rsp.request.headers)
423 logger.debug("Body: %s", rsp.request.body)
424 logger.debug("Response: %s", rsp.text)
425 logger.info("%s %s", rsp, rsp.reason)
426 assert rsp.status_code == 200, rsp.text
429 def get_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
430 """Reads car entries from config datastore.
432 TODO: some needed logic to be added handle http response in the future,
433 e.g. count items in response's content
436 :param odl_ip: ip address of ODL
438 :param port: restconf port
440 :param thread_count: ignored; only 1 thread needed
442 :param item_count: ignored; whole container is deleted
444 :param auth: authentication credentials
446 :param items_per_request: ignored; only 1 request needed
452 logger.info("Get all cars from %s:%s", odl_ip, port)
453 _build_get(odl_ip, port, "config/car:cars")
456 def get_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
457 """Reads people entries from config datastore.
459 TODO: some needed logic to be added handle http response in the future,
460 e.g. count items in response's content
463 :param odl_ip: ip address of ODL
465 :param port: restconf port
467 :param thread_count: ignored; only 1 thread needed
469 :param item_count: ignored; whole container is deleted
471 :param auth: authentication credentials
473 :param items_per_request: ignored; only 1 request needed
479 logger.info("Get all people from %s:%s", odl_ip, port)
480 _build_get(odl_ip, port, "config/people:people")
483 def get_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
484 """Reads car-people entries from config datastore.
486 TODO: some needed logic to be added handle http response in the future,
487 e.g. count items in response's content
490 :param odl_ip: ip address of ODL
492 :param port: restconf port
494 :param thread_count: ignored; only 1 thread needed
496 :param item_count: ignored; whole container is deleted
498 :param auth: authentication credentials
500 :param items_per_request: ignored; only 1 request needed
506 logger.info("Get all purchases from %s:%s", odl_ip, port)
507 _build_get(odl_ip, port, "config/car-people:car-people")
510 def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
511 """Configure car entries to the config datastore.
514 :param odl_ip: ip address of ODL
516 :param port: restconf port
518 :param thread_count: number of threads used to send http requests; default=1
520 :param item_count: number of items to be configured
522 :param auth: authentication credentials
524 :param items_per_request: items per request, not used here,
525 just to keep the same api
531 logger.info("Add %s car(s) to %s:%s (%s per request)",
532 item_count, odl_ip, port, items_per_request)
533 res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
534 thread_count=thread_count, item_count=item_count,
535 items_per_request=items_per_request, auth=auth)
536 if res.keys() != [204]:
537 logger.error("Not all cars were configured: " + repr(res))
538 raise Exception("Not all cars were configured: " + repr(res))
541 def add_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
542 """Configure people entries to the config datastore.
545 :param odl_ip: ip address of ODL; default="127.0.0.1"
547 :param port: restconf port; default="8181"
549 :param thread_count: number of threads used to send http requests; default=1
551 :param item_count: number of items to be condigured
553 :param auth: authentication credentials
555 :param items_per_request: items per request, not used here,
556 just to keep the same api
562 logger.info("Add %s people to %s:%s (%s per request)",
563 item_count, odl_ip, port, items_per_request)
564 res = _task_executor(_prepare_add_people, odl_ip=odl_ip, port=port,
565 thread_count=thread_count, item_count=item_count,
566 items_per_request=items_per_request, auth=auth)
567 if res.keys() != [204]:
568 logger.error("Not all people were configured: " + repr(res))
569 raise Exception("Not all people were configured: " + repr(res))
572 def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
574 """Configure car-people entries to the config datastore one by one using rpc
577 :param odl_ip: ip address of ODL; default="127.0.0.1"
579 :param port: restconf port; default="8181"
581 :param thread_count: number of threads used to send http requests; default=1
583 :param item_count: number of items to be condigured
585 :param auth: authentication credentials
587 :param items_per_request: items per request, not used here,
588 just to keep the same api
594 logger.info("Add %s purchase(s) to %s:%s (%s per request)",
595 item_count, odl_ip, port, items_per_request)
596 if items_per_request != 1:
597 logger.error("Only 1 item per request is supported, " +
598 "you specified: {0}".format(item_count))
599 raise NotImplementedError("Only 1 item per request is supported, " +
600 "you specified: {0}".format(item_count))
602 res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
603 thread_count=thread_count, item_count=item_count,
604 items_per_request=items_per_request, auth=auth)
605 if res.keys() != [204]:
606 logger.error("Not all rpc calls passed: " + repr(res))
607 raise Exception("Not all rpc calls passed: " + repr(res))
610 _actions = ["add", "get", "delete", "add-rpc"]
611 _items = ["car", "people", "car-people"]
614 "add": {"car": add_car, "people": add_people},
615 "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
616 "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
617 "add-rpc": {"car-people": add_car_people_rpc},
621 if __name__ == "__main__":
623 This program executes requested action based in given parameters
625 It provides "car", "people" and "car-people" crud operations.
628 parser = argparse.ArgumentParser(description="Cluster datastore"
629 "performance test script")
630 parser.add_argument("--host", default="127.0.0.1",
631 help="Host where odl controller is running"
632 "(default is 127.0.0.1)")
633 parser.add_argument("--port", default="8181",
634 help="Port on which odl's RESTCONF is listening"
636 parser.add_argument("--threads", type=int, default=1,
637 help="Number of request worker threads to start in"
638 "each cycle (default=1)")
639 parser.add_argument("action", choices=_actions, metavar="action",
640 help="Action to be performed.")
641 parser.add_argument("--itemtype", choices=_items, default="car",
642 help="Flows-per-Request - number of flows (batch size)"
643 "sent in each HTTP request (default 1)")
644 parser.add_argument("--itemcount", type=int, help="Items per request",
646 parser.add_argument("--user", help="Restconf user name", default="admin")
647 parser.add_argument("--password", help="Restconf password", default="admin")
648 parser.add_argument("--ipr", type=int, help="Items per request", default=1)
649 parser.add_argument("--debug", dest="loglevel", action="store_const",
650 const=logging.DEBUG, default=logging.INFO,
651 help="Set log level to debug (default is error)")
653 args = parser.parse_args()
655 logger = logging.getLogger("logger")
656 log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
657 console_handler = logging.StreamHandler()
658 file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
659 console_handler.setFormatter(log_formatter)
660 file_handler.setFormatter(log_formatter)
661 logger.addHandler(console_handler)
662 logger.addHandler(file_handler)
663 logger.setLevel(args.loglevel)
665 auth = (args.user, args.password)
667 if (args.action not in _handler_matrix or
668 args.itemtype not in _handler_matrix[args.action]):
669 msg = "Unsupported combination of action: " + str(args.action)
670 msg += " and item: " + str(args.itemtype)
672 raise NotImplementedError(msg)
674 # TODO: need to filter out situations when we cannot use more items
675 # in one rest request (rpc or delete?)
676 # this should be done inside handler functions
678 handler_function = _handler_matrix[args.action][args.itemtype]
679 handler_function(args.host, args.port, args.threads,
680 args.itemcount, auth, args.ipr)