2 The purpose of this script is the ability to perform crud operations over
3 the car-people data model.
17 "id": "to be replaced",
18 "category": "my_category",
19 "model": "to be replaced",
20 "manufacturer": "my_manufacturer",
26 _template_add_people_rpc = {
29 "people:id": "to be replaced",
30 "people:gender": "male",
32 "people:address": "to be replaced",
33 "people:contactNo": "to be replaced"
38 _template_add_cp_rpc = {
40 "car-purchase:person": "to be replaced",
41 "car-purchase:person-id": "to be replaced",
42 "car-purchase:car-id": "to be replaced"
47 def _build_url(odl_ip, port, uri):
48 """Compose URL from generic IP, port and URI fragment.
51 :param odl_ip: controller's ip address or hostname
53 :param port: controller's restconf port
55 :param uri: URI without /restconf/ to complete URL
58 :returns url: full restconf url corresponding to params
61 url = "http://" + odl_ip + ":" + port + "/restconf/" + uri
65 def _build_post(odl_ip, port, uri, python_data, auth):
66 """Create a POST http request with generic on URI and data.
69 :param odl_ip: controller's ip address or hostname
71 :param port: controller's restconf port
73 :param uri: URI without /restconf/ to complete URL
75 :param python_data: python object to serialize into textual data
77 :param auth: authentication credentials
80 :returns http request object
83 url = _build_url(odl_ip, port, uri)
84 text_data = json.dumps(python_data)
85 header = {"Content-Type": "application/json"}
86 req = requests.Request("POST", url, headers=header, data=text_data, auth=auth)
90 def _prepare_add_car(odl_ip, port, item_list, auth):
91 """Creates a POST http requests to configure a car item in configuration datastore.
94 :param odl_ip: controller's ip address or hostname
96 :param port: controller's restconf port
98 :param item_list: controller item's list contains a list of ids of the cars
100 :param auth: authentication credentials
103 :returns req: http request object
106 container = {"car-entry": []}
107 for item in item_list:
108 entry = copy.deepcopy(_template_add_car["car-entry"][0])
110 entry["model"] = "model" + str(item)
111 container["car-entry"].append(entry)
112 req = _build_post(odl_ip, port, "config/car:cars", container, auth)
116 def _prepare_add_people_rpc(odl_ip, port, item_list, auth):
117 """Creates a POST http requests to configure people in configuration datastore.
120 :param odl_ip: controller's ip address or hostname
122 :param port: controller's restconf port
124 :param item_list: controller item's list contains a list of ids of the people
126 :param auth: authentication credentials
129 :returns req: http request object
132 container = {"input": {}}
134 entry = container["input"]
135 entry["people:id"] = str(item)
136 entry["people:address"] = "address" + str(item)
137 entry["people:contactNo"] = str(item)
138 container["input"] = entry
139 req = _build_post(odl_ip, port, "operations/people:add-person", container, auth)
143 def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
144 """Creates a POST http requests to purchase cars using an rpc.
147 :param odl_ip: controller's ip address or hostname
149 :param port: controller's restconf port
151 :param item_list: controller item's list contains a list of ids of the people
152 only the first item is considered
154 :param auth: authentication credentials
157 :returns req: http request object
160 container = {"input": {}}
162 entry = container["input"]
163 entry["car-purchase:person"] = "/people:people/people:person[people:id='" + str(item) + "']"
164 entry["car-purchase:person-id"] = str(item)
165 entry["car-purchase:car-id"] = str(item)
166 container["input"] = entry
167 req = _build_post(odl_ip, port, "operations/car-purchase:buy-car", container, auth)
171 def _request_sender(thread_id, preparing_function, auth, in_queue=None,
172 exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None):
173 """The funcion sends http requests.
175 Runs in the working thread. It reads out flow details from the queue and
176 sends apropriate http requests to the controller
179 :param thread_id: thread id
181 :param preparing_function: function to prepare the http request
183 :param in_queue: input queue, flow details are comming from here
185 :param exit_event: event to notify working thread that the parent
186 (task executor) stopped filling the input queue
188 :param odl_ip: ip address of ODL; default="127.0.0.1"
190 :param port: restconf port; default="8181"
192 :param out_queue: queue where the results should be put
195 None (results is put into the output queue)
198 ses = requests.Session()
199 counter = [0 for i in range(600)]
203 item_list = in_queue.get(timeout=1)
205 if exit_event.is_set() and in_queue.empty():
208 req = preparing_function(odl_ip, port, item_list, auth)
211 rsp = ses.send(prep, timeout=600)
212 except requests.exceptions.Timeout:
214 logger.error("No response from %s", odl_ip)
216 logger.debug("%s %s", rsp.request, rsp.request.url)
217 logger.debug("Headers %s:", rsp.request.headers)
218 logger.debug("Body: %s", rsp.request.body)
219 logger.debug("Response: %s", rsp.text)
220 logger.debug("%s %s", rsp, rsp.reason)
221 counter[rsp.status_code] += 1
223 for response_code, count in enumerate(counter):
225 responses[response_code] = count
226 out_queue.put(responses)
227 logger.info("Response code(s) got per number of requests: %s", responses)
230 def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
231 thread_count=1, item_count=1, items_per_request=1,
232 auth=('admin', 'admin')):
233 """The main function which drives sending of http requests.
235 Creates 2 queues and requested number of "working threads".
236 One queue is filled with flow details and working
237 threads read them out and send http requests.
238 The other queue is for sending results from working threads back.
239 After the threads' join, it produces a summary result.
242 :param preparing_function: function to prepare http request object
244 :param odl_ip: ip address of ODL or comma separated addesses; default="127.0.0.1"
246 :param port: restconf port; default="8181"
248 :param thread_count: number of threads used to send http requests; default=1
250 :param items_per_request: items per request, number of items sent in one http request
252 :param item_countpr: number of items to be sent in total
254 :param auth: authentication credentials
257 :returns dict: dictionary of http response counts like
258 {"http_status_code1: "count1", etc.}
262 hosts = odl_ip.split(',')
265 items = [i + 1 for i in range(item_count)]
267 for i in range(0, item_count, items_per_request):
268 item_groups.append(items[i:i + items_per_request])
270 # fill the queue with details needed for one http requests
271 send_queue = Queue.Queue()
272 for item_list in item_groups:
273 send_queue.put(item_list)
275 # create an empty result queue
276 result_queue = Queue.Queue()
278 exit_event = threading.Event()
280 # start threads to read details from queues and to send http requests
282 for i in range(int(thread_count)):
283 thr = threading.Thread(target=_request_sender,
284 args=(i, preparing_function, auth),
285 kwargs={"in_queue": send_queue, "exit_event": exit_event,
286 "odl_ip": hosts[i % nrhosts], "port": port,
287 "out_queue": result_queue})
294 # wait for reqults and sum them up
297 # read partial resutls from sender thread
298 part_result = result_queue.get()
299 for k, v in part_result.iteritems():
307 def _build_delete(odl_ip, port, uri):
308 """Send DELETE to generic URI, assert status code is 200.
311 :param odl_ip: ip address of ODL
313 :param port: restconf port
315 :param uri: URI without /restconf/ to complete URL
321 Raise AssertionError if response status code != 200
324 url = _build_url(odl_ip, port, uri)
325 rsp = requests.delete(url, auth=auth)
326 logger.debug("%s %s", rsp.request, rsp.request.url)
327 logger.debug("Headers %s:", rsp.request.headers)
328 logger.debug("Body: %s", rsp.request.body)
329 logger.debug("Response: %s", rsp.text)
330 logger.info("%s %s", rsp, rsp.reason)
331 assert rsp.status_code == 200, rsp.text
334 def delete_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
335 """Delete cars container from config datastore, assert success.
338 :param odl_ip: ip address of ODL
340 :param port: restconf port
342 :param thread_count: ignored; only 1 thread needed
344 :param item_count: ignored; whole container is deleted
346 :param auth: authentication credentials
348 :param items_per_request: ignored; only 1 request needed
354 logger.info("Delete all cars from %s:%s", odl_ip, port)
355 _build_delete(odl_ip, port, "config/car:cars")
358 def delete_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
359 """Delete people container from config datastore.
362 :param odl_ip: ip address of ODL
364 :param port: restconf port
366 :param thread_count: ignored; only 1 thread needed
368 :param item_count: ignored; whole container is deleted
370 :param auth: authentication credentials
372 :param items_per_request: ignored; only 1 request needed
378 logger.info("Delete all people from %s:%s", odl_ip, port)
379 _build_delete(odl_ip, port, "config/people:people")
382 def delete_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
383 """Delete car-people container from config datastore.
386 :param odl_ip: ip address of ODL
388 :param port: restconf port
390 :param thread_count: ignored; only 1 thread needed
392 :param item_count: ignored; whole container is deleted
394 :param auth: authentication credentials
396 :param items_per_request: ignored; only 1 request needed
402 logger.info("Delete all purchases from %s:%s", odl_ip, port)
403 _build_delete(odl_ip, port, "config/car-people:car-people")
406 def _build_get(odl_ip, port, uri):
407 """Send GET to generic URI.
410 :param odl_ip: ip address of ODL
412 :param port: restconf port
414 :param uri: URI without /restconf/ to complete URL
420 Raise AssertionError if response status code != 200
423 url = _build_url(odl_ip, port, uri)
424 rsp = requests.get(url, auth=auth)
425 logger.debug("%s %s", rsp.request, rsp.request.url)
426 logger.debug("Headers %s:", rsp.request.headers)
427 logger.debug("Body: %s", rsp.request.body)
428 logger.debug("Response: %s", rsp.text)
429 logger.info("%s %s", rsp, rsp.reason)
430 assert rsp.status_code == 200, rsp.text
433 def get_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
434 """Reads car entries from config datastore.
436 TODO: some needed logic to be added handle http response in the future,
437 e.g. count items in response's content
440 :param odl_ip: ip address of ODL
442 :param port: restconf port
444 :param thread_count: ignored; only 1 thread needed
446 :param item_count: ignored; whole container is deleted
448 :param auth: authentication credentials
450 :param items_per_request: ignored; only 1 request needed
456 logger.info("Get all cars from %s:%s", odl_ip, port)
457 _build_get(odl_ip, port, "config/car:cars")
460 def get_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
461 """Reads people entries from config datastore.
463 TODO: some needed logic to be added handle http response in the future,
464 e.g. count items in response's content
467 :param odl_ip: ip address of ODL
469 :param port: restconf port
471 :param thread_count: ignored; only 1 thread needed
473 :param item_count: ignored; whole container is deleted
475 :param auth: authentication credentials
477 :param items_per_request: ignored; only 1 request needed
483 logger.info("Get all people from %s:%s", odl_ip, port)
484 _build_get(odl_ip, port, "config/people:people")
487 def get_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
488 """Reads car-people entries from config datastore.
490 TODO: some needed logic to be added handle http response in the future,
491 e.g. count items in response's content
494 :param odl_ip: ip address of ODL
496 :param port: restconf port
498 :param thread_count: ignored; only 1 thread needed
500 :param item_count: ignored; whole container is deleted
502 :param auth: authentication credentials
504 :param items_per_request: ignored; only 1 request needed
510 logger.info("Get all purchases from %s:%s", odl_ip, port)
511 _build_get(odl_ip, port, "config/car-people:car-people")
514 def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
515 """Configure car entries to the config datastore.
518 :param odl_ip: ip address of ODL
520 :param port: restconf port
522 :param thread_count: number of threads used to send http requests; default=1
524 :param item_count: number of items to be configured
526 :param auth: authentication credentials
528 :param items_per_request: items per request, not used here,
529 just to keep the same api
535 logger.info("Add %s car(s) to %s:%s (%s per request)",
536 item_count, odl_ip, port, items_per_request)
537 res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
538 thread_count=thread_count, item_count=item_count,
539 items_per_request=items_per_request, auth=auth)
540 if res.keys() != [204]:
541 logger.error("Not all cars were configured: " + repr(res))
542 raise Exception("Not all cars were configured: " + repr(res))
545 def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
546 """Configure people entries to the config datastore.
549 :param odl_ip: ip address of ODL; default="127.0.0.1"
551 :param port: restconf port; default="8181"
553 :param thread_count: number of threads used to send http requests; default=1
555 :param item_count: number of items to be condigured
557 :param auth: authentication credentials
559 :param items_per_request: items per request, not used here,
560 just to keep the same api
566 logger.info("Add %s people to %s:%s (%s per request)",
567 item_count, odl_ip, port, items_per_request)
568 if items_per_request != 1:
569 logger.error("Only 1 item per request is supported, " +
570 "you specified: {0}".format(item_count))
571 raise NotImplementedError("Only 1 item per request is supported, " +
572 "you specified: {0}".format(item_count))
573 res = _task_executor(_prepare_add_people_rpc, odl_ip=odl_ip, port=port,
574 thread_count=thread_count, item_count=item_count,
575 items_per_request=items_per_request, auth=auth)
576 if res.keys() != [200]:
577 logger.error("Not all people were configured: " + repr(res))
578 raise Exception("Not all people were configured: " + repr(res))
581 def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
583 """Configure car-people entries to the config datastore one by one using rpc
586 :param odl_ip: ip address of ODL; default="127.0.0.1"
588 :param port: restconf port; default="8181"
590 :param thread_count: number of threads used to send http requests; default=1
592 :param item_count: number of items to be condigured
594 :param auth: authentication credentials
596 :param items_per_request: items per request, not used here,
597 just to keep the same api
603 logger.info("Add %s purchase(s) to %s:%s (%s per request)",
604 item_count, odl_ip, port, items_per_request)
605 if items_per_request != 1:
606 logger.error("Only 1 item per request is supported, " +
607 "you specified: {0}".format(item_count))
608 raise NotImplementedError("Only 1 item per request is supported, " +
609 "you specified: {0}".format(item_count))
611 res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
612 thread_count=thread_count, item_count=item_count,
613 items_per_request=items_per_request, auth=auth)
614 if res.keys() != [200]:
615 logger.error("Not all rpc calls passed: " + repr(res))
616 raise Exception("Not all rpc calls passed: " + repr(res))
619 _actions = ["add", "get", "delete", "add-rpc"]
620 _items = ["car", "people", "car-people"]
623 "add": {"car": add_car},
624 "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
625 "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
626 "add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
630 if __name__ == "__main__":
632 This program executes requested action based in given parameters
634 It provides "car", "people" and "car-people" crud operations.
637 parser = argparse.ArgumentParser(description="Cluster datastore"
638 "performance test script")
639 parser.add_argument("--host", default="127.0.0.1",
640 help="Host where odl controller is running."
641 "Or comma separated list of hosts."
642 "(default is 127.0.0.1)")
643 parser.add_argument("--port", default="8181",
644 help="Port on which odl's RESTCONF is listening"
646 parser.add_argument("--threads", type=int, default=1,
647 help="Number of request worker threads to start in"
648 "each cycle (default=1)")
649 parser.add_argument("action", choices=_actions, metavar="action",
650 help="Action to be performed.")
651 parser.add_argument("--itemtype", choices=_items, default="car",
652 help="Flows-per-Request - number of flows (batch size)"
653 "sent in each HTTP request (default 1)")
654 parser.add_argument("--itemcount", type=int, help="Items per request",
656 parser.add_argument("--user", help="Restconf user name", default="admin")
657 parser.add_argument("--password", help="Restconf password", default="admin")
658 parser.add_argument("--ipr", type=int, help="Items per request", default=1)
659 parser.add_argument("--debug", dest="loglevel", action="store_const",
660 const=logging.DEBUG, default=logging.INFO,
661 help="Set log level to debug (default is error)")
663 args = parser.parse_args()
665 logger = logging.getLogger("logger")
666 log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
667 console_handler = logging.StreamHandler()
668 file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
669 console_handler.setFormatter(log_formatter)
670 file_handler.setFormatter(log_formatter)
671 logger.addHandler(console_handler)
672 logger.addHandler(file_handler)
673 logger.setLevel(args.loglevel)
675 auth = (args.user, args.password)
677 if (args.action not in _handler_matrix or
678 args.itemtype not in _handler_matrix[args.action]):
679 msg = "Unsupported combination of action: " + str(args.action)
680 msg += " and item: " + str(args.itemtype)
682 raise NotImplementedError(msg)
684 # TODO: need to filter out situations when we cannot use more items
685 # in one rest request (rpc or delete?)
686 # this should be done inside handler functions
688 handler_function = _handler_matrix[args.action][args.itemtype]
689 handler_function(args.host, args.port, args.threads,
690 args.itemcount, auth, args.ipr)