2 Python invocation of several parallel publish-notifications RPCs.
4 from robot.api import logger
15 def _send_http_request_thread_impl(rqueue, prefix_id, url, data, http_timeout):
16 """Start either publish or write transactions rpc based on input.
18 :param rqueue: result queue
19 :type rqueue: queue.Queue
20 :param prefix_id: identifier for prefix, should imply cluster member index
24 :param data: http request content
26 :param http_timeout: http response timeout
27 :type http_timeout: int
29 logger.info("rpc invoked with details: {}".format(data))
33 headers={"Content-Type": "application/xml"},
35 auth=("admin", "admin"),
38 except Exception as exc:
41 rqueue.put((time.ctime(), prefix_id, resp))
44 def _initiate_rpcs(host_list, index_list, url_templ, data_templ, subst_dict):
45 """Initiate rpc on given hosts.
47 :param host_list: IP addresses of odl nodes
48 :type host_list: list[str]
49 :param index_list: node indices which correspond to the ip addresses
50 :type index_list: list[int]
51 :param url_templ: url template
52 :type url_templ: string.Template object
53 :param data_templ: http request data
54 :type data_templ: string.Template object
55 :param subst_dict: dictionary with key value pairs to be used with template
56 :type subst_dict: dict
58 resqueue = _globals.pop("result_queue", queue.Queue())
59 lthreads = _globals.pop("threads", [])
60 for i, host in enumerate(host_list):
61 url = url_templ.substitute({"HOST": host})
62 timeout = int(subst_dict["DURATION"]) + 3 * 125 + 10
63 prefix_id = subst_dict["ID_PREFIX"] + str(index_list[i])
64 subst_dict["ID"] = prefix_id
65 data = data_templ.substitute(subst_dict)
66 logger.info("url: {}, data: {}, timeout: {}".format(url, data, timeout))
68 target=_send_http_request_thread_impl,
69 args=(resqueue, prefix_id, url, data, timeout),
75 _globals.update({"threads": lthreads, "result_queue": resqueue})
78 def start_write_transactions_on_nodes(
87 """Invoke write-transactions rpc on given nodes.
89 :param host_list: IP addresses of odl nodes
90 :type host_list: list[str]
91 :param index_list: node indices which correspond to the ip addresses
92 :type index_list: list[int]
93 :param id_prefix: identifier prefix
95 :param duration: time in seconds
97 :param rate: writing transactions rate in transactions per second
99 :param chained_flag: specify chained vs. simple transactions
100 :type chained_flag: bool
101 :param reset_globals: reset global variable dict
102 :type reset_globals: bool
108 "Input parameters: host_list:{}, index_list:{}, id_prefix:{}, duration:{}, rate:{}, chained_flag:{}".format(
109 host_list, index_list, id_prefix, duration, rate, chained_flag
112 datat = string.Template(
113 """<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
115 <seconds>$DURATION</seconds>
116 <transactions-per-second>$RATE</transactions-per-second>
117 <chained-transactions>$CHAINED_FLAG</chained-transactions>
121 "ID_PREFIX": id_prefix,
122 "DURATION": duration,
124 "CHAINED_FLAG": "true" if chained_flag else "false",
126 urlt = string.Template(
127 """http://$HOST:8181/restconf/operations/odl-mdsal-lowlevel-control:write-transactions"""
129 _initiate_rpcs(host_list, index_list, urlt, datat, subst_dict)
132 def start_produce_transactions_on_nodes(
138 isolated_transactions_flag=False,
141 """Invoke produce-transactions rpcs on given nodes.
143 :param host_list: IP addresses of odl nodes
144 :type host_list: list[str]
145 :param index_list: node indices which correspond to the ip addresses
146 :type index_list: list[int]
147 :param id_prefix: identifier prefix
149 :param duration: time in seconds
151 :param rate: produce transactions rate in transactions per second
153 :param isolated_transactions_flag: isolated transactions flag
154 :type isolated_transactions_flag: bool
155 :param reset_globals: reset global variable dict
156 :type reset_globals: bool
161 msg = "host_list:{}, index_list:{} ,id_prefix:{}, duration:{}, rate:{}, isolated_transactions:{}".format(
162 host_list, index_list, id_prefix, duration, rate, isolated_transactions_flag
164 msg = "Input parameters: " + msg
166 datat = string.Template(
167 """<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
169 <seconds>$DURATION</seconds>
170 <transactions-per-second>$RATE</transactions-per-second>
171 <isolated-transactions>$ISOLATED_TRANSACTIONS</isolated-transactions>
175 "ID_PREFIX": id_prefix,
176 "DURATION": duration,
178 "ISOLATED_TRANSACTIONS": "true" if isolated_transactions_flag else "false",
180 urlt = string.Template(
181 """http://$HOST:8181/restconf/operations/odl-mdsal-lowlevel-control:produce-transactions"""
183 _initiate_rpcs(host_list, index_list, urlt, datat, subst_dict)
186 def wait_for_transactions():
187 """Blocking call, waitig for responses from all threads.
189 :return: list of triples; triple consists of response time, prefix identifier and response object
190 :rtype: list[(str, str, requests.Response)]
192 lthreads = _globals.pop("threads")
193 resqueue = _globals.pop("result_queue")
199 while not resqueue.empty():
200 results.append(resqueue.get())
202 if isinstance(rsp[2], requests.Response):
203 logger.info(rsp[2].text)
209 def get_next_transactions_response():
210 """Get http response from write-transactions rpc if available.
212 :return: None or a triple consisting of response time, prefix identifier and response object
213 :rtype: (str, str, requests.Response)
215 resqueue = _globals.get("result_queue")
217 if not resqueue.empty():
219 if isinstance(rsp[2], requests.Response):
220 logger.info(rsp[2].text)