import copy
import argparse
import logging
+import time
_template_add_car = {
def _request_sender(thread_id, preparing_function, auth, in_queue=None,
- exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None):
+ exit_event=None, odl_ip="127.0.0.1", port="8181", out_queue=None,
+ req_timeout=60, retry_timeout=15, retry_rcs=[]):
"""The funcion sends http requests.
Runs in the working thread. It reads out flow details from the queue and
:param out_queue: queue where the results should be put
+ :param req_timeout: http request timeout
+
+ :param retry_timeout: timout to give up retry attempts to send http requests
+
+ :param retry_rcs: list of return codes when retry should be performed
+
Returns:
None (results is put into the output queue)
"""
continue
req = preparing_function(odl_ip, port, item_list, auth)
prep = req.prepare()
- try:
- rsp = ses.send(prep, timeout=60)
- except requests.exceptions.Timeout:
- counter[99] += 1
- logger.error("No response from %s", odl_ip)
- continue
- logger.debug("%s %s", rsp.request, rsp.request.url)
- logger.debug("Headers %s:", rsp.request.headers)
- logger.debug("Body: %s", rsp.request.body)
- logger.debug("Response: %s", rsp.text)
- logger.debug("%s %s", rsp, rsp.reason)
- counter[rsp.status_code] += 1
+ start_time = time_now = time.time()
+ while start_time + retry_timeout > time_now:
+ try:
+ rsp = ses.send(prep, timeout=req_timeout)
+ except requests.exceptions.Timeout:
+ counter[99] += 1
+ logger.error("No response from %s", odl_ip)
+ rc = 99
+ else:
+ counter[rsp.status_code] += 1
+ rc = rsp.status_code
+ lvl = logging.INFO if rc > 299 else logging.DEBUG
+ logger.log(lvl, "Request started at {} finished with following detais".format(time.ctime(start_time)))
+ logger.log(lvl, "%s %s", rsp.request, rsp.request.url)
+ logger.log(lvl, "Headers %s:", rsp.request.headers)
+ logger.log(lvl, "Body: %s", rsp.request.body)
+ logger.log(lvl, "Response: %s", rsp.text)
+ logger.log(lvl, "%s %s", rsp, rsp.reason)
+ if rc not in retry_rcs:
+ break
+ time_now = time.time()
responses = {}
for response_code, count in enumerate(counter):
if count > 0:
def _task_executor(preparing_function, odl_ip="127.0.0.1", port="8181",
thread_count=1, item_count=1, items_per_request=1,
- auth=('admin', 'admin')):
+ auth=('admin', 'admin'), req_timeout=600, retry_timeout=15, retry_rcs=[]):
"""The main function which drives sending of http requests.
Creates 2 queues and requested number of "working threads".
:param auth: authentication credentials
+ :param req_timeout: http request timeout
+
+ :param retry_timeout: timout to give up retry attempts to send http requests
+
+ :param retry_rcs: list of return codes when retry should be performed
+
Returns:
:returns dict: dictionary of http response counts like
{"http_status_code1: "count1", etc.}
args=(i, preparing_function, auth),
kwargs={"in_queue": send_queue, "exit_event": exit_event,
"odl_ip": hosts[i % nrhosts], "port": port,
- "out_queue": result_queue})
+ "out_queue": result_queue, "req_timeout": req_timeout,
+ "retry_timeout": retry_timeout, "retry_rcs": retry_rcs})
threads.append(thr)
thr.start()
raise Exception("Not all cars were configured: " + repr(res))
+def add_car_with_retries(odl_ip, port, thread_count, item_count, auth, items_per_request):
+ """Configure car entries to the config datastore.
+
+ Args:
+ :param odl_ip: ip address of ODL
+
+ :param port: restconf port
+
+ :param thread_count: number of threads used to send http requests; default=1
+
+ :param item_count: number of items to be configured
+
+ :param auth: authentication credentials
+
+ :param items_per_request: items per request, not used here,
+ just to keep the same api
+
+ Returns:
+ None
+ """
+
+ logger.info("Add %s car(s) to %s:%s (%s per request)",
+ item_count, odl_ip, port, items_per_request)
+ retry_rcs = [401, 404, 500, 503]
+ res = _task_executor(_prepare_add_car, odl_ip=odl_ip, port=port,
+ thread_count=thread_count, item_count=item_count,
+ items_per_request=items_per_request, auth=auth,
+ req_timeout=15, retry_timeout=30, retry_rcs=retry_rcs)
+ acceptable_rcs = [204] + retry_rcs
+ for key in res.keys():
+ if key not in acceptable_rcs:
+ logger.error("Problems during cars' configuration appeared: " + repr(res))
+ raise Exception("Problems during cars' configuration appeared: " + repr(res))
+
+
def add_people_rpc(odl_ip, port, thread_count, item_count, auth, items_per_request):
"""Configure people entries to the config datastore.
raise Exception("Not all rpc calls passed: " + repr(res))
-_actions = ["add", "get", "delete", "add-rpc"]
+_actions = ["add", "get", "delete", "add-rpc", "add-with-retries"]
_items = ["car", "people", "car-people"]
_handler_matrix = {
"get": {"car": get_car, "people": get_people, "car-people": get_car_people},
"delete": {"car": delete_car, "people": delete_people, "car-people": delete_car_people},
"add-rpc": {"car-people": add_car_people_rpc, "people": add_people_rpc},
+ "add-with-retries": {"car": add_car_with_retries},
}
if (args.action not in _handler_matrix or
args.itemtype not in _handler_matrix[args.action]):
- msg = "Unsupported combination of action: " + str(args.action)
- msg += " and item: " + str(args.itemtype)
- logger.error(msg)
- raise NotImplementedError(msg)
+ msg = "Unsupported combination of action: " + str(args.action)
+ msg += " and item: " + str(args.itemtype)
+ logger.error(msg)
+ raise NotImplementedError(msg)
# TODO: need to filter out situations when we cannot use more items
# in one rest request (rpc or delete?)