1 """Multithreaded utility for rapid Netconf device GET requesting.
3 This utility sends GET requests to ODL Netconf through Restconf to get a
4 bunch of configuration data from Netconf mounted devices and then checks the
5 results against caller provided content. The requests are sent via a
6 configurable number of workers. Each worker issues a bunch of blocking
7 restconf requests. Work is distributed in round-robin fashion. The utility
8 waits for the last worker to finish, or for time to run off.
10 The responses are checked for status (200 OK is expected) and content
11 (provided by user via the "--data" command line option). Results are written
12 to collections.Counter and printed at exit. If collections does not contain
13 Counter, "import Counter" is attempted.
15 It is advised to pin the python process to single CPU for optimal performance
16 as Global Interpreter Lock prevents true utilization on more CPUs (while
17 overhead of context switching remains).
20 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
22 # This program and the accompanying materials are made available under the
23 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
24 # and is available at http://www.eclipse.org/legal/epl-v10.html
27 import collections # For deque and Counter.
33 __author__ = "Vratko Polak"
34 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
35 __license__ = "Eclipse Public License v1.0"
36 __email__ = "vrpolak@cisco.com"
40 """Utility converter, based on http://stackoverflow.com/a/19227287"""
41 return text.lower() in ("yes", "true", "y", "t", "1")
44 def parse_arguments():
45 parser = argparse.ArgumentParser()
47 # Netconf and Restconf related arguments.
51 help="IP address of ODL Restconf to be used",
54 "--restconfport", default="8181", help="Port on which ODL Restconf to be used"
57 "--user", default="admin", help="Username for ODL Restconf authentication"
60 "--password", default="admin", help="Password for ODL Restconf authentication"
62 parser.add_argument("--scope", help="Scope for ODL Restconf authentication")
63 parser.add_argument("--count", type=int, help="Count of devices to query")
64 parser.add_argument("--name", help="Name of device without the ID suffix")
69 help="Should single requests session be re-used",
72 # Work related arguments.
77 help="number of blocking http threads to use",
83 help="timeout in seconds for all jobs to complete",
89 help="seconds to sleep in main thread if nothing to do",
92 return parser.parse_args() # arguments are read
95 class TRequestWithResponse(object):
96 def __init__(self, uri, kwargs):
99 self.response_ready = threading.Event()
101 def set_response(self, runtime, status, content):
103 self.runtime = runtime
104 self.content = content
105 self.response_ready.set()
107 def wait_for_response(self):
108 self.response_ready.wait()
111 def queued_send(session, queue_messages):
112 """Pop from queue, Post and append result; repeat until empty."""
115 request = queue_messages.popleft()
116 except IndexError: # nothing more to send
119 response = AuthStandalone.Get_Using_Session(
120 session, request.uri, **request.kwargs
123 status = int(response.status_code)
124 content = repr(response.content)
125 runtime = stop - start
126 request.set_response((start, stop, runtime), status, content)
129 def collect_results(request_list, response_queue):
130 for request in request_list:
131 request.wait_for_response()
132 response = (request.status, request.runtime, request.content)
133 response_queue.append(response)
136 def watch_for_timeout(timeout, response_queue):
138 response_queue.append((None, "Time is up!"))
141 def run_thread(thread_target, *thread_args):
142 thread = threading.Thread(target=thread_target, args=thread_args)
148 # Parse the command line arguments
149 args = parse_arguments()
151 # Construct the work for the workers.
152 url_start = "config/network-topology:network-topology/"
153 url_start += "topology/topology-netconf/node/"
154 url_start += args.name + "-"
155 url_end = "/yang-ext:mount"
156 headers = {"Content-Type": "application/xml", "Accept": "application/xml"}
157 kwargs = {"headers": headers}
159 for device_number in range(args.count):
160 device_url = url_start + str(device_number + 1) + url_end
161 request = TRequestWithResponse(device_url, kwargs)
162 requests.append(request)
164 # Organize the work into the work queues.
165 list_q_msg = [collections.deque() for _ in range(args.workers)]
167 for request in requests:
168 queue = list_q_msg[index]
169 queue.append(request)
171 if index == len(list_q_msg):
174 # Spawn the workers, giving each a queue.
176 for queue_messages in list_q_msg:
177 session = AuthStandalone.Init_Session(
178 args.odladdress, args.user, args.password, args.scope, args.reuse
180 thread = run_thread(queued_send, session, queue_messages)
181 threads.append(thread)
183 # Spawn the results collector worker
184 responses = collections.deque()
185 collector = run_thread(collect_results, requests, responses)
187 # Spawn the watchdog thread
188 watchdog = run_thread(watch_for_timeout, args.timeout, responses)
190 # Watch the response queue, outputting the lines
191 request_count = args.count
192 while request_count > 0:
193 if len(responses) > 0:
194 result = responses.popleft()
195 if result[0] is None:
196 print("ERROR|" + result[1] + "|")
198 runtime = "%5.3f|%5.3f|%5.3f" % result[1]
199 print("%03d|%s|%s|" % ((result[0], runtime, result[2])))
202 time.sleep(args.refresh)