"""
Python invocation of several parallel publish-notifications RPCs.
"""
+from robot.api import logger
import Queue
import requests
import string
import threading
-def publish_notifications(host, grprefix, duration, rate, nrpairs=1):
+_globals = {}
+
+
+def start_write_transactions_on_nodes(host_list, id_prefix, duration, rate, chained_flag=True):
"""Invoke publish notification rpcs and verify the response.
- :param host: ip address of odl node
- :type host: string
- :param grprefix: prefix identifier for publisher/listener pair
- :type grprefix: string
- :param duration: publishing notification duration in seconds
+ :param host_list: list of ip address of odl nodes
+ :type host_list: list of strings
+ :param id_prefix: identifier prefix
+ :type id_prefix: string
+ :param duration: time in seconds
:type duration: int
- :param rate: events rate per second
+ :param rate: writing transactions rate per second
:type rate: int
- :param nrpairs: number of publisher/listener pairs, id suffix is counted from it
- :type nrpairs: int
+ :param chained_flag: specify chained vs. simple transactions
+ :type chained_flag: bool
"""
- def _publ_notifications(rqueue, url, grid, duration, rate):
+ def _write_transactions(rqueue, url, grid, duration, rate, chained_flag):
dtmpl = string.Template('''<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
<id>$ID</id>
<seconds>$DURATION</seconds>
- <notifications-per-second>$RATE</notifications-per-second>
+ <transactions-per-second>$RATE</transactions-per-second>
+ <chained-transactions>$CHAINED_FLAG</chained-transactions>
</input>''')
- data = dtmpl.substitute({'ID': grid, 'DURATION': duration, 'RATE': rate})
+ data = dtmpl.substitute({'ID': grid, 'DURATION': duration, 'RATE': rate, 'CHAINED_FLAG': chained_flag})
+ logger.info('write-transactions rpc indoked with details: {}'.format(data))
try:
resp = requests.post(url=url, headers={'Content-Type': 'application/xml'},
data=data, auth=('admin', 'admin'), timeout=int(duration)+60)
except Exception as exc:
resp = exc
+ logger.debug(exc)
rqueue.put(resp)
- resqueue = Queue.Queue()
- lthreads = []
- url = 'http://{}:8181/restconf/operations/odl-mdsal-lowlevel-control:publish-notifications'.format(host)
- for i in range(nrpairs):
- t = threading.Thread(target=_publ_notifications,
- args=(resqueue, url, '{}{}'.format(grprefix, i+1), duration, rate))
+ logger.info("Input parameters: host_list:{}, id_prefix:{}, duration:{}, rate:{}, chained_flag:{}".format(
+ host_list, id_prefix, duration, rate, chained_flag))
+ resqueue = _globals.pop('result_queue', Queue.Queue())
+ lthreads = _globals.pop('threads', [])
+ for i, host in enumerate(host_list):
+ url = 'http://{}:8181/restconf/operations/odl-mdsal-lowlevel-control:write-transactions'.format(host)
+ t = threading.Thread(target=_write_transactions,
+ args=(resqueue, url, '{}{}'.format(id_prefix, i), duration, rate, chained_flag))
t.daemon = True
t.start()
lthreads.append(t)
+ _globals.update({'threads': lthreads, 'result_queue': resqueue})
+
+
+def wait_for_write_transactions():
+ """Blocking call, waiting for responses from all threads."""
+ lthreads = _globals.pop('threads')
+ resqueue = _globals.pop('result_queue')
+
for t in lthreads:
t.join()
- for i in range(nrpairs):
- resp = resqueue.get()
- assert resp.status_code == 200
+ results = []
+ while not resqueue.empty():
+ results.append(resqueue.get())
+ logger.info(results)
+ return results
+
+
+def get_next_write_transactions_response():
+ """Get http response from write-transactions rpc if available."""
+ resqueue = _globals.get('result_queue')
+
+ if not resqueue.empty():
+ return resqueue.get()
+ return None