"""Multithreaded utility for rapid LSP updating.

This utility updates LSPs whose names correspond
to the ones created by pcc-mock tool.
The new state is hardcoded to contain one configurable hop
and final hop of "1.1.1.1/32".
AuthStandalone library is used to handle session and restconf authentication.

Number of workers is configurable, each worker
issues blocking restconf requests.
Work is distributed beforehand in round-robin fashion.
The utility waits for the last worker to finish, or for time to run out.

The responses are checked for status and content,
results are written to collections.Counter and printed at exit.
If collections does not contain Counter, "import Counter" is attempted.

It is advised to pin the python process to a single CPU for optimal performance,
as Global Interpreter Lock prevents true utilization of more CPUs
(while overhead of context switching remains).

Remark: For early implementations, master process CPU suffered from overhead
of Queue suitable for multiprocessing, which put performance down
even when workers had more CPU for them.
But that may not be true for more mature implementation.
"""
28 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
30 # This program and the accompanying materials are made available under the
31 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
32 # and is available at http://www.eclipse.org/legal/epl-v10.html
35 import collections # For deque and Counter.
40 from collections import Counter
41 except ImportError: # Python 2.6 does not have Counter in collections.
42 from Counter import Counter # Assumes that user copies Counter.py around.
46 __author__ = "Vratko Polak"
47 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
48 __license__ = "Eclipse Public License v1.0"
49 __email__ = "vrpolak@cisco.com"
def str2bool(text):
    """Utility converter, based on http://stackoverflow.com/a/19227287

    Used as an argparse ``type=`` callable so that boolean options can be
    given as words on the command line.

    :param text: string to interpret, compared case-insensitively
    :return: True if text is one of "yes", "true", "y", "t", "1"; False otherwise
    """
    return text.lower() in ("yes", "true", "y", "t", "1")
# Note: JSON data contains '"', so using "'" to quote Pythons strings.
# Defaults are given in their native types; argparse only applies ``type``
# to string defaults, so the parsed values are the same as before.
parser = argparse.ArgumentParser()
parser.add_argument('--pccs', default=1, type=int,
                    help='number of PCCs to simulate')
parser.add_argument('--lsps', default=1, type=int,
                    help='number of LSPs per PCC to update')
parser.add_argument('--workers', default=1, type=int,
                    help='number of blocking https threads to use')
parser.add_argument('--hop', default='2.2.2.2/32',
                    help='ipv4 prefix (including /32) of hop to use')
parser.add_argument('--timeout', default=300.0, type=float,
                    help='seconds to bail out after')  # FIXME: grammar
parser.add_argument('--refresh', default=0.1, type=float,
                    help='seconds to sleep in main thread if nothing to do')
parser.add_argument('--pccaddress', default='127.0.0.1',
                    help='IP address of the first simulated PCC')
parser.add_argument('--odladdress', default='127.0.0.1',
                    help='IP address of ODL acting as PCE')
parser.add_argument('--user', default='admin',
                    help='Username for restconf authentication')
parser.add_argument('--password', default='admin',
                    help='Password for restconf authentication')
parser.add_argument('--scope', default='sdn',
                    help='Scope for restconf authentication')
parser.add_argument('--reuse', default=True, type=str2bool,
                    help='Should single requests session be re-used')
args = parser.parse_args()  # arguments are read

# Response content that classify() accepts (besides an empty body) as a pass.
expected = '''{"output":{}}'''
class CounterDown(object):
    """Counter which also knows how many items are left to be added."""

    def __init__(self, tasks):
        """Start with an empty tally and every task still outstanding.

        :param tasks: total number of results expected to be added
        """
        self.counter = Counter()
        # Number of results not yet added; the main loop polls this value
        # to decide whether it still has to wait for responses.
        self.opened = tasks

    def add(self, result):
        """Tally one result and mark one task as no longer outstanding.

        :param result: hashable classification of a response (e.g. 'pass')
        """
        self.counter[result] += 1
        self.opened -= 1
def iterable_msg(pccs, lsps, workers, hop):
    """Generator yielding tuple of worker number and kwargs to post.

    One message is produced for every (lsp, pcc) pair; the worker number
    cycles through range(workers) so the work is spread round-robin.

    :param pccs: number of PCCs; addresses are consecutive, starting at the
        module-level ``args.pccaddress``
    :param lsps: number of LSPs per PCC, numbered from 1
    :param workers: number of workers to distribute messages among
    :param hop: ipv4 prefix string placed as the first ERO hop
    :return: generator of (worker, post_kwargs) tuples, where post_kwargs
        holds "data" (JSON text) and "headers"
    """
    first_pcc_int = int(ipaddr.IPv4Address(args.pccaddress))
    # Headers are constant, but it is easier to add them to kwargs in this generator.
    headers = {'Content-Type': 'application/json'}
    # TODO: Perhaps external text file with Template? May affect performance.
    # Fragments of the JSON body; the empty strings at indices 1, 4 and 6
    # are placeholders overwritten below before joining.
    list_data = [
        '{"input":{"node":"pcc://', '', '",',
        '"name":"pcc_', '', '_tunnel_', '', '","network-topology-ref":',
        '"/network-topology:network-topology/network-topology:topology',
        '[network-topology:topology-id=\\\"pcep-topology\\\"]",',
        '"arguments":{"lsp":{"delegate":true,"administrative":true},',
        '"ero":{"subobject":[{"loose":false,"ip-prefix":{"ip-prefix":',
        '"', hop, '"}},{"loose":false,"ip-prefix":{"ip-prefix":',
        '"1.1.1.1/32"}}]}}}}'
    ]
    for lsp in range(1, lsps + 1):
        str_lsp = str(lsp)
        list_data[6] = str_lsp  # Replaces with new pointer.
        for pcc in range(pccs):
            pcc_ip = str(ipaddr.IPv4Address(first_pcc_int + pcc))
            list_data[1] = pcc_ip  # node id
            list_data[4] = pcc_ip  # part of the tunnel name
            whole_data = ''.join(list_data)
            worker = (lsp * pccs + pcc) % workers
            post_kwargs = {"data": whole_data, "headers": headers}
            yield worker, post_kwargs
