1 """Multithreaded utility for rapid LSP updating.
This utility updates LSPs whose names correspond
to the ones created by the pcc-mock tool.
5 The new state is hardcoded to contain one configurable hop
6 and final hop of "1.1.1.1/32".
7 AuthStandalone library is used to handle session and restconf authentication.
9 Number of workers is configurable, each worker
10 issues blocking restconf requests.
11 Work is distributed beforehand in round-robin fashion.
The utility waits for the last worker to finish, or for the time to run out.
14 The responses are checked for status and content,
15 results are written to collections.Counter and printed at exit.
16 If collections does not contain Counter, "import Counter" is attempted.
It is advised to pin the Python process to a single CPU for optimal
performance, as the Global Interpreter Lock prevents true utilization
of multiple CPUs (while the overhead of context switching remains).
22 Remark: For early implementations, master process CPU suffered from overhead
23 of Queue suitable for multiprocessing, which put performance down
24 even when workers had more CPU for them.
25 But that may not be true for more mature implementation.
28 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
30 # This program and the accompanying materials are made available under the
31 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
32 # and is available at http://www.eclipse.org/legal/epl-v10.html
35 import collections # For deque and Counter.
41 from collections import Counter
42 except ImportError: # Python 2.6 does not have Counter in collections.
43 from Counter import Counter # Assumes that user copies Counter.py around.
# Module metadata.
__author__ = "Vratko Polak"
__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
__license__ = "Eclipse Public License v1.0"
__email__ = "vrpolak@cisco.com"
    """Utility converter, based on http://stackoverflow.com/a/19227287"""
    # NOTE(review): the enclosing 'def str2bool(text):' header line is not
    # visible in this chunk; this fragment is the docstring and return body.
    # Only the listed lowercase spellings map to True; any other text
    # (after lowercasing) yields False.
    return text.lower() in ("yes", "true", "y", "t", "1")
# Note: JSON data contains '"', so using "'" to quote Pythons strings.
# NOTE(review): several 'parser.add_argument(' opener lines and their closing
# parentheses are missing from this chunk; the indented bare lines below are
# keyword-argument continuations of those calls.
parser = argparse.ArgumentParser()
parser.add_argument("--pccs", default="1", type=int, help="number of PCCs to simulate")
    # NOTE(review): "pre PCC" in the next help string is a typo for "per PCC"
    # (runtime string, left untouched here).
    "--lsps", default="1", type=int, help="number of LSPs pre PCC to update"
    "--workers", default="1", type=int, help="number of blocking https threads to use"
    "--hop", default="2.2.2.2/32", help="ipv4 prefix (including /32) of hop to use"
    "--timeout", default="300", type=float, help="seconds to bail out after"
    help="seconds to sleep in main thread if nothing to do",
    "--pccaddress", default="127.0.0.1", help="IP address of the first simulated PCC"
    "--odladdress", default="127.0.0.1", help="IP address of ODL acting as PCE"
    "--user", default="admin", help="Username for restconf authentication"
    "--password", default="admin", help="Password for restconf authentication"
parser.add_argument("--scope", default="sdn", help="Scope for restconf authentication")
    help="Should single requests session be re-used",
    help='should delegate the lsp or set "false" if lsp not be delegated',
parser.add_argument("--pccip", default=None, help="IP address of the simulated PCC")
    "--tunnelnumber", default=None, help="Tunnel Number for the simulated PCC"
args = parser.parse_args() # arguments are read
# Body expected in a successful update-lsp RPC response.
expected = """{"output":{}}"""

