7a6d060a964ba8648b3a0bd9111eba173d1253d7
[integration/test.git] / tools / odl-mdsal-clustering-tests / scripts / cluster_rest_script.py
1 """
2 The purpose of this script is the ability to perform crud operations over
3 the car-people data model.
4 """
5 import threading
6 import Queue
7 import requests
8 import json
9 import copy
10 import argparse
11 import logging
12
13
14 _template_add_car = {
15     "car-entry": [
16         {
17             "id": "to be replaced",
18             "category": "my_category",
19             "model": "to be replaced",
20             "manufacturer": "my_manufacturer",
21             "year": "2015"
22         }
23     ]
24 }
25
26 _template_add_people = {
27     "person": [
28         {
29             "id": "to be replaced",
30             "gender": "male",
31             "age": "99",
32             "address": "to be replaced",
33             "contactNo": "to be replaced"
34         }
35     ]
36 }
37
38 _template_add_cp_rpc = {
39     "input": {
40         "car-purchase:person": "to be replaced",
41         "car-purchase:person-id": "to be replaced",
42         "car-purchase:car-id": "to be replaced"
43     }
44 }
45
46
47 def _build_url(odl_ip, port, uri):
48     """Compose URL from generic IP, port and URI fragment.
49
50     Args:
51         :param odl_ip: controller's ip address or hostname
52
53         :param port: controller's restconf port
54
55         :param uri: URI without /restconf/ to complete URL
56
57     Returns:
58         :returns url: full restconf url corresponding to params
59     """
60
61     url = "http://" + odl_ip + ":" + port + "/restconf/" + uri
62     return url
63
64
65 def _build_post(odl_ip, port, uri, python_data, auth):
66     """Create a POST http request with generic on URI and data.
67
68     Args:
69         :param odl_ip: controller's ip address or hostname
70
71         :param port: controller's restconf port
72
73         :param uri: URI without /restconf/ to complete URL
74
75         :param python_data: python object to serialize into textual data
76
77         :param auth: authentication credentials
78
79     Returns:
80         :returns http request object
81     """
82
83     url = _build_url(odl_ip, port, uri)
84     text_data = json.dumps(python_data)
85     header = {"Content-Type": "application/json"}
86     req = requests.Request("POST", url, headers=header, data=text_data, auth=auth)
87     return req
88
89
90 def _prepare_add_car(odl_ip, port, item_list, auth):
91     """Creates a POST http requests to configure a car item in configuration datastore.
92
93     Args:
94         :param odl_ip: controller's ip address or hostname
95
96         :param port: controller's restconf port
97
98         :param item_list: controller item's list contains a list of ids of the cars
99
100         :param auth: authentication credentials
101
102     Returns:
103         :returns req: http request object
104     """
105
106     container = {"car-entry": []}
107     for item in item_list:
108         entry = copy.deepcopy(_template_add_car["car-entry"][0])
109         entry["id"] = item
110         entry["model"] = "model" + str(item)
111         container["car-entry"].append(entry)
112     req = _build_post(odl_ip, port, "config/car:cars", container, auth)
113     return req
114
115
116 def _prepare_add_people(odl_ip, port, item_list, auth):
117     """Creates a POST http requests to configure people in configuration datastore.
118
119     Args:
120         :param odl_ip: controller's ip address or hostname
121
122         :param port: controller's restconf port
123
124         :param item_list: controller item's list contains a list of ids of the people
125
126         :param auth: authentication credentials
127
128     Returns:
129         :returns req: http request object
130     """
131
132     container = {"person": []}
133     for item in item_list:
134         entry = copy.deepcopy(_template_add_people["person"][0])
135         entry["id"] = str(item)
136         entry["address"] = "address" + str(item)
137         entry["contactNo"] = str(item)
138         container["person"].append(entry)
139     req = _build_post(odl_ip, port, "config/people:people", container, auth)
140     return req
141
142
143 def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
144     """Creates a POST http requests to purchase cars using an rpc.
145
146     Args:
147         :param odl_ip: controller's ip address or hostname
148
149         :param port: controller's restconf port
150
151         :param item_list: controller item's list contains a list of ids of the people
152         only the first item is considered
153
154         :param auth: authentication credentials
155
156     Returns:
157         :returns req: http request object
158     """
159
160     container = {"input": {}}
161     item = item_list[0]
162     entry = container["input"]
163     entry["car-purchase:person"] = "/people:people/people:person[people:id='" + str(item) + "']"
164     entry["car-purchase:person-id"] = str(item)
165     entry["car-purchase:car-id"] = str(item)
166     container["input"] = entry
167     req = _build_post(odl_ip, port, "operations/car-purchase:buy-car", container, auth)
168     return req
169
170
171 def _request_sender(thread_id, preparing_function, auth, in_queue=None,
172                     exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None):
173     """The funcion sends http requests.
174
175     Runs in the working thread. It reads out flow details from the queue and
176     sends apropriate http requests to the controller
177
178     Args:
179         :param thread_id: thread id
180
181         :param preparing_function: function to prepare the http request
182
183         :param in_queue: input queue, flow details are comming from here
184
185         :param exit_event: event to notify working thread that the parent
186                            (task executor) stopped filling the input queue
187
188         :param odl_ip: ip address of ODL; default="127.0.0.1"
189
190         :param port: restconf port; default="8181"
191
192         :param out_queue: queue where the results should be put
193
194     Returns:
195         None (results is put into the output queue)
196     """
197
198     ses = requests.Session()
199     counter = [0 for i in range(600)]
200
201     while True:
202         try:
203             item_list = in_queue.get(timeout=1)
204         except Queue.Empty:
205             if exit_event.is_set() and in_queue.empty():
206                 break
207             continue
208         req = preparing_function(odl_ip, port, item_list, auth)
209         prep = req.prepare()
210         try:
211             rsp = ses.send(prep, timeout=60)
212         except requests.exceptions.Timeout:
213             counter[99] += 1
214             logger.error("No response from %s", odl_ip)
215             continue
216         logger.debug("%s %s", rsp.request, rsp.request.url)
217         logger.debug("Headers %s:", rsp.request.headers)
218         logger.debug("Body: %s", rsp.request.body)
219         logger.debug("Response: %s", rsp.text)
220         logger.debug("%s %s", rsp, rsp.reason)
221         counter[rsp.status_code] += 1
222     responses = {}
223     for response_code, count in enumerate(counter):
224         if count > 0:
225             responses[response_code] = count
226     out_queue.put(responses)
227     logger.info("Response code(s) got per number of requests: %s", responses)
228
229
230 def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
231                    thread_count=1, item_count=1, items_per_request=1,
232                    auth=('admin', 'admin')):
233     """The main function which drives sending of http requests.
234
235     Creates 2 queues and requested number of "working threads".
236     One queue is filled with flow details and working
237     threads read them out and send http requests.
238     The other queue is for sending results from working threads back.
239     After the threads' join, it produces a summary result.
240
241     Args:
242         :param preparing_function: function to prepare http request object
243
244         :param odl_ip: ip address of ODL or comma separated addesses; default="127.0.0.1"
245
246         :param port: restconf port; default="8181"
247
248         :param thread_count: number of threads used to send http requests; default=1
249
250         :param items_per_request: items per request, number of items sent in one http request
251
252         :param item_countpr: number of items to be sent in total
253
254         :param auth: authentication credentials
255
256     Returns:
257         :returns dict: dictionary of http response counts like
258                        {"http_status_code1: "count1", etc.}
259     """
260
261     # geting hosts
262     hosts = odl_ip.split(',')
263     nrhosts = len(hosts)
264
265     items = [i+1 for i in range(item_count)]
266     item_groups = []
267     for i in range(0, item_count, items_per_request):
268         item_groups.append(items[i:i+items_per_request])
269
270     # fill the queue with details needed for one http requests
271     send_queue = Queue.Queue()
272     for item_list in item_groups:
273         send_queue.put(item_list)
274
275     # create an empty result queue
276     result_queue = Queue.Queue()
277     # create exit event
278     exit_event = threading.Event()
279
280     # start threads to read details from queues and to send http requests
281     threads = []
282     for i in range(int(thread_count)):
283         thr = threading.Thread(target=_request_sender,
284                                args=(i, preparing_function, auth),
285                                kwargs={"in_queue": send_queue, "exit_event": exit_event,
286                                        "odl_ip": hosts[i % nrhosts], "port": port,
287                                        "out_queue": result_queue})
288         threads.append(thr)
289         thr.start()
290
291     exit_event.set()
292
293     result = {}
294     # wait for reqults and sum them up
295     for t in threads:
296         t.join()
297         # read partial resutls from sender thread
298         part_result = result_queue.get()
299         for k, v in part_result.iteritems():
300             if k not in result:
301                 result[k] = v
302             else:
303                 result[k] += v
304     return result
305
306
307 def _build_delete(odl_ip, port, uri):
308     """Send DELETE to generic URI, assert status code is 200.
309
310     Args:
311         :param odl_ip: ip address of ODL
312
313         :param port: restconf port
314
315         :param uri: URI without /restconf/ to complete URL
316
317     Returns:
318         None
319
320     Note:
321          Raise AssertionError if response status code != 200
322     """
323
324     url = _build_url(odl_ip, port, uri)
325     rsp = requests.delete(url, auth=auth)
326     logger.debug("%s %s", rsp.request, rsp.request.url)
327     logger.debug("Headers %s:", rsp.request.headers)
328     logger.debug("Body: %s", rsp.request.body)
329     logger.debug("Response: %s", rsp.text)
330     logger.info("%s %s", rsp, rsp.reason)
331     assert rsp.status_code == 200, rsp.text
332
333
334 def delete_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
335     """Delete cars container from config datastore, assert success.
336
337     Args:
338         :param odl_ip: ip address of ODL
339
340         :param port: restconf port
341
342         :param thread_count: ignored; only 1 thread needed
343
344         :param item_count: ignored; whole container is deleted
345
346         :param auth: authentication credentials
347
348         :param items_per_request: ignored; only 1 request needed
349
350     Returns:
351         None
352     """
353
354     logger.info("Delete all cars from %s:%s", odl_ip, port)
355     _build_delete(odl_ip, port, "config/car:cars")
356
357
358 def delete_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
359     """Delete people container from config datastore.
360
361     Args:
362         :param odl_ip: ip address of ODL
363
364         :param port: restconf port
365
366         :param thread_count: ignored; only 1 thread needed
367
368         :param item_count: ignored; whole container is deleted
369
370         :param auth: authentication credentials
371
372         :param items_per_request: ignored; only 1 request needed
373
374     Returns:
375         None
376     """
377
378     logger.info("Delete all people from %s:%s", odl_ip, port)
379     _build_delete(odl_ip, port, "config/people:people")
380
381
382 def delete_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
383     """Delete car-people container from config datastore.
384
385     Args:
386         :param odl_ip: ip address of ODL
387
388         :param port: restconf port
389
390         :param thread_count: ignored; only 1 thread needed
391
392         :param item_count: ignored; whole container is deleted
393
394         :param auth: authentication credentials
395
396         :param items_per_request: ignored; only 1 request needed
397
398     Returns:
399         None
400     """
401
402     logger.info("Delete all purchases from %s:%s", odl_ip, port)
403     _build_delete(odl_ip, port, "config/car-people:car-people")
404
405
406 def _build_get(odl_ip, port, uri):
407     """Send GET to generic URI.
408
409     Args:
410         :param odl_ip: ip address of ODL
411
412         :param port: restconf port
413
414         :param uri: URI without /restconf/ to complete URL
415
416     Returns:
417         None
418
419     Note:
420          Raise AssertionError if response status code != 200
421     """
422
423     url = _build_url(odl_ip, port, uri)
424     rsp = requests.get(url, auth=auth)
425     logger.debug("%s %s", rsp.request, rsp.request.url)
426     logger.debug("Headers %s:", rsp.request.headers)
427     logger.debug("Body: %s", rsp.request.body)
428     logger.debug("Response: %s", rsp.text)
429     logger.info("%s %s", rsp, rsp.reason)
430     assert rsp.status_code == 200, rsp.text
431
432
433 def get_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
434     """Reads car entries from config datastore.
435
436     TODO: some needed logic to be added handle http response in the future,
437           e.g. count items in response's content
438
439     Args:
440         :param odl_ip: ip address of ODL
441
442         :param port: restconf port
443
444         :param thread_count: ignored; only 1 thread needed
445
446         :param item_count: ignored; whole container is deleted
447
448         :param auth: authentication credentials
449
450         :param items_per_request: ignored; only 1 request needed
451
452     Returns:
453         None
454     """
455
456     logger.info("Get all cars from %s:%s", odl_ip, port)
457     _build_get(odl_ip, port, "config/car:cars")
458
459
460 def get_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
461     """Reads people entries from config datastore.
462
463     TODO: some needed logic to be added handle http response in the future,
464           e.g. count items in response's content
465
466     Args:
467         :param odl_ip: ip address of ODL
468
469         :param port: restconf port
470
471         :param thread_count: ignored; only 1 thread needed
472
473         :param item_count: ignored; whole container is deleted
474
475         :param auth: authentication credentials
476
477         :param items_per_request: ignored; only 1 request needed
478
479     Returns:
480         None
481     """
482
483     logger.info("Get all people from %s:%s", odl_ip, port)
484     _build_get(odl_ip, port, "config/people:people")
485
486
487 def get_car_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
488     """Reads car-people entries from config datastore.
489
490     TODO: some needed logic to be added handle http response in the future,
491           e.g. count items in response's content
492
493     Args:
494         :param odl_ip: ip address of ODL
495
496         :param port: restconf port
497
498         :param thread_count: ignored; only 1 thread needed
499
500         :param item_count: ignored; whole container is deleted
501
502         :param auth: authentication credentials
503
504         :param items_per_request: ignored; only 1 request needed
505
506     Returns:
507         None
508     """
509
510     logger.info("Get all purchases from %s:%s", odl_ip, port)
511     _build_get(odl_ip, port, "config/car-people:car-people")
512
513
514 def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
515     """Configure car entries to the config datastore.
516
517     Args:
518         :param odl_ip: ip address of ODL
519
520         :param port: restconf port
521
522         :param thread_count: number of threads used to send http requests; default=1
523
524         :param item_count: number of items to be configured
525
526         :param auth: authentication credentials
527
528         :param items_per_request: items per request, not used here,
529                                   just to keep the same api
530
531     Returns:
532         None
533     """
534
535     logger.info("Add %s car(s) to %s:%s (%s per request)",
536                 item_count, odl_ip, port, items_per_request)
537     res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
538                          thread_count=thread_count, item_count=item_count,
539                          items_per_request=items_per_request, auth=auth)
540     if res.keys() != [204]:
541         logger.error("Not all cars were configured: " + repr(res))
542         raise Exception("Not all cars were configured: " + repr(res))
543
544
545 def add_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
546     """Configure people entries to the config datastore.
547
548     Args:
549         :param odl_ip: ip address of ODL; default="127.0.0.1"
550
551         :param port: restconf port; default="8181"
552
553         :param thread_count: number of threads used to send http requests; default=1
554
555         :param item_count: number of items to be condigured
556
557         :param auth: authentication credentials
558
559         :param items_per_request: items per request, not used here,
560                                   just to keep the same api
561
562     Returns:
563         None
564     """
565
566     logger.info("Add %s people to %s:%s (%s per request)",
567                 item_count, odl_ip, port, items_per_request)
568     res = _task_executor(_prepare_add_people, odl_ip=odl_ip, port=port,
569                          thread_count=thread_count, item_count=item_count,
570                          items_per_request=items_per_request, auth=auth)
571     if res.keys() != [204]:
572         logger.error("Not all people were configured: " + repr(res))
573         raise Exception("Not all people were configured: " + repr(res))
574
575
576 def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
577                        items_per_request):
578     """Configure car-people entries to the config datastore one by one using rpc
579
580     Args:
581         :param odl_ip: ip address of ODL; default="127.0.0.1"
582
583         :param port: restconf port; default="8181"
584
585         :param thread_count: number of threads used to send http requests; default=1
586
587         :param item_count: number of items to be condigured
588
589         :param auth: authentication credentials
590
591         :param items_per_request: items per request, not used here,
592                                   just to keep the same api
593
594     Returns:
595         None
596     """
597
598     logger.info("Add %s purchase(s) to %s:%s (%s per request)",
599                 item_count, odl_ip, port, items_per_request)
600     if items_per_request != 1:
601         logger.error("Only 1 item per request is supported, " +
602                      "you specified: {0}".format(item_count))
603         raise NotImplementedError("Only 1 item per request is supported, " +
604                                   "you specified: {0}".format(item_count))
605
606     res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
607                          thread_count=thread_count, item_count=item_count,
608                          items_per_request=items_per_request, auth=auth)
609     if res.keys() != [204]:
610         logger.error("Not all rpc calls passed: " + repr(res))
611         raise Exception("Not all rpc calls passed: " + repr(res))
612
613
614 _actions = ["add", "get", "delete", "add-rpc"]
615 _items = ["car", "people", "car-people"]
616
617 _handler_matrix = {
618     "add": {"car": add_car, "people": add_people},
619     "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
620     "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
621     "add-rpc": {"car-people": add_car_people_rpc},
622 }
623
624
625 if __name__ == "__main__":
626     """
627     This program executes requested action based in given parameters
628
629     It provides "car", "people" and "car-people" crud operations.
630     """
631
632     parser = argparse.ArgumentParser(description="Cluster datastore"
633                                                  "performance test script")
634     parser.add_argument("--host", default="127.0.0.1",
635                         help="Host where odl controller is running."
636                              "Or comma separated list of hosts."
637                              "(default is 127.0.0.1)")
638     parser.add_argument("--port", default="8181",
639                         help="Port on which odl's RESTCONF is listening"
640                              "(default is 8181)")
641     parser.add_argument("--threads", type=int, default=1,
642                         help="Number of request worker threads to start in"
643                              "each cycle (default=1)")
644     parser.add_argument("action", choices=_actions, metavar="action",
645                         help="Action to be performed.")
646     parser.add_argument("--itemtype", choices=_items, default="car",
647                         help="Flows-per-Request - number of flows (batch size)"
648                              "sent in each HTTP request (default 1)")
649     parser.add_argument("--itemcount", type=int, help="Items per request",
650                         default=1)
651     parser.add_argument("--user", help="Restconf user name", default="admin")
652     parser.add_argument("--password", help="Restconf password", default="admin")
653     parser.add_argument("--ipr", type=int, help="Items per request", default=1)
654     parser.add_argument("--debug", dest="loglevel", action="store_const",
655                         const=logging.DEBUG, default=logging.INFO,
656                         help="Set log level to debug (default is error)")
657
658     args = parser.parse_args()
659
660     logger = logging.getLogger("logger")
661     log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
662     console_handler = logging.StreamHandler()
663     file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
664     console_handler.setFormatter(log_formatter)
665     file_handler.setFormatter(log_formatter)
666     logger.addHandler(console_handler)
667     logger.addHandler(file_handler)
668     logger.setLevel(args.loglevel)
669
670     auth = (args.user, args.password)
671
672     if (args.action not in _handler_matrix or
673             args.itemtype not in _handler_matrix[args.action]):
674             msg = "Unsupported combination of action: " + str(args.action)
675             msg += " and item: " + str(args.itemtype)
676             logger.error(msg)
677             raise NotImplementedError(msg)
678
679     # TODO: need to filter out situations when we cannot use more items
680     # in one rest request (rpc or delete?)
681     # this should be done inside handler functions
682
683     handler_function = _handler_matrix[args.action][args.itemtype]
684     handler_function(args.host, args.port, args.threads,
685                      args.itemcount, auth, args.ipr)