Fix PEP8 issues
[integration/test.git] / tools / odl-mdsal-clustering-tests / scripts / cluster_rest_script.py
1 """
2 The purpose of this script is the ability to perform crud operations over
3 the car-people data model.
4 """
5 import threading
6 import Queue
7 import requests
8 import json
9 import copy
10 import argparse
11 import logging
12 import time
13
14
15 _template_add_car = {
16     "car-entry": [
17         {
18             "id": "to be replaced",
19             "category": "my_category",
20             "model": "to be replaced",
21             "manufacturer": "my_manufacturer",
22             "year": "2015"
23         }
24     ]
25 }
26
27 _template_add_people_rpc = {
28     "input": [
29         {
30             "people:id": "to be replaced",
31             "people:gender": "male",
32             "people:age": "99",
33             "people:address": "to be replaced",
34             "people:contactNo": "to be replaced"
35         }
36     ]
37 }
38
39 _template_add_cp_rpc = {
40     "input": {
41         "car-purchase:person": "to be replaced",
42         "car-purchase:person-id": "to be replaced",
43         "car-purchase:car-id": "to be replaced"
44     }
45 }
46
47
48 def _build_url(odl_ip, port, uri):
49     """Compose URL from generic IP, port and URI fragment.
50
51     Args:
52         :param odl_ip: controller's ip address or hostname
53
54         :param port: controller's restconf port
55
56         :param uri: URI without /restconf/ to complete URL
57
58     Returns:
59         :returns url: full restconf url corresponding to params
60     """
61
62     url = "http://" + odl_ip + ":" + port + "/restconf/" + uri
63     return url
64
65
66 def _build_post(odl_ip, port, uri, python_data, auth):
67     """Create a POST http request with generic on URI and data.
68
69     Args:
70         :param odl_ip: controller's ip address or hostname
71
72         :param port: controller's restconf port
73
74         :param uri: URI without /restconf/ to complete URL
75
76         :param python_data: python object to serialize into textual data
77
78         :param auth: authentication credentials
79
80     Returns:
81         :returns http request object
82     """
83
84     url = _build_url(odl_ip, port, uri)
85     text_data = json.dumps(python_data)
86     header = {"Content-Type": "application/json"}
87     req = requests.Request("POST", url, headers=header, data=text_data, auth=auth)
88     return req
89
90
91 def _prepare_add_car(odl_ip, port, item_list, auth):
92     """Creates a POST http requests to configure a car item in configuration datastore.
93
94     Args:
95         :param odl_ip: controller's ip address or hostname
96
97         :param port: controller's restconf port
98
99         :param item_list: controller item's list contains a list of ids of the cars
100
101         :param auth: authentication credentials
102
103     Returns:
104         :returns req: http request object
105     """
106
107     container = {"car-entry": []}
108     for item in item_list:
109         entry = copy.deepcopy(_template_add_car["car-entry"][0])
110         entry["id"] = item
111         entry["model"] = "model" + str(item)
112         container["car-entry"].append(entry)
113     req = _build_post(odl_ip, port, "config/car:cars", container, auth)
114     return req
115
116
117 def _prepare_add_people_rpc(odl_ip, port, item_list, auth):
118     """Creates a POST http requests to configure people in configuration datastore.
119
120     Args:
121         :param odl_ip: controller's ip address or hostname
122
123         :param port: controller's restconf port
124
125         :param item_list: controller item's list contains a list of ids of the people
126
127         :param auth: authentication credentials
128
129     Returns:
130         :returns req: http request object
131     """
132
133     container = {"input": {}}
134     item = item_list[0]
135     entry = container["input"]
136     entry["people:id"] = str(item)
137     entry["people:address"] = "address" + str(item)
138     entry["people:contactNo"] = str(item)
139     container["input"] = entry
140     req = _build_post(odl_ip, port, "operations/people:add-person", container, auth)
141     return req
142
143
144 def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
145     """Creates a POST http requests to purchase cars using an rpc.
146
147     Args:
148         :param odl_ip: controller's ip address or hostname
149
150         :param port: controller's restconf port
151
152         :param item_list: controller item's list contains a list of ids of the people
153         only the first item is considered
154
155         :param auth: authentication credentials
156
157     Returns:
158         :returns req: http request object
159     """
160
161     container = {"input": {}}
162     item = item_list[0]
163     entry = container["input"]
164     entry["car-purchase:person"] = "/people:people/people:person[people:id='" + str(item) + "']"
165     entry["car-purchase:person-id"] = str(item)
166     entry["car-purchase:car-id"] = str(item)
167     container["input"] = entry
168     req = _build_post(odl_ip, port, "operations/car-purchase:buy-car", container, auth)
169     return req
170
171
172 def _request_sender(thread_id, preparing_function, auth, in_queue=None,
173                     exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None,
174                     req_timeout=60, retry_timeout=15, retry_rcs=[]):
175     """The funcion sends http requests.
176
177     Runs in the working thread. It reads out flow details from the queue and
178     sends apropriate http requests to the controller
179
180     Args:
181         :param thread_id: thread id
182
183         :param preparing_function: function to prepare the http request
184
185         :param in_queue: input queue, flow details are comming from here
186
187         :param exit_event: event to notify working thread that the parent
188                            (task executor) stopped filling the input queue
189
190         :param odl_ip: ip address of ODL; default="127.0.0.1"
191
192         :param port: restconf port; default="8181"
193
194         :param out_queue: queue where the results should be put
195
196         :param req_timeout: http request timeout
197
198         :param retry_timeout: timout to give up retry attempts to send http requests
199
200         :param retry_rcs: list of return codes when retry should be performed
201
202     Returns:
203         None (results is put into the output queue)
204     """
205
206     ses = requests.Session()
207     counter = [0 for i in range(600)]
208
209     while True:
210         try:
211             item_list = in_queue.get(timeout=1)
212         except Queue.Empty:
213             if exit_event.is_set() and in_queue.empty():
214                 break
215             continue
216         req = preparing_function(odl_ip, port, item_list, auth)
217         prep = req.prepare()
218         start_time = time_now = time.time()
219         while start_time + retry_timeout > time_now:
220             try:
221                 rsp = ses.send(prep, timeout=req_timeout)
222             except requests.exceptions.Timeout:
223                 counter[99] += 1
224                 logger.error("No response from %s", odl_ip)
225                 rc = 99
226             else:
227                 logger.debug("%s %s", rsp.request, rsp.request.url)
228                 logger.debug("Headers %s:", rsp.request.headers)
229                 logger.debug("Body: %s", rsp.request.body)
230                 logger.debug("Response: %s", rsp.text)
231                 logger.debug("%s %s", rsp, rsp.reason)
232                 counter[rsp.status_code] += 1
233                 rc = rsp.status_code
234             if rc not in retry_rcs:
235                 break
236             time_now = time.time()
237     responses = {}
238     for response_code, count in enumerate(counter):
239         if count > 0:
240             responses[response_code] = count
241     out_queue.put(responses)
242     logger.info("Response code(s) got per number of requests: %s", responses)
243
244
245 def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
246                    thread_count=1, item_count=1, items_per_request=1,
247                    auth=('admin', 'admin'), req_timeout=600, retry_timeout=15, retry_rcs=[]):
248     """The main function which drives sending of http requests.
249
250     Creates 2 queues and requested number of "working threads".
251     One queue is filled with flow details and working
252     threads read them out and send http requests.
253     The other queue is for sending results from working threads back.
254     After the threads' join, it produces a summary result.
255
256     Args:
257         :param preparing_function: function to prepare http request object
258
259         :param odl_ip: ip address of ODL or comma separated addesses; default="127.0.0.1"
260
261         :param port: restconf port; default="8181"
262
263         :param thread_count: number of threads used to send http requests; default=1
264
265         :param items_per_request: items per request, number of items sent in one http request
266
267         :param item_countpr: number of items to be sent in total
268
269         :param auth: authentication credentials
270
271         :param req_timeout: http request timeout
272
273         :param retry_timeout: timout to give up retry attempts to send http requests
274
275         :param retry_rcs: list of return codes when retry should be performed
276
277     Returns:
278         :returns dict: dictionary of http response counts like
279                        {"http_status_code1: "count1", etc.}
280     """
281
282     # geting hosts
283     hosts = odl_ip.split(',')
284     nrhosts = len(hosts)
285
286     items = [i + 1 for i in range(item_count)]
287     item_groups = []
288     for i in range(0, item_count, items_per_request):
289         item_groups.append(items[i:i + items_per_request])
290
291     # fill the queue with details needed for one http requests
292     send_queue = Queue.Queue()
293     for item_list in item_groups:
294         send_queue.put(item_list)
295
296     # create an empty result queue
297     result_queue = Queue.Queue()
298     # create exit event
299     exit_event = threading.Event()
300
301     # start threads to read details from queues and to send http requests
302     threads = []
303     for i in range(int(thread_count)):
304         thr = threading.Thread(target=_request_sender,
305                                args=(i, preparing_function, auth),
306                                kwargs={"in_queue": send_queue, "exit_event": exit_event,
307                                        "odl_ip": hosts[i % nrhosts], "port": port,
308                                        "out_queue": result_queue, "req_timeout": req_timeout,
309                                        "retry_timeout": retry_timeout, "retry_rcs": retry_rcs})
310         threads.append(thr)
311         thr.start()
312
313     exit_event.set()
314
315     result = {}
316     # wait for reqults and sum them up
317     for t in threads:
318         t.join()
319         # read partial resutls from sender thread
320         part_result = result_queue.get()
321         for k, v in part_result.iteritems():
322             if k not in result:
323                 result[k] = v
324             else:
325                 result[k] += v
326     return result
327
328
329 def _build_delete(odl_ip, port, uri):
330     """Send DELETE to generic URI, assert status code is 200.
331
332     Args:
333         :param odl_ip: ip address of ODL
334
335         :param port: restconf port
336
337         :param uri: URI without /restconf/ to complete URL
338
339     Returns:
340         None
341
342     Note:
343          Raise AssertionError if response status code != 200
344     """
345
346     url = _build_url(odl_ip, port, uri)
347     rsp = requests.delete(url, auth=auth)
348     logger.debug("%s %s", rsp.request, rsp.request.url)
349     logger.debug("Headers %s:", rsp.request.headers)
350     logger.debug("Body: %s", rsp.request.body)
351     logger.debug("Response: %s", rsp.text)
352     logger.info("%s %s", rsp, rsp.reason)
353     assert rsp.status_code == 200, rsp.text
354
355
356 def delete_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
357     """Delete cars container from config datastore, assert success.
358
359     Args:
360         :param odl_ip: ip address of ODL
361
362         :param port: restconf port
363
364         :param thread_count: ignored; only 1 thread needed
365
366         :param item_count: ignored; whole container is deleted
367
368         :param auth: authentication credentials
369
370         :param items_per_request: ignored; only 1 request needed
371
372     Returns:
373         None
374     """
375
376     logger.info("Delete all cars from %s:%s", odl_ip, port)
377     _build_delete(odl_ip, port, "config/car:cars")
378
379
380 def delete_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
381     """Delete people container from config datastore.
382
383     Args:
384         :param odl_ip: ip address of ODL
385
386         :param port: restconf port
387
388         :param thread_count: ignored; only 1 thread needed
389
390         :param item_count: ignored; whole container is deleted
391
392         :param auth: authentication credentials
393
394         :param items_per_request: ignored; only 1 request needed
395
396     Returns:
397         None
398     """
399
400     logger.info("Delete all people from %s:%s", odl_ip, port)
401     _build_delete(odl_ip, port, "config/people:people")
402
403
404 def delete_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
405     """Delete car-people container from config datastore.
406
407     Args:
408         :param odl_ip: ip address of ODL
409
410         :param port: restconf port
411
412         :param thread_count: ignored; only 1 thread needed
413
414         :param item_count: ignored; whole container is deleted
415
416         :param auth: authentication credentials
417
418         :param items_per_request: ignored; only 1 request needed
419
420     Returns:
421         None
422     """
423
424     logger.info("Delete all purchases from %s:%s", odl_ip, port)
425     _build_delete(odl_ip, port, "config/car-people:car-people")
426
427
428 def _build_get(odl_ip, port, uri):
429     """Send GET to generic URI.
430
431     Args:
432         :param odl_ip: ip address of ODL
433
434         :param port: restconf port
435
436         :param uri: URI without /restconf/ to complete URL
437
438     Returns:
439         None
440
441     Note:
442          Raise AssertionError if response status code != 200
443     """
444
445     url = _build_url(odl_ip, port, uri)
446     rsp = requests.get(url, auth=auth)
447     logger.debug("%s %s", rsp.request, rsp.request.url)
448     logger.debug("Headers %s:", rsp.request.headers)
449     logger.debug("Body: %s", rsp.request.body)
450     logger.debug("Response: %s", rsp.text)
451     logger.info("%s %s", rsp, rsp.reason)
452     assert rsp.status_code == 200, rsp.text
453
454
455 def get_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
456     """Reads car entries from config datastore.
457
458     TODO: some needed logic to be added handle http response in the future,
459           e.g. count items in response's content
460
461     Args:
462         :param odl_ip: ip address of ODL
463
464         :param port: restconf port
465
466         :param thread_count: ignored; only 1 thread needed
467
468         :param item_count: ignored; whole container is deleted
469
470         :param auth: authentication credentials
471
472         :param items_per_request: ignored; only 1 request needed
473
474     Returns:
475         None
476     """
477
478     logger.info("Get all cars from %s:%s", odl_ip, port)
479     _build_get(odl_ip, port, "config/car:cars")
480
481
482 def get_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
483     """Reads people entries from config datastore.
484
485     TODO: some needed logic to be added handle http response in the future,
486           e.g. count items in response's content
487
488     Args:
489         :param odl_ip: ip address of ODL
490
491         :param port: restconf port
492
493         :param thread_count: ignored; only 1 thread needed
494
495         :param item_count: ignored; whole container is deleted
496
497         :param auth: authentication credentials
498
499         :param items_per_request: ignored; only 1 request needed
500
501     Returns:
502         None
503     """
504
505     logger.info("Get all people from %s:%s", odl_ip, port)
506     _build_get(odl_ip, port, "config/people:people")
507
508
509 def get_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
510     """Reads car-people entries from config datastore.
511
512     TODO: some needed logic to be added handle http response in the future,
513           e.g. count items in response's content
514
515     Args:
516         :param odl_ip: ip address of ODL
517
518         :param port: restconf port
519
520         :param thread_count: ignored; only 1 thread needed
521
522         :param item_count: ignored; whole container is deleted
523
524         :param auth: authentication credentials
525
526         :param items_per_request: ignored; only 1 request needed
527
528     Returns:
529         None
530     """
531
532     logger.info("Get all purchases from %s:%s", odl_ip, port)
533     _build_get(odl_ip, port, "config/car-people:car-people")
534
535
536 def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
537     """Configure car entries to the config datastore.
538
539     Args:
540         :param odl_ip: ip address of ODL
541
542         :param port: restconf port
543
544         :param thread_count: number of threads used to send http requests; default=1
545
546         :param item_count: number of items to be configured
547
548         :param auth: authentication credentials
549
550         :param items_per_request: items per request, not used here,
551                                   just to keep the same api
552
553     Returns:
554         None
555     """
556
557     logger.info("Add %s car(s) to %s:%s (%s per request)",
558                 item_count, odl_ip, port, items_per_request)
559     res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
560                          thread_count=thread_count, item_count=item_count,
561                          items_per_request=items_per_request, auth=auth)
562     if res.keys() != [204]:
563         logger.error("Not all cars were configured: " + repr(res))
564         raise Exception("Not all cars were configured: " + repr(res))
565
566
567 def add_car_with_retries(odl_ip, port, thread_count, item_count, auth, items_per_request):
568     """Configure car entries to the config datastore.
569
570     Args:
571         :param odl_ip: ip address of ODL
572
573         :param port: restconf port
574
575         :param thread_count: number of threads used to send http requests; default=1
576
577         :param item_count: number of items to be configured
578
579         :param auth: authentication credentials
580
581         :param items_per_request: items per request, not used here,
582                                   just to keep the same api
583
584     Returns:
585         None
586     """
587
588     logger.info("Add %s car(s) to %s:%s (%s per request)",
589                 item_count, odl_ip, port, items_per_request)
590     retry_rcs = [401, 404, 503]
591     res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
592                          thread_count=thread_count, item_count=item_count,
593                          items_per_request=items_per_request, auth=auth,
594                          req_timeout=15, retry_timeout=30, retry_rcs=retry_rcs)
595     acceptable_rcs = [204] + retry_rcs
596     for key in res.keys():
597         if key not in acceptable_rcs:
598             logger.error("Problems during cars' configuration appeared: " + repr(res))
599             raise Exception("Problems during cars' configuration appeared: " + repr(res))
600
601
602 def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
603     """Configure people entries to the config datastore.
604
605     Args:
606         :param odl_ip: ip address of ODL; default="127.0.0.1"
607
608         :param port: restconf port; default="8181"
609
610         :param thread_count: number of threads used to send http requests; default=1
611
612         :param item_count: number of items to be condigured
613
614         :param auth: authentication credentials
615
616         :param items_per_request: items per request, not used here,
617                                   just to keep the same api
618
619     Returns:
620         None
621     """
622
623     logger.info("Add %s people to %s:%s (%s per request)",
624                 item_count, odl_ip, port, items_per_request)
625     if items_per_request != 1:
626         logger.error("Only 1 item per request is supported, " +
627                      "you specified: {0}".format(item_count))
628         raise NotImplementedError("Only 1 item per request is supported, " +
629                                   "you specified: {0}".format(item_count))
630     res = _task_executor(_prepare_add_people_rpc, odl_ip=odl_ip, port=port,
631                          thread_count=thread_count, item_count=item_count,
632                          items_per_request=items_per_request, auth=auth)
633     if res.keys() != [200]:
634         logger.error("Not all people were configured: " + repr(res))
635         raise Exception("Not all people were configured: " + repr(res))
636
637
638 def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
639                        items_per_request):
640     """Configure car-people entries to the config datastore one by one using rpc
641
642     Args:
643         :param odl_ip: ip address of ODL; default="127.0.0.1"
644
645         :param port: restconf port; default="8181"
646
647         :param thread_count: number of threads used to send http requests; default=1
648
649         :param item_count: number of items to be condigured
650
651         :param auth: authentication credentials
652
653         :param items_per_request: items per request, not used here,
654                                   just to keep the same api
655
656     Returns:
657         None
658     """
659
660     logger.info("Add %s purchase(s) to %s:%s (%s per request)",
661                 item_count, odl_ip, port, items_per_request)
662     if items_per_request != 1:
663         logger.error("Only 1 item per request is supported, " +
664                      "you specified: {0}".format(item_count))
665         raise NotImplementedError("Only 1 item per request is supported, " +
666                                   "you specified: {0}".format(item_count))
667
668     res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
669                          thread_count=thread_count, item_count=item_count,
670                          items_per_request=items_per_request, auth=auth)
671     if res.keys() != [200]:
672         logger.error("Not all rpc calls passed: " + repr(res))
673         raise Exception("Not all rpc calls passed: " + repr(res))
674
675
676 _actions = ["add", "get", "delete", "add-rpc", "add-with-retries"]
677 _items = ["car", "people", "car-people"]
678
679 _handler_matrix = {
680     "add": {"car": add_car},
681     "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
682     "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
683     "add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
684     "add-with-retries": {"car": add_car_with_retries},
685 }
686
687
688 if __name__ == "__main__":
689     """
690     This program executes requested action based in given parameters
691
692     It provides "car", "people" and "car-people" crud operations.
693     """
694
695     parser = argparse.ArgumentParser(description="Cluster datastore"
696                                                  "performance test script")
697     parser.add_argument("--host", default="127.0.0.1",
698                         help="Host where odl controller is running."
699                              "Or comma separated list of hosts."
700                              "(default is 127.0.0.1)")
701     parser.add_argument("--port", default="8181",
702                         help="Port on which odl's RESTCONF is listening"
703                              "(default is 8181)")
704     parser.add_argument("--threads", type=int, default=1,
705                         help="Number of request worker threads to start in"
706                              "each cycle (default=1)")
707     parser.add_argument("action", choices=_actions, metavar="action",
708                         help="Action to be performed.")
709     parser.add_argument("--itemtype", choices=_items, default="car",
710                         help="Flows-per-Request - number of flows (batch size)"
711                              "sent in each HTTP request (default 1)")
712     parser.add_argument("--itemcount", type=int, help="Items per request",
713                         default=1)
714     parser.add_argument("--user", help="Restconf user name", default="admin")
715     parser.add_argument("--password", help="Restconf password", default="admin")
716     parser.add_argument("--ipr", type=int, help="Items per request", default=1)
717     parser.add_argument("--debug", dest="loglevel", action="store_const",
718                         const=logging.DEBUG, default=logging.INFO,
719                         help="Set log level to debug (default is error)")
720
721     args = parser.parse_args()
722
723     logger = logging.getLogger("logger")
724     log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
725     console_handler = logging.StreamHandler()
726     file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
727     console_handler.setFormatter(log_formatter)
728     file_handler.setFormatter(log_formatter)
729     logger.addHandler(console_handler)
730     logger.addHandler(file_handler)
731     logger.setLevel(args.loglevel)
732
733     auth = (args.user, args.password)
734
735     if (args.action not in _handler_matrix or
736             args.itemtype not in _handler_matrix[args.action]):
737         msg = "Unsupported combination of action: " + str(args.action)
738         msg += " and item: " + str(args.itemtype)
739         logger.error(msg)
740         raise NotImplementedError(msg)
741
742     # TODO: need to filter out situations when we cannot use more items
743     # in one rest request (rpc or delete?)
744     # this should be done inside handler functions
745
746     handler_function = _handler_matrix[args.action][args.itemtype]
747     handler_function(args.host, args.port, args.threads,
748                      args.itemcount, auth, args.ipr)