Migrate 030_bgp_functional_evpn
[integration/test.git] / tools / odl-mdsal-clustering-tests / scripts / cluster_rest_script.py
index ce6342d37b23ccdf8b58059240f0dd72003a4c42..52d717053af89ea76248496a6cfb5421009c33a0 100644 (file)
@@ -9,6 +9,7 @@ import json
 import copy
 import argparse
 import logging
+import time
 
 
 _template_add_car = {
@@ -18,19 +19,19 @@ _template_add_car = {
             "category": "my_category",
             "model": "to be replaced",
             "manufacturer": "my_manufacturer",
-            "year": "2015"
+            "year": "2015",
         }
     ]
 }
 
-_template_add_people = {
-    "person": [
+_template_add_people_rpc = {
+    "input": [
         {
-            "id": "to be replaced",
-            "gender": "male",
-            "age": "99",
-            "address": "to be replaced",
-            "contactNo": "to be replaced"
+            "people:id": "to be replaced",
+            "people:gender": "male",
+            "people:age": "99",
+            "people:address": "to be replaced",
+            "people:contactNo": "to be replaced",
         }
     ]
 }
@@ -39,7 +40,7 @@ _template_add_cp_rpc = {
     "input": {
         "car-purchase:person": "to be replaced",
         "car-purchase:person-id": "to be replaced",
-        "car-purchase:car-id": "to be replaced"
+        "car-purchase:car-id": "to be replaced",
     }
 }
 
@@ -113,7 +114,7 @@ def _prepare_add_car(odl_ip, port, item_list, auth):
     return req
 
 
-def _prepare_add_people(odl_ip, port, item_list, auth):
+def _prepare_add_people_rpc(odl_ip, port, item_list, auth):
     """Creates a POST http requests to configure people in configuration datastore.
 
     Args:
@@ -129,14 +130,14 @@ def _prepare_add_people(odl_ip, port, item_list, auth):
         :returns req: http request object
     """
 
-    container = {"person": []}
-    for item in item_list:
-        entry = copy.deepcopy(_template_add_people["person"][0])
-        entry["id"] = str(item)
-        entry["address"] = "address" + str(item)
-        entry["contactNo"] = str(item)
-        container["person"].append(entry)
-    req = _build_post(odl_ip, port, "config/people:people", container, auth)
+    container = {"input": {}}
+    item = item_list[0]
+    entry = container["input"]
+    entry["people:id"] = str(item)
+    entry["people:address"] = "address" + str(item)
+    entry["people:contactNo"] = str(item)
+    container["input"] = entry
+    req = _build_post(odl_ip, port, "operations/people:add-person", container, auth)
     return req
 
 
@@ -160,7 +161,9 @@ def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
     container = {"input": {}}
     item = item_list[0]
     entry = container["input"]
-    entry["car-purchase:person"] = "/people:people/people:person[people:id='" + str(item) + "']"
+    entry["car-purchase:person"] = (
+        "/people:people/people:person[people:id='" + str(item) + "']"
+    )
     entry["car-purchase:person-id"] = str(item)
     entry["car-purchase:car-id"] = str(item)
     container["input"] = entry
@@ -168,8 +171,19 @@ def _prepare_add_car_people_rpc(odl_ip, port, item_list, auth):
     return req
 
 
-def _request_sender(thread_id, preparing_function, auth, in_queue=None,
-                    exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None):
+def _request_sender(
+    thread_id,
+    preparing_function,
+    auth,
+    in_queue=None,
+    exit_event=None,
+    odl_ip="127.0.0.1",
+    port="8181",
+    out_queue=None,
+    req_timeout=60,
+    retry_timeout=15,
+    retry_rcs=[],
+):
     """The funcion sends http requests.
 
     Runs in the working thread. It reads out flow details from the queue and
@@ -191,6 +205,12 @@ def _request_sender(thread_id, preparing_function, auth, in_queue=None,
 
         :param out_queue: queue where the results should be put
 
+        :param req_timeout: http request timeout
+
+        :param retry_timeout: timout to give up retry attempts to send http requests
+
+        :param retry_rcs: list of return codes when retry should be performed
+
     Returns:
         None (results is put into the output queue)
     """
@@ -207,18 +227,32 @@ def _request_sender(thread_id, preparing_function, auth, in_queue=None,
             continue
         req = preparing_function(odl_ip, port, item_list, auth)
         prep = req.prepare()
-        try:
-            rsp = ses.send(prep, timeout=60)
-        except requests.exceptions.Timeout:
-            counter[99] += 1
-            logger.error("No response from %s", odl_ip)
-            continue
-        logger.debug("%s %s", rsp.request, rsp.request.url)
-        logger.debug("Headers %s:", rsp.request.headers)
-        logger.debug("Body: %s", rsp.request.body)
-        logger.debug("Response: %s", rsp.text)
-        logger.debug("%s %s", rsp, rsp.reason)
-        counter[rsp.status_code] += 1
+        start_time = time_now = time.time()
+        while start_time + retry_timeout > time_now:
+            try:
+                rsp = ses.send(prep, timeout=req_timeout)
+            except requests.exceptions.Timeout:
+                counter[99] += 1
+                logger.error("No response from %s", odl_ip)
+                rc = 99
+            else:
+                counter[rsp.status_code] += 1
+                rc = rsp.status_code
+                lvl = logging.INFO if rc > 299 else logging.DEBUG
+                logger.log(
+                    lvl,
+                    "Request started at {} finished with following detais".format(
+                        time.ctime(start_time)
+                    ),
+                )
+                logger.log(lvl, "%s %s", rsp.request, rsp.request.url)
+                logger.log(lvl, "Headers %s:", rsp.request.headers)
+                logger.log(lvl, "Body: %s", rsp.request.body)
+                logger.log(lvl, "Response: %s", rsp.text)
+                logger.log(lvl, "%s %s", rsp, rsp.reason)
+            if rc not in retry_rcs:
+                break
+            time_now = time.time()
     responses = {}
     for response_code, count in enumerate(counter):
         if count > 0:
@@ -227,9 +261,18 @@ def _request_sender(thread_id, preparing_function, auth, in_queue=None,
     logger.info("Response code(s) got per number of requests: %s", responses)
 
 
-def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
-                   thread_count=1, item_count=1, items_per_request=1,
-                   auth=('admin', 'admin')):
+def _task_executor(
+    preparing_function,
+    odl_ip="127.0.0.1",
+    port="8181",
+    thread_count=1,
+    item_count=1,
+    items_per_request=1,
+    auth=("admin", "admin"),
+    req_timeout=600,
+    retry_timeout=15,
+    retry_rcs=[],
+):
     """The main function which drives sending of http requests.
 
     Creates 2 queues and requested number of "working threads".
@@ -241,7 +284,7 @@ def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
     Args:
         :param preparing_function: function to prepare http request object
 
-        :param odl_ip: ip address of ODL; default="127.0.0.1"
+        :param odl_ip: ip address of ODL or comma separated addesses; default="127.0.0.1"
 
         :param port: restconf port; default="8181"
 
@@ -253,15 +296,25 @@ def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
 
         :param auth: authentication credentials
 
+        :param req_timeout: http request timeout
+
+        :param retry_timeout: timout to give up retry attempts to send http requests
+
+        :param retry_rcs: list of return codes when retry should be performed
+
     Returns:
         :returns dict: dictionary of http response counts like
                        {"http_status_code1: "count1", etc.}
     """
 
-    items = [i+1 for i in range(item_count)]
+    # geting hosts
+    hosts = odl_ip.split(",")
+    nrhosts = len(hosts)
+
+    items = [i + 1 for i in range(item_count)]
     item_groups = []
     for i in range(0, item_count, items_per_request):
-        item_groups.append(items[i:i+items_per_request])
+        item_groups.append(items[i : i + items_per_request])
 
     # fill the queue with details needed for one http requests
     send_queue = Queue.Queue()
@@ -276,11 +329,20 @@ def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
     # start threads to read details from queues and to send http requests
     threads = []
     for i in range(int(thread_count)):
-        thr = threading.Thread(target=_request_sender,
-                               args=(i, preparing_function, auth),
-                               kwargs={"in_queue": send_queue, "exit_event": exit_event,
-                                       "odl_ip": odl_ip, "port": port,
-                                       "out_queue": result_queue})
+        thr = threading.Thread(
+            target=_request_sender,
+            args=(i, preparing_function, auth),
+            kwargs={
+                "in_queue": send_queue,
+                "exit_event": exit_event,
+                "odl_ip": hosts[i % nrhosts],
+                "port": port,
+                "out_queue": result_queue,
+                "req_timeout": req_timeout,
+                "retry_timeout": retry_timeout,
+                "retry_rcs": retry_rcs,
+            },
+        )
         threads.append(thr)
         thr.start()
 
@@ -528,17 +590,80 @@ def add_car(odl_ip, port, thread_count, item_count, auth, items_per_request):
         None
     """
 
-    logger.info("Add %s car(s) to %s:%s (%s per request)",
-                item_count, odl_ip, port, items_per_request)
-    res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
-                         thread_count=thread_count, item_count=item_count,
-                         items_per_request=items_per_request, auth=auth)
+    logger.info(
+        "Add %s car(s) to %s:%s (%s per request)",
+        item_count,
+        odl_ip,
+        port,
+        items_per_request,
+    )
+    res = _task_executor(
+        _prepare_add_car,
+        odl_ip=odl_ip,
+        port=port,
+        thread_count=thread_count,
+        item_count=item_count,
+        items_per_request=items_per_request,
+        auth=auth,
+    )
     if res.keys() != [204]:
         logger.error("Not all cars were configured: " + repr(res))
         raise Exception("Not all cars were configured: " + repr(res))
 
 
-def add_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
+def add_car_with_retries(
+    odl_ip, port, thread_count, item_count, auth, items_per_request
+):
+    """Configure car entries to the config datastore.
+
+    Args:
+        :param odl_ip: ip address of ODL
+
+        :param port: restconf port
+
+        :param thread_count: number of threads used to send http requests; default=1
+
+        :param item_count: number of items to be configured
+
+        :param auth: authentication credentials
+
+        :param items_per_request: items per request, not used here,
+                                  just to keep the same api
+
+    Returns:
+        None
+    """
+
+    logger.info(
+        "Add %s car(s) to %s:%s (%s per request)",
+        item_count,
+        odl_ip,
+        port,
+        items_per_request,
+    )
+    retry_rcs = [401, 404, 500, 503]
+    res = _task_executor(
+        _prepare_add_car,
+        odl_ip=odl_ip,
+        port=port,
+        thread_count=thread_count,
+        item_count=item_count,
+        items_per_request=items_per_request,
+        auth=auth,
+        req_timeout=15,
+        retry_timeout=30,
+        retry_rcs=retry_rcs,
+    )
+    acceptable_rcs = [204] + retry_rcs
+    for key in res.keys():
+        if key not in acceptable_rcs:
+            logger.error("Problems during cars' configuration appeared: " + repr(res))
+            raise Exception(
+                "Problems during cars' configuration appeared: " + repr(res)
+            )
+
+
+def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
     """Configure people entries to the config datastore.
 
     Args:
@@ -559,18 +684,37 @@ def add_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
         None
     """
 
-    logger.info("Add %s people to %s:%s (%s per request)",
-                item_count, odl_ip, port, items_per_request)
-    res = _task_executor(_prepare_add_people, odl_ip=odl_ip, port=port,
-                         thread_count=thread_count, item_count=item_count,
-                         items_per_request=items_per_request, auth=auth)
-    if res.keys() != [204]:
+    logger.info(
+        "Add %s people to %s:%s (%s per request)",
+        item_count,
+        odl_ip,
+        port,
+        items_per_request,
+    )
+    if items_per_request != 1:
+        logger.error(
+            "Only 1 item per request is supported, "
+            + "you specified: {0}".format(item_count)
+        )
+        raise NotImplementedError(
+            "Only 1 item per request is supported, "
+            + "you specified: {0}".format(item_count)
+        )
+    res = _task_executor(
+        _prepare_add_people_rpc,
+        odl_ip=odl_ip,
+        port=port,
+        thread_count=thread_count,
+        item_count=item_count,
+        items_per_request=items_per_request,
+        auth=auth,
+    )
+    if res.keys() != [200]:
         logger.error("Not all people were configured: " + repr(res))
         raise Exception("Not all people were configured: " + repr(res))
 
 
-def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
-                       items_per_request):
+def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
     """Configure car-people entries to the config datastore one by one using rpc
 
     Args:
@@ -591,30 +735,50 @@ def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
         None
     """
 
-    logger.info("Add %s purchase(s) to %s:%s (%s per request)",
-                item_count, odl_ip, port, items_per_request)
+    logger.info(
+        "Add %s purchase(s) to %s:%s (%s per request)",
+        item_count,
+        odl_ip,
+        port,
+        items_per_request,
+    )
     if items_per_request != 1:
-        logger.error("Only 1 item per request is supported, " +
-                     "you specified: {0}".format(item_count))
-        raise NotImplementedError("Only 1 item per request is supported, " +
-                                  "you specified: {0}".format(item_count))
-
-    res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
-                         thread_count=thread_count, item_count=item_count,
-                         items_per_request=items_per_request, auth=auth)
-    if res.keys() != [204]:
+        logger.error(
+            "Only 1 item per request is supported, "
+            + "you specified: {0}".format(item_count)
+        )
+        raise NotImplementedError(
+            "Only 1 item per request is supported, "
+            + "you specified: {0}".format(item_count)
+        )
+
+    res = _task_executor(
+        _prepare_add_car_people_rpc,
+        odl_ip=odl_ip,
+        port=port,
+        thread_count=thread_count,
+        item_count=item_count,
+        items_per_request=items_per_request,
+        auth=auth,
+    )
+    if res.keys() != [200]:
         logger.error("Not all rpc calls passed: " + repr(res))
         raise Exception("Not all rpc calls passed: " + repr(res))
 
 
-_actions = ["add", "get", "delete", "add-rpc"]
+_actions = ["add", "get", "delete", "add-rpc", "add-with-retries"]
 _items = ["car", "people", "car-people"]
 
 _handler_matrix = {
-    "add": {"car": add_car, "people": add_people},
+    "add": {"car": add_car},
     "get": {"car": get_car, "people": get_people, "car-people": get_car_people},
-    "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
-    "add-rpc": {"car-people": add_car_people_rpc},
+    "delete": {
+        "car": delete_car,
+        "people": delete_people,
+        "car-people": delete_car_people,
+    },
+    "add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
+    "add-with-retries": {"car": add_car_with_retries},
 }
 
 
@@ -625,37 +789,56 @@ if __name__ == "__main__":
     It provides "car", "people" and "car-people" crud operations.
     """
 
-    parser = argparse.ArgumentParser(description="Cluster datastore"
-                                                 "performance test script")
-    parser.add_argument("--host", default="127.0.0.1",
-                        help="Host where odl controller is running"
-                             "(default is 127.0.0.1)")
-    parser.add_argument("--port", default="8181",
-                        help="Port on which odl's RESTCONF is listening"
-                             "(default is 8181)")
-    parser.add_argument("--threads", type=int, default=1,
-                        help="Number of request worker threads to start in"
-                             "each cycle (default=1)")
-    parser.add_argument("action", choices=_actions, metavar="action",
-                        help="Action to be performed.")
-    parser.add_argument("--itemtype", choices=_items, default="car",
-                        help="Flows-per-Request - number of flows (batch size)"
-                             "sent in each HTTP request (default 1)")
-    parser.add_argument("--itemcount", type=int, help="Items per request",
-                        default=1)
+    parser = argparse.ArgumentParser(
+        description="Cluster datastore" "performance test script"
+    )
+    parser.add_argument(
+        "--host",
+        default="127.0.0.1",
+        help="Host where odl controller is running."
+        "Or comma separated list of hosts."
+        "(default is 127.0.0.1)",
+    )
+    parser.add_argument(
+        "--port",
+        default="8181",
+        help="Port on which odl's RESTCONF is listening" "(default is 8181)",
+    )
+    parser.add_argument(
+        "--threads",
+        type=int,
+        default=1,
+        help="Number of request worker threads to start in" "each cycle (default=1)",
+    )
+    parser.add_argument(
+        "action", choices=_actions, metavar="action", help="Action to be performed."
+    )
+    parser.add_argument(
+        "--itemtype",
+        choices=_items,
+        default="car",
+        help="Flows-per-Request - number of flows (batch size)"
+        "sent in each HTTP request (default 1)",
+    )
+    parser.add_argument("--itemcount", type=int, help="Items per request", default=1)
     parser.add_argument("--user", help="Restconf user name", default="admin")
     parser.add_argument("--password", help="Restconf password", default="admin")
     parser.add_argument("--ipr", type=int, help="Items per request", default=1)
-    parser.add_argument("--debug", dest="loglevel", action="store_const",
-                        const=logging.DEBUG, default=logging.INFO,
-                        help="Set log level to debug (default is error)")
+    parser.add_argument(
+        "--debug",
+        dest="loglevel",
+        action="store_const",
+        const=logging.DEBUG,
+        default=logging.INFO,
+        help="Set log level to debug (default is error)",
+    )
 
     args = parser.parse_args()
 
     logger = logging.getLogger("logger")
-    log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+    log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
     console_handler = logging.StreamHandler()
-    file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
+    file_handler = logging.FileHandler("cluster_rest_script.log", mode="w")
     console_handler.setFormatter(log_formatter)
     file_handler.setFormatter(log_formatter)
     logger.addHandler(console_handler)
@@ -664,17 +847,18 @@ if __name__ == "__main__":
 
     auth = (args.user, args.password)
 
-    if (args.action not in _handler_matrix or
-            args.itemtype not in _handler_matrix[args.action]):
-            msg = "Unsupported combination of action: " + str(args.action)
-            msg += " and item: " + str(args.itemtype)
-            logger.error(msg)
-            raise NotImplementedError(msg)
+    if (
+        args.action not in _handler_matrix
+        or args.itemtype not in _handler_matrix[args.action]
+    ):
+        msg = "Unsupported combination of action: " + str(args.action)
+        msg += " and item: " + str(args.itemtype)
+        logger.error(msg)
+        raise NotImplementedError(msg)
 
     # TODO: need to filter out situations when we cannot use more items
     # in one rest request (rpc or delete?)
     # this should be done inside handler functions
 
     handler_function = _handler_matrix[args.action][args.itemtype]
-    handler_function(args.host, args.port, args.threads,
-                     args.itemcount, auth, args.ipr)
+    handler_function(args.host, args.port, args.threads, args.itemcount, auth, args.ipr)