""" Python invocation of several parallel publish-notifications RPCs. """ from robot.api import logger import Queue import requests import string import threading _globals = {} def start_write_transactions_on_nodes(host_list, id_prefix, duration, rate, chained_flag=True): """Invoke publish notification rpcs and verify the response. :param host_list: list of ip address of odl nodes :type host_list: list of strings :param id_prefix: identifier prefix :type id_prefix: string :param duration: time in seconds :type duration: int :param rate: writing transactions rate per second :type rate: int :param chained_flag: specify chained vs. simple transactions :type chained_flag: bool """ def _write_transactions(rqueue, url, grid, duration, rate, chained_flag): dtmpl = string.Template(''' $ID $DURATION $RATE $CHAINED_FLAG ''') data = dtmpl.substitute({'ID': grid, 'DURATION': duration, 'RATE': rate, 'CHAINED_FLAG': chained_flag}) logger.info('write-transactions rpc indoked with details: {}'.format(data)) try: resp = requests.post(url=url, headers={'Content-Type': 'application/xml'}, data=data, auth=('admin', 'admin'), timeout=int(duration)+60) except Exception as exc: resp = exc logger.debug(exc) rqueue.put(resp) logger.info("Input parameters: host_list:{}, id_prefix:{}, duration:{}, rate:{}, chained_flag:{}".format( host_list, id_prefix, duration, rate, chained_flag)) resqueue = _globals.pop('result_queue', Queue.Queue()) lthreads = _globals.pop('threads', []) for i, host in enumerate(host_list): url = 'http://{}:8181/restconf/operations/odl-mdsal-lowlevel-control:write-transactions'.format(host) t = threading.Thread(target=_write_transactions, args=(resqueue, url, '{}{}'.format(id_prefix, i), duration, rate, chained_flag)) t.daemon = True t.start() lthreads.append(t) _globals.update({'threads': lthreads, 'result_queue': resqueue}) def wait_for_write_transactions(): """Blocking call, waiting for responses from all threads.""" lthreads = _globals.pop('threads') resqueue = _globals.pop('result_queue') for t in lthreads: t.join() results = [] while not resqueue.empty(): results.append(resqueue.get()) logger.info(results) return results def get_next_write_transactions_response(): """Get http response from write-transactions rpc if available.""" resqueue = _globals.get('result_queue') if not resqueue.empty(): return resqueue.get() return None