def queued_send(session, queue_messages, queue_responses):
    """Pop from queue, Post and append result; repeat until empty.

    :param session: session object passed through to
        AuthStandalone.Post_Using_Session
    :param queue_messages: deque of kwargs dicts to post; consumed from the left
    :param queue_responses: deque receiving one (status, content) tuple per post
    :return: None, once queue_messages is exhausted
    """
    uri = 'operations/network-topology-pcep:update-lsp'
    while True:
        try:
            post_kwargs = queue_messages.popleft()
        except IndexError:  # nothing more to send
            return
        response = AuthStandalone.Post_Using_Session(session, uri, **post_kwargs)
        # The response perhaps points to some data stored in session,
        # and the session implementation may explicitly call close() to free that data.
        # To be sure, we clone information before further processing.
        status = int(response.status_code)  # copy constructor
        # NOTE(review): if response.content is bytes (Python 3), str() yields
        # a "b'...'" repr which can never equal the expected text - confirm.
        content = str(response.content)  # copy constructor
        resp_tuple = (status, content)
        queue_responses.append(resp_tuple)
def classify(resp_tuple):
    """Return 'pass' or a reason what is wrong with response.

    :param resp_tuple: (status, content) pair describing one response
    :return: 'pass' when status is 200 or 204 and content is empty or equal
        to the module-level ``expected``; otherwise a string made of an
        optional 'status: N ' prefix followed by 'content: ...'
    """
    prepend = ''  # stays empty unless the status code is unexpected
    status = resp_tuple[0]
    if (status != 200) and (status != 204):  # is it int?
        prepend = 'status: ' + str(status) + ' '
    content = resp_tuple[1]
    if prepend or (content != expected and content != ''):
        return prepend + 'content: ' + str(content)
    return 'pass'
# Main.
# Distribute the work beforehand: one message queue per worker.
list_q_msg = [collections.deque() for _ in range(args.workers)]
for worker, post_kwargs in iterable_msg(args.pccs, args.lsps, args.workers, args.hop):
    list_q_msg[worker].append(post_kwargs)
queue_responses = collections.deque()  # thread safe
threads = []
for worker in range(args.workers):
    session = AuthStandalone.Init_Session(args.odladdress, args.user, args.password, args.scope, args.reuse)
    queue_messages = list_q_msg[worker]
    thread_args = (session, queue_messages, queue_responses)
    thread = threading.Thread(target=queued_send, args=thread_args)
    # Daemon threads do not keep the process alive, so bailing out
    # on --timeout really ends the run even if workers are still blocked.
    thread.daemon = True
    threads.append(thread)
tasks = sum(map(len, list_q_msg))  # fancy way of counting, should equal to pccs*lsps.
counter = CounterDown(tasks)
print('work is going to start with %s tasks' % tasks)
time_start = time.time()
for thread in threads:
    thread.start()
# debug_list = []
# time_result = time_start
while True:
    # Main loop for reading and classifying responses,
    # sleeps when there is nothing to process.
    timedelta_left = time_start + args.timeout - time.time()
    if timedelta_left > 0:
        while counter.opened > 0:
            try:
                resp_tuple = queue_responses.popleft()  # thread safe
            except IndexError:  # nothing to process right now
                break
            result = classify(resp_tuple)
            counter.add(result)
            # time_now = time.time()
            # timedelta_fromlast = time_now - time_result
            # debug_msg = 'DEBUG: opened: ' + str(counter.opened)
            # debug_msg += ' fromlast: ' + str(timedelta_fromlast)
            # debug_list.append(debug_msg)
            # time_result = time_now
        if counter.opened > 0:
            # Responses are still outstanding; wait a bit and poll again.
            # debug_list.append('DEBUG: sleep ' + str(args.refresh))
            time.sleep(args.refresh)
            continue
        # Every expected response was counted; anything left is a surplus.
        left = len(queue_responses)
        if left:
            print('error: more responses left inqueue', left)
    else:
        # Out of time: count whatever has arrived so far, then give up.
        left = len(queue_responses)  # can be still increasing
        for _ in range(left):
            resp_tuple = queue_responses.popleft()  # thread safe
            result = classify(resp_tuple)
            counter.add(result)
    break  # may leave late items in queue_responses
time_stop = time.time()
timedelta_duration = time_stop - time_start
print('took', timedelta_duration)
print(repr(counter.counter))
# for message in debug_list: