fixing pep8 problems for test verify tox job
[integration/test.git] / tools / exabgp_files / exarpc.py
1 #!/usr/bin/env python
2
3 import argparse
4 import binascii
5 import json
6 import logging
7 import os
8 import re
9 import select
10 import sys
11 import threading
12 from exabgp.bgp.message import Message
13 from SimpleXMLRPCServer import SimpleXMLRPCServer
14
15
16 class ExaStorage(dict):
17     """Thread safe dictionary
18
19     The object will serve as thread safe data storage.
20     It should be used with "with" statement.
21
22     The content of thet dict may be changed dynamically, but i'll use it as
23     {
24       "counters": { <msg type>: count, ...}
25       "messages": { <msg type>: [hex_string1, ]}
26     """
27     def __init__(self):
28         """Thread safe dictionary init"""
29         super(ExaStorage, self).__init__()
30         self._lock = threading.Lock()
31
32     def __enter__(self):
33         """Entry point of "with" statement"""
34         self._lock.acquire()
35         return self
36
37     def __exit__(self, type_, value, traceback):
38         """End point of "with" statement"""
39         self._lock.release()
40
41
42 class Rpcs(object):
43     """Handler for SimpleXMLRPCServer."""
44
45     def __init__(self, storage):
46         """Init method
47
48         Arguments:
49             :storage: thread safe dict
50         """
51         self.storage = storage
52
53     def _write(self, text):
54         """Pass commands from rpc server towards exabgp
55
56         Arguments:
57             :text: exabgp command
58         """
59         logging.debug('Command towards exabgp: {}'.format(text))
60         sys.stdout.write(text)
61         sys.stdout.write("\n")
62         sys.stdout.flush()
63         logging.debug('Connand flushed: {}.'.format(text))
64
65     def get_counter(self, msg_type):
66         """Gets counter value
67
68         Arguments:
69             :msg_type: message type which counter should be returned
70         Returns:
71             :cnt: counter value
72         """
73         logging.debug('get_counter rpc called, storage {}'.format(self.storage))
74         with self.storage as s:
75             if 'counters' not in s:
76                 return 0
77             cnt = 0 if msg_type not in s['counters'] else s['counters'][msg_type]
78         return cnt
79
80     def clean_counter(self, msg_type):
81         """Cleans counter
82
83         Arguments:
84             :msg_type: message type which counter should be cleaned
85         """
86
87         logging.debug('clean_counter rpc called, storage {}'.format(self.storage))
88         with self.storage as s:
89             if 'counters' not in s:
90                 return
91             if msg_type in s['counters']:
92                 del s['counters'][msg_type]
93
94     def get_message(self, msg_type):
95         """Gets last received message
96
97         Arguments:
98             :msg_type: message type which counter should be returned
99         Returns:
100             :msg: message
101         """
102         logging.debug('get_message {} rpc called, storage {}'.format(msg_type, self.storage))
103         with self.storage as s:
104             if 'messages' not in s:
105                 return None
106             msg = None if msg_type not in s['messages'] else s['messages'][msg_type]
107         return msg
108
109     def clean_message(self, msg_type):
110         """Removes stored message
111
112         Arguments:
113             :msg_type: message type which message should be cleaned
114         """
115
116         logging.debug('clean_message rpc called, storage {}'.format(self.storage))
117         with self.storage as s:
118             if 'messages' not in s:
119                 return
120             if msg_type in s['messages']:
121                 del s['messages'][msg_type]
122         return
123
124     def execute(self, exabgp_cmd):
125         """Execite given command on exabgp
126
127         Arguments:
128             :exabgp_cmd: command
129         """
130         logging.info('executing: {}.'.format(exabgp_cmd))
131         self._write(exabgp_cmd)
132
133
134 def decode_message(header, body):
135     """Decodes message
136
137     Arguments:
138         :header: hexstring of the header
139         :body: hexstring of the body
140     Returns:
141         :msg_type: message type
142         :msg: None (in the future some decoded data)
143     """
144     headbin = binascii.unhexlify(header)
145
146     msg_type = ord(headbin[18])
147     msg = None
148
149     return msg_type, msg
150
151
152 def _increment_counter(storage, key):
153     """Increments the counter for a message
154
155     Arguments:
156         :key: message type
157     """
158     with storage as s:
159         if 'counters' not in s:
160             s['counters'] = {}
161         if key not in s['counters']:
162             s['counters'][key] = 1
163         else:
164             s['counters'][key] += 1
165
166
167 def _store_last_received_message(storage, key, msg):
168     """Stores message under key.
169
170     Arguments:
171         :key: message type
172     """
173     with storage as s:
174         if 'messages' not in s:
175             s['messages'] = {}
176         s['messages'][key] = msg
177
178
179 def handle_open(storage, msg):
180     """Handles received bgp open message
181
182     - incements open counter
183
184     Arguments:
185         :msg: hex string of open body
186     """
187     logging.debug('Handling Open with storage {}'.format(storage))
188     _increment_counter(storage, 'open')
189
190
191 def handle_keepalive(storage, msg):
192     """Handles received bgp keepalive message
193
194     - incements keepalive counter
195
196     Arguments:
197         :msg: hex string of message body (in fact it is None)
198     """
199     logging.debug('Handling KeepAlive with storage {}'.format(storage))
200     _increment_counter(storage, 'keepalive')
201
202
203 def handle_update(storage, msg):
204     """Handles received bgp update message
205
206     - incements update counter
207
208     Arguments:
209         :msg: hex string of update body
210     """
211     logging.debug('Handling Update with storage {}'.format(storage))
212     _increment_counter(storage, 'update')
213
214
215 def handle_route_refresh(storage, msg):
216     """Handles received bgp route refresh message
217
218     - incements route refresh counter
219
220     Arguments:
221         :msg: hex string of route refresh body
222     """
223     logging.debug('Handling Route Refresh with storage {}'.format(storage))
224     _increment_counter(storage, 'route_refresh')
225
226
227 def handle_json_update(storage, jdata):
228     """Handles received json parsed bgp update message
229
230     - incements update counter
231
232     Arguments:
233         :jdata: json formated data of update message
234     """
235     logging.debug('Handling Json Update with storage {}'.format(storage))
236     _increment_counter(storage, 'update')
237     _store_last_received_message(storage, 'update', jdata)
238
239
240 def handle_json_state(storage, jdata):
241     """Handles received json state message
242
243     This is for future use. This information is not used/required/needed
244     at the moment.
245
246     Arguments:
247         :jdata: json formated data about connection/peer state
248     """
249     logging.debug('Handling Json State with storage {}'.format(storage))
250
251
252 def handle_json_refresh(storage, jdata):
253     """Handles received json route refresh message
254
255     This is for future use. This information is not used/required/needed
256     at the moment.
257
258     Arguments:
259         :jdata: json formated data about connection/peer state
260     """
261     logging.debug('Handling Json State with storage {}'.format(storage))
262     _increment_counter(storage, 'route_refresh')
263
264
265 def exa_msg_handler(storage, data, encoder):
266     """Handles incomming messages"""
267
268     if encoder == 'text':
269         if not ('neighbor' in data and 'header' in data and 'body' in data):
270             logging.debug('Ignoring received notification from exabgp: {}'.format(data))
271             return
272         restr = 'neighbor (?P<ip>[0-9,\\.]+) received (?P<mid>[0-9]+) header\
273  (?P<header>[0-9,A-F]+) body.?(?P<body>[0-9,A-F]+)?'
274         pat = re.compile(restr)
275         match = re.search(pat, data)
276         if match is None:
277             logging.warn('Unexpected data in this part, only bgp message expected. Received: {}.'.format(data))
278             return
279         msg_type, msg = decode_message(match.groupdict()['header'], match.groupdict()['body'])
280         if msg_type == Message.CODE.KEEPALIVE:
281             handle_keepalive(storage, msg)
282         elif msg_type == Message.CODE.OPEN:
283             handle_open(storage, msg)
284         elif msg_type == Message.CODE.UPDATE:
285             handle_update(storage, msg)
286         elif msg_type == Message.CODE.ROUTE_REFRESH:
287             handle_route_refresh(storage, msg)
288         else:
289             logging.warn('No handler function for msg_type: {}'.format(msg_type))
290     elif encoder == 'json':
291         try:
292             jdata = json.loads(data)
293         except Exception:
294             logging.error('Unable to parse, expected json, received: {}.'.format(data))
295             return
296         if jdata['type'] == 'state':
297             logging.debug('State info received: {}.'.format(data))
298             handle_json_state(storage, jdata)
299         elif jdata['type'] == 'update':
300             logging.debug('Update info received: {}.'.format(data))
301             handle_json_update(storage, jdata)
302         elif jdata['type'] == 'notification':
303             logging.debug('Notification info received: {}.'.format(data))
304         elif jdata['type'] == 'refresh':
305             logging.debug('Route refresh received: {}.'.format(data))
306             handle_json_refresh(storage, jdata)
307         else:
308             logging.error('Unexpected type for data: {}'.format(data))
309     else:
310         logging.error('Ignoring received data, unknown encoder: {}'.format(encoder))
311
312
313 def main(*argv):
314     """This script is used as i/o api for communication with exabgp
315
316     Arguments:
317         :*argv: unparsed cli arguments
318
319     In a separate thread an rpc server is started. This server will be used as an api towards the user.
320     Stdin and stdout are used for communication with exabgp.
321     """
322
323     parser = argparse.ArgumentParser(description='ExaBgp rpc server script')
324     parser.add_argument('--host', default='127.0.0.1', help='Host where exabgp is running (default is 127.0.0.1)')
325     parser.add_argument('--loglevel', default=logging.DEBUG, help='Log level')
326     parser.add_argument('--logfile', default='{}/exarpc.log'.format(os.path.dirname(os.path.abspath(__file__))),
327                         help='Log file name.')
328     parser.add_argument('--encoder', default='json', help='Exabgp encoder type')
329     in_args = parser.parse_args(*argv)
330     logging.basicConfig(filename=in_args.logfile, level=in_args.loglevel)
331
332     storage = ExaStorage()
333     rpcserver = SimpleXMLRPCServer((in_args.host, 8000), allow_none=True)
334     rpcserver.register_instance(Rpcs(storage))
335     trpc = threading.Thread(target=rpcserver.serve_forever)
336     trpc.start()
337
338     epoll = select.epoll()
339
340     epoll.register(sys.__stdin__, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)
341
342     try:
343         while True:
344             logging.debug('Epoll loop')
345             events = epoll.poll(10)
346             for fd, event_type in events:
347                 logging.debug('Epoll returned: {},{}'.format(fd, event_type))
348                 if event_type != select.EPOLLIN:
349                     raise Exception('Unexpected epoll event')
350                 else:
351                     data = sys.stdin.readline()
352                     logging.debug('Data recevied from exabgp: {}.'.format(data))
353                     exa_msg_handler(storage, data, in_args.encoder)
354     except Exception as e:
355         logging.warn('Exception occured: {}'.format(e))
356     finally:
357         rpcserver.shutdown()
358         trpc.join()
359
360
361 if __name__ == '__main__':
362     main()