Updated code to match new rules
[integration/test.git] / tools / odl-mdsal-clustering-tests / scripts / cluster_rest_script.py
1 """
2 The purpose of this script is the ability to perform crud operations over
3 the car-people data model.
4 """
5 import threading
6 import Queue
7 import requests
8 import json
9 import copy
10 import argparse
11 import logging
12
13
14 _template_add_car = {
15     "car-entry": [
16         {
17             "id": "to be replaced",
18             "category": "my_category",
19             "model": "to be replaced",
20             "manufacturer": "my_manufacturer",
21             "year": "2015"
22         }
23     ]
24 }
25
26 _template_add_people = {
27     "person": [
28         {
29             "id": "to be replaced",
30             "gender": "male",
31             "age": "99",
32             "address": "to be replaced",
33             "contactNo": "to be replaced"
34         }
35     ]
36 }
37
38 _template_add_cp_rpc = {
39     "input": {
40         "car-purchase:person": "to be replaced",
41         "car-purchase:person-id": "to be replaced",
42         "car-purchase:car-id": "to be replaced"
43     }
44 }
45
46
47 def _build_url(odl_ip, port, uri):
48     """Compose URL from generic IP, port and URI fragment.
49
50     Args:
51         :param odl_ip: controller's ip address or hostname
52
53         :param port: controller's restconf port
54
55         :param uri: URI without /restconf/ to complete URL
56
57     Returns:
58         :returns url: full restconf url corresponding to params
59     """
60
61     url = "http://" + odl_ip + ":" + port + "/restconf/" + uri
62     return url
63
64
65 def _build_post(odl_ip, port, uri, python_data, auth):
66     """Create a POST http request with generic on URI and data.
67
68     Args:
69         :param odl_ip: controller's ip address or hostname
70
71         :param port: controller's restconf port
72
73         :param uri: URI without /restconf/ to complete URL
74
75         :param python_data: python object to serialize into textual data
76
77         :param auth: authentication credentials
78
79     Returns:
80         :returns http request object
81     """
82
83     url = _build_url(odl_ip, port, uri)
84     text_data = json.dumps(python_data)
85     header = {"Content-Type": "application/json"}
86     req = requests.Request("POST", url, headers=header, data=text_data, auth=auth)
87     return req
88
89
90 def _prepare_add_car(odl_ip, port, item_list, auth):
91     """Creates a POST http requests to configure a car item in configuration datastore.
92
93     Args:
94         :param odl_ip: controller's ip address or hostname
95
96         :param port: controller's restconf port
97
98         :param item_list: controller item's list contains a list of ids of the cars
99
100         :param auth: authentication credentials
101
102     Returns:
103         :returns req: http request object
104     """
105
106     container = {"car-entry": []}
107     for item in item_list:
108         entry = copy.deepcopy(_template_add_car["car-entry"][0])
109         entry["id"] = item
110         entry["model"] = "model" + str(item)
111         container["car-entry"].append(entry)
112     req = _build_post(odl_ip, port, "config/car:cars", container, auth)
113     return req
114
115
116 def _prepare_add_people(odl_ip, port, item_list, auth):
117     """Creates a POST http requests to configure people in configuration datastore.
118
119     Args:
120         :param odl_ip: controller's ip address or hostname
121
122         :param port: controller's restconf port
123
124         :param item_list: controller item's list contains a list of ids of the people
125
126         :param auth: authentication credentials
127
128     Returns:
129         :returns req: http request object
130     """
131
132     container = {"person": []}
133     for item in item_list:
134         entry = copy.deepcopy(_template_add_people["person"][0])
135         entry["id"] = str(item)
136         entry["address"] = "address" + str(item)
137         entry["contactNo"] = str(item)
138         container["person"].append(entry)
139     req = _build_post(odl_ip, port, "config/people:people", container, auth)
140     return req
141
142
143 def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
144     """Creates a POST http requests to purchase cars using an rpc.
145
146     Args:
147         :param odl_ip: controller's ip address or hostname
148
149         :param port: controller's restconf port
150
151         :param item_list: controller item's list contains a list of ids of the people
152         only the first item is considered
153
154         :param auth: authentication credentials
155
156     Returns:
157         :returns req: http request object
158     """
159
160     container = {"input": {}}
161     item = item_list[0]
162     entry = container["input"]
163     entry["car-purchase:person"] = "/people:people/people:person[people:id='" + str(item) + "']"
164     entry["car-purchase:person-id"] = str(item)
165     entry["car-purchase:car-id"] = str(item)
166     container["input"] = entry
167     req = _build_post(odl_ip, port, "operations/car-purchase:buy-car", container, auth)
168     return req
169
170
171 def _request_sender(thread_id, preparing_function, auth, in_queue=None,
172                     exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None):
173     """The funcion sends http requests.
174
175     Runs in the working thread. It reads out flow details from the queue and
176     sends apropriate http requests to the controller
177
178     Args:
179         :param thread_id: thread id
180
181         :param preparing_function: function to prepare the http request
182
183         :param in_queue: input queue, flow details are comming from here
184
185         :param exit_event: event to notify working thread that the parent
186                            (task executor) stopped filling the input queue
187
188         :param odl_ip: ip address of ODL; default="127.0.0.1"
189
190         :param port: restconf port; default="8181"
191
192         :param out_queue: queue where the results should be put
193
194     Returns:
195         None (results is put into the output queue)
196     """
197
198     ses = requests.Session()
199     counter = [0 for i in range(600)]
200
201     while True:
202         try:
203             item_list = in_queue.get(timeout=1)
204         except Queue.Empty:
205             if exit_event.is_set() and in_queue.empty():
206                 break
207             continue
208         req = preparing_function(odl_ip, port, item_list, auth)
209         prep = req.prepare()
210         try:
211             rsp = ses.send(prep, timeout=60)
212         except requests.exceptions.Timeout:
213             counter[99] += 1
214             logger.error("No response from %s", odl_ip)
215             continue
216         logger.debug("%s %s", rsp.request, rsp.request.url)
217         logger.debug("Headers %s:", rsp.request.headers)
218         logger.debug("Body: %s", rsp.request.body)
219         logger.debug("Response: %s", rsp.text)
220         logger.debug("%s %s", rsp, rsp.reason)
221         counter[rsp.status_code] += 1
222     responses = {}
223     for response_code, count in enumerate(counter):
224         if count > 0:
225             responses[response_code] = count
226     out_queue.put(responses)
227     logger.info("Response code(s) got per number of requests: %s", responses)
228
229
230 def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
231                    thread_count=1, item_count=1, items_per_request=1,
232                    auth=('admin', 'admin')):
233     """The main function which drives sending of http requests.
234
235     Creates 2 queues and requested number of "working threads".
236     One queue is filled with flow details and working
237     threads read them out and send http requests.
238     The other queue is for sending results from working threads back.
239     After the threads' join, it produces a summary result.
240
241     Args:
242         :param preparing_function: function to prepare http request object
243
244         :param odl_ip: ip address of ODL; default="127.0.0.1"
245
246         :param port: restconf port; default="8181"
247
248         :param thread_count: number of threads used to send http requests; default=1
249
250         :param items_per_request: items per request, number of items sent in one http request
251
252         :param item_countpr: number of items to be sent in total
253
254         :param auth: authentication credentials
255
256     Returns:
257         :returns dict: dictionary of http response counts like
258                        {"http_status_code1: "count1", etc.}
259     """
260
261     items = [i+1 for i in range(item_count)]
262     item_groups = []
263     for i in range(0, item_count, items_per_request):
264         item_groups.append(items[i:i+items_per_request])
265
266     # fill the queue with details needed for one http requests
267     send_queue = Queue.Queue()
268     for item_list in item_groups:
269         send_queue.put(item_list)
270
271     # create an empty result queue
272     result_queue = Queue.Queue()
273     # create exit event
274     exit_event = threading.Event()
275
276     # start threads to read details from queues and to send http requests
277     threads = []
278     for i in range(int(thread_count)):
279         thr = threading.Thread(target=_request_sender,
280                                args=(i, preparing_function, auth),
281                                kwargs={"in_queue": send_queue, "exit_event": exit_event,
282                                        "odl_ip": odl_ip, "port": port,
283                                        "out_queue": result_queue})
284         threads.append(thr)
285         thr.start()
286
287     exit_event.set()
288
289     result = {}
290     # wait for reqults and sum them up
291     for t in threads:
292         t.join()
293         # read partial resutls from sender thread
294         part_result = result_queue.get()
295         for k, v in part_result.iteritems():
296             if k not in result:
297                 result[k] = v
298             else:
299                 result[k] += v
300     return result
301
302
303 def _build_delete(odl_ip, port, uri):
304     """Send DELETE to generic URI, assert status code is 200.
305
306     Args:
307         :param odl_ip: ip address of ODL
308
309         :param port: restconf port
310
311         :param uri: URI without /restconf/ to complete URL
312
313     Returns:
314         None
315
316     Note:
317          Raise AssertionError if response status code != 200
318     """
319
320     url = _build_url(odl_ip, port, uri)
321     rsp = requests.delete(url, auth=auth)
322     logger.debug("%s %s", rsp.request, rsp.request.url)
323     logger.debug("Headers %s:", rsp.request.headers)
324     logger.debug("Body: %s", rsp.request.body)
325     logger.debug("Response: %s", rsp.text)
326     logger.info("%s %s", rsp, rsp.reason)
327     assert rsp.status_code == 200, rsp.text
328
329
330 def delete_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
331     """Delete cars container from config datastore, assert success.
332
333     Args:
334         :param odl_ip: ip address of ODL
335
336         :param port: restconf port
337
338         :param thread_count: ignored; only 1 thread needed
339
340         :param item_count: ignored; whole container is deleted
341
342         :param auth: authentication credentials
343
344         :param items_per_request: ignored; only 1 request needed
345
346     Returns:
347         None
348     """
349
350     logger.info("Delete all cars from %s:%s", odl_ip, port)
351     _build_delete(odl_ip, port, "config/car:cars")
352
353
354 def delete_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
355     """Delete people container from config datastore.
356
357     Args:
358         :param odl_ip: ip address of ODL
359
360         :param port: restconf port
361
362         :param thread_count: ignored; only 1 thread needed
363
364         :param item_count: ignored; whole container is deleted
365
366         :param auth: authentication credentials
367
368         :param items_per_request: ignored; only 1 request needed
369
370     Returns:
371         None
372     """
373
374     logger.info("Delete all people from %s:%s", odl_ip, port)
375     _build_delete(odl_ip, port, "config/people:people")
376
377
378 def delete_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
379     """Delete car-people container from config datastore.
380
381     Args:
382         :param odl_ip: ip address of ODL
383
384         :param port: restconf port
385
386         :param thread_count: ignored; only 1 thread needed
387
388         :param item_count: ignored; whole container is deleted
389
390         :param auth: authentication credentials
391
392         :param items_per_request: ignored; only 1 request needed
393
394     Returns:
395         None
396     """
397
398     logger.info("Delete all purchases from %s:%s", odl_ip, port)
399     _build_delete(odl_ip, port, "config/car-people:car-people")
400
401
402 def _build_get(odl_ip, port, uri):
403     """Send GET to generic URI.
404
405     Args:
406         :param odl_ip: ip address of ODL
407
408         :param port: restconf port
409
410         :param uri: URI without /restconf/ to complete URL
411
412     Returns:
413         None
414
415     Note:
416          Raise AssertionError if response status code != 200
417     """
418
419     url = _build_url(odl_ip, port, uri)
420     rsp = requests.get(url, auth=auth)
421     logger.debug("%s %s", rsp.request, rsp.request.url)
422     logger.debug("Headers %s:", rsp.request.headers)
423     logger.debug("Body: %s", rsp.request.body)
424     logger.debug("Response: %s", rsp.text)
425     logger.info("%s %s", rsp, rsp.reason)
426     assert rsp.status_code == 200, rsp.text
427
428
429 def get_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
430     """Reads car entries from config datastore.
431
432     TODO: some needed logic to be added handle http response in the future,
433           e.g. count items in response's content
434
435     Args:
436         :param odl_ip: ip address of ODL
437
438         :param port: restconf port
439
440         :param thread_count: ignored; only 1 thread needed
441
442         :param item_count: ignored; whole container is deleted
443
444         :param auth: authentication credentials
445
446         :param items_per_request: ignored; only 1 request needed
447
448     Returns:
449         None
450     """
451
452     logger.info("Get all cars from %s:%s", odl_ip, port)
453     _build_get(odl_ip, port, "config/car:cars")
454
455
456 def get_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
457     """Reads people entries from config datastore.
458
459     TODO: some needed logic to be added handle http response in the future,
460           e.g. count items in response's content
461
462     Args:
463         :param odl_ip: ip address of ODL
464
465         :param port: restconf port
466
467         :param thread_count: ignored; only 1 thread needed
468
469         :param item_count: ignored; whole container is deleted
470
471         :param auth: authentication credentials
472
473         :param items_per_request: ignored; only 1 request needed
474
475     Returns:
476         None
477     """
478
479     logger.info("Get all people from %s:%s", odl_ip, port)
480     _build_get(odl_ip, port, "config/people:people")
481
482
483 def get_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
484     """Reads car-people entries from config datastore.
485
486     TODO: some needed logic to be added handle http response in the future,
487           e.g. count items in response's content
488
489     Args:
490         :param odl_ip: ip address of ODL
491
492         :param port: restconf port
493
494         :param thread_count: ignored; only 1 thread needed
495
496         :param item_count: ignored; whole container is deleted
497
498         :param auth: authentication credentials
499
500         :param items_per_request: ignored; only 1 request needed
501
502     Returns:
503         None
504     """
505
506     logger.info("Get all purchases from %s:%s", odl_ip, port)
507     _build_get(odl_ip, port, "config/car-people:car-people")
508
509
510 def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
511     """Configure car entries to the config datastore.
512
513     Args:
514         :param odl_ip: ip address of ODL
515
516         :param port: restconf port
517
518         :param thread_count: number of threads used to send http requests; default=1
519
520         :param item_count: number of items to be configured
521
522         :param auth: authentication credentials
523
524         :param items_per_request: items per request, not used here,
525                                   just to keep the same api
526
527     Returns:
528         None
529     """
530
531     logger.info("Add %s car(s) to %s:%s (%s per request)",
532                 item_count, odl_ip, port, items_per_request)
533     res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
534                          thread_count=thread_count, item_count=item_count,
535                          items_per_request=items_per_request, auth=auth)
536     if res.keys() != [204]:
537         logger.error("Not all cars were configured: " + repr(res))
538         raise Exception("Not all cars were configured: " + repr(res))
539
540
541 def add_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
542     """Configure people entries to the config datastore.
543
544     Args:
545         :param odl_ip: ip address of ODL; default="127.0.0.1"
546
547         :param port: restconf port; default="8181"
548
549         :param thread_count: number of threads used to send http requests; default=1
550
551         :param item_count: number of items to be condigured
552
553         :param auth: authentication credentials
554
555         :param items_per_request: items per request, not used here,
556                                   just to keep the same api
557
558     Returns:
559         None
560     """
561
562     logger.info("Add %s people to %s:%s (%s per request)",
563                 item_count, odl_ip, port, items_per_request)
564     res = _task_executor(_prepare_add_people, odl_ip=odl_ip, port=port,
565                          thread_count=thread_count, item_count=item_count,
566                          items_per_request=items_per_request, auth=auth)
567     if res.keys() != [204]:
568         logger.error("Not all people were configured: " + repr(res))
569         raise Exception("Not all people were configured: " + repr(res))
570
571
572 def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
573                        items_per_request):
574     """Configure car-people entries to the config datastore one by one using rpc
575
576     Args:
577         :param odl_ip: ip address of ODL; default="127.0.0.1"
578
579         :param port: restconf port; default="8181"
580
581         :param thread_count: number of threads used to send http requests; default=1
582
583         :param item_count: number of items to be condigured
584
585         :param auth: authentication credentials
586
587         :param items_per_request: items per request, not used here,
588                                   just to keep the same api
589
590     Returns:
591         None
592     """
593
594     logger.info("Add %s purchase(s) to %s:%s (%s per request)",
595                 item_count, odl_ip, port, items_per_request)
596     if items_per_request != 1:
597         logger.error("Only 1 item per request is supported, " +
598                      "you specified: {0}".format(item_count))
599         raise NotImplementedError("Only 1 item per request is supported, " +
600                                   "you specified: {0}".format(item_count))
601
602     res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
603                          thread_count=thread_count, item_count=item_count,
604                          items_per_request=items_per_request, auth=auth)
605     if res.keys() != [204]:
606         logger.error("Not all rpc calls passed: " + repr(res))
607         raise Exception("Not all rpc calls passed: " + repr(res))
608
609
610 _actions = ["add", "get", "delete", "add-rpc"]
611 _items = ["car", "people", "car-people"]
612
613 _handler_matrix = {
614     "add": {"car": add_car, "people": add_people},
615     "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
616     "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
617     "add-rpc": {"car-people": add_car_people_rpc},
618 }
619
620
621 if __name__ == "__main__":
622     """
623     This program executes requested action based in given parameters
624
625     It provides "car", "people" and "car-people" crud operations.
626     """
627
628     parser = argparse.ArgumentParser(description="Cluster datastore"
629                                                  "performance test script")
630     parser.add_argument("--host", default="127.0.0.1",
631                         help="Host where odl controller is running"
632                              "(default is 127.0.0.1)")
633     parser.add_argument("--port", default="8181",
634                         help="Port on which odl's RESTCONF is listening"
635                              "(default is 8181)")
636     parser.add_argument("--threads", type=int, default=1,
637                         help="Number of request worker threads to start in"
638                              "each cycle (default=1)")
639     parser.add_argument("action", choices=_actions, metavar="action",
640                         help="Action to be performed.")
641     parser.add_argument("--itemtype", choices=_items, default="car",
642                         help="Flows-per-Request - number of flows (batch size)"
643                              "sent in each HTTP request (default 1)")
644     parser.add_argument("--itemcount", type=int, help="Items per request",
645                         default=1)
646     parser.add_argument("--user", help="Restconf user name", default="admin")
647     parser.add_argument("--password", help="Restconf password", default="admin")
648     parser.add_argument("--ipr", type=int, help="Items per request", default=1)
649     parser.add_argument("--debug", dest="loglevel", action="store_const",
650                         const=logging.DEBUG, default=logging.INFO,
651                         help="Set log level to debug (default is error)")
652
653     args = parser.parse_args()
654
655     logger = logging.getLogger("logger")
656     log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
657     console_handler = logging.StreamHandler()
658     file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
659     console_handler.setFormatter(log_formatter)
660     file_handler.setFormatter(log_formatter)
661     logger.addHandler(console_handler)
662     logger.addHandler(file_handler)
663     logger.setLevel(args.loglevel)
664
665     auth = (args.user, args.password)
666
667     if (args.action not in _handler_matrix or
668             args.itemtype not in _handler_matrix[args.action]):
669             msg = "Unsupported combination of action: " + str(args.action)
670             msg += " and item: " + str(args.itemtype)
671             logger.error(msg)
672             raise NotImplementedError(msg)
673
674     # TODO: need to filter out situations when we cannot use more items
675     # in one rest request (rpc or delete?)
676     # this should be done inside handler functions
677
678     handler_function = _handler_matrix[args.action][args.itemtype]
679     handler_function(args.host, args.port, args.threads,
680                      args.itemcount, auth, args.ipr)