Migrate Get Requests invocations(libraries)
[integration/test.git] / tools / odl-mdsal-clustering-tests / scripts / cluster_rest_script.py
1 """
2 The purpose of this script is the ability to perform crud operations over
3 the car-people data model.
4 """
5 import threading
6 import Queue
7 import requests
8 import json
9 import copy
10 import argparse
11 import logging
12 import time
13
14
15 _template_add_car = {
16     "car-entry": [
17         {
18             "id": "to be replaced",
19             "category": "my_category",
20             "model": "to be replaced",
21             "manufacturer": "my_manufacturer",
22             "year": "2015",
23         }
24     ]
25 }
26
27 _template_add_people_rpc = {
28     "input": [
29         {
30             "people:id": "to be replaced",
31             "people:gender": "male",
32             "people:age": "99",
33             "people:address": "to be replaced",
34             "people:contactNo": "to be replaced",
35         }
36     ]
37 }
38
39 _template_add_cp_rpc = {
40     "input": {
41         "car-purchase:person": "to be replaced",
42         "car-purchase:person-id": "to be replaced",
43         "car-purchase:car-id": "to be replaced",
44     }
45 }
46
47
48 def _build_url(odl_ip, port, uri):
49     """Compose URL from generic IP, port and URI fragment.
50
51     Args:
52         :param odl_ip: controller's ip address or hostname
53
54         :param port: controller's restconf port
55
56         :param uri: URI without /restconf/ to complete URL
57
58     Returns:
59         :returns url: full restconf url corresponding to params
60     """
61
62     url = "http://" + odl_ip + ":" + port + "/restconf/" + uri
63     return url
64
65
66 def _build_post(odl_ip, port, uri, python_data, auth):
67     """Create a POST http request with generic on URI and data.
68
69     Args:
70         :param odl_ip: controller's ip address or hostname
71
72         :param port: controller's restconf port
73
74         :param uri: URI without /restconf/ to complete URL
75
76         :param python_data: python object to serialize into textual data
77
78         :param auth: authentication credentials
79
80     Returns:
81         :returns http request object
82     """
83
84     url = _build_url(odl_ip, port, uri)
85     text_data = json.dumps(python_data)
86     header = {"Content-Type": "application/json"}
87     req = requests.Request("POST", url, headers=header, data=text_data, auth=auth)
88     return req
89
90
91 def _prepare_add_car(odl_ip, port, item_list, auth):
92     """Creates a POST http requests to configure a car item in configuration datastore.
93
94     Args:
95         :param odl_ip: controller's ip address or hostname
96
97         :param port: controller's restconf port
98
99         :param item_list: controller item's list contains a list of ids of the cars
100
101         :param auth: authentication credentials
102
103     Returns:
104         :returns req: http request object
105     """
106
107     container = {"car-entry": []}
108     for item in item_list:
109         entry = copy.deepcopy(_template_add_car["car-entry"][0])
110         entry["id"] = item
111         entry["model"] = "model" + str(item)
112         container["car-entry"].append(entry)
113     req = _build_post(odl_ip, port, "config/car:cars", container, auth)
114     return req
115
116
117 def _prepare_add_people_rpc(odl_ip, port, item_list, auth):
118     """Creates a POST http requests to configure people in configuration datastore.
119
120     Args:
121         :param odl_ip: controller's ip address or hostname
122
123         :param port: controller's restconf port
124
125         :param item_list: controller item's list contains a list of ids of the people
126
127         :param auth: authentication credentials
128
129     Returns:
130         :returns req: http request object
131     """
132
133     container = {"input": {}}
134     item = item_list[0]
135     entry = container["input"]
136     entry["people:id"] = str(item)
137     entry["people:address"] = "address" + str(item)
138     entry["people:contactNo"] = str(item)
139     container["input"] = entry
140     req = _build_post(odl_ip, port, "operations/people:add-person", container, auth)
141     return req
142
143
144 def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
145     """Creates a POST http requests to purchase cars using an rpc.
146
147     Args:
148         :param odl_ip: controller's ip address or hostname
149
150         :param port: controller's restconf port
151
152         :param item_list: controller item's list contains a list of ids of the people
153         only the first item is considered
154
155         :param auth: authentication credentials
156
157     Returns:
158         :returns req: http request object
159     """
160
161     container = {"input": {}}
162     item = item_list[0]
163     entry = container["input"]
164     entry["car-purchase:person"] = (
165         "/people:people/people:person[people:id='" + str(item) + "']"
166     )
167     entry["car-purchase:person-id"] = str(item)
168     entry["car-purchase:car-id"] = str(item)
169     container["input"] = entry
170     req = _build_post(odl_ip, port, "operations/car-purchase:buy-car", container, auth)
171     return req
172
173
174 def _request_sender(
175     thread_id,
176     preparing_function,
177     auth,
178     in_queue=None,
179     exit_event=None,
180     odl_ip="127.0.0.1",
181     port="8181",
182     out_queue=None,
183     req_timeout=60,
184     retry_timeout=15,
185     retry_rcs=[],
186 ):
187     """The funcion sends http requests.
188
189     Runs in the working thread. It reads out flow details from the queue and
190     sends apropriate http requests to the controller
191
192     Args:
193         :param thread_id: thread id
194
195         :param preparing_function: function to prepare the http request
196
197         :param in_queue: input queue, flow details are comming from here
198
199         :param exit_event: event to notify working thread that the parent
200                            (task executor) stopped filling the input queue
201
202         :param odl_ip: ip address of ODL; default="127.0.0.1"
203
204         :param port: restconf port; default="8181"
205
206         :param out_queue: queue where the results should be put
207
208         :param req_timeout: http request timeout
209
210         :param retry_timeout: timout to give up retry attempts to send http requests
211
212         :param retry_rcs: list of return codes when retry should be performed
213
214     Returns:
215         None (results is put into the output queue)
216     """
217
218     ses = requests.Session()
219     counter = [0 for i in range(600)]
220
221     while True:
222         try:
223             item_list = in_queue.get(timeout=1)
224         except Queue.Empty:
225             if exit_event.is_set() and in_queue.empty():
226                 break
227             continue
228         req = preparing_function(odl_ip, port, item_list, auth)
229         prep = req.prepare()
230         start_time = time_now = time.time()
231         while start_time + retry_timeout > time_now:
232             try:
233                 rsp = ses.send(prep, timeout=req_timeout)
234             except requests.exceptions.Timeout:
235                 counter[99] += 1
236                 logger.error("No response from %s", odl_ip)
237                 rc = 99
238             else:
239                 counter[rsp.status_code] += 1
240                 rc = rsp.status_code
241                 lvl = logging.INFO if rc > 299 else logging.DEBUG
242                 logger.log(
243                     lvl,
244                     "Request started at {} finished with following detais".format(
245                         time.ctime(start_time)
246                     ),
247                 )
248                 logger.log(lvl, "%s %s", rsp.request, rsp.request.url)
249                 logger.log(lvl, "Headers %s:", rsp.request.headers)
250                 logger.log(lvl, "Body: %s", rsp.request.body)
251                 logger.log(lvl, "Response: %s", rsp.text)
252                 logger.log(lvl, "%s %s", rsp, rsp.reason)
253             if rc not in retry_rcs:
254                 break
255             time_now = time.time()
256     responses = {}
257     for response_code, count in enumerate(counter):
258         if count > 0:
259             responses[response_code] = count
260     out_queue.put(responses)
261     logger.info("Response code(s) got per number of requests: %s", responses)
262
263
264 def _task_executor(
265     preparing_function,
266     odl_ip="127.0.0.1",
267     port="8181",
268     thread_count=1,
269     item_count=1,
270     items_per_request=1,
271     auth=("admin", "admin"),
272     req_timeout=600,
273     retry_timeout=15,
274     retry_rcs=[],
275 ):
276     """The main function which drives sending of http requests.
277
278     Creates 2 queues and requested number of "working threads".
279     One queue is filled with flow details and working
280     threads read them out and send http requests.
281     The other queue is for sending results from working threads back.
282     After the threads' join, it produces a summary result.
283
284     Args:
285         :param preparing_function: function to prepare http request object
286
287         :param odl_ip: ip address of ODL or comma separated addesses; default="127.0.0.1"
288
289         :param port: restconf port; default="8181"
290
291         :param thread_count: number of threads used to send http requests; default=1
292
293         :param items_per_request: items per request, number of items sent in one http request
294
295         :param item_countpr: number of items to be sent in total
296
297         :param auth: authentication credentials
298
299         :param req_timeout: http request timeout
300
301         :param retry_timeout: timout to give up retry attempts to send http requests
302
303         :param retry_rcs: list of return codes when retry should be performed
304
305     Returns:
306         :returns dict: dictionary of http response counts like
307                        {"http_status_code1: "count1", etc.}
308     """
309
310     # geting hosts
311     hosts = odl_ip.split(",")
312     nrhosts = len(hosts)
313
314     items = [i + 1 for i in range(item_count)]
315     item_groups = []
316     for i in range(0, item_count, items_per_request):
317         item_groups.append(items[i : i + items_per_request])
318
319     # fill the queue with details needed for one http requests
320     send_queue = Queue.Queue()
321     for item_list in item_groups:
322         send_queue.put(item_list)
323
324     # create an empty result queue
325     result_queue = Queue.Queue()
326     # create exit event
327     exit_event = threading.Event()
328
329     # start threads to read details from queues and to send http requests
330     threads = []
331     for i in range(int(thread_count)):
332         thr = threading.Thread(
333             target=_request_sender,
334             args=(i, preparing_function, auth),
335             kwargs={
336                 "in_queue": send_queue,
337                 "exit_event": exit_event,
338                 "odl_ip": hosts[i % nrhosts],
339                 "port": port,
340                 "out_queue": result_queue,
341                 "req_timeout": req_timeout,
342                 "retry_timeout": retry_timeout,
343                 "retry_rcs": retry_rcs,
344             },
345         )
346         threads.append(thr)
347         thr.start()
348
349     exit_event.set()
350
351     result = {}
352     # wait for reqults and sum them up
353     for t in threads:
354         t.join()
355         # read partial resutls from sender thread
356         part_result = result_queue.get()
357         for k, v in part_result.iteritems():
358             if k not in result:
359                 result[k] = v
360             else:
361                 result[k] += v
362     return result
363
364
365 def _build_delete(odl_ip, port, uri):
366     """Send DELETE to generic URI, assert status code is 200.
367
368     Args:
369         :param odl_ip: ip address of ODL
370
371         :param port: restconf port
372
373         :param uri: URI without /restconf/ to complete URL
374
375     Returns:
376         None
377
378     Note:
379          Raise AssertionError if response status code != 200
380     """
381
382     url = _build_url(odl_ip, port, uri)
383     rsp = requests.delete(url, auth=auth)
384     logger.debug("%s %s", rsp.request, rsp.request.url)
385     logger.debug("Headers %s:", rsp.request.headers)
386     logger.debug("Body: %s", rsp.request.body)
387     logger.debug("Response: %s", rsp.text)
388     logger.info("%s %s", rsp, rsp.reason)
389     assert rsp.status_code == 200, rsp.text
390
391
392 def delete_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
393     """Delete cars container from config datastore, assert success.
394
395     Args:
396         :param odl_ip: ip address of ODL
397
398         :param port: restconf port
399
400         :param thread_count: ignored; only 1 thread needed
401
402         :param item_count: ignored; whole container is deleted
403
404         :param auth: authentication credentials
405
406         :param items_per_request: ignored; only 1 request needed
407
408     Returns:
409         None
410     """
411
412     logger.info("Delete all cars from %s:%s", odl_ip, port)
413     _build_delete(odl_ip, port, "config/car:cars")
414
415
416 def delete_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
417     """Delete people container from config datastore.
418
419     Args:
420         :param odl_ip: ip address of ODL
421
422         :param port: restconf port
423
424         :param thread_count: ignored; only 1 thread needed
425
426         :param item_count: ignored; whole container is deleted
427
428         :param auth: authentication credentials
429
430         :param items_per_request: ignored; only 1 request needed
431
432     Returns:
433         None
434     """
435
436     logger.info("Delete all people from %s:%s", odl_ip, port)
437     _build_delete(odl_ip, port, "config/people:people")
438
439
440 def delete_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
441     """Delete car-people container from config datastore.
442
443     Args:
444         :param odl_ip: ip address of ODL
445
446         :param port: restconf port
447
448         :param thread_count: ignored; only 1 thread needed
449
450         :param item_count: ignored; whole container is deleted
451
452         :param auth: authentication credentials
453
454         :param items_per_request: ignored; only 1 request needed
455
456     Returns:
457         None
458     """
459
460     logger.info("Delete all purchases from %s:%s", odl_ip, port)
461     _build_delete(odl_ip, port, "config/car-people:car-people")
462
463
464 def _build_get(odl_ip, port, uri):
465     """Send GET to generic URI.
466
467     Args:
468         :param odl_ip: ip address of ODL
469
470         :param port: restconf port
471
472         :param uri: URI without /restconf/ to complete URL
473
474     Returns:
475         None
476
477     Note:
478          Raise AssertionError if response status code != 200
479     """
480
481     url = _build_url(odl_ip, port, uri)
482     rsp = requests.get(url, auth=auth)
483     logger.debug("%s %s", rsp.request, rsp.request.url)
484     logger.debug("Headers %s:", rsp.request.headers)
485     logger.debug("Body: %s", rsp.request.body)
486     logger.debug("Response: %s", rsp.text)
487     logger.info("%s %s", rsp, rsp.reason)
488     assert rsp.status_code == 200, rsp.text
489
490
491 def get_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
492     """Reads car entries from config datastore.
493
494     TODO: some needed logic to be added handle http response in the future,
495           e.g. count items in response's content
496
497     Args:
498         :param odl_ip: ip address of ODL
499
500         :param port: restconf port
501
502         :param thread_count: ignored; only 1 thread needed
503
504         :param item_count: ignored; whole container is deleted
505
506         :param auth: authentication credentials
507
508         :param items_per_request: ignored; only 1 request needed
509
510     Returns:
511         None
512     """
513
514     logger.info("Get all cars from %s:%s", odl_ip, port)
515     _build_get(odl_ip, port, "config/car:cars")
516
517
518 def get_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
519     """Reads people entries from config datastore.
520
521     TODO: some needed logic to be added handle http response in the future,
522           e.g. count items in response's content
523
524     Args:
525         :param odl_ip: ip address of ODL
526
527         :param port: restconf port
528
529         :param thread_count: ignored; only 1 thread needed
530
531         :param item_count: ignored; whole container is deleted
532
533         :param auth: authentication credentials
534
535         :param items_per_request: ignored; only 1 request needed
536
537     Returns:
538         None
539     """
540
541     logger.info("Get all people from %s:%s", odl_ip, port)
542     _build_get(odl_ip, port, "config/people:people")
543
544
545 def get_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
546     """Reads car-people entries from config datastore.
547
548     TODO: some needed logic to be added handle http response in the future,
549           e.g. count items in response's content
550
551     Args:
552         :param odl_ip: ip address of ODL
553
554         :param port: restconf port
555
556         :param thread_count: ignored; only 1 thread needed
557
558         :param item_count: ignored; whole container is deleted
559
560         :param auth: authentication credentials
561
562         :param items_per_request: ignored; only 1 request needed
563
564     Returns:
565         None
566     """
567
568     logger.info("Get all purchases from %s:%s", odl_ip, port)
569     _build_get(odl_ip, port, "config/car-people:car-people")
570
571
572 def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
573     """Configure car entries to the config datastore.
574
575     Args:
576         :param odl_ip: ip address of ODL
577
578         :param port: restconf port
579
580         :param thread_count: number of threads used to send http requests; default=1
581
582         :param item_count: number of items to be configured
583
584         :param auth: authentication credentials
585
586         :param items_per_request: items per request, not used here,
587                                   just to keep the same api
588
589     Returns:
590         None
591     """
592
593     logger.info(
594         "Add %s car(s) to %s:%s (%s per request)",
595         item_count,
596         odl_ip,
597         port,
598         items_per_request,
599     )
600     res = _task_executor(
601         _prepare_add_car,
602         odl_ip=odl_ip,
603         port=port,
604         thread_count=thread_count,
605         item_count=item_count,
606         items_per_request=items_per_request,
607         auth=auth,
608     )
609     if res.keys() != [204]:
610         logger.error("Not all cars were configured: " + repr(res))
611         raise Exception("Not all cars were configured: " + repr(res))
612
613
614 def add_car_with_retries(
615     odl_ip, port, thread_count, item_count, auth, items_per_request
616 ):
617     """Configure car entries to the config datastore.
618
619     Args:
620         :param odl_ip: ip address of ODL
621
622         :param port: restconf port
623
624         :param thread_count: number of threads used to send http requests; default=1
625
626         :param item_count: number of items to be configured
627
628         :param auth: authentication credentials
629
630         :param items_per_request: items per request, not used here,
631                                   just to keep the same api
632
633     Returns:
634         None
635     """
636
637     logger.info(
638         "Add %s car(s) to %s:%s (%s per request)",
639         item_count,
640         odl_ip,
641         port,
642         items_per_request,
643     )
644     retry_rcs = [401, 404, 500, 503]
645     res = _task_executor(
646         _prepare_add_car,
647         odl_ip=odl_ip,
648         port=port,
649         thread_count=thread_count,
650         item_count=item_count,
651         items_per_request=items_per_request,
652         auth=auth,
653         req_timeout=15,
654         retry_timeout=30,
655         retry_rcs=retry_rcs,
656     )
657     acceptable_rcs = [204] + retry_rcs
658     for key in res.keys():
659         if key not in acceptable_rcs:
660             logger.error("Problems during cars' configuration appeared: " + repr(res))
661             raise Exception(
662                 "Problems during cars' configuration appeared: " + repr(res)
663             )
664
665
666 def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
667     """Configure people entries to the config datastore.
668
669     Args:
670         :param odl_ip: ip address of ODL; default="127.0.0.1"
671
672         :param port: restconf port; default="8181"
673
674         :param thread_count: number of threads used to send http requests; default=1
675
676         :param item_count: number of items to be condigured
677
678         :param auth: authentication credentials
679
680         :param items_per_request: items per request, not used here,
681                                   just to keep the same api
682
683     Returns:
684         None
685     """
686
687     logger.info(
688         "Add %s people to %s:%s (%s per request)",
689         item_count,
690         odl_ip,
691         port,
692         items_per_request,
693     )
694     if items_per_request != 1:
695         logger.error(
696             "Only 1 item per request is supported, "
697             + "you specified: {0}".format(item_count)
698         )
699         raise NotImplementedError(
700             "Only 1 item per request is supported, "
701             + "you specified: {0}".format(item_count)
702         )
703     res = _task_executor(
704         _prepare_add_people_rpc,
705         odl_ip=odl_ip,
706         port=port,
707         thread_count=thread_count,
708         item_count=item_count,
709         items_per_request=items_per_request,
710         auth=auth,
711     )
712     if res.keys() != [200]:
713         logger.error("Not all people were configured: " + repr(res))
714         raise Exception("Not all people were configured: " + repr(res))
715
716
717 def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
718     """Configure car-people entries to the config datastore one by one using rpc
719
720     Args:
721         :param odl_ip: ip address of ODL; default="127.0.0.1"
722
723         :param port: restconf port; default="8181"
724
725         :param thread_count: number of threads used to send http requests; default=1
726
727         :param item_count: number of items to be condigured
728
729         :param auth: authentication credentials
730
731         :param items_per_request: items per request, not used here,
732                                   just to keep the same api
733
734     Returns:
735         None
736     """
737
738     logger.info(
739         "Add %s purchase(s) to %s:%s (%s per request)",
740         item_count,
741         odl_ip,
742         port,
743         items_per_request,
744     )
745     if items_per_request != 1:
746         logger.error(
747             "Only 1 item per request is supported, "
748             + "you specified: {0}".format(item_count)
749         )
750         raise NotImplementedError(
751             "Only 1 item per request is supported, "
752             + "you specified: {0}".format(item_count)
753         )
754
755     res = _task_executor(
756         _prepare_add_car_people_rpc,
757         odl_ip=odl_ip,
758         port=port,
759         thread_count=thread_count,
760         item_count=item_count,
761         items_per_request=items_per_request,
762         auth=auth,
763     )
764     if res.keys() != [200]:
765         logger.error("Not all rpc calls passed: " + repr(res))
766         raise Exception("Not all rpc calls passed: " + repr(res))
767
768
769 _actions = ["add", "get", "delete", "add-rpc", "add-with-retries"]
770 _items = ["car", "people", "car-people"]
771
772 _handler_matrix = {
773     "add": {"car": add_car},
774     "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
775     "delete": {
776         "car": delete_car,
777         "people": delete_people,
778         "car-people": delete_car_people,
779     },
780     "add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
781     "add-with-retries": {"car": add_car_with_retries},
782 }
783
784
785 if __name__ == "__main__":
786     """
787     This program executes requested action based in given parameters
788
789     It provides "car", "people" and "car-people" crud operations.
790     """
791
792     parser = argparse.ArgumentParser(
793         description="Cluster datastore" "performance test script"
794     )
795     parser.add_argument(
796         "--host",
797         default="127.0.0.1",
798         help="Host where odl controller is running."
799         "Or comma separated list of hosts."
800         "(default is 127.0.0.1)",
801     )
802     parser.add_argument(
803         "--port",
804         default="8181",
805         help="Port on which odl's RESTCONF is listening" "(default is 8181)",
806     )
807     parser.add_argument(
808         "--threads",
809         type=int,
810         default=1,
811         help="Number of request worker threads to start in" "each cycle (default=1)",
812     )
813     parser.add_argument(
814         "action", choices=_actions, metavar="action", help="Action to be performed."
815     )
816     parser.add_argument(
817         "--itemtype",
818         choices=_items,
819         default="car",
820         help="Flows-per-Request - number of flows (batch size)"
821         "sent in each HTTP request (default 1)",
822     )
823     parser.add_argument("--itemcount", type=int, help="Items per request", default=1)
824     parser.add_argument("--user", help="Restconf user name", default="admin")
825     parser.add_argument("--password", help="Restconf password", default="admin")
826     parser.add_argument("--ipr", type=int, help="Items per request", default=1)
827     parser.add_argument(
828         "--debug",
829         dest="loglevel",
830         action="store_const",
831         const=logging.DEBUG,
832         default=logging.INFO,
833         help="Set log level to debug (default is error)",
834     )
835
836     args = parser.parse_args()
837
838     logger = logging.getLogger("logger")
839     log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
840     console_handler = logging.StreamHandler()
841     file_handler = logging.FileHandler("cluster_rest_script.log", mode="w")
842     console_handler.setFormatter(log_formatter)
843     file_handler.setFormatter(log_formatter)
844     logger.addHandler(console_handler)
845     logger.addHandler(file_handler)
846     logger.setLevel(args.loglevel)
847
848     auth = (args.user, args.password)
849
850     if (
851         args.action not in _handler_matrix
852         or args.itemtype not in _handler_matrix[args.action]
853     ):
854         msg = "Unsupported combination of action: " + str(args.action)
855         msg += " and item: " + str(args.itemtype)
856         logger.error(msg)
857         raise NotImplementedError(msg)
858
859     # TODO: need to filter out situations when we cannot use more items
860     # in one rest request (rpc or delete?)
861     # this should be done inside handler functions
862
863     handler_function = _handler_matrix[args.action][args.itemtype]
864     handler_function(args.host, args.port, args.threads, args.itemcount, auth, args.ipr)