a75f3eeaf4786e4c18c49de167417566c25d817e
[integration/test.git] / csit / libraries / MdsalLowlevelPy.py
1 """
2 Python invocation of several parallel publish-notifications RPCs.
3 """
4 from robot.api import logger
5 import Queue
6 import requests
7 import string
8 import threading
9
10
11 _globals = {}
12
13
14 def _send_http_request_thread_impl(rqueue, url, data, http_timeout):
15     """Start either publish or write transactions rpc based on input.
16
17     :param rqueue: result queue
18     :type rqueue: Queue.Queue
19     :param url: rpc url
20     :type url: string
21     :param data: http request content
22     :type data: string
23     :param http_timeout: http response timeout
24     :type http_timeout: int
25     """
26     logger.info('rpc indoked with details: {}'.format(data))
27     try:
28         resp = requests.post(url=url, headers={'Content-Type': 'application/xml'},
29                              data=data, auth=('admin', 'admin'), timeout=http_timeout)
30     except Exception as exc:
31         resp = exc
32         logger.debug(exc)
33     rqueue.put(resp)
34
35
36 def _initiate_rpcs(host_list, prefix_list, url_templ, data_templ, subst_dict):
37     """Initiate rpc on given hosts.
38
39     :param host_list: list of ip address of odl nodes
40     :type host_list: list of strings
41     :param id_prefix: list of node indexes which coresponds to the ip addresses
42     :type id_prefix: string
43     :param url_templ: url template
44     :type url_templ: string.Template object
45     :param data_templ: http request data
46     :type data_templ: string.Template object
47     :param subst_dict: dictionary with key value pairs to be used with template
48     :type subst_dict: dict
49     """
50     resqueue = _globals.pop('result_queue', Queue.Queue())
51     lthreads = _globals.pop('threads', [])
52     for i, host in enumerate(host_list):
53         subst_dict.update({'ID': '{}{}'.format(subst_dict['ID_PREFIX'], prefix_list[i])})
54         url = url_templ.substitute({'HOST': host})
55         data = data_templ.substitute(subst_dict)
56         timeout = int(subst_dict['DURATION'])+150
57         logger.info('url: {}, data: {}, timeout: {}'.format(url, data, timeout))
58         t = threading.Thread(target=_send_http_request_thread_impl,
59                              args=(resqueue, url, data, timeout))
60         t.daemon = True
61         t.start()
62         lthreads.append(t)
63
64     _globals.update({'threads': lthreads, 'result_queue': resqueue})
65
66
67 def start_write_transactions_on_nodes(host_list, prefix_list, id_prefix, duration, rate, chained_flag=False,
68                                       reset_globals=True):
69     """Invoke write-transactions rpc on given nodes.
70
71     :param host_list: list of ip address of odl nodes
72     :type host_list: list of strings
73     :param prefix_list: list of node indexes which coresponds to the ip addresses
74     :type prefix_list: list
75     :param id_prefix: identifier prefix
76     :type id_prefix: string
77     :param duration: time in seconds
78     :type duration: int
79     :param rate: writing transactions rate in transactions per second
80     :type rate: int
81     :param chained_flag: specify chained vs. simple transactions
82     :type chained_flag: bool
83     :param reset_globals: reset global variable dict
84     :type reset_globals: bool
85     """
86     if reset_globals:
87         _globals.clear()
88
89     logger.info(
90         "Input parameters: host_list:{}, prefix_list:{}, id_prefix:{}, duration:{}, rate:{}, chained_flag:{}".format(
91             host_list, prefix_list, id_prefix, duration, rate, chained_flag))
92     datat = string.Template('''<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
93   <id>$ID</id>
94   <seconds>$DURATION</seconds>
95   <transactions-per-second>$RATE</transactions-per-second>
96   <chained-transactions>$CHAINED_FLAG</chained-transactions>
97 </input>''')
98     subst_dict = {'ID_PREFIX': id_prefix, 'DURATION': duration, 'RATE': rate, 'CHAINED_FLAG': chained_flag}
99     urlt = string.Template('''http://$HOST:8181/restconf/operations/odl-mdsal-lowlevel-control:write-transactions''')
100     _initiate_rpcs(host_list, prefix_list, urlt, datat, subst_dict)
101
102
103 def start_produce_transactions_on_nodes(host_list, prefix_list, id_prefix,
104                                         duration, rate, isolated_transactions_flag=False, reset_globals=True):
105     """Invoke produce-transactions rpcs on given nodes.
106
107     :param host_list: list of ip address of odl nodes
108     :type host_list: list of strings
109     :param prefix_list: list of node indexes which coresponds to the ip addresses
110     :type prefix_list: list
111     :param id_prefix: identifier prefix
112     :type id_prefix: string
113     :param duration: time in seconds
114     :type duration: int
115     :param rate: produce transactions rate in transactions per second
116     :type rate: int
117     :param isolated_transactions_flag: isolated transactions flag
118     :type isolated_transactions_flag: bool
119     :param reset_globals: reset global variable dict
120     :type reset_globals: bool
121     """
122     if reset_globals:
123         _globals.clear()
124
125     msg = "host_list:{}, prefix_list:{} ,id_prefix:{}, duration:{}, rate:{}, isolated_transactions:{}".format(
126             host_list, prefix_list, id_prefix, duration, rate, isolated_transactions_flag)
127     msg = "Input parameters: " + msg
128     logger.info(msg)
129     datat = string.Template('''<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
130   <id>$ID</id>
131   <seconds>$DURATION</seconds>
132   <transactions-per-second>$RATE</transactions-per-second>
133   <isolated-transactions>$ISOLATED_TRANSACTIONS</isolated-transactions>
134 </input>''')
135     subst_dict = {'ID_PREFIX': id_prefix, 'DURATION': duration, 'RATE': rate,
136                   'ISOLATED_TRANSACTIONS': isolated_transactions_flag}
137     urlt = string.Template('''http://$HOST:8181/restconf/operations/odl-mdsal-lowlevel-control:produce-transactions''')
138     _initiate_rpcs(host_list, prefix_list, urlt, datat, subst_dict)
139
140
141 def wait_for_transactions():
142     """Blocking call, waitig for responses from all threads."""
143     lthreads = _globals.pop('threads')
144     resqueue = _globals.pop('result_queue')
145
146     for t in lthreads:
147         t.join()
148
149     results = []
150     while not resqueue.empty():
151         results.append(resqueue.get())
152     for rsp in results:
153         if isinstance(rsp, requests.Response):
154             logger.info(rsp.text)
155         else:
156             logger.info(rsp)
157     return results
158
159
160 def get_next_transactions_response():
161     """Get http response from write-transactions rpc if available."""
162     resqueue = _globals.get('result_queue')
163
164     if not resqueue.empty():
165         rsp = resqueue.get()
166         if isinstance(rsp, requests.Response):
167             logger.info(rsp.text)
168         else:
169             logger.info(rsp)
170         return rsp
171     return None