Fix bgpcep-1node-throughpcep
[integration/test.git] / tools / pcep_updater / updater.py
1 """Multithreaded utility for rapid LSP updating.
2
3 This utility updates LSPs whose name correspond
4 to the ones created by pcc-mock tool.
5 The new state is hardcoded to contain one configurable hop
6 and final hop of "1.1.1.1/32".
7 AuthStandalone library is used to handle session and restconf authentication.
8
9 Number of workers is configurable, each worker
10 issues blocking restconf requests.
11 Work is distributed beforehand in round-robin fashion.
12 The utility waits for the last worker to finish, or for time to run off.
13
14 The responses are checked for status and content,
15 results are written to collections.Counter and printed at exit.
16 If collections does not contain Counter, "import Counter" is attempted.
17
18 It is advised to pin the python process to single CPU for optimal performance,
19 as Global Interpreter Lock prevents true utilization on more CPUs
20 (while overhead of context switching remains).
21
22 Remark: For early implementations, master process CPU suffered from overhead
23 of Queue suitable for multiprocessing, which put performance down
24 even when workers had more CPU for them.
25 But that may not be true for more mature implementation.
26 """
27
28 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
29 #
30 # This program and the accompanying materials are made available under the
31 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
32 # and is available at http://www.eclipse.org/legal/epl-v10.html
33
34 import argparse
35 import collections  # For deque and Counter.
36 import ipaddr
37 import threading
38 import time
39
40 from collections import Counter
41 import AuthStandalone
42
43
44 __author__ = "Vratko Polak"
45 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
46 __license__ = "Eclipse Public License v1.0"
47 __email__ = "vrpolak@cisco.com"
48
49
50 def str2bool(text):
51     """Utility converter, based on http://stackoverflow.com/a/19227287"""
52     return text.lower() in ("yes", "true", "y", "t", "1")
53
54
55 # Note: JSON data contains '"', so using "'" to quote Pythons strings.
56 parser = argparse.ArgumentParser()
57 parser.add_argument("--pccs", default="1", type=int, help="number of PCCs to simulate")
58 parser.add_argument(
59     "--lsps", default="1", type=int, help="number of LSPs pre PCC to update"
60 )
61 parser.add_argument(
62     "--workers", default="1", type=int, help="number of blocking https threads to use"
63 )
64 parser.add_argument(
65     "--hop", default="2.2.2.2/32", help="ipv4 prefix (including /32) of hop to use"
66 )
67 parser.add_argument(
68     "--timeout", default="300", type=float, help="seconds to bail out after"
69 )  # FIXME: grammar
70 parser.add_argument(
71     "--refresh",
72     default="0.1",
73     type=float,
74     help="seconds to sleep in main thread if nothing to do",
75 )
76 parser.add_argument(
77     "--pccaddress", default="127.0.0.1", help="IP address of the first simulated PCC"
78 )
79 parser.add_argument(
80     "--odladdress", default="127.0.0.1", help="IP address of ODL acting as PCE"
81 )
82 parser.add_argument(
83     "--user", default="admin", help="Username for restconf authentication"
84 )
85 parser.add_argument(
86     "--password", default="admin", help="Password for restconf authentication"
87 )
88 parser.add_argument("--scope", default="sdn", help="Scope for restconf authentication")
89 parser.add_argument(
90     "--reuse",
91     default="True",
92     type=str2bool,
93     help="Should single requests session be re-used",
94 )
95 parser.add_argument(
96     "--delegate",
97     default="true",
98     help='should delegate the lsp or set "false" if lsp not be delegated',
99 )
100 parser.add_argument("--pccip", default=None, help="IP address of the simulated PCC")
101 parser.add_argument(
102     "--tunnelnumber", default=None, help="Tunnel Number for the simulated PCC"
103 )
104 args = parser.parse_args()  # arguments are read
105
106 expected = """{"output":{}}"""
107
108 payload_list_data = [
109     "{",
110     '   "input":{',
111     '       "node":"pcc://',
112     "",
113     '",',
114     '       "name":"pcc_',
115     "",
116     "_tunnel_",
117     "",
118     '",',
119     '       "network-topology-ref":"/network-topology:network-topology/network-topology:topology',
120     '[network-topology:topology-id=\\"pcep-topology\\"]",',
121     '       "arguments":{',
122     '           "lsp":{',
123     '           "delegate":',
124     "",
125     '           ,"administrative":true',
126     "},",
127     '"ero":{',
128     '   "subobject":[',
129     "       {",
130     '           "loose":false,',
131     '           "ip-prefix":{',
132     '                "ip-prefix":',
133     '"',
134     "",
135     '"',
136     "           }",
137     "       },",
138     "       {",
139     '           "loose":false,',
140     '           "ip-prefix":{',
141     '                "ip-prefix":"1.1.1.1/32"',
142     "            }",
143     "        }",
144     "        ]",
145     "       }",
146     "   }",
147     " }",
148     "}",
149 ]
150
151
152 class CounterDown(object):
153     """Counter which also knows how many items are left to be added."""
154
155     def __init__(self, tasks):
156         self.counter = Counter()
157         self.opened = tasks
158
159     def add(self, result):
160         self.counter[result] += 1
161         self.opened -= 1
162
163
164 def iterable_msg(pccs, lsps, workers, hop, delegate):
165     """Generator yielding tuple of worker number and kwargs to post."""
166     first_pcc_int = int(ipaddr.IPv4Address(args.pccaddress))
167     # Headers are constant, but it is easier to add them to kwargs in this generator.
168     headers = {"Content-Type": "application/json"}
169     # TODO: Perhaps external text file with Template? May affect performance.
170     list_data = payload_list_data
171     for lsp in range(1, lsps + 1):
172         str_lsp = str(lsp)
173         list_data[8] = str_lsp  # Replaces with new pointer.
174         for pcc in range(pccs):
175             pcc_ip = str(ipaddr.IPv4Address(first_pcc_int + pcc))
176             list_data[3] = pcc_ip
177             list_data[6] = pcc_ip
178             list_data[15] = delegate
179             list_data[25] = hop
180             whole_data = "".join(list_data)
181             worker = (lsp * pccs + pcc) % workers
182             post_kwargs = {"data": whole_data, "headers": headers}
183             yield worker, post_kwargs
184
185
186 def generate_payload_for_single_pcc(hop, delegate, pccip, tunnel_no):
187     """Generator yielding single kwargs to post."""
188     first_pcc_int = int(ipaddr.IPv4Address(args.pccaddress))
189     # Headers are constant, but it is easier to add them to kwargs in this generator.
190     headers = {"Content-Type": "application/json"}
191     # TODO: Perhaps external text file with Template? May affect performance.
192     list_data = payload_list_data
193     if tunnel_no == "None":
194         str_lsp = str(1)
195     else:
196         str_lsp = str(tunnel_no)
197     list_data[8] = str_lsp  # Replaces with new pointer.
198     if pccip == "None":
199         pcc_ip = str(ipaddr.IPv4Address(first_pcc_int))
200     else:
201         pcc_ip = pccip
202     list_data[3] = pcc_ip
203     list_data[6] = pcc_ip
204     list_data[15] = delegate
205     list_data[25] = hop
206     whole_data = "".join(list_data)
207     worker = 0
208     post_kwargs = {"data": whole_data, "headers": headers}
209     print(post_kwargs)
210     yield worker, post_kwargs
211
212
213 def queued_send(session, queue_messages, queue_responses):
214     """Pop from queue, Post and append result; repeat until empty."""
215     uri = "operations/network-topology-pcep:update-lsp"
216     while 1:
217         try:
218             post_kwargs = queue_messages.popleft()
219         except IndexError:  # nothing more to send
220             return
221         response = AuthStandalone.Post_Using_Session(session, uri, **post_kwargs)
222         # The response perhaps points to some data stored in session,
223         # and the session implementation may explicitly call close() to free that data.
224         # To be sure, we clone information before further processing.
225         status = int(response.status_code)
226         content = response.content.decode()
227         resp_tuple = (status, content)
228         queue_responses.append(resp_tuple)
229
230
231 def classify(resp_tuple):
232     """Return 'pass' or a reason what is wrong with response."""
233     prepend = ""
234     status = resp_tuple[0]
235     if (status != 200) and (status != 204):
236         prepend = f"status: {status}"
237     content = resp_tuple[1]
238     if prepend or (content != expected and content != ""):
239         return f"{prepend} content: {content}"
240     return "pass"
241
242
243 # Main.
244 list_q_msg = [collections.deque() for _ in range(args.workers)]
245 if args.pccip == "None":
246     for worker, post_kwargs in iterable_msg(
247         args.pccs, args.lsps, args.workers, args.hop, args.delegate
248     ):
249         list_q_msg[worker].append(post_kwargs)
250 else:
251     for worker, post_kwargs in generate_payload_for_single_pcc(
252         args.hop, args.delegate, args.pccip, args.tunnelnumber
253     ):
254         list_q_msg[worker].append(post_kwargs)
255 queue_responses = collections.deque()  # thread safe
256 threads = []
257 for worker in range(args.workers):
258     session = AuthStandalone.Init_Session(
259         args.odladdress, args.user, args.password, args.scope, args.reuse
260     )
261     queue_messages = list_q_msg[worker]
262     thread_args = (session, queue_messages, queue_responses)
263     thread = threading.Thread(target=queued_send, args=thread_args)
264     thread.daemon = True
265     threads.append(thread)
266 tasks = sum(map(len, list_q_msg))  # fancy way of counting, should equal to pccs*lsps.
267 counter = CounterDown(tasks)
268 print("work is going to start with %s tasks" % tasks)
269 time_start = time.time()
270 for thread in threads:
271     thread.start()
272 # debug_list = []
273 # time_result = time_start
274 while 1:
275     """Main loop for reading and classifying responses, sleeps when there is nothing to process."""
276     timedelta_left = time_start + args.timeout - time.time()
277     if timedelta_left > 0:
278         while counter.opened > 0:
279             try:
280                 resp_tuple = queue_responses.popleft()  # thread safe
281             except IndexError:
282                 break
283             result = classify(resp_tuple)
284             counter.add(result)
285             # time_now = time.time()
286             # timedelta_fromlast = time_now - time_result
287             # debug_msg = 'DEBUG: opened: ' + str(counter.opened)
288             # debug_msg += ' fromlast: ' + str(timedelta_fromlast)
289             # debug_list.append(debug_msg)
290             # time_result = time_now
291         if counter.opened > 0:
292             # debug_list.append('DEBUG: sleep ' + str(args.refresh))
293             time.sleep(args.refresh)
294             continue
295         left = len(queue_responses)
296         if left:
297             print("error: more responses left inqueue", left)
298     else:
299         print("Time is up!")
300         left = len(queue_responses)  # can be still increasing
301         for _ in range(left):
302             resp_tuple = queue_responses.popleft()  # thread safe
303             result = classify(resp_tuple)
304             counter.add(result)
305     break  # may leave late items in queue_reponses
306 time_stop = time.time()
307 timedelta_duration = time_stop - time_start
308 print("took", timedelta_duration)
309 print(repr(counter.counter))
310 # for message in debug_list:
311 #     print message