2 Python invocation of several parallel publish-notifications RPCs.
4 from robot.api import logger
def _send_http_request_thread_impl(rqueue, prefix_id, url, data, http_timeout):
    """Post one RPC request and put the outcome on the result queue.

    Used as a thread target. Any exception raised by the POST is caught and
    enqueued in place of the response, so every call puts exactly one item
    on ``rqueue``.

    :param rqueue: result queue; receives a triple
        ``(time.ctime() timestamp, prefix_id, response-or-exception)``
    :type rqueue: Queue.Queue
    :param prefix_id: identifier for prefix, should imply cluster member index
    :type prefix_id: str
    :param url: rpc url
    :type url: str
    :param data: http request content (sent as application/xml)
    :type data: str
    :param http_timeout: http response timeout, passed to ``requests.post``
    :type http_timeout: int
    """
    logger.info('rpc invoked with details: {}'.format(data))
    try:
        resp = requests.post(url=url, headers={'Content-Type': 'application/xml'},
                             data=data, auth=('admin', 'admin'), timeout=http_timeout)
    except Exception as exc:
        # Thread boundary: a failure must still be reported through the
        # queue, otherwise the consumer would never learn about this request.
        logger.debug(exc)
        resp = exc
    rqueue.put((time.ctime(), prefix_id, resp))
def _initiate_rpcs(host_list, index_list, url_templ, data_templ, subst_dict):
    """Initiate rpc on given hosts, one background thread per host.

    The started threads and the shared result queue are kept in the
    module-level ``_globals`` dict under the keys ``'threads'`` and
    ``'result_queue'``; entries already present there are reused.

    :param host_list: IP addresses of odl nodes
    :type host_list: list[str]
    :param index_list: node indices which correspond to the ip addresses
    :type index_list: list[int]
    :param url_templ: url template, must contain the ``$HOST`` placeholder
    :type url_templ: string.Template object
    :param data_templ: http request data
    :type data_templ: string.Template object
    :param subst_dict: dictionary with key value pairs to be used with template;
        must provide ``'DURATION'`` and ``'ID_PREFIX'``. It is not modified.
    :type subst_dict: dict
    """
    # setdefault keeps the entries registered in _globals at all times, so
    # already started threads are not lost if a substitution below raises.
    resqueue = _globals.setdefault('result_queue', Queue.Queue())
    lthreads = _globals.setdefault('threads', [])

    # The timeout is the same for every host, so compute it once.
    # NOTE(review): the meaning of the "3 * 125 + 10" margin is not visible
    # here; it is kept exactly as it was.
    timeout = int(subst_dict['DURATION']) + 3 * 125 + 10

    for i, host in enumerate(host_list):
        url = url_templ.substitute({'HOST': host})
        prefix_id = subst_dict['ID_PREFIX'] + str(index_list[i])
        # Work on a per-request copy instead of writing 'ID' into the
        # caller's dictionary.
        request_subst = dict(subst_dict, ID=prefix_id)
        data = data_templ.substitute(request_subst)
        logger.info('url: {}, data: {}, timeout: {}'.format(url, data, timeout))
        t = threading.Thread(target=_send_http_request_thread_impl,
                             args=(resqueue, prefix_id, url, data, timeout))
        # Daemon thread: a hung request must not keep the interpreter alive.
        t.daemon = True
        t.start()
        lthreads.append(t)
def start_write_transactions_on_nodes(host_list, index_list, id_prefix, duration, rate, chained_flag=False,
                                      reset_globals=True):
    """Invoke write-transactions rpc on given nodes.

    The requests are sent from background threads; this call itself does
    not wait for the responses.

    :param host_list: IP addresses of odl nodes
    :type host_list: list[str]
    :param index_list: node indices which correspond to the ip addresses
    :type index_list: list[int]
    :param id_prefix: identifier prefix; the node index is appended to it
    :type id_prefix: str
    :param duration: time in seconds
    :type duration: int (or a string convertible by ``int()``)
    :param rate: writing transactions rate in transactions per second
    :type rate: int
    :param chained_flag: specify chained vs. simple transactions
    :type chained_flag: bool
    :param reset_globals: reset global variable dict
    :type reset_globals: bool
    """
    if reset_globals:
        _globals.clear()

    logger.info(
        "Input parameters: host_list:{}, index_list:{}, id_prefix:{}, duration:{}, rate:{}, chained_flag:{}".format(
            host_list, index_list, id_prefix, duration, rate, chained_flag))
    # NOTE(review): string.Template renders values with str(), so a Python
    # bool ends up as "True"/"False" in the XML - confirm the server accepts
    # that spelling.
    datat = string.Template('''<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
  <id>$ID</id>
  <seconds>$DURATION</seconds>
  <transactions-per-second>$RATE</transactions-per-second>
  <chained-transactions>$CHAINED_FLAG</chained-transactions>
</input>''')
    subst_dict = {'ID_PREFIX': id_prefix, 'DURATION': duration, 'RATE': rate, 'CHAINED_FLAG': chained_flag}
    urlt = string.Template('''http://$HOST:8181/restconf/operations/odl-mdsal-lowlevel-control:write-transactions''')
    _initiate_rpcs(host_list, index_list, urlt, datat, subst_dict)
