+
+def _send_http_request_thread_impl(rqueue, prefix_id, url, data, http_timeout):
+ """Start either publish or write transactions rpc based on input.
+
+ :param rqueue: result queue
+ :type rqueue: queue.Queue
+ :param prefix_id: identifier for prefix, should imply cluster member index
+ :type prefix_id: str
+ :param url: rpc url
+ :type url: str
+ :param data: http request content
+ :type data: str
+ :param http_timeout: http response timeout
+ :type http_timeout: int
+ """
+ logger.info("rpc invoked with details: {}".format(data))
+ try:
+ resp = requests.post(
+ url=url,
+ headers={"Content-Type": "application/xml"},
+ data=data,
+ auth=("admin", "admin"),
+ timeout=http_timeout,
+ )
+ except Exception as exc:
+ resp = exc
+ logger.debug(exc)
+ rqueue.put((time.ctime(), prefix_id, resp))
+
+
+def _initiate_rpcs(host_list, index_list, url_templ, data_templ, subst_dict):
+ """Initiate rpc on given hosts.
+
+ :param host_list: IP addresses of odl nodes
+ :type host_list: list[str]
+ :param index_list: node indices which correspond to the ip addresses
+ :type index_list: list[int]
+ :param url_templ: url template
+ :type url_templ: string.Template object
+ :param data_templ: http request data
+ :type data_templ: string.Template object
+ :param subst_dict: dictionary with key value pairs to be used with template
+ :type subst_dict: dict
+ """
+ resqueue = _globals.pop("result_queue", queue.Queue())
+ lthreads = _globals.pop("threads", [])
+ for i, host in enumerate(host_list):
+ url = url_templ.substitute({"HOST": host})
+ timeout = int(subst_dict["DURATION"]) + 3 * 125 + 10
+ prefix_id = subst_dict["ID_PREFIX"] + str(index_list[i])
+ subst_dict["ID"] = prefix_id
+ data = data_templ.substitute(subst_dict)
+ logger.info("url: {}, data: {}, timeout: {}".format(url, data, timeout))
+ t = threading.Thread(
+ target=_send_http_request_thread_impl,
+ args=(resqueue, prefix_id, url, data, timeout),
+ )
+ t.daemon = True
+ t.start()
+ lthreads.append(t)
+
+ _globals.update({"threads": lthreads, "result_queue": resqueue})
+
+
+def start_write_transactions_on_nodes(
+ host_list,
+ index_list,
+ id_prefix,
+ duration,
+ rate,
+ chained_flag=False,
+ reset_globals=True,
+):
+ """Invoke write-transactions rpc on given nodes.
+
+ :param host_list: IP addresses of odl nodes
+ :type host_list: list[str]
+ :param index_list: node indices which correspond to the ip addresses
+ :type index_list: list[int]
+ :param id_prefix: identifier prefix
+ :type id_prefix: str
+ :param duration: time in seconds