Auto-generated patch by python-black
[integration/test.git] / tools / odl-mdsal-clustering-tests / clustering-performance-test / shard_perf_test.py
1 #!/usr/bin/python
2 from random import randrange
3 import json
4 import argparse
5 import time
6 import threading
7 import sys
8 import requests
9
10
11 __author__ = "Jan Medved"
12 __copyright__ = "Copyright(c) 2014, Cisco Systems, Inc."
13 __license__ = "New-style BSD"
14 __email__ = "jmedved@cisco.com"
15
16
17 class Counter(object):
18     def __init__(self, start=0):
19         self.lock = threading.Lock()
20         self.value = start
21
22     def increment(self, value=1):
23         self.lock.acquire()
24         val = self.value
25         try:
26             self.value += value
27         finally:
28             self.lock.release()
29         return val
30
31
32 class Timer(object):
33     def __init__(self, verbose=False):
34         self.verbose = verbose
35
36     def __enter__(self):
37         self.start = time.time()
38         return self
39
40     def __exit__(self, *args):
41         self.end = time.time()
42         self.secs = self.end - self.start
43         self.msecs = self.secs * 1000  # millisecs
44         if self.verbose:
45             print("elapsed time: %f ms" % self.msecs)
46
47
48 class ShardPerformanceTester(object):
49     """
50     The ShardPerformanceTester class facilitates performance testing of CDS shards. The test starts a number of
51     threads, where each thread issues a specified number of resource retrieval requests to URLs specified at the
52     beginning of a test. A ShardPerformanceTester object gets its connection parameters to the system-under-test
53     and its test parameters when it is instantiated.  The set of URLs from where to retrieve resources is
54     specified when a test is started. By passing in the appropriate URLs, the test can be used to test data
55     retrieval performance of different shards or different resources at different granularities, etc.
56     """
57
58     headers = {"Accept": "application/json"}
59
60     def __init__(self, host, port, auth, threads, nrequests, plevel):
61         """
62         """
63         self.host = host
64         self.port = port
65         self.auth = auth
66         self.requests = nrequests
67         self.threads = threads
68         self.plevel = plevel
69
70         self.print_lock = threading.Lock()
71         self.cond = threading.Condition()
72         self.threads_done = 0
73
74         self.ok_requests = 0
75         self.url_counters = []
76         self.total_rate = 0
77
78     def make_request(self, session, urls):
79         """
80         Makes a request for a resource at a random URL selected from a list of URLs passed as input parameter
81         :param session: Session to system under test
82         :param urls: List of resource URLs
83         :return: Status code from the resource request call
84         """
85         url_index = randrange(0, len(urls))
86         r_url = urls[url_index]
87         self.url_counters[url_index].increment()
88
89         if not self.auth:
90             r = session.get(r_url, headers=self.headers, stream=False)
91         else:
92             r = session.get(
93                 r_url, headers=self.headers, stream=False, auth=("admin", "admin")
94             )
95         return r.status_code
96
97     def worker(self, tid, urls):
98         """
99         Worker thread function. Connects to system-under-test and makes 'self.requests' requests for
100         resources to URLs randomly selected from 'urls'
101         :param tid: Worker thread ID
102         :param urls: List of resource URLs
103         :return: None
104         """
105         res = {200: 0}
106
107         s = requests.Session()
108
109         with self.print_lock:
110             print("    Thread %d: Performing %d requests" % (tid, self.requests))
111
112         with Timer() as t:
113             for r in range(self.requests):
114                 sts = self.make_request(s, urls)
115                 try:
116                     res[sts] += 1
117                 except KeyError:
118                     res[sts] = 1
119
120         ok_rate = res[200] / t.secs
121         total_rate = sum(res.values()) / t.secs
122
123         with self.print_lock:
124             print("Thread %d done:" % tid)
125             print("    Time: %.2f," % t.secs)
126             print("    Success rate:  %.2f, Total rate: %.2f" % (ok_rate, total_rate))
127             print("    Per-thread stats: ")
128             print(res)
129             self.threads_done += 1
130             self.total_rate += total_rate
131
132         s.close()
133
134         with self.cond:
135             self.cond.notifyAll()
136
137     def run_test(self, urls):
138         """
139         Runs the performance test. Starts 'self.threads' worker threads, waits for all of them to finish and
140         prints results.
141         :param urls: List of urls from which to request resources
142         :return: None
143         """
144
145         threads = []
146         self.total_rate = 0
147
148         # Initialize url counters
149         del self.url_counters[:]
150         for i in range(len(urls)):
151             self.url_counters.append(Counter(0))
152
153         # Start all worker threads
154         for i in range(self.threads):
155             t = threading.Thread(target=self.worker, args=(i, urls))
156             threads.append(t)
157             t.start()
158
159         # Wait for all threads to finish and measure the execution time
160         with Timer() as t:
161             while self.threads_done < self.threads:
162                 with self.cond:
163                     self.cond.wait()
164
165         # Print summary results. Each worker prints its owns results too.
166         print("\nSummary Results:")
167         print(
168             "    Requests/sec (total_sum): %.2f"
169             % ((self.threads * self.requests) / t.secs)
170         )
171         print(
172             "    Requests/sec (measured):  %.2f"
173             % ((self.threads * self.requests) / t.secs)
174         )
175         print("    Time: %.2f" % t.secs)
176         self.threads_done = 0
177
178         if self.plevel > 0:
179             print("    Per URL Counts: ")
180             for i in range(len(urls)):
181                 print("%d" % self.url_counters[i].value)
182             print("\n")
183
184
185 class TestUrlGenerator(object):
186     """
187     Base abstract class to generate test URLs for ShardPerformanceTester. First, an entire subtree representing
188     a shard or a set of resources is retrieved, then a set of URLS to access small data stanzas is created. This
189     class only defines the framework, the methods that create URL sets are defined in derived classes.
190     """
191
192     def __init__(self, host, port, auth):
193         """
194         Initialization
195         :param host: Controller's IP address
196         :param port: Controller's RESTCONF port
197         :param auth: Indicates whether to use authentication with default user/password (admin/admin)
198         :return: None
199         """
200         self.host = host
201         self.port = port
202         self.auth = auth
203         self.resource_string = ""
204
205     def url_generator(self, data):
206         """
207         Abstract  URL generator. Must be overridden in a derived class
208         :param data: Bulk resource data (JSON) from which to generate the URLs
209         :return: List of generated Resources
210         """
211         print(
212             "Abstract class '%s' should never be used standalone"
213             % (self.__class__.__name__)
214         )
215         return []
216
217     def generate(self):
218         """
219         Drives the generation of test URLs. First, it gets a 'bulk' resource (e.g. the entire inventory
220          or the entire topology) from the controller specified during int()  and then invokes a resource-specific
221          URL generator to create a set of resource-specific URLs.
222         """
223         t_url = "http://" + self.host + ":" + self.port + "/" + self.resource_string
224         headers = {"Accept": "application/json"}
225         r_url = []
226
227         if not self.auth:
228             r = requests.get(t_url, headers=headers, stream=False)
229         else:
230             r = requests.get(
231                 t_url, headers=headers, stream=False, auth=("admin", "admin")
232             )
233
234         if r.status_code != 200:
235             print(
236                 "Failed to get HTTP response from '%s', code %d"
237                 % ((t_url, r.status_code))
238             )
239         else:
240             try:
241                 r_url = self.url_generator(json.loads(r.content))
242             except Exception:
243                 print(
244                     "Failed to get json from '%s'. Please make sure you are connected to mininet."
245                     % (r_url)
246                 )
247
248         return r_url
249
250
251 class TopoUrlGenerator(TestUrlGenerator):
252     """
253     Class to generate test URLs from the topology shard.
254     :return: List of generated Resources
255     """
256
257     def __init__(self, host, port, auth):
258         TestUrlGenerator.__init__(self, host, port, auth)
259         self.resource_string = (
260             "restconf/operational/network-topology:network-topology/topology/flow:1"
261         )
262
263     def url_generator(self, topo_data):
264         url_list = []
265         try:
266             nodes = topo_data["topology"][0]["node"]
267             for node in nodes:
268                 tpoints = node["termination-point"]
269                 for tpoint in tpoints:
270                     t_url = (
271                         "http://"
272                         + self.host
273                         + ":"
274                         + self.port
275                         + "/restconf/operational/network-topology:network-topology/topology/flow:1/node/"
276                         + node["node-id"]
277                         + "/termination-point/"
278                         + tpoint["tp-id"]
279                     )
280                     url_list.append(t_url)
281             return url_list
282         except KeyError:
283             print("Error parsing topology json")
284             return []
285
286
287 class InvUrlGenerator(TestUrlGenerator):
288     """
289     Class to generate test URLs from the inventory shard.
290     """
291
292     def __init__(self, host, port, auth):
293         TestUrlGenerator.__init__(self, host, port, auth)
294         self.resource_string = "restconf/operational/opendaylight-inventory:nodes"
295
296     def url_generator(self, inv_data):
297         url_list = []
298         try:
299             nodes = inv_data["nodes"]["node"]
300             for node in nodes:
301                 nconns = node["node-connector"]
302                 for nconn in nconns:
303                     i_url = (
304                         "http://"
305                         + self.host
306                         + ":"
307                         + self.port
308                         + "/restconf/operational/opendaylight-inventory:nodes/node/"
309                         + node["id"]
310                         + "/node-connector/"
311                         + nconn["id"]
312                         + "/opendaylight-port-statistics:flow-capable-node-connector-statistics"
313                     )
314                     url_list.append(i_url)
315             return url_list
316         except KeyError:
317             print("Error parsing inventory json")
318             return []
319
320
321 if __name__ == "__main__":
322     parser = argparse.ArgumentParser(
323         description="Flow programming performance test: First adds and then deletes flows "
324         "into the config tree, as specified by optional parameters."
325     )
326
327     parser.add_argument(
328         "--host",
329         default="127.0.0.1",
330         help="Host where odl controller is running (default is 127.0.0.1)",
331     )
332     parser.add_argument(
333         "--port",
334         default="8181",
335         help="Port on which odl's RESTCONF is listening (default is 8181)",
336     )
337     parser.add_argument(
338         "--auth",
339         dest="auth",
340         action="store_true",
341         default=False,
342         help="Use the ODL default username/password 'admin'/'admin' to authenticate access to REST; "
343         "default: no authentication",
344     )
345     parser.add_argument(
346         "--threads",
347         type=int,
348         default=1,
349         help="Number of request worker threads to start in each cycle; default=1. ",
350     )
351     parser.add_argument(
352         "--requests",
353         type=int,
354         default=100,
355         help="Number of requests each worker thread will send to the controller; default=100.",
356     )
357     parser.add_argument(
358         "--resource",
359         choices=["inv", "topo", "topo+inv", "all"],
360         default="both",
361         help="Which resource to test: inventory, topology, or both; default both",
362     )
363     parser.add_argument(
364         "--plevel",
365         type=int,
366         default=0,
367         help="Print level: controls output verbosity. 0-lowest, 1-highest; default 0",
368     )
369     in_args = parser.parse_args()
370
371     topo_urls = []
372     inv_urls = []
373
374     # If required, get topology resource URLs
375     if in_args.resource != "inventory":
376         tg = TopoUrlGenerator(in_args.host, in_args.port, in_args.auth)
377         topo_urls += tg.generate()
378         if len(topo_urls) == 0:
379             print("Failed to generate topology URLs")
380             sys.exit(-1)
381
382     # If required, get inventory resource URLs
383     if in_args.resource != "topology":
384         ig = InvUrlGenerator(in_args.host, in_args.port, in_args.auth)
385         inv_urls += ig.generate()
386         if len(inv_urls) == 0:
387             print("Failed to generate inventory URLs")
388             sys.exit(-1)
389
390     if in_args.resource == "topo+inv" or in_args.resource == "all":
391         # To have balanced test results, the number of URLs for topology and inventory must be the same
392         if len(topo_urls) != len(inv_urls):
393             print("The number of topology and inventory URLs don't match")
394             sys.exit(-1)
395
396     st = ShardPerformanceTester(
397         in_args.host,
398         in_args.port,
399         in_args.auth,
400         in_args.threads,
401         in_args.requests,
402         in_args.plevel,
403     )
404
405     if in_args.resource == "all" or in_args.resource == "topo":
406         print("===================================")
407         print("Testing topology shard performance:")
408         print("===================================")
409         st.run_test(topo_urls)
410
411     if in_args.resource == "all" or in_args.resource == "inv":
412         print("====================================")
413         print("Testing inventory shard performance:")
414         print("====================================")
415         st.run_test(inv_urls)
416
417     if in_args.resource == "topo+inv" or in_args.resource == "all":
418         print("===============================================")
419         print("Testing combined shards (topo+inv) performance:")
420         print("===============================================")
421         st.run_test(topo_urls + inv_urls)