"""Python invocation of several parallel write-transactions and produce-transactions RPCs."""
4 from robot.api import logger
def _send_http_request_thread_impl(rqueue, index, url, data, http_timeout):
    """Post one rpc request (write- or produce-transactions) and queue the outcome.

    Meant to be used as a thread target: a failure of the POST itself is not
    raised, the exception object is put on the result queue in place of the
    response so the consumer can inspect it.

    :param rqueue: result queue
    :type rqueue: Queue.Queue
    :param index: cluster member index
    :type index: int
    :param url: http url
    :type url: string
    :param data: http request content
    :type data: string
    :param http_timeout: http response timeout
    :type http_timeout: int
    """
    logger.info('rpc invoked with details: {}'.format(data))
    try:
        resp = requests.post(url=url, headers={'Content-Type': 'application/xml'},
                             data=data, auth=('admin', 'admin'), timeout=http_timeout)
    except Exception as exc:
        # Deliberately broad: this runs at a thread boundary, so the error is
        # handed over through the queue instead of being lost with the thread.
        logger.debug(exc)
        resp = exc
    # Triple of (member name, human-readable timestamp, response or exception).
    rqueue.put(("member-" + str(index), time.ctime(), resp))
def _initiate_rpcs(host_list, prefix_list, url_templ, data_templ, subst_dict):
    """Initiate rpc on given hosts.

    One thread per host is started; the threads and the shared result queue
    are stored in the module-level ``_globals`` dict under the keys
    ``'threads'`` and ``'result_queue'``. Entries already present there are
    reused, so consecutive calls accumulate threads on one queue.

    :param host_list: list of ip address of odl nodes
    :type host_list: list of strings
    :param prefix_list: list of node indexes which corresponds to the ip addresses
    :type prefix_list: list
    :param url_templ: url template (uses $HOST)
    :type url_templ: string.Template object
    :param data_templ: http request data
    :type data_templ: string.Template object
    :param subst_dict: dictionary with key value pairs to be used with template;
        must contain 'ID_PREFIX' and 'DURATION'
    :type subst_dict: dict
    :raises IndexError: if prefix_list is shorter than host_list
    """
    resqueue = _globals.pop('result_queue', Queue.Queue())
    lthreads = _globals.pop('threads', [])
    try:
        # The http timeout is the rpc duration plus a fixed margin of 3*125+10.
        timeout = int(subst_dict['DURATION']) + 3 * 125 + 10
        for i, host in enumerate(host_list):
            # Work on a copy so the caller's dictionary is left untouched.
            values = dict(subst_dict)
            values['ID'] = '{}{}'.format(subst_dict['ID_PREFIX'], prefix_list[i])
            url = url_templ.substitute({'HOST': host})
            data = data_templ.substitute(values)
            logger.info('url: {}, data: {}, timeout: {}'.format(url, data, timeout))
            t = threading.Thread(target=_send_http_request_thread_impl,
                                 args=(resqueue, i, url, data, timeout))
            # NOTE(review): daemon flag reconstructed from a truncated source - confirm.
            t.daemon = True
            t.start()
            lthreads.append(t)
    finally:
        # Re-register even on failure so already started threads stay tracked.
        _globals.update({'threads': lthreads, 'result_queue': resqueue})
def start_write_transactions_on_nodes(host_list, prefix_list, id_prefix, duration, rate, chained_flag=False,
                                      reset_globals=True):
    """Invoke write-transactions rpc on given nodes.

    The call returns once the request threads have been started; the
    responses are collected later from the shared result queue.

    :param host_list: list of ip address of odl nodes
    :type host_list: list of strings
    :param prefix_list: list of node indexes which corresponds to the ip addresses
    :type prefix_list: list
    :param id_prefix: identifier prefix
    :type id_prefix: string
    :param duration: time in seconds
    :type duration: int
    :param rate: writing transactions rate in transactions per second
    :type rate: int
    :param chained_flag: specify chained vs. simple transactions
    :type chained_flag: bool
    :param reset_globals: reset global variable dict
    :type reset_globals: bool
    """
    if reset_globals:
        # Forget threads and results left over from a previous invocation.
        _globals.clear()

    logger.info(
        "Input parameters: host_list:{}, prefix_list:{}, id_prefix:{}, duration:{}, rate:{}, chained_flag:{}".format(
            host_list, prefix_list, id_prefix, duration, rate, chained_flag))
    # NOTE(review): the <id> element line was reconstructed from a truncated
    # source; $ID is the per-node value filled in by _initiate_rpcs - confirm.
    datat = string.Template('''<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
  <id>$ID</id>
  <seconds>$DURATION</seconds>
  <transactions-per-second>$RATE</transactions-per-second>
  <chained-transactions>$CHAINED_FLAG</chained-transactions>
</input>''')
    subst_dict = {'ID_PREFIX': id_prefix, 'DURATION': duration, 'RATE': rate, 'CHAINED_FLAG': chained_flag}
    urlt = string.Template('''http://$HOST:8181/restconf/operations/odl-mdsal-lowlevel-control:write-transactions''')
    _initiate_rpcs(host_list, prefix_list, urlt, datat, subst_dict)