def start_produce_transactions_on_nodes(host_list, index_list, id_prefix,
                                        duration, rate, isolated_transactions_flag=False, reset_globals=True):
    """Invoke produce-transactions rpcs on given nodes.

    The requests are sent from background threads; this call itself does
    not wait for the responses.

    :param host_list: IP addresses of odl nodes
    :type host_list: list[str]
    :param index_list: node indices which correspond to the ip addresses
    :type index_list: list[int]
    :param id_prefix: identifier prefix; the node index is appended to it
    :type id_prefix: str
    :param duration: time in seconds
    :type duration: int (or a string convertible by ``int()``)
    :param rate: produce transactions rate in transactions per second
    :type rate: int
    :param isolated_transactions_flag: isolated transactions flag
    :type isolated_transactions_flag: bool
    :param reset_globals: reset global variable dict
    :type reset_globals: bool
    """
    if reset_globals:
        _globals.clear()

    msg = "host_list:{}, index_list:{} ,id_prefix:{}, duration:{}, rate:{}, isolated_transactions:{}".format(
        host_list, index_list, id_prefix, duration, rate, isolated_transactions_flag)
    msg = "Input parameters: " + msg
    logger.info(msg)
    # NOTE(review): string.Template renders values with str(), so a Python
    # bool ends up as "True"/"False" in the XML - confirm the server accepts
    # that spelling.
    datat = string.Template('''<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
  <id>$ID</id>
  <seconds>$DURATION</seconds>
  <transactions-per-second>$RATE</transactions-per-second>
  <isolated-transactions>$ISOLATED_TRANSACTIONS</isolated-transactions>
</input>''')
    subst_dict = {'ID_PREFIX': id_prefix, 'DURATION': duration, 'RATE': rate,
                  'ISOLATED_TRANSACTIONS': isolated_transactions_flag}
    urlt = string.Template('''http://$HOST:8181/restconf/operations/odl-mdsal-lowlevel-control:produce-transactions''')
    _initiate_rpcs(host_list, index_list, urlt, datat, subst_dict)
def wait_for_transactions():
    """Blocking call, waiting for responses from all threads.

    Removes the ``'threads'`` and ``'result_queue'`` entries from the
    module-level ``_globals`` dict, joins every thread, then drains and
    logs everything found in the queue.

    :return: list of triples; triple consists of response time, prefix identifier and response object
        (the third item is whatever the worker enqueued, so it may be something
        other than a ``requests.Response``; such items are logged as-is)
    :rtype: list[(str, str, requests.Response)]
    :raises KeyError: if no threads / result queue are currently stored
    """
    lthreads = _globals.pop('threads')
    resqueue = _globals.pop('result_queue')

    for t in lthreads:
        t.join()

    # All producers have finished, so empty() is reliable at this point.
    results = []
    while not resqueue.empty():
        results.append(resqueue.get())

    for rsp in results:
        if isinstance(rsp[2], requests.Response):
            logger.info(rsp[2].text)
        else:
            logger.info(rsp[2])
    return results
def get_next_transactions_response():
    """Get http response from write-transactions rpc if available.

    Non-blocking: takes at most one item from the result queue stored in the
    module-level ``_globals`` dict and logs it.

    :return: None or a triple consisting of response time, prefix identifier and response object
    :rtype: (str, str, requests.Response)
    """
    resqueue = _globals.get('result_queue')

    # No queue is stored before any rpc was started, or once
    # wait_for_transactions() has popped it; report "nothing available"
    # instead of failing on None.empty().
    if resqueue is None or resqueue.empty():
        return None

    rsp = resqueue.get()
    if isinstance(rsp[2], requests.Response):
        logger.info(rsp[2].text)
    else:
        logger.info(rsp[2])
    return rsp