import copy
import argparse
import logging
+import time
_template_add_car = {
"category": "my_category",
"model": "to be replaced",
"manufacturer": "my_manufacturer",
- "year": "2015"
+ "year": "2015",
}
]
}
-_template_add_people = {
- "person": [
+_template_add_people_rpc = {
+ "input": [
{
- "id": "to be replaced",
- "gender": "male",
- "age": "99",
- "address": "to be replaced",
- "contactNo": "to be replaced"
+ "people:id": "to be replaced",
+ "people:gender": "male",
+ "people:age": "99",
+ "people:address": "to be replaced",
+ "people:contactNo": "to be replaced",
}
]
}
"input": {
"car-purchase:person": "to be replaced",
"car-purchase:person-id": "to be replaced",
- "car-purchase:car-id": "to be replaced"
+ "car-purchase:car-id": "to be replaced",
}
}
return req
-def _prepare_add_people(odl_ip, port, item_list, auth):
+def _prepare_add_people_rpc(odl_ip, port, item_list, auth):
"""Creates a POST http requests to configure people in configuration datastore.
Args:
:returns req: http request object
"""
- container = {"person": []}
- for item in item_list:
- entry = copy.deepcopy(_template_add_people["person"][0])
- entry["id"] = str(item)
- entry["address"] = "address" + str(item)
- entry["contactNo"] = str(item)
- container["person"].append(entry)
- req = _build_post(odl_ip, port, "config/people:people", container, auth)
+ container = {"input": {}}
+ item = item_list[0]
+ entry = container["input"]
+ entry["people:id"] = str(item)
+ entry["people:address"] = "address" + str(item)
+ entry["people:contactNo"] = str(item)
+ container["input"] = entry
+ req = _build_post(odl_ip, port, "operations/people:add-person", container, auth)
return req
container = {"input": {}}
item = item_list[0]
entry = container["input"]
- entry["car-purchase:person"] = "/people:people/people:person[people:id='" + str(item) + "']"
+ entry["car-purchase:person"] = (
+ "/people:people/people:person[people:id='" + str(item) + "']"
+ )
entry["car-purchase:person-id"] = str(item)
entry["car-purchase:car-id"] = str(item)
container["input"] = entry
return req
-def _request_sender(thread_id, preparing_function, auth, in_queue=None,
- exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None):
+def _request_sender(
+ thread_id,
+ preparing_function,
+ auth,
+ in_queue=None,
+ exit_event=None,
+ odl_ip="127.0.0.1",
+ port="8181",
+ out_queue=None,
+ req_timeout=60,
+ retry_timeout=15,
+ retry_rcs=[],
+):
"""The funcion sends http requests.
Runs in the working thread. It reads out flow details from the queue and
:param out_queue: queue where the results should be put
+ :param req_timeout: http request timeout
+
+    :param retry_timeout: timeout to give up retry attempts to send http requests
+
+ :param retry_rcs: list of return codes when retry should be performed
+
Returns:
None (results is put into the output queue)
"""
continue
req = preparing_function(odl_ip, port, item_list, auth)
prep = req.prepare()
- try:
- rsp = ses.send(prep, timeout=60)
- except requests.exceptions.Timeout:
- counter[99] += 1
- logger.error("No response from %s", odl_ip)
- continue
- logger.debug("%s %s", rsp.request, rsp.request.url)
- logger.debug("Headers %s:", rsp.request.headers)
- logger.debug("Body: %s", rsp.request.body)
- logger.debug("Response: %s", rsp.text)
- logger.debug("%s %s", rsp, rsp.reason)
- counter[rsp.status_code] += 1
+ start_time = time_now = time.time()
+ while start_time + retry_timeout > time_now:
+ try:
+ rsp = ses.send(prep, timeout=req_timeout)
+ except requests.exceptions.Timeout:
+ counter[99] += 1
+ logger.error("No response from %s", odl_ip)
+ rc = 99
+ else:
+ counter[rsp.status_code] += 1
+ rc = rsp.status_code
+                lvl = logging.INFO if rc > 299 else logging.DEBUG
+                logger.log(
+                    lvl,
+                    "Request started at {} finished with the following details".format(
+                        time.ctime(start_time)
+                    ),
+                )
+                logger.log(lvl, "%s %s", rsp.request, rsp.request.url)
+                logger.log(lvl, "Headers %s:", rsp.request.headers)
+                logger.log(lvl, "Body: %s", rsp.request.body)
+                logger.log(lvl, "Response: %s", rsp.text)
+                logger.log(lvl, "%s %s", rsp, rsp.reason)
+ if rc not in retry_rcs:
+ break
+ time_now = time.time()
responses = {}
for response_code, count in enumerate(counter):
if count > 0:
logger.info("Response code(s) got per number of requests: %s", responses)
-def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
- thread_count=1, item_count=1, items_per_request=1,
- auth=('admin', 'admin')):
+def _task_executor(
+ preparing_function,
+ odl_ip="127.0.0.1",
+ port="8181",
+ thread_count=1,
+ item_count=1,
+ items_per_request=1,
+ auth=("admin", "admin"),
+ req_timeout=600,
+ retry_timeout=15,
+ retry_rcs=[],
+):
"""The main function which drives sending of http requests.
Creates 2 queues and requested number of "working threads".
Args:
:param preparing_function: function to prepare http request object
- :param odl_ip: ip address of ODL; default="127.0.0.1"
+    :param odl_ip: ip address of ODL or comma separated addresses; default="127.0.0.1"
:param port: restconf port; default="8181"
:param auth: authentication credentials
+ :param req_timeout: http request timeout
+
+    :param retry_timeout: timeout to give up retry attempts to send http requests
+
+ :param retry_rcs: list of return codes when retry should be performed
+
Returns:
:returns dict: dictionary of http response counts like
{"http_status_code1: "count1", etc.}
"""
- items = [i+1 for i in range(item_count)]
+    # getting hosts
+ hosts = odl_ip.split(",")
+ nrhosts = len(hosts)
+
+ items = [i + 1 for i in range(item_count)]
item_groups = []
for i in range(0, item_count, items_per_request):
- item_groups.append(items[i:i+items_per_request])
+ item_groups.append(items[i : i + items_per_request])
# fill the queue with details needed for one http requests
send_queue = Queue.Queue()
# start threads to read details from queues and to send http requests
threads = []
for i in range(int(thread_count)):
- thr = threading.Thread(target=_request_sender,
- args=(i, preparing_function, auth),
- kwargs={"in_queue": send_queue, "exit_event": exit_event,
- "odl_ip": odl_ip, "port": port,
- "out_queue": result_queue})
+ thr = threading.Thread(
+ target=_request_sender,
+ args=(i, preparing_function, auth),
+ kwargs={
+ "in_queue": send_queue,
+ "exit_event": exit_event,
+ "odl_ip": hosts[i % nrhosts],
+ "port": port,
+ "out_queue": result_queue,
+ "req_timeout": req_timeout,
+ "retry_timeout": retry_timeout,
+ "retry_rcs": retry_rcs,
+ },
+ )
threads.append(thr)
thr.start()
None
"""
- logger.info("Add %s car(s) to %s:%s (%s per request)",
- item_count, odl_ip, port, items_per_request)
- res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
- thread_count=thread_count, item_count=item_count,
- items_per_request=items_per_request, auth=auth)
+ logger.info(
+ "Add %s car(s) to %s:%s (%s per request)",
+ item_count,
+ odl_ip,
+ port,
+ items_per_request,
+ )
+ res = _task_executor(
+ _prepare_add_car,
+ odl_ip=odl_ip,
+ port=port,
+ thread_count=thread_count,
+ item_count=item_count,
+ items_per_request=items_per_request,
+ auth=auth,
+ )
if res.keys() != [204]:
logger.error("Not all cars were configured: " + repr(res))
raise Exception("Not all cars were configured: " + repr(res))
-def add_people(odl_ip, port, thread_count, item_count, auth, items_per_request):
+def add_car_with_retries(
+ odl_ip, port, thread_count, item_count, auth, items_per_request
+):
+ """Configure car entries to the config datastore.
+
+ Args:
+ :param odl_ip: ip address of ODL
+
+ :param port: restconf port
+
+ :param thread_count: number of threads used to send http requests; default=1
+
+ :param item_count: number of items to be configured
+
+ :param auth: authentication credentials
+
+ :param items_per_request: items per request, not used here,
+ just to keep the same api
+
+ Returns:
+ None
+ """
+
+ logger.info(
+ "Add %s car(s) to %s:%s (%s per request)",
+ item_count,
+ odl_ip,
+ port,
+ items_per_request,
+ )
+ retry_rcs = [401, 404, 500, 503]
+ res = _task_executor(
+ _prepare_add_car,
+ odl_ip=odl_ip,
+ port=port,
+ thread_count=thread_count,
+ item_count=item_count,
+ items_per_request=items_per_request,
+ auth=auth,
+ req_timeout=15,
+ retry_timeout=30,
+ retry_rcs=retry_rcs,
+ )
+ acceptable_rcs = [204] + retry_rcs
+ for key in res.keys():
+ if key not in acceptable_rcs:
+ logger.error("Problems during cars' configuration appeared: " + repr(res))
+ raise Exception(
+ "Problems during cars' configuration appeared: " + repr(res)
+ )
+
+
+def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
"""Configure people entries to the config datastore.
Args:
None
"""
- logger.info("Add %s people to %s:%s (%s per request)",
- item_count, odl_ip, port, items_per_request)
- res = _task_executor(_prepare_add_people, odl_ip=odl_ip, port=port,
- thread_count=thread_count, item_count=item_count,
- items_per_request=items_per_request, auth=auth)
- if res.keys() != [204]:
+ logger.info(
+ "Add %s people to %s:%s (%s per request)",
+ item_count,
+ odl_ip,
+ port,
+ items_per_request,
+ )
+ if items_per_request != 1:
+        logger.error(
+            "Only 1 item per request is supported, "
+            + "you specified: {0}".format(items_per_request)
+        )
+        raise NotImplementedError(
+            "Only 1 item per request is supported, "
+            + "you specified: {0}".format(items_per_request)
+        )
+ res = _task_executor(
+ _prepare_add_people_rpc,
+ odl_ip=odl_ip,
+ port=port,
+ thread_count=thread_count,
+ item_count=item_count,
+ items_per_request=items_per_request,
+ auth=auth,
+ )
+ if res.keys() != [200]:
logger.error("Not all people were configured: " + repr(res))
raise Exception("Not all people were configured: " + repr(res))
-def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
- items_per_request):
+def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
"""Configure car-people entries to the config datastore one by one using rpc
Args:
None
"""
- logger.info("Add %s purchase(s) to %s:%s (%s per request)",
- item_count, odl_ip, port, items_per_request)
+ logger.info(
+ "Add %s purchase(s) to %s:%s (%s per request)",
+ item_count,
+ odl_ip,
+ port,
+ items_per_request,
+ )
if items_per_request != 1:
- logger.error("Only 1 item per request is supported, " +
- "you specified: {0}".format(item_count))
- raise NotImplementedError("Only 1 item per request is supported, " +
- "you specified: {0}".format(item_count))
-
- res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
- thread_count=thread_count, item_count=item_count,
- items_per_request=items_per_request, auth=auth)
- if res.keys() != [204]:
+        logger.error(
+            "Only 1 item per request is supported, "
+            + "you specified: {0}".format(items_per_request)
+        )
+        raise NotImplementedError(
+            "Only 1 item per request is supported, "
+            + "you specified: {0}".format(items_per_request)
+        )
+
+ res = _task_executor(
+ _prepare_add_car_people_rpc,
+ odl_ip=odl_ip,
+ port=port,
+ thread_count=thread_count,
+ item_count=item_count,
+ items_per_request=items_per_request,
+ auth=auth,
+ )
+ if res.keys() != [200]:
logger.error("Not all rpc calls passed: " + repr(res))
raise Exception("Not all rpc calls passed: " + repr(res))
-_actions = ["add", "get", "delete", "add-rpc"]
+_actions = ["add", "get", "delete", "add-rpc", "add-with-retries"]
_items = ["car", "people", "car-people"]
_handler_matrix = {
- "add": {"car": add_car, "people": add_people},
+ "add": {"car": add_car},
"get": {"car": get_car, "people": get_people, "car-people": get_car_people},
- "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
- "add-rpc": {"car-people": add_car_people_rpc},
+ "delete": {
+ "car": delete_car,
+ "people": delete_people,
+ "car-people": delete_car_people,
+ },
+ "add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
+ "add-with-retries": {"car": add_car_with_retries},
}
It provides "car", "people" and "car-people" crud operations.
"""
- parser = argparse.ArgumentParser(description="Cluster datastore"
- "performance test script")
- parser.add_argument("--host", default="127.0.0.1",
- help="Host where odl controller is running"
- "(default is 127.0.0.1)")
- parser.add_argument("--port", default="8181",
- help="Port on which odl's RESTCONF is listening"
- "(default is 8181)")
- parser.add_argument("--threads", type=int, default=1,
- help="Number of request worker threads to start in"
- "each cycle (default=1)")
- parser.add_argument("action", choices=_actions, metavar="action",
- help="Action to be performed.")
- parser.add_argument("--itemtype", choices=_items, default="car",
- help="Flows-per-Request - number of flows (batch size)"
- "sent in each HTTP request (default 1)")
- parser.add_argument("--itemcount", type=int, help="Items per request",
- default=1)
+ parser = argparse.ArgumentParser(
+        description="Cluster datastore " "performance test script"
+ )
+ parser.add_argument(
+ "--host",
+ default="127.0.0.1",
+        help="Host where odl controller is running. "
+        "Or comma separated list of hosts. "
+        "(default is 127.0.0.1)",
+ )
+ parser.add_argument(
+ "--port",
+ default="8181",
+        help="Port on which odl's RESTCONF is listening " "(default is 8181)",
+ )
+ parser.add_argument(
+ "--threads",
+ type=int,
+ default=1,
+        help="Number of request worker threads to start in " "each cycle (default=1)",
+ )
+ parser.add_argument(
+ "action", choices=_actions, metavar="action", help="Action to be performed."
+ )
+ parser.add_argument(
+ "--itemtype",
+ choices=_items,
+ default="car",
+        help="Flows-per-Request - number of flows (batch size) "
+        "sent in each HTTP request (default 1)",
+ )
+ parser.add_argument("--itemcount", type=int, help="Items per request", default=1)
parser.add_argument("--user", help="Restconf user name", default="admin")
parser.add_argument("--password", help="Restconf password", default="admin")
parser.add_argument("--ipr", type=int, help="Items per request", default=1)
- parser.add_argument("--debug", dest="loglevel", action="store_const",
- const=logging.DEBUG, default=logging.INFO,
- help="Set log level to debug (default is error)")
+ parser.add_argument(
+ "--debug",
+ dest="loglevel",
+ action="store_const",
+ const=logging.DEBUG,
+ default=logging.INFO,
+ help="Set log level to debug (default is error)",
+ )
args = parser.parse_args()
logger = logging.getLogger("logger")
- log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+ log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
console_handler = logging.StreamHandler()
- file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
+ file_handler = logging.FileHandler("cluster_rest_script.log", mode="w")
console_handler.setFormatter(log_formatter)
file_handler.setFormatter(log_formatter)
logger.addHandler(console_handler)
auth = (args.user, args.password)
- if (args.action not in _handler_matrix or
- args.itemtype not in _handler_matrix[args.action]):
- msg = "Unsupported combination of action: " + str(args.action)
- msg += " and item: " + str(args.itemtype)
- logger.error(msg)
- raise NotImplementedError(msg)
+ if (
+ args.action not in _handler_matrix
+ or args.itemtype not in _handler_matrix[args.action]
+ ):
+ msg = "Unsupported combination of action: " + str(args.action)
+ msg += " and item: " + str(args.itemtype)
+ logger.error(msg)
+ raise NotImplementedError(msg)
# TODO: need to filter out situations when we cannot use more items
# in one rest request (rpc or delete?)
# this should be done inside handler functions
handler_function = _handler_matrix[args.action][args.itemtype]
- handler_function(args.host, args.port, args.threads,
- args.itemcount, auth, args.ipr)
+ handler_function(args.host, args.port, args.threads, args.itemcount, auth, args.ipr)