Add initial files for dom data broker testing
[integration/test.git] / csit / libraries / MdsalLowlevelPy.py
index c9e4c4520efb617a81e55ebc347ea8b44b615a4b..012f87c2cf2c4bc3ddf3d0612d498cf47791989b 100644 (file)
@@ -1,53 +1,81 @@
 """
 Python invocation of several parallel publish-notifications RPCs.
 """
+from robot.api import logger
 import Queue
 import requests
 import string
 import threading
 
 
-def publish_notifications(host, grprefix, duration, rate, nrpairs=1):
+_globals = {}
+
+
+def start_write_transactions_on_nodes(host_list, id_prefix, duration, rate, chained_flag=True):
     """Invoke publish notification rpcs and verify the response.
 
-    :param host: ip address of odl node
-    :type host: string
-    :param grprefix: prefix identifier for publisher/listener pair
-    :type grprefix: string
-    :param duration: publishing notification duration in seconds
+    :param host_list: list of ip address of odl nodes
+    :type host_list: list of strings
+    :param id_prefix: identifier prefix
+    :type id_prefix: string
+    :param duration: time in seconds
     :type duration: int
-    :param rate: events rate per second
+    :param rate: writing transactions rate per second
     :type rate: int
-    :param nrpairs: number of publisher/listener pairs, id suffix is counted from it
-    :type nrpairs: int
+    :param chained_flag: specify chained vs. simple transactions
+    :type chained_flag: bool
     """
-    def _publ_notifications(rqueue, url, grid, duration, rate):
+    def _write_transactions(rqueue, url, grid, duration, rate, chained_flag):
         dtmpl = string.Template('''<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
   <id>$ID</id>
   <seconds>$DURATION</seconds>
-  <notifications-per-second>$RATE</notifications-per-second>
+  <transactions-per-second>$RATE</transactions-per-second>
+  <chained-transactions>$CHAINED_FLAG</chained-transactions>
 </input>''')
-        data = dtmpl.substitute({'ID': grid, 'DURATION': duration, 'RATE': rate})
+        data = dtmpl.substitute({'ID': grid, 'DURATION': duration, 'RATE': rate, 'CHAINED_FLAG': chained_flag})
+        logger.info('write-transactions rpc indoked with details: {}'.format(data))
         try:
             resp = requests.post(url=url, headers={'Content-Type': 'application/xml'},
                                  data=data, auth=('admin', 'admin'), timeout=int(duration)+60)
         except Exception as exc:
             resp = exc
+            logger.debug(exc)
         rqueue.put(resp)
 
-    resqueue = Queue.Queue()
-    lthreads = []
-    url = 'http://{}:8181/restconf/operations/odl-mdsal-lowlevel-control:publish-notifications'.format(host)
-    for i in range(nrpairs):
-        t = threading.Thread(target=_publ_notifications,
-                             args=(resqueue, url, '{}{}'.format(grprefix, i+1), duration, rate))
+    logger.info("Input parameters: host_list:{}, id_prefix:{}, duration:{}, rate:{}, chained_flag:{}".format(
+        host_list, id_prefix, duration, rate, chained_flag))
+    resqueue = _globals.pop('result_queue', Queue.Queue())
+    lthreads = _globals.pop('threads', [])
+    for i, host in enumerate(host_list):
+        url = 'http://{}:8181/restconf/operations/odl-mdsal-lowlevel-control:write-transactions'.format(host)
+        t = threading.Thread(target=_write_transactions,
+                             args=(resqueue, url, '{}{}'.format(id_prefix, i), duration, rate, chained_flag))
         t.daemon = True
         t.start()
         lthreads.append(t)
 
+    _globals.update({'threads': lthreads, 'result_queue': resqueue})
+
+
+def wait_for_write_transactions():
+    """Blocking call, waiting for responses from all threads."""
+    lthreads = _globals.pop('threads')
+    resqueue = _globals.pop('result_queue')
+
     for t in lthreads:
         t.join()
 
-    for i in range(nrpairs):
-        resp = resqueue.get()
-        assert resp.status_code == 200
+    results = []
+    while not resqueue.empty():
+        results.append(resqueue.get())
+    logger.info(results)
+    return results
+
+
+def get_next_write_transactions_response():
+    """Get http response from write-transactions rpc if available."""
+    resqueue = _globals.get('result_queue')
+
+    if not resqueue.empty():
+        return resqueue.get()
+    return None