1 """Multithreaded utility for rapid Netconf device GET requesting.
3 This utility sends GET requests to ODL Netconf through Restconf to get a
4 bunch of configuration data from Netconf mounted devices and then checks the
5 results against caller provided content. The requests are sent via a
6 configurable number of workers. Each worker issues a bunch of blocking
7 restconf requests. Work is distributed in round-robin fashion. The utility
8 waits for the last worker to finish, or for time to run out.
10 The responses are checked for status (200 OK is expected) and content
11 (provided by user via the "--data" command line option). Results are written
12 to collections.Counter and printed at exit. If collections does not contain
13 Counter, "import Counter" is attempted.
15 It is advised to pin the python process to a single CPU for optimal performance
16 as the Global Interpreter Lock prevents true utilization of more CPUs (while
17 the overhead of context switching remains).
20 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
22 # This program and the accompanying materials are made available under the
23 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
24 # and is available at http://www.eclipse.org/legal/epl-v10.html
27 import collections # For deque and Counter.
33 __author__ = "Vratko Polak"
34 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
35 __license__ = "Eclipse Public License v1.0"
36 __email__ = "vrpolak@cisco.com"
# Text-to-boolean converter body: yields True only when the lower-cased input
# is one of the accepted spellings listed below; any other string yields False.
# NOTE(review): the `def` header for this body is not present in this listing;
# the `--reuse` option refers to the converter as `str2bool` -- confirm the
# signature against the complete file.
40 """Utility converter, based on http://stackoverflow.com/a/19227287"""
41 return text.lower() in ("yes", "true", "y", "t", "1")
def parse_arguments():
    """Parse the command line options of the Netconf getter utility.

    ``--count`` and ``--name`` are mandatory: the script body concatenates
    ``args.name`` into the request URL and passes ``args.count`` to
    ``range()``, so leaving either one out used to surface later as an
    unrelated ``TypeError``. Marking them as required makes argparse print a
    usage message and exit instead.

    Typed options carry defaults of their final type; argparse would convert
    the equivalent string defaults to the same values.

    Returns:
        argparse.Namespace with the attributes ``odladdress``,
        ``restconfport``, ``user``, ``password``, ``scope``, ``count``,
        ``name``, ``reuse``, ``workers``, ``timeout`` and ``refresh``.

    Raises:
        SystemExit: raised by argparse on invalid or missing options.
    """
    parser = argparse.ArgumentParser()

    # Netconf and Restconf related arguments.
    parser.add_argument('--odladdress', default='127.0.0.1',
                        help='IP address of ODL Restconf to be used')
    parser.add_argument('--restconfport', default='8181',
                        help='Port on which ODL Restconf to be used')
    parser.add_argument('--user', default='admin',
                        help='Username for ODL Restconf authentication')
    parser.add_argument('--password', default='admin',
                        help='Password for ODL Restconf authentication')
    parser.add_argument('--scope', default='sdn',
                        help='Scope for ODL Restconf authentication')
    parser.add_argument('--count', type=int, required=True,
                        help='Count of devices to query')
    parser.add_argument('--name', required=True,
                        help='Name of device without the ID suffix')
    parser.add_argument('--reuse', default=True, type=str2bool,
                        help='Should single requests session be re-used')

    # Work related arguments.
    parser.add_argument('--workers', default=1, type=int,
                        help='number of blocking http threads to use')
    parser.add_argument('--timeout', default=300.0, type=float,
                        help='timeout in seconds for all jobs to complete')
    parser.add_argument('--refresh', default=0.1, type=float,
                        help='seconds to sleep in main thread if nothing to do')

    return parser.parse_args()  # arguments are read
# Pairs one queued request (a URI plus keyword arguments) with the response
# that a worker thread records for it later.
# NOTE(review): several lines of this class are not present in this listing.
# queued_send reads `.uri` and `.kwargs`, and collect_results reads `.status`,
# so those attributes are presumably assigned on the absent lines -- confirm
# against the complete file.
76 class TRequestWithResponse(object):
78 def __init__(self, uri, kwargs):
# Event that set_response() sets, so another thread can block in
# wait_for_response() until the response has been recorded.
81 self.response_ready = threading.Event()
83 def set_response(self, runtime, status, content):
# The response fields are stored first and readiness is signalled last.
85 self.runtime = runtime
86 self.content = content
87 self.response_ready.set()
89 def wait_for_response(self):
# Waits on the event with no timeout argument.
90 self.response_ready.wait()
# Worker body: takes requests from the left end of `queue_messages`, sends
# each one through AuthStandalone.Get_Using_Session on the given session and
# records the outcome on the request object via set_response().
# NOTE(review): the loop header, the `try:` line, the body of the IndexError
# handler and the assignments of `start` / `stop` are not present in this
# listing -- confirm against the complete file.
# NOTE(review): the docstring says "Post", while the helper called below is
# named Get_Using_Session -- presumably a GET; confirm and reword.
93 def queued_send(session, queue_messages):
94 """Pop from queue, Post and append result; repeat until empty."""
97 request = queue_messages.popleft()
98 except IndexError: # nothing more to send
101 response = AuthStandalone.Get_Using_Session(session, request.uri, **request.kwargs)
# The status code is converted to int; the body is kept as its repr() string.
103 status = int(response.status_code)
104 content = repr(response.content)
105 runtime = stop - start
# Timing is handed over as a (start, stop, runtime) triple.
106 request.set_response((start, stop, runtime), status, content)
def collect_results(request_list, response_queue):
    """Move finished responses into the shared response queue.

    Walks ``request_list`` in the order given, blocks on each request until
    its response has been recorded, and then appends one
    ``(status, runtime, content)`` tuple to ``response_queue``. Each tuple is
    appended as soon as its request is ready, before the next one is awaited.
    """
    publish = response_queue.append
    for pending in request_list:
        pending.wait_for_response()
        publish((pending.status, pending.runtime, pending.content))
