Correctly space expected_status
[integration/test.git] / csit / libraries / MdsalLowlevelPy.py
1 """
2 Python invocation of several parallel publish-notifications RPCs.
3 """
4 from robot.api import logger
5 import time
6 import queue
7 import requests
8 import string
9 import threading
10
11
12 _globals = {}
13
14
15 def _send_http_request_thread_impl(rqueue, prefix_id, url, data, http_timeout):
16     """Start either publish or write transactions rpc based on input.
17
18     :param rqueue: result queue
19     :type rqueue: queue.Queue
20     :param prefix_id: identifier for prefix, should imply cluster member index
21     :type prefix_id: str
22     :param url: rpc url
23     :type url: str
24     :param data: http request content
25     :type data: str
26     :param http_timeout: http response timeout
27     :type http_timeout: int
28     """
29     logger.info("rpc invoked with details: {}".format(data))
30     try:
31         resp = requests.post(
32             url=url,
33             headers={"Content-Type": "application/xml"},
34             data=data,
35             auth=("admin", "admin"),
36             timeout=http_timeout,
37         )
38     except Exception as exc:
39         resp = exc
40         logger.debug(exc)
41     rqueue.put((time.ctime(), prefix_id, resp))
42
43
44 def _initiate_rpcs(host_list, index_list, url_templ, data_templ, subst_dict):
45     """Initiate rpc on given hosts.
46
47     :param host_list: IP addresses of odl nodes
48     :type host_list: list[str]
49     :param index_list: node indices which correspond to the ip addresses
50     :type index_list: list[int]
51     :param url_templ: url template
52     :type url_templ: string.Template object
53     :param data_templ: http request data
54     :type data_templ: string.Template object
55     :param subst_dict: dictionary with key value pairs to be used with template
56     :type subst_dict: dict
57     """
58     resqueue = _globals.pop("result_queue", queue.Queue())
59     lthreads = _globals.pop("threads", [])
60     for i, host in enumerate(host_list):
61         url = url_templ.substitute({"HOST": host})
62         timeout = int(subst_dict["DURATION"]) + 3 * 125 + 10
63         prefix_id = subst_dict["ID_PREFIX"] + str(index_list[i])
64         subst_dict["ID"] = prefix_id
65         data = data_templ.substitute(subst_dict)
66         logger.info("url: {}, data: {}, timeout: {}".format(url, data, timeout))
67         t = threading.Thread(
68             target=_send_http_request_thread_impl,
69             args=(resqueue, prefix_id, url, data, timeout),
70         )
71         t.daemon = True
72         t.start()
73         lthreads.append(t)
74
75     _globals.update({"threads": lthreads, "result_queue": resqueue})
76
77
78 def start_write_transactions_on_nodes(
79     host_list,
80     index_list,
81     id_prefix,
82     duration,
83     rate,
84     chained_flag=False,
85     reset_globals=True,
86 ):
87     """Invoke write-transactions rpc on given nodes.
88
89     :param host_list: IP addresses of odl nodes
90     :type host_list: list[str]
91     :param index_list: node indices which correspond to the ip addresses
92     :type index_list: list[int]
93     :param id_prefix: identifier prefix
94     :type id_prefix: str
95     :param duration: time in seconds
96     :type duration: int
97     :param rate: writing transactions rate in transactions per second
98     :type rate: int
99     :param chained_flag: specify chained vs. simple transactions
100     :type chained_flag: bool
101     :param reset_globals: reset global variable dict
102     :type reset_globals: bool
103     """
104     if reset_globals:
105         _globals.clear()
106
107     logger.info(
108         "Input parameters: host_list:{}, index_list:{}, id_prefix:{}, duration:{}, rate:{}, chained_flag:{}".format(
109             host_list, index_list, id_prefix, duration, rate, chained_flag
110         )
111     )
112     datat = string.Template(
113         """<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
114   <id>$ID</id>
115   <seconds>$DURATION</seconds>
116   <transactions-per-second>$RATE</transactions-per-second>
117   <chained-transactions>$CHAINED_FLAG</chained-transactions>
118 </input>"""
119     )
120     subst_dict = {
121         "ID_PREFIX": id_prefix,
122         "DURATION": duration,
123         "RATE": rate,
124         "CHAINED_FLAG": "true" if chained_flag else "false",
125     }
126     urlt = string.Template(
127         """http://$HOST:8181/restconf/operations/odl-mdsal-lowlevel-control:write-transactions"""
128     )
129     _initiate_rpcs(host_list, index_list, urlt, datat, subst_dict)
130
131
132 def start_produce_transactions_on_nodes(
133     host_list,
134     index_list,
135     id_prefix,
136     duration,
137     rate,
138     isolated_transactions_flag=False,
139     reset_globals=True,
140 ):
141     """Invoke produce-transactions rpcs on given nodes.
142
143     :param host_list: IP addresses of odl nodes
144     :type host_list: list[str]
145     :param index_list: node indices which correspond to the ip addresses
146     :type index_list: list[int]
147     :param id_prefix: identifier prefix
148     :type id_prefix: str
149     :param duration: time in seconds
150     :type duration: int
151     :param rate: produce transactions rate in transactions per second
152     :type rate: int
153     :param isolated_transactions_flag: isolated transactions flag
154     :type isolated_transactions_flag: bool
155     :param reset_globals: reset global variable dict
156     :type reset_globals: bool
157     """
158     if reset_globals:
159         _globals.clear()
160
161     msg = "host_list:{}, index_list:{} ,id_prefix:{}, duration:{}, rate:{}, isolated_transactions:{}".format(
162         host_list, index_list, id_prefix, duration, rate, isolated_transactions_flag
163     )
164     msg = "Input parameters: " + msg
165     logger.info(msg)
166     datat = string.Template(
167         """<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
168   <id>$ID</id>
169   <seconds>$DURATION</seconds>
170   <transactions-per-second>$RATE</transactions-per-second>
171   <isolated-transactions>$ISOLATED_TRANSACTIONS</isolated-transactions>
172 </input>"""
173     )
174     subst_dict = {
175         "ID_PREFIX": id_prefix,
176         "DURATION": duration,
177         "RATE": rate,
178         "ISOLATED_TRANSACTIONS": "true" if isolated_transactions_flag else "false",
179     }
180     urlt = string.Template(
181         """http://$HOST:8181/restconf/operations/odl-mdsal-lowlevel-control:produce-transactions"""
182     )
183     _initiate_rpcs(host_list, index_list, urlt, datat, subst_dict)
184
185
186 def wait_for_transactions():
187     """Blocking call, waitig for responses from all threads.
188
189     :return: list of triples; triple consists of response time, prefix identifier and response object
190     :rtype: list[(str, str, requests.Response)]
191     """
192     lthreads = _globals.pop("threads")
193     resqueue = _globals.pop("result_queue")
194
195     for t in lthreads:
196         t.join()
197
198     results = []
199     while not resqueue.empty():
200         results.append(resqueue.get())
201     for rsp in results:
202         if isinstance(rsp[2], requests.Response):
203             logger.info(rsp[2].text)
204         else:
205             logger.info(rsp[2])
206     return results
207
208
209 def get_next_transactions_response():
210     """Get http response from write-transactions rpc if available.
211
212     :return: None or a triple consisting of response time, prefix identifier and response object
213     :rtype: (str, str, requests.Response)
214     """
215     resqueue = _globals.get("result_queue")
216
217     if not resqueue.empty():
218         rsp = resqueue.get()
219         if isinstance(rsp[2], requests.Response):
220             logger.info(rsp[2].text)
221         else:
222             logger.info(rsp[2])
223         return rsp
224     return None