"category": "my_category",
"model": "to be replaced",
"manufacturer": "my_manufacturer",
- "year": "2015"
+ "year": "2015",
}
]
}
"people:gender": "male",
"people:age": "99",
"people:address": "to be replaced",
- "people:contactNo": "to be replaced"
+ "people:contactNo": "to be replaced",
}
]
}
"input": {
"car-purchase:person": "to be replaced",
"car-purchase:person-id": "to be replaced",
- "car-purchase:car-id": "to be replaced"
+ "car-purchase:car-id": "to be replaced",
}
}
container = {"input": {}}
item = item_list[0]
entry = container["input"]
- entry["car-purchase:person"] = "/people:people/people:person[people:id='" + str(item) + "']"
+ entry["car-purchase:person"] = (
+ "/people:people/people:person[people:id='" + str(item) + "']"
+ )
entry["car-purchase:person-id"] = str(item)
entry["car-purchase:car-id"] = str(item)
container["input"] = entry
return req
-def _request_sender(thread_id, preparing_function, auth, in_queue=None,
- exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None,
- req_timeout=60, retry_timeout=15, retry_rcs=[]):
+def _request_sender(
+ thread_id,
+ preparing_function,
+ auth,
+ in_queue=None,
+ exit_event=None,
+ odl_ip="127.0.0.1",
+ port="8181",
+ out_queue=None,
+ req_timeout=60,
+ retry_timeout=15,
+ retry_rcs=[],
+):
"""The funcion sends http requests.
Runs in the working thread. It reads out flow details from the queue and
counter[rsp.status_code] += 1
rc = rsp.status_code
lvl = logging.INFO if rc > 299 else logging.DEBUG
- logger.log(lvl, "Request started at {} finished with following detais".format(time.ctime(start_time)))
+ logger.log(
+ lvl,
+ "Request started at {} finished with following detais".format(
+ time.ctime(start_time)
+ ),
+ )
logger.log(lvl, "%s %s", rsp.request, rsp.request.url)
logger.log(lvl, "Headers %s:", rsp.request.headers)
logger.log(lvl, "Body: %s", rsp.request.body)
logger.info("Response code(s) got per number of requests: %s", responses)
-def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
- thread_count=1, item_count=1, items_per_request=1,
- auth=('admin', 'admin'), req_timeout=600, retry_timeout=15, retry_rcs=[]):
+def _task_executor(
+ preparing_function,
+ odl_ip="127.0.0.1",
+ port="8181",
+ thread_count=1,
+ item_count=1,
+ items_per_request=1,
+ auth=("admin", "admin"),
+ req_timeout=600,
+ retry_timeout=15,
+ retry_rcs=[],
+):
"""The main function which drives sending of http requests.
Creates 2 queues and requested number of "working threads".
"""
# geting hosts
- hosts = odl_ip.split(',')
+ hosts = odl_ip.split(",")
nrhosts = len(hosts)
items = [i + 1 for i in range(item_count)]
item_groups = []
for i in range(0, item_count, items_per_request):
- item_groups.append(items[i:i + items_per_request])
+ item_groups.append(items[i : i + items_per_request])
# fill the queue with details needed for one http requests
send_queue = Queue.Queue()
# start threads to read details from queues and to send http requests
threads = []
for i in range(int(thread_count)):
- thr = threading.Thread(target=_request_sender,
- args=(i, preparing_function, auth),
- kwargs={"in_queue": send_queue, "exit_event": exit_event,
- "odl_ip": hosts[i % nrhosts], "port": port,
- "out_queue": result_queue, "req_timeout": req_timeout,
- "retry_timeout": retry_timeout, "retry_rcs": retry_rcs})
+ thr = threading.Thread(
+ target=_request_sender,
+ args=(i, preparing_function, auth),
+ kwargs={
+ "in_queue": send_queue,
+ "exit_event": exit_event,
+ "odl_ip": hosts[i % nrhosts],
+ "port": port,
+ "out_queue": result_queue,
+ "req_timeout": req_timeout,
+ "retry_timeout": retry_timeout,
+ "retry_rcs": retry_rcs,
+ },
+ )
threads.append(thr)
thr.start()
None
"""
- logger.info("Add %s car(s) to %s:%s (%s per request)",
- item_count, odl_ip, port, items_per_request)
- res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
- thread_count=thread_count, item_count=item_count,
- items_per_request=items_per_request, auth=auth)
+ logger.info(
+ "Add %s car(s) to %s:%s (%s per request)",
+ item_count,
+ odl_ip,
+ port,
+ items_per_request,
+ )
+ res = _task_executor(
+ _prepare_add_car,
+ odl_ip=odl_ip,
+ port=port,
+ thread_count=thread_count,
+ item_count=item_count,
+ items_per_request=items_per_request,
+ auth=auth,
+ )
if res.keys() != [204]:
logger.error("Not all cars were configured: " + repr(res))
raise Exception("Not all cars were configured: " + repr(res))
-def add_car_with_retries(odl_ip, port, thread_count, item_count, auth, items_per_request):
+def add_car_with_retries(
+ odl_ip, port, thread_count, item_count, auth, items_per_request
+):
"""Configure car entries to the config datastore.
Args:
None
"""
- logger.info("Add %s car(s) to %s:%s (%s per request)",
- item_count, odl_ip, port, items_per_request)
+ logger.info(
+ "Add %s car(s) to %s:%s (%s per request)",
+ item_count,
+ odl_ip,
+ port,
+ items_per_request,
+ )
retry_rcs = [401, 404, 500, 503]
- res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
- thread_count=thread_count, item_count=item_count,
- items_per_request=items_per_request, auth=auth,
- req_timeout=15, retry_timeout=30, retry_rcs=retry_rcs)
+ res = _task_executor(
+ _prepare_add_car,
+ odl_ip=odl_ip,
+ port=port,
+ thread_count=thread_count,
+ item_count=item_count,
+ items_per_request=items_per_request,
+ auth=auth,
+ req_timeout=15,
+ retry_timeout=30,
+ retry_rcs=retry_rcs,
+ )
acceptable_rcs = [204] + retry_rcs
for key in res.keys():
if key not in acceptable_rcs:
logger.error("Problems during cars' configuration appeared: " + repr(res))
- raise Exception("Problems during cars' configuration appeared: " + repr(res))
+ raise Exception(
+ "Problems during cars' configuration appeared: " + repr(res)
+ )
def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
None
"""
- logger.info("Add %s people to %s:%s (%s per request)",
- item_count, odl_ip, port, items_per_request)
+ logger.info(
+ "Add %s people to %s:%s (%s per request)",
+ item_count,
+ odl_ip,
+ port,
+ items_per_request,
+ )
if items_per_request != 1:
- logger.error("Only 1 item per request is supported, " +
- "you specified: {0}".format(item_count))
- raise NotImplementedError("Only 1 item per request is supported, " +
- "you specified: {0}".format(item_count))
- res = _task_executor(_prepare_add_people_rpc, odl_ip=odl_ip, port=port,
- thread_count=thread_count, item_count=item_count,
- items_per_request=items_per_request, auth=auth)
+ logger.error(
+ "Only 1 item per request is supported, "
+ + "you specified: {0}".format(item_count)
+ )
+ raise NotImplementedError(
+ "Only 1 item per request is supported, "
+ + "you specified: {0}".format(item_count)
+ )
+ res = _task_executor(
+ _prepare_add_people_rpc,
+ odl_ip=odl_ip,
+ port=port,
+ thread_count=thread_count,
+ item_count=item_count,
+ items_per_request=items_per_request,
+ auth=auth,
+ )
if res.keys() != [200]:
logger.error("Not all people were configured: " + repr(res))
raise Exception("Not all people were configured: " + repr(res))
-def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth,
- items_per_request):
+def add_car_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
"""Configure car-people entries to the config datastore one by one using rpc
Args:
None
"""
- logger.info("Add %s purchase(s) to %s:%s (%s per request)",
- item_count, odl_ip, port, items_per_request)
+ logger.info(
+ "Add %s purchase(s) to %s:%s (%s per request)",
+ item_count,
+ odl_ip,
+ port,
+ items_per_request,
+ )
if items_per_request != 1:
- logger.error("Only 1 item per request is supported, " +
- "you specified: {0}".format(item_count))
- raise NotImplementedError("Only 1 item per request is supported, " +
- "you specified: {0}".format(item_count))
-
- res = _task_executor(_prepare_add_car_people_rpc, odl_ip=odl_ip, port=port,
- thread_count=thread_count, item_count=item_count,
- items_per_request=items_per_request, auth=auth)
+ logger.error(
+ "Only 1 item per request is supported, "
+ + "you specified: {0}".format(item_count)
+ )
+ raise NotImplementedError(
+ "Only 1 item per request is supported, "
+ + "you specified: {0}".format(item_count)
+ )
+
+ res = _task_executor(
+ _prepare_add_car_people_rpc,
+ odl_ip=odl_ip,
+ port=port,
+ thread_count=thread_count,
+ item_count=item_count,
+ items_per_request=items_per_request,
+ auth=auth,
+ )
if res.keys() != [200]:
logger.error("Not all rpc calls passed: " + repr(res))
raise Exception("Not all rpc calls passed: " + repr(res))
_handler_matrix = {
"add": {"car": add_car},
"get": {"car": get_car, "people": get_people, "car-people": get_car_people},
- "delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
+ "delete": {
+ "car": delete_car,
+ "people": delete_people,
+ "car-people": delete_car_people,
+ },
"add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
"add-with-retries": {"car": add_car_with_retries},
}
It provides "car", "people" and "car-people" crud operations.
"""
- parser = argparse.ArgumentParser(description="Cluster datastore"
- "performance test script")
- parser.add_argument("--host", default="127.0.0.1",
- help="Host where odl controller is running."
- "Or comma separated list of hosts."
- "(default is 127.0.0.1)")
- parser.add_argument("--port", default="8181",
- help="Port on which odl's RESTCONF is listening"
- "(default is 8181)")
- parser.add_argument("--threads", type=int, default=1,
- help="Number of request worker threads to start in"
- "each cycle (default=1)")
- parser.add_argument("action", choices=_actions, metavar="action",
- help="Action to be performed.")
- parser.add_argument("--itemtype", choices=_items, default="car",
- help="Flows-per-Request - number of flows (batch size)"
- "sent in each HTTP request (default 1)")
- parser.add_argument("--itemcount", type=int, help="Items per request",
- default=1)
+ parser = argparse.ArgumentParser(
+ description="Cluster datastore" "performance test script"
+ )
+ parser.add_argument(
+ "--host",
+ default="127.0.0.1",
+ help="Host where odl controller is running."
+ "Or comma separated list of hosts."
+ "(default is 127.0.0.1)",
+ )
+ parser.add_argument(
+ "--port",
+ default="8181",
+ help="Port on which odl's RESTCONF is listening" "(default is 8181)",
+ )
+ parser.add_argument(
+ "--threads",
+ type=int,
+ default=1,
+ help="Number of request worker threads to start in" "each cycle (default=1)",
+ )
+ parser.add_argument(
+ "action", choices=_actions, metavar="action", help="Action to be performed."
+ )
+ parser.add_argument(
+ "--itemtype",
+ choices=_items,
+ default="car",
+ help="Flows-per-Request - number of flows (batch size)"
+ "sent in each HTTP request (default 1)",
+ )
+ parser.add_argument("--itemcount", type=int, help="Items per request", default=1)
parser.add_argument("--user", help="Restconf user name", default="admin")
parser.add_argument("--password", help="Restconf password", default="admin")
parser.add_argument("--ipr", type=int, help="Items per request", default=1)
- parser.add_argument("--debug", dest="loglevel", action="store_const",
- const=logging.DEBUG, default=logging.INFO,
- help="Set log level to debug (default is error)")
+ parser.add_argument(
+ "--debug",
+ dest="loglevel",
+ action="store_const",
+ const=logging.DEBUG,
+ default=logging.INFO,
+ help="Set log level to debug (default is error)",
+ )
args = parser.parse_args()
logger = logging.getLogger("logger")
- log_formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+ log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
console_handler = logging.StreamHandler()
- file_handler = logging.FileHandler('cluster_rest_script.log', mode="w")
+ file_handler = logging.FileHandler("cluster_rest_script.log", mode="w")
console_handler.setFormatter(log_formatter)
file_handler.setFormatter(log_formatter)
logger.addHandler(console_handler)
auth = (args.user, args.password)
- if (args.action not in _handler_matrix or
- args.itemtype not in _handler_matrix[args.action]):
+ if (
+ args.action not in _handler_matrix
+ or args.itemtype not in _handler_matrix[args.action]
+ ):
msg = "Unsupported combination of action: " + str(args.action)
msg += " and item: " + str(args.itemtype)
logger.error(msg)
# this should be done inside handler functions
handler_function = _handler_matrix[args.action][args.itemtype]
- handler_function(args.host, args.port, args.threads,
- args.itemcount, auth, args.ipr)
+ handler_function(args.host, args.port, args.threads, args.itemcount, auth, args.ipr)