# Watchdog body: appends the (None, 'Time is up!') entry to `response_queue`.
# The script's output loop checks whether the first element of a result is
# None and prints an "ERROR|...|" line for such an entry.
# NOTE(review): `timeout` is not used on any line present in this listing;
# the wait that precedes the append is presumably on an absent line --
# confirm against the complete file.
116 def watch_for_timeout(timeout, response_queue):
118 response_queue.append((None, 'Time is up!'))
# Builds a threading.Thread whose target is `thread_target` and whose
# positional arguments are `thread_args`.
# NOTE(review): the script body stores the return value of this helper as a
# thread handle, but the lines that start and return the thread are not
# present in this listing -- confirm against the complete file.
121 def run_thread(thread_target, *thread_args):
122 thread = threading.Thread(target=thread_target, args=thread_args)
# Script body: builds one request per device, spreads the requests over one
# deque per worker, starts the workers, a results collector and a watchdog,
# then prints each result as a '|'-separated line.
# NOTE(review): the initialisation of `requests`, `index` and `threads`, the
# updates of `index`, and the bookkeeping of `request_count` inside the final
# loop are not present in this listing -- confirm against the complete file.
# NOTE(review): the print lines below use Python 2 print-statement syntax.
128 # Parse the command line arguments
129 args = parse_arguments()
131 # Construct the work for the workers.
# The URL is assembled as <prefix><name>-<device id>/yang-ext:mount.
132 url_start = 'config/network-topology:network-topology/'
133 url_start += "topology/topology-netconf/node/"
134 url_start += args.name + "-"
135 url_end = "/yang-ext:mount"
136 headers = {'Content-Type': 'application/xml', "Accept": "application/xml"}
# The same kwargs dict object is handed to every request created below.
137 kwargs = {"headers": headers}
# Device IDs in the URL are 1-based (device_number + 1).
139 for device_number in range(args.count):
140 device_url = url_start + str(device_number + 1) + url_end
141 request = TRequestWithResponse(device_url, kwargs)
142 requests.append(request)
144 # Organize the work into the work queues.
# One deque per worker.
145 list_q_msg = [collections.deque() for _ in range(args.workers)]
147 for request in requests:
148 queue = list_q_msg[index]
149 queue.append(request)
# Checks whether the queue index has reached the number of queues
# (presumably to wrap around for round-robin distribution -- confirm).
151 if index == len(list_q_msg):
154 # Spawn the workers, giving each a queue.
# Init_Session is called once per worker queue.
156 for queue_messages in list_q_msg:
157 session = AuthStandalone.Init_Session(args.odladdress, args.user, args.password, args.scope, args.reuse)
158 thread = run_thread(queued_send, session, queue_messages)
159 threads.append(thread)
161 # Spawn the results collector worker
# `responses` is shared by the collector, the watchdog and the loop below.
162 responses = collections.deque()
163 collector = run_thread(collect_results, requests, responses)
165 # Spawn the watchdog thread
166 watchdog = run_thread(watch_for_timeout, args.timeout, responses)
168 # Watch the response queue, outputting the lines
# The loop runs while the outstanding-request counter is positive.
169 request_count = args.count
170 while request_count > 0:
171 if len(responses) > 0:
172 result = responses.popleft()
# A None first element marks the watchdog's timeout entry.
173 if result[0] is None:
174 print "ERROR|" + result[1] + "|"
# result[1] is the (start, stop, runtime) triple recorded by queued_send;
# result[2] is the repr() string of the response body.
176 runtime = "%5.3f|%5.3f|%5.3f" % result[1]
177 print "%03d|%s|%s|" % (result[0], runtime, result[2])
# Sleeps for the --refresh interval (presumably reached only when no response
# was queued -- confirm against the complete file).
180 time.sleep(args.refresh)