Make pep8 more picky
[integration/test.git] / tools / netconf_tools / getter.py
1 """Multithreaded utility for rapid Netconf device GET requesting.
2
3 This utility sends GET requests to ODL Netconf through Restconf to get a
4 bunch of configuration data from Netconf mounted devices and then checks the
5 results against caller provided content. The requests are sent via a
6 configurable number of workers. Each worker issues a bunch of blocking
7 restconf requests. Work is distributed in round-robin fashion. The utility
8 waits for the last worker to finish, or for time to run off.
9
10 The responses are checked for status (200 OK is expected) and content
11 (provided by user via the "--data" command line option). Results are written
12 to collections.Counter and printed at exit. If collections does not contain
13 Counter, "import Counter" is attempted.
14
15 It is advised to pin the python process to single CPU for optimal performance
16 as Global Interpreter Lock prevents true utilization on more CPUs (while
17 overhead of context switching remains).
18 """
19
20 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
21 #
22 # This program and the accompanying materials are made available under the
23 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
24 # and is available at http://www.eclipse.org/legal/epl-v10.html
25
26 import argparse
27 import collections  # For deque and Counter.
28 import threading
29 import time
30 import AuthStandalone
31
32
33 __author__ = "Vratko Polak"
34 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
35 __license__ = "Eclipse Public License v1.0"
36 __email__ = "vrpolak@cisco.com"
37
38
39 def str2bool(text):
40     """Utility converter, based on http://stackoverflow.com/a/19227287"""
41     return text.lower() in ("yes", "true", "y", "t", "1")
42
43
44 def parse_arguments():
45     parser = argparse.ArgumentParser()
46
47     # Netconf and Restconf related arguments.
48     parser.add_argument('--odladdress', default='127.0.0.1',
49                         help='IP address of ODL Restconf to be used')
50     parser.add_argument('--restconfport', default='8181',
51                         help='Port on which ODL Restconf to be used')
52     parser.add_argument('--user', default='admin',
53                         help='Username for ODL Restconf authentication')
54     parser.add_argument('--password', default='admin',
55                         help='Password for ODL Restconf authentication')
56     parser.add_argument('--scope', default='sdn',
57                         help='Scope for ODL Restconf authentication')
58     parser.add_argument('--count', type=int,
59                         help='Count of devices to query')
60     parser.add_argument('--name',
61                         help='Name of device without the ID suffix')
62     parser.add_argument('--reuse', default='True', type=str2bool,
63                         help='Should single requests session be re-used')
64
65     # Work related arguments.
66     parser.add_argument('--workers', default='1', type=int,
67                         help='number of blocking http threads to use')
68     parser.add_argument('--timeout', default='300', type=float,
69                         help='timeout in seconds for all jobs to complete')
70     parser.add_argument('--refresh', default='0.1', type=float,
71                         help='seconds to sleep in main thread if nothing to do')
72
73     return parser.parse_args()  # arguments are read
74
75
76 class TRequestWithResponse(object):
77
78     def __init__(self, uri, kwargs):
79         self.uri = uri
80         self.kwargs = kwargs
81         self.response_ready = threading.Event()
82
83     def set_response(self, runtime, status, content):
84         self.status = status
85         self.runtime = runtime
86         self.content = content
87         self.response_ready.set()
88
89     def wait_for_response(self):
90         self.response_ready.wait()
91
92
93 def queued_send(session, queue_messages):
94     """Pop from queue, Post and append result; repeat until empty."""
95     while 1:
96         try:
97             request = queue_messages.popleft()
98         except IndexError:  # nothing more to send
99             break
100         start = time.time()
101         response = AuthStandalone.Get_Using_Session(session, request.uri, **request.kwargs)
102         stop = time.time()
103         status = int(response.status_code)
104         content = repr(response.content)
105         runtime = stop - start
106         request.set_response((start, stop, runtime), status, content)
107
108
109 def collect_results(request_list, response_queue):
110     for request in request_list:
111         request.wait_for_response()
112         response = (request.status, request.runtime, request.content)
113         response_queue.append(response)
114
115
116 def watch_for_timeout(timeout, response_queue):
117     time.sleep(timeout)
118     response_queue.append((None, 'Time is up!'))
119
120
121 def run_thread(thread_target, *thread_args):
122     thread = threading.Thread(target=thread_target, args=thread_args)
123     thread.daemon = True
124     thread.start()
125     return thread
126
127
128 # Parse the command line arguments
129 args = parse_arguments()
130
131 # Construct the work for the workers.
132 url_start = 'config/network-topology:network-topology/'
133 url_start += "topology/topology-netconf/node/"
134 url_start += args.name + "-"
135 url_end = "/yang-ext:mount"
136 headers = {'Content-Type': 'application/xml', "Accept": "application/xml"}
137 kwargs = {"headers": headers}
138 requests = []
139 for device_number in range(args.count):
140     device_url = url_start + str(device_number + 1) + url_end
141     request = TRequestWithResponse(device_url, kwargs)
142     requests.append(request)
143
144 # Organize the work into the work queues.
145 list_q_msg = [collections.deque() for _ in range(args.workers)]
146 index = 0
147 for request in requests:
148     queue = list_q_msg[index]
149     queue.append(request)
150     index += 1
151     if index == len(list_q_msg):
152         index = 0
153
154 # Spawn the workers, giving each a queue.
155 threads = []
156 for queue_messages in list_q_msg:
157     session = AuthStandalone.Init_Session(args.odladdress, args.user, args.password, args.scope, args.reuse)
158     thread = run_thread(queued_send, session, queue_messages)
159     threads.append(thread)
160
161 # Spawn the results collector worker
162 responses = collections.deque()
163 collector = run_thread(collect_results, requests, responses)
164
165 # Spawn the watchdog thread
166 watchdog = run_thread(watch_for_timeout, args.timeout, responses)
167
168 # Watch the response queue, outputting the lines
169 request_count = args.count
170 while request_count > 0:
171     if len(responses) > 0:
172         result = responses.popleft()
173         if result[0] is None:
174             print "ERROR|" + result[1] + "|"
175             break
176         runtime = "%5.3f|%5.3f|%5.3f" % result[1]
177         print "%03d|%s|%s|" % (result[0], runtime, result[2])
178         request_count -= 1
179         continue
180     time.sleep(args.refresh)