3731d0c516bb9ee0d12be815715d5be959d97297
[integration/test.git] / tools / odl-mdsal-clustering-tests / scripts / cluster_rest_script.py
1 """
2 The purpose of this script is the ability to perform crud operations over
3 the car-people data model.
4 """
5 import threading
6 import Queue
7 import requests
8 import json
9 import copy
10 import argparse
11 import logging
12 import time
13
14
15 _template_add_car = {
16     "car-entry": [
17         {
18             "id": "to be replaced",
19             "category": "my_category",
20             "model": "to be replaced",
21             "manufacturer": "my_manufacturer",
22             "year": "2015"
23         }
24     ]
25 }
26
27 _template_add_people_rpc = {
28     "input": [
29         {
30             "people:id": "to be replaced",
31             "people:gender": "male",
32             "people:age": "99",
33             "people:address": "to be replaced",
34             "people:contactNo": "to be replaced"
35         }
36     ]
37 }
38
39 _template_add_cp_rpc = {
40     "input": {
41         "car-purchase:person": "to be replaced",
42         "car-purchase:person-id": "to be replaced",
43         "car-purchase:car-id": "to be replaced"
44     }
45 }
46
47
48 def _build_url(odl_ip, port, uri):
49     """Compose URL from generic IP, port and URI fragment.
50
51     Args:
52         :param odl_ip: controller's ip address or hostname
53
54         :param port: controller's restconf port
55
56         :param uri: URI without /restconf/ to complete URL
57
58     Returns:
59         :returns url: full restconf url corresponding to params
60     """
61
62     url = "http://" + odl_ip + ":" + port + "/restconf/" + uri
63     return url
64
65
66 def _build_post(odl_ip, port, uri, python_data, auth):
67     """Create a POST http request with generic on URI and data.
68
69     Args:
70         :param odl_ip: controller's ip address or hostname
71
72         :param port: controller's restconf port
73
74         :param uri: URI without /restconf/ to complete URL
75
76         :param python_data: python object to serialize into textual data
77
78         :param auth: authentication credentials
79
80     Returns:
81         :returns http request object
82     """
83
84     url = _build_url(odl_ip, port, uri)
85     text_data = json.dumps(python_data)
86     header = {"Content-Type": "application/json"}
87     req = requests.Request("POST", url, headers=header, data=text_data, auth=auth)
88     return req
89
90
91 def _prepare_add_car(odl_ip, port, item_list, auth):
92     """Creates a POST http requests to configure a car item in configuration datastore.
93
94     Args:
95         :param odl_ip: controller's ip address or hostname
96
97         :param port: controller's restconf port
98
99         :param item_list: controller item's list contains a list of ids of the cars
100
101         :param auth: authentication credentials
102
103     Returns:
104         :returns req: http request object
105     """
106
107     container = {"car-entry": []}
108     for item in item_list:
109         entry = copy.deepcopy(_template_add_car["car-entry"][0])
110         entry["id"] = item
111         entry["model"] = "model" + str(item)
112         container["car-entry"].append(entry)
113     req = _build_post(odl_ip, port, "config/car:cars", container, auth)
114     return req
115
116
117 def _prepare_add_people_rpc(odl_ip, port, item_list, auth):
118     """Creates a POST http requests to configure people in configuration datastore.
119
120     Args:
121         :param odl_ip: controller's ip address or hostname
122
123         :param port: controller's restconf port
124
125         :param item_list: controller item's list contains a list of ids of the people
126
127         :param auth: authentication credentials
128
129     Returns:
130         :returns req: http request object
131     """
132
133     container = {"input": {}}
134     item = item_list[0]
135     entry = container["input"]
136     entry["people:id"] = str(item)
137     entry["people:address"] = "address" + str(item)
138     entry["people:contactNo"] = str(item)
139     container["input"] = entry
140     req = _build_post(odl_ip, port, "operations/people:add-person", container, auth)
141     return req
142
143
144 def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
145     """Creates a POST http requests to purchase cars using an rpc.
146
147     Args:
148         :param odl_ip: controller's ip address or hostname
149
150         :param port: controller's restconf port
151
152         :param item_list: controller item's list contains a list of ids of the people
153         only the first item is considered
154
155         :param auth: authentication credentials
156
157     Returns:
158         :returns req: http request object
159     """
160
161     container = {"input": {}}
162     item = item_list[0]
163     entry = container["input"]
164     entry["car-purchase:person"] = "/people:people/people:person[people:id='" + str(item) + "']"
165     entry["car-purchase:person-id"] = str(item)
166     entry["car-purchase:car-id"] = str(item)
167     container["input"] = entry
168     req = _build_post(odl_ip, port, "operations/car-purchase:buy-car", container, auth)
169     return req
170
171
172 def _request_sender(thread_id, preparing_function, auth, in_queue=None,
173                     exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None,
174                     req_timeout=60, retry_timeout=15, retry_rcs=[]):
175     """The funcion sends http requests.
176
177     Runs in the working thread. It reads out flow details from the queue and
178     sends apropriate http requests to the controller
179
180     Args:
181         :param thread_id: thread id
182
183         :param preparing_function: function to prepare the http request
184
185         :param in_queue: input queue, flow details are comming from here
186
187         :param exit_event: event to notify working thread that the parent
188                            (task executor) stopped filling the input queue
189
190         :param odl_ip: ip address of ODL; default="127.0.0.1"
191
192         :param port: restconf port; default="8181"
193
194         :param out_queue: queue where the results should be put
195
196         :param req_timeout: http request timeout
197
198         :param retry_timeout: timout to give up retry attempts to send http requests
199
200         :param retry_rcs: list of return codes when retry should be performed
201
202     Returns:
203         None (results is put into the output queue)
204     """
205
206     ses = requests.Session()
207     counter = [0 for i in range(600)]
208
209     while True:
210         try:
211             item_list = in_queue.get(timeout=1)
212         except Queue.Empty:
213             if exit_event.is_set() and in_queue.empty():
214                 break
215             continue
216         req = preparing_function(odl_ip, port, item_list, auth)
217         prep = req.prepare()
218         start_time = time_now = time.time()
219         while start_time + retry_timeout > time_now:
220             try:
221                 rsp = ses.send(prep, timeout=req_timeout)
222             except requests.exceptions.Timeout:
223                 counter[99] += 1
224                 logger.error("No response from %s", odl_ip)
225                 rc = 99
226             else:
227                 counter[rsp.status_code] += 1
228                 rc = rsp.status_code
229                 lvl = logging.INFO if rc > 299 else logging.DEBUG
230                 logger.log(lvl, "Request started at {} finished with following detais".format(time.ctime(start_time)))
231                 logger.log(lvl, "%s %s", rsp.request, rsp.request.url)
232                 logger.log(lvl, "Headers %s:", rsp.request.headers)
233                 logger.log(lvl, "Body: %s", rsp.request.body)
234                 logger.log(lvl, "Response: %s", rsp.text)
235                 logger.log(lvl, "%s %s", rsp, rsp.reason)
236             if rc not in retry_rcs:
237                 break
238             time_now = time.time()
239     responses = {}
240     for response_code, count in enumerate(counter):
241         if count > 0:
242             responses[response_code] = count
243     out_queue.put(responses)
244     logger.info("Response code(s) got per number of requests: %s", responses)
245
246
247 def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
248                    thread_count=1, item_count=1, items_per_request=1,
249                    auth=('admin', 'admin'), req_timeout=600, retry_timeout=15, retry_rcs=[]):
250     """The main function which drives sending of http requests.
251
252     Creates 2 queues and requested number of "working threads".
253     One queue is filled with flow details and working
254     threads read them out and send http requests.
255     The other queue is for sending results from working threads back.
256     After the threads' join, it produces a summary result.
257
258     Args:
259         :param preparing_function: function to prepare http request object
260
261         :param odl_ip: ip address of ODL or comma separated addesses; default="127.0.0.1"
262
263         :param port: restconf port; default="8181"
264
265         :param thread_count: number of threads used to send http requests; default=1
266
267         :param items_per_request: items per request, number of items sent in one http request
268
269         :param item_countpr: number of items to be sent in total
270
271         :param auth: authentication credentials
272
273         :param req_timeout: http request timeout
274
275         :param retry_timeout: timout to give up retry attempts to send http requests
276
277         :param retry_rcs: list of return codes when retry should be performed
278
279     Returns:
280         :returns dict: dictionary of http response counts like
281                        {"http_status_code1: "count1", etc.}
282     """
283
284     # geting hosts
285     hosts = odl_ip.split(',')
286     nrhosts = len(hosts)
287
288     items = [i + 1 for i in range(item_count)]
289     item_groups = []
290     for i in range(0, item_count, items_per_request):
291         item_groups.append(items[i:i + items_per_request])
292
293     # fill the queue with details needed for one http requests
294     send_queue = Queue.Queue()
295     for item_list in item_groups:
296         send_queue.put(item_list)
297
298     # create an empty result queue
299     result_queue = Queue.Queue()
300     # create exit event
301     exit_event = threading.Event()
302
303     # start threads to read details from queues and to send http requests
304     threads = []
305     for i in range(int(thread_count)):
306         thr = threading.Thread(target=_request_sender,
307                                args=(i, preparing_function, auth),
308                                kwargs={"in_queue": send_queue, "exit_event": exit_event,
309                                        "odl_ip": hosts[i % nrhosts], "port": port,
310                                        "out_queue": result_queue, "req_timeout": req_timeout,
311                                        "retry_timeout": retry_timeout, "retry_rcs": retry_rcs})
312         threads.append(thr)
313         thr.start()
314
315     exit_event.set()
316
317     result = {}
318     # wait for reqults and sum them up
319     for t in threads:
320         t.join()
321         # read partial resutls from sender thread
322         part_result = result_queue.get()
323         for k, v in part_result.iteritems():
324             if k not in result:
325                 result[k] = v
326             else:
327                 result[k] += v
328     return result
329
330
331 def _build_delete(odl_ip, port, uri):
332     """Send DELETE to generic URI, assert status code is 200.
333
334     Args:
335         :param odl_ip: ip address of ODL
336
337         :param port: restconf port
338
339         :param uri: URI without /restconf/ to complete URL
340
341     Returns:
342         None
343
344     Note:
345          Raise AssertionError if response status code != 200
346     """
347
348     url = _build_url(odl_ip, port, uri)
349     rsp = requests.delete(url, auth=auth)
350     logger.debug("%s %s", rsp.request, rsp.request.url)
351     logger.debug("Headers %s:", rsp.request.headers)
352     logger.debug("Body: %s", rsp.request.body)
353     logger.debug("Response: %s", rsp.text)
354     logger.info("%s %s", rsp, rsp.reason)
355     assert rsp.status_code == 200, rsp.text
356
357
358 def delete_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
359     """Delete cars container from config datastore, assert success.
360
361     Args:
362         :param odl_ip: ip address of ODL
363
364         :param port: restconf port
365
366         :param thread_count: ignored; only 1 thread needed
367
368         :param item_count: ignored; whole container is deleted
369
370         :param auth: authentication credentials
371
372         :param items_per_request: ignored; only 1 request needed
373
374     Returns:
375         None
376     """
377
378     logger.info("Delete all cars from %s:%s", odl_ip, port)
379     _build_delete(odl_ip, port, "config/car:cars")
380
381
382 def delete_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
383     """Delete people container from config datastore.
384
385     Args:
386         :param odl_ip: ip address of ODL
387
388         :param port: restconf port
389
390         :param thread_count: ignored; only 1 thread needed
391
392         :param item_count: ignored; whole container is deleted
393
394         :param auth: authentication credentials
395
396         :param items_per_request: ignored; only 1 request needed
397
398     Returns:
399         None
400     """
401
402     logger.info("Delete all people from %s:%s", odl_ip, port)
403     _build_delete(odl_ip, port, "config/people:people")
404
405
406 def delete_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
407     """Delete car-people container from config datastore.
408
409     Args:
410         :param odl_ip: ip address of ODL
411
412         :param port: restconf port
413
414         :param thread_count: ignored; only 1 thread needed
415
416         :param item_count: ignored; whole container is deleted
417
418         :param auth: authentication credentials
419
420         :param items_per_request: ignored; only 1 request needed
421
422     Returns:
423         None
424     """
425
426     logger.info("Delete all purchases from %s:%s", odl_ip, port)
427     _build_delete(odl_ip, port, "config/car-people:car-people")
428
429
430 def _build_get(odl_ip, port, uri):
431     """Send GET to generic URI.
432
433     Args:
434         :param odl_ip: ip address of ODL
435
436         :param port: restconf port
437
438         :param uri: URI without /restconf/ to complete URL
439
440     Returns:
441         None
442
443     Note:
444          Raise AssertionError if response status code != 200
445     """
446
447     url = _build_url(odl_ip, port, uri)
448     rsp = requests.get(url, auth=auth)
449     logger.debug("%s %s", rsp.request, rsp.request.url)
450     logger.debug("Headers %s:", rsp.request.headers)
451     logger.debug("Body: %s", rsp.request.body)
452     logger.debug("Response: %s", rsp.text)
453     logger.info("%s %s", rsp, rsp.reason)
454     assert rsp.status_code == 200, rsp.text
455
456
457 def get_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
458     """Reads car entries from config datastore.
459
460     TODO: some needed logic to be added handle http response in the future,
461           e.g. count items in response's content
462
463     Args:
464         :param odl_ip: ip address of ODL
465
466         :param port: restconf port
467
468         :param thread_count: ignored; only 1 thread needed
469
470         :param item_count: ignored; whole container is deleted
471
472         :param auth: authentication credentials
473
474         :param items_per_request: ignored; only 1 request needed
475
476     Returns:
477         None
478     """
479
480     logger.info("Get all cars from %s:%s", odl_ip, port)
481     _build_get(odl_ip, port, "config/car:cars")
482
483
484 def get_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
485     """Reads people entries from config datastore.
486
487     TODO: some needed logic to be added handle http response in the future,
488           e.g. count items in response's content
489
490     Args:
491         :param odl_ip: ip address of ODL
492
493         :param port: restconf port
494
495         :param thread_count: ignored; only 1 thread needed
496
497         :param item_count: ignored; whole container is deleted
498
499         :param auth: authentication credentials
500
501         :param items_per_request: ignored; only 1 request needed
502
503     Returns:
504         None
505     """
506
507     logger.info("Get all people from %s:%s", odl_ip, port)
508     _build_get(odl_ip, port, "config/people:people")
509
510
511 def get_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
512     """Reads car-people entries from config datastore.
513
514     TODO: some needed logic to be added handle http response in the future,
515           e.g. count items in response's content
516
517     Args:
518         :param odl_ip: ip address of ODL
519
520         :param port: restconf port
521
522         :param thread_count: ignored; only 1 thread needed
523
524         :param item_count: ignored; whole container is deleted
525
526         :param auth: authentication credentials
527
528         :param items_per_request: ignored; only 1 request needed
529
530     Returns:
531         None
532     """
533
534     logger.info("Get all purchases from %s:%s", odl_ip, port)
535     _build_get(odl_ip, port, "config/car-people:car-people")
536
537
538 def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
539     """Configure car entries to the config datastore.
540
541     Args:
542         :param odl_ip: ip address of ODL
543
544         :param port: restconf port
545
546         :param thread_count: number of threads used to send http requests; default=1
547
548         :param item_count: number of items to be configured
549
550         :param auth: authentication credentials
551
552         :param items_per_request: items per request, not used here,
553                                   just to keep the same api
554
555     Returns:
556         None
557     """
558
559     logger.info("Add %s car(s) to %s:%s (%s per request)",
560                 item_count, odl_ip, port, items_per_request)
561     res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
562                          thread_count=thread_count, item_count=item_count,
563                          items_per_request=items_per_request, auth=auth)
564     if res.keys() != [204]:
565         logger.error("Not all cars were configured: " + repr(res))
566         raise Exception("Not all cars were configured: " + repr(res))
567
568
569 def add_car_with_retries(odl_ip, port, thread_count, item_count, auth, items_per_request):
570     """Configure car entries to the config datastore.
571
572     Args:
573         :param odl_ip: ip address of ODL
574
575         :param port: restconf port
576
577         :param thread_count: number of threads used to send http requests; default=1
578
579         :param item_count: number of items to be configured
580
581         :param auth: authentication credentials
582
583         :param items_per_request: items per request, not used here,
584                                   just to keep the same api
585
586     Returns:
587         None
588     """
589
590     logger.info("Add %s car(s) to %s:%s (%s per request)",
591                 item_count, odl_ip, port, items_per_request)
592     retry_rcs = [401, 404, 500, 503]
593     res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
594                          thread_count=thread_count, item_count=item_count,
595                          items_per_request=items_per_request, auth=auth,
596                          req_timeout=15, retry_timeout=30, retry_rcs=retry_rcs)
597     acceptable_rcs = [204] + retry_rcs
598     for key in res.keys():
599         if key not in acceptable_rcs:
600             logger.error("Problems during cars' configuration appeared: " + repr(res))
601             raise Exception("Problems during cars' configuration appeared: " + repr(res))
602
603
604 def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
605     """Configure people entries to the config datastore.
606
607     Args:
608         :param odl_ip: ip address of ODL; default="127.0.0.1"
609
610         :param port: restconf port; default="8181"
611
612         :param thread_count: number of threads used to send http requests; default=1
613
614         :param item_count: number of items to be condigured
615
616         :param auth: authentication credentials
617
618         :param items_per_request: items per request, not used here,
619                                   just to keep the same api
620
621     Returns:
622         None
623     """
624
625     logger.info("Add %s people to %s:%s (%s per request)",
626                 item_count, odl_ip, port, items_per_request)
627     if items_per_request != 1:
628         logger.error("Only 1 item per request is supported, " +
629                      "you specified: {0}".format(item_count))
630         raise NotImplementedError("Only 1 item per request is supported, " +
631                                   "you specified: {0}".format(item_count))
632     res = _task_executor(_prepare_add_people_rpc, odl_ip=odl_ip, port=port,
633                          thread_count=thread_count, item_count=item_count,
634                          items_per_request=items_per_request, auth=auth)
635     if res.keys() != [200]:
636         logger.error("Not all people were configured: " + repr(res))
637         raise Exception("Not all people were configured: " + repr(res))
638
639
640 def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
641                        items_per_request):
642     """Configure car-people entries to the config datastore one by one using rpc
643
644     Args:
645         :param odl_ip: ip address of ODL; default="127.0.0.1"
646
647         :param port: restconf port; default="8181"
648
649         :param thread_count: number of threads used to send http requests; default=1
650
651         :param item_count: number of items to be condigured
652
653         :param auth: authentication credentials
654
655         :param items_per_request: items per request, not used here,
656                                   just to keep the same api
657
658     Returns:
659         None
660     """
661
662     logger.info("Add %s purchase(s) to %s:%s (%s per request)",
663                 item_count, odl_ip, port, items_per_request)
664     if items_per_request != 1:
665         logger.error("Only 1 item per request is supported, " +
666                      "you specified: {0}".format(item_count))
667         raise NotImplementedError("Only 1 item per request is supported, " +
668                                   "you specified: {0}".format(item_count))
669
670     res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
671                          thread_count=thread_count, item_count=item_count,
672                          items_per_request=items_per_request, auth=auth)
673     if res.keys() != [200]:
674         logger.error("Not all rpc calls passed: " + repr(res))
675         raise Exception("Not all rpc calls passed: " + repr(res))
676
677
678 _actions = ["add", "get", "delete", "add-rpc", "add-with-retries"]
679 _items = ["car", "people", "car-people"]
680
681 _handler_matrix = {
682     "add": {"car": add_car},
683     "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
684     "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
685     "add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
686     "add-with-retries": {"car": add_car_with_retries},
687 }
688
689
690 if __name__ == "__main__":
691     """
692     This program executes requested action based in given parameters
693
694     It provides "car", "people" and "car-people" crud operations.
695     """
696
697     parser = argparse.ArgumentParser(description="Cluster datastore"
698                                                  "performance test script")
699     parser.add_argument("--host", default="127.0.0.1",
700                         help="Host where odl controller is running."
701                              "Or comma separated list of hosts."
702                              "(default is 127.0.0.1)")
703     parser.add_argument("--port", default="8181",
704                         help="Port on which odl's RESTCONF is listening"
705                              "(default is 8181)")
706     parser.add_argument("--threads", type=int, default=1,
707                         help="Number of request worker threads to start in"
708                              "each cycle (default=1)")
709     parser.add_argument("action", choices=_actions, metavar="action",
710                         help="Action to be performed.")
711     parser.add_argument("--itemtype", choices=_items, default="car",
712                         help="Flows-per-Request - number of flows (batch size)"
713                              "sent in each HTTP request (default 1)")
714     parser.add_argument("--itemcount", type=int, help="Items per request",
715                         default=1)
716     parser.add_argument("--user", help="Restconf user name", default="admin")
717     parser.add_argument("--password", help="Restconf password", default="admin")
718     parser.add_argument("--ipr", type=int, help="Items per request", default=1)
719     parser.add_argument("--debug", dest="loglevel", action="store_const",
720                         const=logging.DEBUG, default=logging.INFO,
721                         help="Set log level to debug (default is error)")
722
723     args = parser.parse_args()
724
725     logger = logging.getLogger("logger")
726     log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
727     console_handler = logging.StreamHandler()
728     file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
729     console_handler.setFormatter(log_formatter)
730     file_handler.setFormatter(log_formatter)
731     logger.addHandler(console_handler)
732     logger.addHandler(file_handler)
733     logger.setLevel(args.loglevel)
734
735     auth = (args.user, args.password)
736
737     if (args.action not in _handler_matrix or
738             args.itemtype not in _handler_matrix[args.action]):
739         msg = "Unsupported combination of action: " + str(args.action)
740         msg += " and item: " + str(args.itemtype)
741         logger.error(msg)
742         raise NotImplementedError(msg)
743
744     # TODO: need to filter out situations when we cannot use more items
745     # in one rest request (rpc or delete?)
746     # this should be done inside handler functions
747
748     handler_function = _handler_matrix[args.action][args.itemtype]
749     handler_function(args.host, args.port, args.threads,
750                      args.itemcount, auth, args.ipr)