Fix car-people performance test
[integration/test.git] / tools / odl-mdsal-clustering-tests / scripts / cluster_rest_script.py
1 """
2 The purpose of this script is the ability to perform crud operations over
3 the car-people data model.
4 """
5 import threading
6 import Queue
7 import requests
8 import json
9 import copy
10 import argparse
11 import logging
12
13
14 _template_add_car = {
15     "car-entry": [
16         {
17             "id": "to be replaced",
18             "category": "my_category",
19             "model": "to be replaced",
20             "manufacturer": "my_manufacturer",
21             "year": "2015"
22         }
23     ]
24 }
25
26 _template_add_people_rpc = {
27     "input": [
28         {
29             "people:id": "to be replaced",
30             "people:gender": "male",
31             "people:age": "99",
32             "people:address": "to be replaced",
33             "people:contactNo": "to be replaced"
34         }
35     ]
36 }
37
38 _template_add_cp_rpc = {
39     "input": {
40         "car-purchase:person": "to be replaced",
41         "car-purchase:person-id": "to be replaced",
42         "car-purchase:car-id": "to be replaced"
43     }
44 }
45
46
47 def _build_url(odl_ip, port, uri):
48     """Compose URL from generic IP, port and URI fragment.
49
50     Args:
51         :param odl_ip: controller's ip address or hostname
52
53         :param port: controller's restconf port
54
55         :param uri: URI without /restconf/ to complete URL
56
57     Returns:
58         :returns url: full restconf url corresponding to params
59     """
60
61     url = "http://" + odl_ip + ":" + port + "/restconf/" + uri
62     return url
63
64
65 def _build_post(odl_ip, port, uri, python_data, auth):
66     """Create a POST http request with generic on URI and data.
67
68     Args:
69         :param odl_ip: controller's ip address or hostname
70
71         :param port: controller's restconf port
72
73         :param uri: URI without /restconf/ to complete URL
74
75         :param python_data: python object to serialize into textual data
76
77         :param auth: authentication credentials
78
79     Returns:
80         :returns http request object
81     """
82
83     url = _build_url(odl_ip, port, uri)
84     text_data = json.dumps(python_data)
85     header = {"Content-Type": "application/json"}
86     req = requests.Request("POST", url, headers=header, data=text_data, auth=auth)
87     return req
88
89
90 def _prepare_add_car(odl_ip, port, item_list, auth):
91     """Creates a POST http requests to configure a car item in configuration datastore.
92
93     Args:
94         :param odl_ip: controller's ip address or hostname
95
96         :param port: controller's restconf port
97
98         :param item_list: controller item's list contains a list of ids of the cars
99
100         :param auth: authentication credentials
101
102     Returns:
103         :returns req: http request object
104     """
105
106     container = {"car-entry": []}
107     for item in item_list:
108         entry = copy.deepcopy(_template_add_car["car-entry"][0])
109         entry["id"] = item
110         entry["model"] = "model" + str(item)
111         container["car-entry"].append(entry)
112     req = _build_post(odl_ip, port, "config/car:cars", container, auth)
113     return req
114
115
116 def _prepare_add_people_rpc(odl_ip, port, item_list, auth):
117     """Creates a POST http requests to configure people in configuration datastore.
118
119     Args:
120         :param odl_ip: controller's ip address or hostname
121
122         :param port: controller's restconf port
123
124         :param item_list: controller item's list contains a list of ids of the people
125
126         :param auth: authentication credentials
127
128     Returns:
129         :returns req: http request object
130     """
131
132     container = {"input": {}}
133     item = item_list[0]
134     entry = container["input"]
135     entry["people:id"] = str(item)
136     entry["people:address"] = "address" + str(item)
137     entry["people:contactNo"] = str(item)
138     container["input"] = entry
139     req = _build_post(odl_ip, port, "operations/people:add-person", container, auth)
140     return req
141
142
143 def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
144     """Creates a POST http requests to purchase cars using an rpc.
145
146     Args:
147         :param odl_ip: controller's ip address or hostname
148
149         :param port: controller's restconf port
150
151         :param item_list: controller item's list contains a list of ids of the people
152         only the first item is considered
153
154         :param auth: authentication credentials
155
156     Returns:
157         :returns req: http request object
158     """
159
160     container = {"input": {}}
161     item = item_list[0]
162     entry = container["input"]
163     entry["car-purchase:person"] = "/people:people/people:person[people:id='" + str(item) + "']"
164     entry["car-purchase:person-id"] = str(item)
165     entry["car-purchase:car-id"] = str(item)
166     container["input"] = entry
167     req = _build_post(odl_ip, port, "operations/car-purchase:buy-car", container, auth)
168     return req
169
170
171 def _request_sender(thread_id, preparing_function, auth, in_queue=None,
172                     exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None):
173     """The funcion sends http requests.
174
175     Runs in the working thread. It reads out flow details from the queue and
176     sends apropriate http requests to the controller
177
178     Args:
179         :param thread_id: thread id
180
181         :param preparing_function: function to prepare the http request
182
183         :param in_queue: input queue, flow details are comming from here
184
185         :param exit_event: event to notify working thread that the parent
186                            (task executor) stopped filling the input queue
187
188         :param odl_ip: ip address of ODL; default="127.0.0.1"
189
190         :param port: restconf port; default="8181"
191
192         :param out_queue: queue where the results should be put
193
194     Returns:
195         None (results is put into the output queue)
196     """
197
198     ses = requests.Session()
199     counter = [0 for i in range(600)]
200
201     while True:
202         try:
203             item_list = in_queue.get(timeout=1)
204         except Queue.Empty:
205             if exit_event.is_set() and in_queue.empty():
206                 break
207             continue
208         req = preparing_function(odl_ip, port, item_list, auth)
209         prep = req.prepare()
210         try:
211             rsp = ses.send(prep, timeout=60)
212         except requests.exceptions.Timeout:
213             counter[99] += 1
214             logger.error("No response from %s", odl_ip)
215             continue
216         logger.debug("%s %s", rsp.request, rsp.request.url)
217         logger.debug("Headers %s:", rsp.request.headers)
218         logger.debug("Body: %s", rsp.request.body)
219         logger.debug("Response: %s", rsp.text)
220         logger.debug("%s %s", rsp, rsp.reason)
221         counter[rsp.status_code] += 1
222     responses = {}
223     for response_code, count in enumerate(counter):
224         if count > 0:
225             responses[response_code] = count
226     out_queue.put(responses)
227     logger.info("Response code(s) got per number of requests: %s", responses)
228
229
230 def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
231                    thread_count=1, item_count=1, items_per_request=1,
232                    auth=('admin', 'admin')):
233     """The main function which drives sending of http requests.
234
235     Creates 2 queues and requested number of "working threads".
236     One queue is filled with flow details and working
237     threads read them out and send http requests.
238     The other queue is for sending results from working threads back.
239     After the threads' join, it produces a summary result.
240
241     Args:
242         :param preparing_function: function to prepare http request object
243
244         :param odl_ip: ip address of ODL or comma separated addesses; default="127.0.0.1"
245
246         :param port: restconf port; default="8181"
247
248         :param thread_count: number of threads used to send http requests; default=1
249
250         :param items_per_request: items per request, number of items sent in one http request
251
252         :param item_countpr: number of items to be sent in total
253
254         :param auth: authentication credentials
255
256     Returns:
257         :returns dict: dictionary of http response counts like
258                        {"http_status_code1: "count1", etc.}
259     """
260
261     # geting hosts
262     hosts = odl_ip.split(',')
263     nrhosts = len(hosts)
264
265     items = [i + 1 for i in range(item_count)]
266     item_groups = []
267     for i in range(0, item_count, items_per_request):
268         item_groups.append(items[i:i + items_per_request])
269
270     # fill the queue with details needed for one http requests
271     send_queue = Queue.Queue()
272     for item_list in item_groups:
273         send_queue.put(item_list)
274
275     # create an empty result queue
276     result_queue = Queue.Queue()
277     # create exit event
278     exit_event = threading.Event()
279
280     # start threads to read details from queues and to send http requests
281     threads = []
282     for i in range(int(thread_count)):
283         thr = threading.Thread(target=_request_sender,
284                                args=(i, preparing_function, auth),
285                                kwargs={"in_queue": send_queue, "exit_event": exit_event,
286                                        "odl_ip": hosts[i % nrhosts], "port": port,
287                                        "out_queue": result_queue})
288         threads.append(thr)
289         thr.start()
290
291     exit_event.set()
292
293     result = {}
294     # wait for reqults and sum them up
295     for t in threads:
296         t.join()
297         # read partial resutls from sender thread
298         part_result = result_queue.get()
299         for k, v in part_result.iteritems():
300             if k not in result:
301                 result[k] = v
302             else:
303                 result[k] += v
304     return result
305
306
307 def _build_delete(odl_ip, port, uri):
308     """Send DELETE to generic URI, assert status code is 200.
309
310     Args:
311         :param odl_ip: ip address of ODL
312
313         :param port: restconf port
314
315         :param uri: URI without /restconf/ to complete URL
316
317     Returns:
318         None
319
320     Note:
321          Raise AssertionError if response status code != 200
322     """
323
324     url = _build_url(odl_ip, port, uri)
325     rsp = requests.delete(url, auth=auth)
326     logger.debug("%s %s", rsp.request, rsp.request.url)
327     logger.debug("Headers %s:", rsp.request.headers)
328     logger.debug("Body: %s", rsp.request.body)
329     logger.debug("Response: %s", rsp.text)
330     logger.info("%s %s", rsp, rsp.reason)
331     assert rsp.status_code == 200, rsp.text
332
333
334 def delete_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
335     """Delete cars container from config datastore, assert success.
336
337     Args:
338         :param odl_ip: ip address of ODL
339
340         :param port: restconf port
341
342         :param thread_count: ignored; only 1 thread needed
343
344         :param item_count: ignored; whole container is deleted
345
346         :param auth: authentication credentials
347
348         :param items_per_request: ignored; only 1 request needed
349
350     Returns:
351         None
352     """
353
354     logger.info("Delete all cars from %s:%s", odl_ip, port)
355     _build_delete(odl_ip, port, "config/car:cars")
356
357
358 def delete_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
359     """Delete people container from config datastore.
360
361     Args:
362         :param odl_ip: ip address of ODL
363
364         :param port: restconf port
365
366         :param thread_count: ignored; only 1 thread needed
367
368         :param item_count: ignored; whole container is deleted
369
370         :param auth: authentication credentials
371
372         :param items_per_request: ignored; only 1 request needed
373
374     Returns:
375         None
376     """
377
378     logger.info("Delete all people from %s:%s", odl_ip, port)
379     _build_delete(odl_ip, port, "config/people:people")
380
381
382 def delete_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
383     """Delete car-people container from config datastore.
384
385     Args:
386         :param odl_ip: ip address of ODL
387
388         :param port: restconf port
389
390         :param thread_count: ignored; only 1 thread needed
391
392         :param item_count: ignored; whole container is deleted
393
394         :param auth: authentication credentials
395
396         :param items_per_request: ignored; only 1 request needed
397
398     Returns:
399         None
400     """
401
402     logger.info("Delete all purchases from %s:%s", odl_ip, port)
403     _build_delete(odl_ip, port, "config/car-people:car-people")
404
405
406 def _build_get(odl_ip, port, uri):
407     """Send GET to generic URI.
408
409     Args:
410         :param odl_ip: ip address of ODL
411
412         :param port: restconf port
413
414         :param uri: URI without /restconf/ to complete URL
415
416     Returns:
417         None
418
419     Note:
420          Raise AssertionError if response status code != 200
421     """
422
423     url = _build_url(odl_ip, port, uri)
424     rsp = requests.get(url, auth=auth)
425     logger.debug("%s %s", rsp.request, rsp.request.url)
426     logger.debug("Headers %s:", rsp.request.headers)
427     logger.debug("Body: %s", rsp.request.body)
428     logger.debug("Response: %s", rsp.text)
429     logger.info("%s %s", rsp, rsp.reason)
430     assert rsp.status_code == 200, rsp.text
431
432
433 def get_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
434     """Reads car entries from config datastore.
435
436     TODO: some needed logic to be added handle http response in the future,
437           e.g. count items in response's content
438
439     Args:
440         :param odl_ip: ip address of ODL
441
442         :param port: restconf port
443
444         :param thread_count: ignored; only 1 thread needed
445
446         :param item_count: ignored; whole container is deleted
447
448         :param auth: authentication credentials
449
450         :param items_per_request: ignored; only 1 request needed
451
452     Returns:
453         None
454     """
455
456     logger.info("Get all cars from %s:%s", odl_ip, port)
457     _build_get(odl_ip, port, "config/car:cars")
458
459
460 def get_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
461     """Reads people entries from config datastore.
462
463     TODO: some needed logic to be added handle http response in the future,
464           e.g. count items in response's content
465
466     Args:
467         :param odl_ip: ip address of ODL
468
469         :param port: restconf port
470
471         :param thread_count: ignored; only 1 thread needed
472
473         :param item_count: ignored; whole container is deleted
474
475         :param auth: authentication credentials
476
477         :param items_per_request: ignored; only 1 request needed
478
479     Returns:
480         None
481     """
482
483     logger.info("Get all people from %s:%s", odl_ip, port)
484     _build_get(odl_ip, port, "config/people:people")
485
486
487 def get_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
488     """Reads car-people entries from config datastore.
489
490     TODO: some needed logic to be added handle http response in the future,
491           e.g. count items in response's content
492
493     Args:
494         :param odl_ip: ip address of ODL
495
496         :param port: restconf port
497
498         :param thread_count: ignored; only 1 thread needed
499
500         :param item_count: ignored; whole container is deleted
501
502         :param auth: authentication credentials
503
504         :param items_per_request: ignored; only 1 request needed
505
506     Returns:
507         None
508     """
509
510     logger.info("Get all purchases from %s:%s", odl_ip, port)
511     _build_get(odl_ip, port, "config/car-people:car-people")
512
513
514 def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
515     """Configure car entries to the config datastore.
516
517     Args:
518         :param odl_ip: ip address of ODL
519
520         :param port: restconf port
521
522         :param thread_count: number of threads used to send http requests; default=1
523
524         :param item_count: number of items to be configured
525
526         :param auth: authentication credentials
527
528         :param items_per_request: items per request, not used here,
529                                   just to keep the same api
530
531     Returns:
532         None
533     """
534
535     logger.info("Add %s car(s) to %s:%s (%s per request)",
536                 item_count, odl_ip, port, items_per_request)
537     res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
538                          thread_count=thread_count, item_count=item_count,
539                          items_per_request=items_per_request, auth=auth)
540     if res.keys() != [204]:
541         logger.error("Not all cars were configured: " + repr(res))
542         raise Exception("Not all cars were configured: " + repr(res))
543
544
545 def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
546     """Configure people entries to the config datastore.
547
548     Args:
549         :param odl_ip: ip address of ODL; default="127.0.0.1"
550
551         :param port: restconf port; default="8181"
552
553         :param thread_count: number of threads used to send http requests; default=1
554
555         :param item_count: number of items to be condigured
556
557         :param auth: authentication credentials
558
559         :param items_per_request: items per request, not used here,
560                                   just to keep the same api
561
562     Returns:
563         None
564     """
565
566     logger.info("Add %s people to %s:%s (%s per request)",
567                 item_count, odl_ip, port, items_per_request)
568     if items_per_request != 1:
569         logger.error("Only 1 item per request is supported, " +
570                      "you specified: {0}".format(item_count))
571         raise NotImplementedError("Only 1 item per request is supported, " +
572                                   "you specified: {0}".format(item_count))
573     res = _task_executor(_prepare_add_people_rpc, odl_ip=odl_ip, port=port,
574                          thread_count=thread_count, item_count=item_count,
575                          items_per_request=items_per_request, auth=auth)
576     if res.keys() != [200]:
577         logger.error("Not all people were configured: " + repr(res))
578         raise Exception("Not all people were configured: " + repr(res))
579
580
581 def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
582                        items_per_request):
583     """Configure car-people entries to the config datastore one by one using rpc
584
585     Args:
586         :param odl_ip: ip address of ODL; default="127.0.0.1"
587
588         :param port: restconf port; default="8181"
589
590         :param thread_count: number of threads used to send http requests; default=1
591
592         :param item_count: number of items to be condigured
593
594         :param auth: authentication credentials
595
596         :param items_per_request: items per request, not used here,
597                                   just to keep the same api
598
599     Returns:
600         None
601     """
602
603     logger.info("Add %s purchase(s) to %s:%s (%s per request)",
604                 item_count, odl_ip, port, items_per_request)
605     if items_per_request != 1:
606         logger.error("Only 1 item per request is supported, " +
607                      "you specified: {0}".format(item_count))
608         raise NotImplementedError("Only 1 item per request is supported, " +
609                                   "you specified: {0}".format(item_count))
610
611     res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
612                          thread_count=thread_count, item_count=item_count,
613                          items_per_request=items_per_request, auth=auth)
614     if res.keys() != [200]:
615         logger.error("Not all rpc calls passed: " + repr(res))
616         raise Exception("Not all rpc calls passed: " + repr(res))
617
618
619 _actions = ["add", "get", "delete", "add-rpc"]
620 _items = ["car", "people", "car-people"]
621
622 _handler_matrix = {
623     "add": {"car": add_car},
624     "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
625     "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
626     "add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
627 }
628
629
630 if __name__ == "__main__":
631     """
632     This program executes requested action based in given parameters
633
634     It provides "car", "people" and "car-people" crud operations.
635     """
636
637     parser = argparse.ArgumentParser(description="Cluster datastore"
638                                                  "performance test script")
639     parser.add_argument("--host", default="127.0.0.1",
640                         help="Host where odl controller is running."
641                              "Or comma separated list of hosts."
642                              "(default is 127.0.0.1)")
643     parser.add_argument("--port", default="8181",
644                         help="Port on which odl's RESTCONF is listening"
645                              "(default is 8181)")
646     parser.add_argument("--threads", type=int, default=1,
647                         help="Number of request worker threads to start in"
648                              "each cycle (default=1)")
649     parser.add_argument("action", choices=_actions, metavar="action",
650                         help="Action to be performed.")
651     parser.add_argument("--itemtype", choices=_items, default="car",
652                         help="Flows-per-Request - number of flows (batch size)"
653                              "sent in each HTTP request (default 1)")
654     parser.add_argument("--itemcount", type=int, help="Items per request",
655                         default=1)
656     parser.add_argument("--user", help="Restconf user name", default="admin")
657     parser.add_argument("--password", help="Restconf password", default="admin")
658     parser.add_argument("--ipr", type=int, help="Items per request", default=1)
659     parser.add_argument("--debug", dest="loglevel", action="store_const",
660                         const=logging.DEBUG, default=logging.INFO,
661                         help="Set log level to debug (default is error)")
662
663     args = parser.parse_args()
664
665     logger = logging.getLogger("logger")
666     log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
667     console_handler = logging.StreamHandler()
668     file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
669     console_handler.setFormatter(log_formatter)
670     file_handler.setFormatter(log_formatter)
671     logger.addHandler(console_handler)
672     logger.addHandler(file_handler)
673     logger.setLevel(args.loglevel)
674
675     auth = (args.user, args.password)
676
677     if (args.action not in _handler_matrix or
678             args.itemtype not in _handler_matrix[args.action]):
679             msg = "Unsupported combination of action: " + str(args.action)
680             msg += " and item: " + str(args.itemtype)
681             logger.error(msg)
682             raise NotImplementedError(msg)
683
684     # TODO: need to filter out situations when we cannot use more items
685     # in one rest request (rpc or delete?)
686     # this should be done inside handler functions
687
688     handler_function = _handler_matrix[args.action][args.itemtype]
689     handler_function(args.host, args.port, args.threads,
690                      args.itemcount, auth, args.ipr)