1 """Multithreaded utility for rapid Netconf device GET requesting.
3 This utility sends GET requests to ODL Netconf through Restconf to get a
4 bunch of configuration data from Netconf mounted devices and then checks the
5 results against caller provided content. The requests are sent via a
6 configurable number of workers. Each worker issues a bunch of blocking
7 restconf requests. Work is distributed in round-robin fashion. The utility
8 waits for the last worker to finish, or for time to run off.
10 The responses are checked for status (200 OK is expected) and content
11 (provided by user via the "--data" command line option). Results are written
12 to collections.Counter and printed at exit. If collections does not contain
13 Counter, "import Counter" is attempted.
15 It is advised to pin the python process to single CPU for optimal performance
16 as Global Interpreter Lock prevents true utilization on more CPUs (while
17 overhead of context switching remains).
20 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
22 # This program and the accompanying materials are made available under the
23 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
24 # and is available at http://www.eclipse.org/legal/epl-v10.html
26 __author__ = "Vratko Polak"
27 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
28 __license__ = "Eclipse Public License v1.0"
29 __email__ = "vrpolak@cisco.com"
33 import collections # For deque and Counter.
40 """Utility converter, based on http://stackoverflow.com/a/19227287"""
41 return text.lower() in ("yes", "true", "y", "t", "1")
44 def parse_arguments():
45 parser = argparse.ArgumentParser()
47 # Netconf and Restconf related arguments.
48 parser.add_argument('--odladdress', default='127.0.0.1',
49 help='IP address of ODL Restconf to be used')
50 parser.add_argument('--restconfport', default='8181',
51 help='Port on which ODL Restconf to be used')
52 parser.add_argument('--user', default='admin',
53 help='Username for ODL Restconf authentication')
54 parser.add_argument('--password', default='admin',
55 help='Password for ODL Restconf authentication')
56 parser.add_argument('--scope', default='sdn',
57 help='Scope for ODL Restconf authentication')
58 parser.add_argument('--count', type=int,
59 help='Count of devices to query')
60 parser.add_argument('--name',
61 help='Name of device without the ID suffix')
62 parser.add_argument('--reuse', default='True', type=str2bool,
63 help='Should single requests session be re-used')
65 # Work related arguments.
66 parser.add_argument('--workers', default='1', type=int,
67 help='number of blocking http threads to use')
68 parser.add_argument('--timeout', default='300', type=float,
69 help='timeout in seconds for all jobs to complete')
70 parser.add_argument('--refresh', default='0.1', type=float,
71 help='seconds to sleep in main thread if nothing to do')
73 return parser.parse_args() # arguments are read
76 class TRequestWithResponse(object):
78 def __init__(self, uri, kwargs):
81 self.response_ready = threading.Event()
83 def set_response(self, runtime, status, content):
85 self.runtime = runtime
86 self.content = content
87 self.response_ready.set()
89 def wait_for_response(self):
90 self.response_ready.wait()
93 def queued_send(session, queue_messages):
94 """Pop from queue, Post and append result; repeat until empty."""
97 request = queue_messages.popleft()
98 except IndexError: # nothing more to send
101 response = AuthStandalone.Get_Using_Session(session, request.uri, **request.kwargs)
103 status = int(response.status_code)
104 content = repr(response.content)
105 runtime = stop - start
106 request.set_response((start, stop, runtime), status, content)
109 def collect_results(request_list, response_queue):
110 for request in request_list:
111 request.wait_for_response()
112 response = (request.status, request.runtime, request.content)
113 response_queue.append(response)
116 def watch_for_timeout(timeout, response_queue):
118 response_queue.append((None, 'Time is up!'))
121 def run_thread(thread_target, *thread_args):
122 thread = threading.Thread(target=thread_target, args=thread_args)
128 # Parse the command line arguments
129 args = parse_arguments()
131 # Construct the work for the workers.
134 "network-topology:network-topology/topology/topology-netconf/node/"
137 url_end = "/yang-ext:mount"
138 headers = {'Content-Type': 'application/xml', "Accept": "application/xml"}
139 kwargs = {"headers": headers}
141 for device_number in range(args.count):
142 device_url = url_start + str(device_number + 1) + url_end
143 request = TRequestWithResponse(device_url, kwargs)
144 requests.append(request)
146 # Organize the work into the work queues.
147 list_q_msg = [collections.deque() for _ in range(args.workers)]
149 for request in requests:
150 queue = list_q_msg[index]
151 queue.append(request)
153 if index == len(list_q_msg):
156 # Spawn the workers, giving each a queue.
158 for queue_messages in list_q_msg:
159 session = AuthStandalone.Init_Session(args.odladdress, args.user, args.password, args.scope, args.reuse)
160 thread = run_thread(queued_send, session, queue_messages)
161 threads.append(thread)
163 # Spawn the results collector worker
164 responses = collections.deque()
165 collector = run_thread(collect_results, requests, responses)
167 # Spawn the watchdog thread
168 watchdog = run_thread(watch_for_timeout, args.timeout, responses)
170 # Watch the response queue, outputting the lines
171 request_count = args.count
172 while request_count > 0:
173 if len(responses) > 0:
174 result = responses.popleft()
175 if result[0] is None:
176 print "ERROR|" + result[1]+"|"
178 runtime = "%5.3f|%5.3f|%5.3f" % result[1]
179 print "%03d|%s|%s|" % (result[0], runtime, result[2])
182 time.sleep(args.refresh)