Netconf Scaling test for multithreaded GET requests
[integration/test.git] / tools / netconf_tools / getter.py
diff --git a/tools/netconf_tools/getter.py b/tools/netconf_tools/getter.py
new file mode 100644 (file)
index 0000000..8289209
--- /dev/null
@@ -0,0 +1,182 @@
+"""Multithreaded utility for rapid Netconf device GET requesting.
+
+This utility sends GET requests to ODL Netconf through Restconf to get a
+bunch of configuration data from Netconf mounted devices and then checks the
+results against caller provided content. The requests are sent via a
+configurable number of workers. Each worker issues a bunch of blocking
+restconf requests. Work is distributed in round-robin fashion. The utility
+waits for the last worker to finish, or for time to run off.
+
+The responses are checked for status (200 OK is expected) and content
+(provided by user via the "--data" command line option). Results are written
+to collections.Counter and printed at exit. If collections does not contain
+Counter, "import Counter" is attempted.
+
+It is advised to pin the python process to single CPU for optimal performance
+as Global Interpreter Lock prevents true utilization on more CPUs (while
+overhead of context switching remains).
+"""
+
+# Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+#
+# This program and the accompanying materials are made available under the
+# terms of the Eclipse Public License v1.0 which accompanies this distribution,
+# and is available at http://www.eclipse.org/legal/epl-v10.html
+
+__author__ = "Vratko Polak"
+__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
+__license__ = "Eclipse Public License v1.0"
+__email__ = "vrpolak@cisco.com"
+
+
+import argparse
+import collections  # For deque and Counter.
+import threading
+import time
+import AuthStandalone
+
+
+def str2bool(text):
+    """Utility converter, based on http://stackoverflow.com/a/19227287"""
+    return text.lower() in ("yes", "true", "y", "t", "1")
+
+
+def parse_arguments():
+    parser = argparse.ArgumentParser()
+
+    # Netconf and Restconf related arguments.
+    parser.add_argument('--odladdress', default='127.0.0.1',
+                        help='IP address of ODL Restconf to be used')
+    parser.add_argument('--restconfport', default='8181',
+                        help='Port on which ODL Restconf to be used')
+    parser.add_argument('--user', default='admin',
+                        help='Username for ODL Restconf authentication')
+    parser.add_argument('--password', default='admin',
+                        help='Password for ODL Restconf authentication')
+    parser.add_argument('--scope', default='sdn',
+                        help='Scope for ODL Restconf authentication')
+    parser.add_argument('--count', type=int,
+                        help='Count of devices to query')
+    parser.add_argument('--name',
+                        help='Name of device without the ID suffix')
+    parser.add_argument('--reuse', default='True', type=str2bool,
+                        help='Should single requests session be re-used')
+
+    # Work related arguments.
+    parser.add_argument('--workers', default='1', type=int,
+                        help='number of blocking http threads to use')
+    parser.add_argument('--timeout', default='300', type=float,
+                        help='timeout in seconds for all jobs to complete')
+    parser.add_argument('--refresh', default='0.1', type=float,
+                        help='seconds to sleep in main thread if nothing to do')
+
+    return parser.parse_args()  # arguments are read
+
+
+class TRequestWithResponse(object):
+
+    def __init__(self, uri, kwargs):
+        self.uri = uri
+        self.kwargs = kwargs
+        self.response_ready = threading.Event()
+
+    def set_response(self, runtime, status, content):
+        self.status = status
+        self.runtime = runtime
+        self.content = content
+        self.response_ready.set()
+
+    def wait_for_response(self):
+        self.response_ready.wait()
+
+
+def queued_send(session, queue_messages):
+    """Pop from queue, Post and append result; repeat until empty."""
+    while 1:
+        try:
+            request = queue_messages.popleft()
+        except IndexError:  # nothing more to send
+            break
+        start = time.time()
+        response = AuthStandalone.Get_Using_Session(session, request.uri, **request.kwargs)
+        stop = time.time()
+        status = int(response.status_code)
+        content = repr(response.content)
+        runtime = stop - start
+        request.set_response((start, stop, runtime), status, content)
+
+
+def collect_results(request_list, response_queue):
+    for request in request_list:
+        request.wait_for_response()
+        response = (request.status, request.runtime, request.content)
+        response_queue.append(response)
+
+
+def watch_for_timeout(timeout, response_queue):
+    time.sleep(timeout)
+    response_queue.append((None, 'Time is up!'))
+
+
+def run_thread(thread_target, *thread_args):
+    thread = threading.Thread(target=thread_target, args=thread_args)
+    thread.daemon = True
+    thread.start()
+    return thread
+
+
+# Parse the command line arguments
+args = parse_arguments()
+
+# Construct the work for the workers.
+url_start = (
+    'config/'
+    "network-topology:network-topology/topology/topology-netconf/node/"
+    + args.name + "-"
+)
+url_end = "/yang-ext:mount"
+headers = {'Content-Type': 'application/xml', "Accept": "application/xml"}
+kwargs = {"headers": headers}
+requests = []
+for device_number in range(args.count):
+    device_url = url_start + str(device_number + 1) + url_end
+    request = TRequestWithResponse(device_url, kwargs)
+    requests.append(request)
+
+# Organize the work into the work queues.
+list_q_msg = [collections.deque() for _ in range(args.workers)]
+index = 0
+for request in requests:
+    queue = list_q_msg[index]
+    queue.append(request)
+    index += 1
+    if index == len(list_q_msg):
+        index = 0
+
+# Spawn the workers, giving each a queue.
+threads = []
+for queue_messages in list_q_msg:
+    session = AuthStandalone.Init_Session(args.odladdress, args.user, args.password, args.scope, args.reuse)
+    thread = run_thread(queued_send, session, queue_messages)
+    threads.append(thread)
+
+# Spawn the results collector worker
+responses = collections.deque()
+collector = run_thread(collect_results, requests, responses)
+
+# Spawn the watchdog thread
+watchdog = run_thread(watch_for_timeout, args.timeout, responses)
+
+# Watch the response queue, outputting the lines
+request_count = args.count
+while request_count > 0:
+    if len(responses) > 0:
+        result = responses.popleft()
+        if result[0] is None:
+            print "ERROR|" + result[1]+"|"
+            break
+        runtime = "%5.3f|%5.3f|%5.3f" % result[1]
+        print "%03d|%s|%s|" % (result[0], runtime, result[2])
+        request_count -= 1
+        continue
+    time.sleep(args.refresh)