Netconf Scaling test for multithreaded GET requests
[integration/test.git] / tools / netconf_tools / getter.py
1 """Multithreaded utility for rapid Netconf device GET requesting.
2
3 This utility sends GET requests to ODL Netconf through Restconf to get a
4 bunch of configuration data from Netconf mounted devices and then checks the
5 results against caller provided content. The requests are sent via a
6 configurable number of workers. Each worker issues a bunch of blocking
7 restconf requests. Work is distributed in round-robin fashion. The utility
8 waits for the last worker to finish, or for time to run off.
9
10 The responses are checked for status (200 OK is expected) and content
11 (provided by user via the "--data" command line option). Results are written
12 to collections.Counter and printed at exit. If collections does not contain
13 Counter, "import Counter" is attempted.
14
15 It is advised to pin the python process to single CPU for optimal performance
16 as Global Interpreter Lock prevents true utilization on more CPUs (while
17 overhead of context switching remains).
18 """
19
20 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
21 #
22 # This program and the accompanying materials are made available under the
23 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
24 # and is available at http://www.eclipse.org/legal/epl-v10.html
25
26 __author__ = "Vratko Polak"
27 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
28 __license__ = "Eclipse Public License v1.0"
29 __email__ = "vrpolak@cisco.com"
30
31
32 import argparse
33 import collections  # For deque and Counter.
34 import threading
35 import time
36 import AuthStandalone
37
38
39 def str2bool(text):
40     """Utility converter, based on http://stackoverflow.com/a/19227287"""
41     return text.lower() in ("yes", "true", "y", "t", "1")
42
43
44 def parse_arguments():
45     parser = argparse.ArgumentParser()
46
47     # Netconf and Restconf related arguments.
48     parser.add_argument('--odladdress', default='127.0.0.1',
49                         help='IP address of ODL Restconf to be used')
50     parser.add_argument('--restconfport', default='8181',
51                         help='Port on which ODL Restconf to be used')
52     parser.add_argument('--user', default='admin',
53                         help='Username for ODL Restconf authentication')
54     parser.add_argument('--password', default='admin',
55                         help='Password for ODL Restconf authentication')
56     parser.add_argument('--scope', default='sdn',
57                         help='Scope for ODL Restconf authentication')
58     parser.add_argument('--count', type=int,
59                         help='Count of devices to query')
60     parser.add_argument('--name',
61                         help='Name of device without the ID suffix')
62     parser.add_argument('--reuse', default='True', type=str2bool,
63                         help='Should single requests session be re-used')
64
65     # Work related arguments.
66     parser.add_argument('--workers', default='1', type=int,
67                         help='number of blocking http threads to use')
68     parser.add_argument('--timeout', default='300', type=float,
69                         help='timeout in seconds for all jobs to complete')
70     parser.add_argument('--refresh', default='0.1', type=float,
71                         help='seconds to sleep in main thread if nothing to do')
72
73     return parser.parse_args()  # arguments are read
74
75
76 class TRequestWithResponse(object):
77
78     def __init__(self, uri, kwargs):
79         self.uri = uri
80         self.kwargs = kwargs
81         self.response_ready = threading.Event()
82
83     def set_response(self, runtime, status, content):
84         self.status = status
85         self.runtime = runtime
86         self.content = content
87         self.response_ready.set()
88
89     def wait_for_response(self):
90         self.response_ready.wait()
91
92
93 def queued_send(session, queue_messages):
94     """Pop from queue, Post and append result; repeat until empty."""
95     while 1:
96         try:
97             request = queue_messages.popleft()
98         except IndexError:  # nothing more to send
99             break
100         start = time.time()
101         response = AuthStandalone.Get_Using_Session(session, request.uri, **request.kwargs)
102         stop = time.time()
103         status = int(response.status_code)
104         content = repr(response.content)
105         runtime = stop - start
106         request.set_response((start, stop, runtime), status, content)
107
108
109 def collect_results(request_list, response_queue):
110     for request in request_list:
111         request.wait_for_response()
112         response = (request.status, request.runtime, request.content)
113         response_queue.append(response)
114
115
116 def watch_for_timeout(timeout, response_queue):
117     time.sleep(timeout)
118     response_queue.append((None, 'Time is up!'))
119
120
121 def run_thread(thread_target, *thread_args):
122     thread = threading.Thread(target=thread_target, args=thread_args)
123     thread.daemon = True
124     thread.start()
125     return thread
126
127
128 # Parse the command line arguments
129 args = parse_arguments()
130
131 # Construct the work for the workers.
132 url_start = (
133     'config/'
134     "network-topology:network-topology/topology/topology-netconf/node/"
135     + args.name + "-"
136 )
137 url_end = "/yang-ext:mount"
138 headers = {'Content-Type': 'application/xml', "Accept": "application/xml"}
139 kwargs = {"headers": headers}
140 requests = []
141 for device_number in range(args.count):
142     device_url = url_start + str(device_number + 1) + url_end
143     request = TRequestWithResponse(device_url, kwargs)
144     requests.append(request)
145
146 # Organize the work into the work queues.
147 list_q_msg = [collections.deque() for _ in range(args.workers)]
148 index = 0
149 for request in requests:
150     queue = list_q_msg[index]
151     queue.append(request)
152     index += 1
153     if index == len(list_q_msg):
154         index = 0
155
156 # Spawn the workers, giving each a queue.
157 threads = []
158 for queue_messages in list_q_msg:
159     session = AuthStandalone.Init_Session(args.odladdress, args.user, args.password, args.scope, args.reuse)
160     thread = run_thread(queued_send, session, queue_messages)
161     threads.append(thread)
162
163 # Spawn the results collector worker
164 responses = collections.deque()
165 collector = run_thread(collect_results, requests, responses)
166
167 # Spawn the watchdog thread
168 watchdog = run_thread(watch_for_timeout, args.timeout, responses)
169
170 # Watch the response queue, outputting the lines
171 request_count = args.count
172 while request_count > 0:
173     if len(responses) > 0:
174         result = responses.popleft()
175         if result[0] is None:
176             print "ERROR|" + result[1]+"|"
177             break
178         runtime = "%5.3f|%5.3f|%5.3f" % result[1]
179         print "%03d|%s|%s|" % (result[0], runtime, result[2])
180         request_count -= 1
181         continue
182     time.sleep(args.refresh)