Updated code to match new rules
[integration/test.git] / tools / pcep_updater / updater.py
1 """Multithreaded utility for rapid LSP updating.
2
3 This utility updates LSPs whose name correspond
4 to the ones created by pcc-mock tool.
5 The new state is hardcoded to contain one configurable hop
6 and final hop of "1.1.1.1/32".
7 AuthStandalone library is used to handle session and restconf authentication.
8
9 Number of workers is configurable, each worker
10 issues blocking restconf requests.
11 Work is distributed beforehand in round-robin fashion.
12 The utility waits for the last worker to finish, or for time to run off.
13
14 The responses are checked for status and content,
15 results are written to collections.Counter and printed at exit.
16 If collections does not contain Counter, "import Counter" is attempted.
17
18 It is advised to pin the python process to single CPU for optimal performance,
19 as Global Interpreter Lock prevents true utilization on more CPUs
20 (while overhead of context switching remains).
21
22 Remark: For early implementations, master process CPU suffered from overhead
23 of Queue suitable for multiprocessing, which put performance down
24 even when workers had more CPU for them.
25 But that may not be true for more mature implementation.
26 """
27
28 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
29 #
30 # This program and the accompanying materials are made available under the
31 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
32 # and is available at http://www.eclipse.org/legal/epl-v10.html
33
34 import argparse
35 import collections  # For deque and Counter.
36 import ipaddr
37 import threading
38 import time
39 try:
40     from collections import Counter
41 except ImportError:  # Python 2.6 does not have Counter in collections.
42     import Counter  # Assumes that user copies Counter.py around.
43 import AuthStandalone
44
45
46 __author__ = "Vratko Polak"
47 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
48 __license__ = "Eclipse Public License v1.0"
49 __email__ = "vrpolak@cisco.com"
50
51
52 def str2bool(text):
53     """Utility converter, based on http://stackoverflow.com/a/19227287"""
54     return text.lower() in ("yes", "true", "y", "t", "1")
55
56
57 # Note: JSON data contains '"', so using "'" to quote Pythons strings.
58 parser = argparse.ArgumentParser()
59 parser.add_argument('--pccs', default='1', type=int,
60                     help='number of PCCs to simulate')
61 parser.add_argument('--lsps', default='1', type=int,
62                     help='number of LSPs pre PCC to update')
63 parser.add_argument('--workers', default='1', type=int,
64                     help='number of blocking https threads to use')
65 parser.add_argument('--hop', default='2.2.2.2/32',
66                     help='ipv4 prefix (including /32) of hop to use')
67 parser.add_argument('--timeout', default='300', type=float,
68                     help='seconds to bail out after')  # FIXME: grammar
69 parser.add_argument('--refresh', default='0.1', type=float,
70                     help='seconds to sleep in main thread if nothing to do')
71 parser.add_argument('--pccaddress', default='127.0.0.1',
72                     help='IP address of the first simulated PCC')
73 parser.add_argument('--odladdress', default='127.0.0.1',
74                     help='IP address of ODL acting as PCE')
75 parser.add_argument('--user', default='admin',
76                     help='Username for restconf authentication')
77 parser.add_argument('--password', default='admin',
78                     help='Password for restconf authentication')
79 parser.add_argument('--scope', default='sdn',
80                     help='Scope for restconf authentication')
81 parser.add_argument('--reuse', default='True', type=str2bool,
82                     help='Should single requests session be re-used')
83 args = parser.parse_args()  # arguments are read
84
85 expected = '''{"output":{}}'''
86
87
88 class CounterDown(object):
89     """Counter which also knows how many items are left to be added."""
90
91     def __init__(self, tasks):
92         self.counter = Counter.Counter()
93         self.opened = tasks
94
95     def add(self, result):
96         self.counter[result] += 1
97         self.opened -= 1
98
99
100 def iterable_msg(pccs, lsps, workers, hop):
101     """Generator yielding tuple of worker number and kwargs to post."""
102     first_pcc_int = int(ipaddr.IPv4Address(args.pccaddress))
103     # Headers are constant, but it is easier to add them to kwargs in this generator.
104     headers = {'Content-Type': 'application/json'}
105     # TODO: Perhaps external text file with Template? May affect performance.
106     list_data = [
107         '{"input":{"node":"pcc://', '', '",',
108         '"name":"pcc_', '', '_tunnel_', '', '","network-topology-ref":',
109         '"/network-topology:network-topology/network-topology:topology',
110         '[network-topology:topology-id=\\\"pcep-topology\\\"]",',
111         '"arguments":{"lsp":{"delegate":true,"administrative":true},',
112         '"ero":{"subobject":[{"loose":false,"ip-prefix":{"ip-prefix":',
113         '"', hop, '"}},{"loose":false,"ip-prefix":{"ip-prefix":',
114         '"1.1.1.1/32"}}]}}}}'
115     ]
116     for lsp in range(1, lsps + 1):
117         str_lsp = str(lsp)
118         list_data[6] = str_lsp  # Replaces with new pointer.
119         for pcc in range(pccs):
120             pcc_ip = str(ipaddr.IPv4Address(first_pcc_int + pcc))
121             list_data[1] = pcc_ip
122             list_data[4] = pcc_ip
123             whole_data = ''.join(list_data)
124             # print 'DEBUG:', whole_data + '\n'
125             worker = (lsp * pccs + pcc) % workers
126             post_kwargs = {"data": whole_data, "headers": headers}
127             yield worker, post_kwargs
128
129
130 def queued_send(session, queue_messages, queue_responses):
131     """Pop from queue, Post and append result; repeat until empty."""
132     uri = 'operations/network-topology-pcep:update-lsp'
133     while 1:
134         try:
135             post_kwargs = queue_messages.popleft()
136         except IndexError:  # nothing more to send
137             return
138         response = AuthStandalone.Post_Using_Session(session, uri, **post_kwargs)
139         # The response perhaps points to some data stored in session,
140         # and the session implementation may explicitly call close() to free that data.
141         # To be sure, we clone information before further processing.
142         status = int(response.status_code)  # copy constructor
143         content = str(response.content)  # copy constructor
144         resp_tuple = (status, content)
145         queue_responses.append(resp_tuple)
146
147
148 def classify(resp_tuple):
149     """Return 'pass' or a reason what is wrong with response."""
150     # print 'DEBUG: received', response
151     prepend = ''
152     status = resp_tuple[0]
153     # print 'DEBUG: verifying status', status
154     if status != 200:  # is it int?
155         # print 'DEBUG:', response.content
156         prepend = 'status: ' + str(status) + ' '
157     content = resp_tuple[1]
158     # print 'DEBUG: verifying content', content
159     if prepend or (content != expected):
160         return prepend + 'content: ' + str(content)
161     return 'pass'
162
163
164 # Main.
165 list_q_msg = [collections.deque() for _ in range(args.workers)]
166 for worker, post_kwargs in iterable_msg(args.pccs, args.lsps, args.workers, args.hop):
167     # print 'DEBUG: worker', repr(worker), 'message', repr(message)
168     list_q_msg[worker].append(post_kwargs)
169 queue_responses = collections.deque()  # thread safe
170 threads = []
171 for worker in range(args.workers):
172     session = AuthStandalone.Init_Session(args.odladdress, args.user, args.password, args.scope, args.reuse)
173     queue_messages = list_q_msg[worker]
174     thread_args = (session, queue_messages, queue_responses)
175     thread = threading.Thread(target=queued_send, args=thread_args)
176     thread.daemon = True
177     threads.append(thread)
178 tasks = sum(map(len, list_q_msg))  # fancy way of counting, should equal to pccs*lsps.
179 counter = CounterDown(tasks)
180 print 'work is going to start with', tasks, 'tasks'
181 time_start = time.time()
182 for thread in threads:
183     thread.start()
184 # debug_list = []
185 # time_result = time_start
186 while 1:
187     """Main loop for reading and classifying responses, sleeps when there is nothing to process."""
188     timedelta_left = time_start + args.timeout - time.time()
189     if timedelta_left > 0:
190         while counter.opened > 0:
191             try:
192                 resp_tuple = queue_responses.popleft()  # thread safe
193             except IndexError:
194                 break
195             result = classify(resp_tuple)
196             counter.add(result)
197             # time_now = time.time()
198             # timedelta_fromlast = time_now - time_result
199             # debug_msg = 'DEBUG: opened: ' + str(counter.opened)
200             # debug_msg += ' fromlast: ' + str(timedelta_fromlast)
201             # debug_list.append(debug_msg)
202             # time_result = time_now
203         if counter.opened > 0:
204             # debug_list.append('DEBUG: sleep ' + str(args.refresh))
205             time.sleep(args.refresh)
206             continue
207         left = len(queue_responses)
208         if left:
209             print 'error: more responses left inqueue', left
210     else:
211         print 'Time is up!'
212         left = len(queue_responses)  # can be still increasing
213         # if left:
214         #     print 'WARNING: left', left
215         for _ in range(left):
216             resp_tuple = queue_responses.popleft()  # thread safe
217             result = classify(resp_tuple)
218             counter.add(result)
219     break  # may leave late items in queue_reponses
220 time_stop = time.time()
221 timedelta_duration = time_stop - time_start
222 print 'took', timedelta_duration
223 print repr(counter.counter)
224 # for message in debug_list:
225 #     print message