def start_produce_transactions_on_nodes(host_list, prefix_list, id_prefix,
                                        duration, rate, isolated_transactions_flag=False, reset_globals=True):
    """Invoke produce-transactions rpcs on given nodes.

    The call returns once the request threads have been started; the
    responses are collected later from the shared result queue.

    :param host_list: list of ip address of odl nodes
    :type host_list: list of strings
    :param prefix_list: list of node indexes which corresponds to the ip addresses
    :type prefix_list: list
    :param id_prefix: identifier prefix
    :type id_prefix: string
    :param duration: time in seconds
    :type duration: int
    :param rate: produce transactions rate in transactions per second
    :type rate: int
    :param isolated_transactions_flag: isolated transactions flag
    :type isolated_transactions_flag: bool
    :param reset_globals: reset global variable dict
    :type reset_globals: bool
    """
    if reset_globals:
        # Forget threads and results left over from a previous invocation.
        _globals.clear()

    msg = "host_list:{}, prefix_list:{} ,id_prefix:{}, duration:{}, rate:{}, isolated_transactions:{}".format(
        host_list, prefix_list, id_prefix, duration, rate, isolated_transactions_flag)
    msg = "Input parameters: " + msg
    logger.info(msg)
    # NOTE(review): the <id> element line was reconstructed from a truncated
    # source; $ID is the per-node value filled in by _initiate_rpcs - confirm.
    datat = string.Template('''<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
  <id>$ID</id>
  <seconds>$DURATION</seconds>
  <transactions-per-second>$RATE</transactions-per-second>
  <isolated-transactions>$ISOLATED_TRANSACTIONS</isolated-transactions>
</input>''')
    subst_dict = {'ID_PREFIX': id_prefix, 'DURATION': duration, 'RATE': rate,
                  'ISOLATED_TRANSACTIONS': isolated_transactions_flag}
    urlt = string.Template('''http://$HOST:8181/restconf/operations/odl-mdsal-lowlevel-control:produce-transactions''')
    _initiate_rpcs(host_list, prefix_list, urlt, datat, subst_dict)
def wait_for_transactions():
    """Blocking call, waiting for responses from all threads.

    Removes the registered threads and the result queue from ``_globals``,
    joins every thread and drains the queue.

    :return: list of triples; triple consists of member name, response time
        (``time.ctime()`` string) and response object, or the exception raised
        by the request if it failed
    :rtype: list[(str, str, requests.Response)]
    :raises KeyError: if no rpc has been initiated since the globals were last cleared
    """
    lthreads = _globals.pop('threads')
    resqueue = _globals.pop('result_queue')

    for t in lthreads:
        t.join()

    # All producers have finished, so empty() is a reliable end condition here.
    results = []
    while not resqueue.empty():
        results.append(resqueue.get())
    for rsp in results:
        if isinstance(rsp[2], requests.Response):
            logger.info(rsp[2].text)
        else:
            # Not an http response: the request thread stored its exception.
            logger.info(rsp[2])
    return results
def get_next_transactions_response():
    """Get http response from a transactions rpc if available.

    Non-blocking with respect to the request threads: only a result that is
    already on the queue is returned. The queue stays registered in
    ``_globals``.

    :return: None (nothing queued, or no rpc initiated yet) or a triple
        consisting of member name, response time and response object
    """
    resqueue = _globals.get('result_queue')

    # No queue is registered before the first rpc is initiated.
    if resqueue is None or resqueue.empty():
        return None

    rsp = resqueue.get()
    if isinstance(rsp[2], requests.Response):
        logger.info(rsp[2].text)
    else:
        # Not an http response: the request thread stored its exception.
        logger.info(rsp[2])
    return rsp