Migrate Get Requests invocations(libraries)
[integration/test.git] / tools / netconf_tools / getter.py
1 """Multithreaded utility for rapid Netconf device GET requesting.
2
3 This utility sends GET requests to ODL Netconf through Restconf to get a
4 bunch of configuration data from Netconf mounted devices and then checks the
5 results against caller provided content. The requests are sent via a
6 configurable number of workers. Each worker issues a bunch of blocking
7 restconf requests. Work is distributed in round-robin fashion. The utility
8 waits for the last worker to finish, or for time to run off.
9
10 The responses are checked for status (200 OK is expected) and content
11 (provided by user via the "--data" command line option). Results are written
12 to collections.Counter and printed at exit. If collections does not contain
13 Counter, "import Counter" is attempted.
14
15 It is advised to pin the python process to single CPU for optimal performance
16 as Global Interpreter Lock prevents true utilization on more CPUs (while
17 overhead of context switching remains).
18 """
19
20 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
21 #
22 # This program and the accompanying materials are made available under the
23 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
24 # and is available at http://www.eclipse.org/legal/epl-v10.html
25
26 import argparse
27 import collections  # For deque and Counter.
28 import threading
29 import time
30 import AuthStandalone
31
32
33 __author__ = "Vratko Polak"
34 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
35 __license__ = "Eclipse Public License v1.0"
36 __email__ = "vrpolak@cisco.com"
37
38
39 def str2bool(text):
40     """Utility converter, based on http://stackoverflow.com/a/19227287"""
41     return text.lower() in ("yes", "true", "y", "t", "1")
42
43
44 def parse_arguments():
45     parser = argparse.ArgumentParser()
46
47     # Netconf and Restconf related arguments.
48     parser.add_argument(
49         "--odladdress",
50         default="127.0.0.1",
51         help="IP address of ODL Restconf to be used",
52     )
53     parser.add_argument(
54         "--restconfport", default="8181", help="Port on which ODL Restconf to be used"
55     )
56     parser.add_argument(
57         "--user", default="admin", help="Username for ODL Restconf authentication"
58     )
59     parser.add_argument(
60         "--password", default="admin", help="Password for ODL Restconf authentication"
61     )
62     parser.add_argument("--scope", help="Scope for ODL Restconf authentication")
63     parser.add_argument("--count", type=int, help="Count of devices to query")
64     parser.add_argument("--name", help="Name of device without the ID suffix")
65     parser.add_argument(
66         "--reuse",
67         default="True",
68         type=str2bool,
69         help="Should single requests session be re-used",
70     )
71
72     # Work related arguments.
73     parser.add_argument(
74         "--workers",
75         default="1",
76         type=int,
77         help="number of blocking http threads to use",
78     )
79     parser.add_argument(
80         "--timeout",
81         default="300",
82         type=float,
83         help="timeout in seconds for all jobs to complete",
84     )
85     parser.add_argument(
86         "--refresh",
87         default="0.1",
88         type=float,
89         help="seconds to sleep in main thread if nothing to do",
90     )
91
92     return parser.parse_args()  # arguments are read
93
94
95 class TRequestWithResponse(object):
96     def __init__(self, uri, kwargs):
97         self.uri = uri
98         self.kwargs = kwargs
99         self.response_ready = threading.Event()
100
101     def set_response(self, runtime, status, content):
102         self.status = status
103         self.runtime = runtime
104         self.content = content
105         self.response_ready.set()
106
107     def wait_for_response(self):
108         self.response_ready.wait()
109
110
111 def queued_send(session, queue_messages):
112     """Pop from queue, Post and append result; repeat until empty."""
113     while 1:
114         try:
115             request = queue_messages.popleft()
116         except IndexError:  # nothing more to send
117             break
118         start = time.time()
119         response = AuthStandalone.Get_Using_Session(
120             session, request.uri, **request.kwargs
121         )
122         stop = time.time()
123         status = int(response.status_code)
124         content = repr(response.content)
125         runtime = stop - start
126         request.set_response((start, stop, runtime), status, content)
127
128
129 def collect_results(request_list, response_queue):
130     for request in request_list:
131         request.wait_for_response()
132         response = (request.status, request.runtime, request.content)
133         response_queue.append(response)
134
135
136 def watch_for_timeout(timeout, response_queue):
137     time.sleep(timeout)
138     response_queue.append((None, "Time is up!"))
139
140
141 def run_thread(thread_target, *thread_args):
142     thread = threading.Thread(target=thread_target, args=thread_args)
143     thread.daemon = True
144     thread.start()
145     return thread
146
147
148 # Parse the command line arguments
149 args = parse_arguments()
150
151 # Construct the work for the workers.
152 url_start = "config/network-topology:network-topology/"
153 url_start += "topology/topology-netconf/node/"
154 url_start += args.name + "-"
155 url_end = "/yang-ext:mount"
156 headers = {"Content-Type": "application/xml", "Accept": "application/xml"}
157 kwargs = {"headers": headers}
158 requests = []
159 for device_number in range(args.count):
160     device_url = url_start + str(device_number + 1) + url_end
161     request = TRequestWithResponse(device_url, kwargs)
162     requests.append(request)
163
164 # Organize the work into the work queues.
165 list_q_msg = [collections.deque() for _ in range(args.workers)]
166 index = 0
167 for request in requests:
168     queue = list_q_msg[index]
169     queue.append(request)
170     index += 1
171     if index == len(list_q_msg):
172         index = 0
173
174 # Spawn the workers, giving each a queue.
175 threads = []
176 for queue_messages in list_q_msg:
177     session = AuthStandalone.Init_Session(
178         args.odladdress, args.user, args.password, args.scope, args.reuse
179     )
180     thread = run_thread(queued_send, session, queue_messages)
181     threads.append(thread)
182
183 # Spawn the results collector worker
184 responses = collections.deque()
185 collector = run_thread(collect_results, requests, responses)
186
187 # Spawn the watchdog thread
188 watchdog = run_thread(watch_for_timeout, args.timeout, responses)
189
190 # Watch the response queue, outputting the lines
191 request_count = args.count
192 while request_count > 0:
193     if len(responses) > 0:
194         result = responses.popleft()
195         if result[0] is None:
196             print("ERROR|" + result[1] + "|")
197             break
198         runtime = "%5.3f|%5.3f|%5.3f" % result[1]
199         print("%03d|%s|%s|" % ((result[0], runtime, result[2])))
200         request_count -= 1
201         continue
202     time.sleep(args.refresh)