Migrate Get Requests invocations(libraries)
[integration/test.git] / tools / exabgp_files / exarpc.py
1 #!/usr/bin/env python
2
3 import argparse
4 import binascii
5 import json
6 import logging
7 import os
8 import re
9 import select
10 import sys
11 import threading
12 from exabgp.bgp.message import Message
13 from SimpleXMLRPCServer import SimpleXMLRPCServer
14
15
16 class ExaStorage(dict):
17     """Thread safe dictionary
18
19     The object will serve as thread safe data storage.
20     It should be used with "with" statement.
21
22     The content of thet dict may be changed dynamically, but i'll use it as
23     {
24       "counters": { <msg type>: count, ...}
25       "messages": { <msg type>: [hex_string1, ]}
26     """
27
28     def __init__(self):
29         """Thread safe dictionary init"""
30         super(ExaStorage, self).__init__()
31         self._lock = threading.Lock()
32
33     def __enter__(self):
34         """Entry point of "with" statement"""
35         self._lock.acquire()
36         return self
37
38     def __exit__(self, type_, value, traceback):
39         """End point of "with" statement"""
40         self._lock.release()
41
42
43 class Rpcs(object):
44     """Handler for SimpleXMLRPCServer."""
45
46     def __init__(self, storage):
47         """Init method
48
49         Arguments:
50             :storage: thread safe dict
51         """
52         self.storage = storage
53
54     def _write(self, text):
55         """Pass commands from rpc server towards exabgp
56
57         Arguments:
58             :text: exabgp command
59         """
60         logging.debug('Command towards exabgp: {}'.format(text))
61         sys.stdout.write(text)
62         sys.stdout.write("\n")
63         sys.stdout.flush()
64         logging.debug('Connand flushed: {}.'.format(text))
65
66     def get_counter(self, msg_type):
67         """Gets counter value
68
69         Arguments:
70             :msg_type: message type which counter should be returned
71         Returns:
72             :cnt: counter value
73         """
74         logging.debug('get_counter rpc called, storage {}'.format(self.storage))
75         with self.storage as s:
76             if 'counters' not in s:
77                 return 0
78             cnt = 0 if msg_type not in s['counters'] else s['counters'][msg_type]
79         return cnt
80
81     def clean_counter(self, msg_type):
82         """Cleans counter
83
84         Arguments:
85             :msg_type: message type which counter should be cleaned
86         """
87
88         logging.debug('clean_counter rpc called, storage {}'.format(self.storage))
89         with self.storage as s:
90             if 'counters' not in s:
91                 return
92             if msg_type in s['counters']:
93                 del s['counters'][msg_type]
94
95     def get_message(self, msg_type):
96         """Gets last received message
97
98         Arguments:
99             :msg_type: message type which counter should be returned
100         Returns:
101             :msg: message
102         """
103         logging.debug('get_message {} rpc called, storage {}'.format(msg_type, self.storage))
104         with self.storage as s:
105             if 'messages' not in s:
106                 return None
107             msg = None if msg_type not in s['messages'] else s['messages'][msg_type]
108         return msg
109
110     def clean_message(self, msg_type):
111         """Removes stored message
112
113         Arguments:
114             :msg_type: message type which message should be cleaned
115         """
116
117         logging.debug('clean_message rpc called, storage {}'.format(self.storage))
118         with self.storage as s:
119             if 'messages' not in s:
120                 return
121             if msg_type in s['messages']:
122                 del s['messages'][msg_type]
123         return
124
125     def execute(self, exabgp_cmd):
126         """Execite given command on exabgp
127
128         Arguments:
129             :exabgp_cmd: command
130         """
131         logging.info('executing: {}.'.format(exabgp_cmd))
132         self._write(exabgp_cmd)
133
134
135 def decode_message(header, body):
136     """Decodes message
137
138     Arguments:
139         :header: hexstring of the header
140         :body: hexstring of the body
141     Returns:
142         :msg_type: message type
143         :msg: None (in the future some decoded data)
144     """
145     headbin = binascii.unhexlify(header)
146
147     msg_type = ord(headbin[18])
148     msg = None
149
150     return msg_type, msg
151
152
153 def _increment_counter(storage, key):
154     """Increments the counter for a message
155
156     Arguments:
157         :key: message type
158     """
159     with storage as s:
160         if 'counters' not in s:
161             s['counters'] = {}
162         if key not in s['counters']:
163             s['counters'][key] = 1
164         else:
165             s['counters'][key] += 1
166
167
168 def _store_last_received_message(storage, key, msg):
169     """Stores message under key.
170
171     Arguments:
172         :key: message type
173     """
174     with storage as s:
175         if 'messages' not in s:
176             s['messages'] = {}
177         s['messages'][key] = msg
178
179
180 def handle_open(storage, msg):
181     """Handles received bgp open message
182
183     - incements open counter
184
185     Arguments:
186         :msg: hex string of open body
187     """
188     logging.debug('Handling Open with storage {}'.format(storage))
189     _increment_counter(storage, 'open')
190
191
192 def handle_keepalive(storage, msg):
193     """Handles received bgp keepalive message
194
195     - incements keepalive counter
196
197     Arguments:
198         :msg: hex string of message body (in fact it is None)
199     """
200     logging.debug('Handling KeepAlive with storage {}'.format(storage))
201     _increment_counter(storage, 'keepalive')
202
203
204 def handle_update(storage, msg):
205     """Handles received bgp update message
206
207     - incements update counter
208
209     Arguments:
210         :msg: hex string of update body
211     """
212     logging.debug('Handling Update with storage {}'.format(storage))
213     _increment_counter(storage, 'update')
214
215
216 def handle_route_refresh(storage, msg):
217     """Handles received bgp route refresh message
218
219     - incements route refresh counter
220
221     Arguments:
222         :msg: hex string of route refresh body
223     """
224     logging.debug('Handling Route Refresh with storage {}'.format(storage))
225     _increment_counter(storage, 'route_refresh')
226
227
228 def handle_json_update(storage, jdata):
229     """Handles received json parsed bgp update message
230
231     - incements update counter
232
233     Arguments:
234         :jdata: json formated data of update message
235     """
236     logging.debug('Handling Json Update with storage {}'.format(storage))
237     _increment_counter(storage, 'update')
238     _store_last_received_message(storage, 'update', jdata)
239
240
241 def handle_json_state(storage, jdata):
242     """Handles received json state message
243
244     This is for future use. This information is not used/required/needed
245     at the moment.
246
247     Arguments:
248         :jdata: json formated data about connection/peer state
249     """
250     logging.debug('Handling Json State with storage {}'.format(storage))
251
252
253 def handle_json_refresh(storage, jdata):
254     """Handles received json route refresh message
255
256     This is for future use. This information is not used/required/needed
257     at the moment.
258
259     Arguments:
260         :jdata: json formated data about connection/peer state
261     """
262     logging.debug('Handling Json State with storage {}'.format(storage))
263     _increment_counter(storage, 'route_refresh')
264
265
266 def exa_msg_handler(storage, data, encoder):
267     """Handles incomming messages"""
268
269     if encoder == 'text':
270         if not ('neighbor' in data and 'header' in data and 'body' in data):
271             logging.debug('Ignoring received notification from exabgp: {}'.format(data))
272             return
273         restr = 'neighbor (?P<ip>[0-9,\\.]+) received (?P<mid>[0-9]+) header\
274  (?P<header>[0-9,A-F]+) body.?(?P<body>[0-9,A-F]+)?'
275         pat = re.compile(restr)
276         match = re.search(pat, data)
277         if match is None:
278             logging.warn('Unexpected data in this part, only bgp message expected. Received: {}.'.format(data))
279             return
280         msg_type, msg = decode_message(match.groupdict()['header'], match.groupdict()['body'])
281         if msg_type == Message.CODE.KEEPALIVE:
282             handle_keepalive(storage, msg)
283         elif msg_type == Message.CODE.OPEN:
284             handle_open(storage, msg)
285         elif msg_type == Message.CODE.UPDATE:
286             handle_update(storage, msg)
287         elif msg_type == Message.CODE.ROUTE_REFRESH:
288             handle_route_refresh(storage, msg)
289         else:
290             logging.warn('No handler function for msg_type: {}'.format(msg_type))
291     elif encoder == 'json':
292         try:
293             jdata = json.loads(data)
294         except Exception:
295             logging.error('Unable to parse, expected json, received: {}.'.format(data))
296             return
297         if jdata['type'] == 'state':
298             logging.debug('State info received: {}.'.format(data))
299             handle_json_state(storage, jdata)
300         elif jdata['type'] == 'update':
301             logging.debug('Update info received: {}.'.format(data))
302             handle_json_update(storage, jdata)
303         elif jdata['type'] == 'notification':
304             logging.debug('Notification info received: {}.'.format(data))
305         elif jdata['type'] == 'refresh':
306             logging.debug('Route refresh received: {}.'.format(data))
307             handle_json_refresh(storage, jdata)
308         else:
309             logging.error('Unexpected type for data: {}'.format(data))
310     else:
311         logging.error('Ignoring received data, unknown encoder: {}'.format(encoder))
312
313
314 def main(*argv):
315     """This script is used as i/o api for communication with exabgp
316
317     Arguments:
318         :*argv: unparsed cli arguments
319
320     In a separate thread an rpc server is started. This server will be used as an api towards the user.
321     Stdin and stdout are used for communication with exabgp.
322     """
323
324     parser = argparse.ArgumentParser(description='ExaBgp rpc server script')
325     parser.add_argument('--host', default='127.0.0.1', help='Host where exabgp is running (default is 127.0.0.1)')
326     parser.add_argument('--loglevel', default=logging.DEBUG, help='Log level')
327     parser.add_argument('--logfile', default='{}/exarpc.log'.format(os.path.dirname(os.path.abspath(__file__))),
328                         help='Log file name.')
329     parser.add_argument('--encoder', default='json', help='Exabgp encoder type')
330     in_args = parser.parse_args(*argv)
331     logging.basicConfig(filename=in_args.logfile, level=in_args.loglevel)
332
333     storage = ExaStorage()
334     rpcserver = SimpleXMLRPCServer((in_args.host, 8000), allow_none=True)
335     rpcserver.register_instance(Rpcs(storage))
336     trpc = threading.Thread(target=rpcserver.serve_forever)
337     trpc.start()
338
339     epoll = select.epoll()
340
341     epoll.register(sys.__stdin__, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)
342
343     try:
344         while True:
345             logging.debug('Epoll loop')
346             events = epoll.poll(10)
347             for fd, event_type in events:
348                 logging.debug('Epoll returned: {},{}'.format(fd, event_type))
349                 if event_type != select.EPOLLIN:
350                     raise Exception('Unexpected epoll event')
351                 else:
352                     data = sys.stdin.readline()
353                     logging.debug('Data recevied from exabgp: {}.'.format(data))
354                     exa_msg_handler(storage, data, in_args.encoder)
355     except Exception as e:
356         logging.warn('Exception occured: {}'.format(e))
357     finally:
358         rpcserver.shutdown()
359         trpc.join()
360
361
362 if __name__ == '__main__':
363     main()