Fix bgpcep-1node-userfeatures tests
[integration/test.git] / tools / pcep_updater / updater.py
1 """Multithreaded utility for rapid LSP updating.
2
3 This utility updates LSPs whose name correspond
4 to the ones created by pcc-mock tool.
5 The new state is hardcoded to contain one configurable hop
6 and final hop of "1.1.1.1/32".
7 AuthStandalone library is used to handle session and restconf authentication.
8
9 Number of workers is configurable, each worker
10 issues blocking restconf requests.
11 Work is distributed beforehand in round-robin fashion.
12 The utility waits for the last worker to finish, or for time to run off.
13
14 The responses are checked for status and content,
15 results are written to collections.Counter and printed at exit.
16 If collections does not contain Counter, "import Counter" is attempted.
17
18 It is advised to pin the python process to single CPU for optimal performance,
19 as Global Interpreter Lock prevents true utilization on more CPUs
20 (while overhead of context switching remains).
21
22 Remark: For early implementations, master process CPU suffered from overhead
23 of Queue suitable for multiprocessing, which put performance down
24 even when workers had more CPU for them.
25 But that may not be true for more mature implementation.
26 """
27
28 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
29 #
30 # This program and the accompanying materials are made available under the
31 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
32 # and is available at http://www.eclipse.org/legal/epl-v10.html
33
34 import argparse
35 import collections  # For deque and Counter.
36 import ipaddr
37 import threading
38 import time
39
40 try:
41     from collections import Counter
42 except ImportError:  # Python 2.6 does not have Counter in collections.
43     from Counter import Counter  # Assumes that user copies Counter.py around.
44 import AuthStandalone
45
46
47 __author__ = "Vratko Polak"
48 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
49 __license__ = "Eclipse Public License v1.0"
50 __email__ = "vrpolak@cisco.com"
51
52
53 def str2bool(text):
54     """Utility converter, based on http://stackoverflow.com/a/19227287"""
55     return text.lower() in ("yes", "true", "y", "t", "1")
56
57
58 # Note: JSON data contains '"', so using "'" to quote Pythons strings.
59 parser = argparse.ArgumentParser()
60 parser.add_argument("--pccs", default="1", type=int, help="number of PCCs to simulate")
61 parser.add_argument(
62     "--lsps", default="1", type=int, help="number of LSPs pre PCC to update"
63 )
64 parser.add_argument(
65     "--workers", default="1", type=int, help="number of blocking https threads to use"
66 )
67 parser.add_argument(
68     "--hop", default="2.2.2.2/32", help="ipv4 prefix (including /32) of hop to use"
69 )
70 parser.add_argument(
71     "--timeout", default="300", type=float, help="seconds to bail out after"
72 )  # FIXME: grammar
73 parser.add_argument(
74     "--refresh",
75     default="0.1",
76     type=float,
77     help="seconds to sleep in main thread if nothing to do",
78 )
79 parser.add_argument(
80     "--pccaddress", default="127.0.0.1", help="IP address of the first simulated PCC"
81 )
82 parser.add_argument(
83     "--odladdress", default="127.0.0.1", help="IP address of ODL acting as PCE"
84 )
85 parser.add_argument(
86     "--user", default="admin", help="Username for restconf authentication"
87 )
88 parser.add_argument(
89     "--password", default="admin", help="Password for restconf authentication"
90 )
91 parser.add_argument("--scope", default="sdn", help="Scope for restconf authentication")
92 parser.add_argument(
93     "--reuse",
94     default="True",
95     type=str2bool,
96     help="Should single requests session be re-used",
97 )
98 parser.add_argument(
99     "--delegate",
100     default="true",
101     help='should delegate the lsp or set "false" if lsp not be delegated',
102 )
103 parser.add_argument("--pccip", default=None, help="IP address of the simulated PCC")
104 parser.add_argument(
105     "--tunnelnumber", default=None, help="Tunnel Number for the simulated PCC"
106 )
107 args = parser.parse_args()  # arguments are read
108
109 expected = """{"output":{}}"""
110
111 payload_list_data = [
112     "{",
113     '   "input":{',
114     '       "node":"pcc://',
115     "",
116     '",',
117     '       "name":"pcc_',
118     "",
119     "_tunnel_",
120     "",
121     '",',
122     '       "network-topology-ref":"/network-topology:network-topology/network-topology:topology',
123     '[network-topology:topology-id=\\"pcep-topology\\"]",',
124     '       "arguments":{',
125     '           "lsp":{',
126     '           "delegate":',
127     "",
128     '           ,"administrative":true',
129     "},",
130     '"ero":{',
131     '   "subobject":[',
132     "       {",
133     '           "loose":false,',
134     '           "ip-prefix":{',
135     '                "ip-prefix":',
136     '"',
137     "",
138     '"',
139     "           }",
140     "       },",
141     "       {",
142     '           "loose":false,',
143     '           "ip-prefix":{',
144     '                "ip-prefix":"1.1.1.1/32"',
145     "            }",
146     "        }",
147     "        ]",
148     "       }",
149     "   }",
150     " }",
151     "}",
152 ]
153
154
155 class CounterDown(object):
156     """Counter which also knows how many items are left to be added."""
157
158     def __init__(self, tasks):
159         self.counter = Counter()
160         self.opened = tasks
161
162     def add(self, result):
163         self.counter[result] += 1
164         self.opened -= 1
165
166
167 def iterable_msg(pccs, lsps, workers, hop, delegate):
168     """Generator yielding tuple of worker number and kwargs to post."""
169     first_pcc_int = int(ipaddr.IPv4Address(args.pccaddress))
170     # Headers are constant, but it is easier to add them to kwargs in this generator.
171     headers = {"Content-Type": "application/json"}
172     # TODO: Perhaps external text file with Template? May affect performance.
173     list_data = payload_list_data
174     for lsp in range(1, lsps + 1):
175         str_lsp = str(lsp)
176         list_data[8] = str_lsp  # Replaces with new pointer.
177         for pcc in range(pccs):
178             pcc_ip = str(ipaddr.IPv4Address(first_pcc_int + pcc))
179             list_data[3] = pcc_ip
180             list_data[6] = pcc_ip
181             list_data[15] = delegate
182             list_data[25] = hop
183             whole_data = "".join(list_data)
184             worker = (lsp * pccs + pcc) % workers
185             post_kwargs = {"data": whole_data, "headers": headers}
186             yield worker, post_kwargs
187
188
189 def generate_payload_for_single_pcc(hop, delegate, pccip, tunnel_no):
190     """Generator yielding single kwargs to post."""
191     first_pcc_int = int(ipaddr.IPv4Address(args.pccaddress))
192     # Headers are constant, but it is easier to add them to kwargs in this generator.
193     headers = {"Content-Type": "application/json"}
194     # TODO: Perhaps external text file with Template? May affect performance.
195     list_data = payload_list_data
196     if tunnel_no == "None":
197         str_lsp = str(1)
198     else:
199         str_lsp = str(tunnel_no)
200     list_data[8] = str_lsp  # Replaces with new pointer.
201     if pccip == "None":
202         pcc_ip = str(ipaddr.IPv4Address(first_pcc_int))
203     else:
204         pcc_ip = pccip
205     list_data[3] = pcc_ip
206     list_data[6] = pcc_ip
207     list_data[15] = delegate
208     list_data[25] = hop
209     whole_data = "".join(list_data)
210     worker = 0
211     post_kwargs = {"data": whole_data, "headers": headers}
212     print(post_kwargs)
213     yield worker, post_kwargs
214
215
216 def queued_send(session, queue_messages, queue_responses):
217     """Pop from queue, Post and append result; repeat until empty."""
218     uri = "operations/network-topology-pcep:update-lsp"
219     while 1:
220         try:
221             post_kwargs = queue_messages.popleft()
222         except IndexError:  # nothing more to send
223             return
224         response = AuthStandalone.Post_Using_Session(session, uri, **post_kwargs)
225         # The response perhaps points to some data stored in session,
226         # and the session implementation may explicitly call close() to free that data.
227         # To be sure, we clone information before further processing.
228         status = int(response.status_code)  # copy constructor
229         content = str(response.content)  # copy constructor
230         resp_tuple = (status, content)
231         queue_responses.append(resp_tuple)
232
233
234 def classify(resp_tuple):
235     """Return 'pass' or a reason what is wrong with response."""
236     prepend = ""
237     status = resp_tuple[0]
238     if (status != 200) and (status != 204):  # is it int?
239         prepend = "status: " + str(status) + " "
240     content = resp_tuple[1]
241     if prepend or (content != expected and content != ""):
242         return prepend + "content: " + str(content)
243     return "pass"
244
245
246 # Main.
247 list_q_msg = [collections.deque() for _ in range(args.workers)]
248 if args.pccip == "None":
249     for worker, post_kwargs in iterable_msg(
250         args.pccs, args.lsps, args.workers, args.hop, args.delegate
251     ):
252         list_q_msg[worker].append(post_kwargs)
253 else:
254     for worker, post_kwargs in generate_payload_for_single_pcc(
255         args.hop, args.delegate, args.pccip, args.tunnelnumber
256     ):
257         list_q_msg[worker].append(post_kwargs)
258 queue_responses = collections.deque()  # thread safe
259 threads = []
260 for worker in range(args.workers):
261     session = AuthStandalone.Init_Session(
262         args.odladdress, args.user, args.password, args.scope, args.reuse
263     )
264     queue_messages = list_q_msg[worker]
265     thread_args = (session, queue_messages, queue_responses)
266     thread = threading.Thread(target=queued_send, args=thread_args)
267     thread.daemon = True
268     threads.append(thread)
269 tasks = sum(map(len, list_q_msg))  # fancy way of counting, should equal to pccs*lsps.
270 counter = CounterDown(tasks)
271 print("work is going to start with %s tasks" % tasks)
272 time_start = time.time()
273 for thread in threads:
274     thread.start()
275 # debug_list = []
276 # time_result = time_start
277 while 1:
278     """Main loop for reading and classifying responses, sleeps when there is nothing to process."""
279     timedelta_left = time_start + args.timeout - time.time()
280     if timedelta_left > 0:
281         while counter.opened > 0:
282             try:
283                 resp_tuple = queue_responses.popleft()  # thread safe
284             except IndexError:
285                 break
286             result = classify(resp_tuple)
287             counter.add(result)
288             # time_now = time.time()
289             # timedelta_fromlast = time_now - time_result
290             # debug_msg = 'DEBUG: opened: ' + str(counter.opened)
291             # debug_msg += ' fromlast: ' + str(timedelta_fromlast)
292             # debug_list.append(debug_msg)
293             # time_result = time_now
294         if counter.opened > 0:
295             # debug_list.append('DEBUG: sleep ' + str(args.refresh))
296             time.sleep(args.refresh)
297             continue
298         left = len(queue_responses)
299         if left:
300             print("error: more responses left inqueue", left)
301     else:
302         print("Time is up!")
303         left = len(queue_responses)  # can be still increasing
304         for _ in range(left):
305             resp_tuple = queue_responses.popleft()  # thread safe
306             result = classify(resp_tuple)
307             counter.add(result)
308     break  # may leave late items in queue_reponses
309 time_stop = time.time()
310 timedelta_duration = time_stop - time_start
311 print("took", timedelta_duration)
312 print(repr(counter.counter))
313 # for message in debug_list:
314 #     print message