# Template for the update-lsp JSON request body, kept as a list of string
# fragments; selected indices are overwritten in place per request and the
# whole list is joined into one string before posting.
# NOTE(review): most list elements are missing from this chunk; only a few
# fragments of the JSON body are visible below.
payload_list_data = [
    ' "network-topology-ref":"/network-topology:network-topology/network-topology:topology',
    '[network-topology:topology-id=\\"pcep-topology\\"]",',
    ' ,"administrative":true',
    ' "ip-prefix":"1.1.1.1/32"',
class CounterDown(object):
    """Counter which also knows how many items are left to be added."""
    # NOTE(review): the lines maintaining the countdown attribute (presumably
    # 'self.opened = int(tasks)' in __init__ and 'self.opened -= 1' in add)
    # are not visible in this chunk, although the main loop below reads
    # counter.opened — confirm against the full file.

    def __init__(self, tasks):
        # Frequency counter of classification results (e.g. "pass" vs reasons).
        self.counter = Counter()

    def add(self, result):
        # Record one classified response.
        self.counter[result] += 1
def iterable_msg(pccs, lsps, workers, hop, delegate):
    """Generator yielding tuple of worker number and kwargs to post."""
    # Numeric form of the first simulated PCC address; per-PCC addresses are
    # computed as offsets from it.
    first_pcc_int = int(ipaddr.IPv4Address(args.pccaddress))
    # Headers are constant, but it is easier to add them to kwargs in this generator.
    headers = {"Content-Type": "application/json"}
    # TODO: Perhaps external text file with Template? May affect performance.
    # NOTE(review): this binds a reference, not a copy — the module-level
    # template list is mutated in place on every iteration.
    list_data = payload_list_data
    for lsp in range(1, lsps + 1):
        # NOTE(review): the assignment of str_lsp (presumably str(lsp)) is
        # not visible in this chunk.
        list_data[8] = str_lsp  # Replaces with new pointer.
        for pcc in range(pccs):
            pcc_ip = str(ipaddr.IPv4Address(first_pcc_int + pcc))
            list_data[3] = pcc_ip
            list_data[6] = pcc_ip
            list_data[15] = delegate
            # NOTE(review): a line using the 'hop' parameter (presumably
            # another list_data assignment) is not visible in this chunk.
            whole_data = "".join(list_data)
            # Round-robin distribution of messages across workers.
            worker = (lsp * pccs + pcc) % workers
            post_kwargs = {"data": whole_data, "headers": headers}
            yield worker, post_kwargs
def generate_payload_for_single_pcc(hop, delegate, pccip, tunnel_no):
    """Generator yielding single kwargs to post."""
    first_pcc_int = int(ipaddr.IPv4Address(args.pccaddress))
    # Headers are constant, but it is easier to add them to kwargs in this generator.
    headers = {"Content-Type": "application/json"}
    # TODO: Perhaps external text file with Template? May affect performance.
    # NOTE(review): this binds a reference, not a copy — the module-level
    # template list is mutated in place.
    list_data = payload_list_data
    if tunnel_no == "None":
        # NOTE(review): the body of this branch and the matching 'else:' line
        # are only partially visible in this chunk; the assignment below
        # presumably belongs to the else branch in the full file.
        str_lsp = str(tunnel_no)
    list_data[8] = str_lsp  # Replaces with new pointer.
    pcc_ip = str(ipaddr.IPv4Address(first_pcc_int))
    list_data[3] = pcc_ip
    list_data[6] = pcc_ip
    list_data[15] = delegate
    whole_data = "".join(list_data)
    post_kwargs = {"data": whole_data, "headers": headers}
    # NOTE(review): the assignment of 'worker' (presumably 0, since a single
    # message is produced) is not visible in this chunk.
    yield worker, post_kwargs
def queued_send(session, queue_messages, queue_responses):
    """Pop from queue, Post and append result; repeat until empty."""
    # Restconf RPC endpoint, relative to the base URI held by the session.
    uri = "operations/network-topology-pcep:update-lsp"
    # NOTE(review): the loop header (likely 'while 1:'), the 'try:' line, and
    # the loop-exit statement after IndexError are not visible in this chunk;
    # the indentation below is reconstructed accordingly.
            post_kwargs = queue_messages.popleft()
        except IndexError:  # nothing more to send
        response = AuthStandalone.Post_Using_Session(session, uri, **post_kwargs)
        # The response perhaps points to some data stored in session,
        # and the session implementation may explicitly call close() to free that data.
        # To be sure, we clone information before further processing.
        status = int(response.status_code)  # copy constructor
        content = str(response.content)  # copy constructor
        resp_tuple = (status, content)
        # deque.append is thread safe, so workers can share one response queue.
        queue_responses.append(resp_tuple)
def classify(resp_tuple):
    """Return 'pass' or a reason what is wrong with response."""
    # NOTE(review): the initialization of 'prepend' (presumably the empty
    # string) and the final 'return "pass"' line are not visible in this
    # chunk — without them the name would be unbound on the success path.
    status = resp_tuple[0]
    if (status != 200) and (status != 204):  # is it int?
        prepend = "status: " + str(status) + " "
    content = resp_tuple[1]
    # Either an unexpected status or unexpected non-empty content is reported.
    if prepend or (content != expected and content != ""):
        return prepend + "content: " + str(content)
# ---------------------------------------------------------------------------
# Main script: distribute messages round-robin into per-worker queues, start
# one thread per worker, then poll the shared response queue until all tasks
# are classified or the timeout expires.
# NOTE(review): numerous lines of this section (loop headers, 'else:' lines,
# closing parentheses of multi-line calls, thread.start(), counter.add()
# calls, 'threads = []') are not visible in this chunk; the indentation below
# is reconstructed and must be confirmed against the full file.
# ---------------------------------------------------------------------------
# One deque of pre-distributed messages per worker.
list_q_msg = [collections.deque() for _ in range(args.workers)]
if args.pccip == "None":
    # Bulk mode: messages for all simulated PCCs and LSPs.
    for worker, post_kwargs in iterable_msg(
        args.pccs, args.lsps, args.workers, args.hop, args.delegate
        list_q_msg[worker].append(post_kwargs)
    # Single-PCC mode (the 'else:' line is not visible in this chunk).
    for worker, post_kwargs in generate_payload_for_single_pcc(
        args.hop, args.delegate, args.pccip, args.tunnelnumber
        list_q_msg[worker].append(post_kwargs)
queue_responses = collections.deque()  # thread safe
for worker in range(args.workers):
    # Each worker thread gets its own authenticated restconf session.
    session = AuthStandalone.Init_Session(
        args.odladdress, args.user, args.password, args.scope, args.reuse
    queue_messages = list_q_msg[worker]
    thread_args = (session, queue_messages, queue_responses)
    thread = threading.Thread(target=queued_send, args=thread_args)
    threads.append(thread)
tasks = sum(map(len, list_q_msg))  # fancy way of counting, should equal to pccs*lsps.
counter = CounterDown(tasks)
print("work is going to start with %s tasks" % tasks)
time_start = time.time()
for thread in threads:
# time_result = time_start
# NOTE(review): the string below sits inside the (not visible) main loop in
# the full file, acting as an inline description of that loop.
"""Main loop for reading and classifying responses, sleeps when there is nothing to process."""
timedelta_left = time_start + args.timeout - time.time()
if timedelta_left > 0:
    while counter.opened > 0:
            # popleft raises IndexError when empty; the surrounding try is
            # not visible in this chunk.
            resp_tuple = queue_responses.popleft()  # thread safe
        result = classify(resp_tuple)
        # time_now = time.time()
        # timedelta_fromlast = time_now - time_result
        # debug_msg = 'DEBUG: opened: ' + str(counter.opened)
        # debug_msg += ' fromlast: ' + str(timedelta_fromlast)
        # debug_list.append(debug_msg)
        # time_result = time_now
    if counter.opened > 0:
        # Nothing ready yet; yield the CPU before polling again.
        # debug_list.append('DEBUG: sleep ' + str(args.refresh))
        time.sleep(args.refresh)
    left = len(queue_responses)
    print("error: more responses left inqueue", left)
    # Timeout path: drain whatever responses have arrived so far.
    left = len(queue_responses)  # can be still increasing
    for _ in range(left):
        resp_tuple = queue_responses.popleft()  # thread safe
        result = classify(resp_tuple)
    break  # may leave late items in queue_responses
time_stop = time.time()
timedelta_duration = time_stop - time_start
print("took", timedelta_duration)
# Final tally of classification results.
print(repr(counter.counter))
# for message in debug_list: