1 """Multithreaded utility for rapid LSP updating.
3 This utility updates LSPs whose names correspond
4 to the ones created by pcc-mock tool.
5 The new state is hardcoded to contain one configurable hop
6 and final hop of "1.1.1.1/32".
7 AuthStandalone library is used to handle session and restconf authentication.
9 Number of workers is configurable, each worker
10 issues blocking restconf requests.
11 Work is distributed beforehand in round-robin fashion.
12 The utility waits for the last worker to finish, or for time to run out.
14 The responses are checked for status and content,
15 results are written to collections.Counter and printed at exit.
16 If collections does not contain Counter, "import Counter" is attempted.
18 It is advised to pin the python process to single CPU for optimal performance,
19 as Global Interpreter Lock prevents true utilization on more CPUs
20 (while overhead of context switching remains).
22 Remark: For early implementations, master process CPU suffered from overhead
23 of Queue suitable for multiprocessing, which put performance down
24 even when workers had more CPU for them.
25 But that may not be true for more mature implementation.
28 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
30 # This program and the accompanying materials are made available under the
31 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
32 # and is available at http://www.eclipse.org/legal/epl-v10.html
35 import collections # For deque and Counter.
40 from collections import Counter
44 __author__ = "Vratko Polak"
45 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
46 __license__ = "Eclipse Public License v1.0"
47 __email__ = "vrpolak@cisco.com"
51 """Utility converter, based on http://stackoverflow.com/a/19227287"""
52 return text.lower() in ("yes", "true", "y", "t", "1")
55 # Note: JSON data contains '"', so using "'" to quote Pythons strings.
56 parser = argparse.ArgumentParser()
57 parser.add_argument("--pccs", default="1", type=int, help="number of PCCs to simulate")
59 "--lsps", default="1", type=int, help="number of LSPs pre PCC to update"
62 "--workers", default="1", type=int, help="number of blocking https threads to use"
65 "--hop", default="2.2.2.2/32", help="ipv4 prefix (including /32) of hop to use"
68 "--timeout", default="300", type=float, help="seconds to bail out after"
74 help="seconds to sleep in main thread if nothing to do",
77 "--pccaddress", default="127.0.0.1", help="IP address of the first simulated PCC"
80 "--odladdress", default="127.0.0.1", help="IP address of ODL acting as PCE"
83 "--user", default="admin", help="Username for restconf authentication"
86 "--password", default="admin", help="Password for restconf authentication"
88 parser.add_argument("--scope", default="sdn", help="Scope for restconf authentication")
93 help="Should single requests session be re-used",
98 help='should delegate the lsp or set "false" if lsp not be delegated',
100 parser.add_argument("--pccip", default=None, help="IP address of the simulated PCC")
102 "--tunnelnumber", default=None, help="Tunnel Number for the simulated PCC"
104 args = parser.parse_args() # arguments are read
106 expected = """{"output":{}}"""
108 payload_list_data = [
119 ' "network-topology-ref":"/network-topology:network-topology/network-topology:topology',
120 '[network-topology:topology-id=\\"pcep-topology\\"]",',
125 ' ,"administrative":true',
141 ' "ip-prefix":"1.1.1.1/32"',
152 class CounterDown(object):
153 """Counter which also knows how many items are left to be added."""
155 def __init__(self, tasks):
156 self.counter = Counter()
159 def add(self, result):
160 self.counter[result] += 1
164 def iterable_msg(pccs, lsps, workers, hop, delegate):
165 """Generator yielding tuple of worker number and kwargs to post."""
166 first_pcc_int = int(ipaddr.IPv4Address(args.pccaddress))
167 # Headers are constant, but it is easier to add them to kwargs in this generator.
168 headers = {"Content-Type": "application/json"}
169 # TODO: Perhaps external text file with Template? May affect performance.
170 list_data = payload_list_data
171 for lsp in range(1, lsps + 1):
173 list_data[8] = str_lsp # Replaces with new pointer.
174 for pcc in range(pccs):
175 pcc_ip = str(ipaddr.IPv4Address(first_pcc_int + pcc))
176 list_data[3] = pcc_ip
177 list_data[6] = pcc_ip
178 list_data[15] = delegate
180 whole_data = "".join(list_data)
181 worker = (lsp * pccs + pcc) % workers
182 post_kwargs = {"data": whole_data, "headers": headers}
183 yield worker, post_kwargs
186 def generate_payload_for_single_pcc(hop, delegate, pccip, tunnel_no):
187 """Generator yielding single kwargs to post."""
188 first_pcc_int = int(ipaddr.IPv4Address(args.pccaddress))
189 # Headers are constant, but it is easier to add them to kwargs in this generator.
190 headers = {"Content-Type": "application/json"}
191 # TODO: Perhaps external text file with Template? May affect performance.
192 list_data = payload_list_data
193 if tunnel_no == "None":
196 str_lsp = str(tunnel_no)
197 list_data[8] = str_lsp # Replaces with new pointer.
199 pcc_ip = str(ipaddr.IPv4Address(first_pcc_int))
202 list_data[3] = pcc_ip
203 list_data[6] = pcc_ip
204 list_data[15] = delegate
206 whole_data = "".join(list_data)
208 post_kwargs = {"data": whole_data, "headers": headers}
210 yield worker, post_kwargs
213 def queued_send(session, queue_messages, queue_responses):
214 """Pop from queue, Post and append result; repeat until empty."""
215 uri = "operations/network-topology-pcep:update-lsp"
218 post_kwargs = queue_messages.popleft()
219 except IndexError: # nothing more to send
221 response = AuthStandalone.Post_Using_Session(session, uri, **post_kwargs)
222 # The response perhaps points to some data stored in session,
223 # and the session implementation may explicitly call close() to free that data.
224 # To be sure, we clone information before further processing.
225 status = int(response.status_code)
226 content = response.content.decode()
227 resp_tuple = (status, content)
228 queue_responses.append(resp_tuple)
231 def classify(resp_tuple):
232 """Return 'pass' or a reason what is wrong with response."""
234 status = resp_tuple[0]
235 if (status != 200) and (status != 204):
236 prepend = f"status: {status}"
237 content = resp_tuple[1]
238 if prepend or (content != expected and content != ""):
239 return f"{prepend} content: {content}"
244 list_q_msg = [collections.deque() for _ in range(args.workers)]
245 if args.pccip == "None":
246 for worker, post_kwargs in iterable_msg(
247 args.pccs, args.lsps, args.workers, args.hop, args.delegate
249 list_q_msg[worker].append(post_kwargs)
251 for worker, post_kwargs in generate_payload_for_single_pcc(
252 args.hop, args.delegate, args.pccip, args.tunnelnumber
254 list_q_msg[worker].append(post_kwargs)
255 queue_responses = collections.deque() # thread safe
257 for worker in range(args.workers):
258 session = AuthStandalone.Init_Session(
259 args.odladdress, args.user, args.password, args.scope, args.reuse
261 queue_messages = list_q_msg[worker]
262 thread_args = (session, queue_messages, queue_responses)
263 thread = threading.Thread(target=queued_send, args=thread_args)
265 threads.append(thread)
266 tasks = sum(map(len, list_q_msg)) # fancy way of counting, should equal to pccs*lsps.
267 counter = CounterDown(tasks)
268 print("work is going to start with %s tasks" % tasks)
269 time_start = time.time()
270 for thread in threads:
273 # time_result = time_start
275 """Main loop for reading and classifying responses, sleeps when there is nothing to process."""
276 timedelta_left = time_start + args.timeout - time.time()
277 if timedelta_left > 0:
278 while counter.opened > 0:
280 resp_tuple = queue_responses.popleft() # thread safe
283 result = classify(resp_tuple)
285 # time_now = time.time()
286 # timedelta_fromlast = time_now - time_result
287 # debug_msg = 'DEBUG: opened: ' + str(counter.opened)
288 # debug_msg += ' fromlast: ' + str(timedelta_fromlast)
289 # debug_list.append(debug_msg)
290 # time_result = time_now
291 if counter.opened > 0:
292 # debug_list.append('DEBUG: sleep ' + str(args.refresh))
293 time.sleep(args.refresh)
295 left = len(queue_responses)
297 print("error: more responses left inqueue", left)
300 left = len(queue_responses) # can be still increasing
301 for _ in range(left):
302 resp_tuple = queue_responses.popleft() # thread safe
303 result = classify(resp_tuple)
305 break # may leave late items in queue_reponses
306 time_stop = time.time()
307 timedelta_duration = time_stop - time_start
308 print("took", timedelta_duration)
309 print(repr(counter.counter))
310 # for message in debug_list: