Add initial files for dom data broker testing
[integration/test.git] / csit / libraries / MdsalLowlevelPy.py
1 """
2 Python invocation of several parallel write-transactions RPCs.
3 """
4 from robot.api import logger
5 import Queue
6 import requests
7 import string
8 import threading
9
10
11 _globals = {}
12
13
def start_write_transactions_on_nodes(host_list, id_prefix, duration, rate, chained_flag=True):
    """Invoke write-transactions rpc on every given node, each from its own thread.

    The call does not wait for the rpcs to finish. One daemon thread is started
    per host; it posts the rpc and puts either the http response or the
    exception raised while posting into a queue. The queue and the list of
    threads are kept in the module level _globals dictionary under the keys
    'result_queue' and 'threads'; repeated calls keep adding to the same
    queue and list as long as those keys are present.

    :param host_list: list of ip address of odl nodes
    :type host_list: list of strings
    :param id_prefix: identifier prefix; the index of the host in host_list is appended to it
    :type id_prefix: string
    :param duration: time in seconds
    :type duration: int
    :param rate: writing transactions rate per second
    :type rate: int
    :param chained_flag: specify chained vs. simple transactions
    :type chained_flag: bool
    """
    def _write_transactions(rqueue, url, grid, duration, rate, chained_flag):
        """Post one write-transactions rpc and put the response (or the exception) into rqueue."""
        dtmpl = string.Template('''<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
  <id>$ID</id>
  <seconds>$DURATION</seconds>
  <transactions-per-second>$RATE</transactions-per-second>
  <chained-transactions>$CHAINED_FLAG</chained-transactions>
</input>''')
        # Yang boolean values are written in lowercase, while str(True) gives "True".
        data = dtmpl.substitute({'ID': grid, 'DURATION': duration, 'RATE': rate,
                                 'CHAINED_FLAG': str(chained_flag).lower()})
        logger.info('write-transactions rpc invoked with details: {}'.format(data))
        try:
            # The rpc is given the requested duration plus 60 seconds before the http call times out.
            resp = requests.post(url=url, headers={'Content-Type': 'application/xml'},
                                 data=data, auth=('admin', 'admin'), timeout=int(duration)+60)
        except Exception as exc:
            # The failure is handed over to the caller through the queue instead of dying silently in the thread.
            resp = exc
            logger.debug(exc)
        rqueue.put(resp)

    logger.info("Input parameters: host_list:{}, id_prefix:{}, duration:{}, rate:{}, chained_flag:{}".format(
        host_list, id_prefix, duration, rate, chained_flag))
    # setdefault keeps the shared objects registered in _globals the whole time,
    # so nothing already stored is lost if starting a thread fails half way.
    resqueue = _globals.setdefault('result_queue', Queue.Queue())
    lthreads = _globals.setdefault('threads', [])
    for i, host in enumerate(host_list):
        url = 'http://{}:8181/restconf/operations/odl-mdsal-lowlevel-control:write-transactions'.format(host)
        t = threading.Thread(target=_write_transactions,
                             args=(resqueue, url, '{}{}'.format(id_prefix, i), duration, rate, chained_flag))
        t.daemon = True
        t.start()
        lthreads.append(t)
58
59
def wait_for_write_transactions():
    """Blocking call; join all registered threads and return everything found in the result queue.

    The 'threads' and 'result_queue' entries are removed from _globals
    (KeyError is raised when they are not present). The collected items are
    logged and returned.

    :return: items taken from the result queue
    :rtype: list
    """
    workers = _globals.pop('threads')
    outcomes = _globals.pop('result_queue')

    for worker in workers:
        worker.join()

    # All threads have finished by now, so the queue only needs to be drained.
    collected = []
    while True:
        if outcomes.empty():
            break
        collected.append(outcomes.get())
    logger.info(collected)
    return collected
73
74
def get_next_write_transactions_response():
    """Get http response from write-transactions rpc if available.

    Non-blocking as long as nothing else consumes the queue in parallel.

    :return: next item from the result queue (http response, or the exception
        stored by the posting thread); None when the queue is empty or when no
        queue is registered in _globals
    """
    resqueue = _globals.get('result_queue')

    # No queue is registered before the first start call and after
    # wait_for_write_transactions removed it; that simply means nothing is available.
    if resqueue is None or resqueue.empty():
        return None
    return resqueue.get()