X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=csit%2Flibraries%2FMdsalLowlevelPy.py;h=4e1b24243ac55948cf9919384c17d6f146f21d4b;hb=HEAD;hp=c9e4c4520efb617a81e55ebc347ea8b44b615a4b;hpb=9ad8a09b94b0d1a5a1167e6baf92af872b7d4123;p=integration%2Ftest.git diff --git a/csit/libraries/MdsalLowlevelPy.py b/csit/libraries/MdsalLowlevelPy.py index c9e4c4520e..4e1b24243a 100644 --- a/csit/libraries/MdsalLowlevelPy.py +++ b/csit/libraries/MdsalLowlevelPy.py @@ -1,53 +1,224 @@ """ Python invocation of several parallel publish-notifications RPCs. """ -import Queue +from robot.api import logger +import time +import queue import requests import string import threading -def publish_notifications(host, grprefix, duration, rate, nrpairs=1): - """Invoke publish notification rpcs and verify the response. +_globals = {} - :param host: ip address of odl node - :type host: string - :param grprefix: prefix identifier for publisher/listener pair - :type grprefix: string - :param duration: publishing notification duration in seconds + +def _send_http_request_thread_impl(rqueue, prefix_id, url, data, http_timeout): + """Start either publish or write transactions rpc based on input. + + :param rqueue: result queue + :type rqueue: queue.Queue + :param prefix_id: identifier for prefix, should imply cluster member index + :type prefix_id: str + :param url: rpc url + :type url: str + :param data: http request content + :type data: str + :param http_timeout: http response timeout + :type http_timeout: int + """ + logger.info("rpc invoked with details: {}".format(data)) + try: + resp = requests.post( + url=url, + headers={"Content-Type": "application/xml"}, + data=data, + auth=("admin", "admin"), + timeout=http_timeout, + ) + except Exception as exc: + resp = exc + logger.debug(exc) + rqueue.put((time.ctime(), prefix_id, resp)) + + +def _initiate_rpcs(host_list, index_list, url_templ, data_templ, subst_dict): + """Initiate rpc on given hosts. + + :param host_list: IP addresses of odl nodes + :type host_list: list[str] + :param index_list: node indices which correspond to the ip addresses + :type index_list: list[int] + :param url_templ: url template + :type url_templ: string.Template object + :param data_templ: http request data + :type data_templ: string.Template object + :param subst_dict: dictionary with key value pairs to be used with template + :type subst_dict: dict + """ + resqueue = _globals.pop("result_queue", queue.Queue()) + lthreads = _globals.pop("threads", []) + for i, host in enumerate(host_list): + url = url_templ.substitute({"HOST": host}) + timeout = int(subst_dict["DURATION"]) + 3 * 125 + 10 + prefix_id = subst_dict["ID_PREFIX"] + str(index_list[i]) + subst_dict["ID"] = prefix_id + data = data_templ.substitute(subst_dict) + logger.info("url: {}, data: {}, timeout: {}".format(url, data, timeout)) + t = threading.Thread( + target=_send_http_request_thread_impl, + args=(resqueue, prefix_id, url, data, timeout), + ) + t.daemon = True + t.start() + lthreads.append(t) + + _globals.update({"threads": lthreads, "result_queue": resqueue}) + + +def start_write_transactions_on_nodes( + host_list, + index_list, + id_prefix, + duration, + rate, + chained_flag=False, + reset_globals=True, +): + """Invoke write-transactions rpc on given nodes. + + :param host_list: IP addresses of odl nodes + :type host_list: list[str] + :param index_list: node indices which correspond to the ip addresses + :type index_list: list[int] + :param id_prefix: identifier prefix + :type id_prefix: str + :param duration: time in seconds :type duration: int - :param rate: events rate per second + :param rate: writing transactions rate in transactions per second :type rate: int - :param nrpairs: number of publisher/listener pairs, id suffix is counted from it - :type nrpairs: int + :param chained_flag: specify chained vs. simple transactions + :type chained_flag: bool + :param reset_globals: reset global variable dict + :type reset_globals: bool """ - def _publ_notifications(rqueue, url, grid, duration, rate): - dtmpl = string.Template(''' + if reset_globals: + _globals.clear() + + logger.info( + "Input parameters: host_list:{}, index_list:{}, id_prefix:{}, duration:{}, rate:{}, chained_flag:{}".format( + host_list, index_list, id_prefix, duration, rate, chained_flag + ) + ) + datat = string.Template( + """ $ID $DURATION - $RATE -''') - data = dtmpl.substitute({'ID': grid, 'DURATION': duration, 'RATE': rate}) - try: - resp = requests.post(url=url, headers={'Content-Type': 'application/xml'}, - data=data, auth=('admin', 'admin'), timeout=int(duration)+60) - except Exception as exc: - resp = exc - rqueue.put(resp) - - resqueue = Queue.Queue() - lthreads = [] - url = 'http://{}:8181/restconf/operations/odl-mdsal-lowlevel-control:publish-notifications'.format(host) - for i in range(nrpairs): - t = threading.Thread(target=_publ_notifications, - args=(resqueue, url, '{}{}'.format(grprefix, i+1), duration, rate)) - t.daemon = True - t.start() - lthreads.append(t) + $RATE + $CHAINED_FLAG +""" + ) + subst_dict = { + "ID_PREFIX": id_prefix, + "DURATION": duration, + "RATE": rate, + "CHAINED_FLAG": "true" if chained_flag else "false", + } + urlt = string.Template( + """http://$HOST:8181/rests/operations/odl-mdsal-lowlevel-control:write-transactions""" + ) + _initiate_rpcs(host_list, index_list, urlt, datat, subst_dict) + + +def start_produce_transactions_on_nodes( + host_list, + index_list, + id_prefix, + duration, + rate, + isolated_transactions_flag=False, + reset_globals=True, +): + """Invoke produce-transactions rpcs on given nodes. + + :param host_list: IP addresses of odl nodes + :type host_list: list[str] + :param index_list: node indices which correspond to the ip addresses + :type index_list: list[int] + :param id_prefix: identifier prefix + :type id_prefix: str + :param duration: time in seconds + :type duration: int + :param rate: produce transactions rate in transactions per second + :type rate: int + :param isolated_transactions_flag: isolated transactions flag + :type isolated_transactions_flag: bool + :param reset_globals: reset global variable dict + :type reset_globals: bool + """ + if reset_globals: + _globals.clear() + + msg = "host_list:{}, index_list:{} ,id_prefix:{}, duration:{}, rate:{}, isolated_transactions:{}".format( + host_list, index_list, id_prefix, duration, rate, isolated_transactions_flag + ) + msg = "Input parameters: " + msg + logger.info(msg) + datat = string.Template( + """ + $ID + $DURATION + $RATE + $ISOLATED_TRANSACTIONS +""" + ) + subst_dict = { + "ID_PREFIX": id_prefix, + "DURATION": duration, + "RATE": rate, + "ISOLATED_TRANSACTIONS": "true" if isolated_transactions_flag else "false", + } + urlt = string.Template( + """http://$HOST:8181/rests/operations/odl-mdsal-lowlevel-control:produce-transactions""" + ) + _initiate_rpcs(host_list, index_list, urlt, datat, subst_dict) + + +def wait_for_transactions(): + """Blocking call, waitig for responses from all threads. + + :return: list of triples; triple consists of response time, prefix identifier and response object + :rtype: list[(str, str, requests.Response)] + """ + lthreads = _globals.pop("threads") + resqueue = _globals.pop("result_queue") for t in lthreads: t.join() - for i in range(nrpairs): - resp = resqueue.get() - assert resp.status_code == 200 + results = [] + while not resqueue.empty(): + results.append(resqueue.get()) + for rsp in results: + if isinstance(rsp[2], requests.Response): + logger.info(rsp[2].text) + else: + logger.info(rsp[2]) + return results + + +def get_next_transactions_response(): + """Get http response from write-transactions rpc if available. + + :return: None or a triple consisting of response time, prefix identifier and response object + :rtype: (str, str, requests.Response) + """ + resqueue = _globals.get("result_queue") + + if not resqueue.empty(): + rsp = resqueue.get() + if isinstance(rsp[2], requests.Response): + logger.info(rsp[2].text) + else: + logger.info(rsp[2]) + return rsp + return None