12 from exabgp.bgp.message import Message
13 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 class ExaStorage(dict):
17 """Thread safe dictionary
19 The object will serve as thread safe data storage.
20 It should be used with "with" statement.
22 The content of thet dict may be changed dynamically, but i'll use it as
24 "counters": { <msg type>: count, ...}
25 "messages": { <msg type>: [hex_string1, ]}
28 """Thread safe dictionary init"""
29 super(ExaStorage, self).__init__()
30 self._lock = threading.Lock()
33 """Entry point of "with" statement"""
37 def __exit__(self, type_, value, traceback):
38 """End point of "with" statement"""
43 """Handler for SimpleXMLRPCServer."""
45 def __init__(self, storage):
49 :storage: thread safe dict
51 self.storage = storage
53 def _write(self, text):
54 """Pass commands from rpc server towards exabgp
59 logging.debug('Command towards exabgp: {}'.format(text))
60 sys.stdout.write(text)
61 sys.stdout.write("\n")
63 logging.debug('Connand flushed: {}.'.format(text))
65 def get_counter(self, msg_type):
69 :msg_type: message type which counter should be returned
73 logging.debug('get_counter rpc called, storage {}'.format(self.storage))
74 with self.storage as s:
75 if 'counters' not in s:
77 cnt = 0 if msg_type not in s['counters'] else s['counters'][msg_type]
80 def clean_counter(self, msg_type):
84 :msg_type: message type which counter should be cleaned
87 logging.debug('clean_counter rpc called, storage {}'.format(self.storage))
88 with self.storage as s:
89 if 'counters' not in s:
91 if msg_type in s['counters']:
92 del s['counters'][msg_type]
94 def get_message(self, msg_type):
95 """Gets last received message
98 :msg_type: message type which counter should be returned
102 logging.debug('get_message {} rpc called, storage {}'.format(msg_type, self.storage))
103 with self.storage as s:
104 if 'messages' not in s:
106 msg = None if msg_type not in s['messages'] else s['messages'][msg_type]
109 def clean_message(self, msg_type):
110 """Removes stored message
113 :msg_type: message type which message should be cleaned
116 logging.debug('clean_message rpc called, storage {}'.format(self.storage))
117 with self.storage as s:
118 if 'messages' not in s:
120 if msg_type in s['messages']:
121 del s['messages'][msg_type]
124 def execute(self, exabgp_cmd):
125 """Execite given command on exabgp
130 logging.info('executing: {}.'.format(exabgp_cmd))
131 self._write(exabgp_cmd)
134 def decode_message(header, body):
138 :header: hexstring of the header
139 :body: hexstring of the body
141 :msg_type: message type
142 :msg: None (in the future some decoded data)
144 headbin = binascii.unhexlify(header)
146 msg_type = ord(headbin[18])
152 def _increment_counter(storage, key):
153 """Increments the counter for a message
159 if 'counters' not in s:
161 if key not in s['counters']:
162 s['counters'][key] = 1
164 s['counters'][key] += 1
167 def _store_last_received_message(storage, key, msg):
168 """Stores message under key.
174 if 'messages' not in s:
176 s['messages'][key] = msg
179 def handle_open(storage, msg):
180 """Handles received bgp open message
182 - incements open counter
185 :msg: hex string of open body
187 logging.debug('Handling Open with storage {}'.format(storage))
188 _increment_counter(storage, 'open')
191 def handle_keepalive(storage, msg):
192 """Handles received bgp keepalive message
194 - incements keepalive counter
197 :msg: hex string of message body (in fact it is None)
199 logging.debug('Handling KeepAlive with storage {}'.format(storage))
200 _increment_counter(storage, 'keepalive')
203 def handle_update(storage, msg):
204 """Handles received bgp update message
206 - incements update counter
209 :msg: hex string of update body
211 logging.debug('Handling Update with storage {}'.format(storage))
212 _increment_counter(storage, 'update')
215 def handle_route_refresh(storage, msg):
216 """Handles received bgp route refresh message
218 - incements route refresh counter
221 :msg: hex string of route refresh body
223 logging.debug('Handling Route Refresh with storage {}'.format(storage))
224 _increment_counter(storage, 'route_refresh')
227 def handle_json_update(storage, jdata):
228 """Handles received json parsed bgp update message
230 - incements update counter
233 :jdata: json formated data of update message
235 logging.debug('Handling Json Update with storage {}'.format(storage))
236 _increment_counter(storage, 'update')
237 _store_last_received_message(storage, 'update', jdata)
240 def handle_json_state(storage, jdata):
241 """Handles received json state message
243 This is for future use. This information is not used/required/needed
247 :jdata: json formated data about connection/peer state
249 logging.debug('Handling Json State with storage {}'.format(storage))
252 def handle_json_refresh(storage, jdata):
253 """Handles received json route refresh message
255 This is for future use. This information is not used/required/needed
259 :jdata: json formated data about connection/peer state
261 logging.debug('Handling Json State with storage {}'.format(storage))
262 _increment_counter(storage, 'route_refresh')
265 def exa_msg_handler(storage, data, encoder):
266 """Handles incomming messages"""
268 if encoder == 'text':
269 if not ('neighbor' in data and 'header' in data and 'body' in data):
270 logging.debug('Ignoring received notification from exabgp: {}'.format(data))
272 restr = 'neighbor (?P<ip>[0-9,\\.]+) received (?P<mid>[0-9]+) header\
273 (?P<header>[0-9,A-F]+) body.?(?P<body>[0-9,A-F]+)?'
274 pat = re.compile(restr)
275 match = re.search(pat, data)
277 logging.warn('Unexpected data in this part, only bgp message expected. Received: {}.'.format(data))
279 msg_type, msg = decode_message(match.groupdict()['header'], match.groupdict()['body'])
280 if msg_type == Message.CODE.KEEPALIVE:
281 handle_keepalive(storage, msg)
282 elif msg_type == Message.CODE.OPEN:
283 handle_open(storage, msg)
284 elif msg_type == Message.CODE.UPDATE:
285 handle_update(storage, msg)
286 elif msg_type == Message.CODE.ROUTE_REFRESH:
287 handle_route_refresh(storage, msg)
289 logging.warn('No handler function for msg_type: {}'.format(msg_type))
290 elif encoder == 'json':
292 jdata = json.loads(data)
294 logging.error('Unable to parse, expected json, received: {}.'.format(data))
296 if jdata['type'] == 'state':
297 logging.debug('State info received: {}.'.format(data))
298 handle_json_state(storage, jdata)
299 elif jdata['type'] == 'update':
300 logging.debug('Update info received: {}.'.format(data))
301 handle_json_update(storage, jdata)
302 elif jdata['type'] == 'notification':
303 logging.debug('Notification info received: {}.'.format(data))
304 elif jdata['type'] == 'refresh':
305 logging.debug('Route refresh received: {}.'.format(data))
306 handle_json_refresh(storage, jdata)
308 logging.error('Unexpected type for data: {}'.format(data))
310 logging.error('Ignoring received data, unknown encoder: {}'.format(encoder))
314 """This script is used as i/o api for communication with exabgp
317 :*argv: unparsed cli arguments
319 In a separate thread an rpc server is started. This server will be used as an api towards the user.
320 Stdin and stdout are used for communication with exabgp.
323 parser = argparse.ArgumentParser(description='ExaBgp rpc server script')
324 parser.add_argument('--host', default='127.0.0.1', help='Host where exabgp is running (default is 127.0.0.1)')
325 parser.add_argument('--loglevel', default=logging.DEBUG, help='Log level')
326 parser.add_argument('--logfile', default='{}/exarpc.log'.format(os.path.dirname(os.path.abspath(__file__))),
327 help='Log file name.')
328 parser.add_argument('--encoder', default='json', help='Exabgp encoder type')
329 in_args = parser.parse_args(*argv)
330 logging.basicConfig(filename=in_args.logfile, level=in_args.loglevel)
332 storage = ExaStorage()
333 rpcserver = SimpleXMLRPCServer((in_args.host, 8000), allow_none=True)
334 rpcserver.register_instance(Rpcs(storage))
335 trpc = threading.Thread(target=rpcserver.serve_forever)
338 epoll = select.epoll()
340 epoll.register(sys.__stdin__, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)
344 logging.debug('Epoll loop')
345 events = epoll.poll(10)
346 for fd, event_type in events:
347 logging.debug('Epoll returned: {},{}'.format(fd, event_type))
348 if event_type != select.EPOLLIN:
349 raise Exception('Unexpected epoll event')
351 data = sys.stdin.readline()
352 logging.debug('Data recevied from exabgp: {}.'.format(data))
353 exa_msg_handler(storage, data, in_args.encoder)
354 except Exception as e:
355 logging.warn('Exception occured: {}'.format(e))
360 if __name__ == '__main__':