12 from exabgp.bgp.message import Message
13 from SimpleXMLRPCServer import SimpleXMLRPCServer
class ExaStorage(dict):
    """Thread safe dictionary

    The object will serve as thread safe data storage.
    It should be used with "with" statement: the lock is taken on entry and
    released on exit, so all reads and writes belong inside the block.

    The content of the dict may be changed dynamically, but it is used as
    {
        "counters": { <msg type>: count, ...}
        "messages": { <msg type>: last received message, ...}
    }

    The lock is a plain (non-reentrant) threading.Lock, so nesting two "with"
    blocks on the same storage in one thread would deadlock.
    """

    def __init__(self):
        """Thread safe dictionary init"""
        super(ExaStorage, self).__init__()
        # Held for the whole duration of a "with" block.
        self._lock = threading.Lock()

    def __enter__(self):
        """Entry point of "with" statement

        Returns:
            :self: the storage itself, with its lock held
        """
        self._lock.acquire()
        return self

    def __exit__(self, type_, value, traceback):
        """End point of "with" statement

        Arguments:
            :type_: exception type raised inside the block (or None)
            :value: exception instance raised inside the block (or None)
            :traceback: traceback of that exception (or None)
        Returns:
            :False: exceptions raised inside the block are never suppressed
        """
        self._lock.release()
        return False
class Rpcs(object):
    """Handler for SimpleXMLRPCServer.

    Every public method is exposed as an rpc. The storage is only touched
    inside a "with" block so that its lock is held during the access.
    """

    def __init__(self, storage):
        """Init method

        Arguments:
            :storage: thread safe dict
        """
        self.storage = storage

    def _write(self, text):
        """Pass commands from rpc server towards exabgp

        Arguments:
            :text: exabgp command
        """
        logging.debug('Command towards exabgp: {}'.format(text))
        sys.stdout.write(text)
        sys.stdout.write("\n")
        # Flush explicitly so the command is not left sitting in the
        # stdout buffer.
        sys.stdout.flush()
        logging.debug('Connand flushed: {}.'.format(text))

    def get_counter(self, msg_type):
        """Gets counter value

        Arguments:
            :msg_type: message type which counter should be returned
        Returns:
            :cnt: counter value, 0 when nothing was counted for msg_type
        """
        logging.debug('get_counter rpc called, storage {}'.format(self.storage))
        with self.storage as s:
            return s.get('counters', {}).get(msg_type, 0)

    def clean_counter(self, msg_type):
        """Cleans counter

        Arguments:
            :msg_type: message type which counter should be cleaned
        """
        logging.debug('clean_counter rpc called, storage {}'.format(self.storage))
        with self.storage as s:
            # Nothing to do when the counter (or the whole section) is absent.
            s.get('counters', {}).pop(msg_type, None)

    def get_message(self, msg_type):
        """Gets last received message

        Arguments:
            :msg_type: message type which message should be returned
        Returns:
            :msg: the stored message, None when there is none for msg_type
        """
        logging.debug('get_message {} rpc called, storage {}'.format(msg_type, self.storage))
        with self.storage as s:
            return s.get('messages', {}).get(msg_type)

    def clean_message(self, msg_type):
        """Removes stored message

        Arguments:
            :msg_type: message type which message should be cleaned
        """
        logging.debug('clean_message rpc called, storage {}'.format(self.storage))
        with self.storage as s:
            # Nothing to do when the message (or the whole section) is absent.
            s.get('messages', {}).pop(msg_type, None)

    def execute(self, exabgp_cmd):
        """Execute given command on exabgp

        Arguments:
            :exabgp_cmd: command
        """
        logging.info('executing: {}.'.format(exabgp_cmd))
        self._write(exabgp_cmd)
def decode_message(header, body):
    """Decodes message

    Arguments:
        :header: hexstring of the header
        :body: hexstring of the body
    Returns:
        :msg_type: message type
        :msg: None (in the future some decoded data)
    Raises:
        :IndexError: when the header is shorter than 19 bytes
    """
    # bytearray indexing yields an int on both Python 2 and Python 3, whereas
    # ord(bytes[i]) raises TypeError on Python 3.
    headbin = bytearray(binascii.unhexlify(header))

    # The message type is read from byte 18 of the header.
    msg_type = headbin[18]
    msg = None

    return msg_type, msg
153 def _increment_counter(storage, key):
154 """Increments the counter for a message
160 if 'counters' not in s:
162 if key not in s['counters']:
163 s['counters'][key] = 1
165 s['counters'][key] += 1
168 def _store_last_received_message(storage, key, msg):
169 """Stores message under key.
175 if 'messages' not in s:
177 s['messages'][key] = msg
def handle_open(storage, msg):
    """Handles received bgp open message

    - increments open counter

    Arguments:
        :storage: thread safe dict
        :msg: hex string of open body
    """
    logging.debug('Handling Open with storage {}'.format(storage))
    _increment_counter(storage, 'open')
def handle_keepalive(storage, msg):
    """Handles received bgp keepalive message

    - increments keepalive counter

    Arguments:
        :storage: thread safe dict
        :msg: hex string of message body (in fact it is None)
    """
    logging.debug('Handling KeepAlive with storage {}'.format(storage))
    _increment_counter(storage, 'keepalive')
def handle_update(storage, msg):
    """Handles received bgp update message

    - increments update counter

    Arguments:
        :storage: thread safe dict
        :msg: hex string of update body
    """
    logging.debug('Handling Update with storage {}'.format(storage))
    _increment_counter(storage, 'update')
def handle_route_refresh(storage, msg):
    """Handles received bgp route refresh message

    - increments route refresh counter

    Arguments:
        :storage: thread safe dict
        :msg: hex string of route refresh body
    """
    logging.debug('Handling Route Refresh with storage {}'.format(storage))
    _increment_counter(storage, 'route_refresh')
def handle_json_update(storage, jdata):
    """Handles received json parsed bgp update message

    - increments update counter
    - stores the message as the last received update

    Arguments:
        :storage: thread safe dict
        :jdata: json formatted data of update message
    """
    logging.debug('Handling Json Update with storage {}'.format(storage))
    _increment_counter(storage, 'update')
    _store_last_received_message(storage, 'update', jdata)
def handle_json_state(storage, jdata):
    """Handles received json state message

    This is for future use. This information is not used/required/needed
    at the moment, so the message is only logged.

    Arguments:
        :storage: thread safe dict
        :jdata: json formatted data about connection/peer state
    """
    logging.debug('Handling Json State with storage {}'.format(storage))
def handle_json_refresh(storage, jdata):
    """Handles received json route refresh message

    - increments route refresh counter

    The content of the message itself is not used.

    Arguments:
        :storage: thread safe dict
        :jdata: json formatted data of route refresh message
    """
    # The message used to say "Json State", copy-pasted from the state handler.
    logging.debug('Handling Json Refresh with storage {}'.format(storage))
    _increment_counter(storage, 'route_refresh')
# Pattern of a received bgp message when the text encoder is used. It is
# built from adjacent raw literals so that source indentation can never leak
# into the pattern (as it can with a backslash continuation inside a literal).
_EXA_TEXT_MSG_RE = re.compile(
    r'neighbor (?P<ip>[0-9,\.]+) received (?P<mid>[0-9]+) header '
    r'(?P<header>[0-9,A-F]+) body.?(?P<body>[0-9,A-F]+)?'
)


def _handle_text_message(storage, data):
    """Parses one text encoded line and dispatches it by bgp message type"""
    if not ('neighbor' in data and 'header' in data and 'body' in data):
        logging.debug('Ignoring received notification from exabgp: {}'.format(data))
        return
    match = _EXA_TEXT_MSG_RE.search(data)
    if match is None:
        logging.warning('Unexpected data in this part, only bgp message expected. Received: {}.'.format(data))
        return
    fields = match.groupdict()
    # 'body' is an optional group and is None when the message has no body.
    msg_type, msg = decode_message(fields['header'], fields['body'])
    if msg_type == Message.CODE.KEEPALIVE:
        handle_keepalive(storage, msg)
    elif msg_type == Message.CODE.OPEN:
        handle_open(storage, msg)
    elif msg_type == Message.CODE.UPDATE:
        handle_update(storage, msg)
    elif msg_type == Message.CODE.ROUTE_REFRESH:
        handle_route_refresh(storage, msg)
    else:
        logging.warning('No handler function for msg_type: {}'.format(msg_type))


def _handle_json_message(storage, data):
    """Parses one json encoded line and dispatches it by its 'type' field"""
    try:
        jdata = json.loads(data)
    except (TypeError, ValueError):
        logging.error('Unable to parse, expected json, received: {}.'.format(data))
        return
    # Valid json that is not an object, or has no 'type', falls through to
    # the "unexpected type" branch instead of raising.
    msg_kind = jdata.get('type') if isinstance(jdata, dict) else None
    if msg_kind == 'state':
        logging.debug('State info received: {}.'.format(data))
        handle_json_state(storage, jdata)
    elif msg_kind == 'update':
        logging.debug('Update info received: {}.'.format(data))
        handle_json_update(storage, jdata)
    elif msg_kind == 'notification':
        logging.debug('Notification info received: {}.'.format(data))
    elif msg_kind == 'refresh':
        logging.debug('Route refresh received: {}.'.format(data))
        handle_json_refresh(storage, jdata)
    else:
        logging.error('Unexpected type for data: {}'.format(data))


def exa_msg_handler(storage, data, encoder):
    """Handles incoming messages

    Arguments:
        :storage: thread safe dict
        :data: one line received from exabgp
        :encoder: exabgp encoder type, 'text' or 'json'; data received with
                  any other encoder is logged and ignored
    """
    if encoder == 'text':
        _handle_text_message(storage, data)
    elif encoder == 'json':
        _handle_json_message(storage, data)
    else:
        logging.error('Ignoring received data, unknown encoder: {}'.format(encoder))
def _serve_stdin(storage, encoder):
    """Feeds lines read from stdin to exa_msg_handler until input ends

    Arguments:
        :storage: thread safe dict
        :encoder: exabgp encoder type
    Raises:
        :Exception: when epoll reports anything but a plain EPOLLIN event
    """
    epoll = select.epoll()
    try:
        epoll.register(sys.__stdin__, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)
        while True:
            logging.debug('Epoll loop')
            events = epoll.poll(10)
            for fd, event_type in events:
                logging.debug('Epoll returned: {},{}'.format(fd, event_type))
                if event_type != select.EPOLLIN:
                    raise Exception('Unexpected epoll event')
                data = sys.stdin.readline()
                if not data:
                    # readline() returns '' only at end of input; stop
                    # instead of handling empty data forever.
                    logging.debug('End of input on stdin, stopping.')
                    return
                logging.debug('Data recevied from exabgp: {}.'.format(data))
                exa_msg_handler(storage, data, encoder)
    finally:
        epoll.close()


def main(*argv):
    """This script is used as i/o api for communication with exabgp

    Arguments:
        :*argv: unparsed cli arguments

    In a separate thread an rpc server is started. This server will be used as an api towards the user.
    Stdin and stdout are used for communication with exabgp.
    """
    parser = argparse.ArgumentParser(description='ExaBgp rpc server script')
    parser.add_argument('--host', default='127.0.0.1', help='Host where exabgp is running (default is 127.0.0.1)')
    parser.add_argument('--loglevel', default=logging.DEBUG, help='Log level')
    parser.add_argument('--logfile', default='{}/exarpc.log'.format(os.path.dirname(os.path.abspath(__file__))),
                        help='Log file name.')
    parser.add_argument('--encoder', default='json', help='Exabgp encoder type')
    in_args = parser.parse_args(*argv)
    logging.basicConfig(filename=in_args.logfile, level=in_args.loglevel)

    storage = ExaStorage()
    rpcserver = SimpleXMLRPCServer((in_args.host, 8000), allow_none=True)
    rpcserver.register_instance(Rpcs(storage))
    trpc = threading.Thread(target=rpcserver.serve_forever)
    trpc.start()

    try:
        _serve_stdin(storage, in_args.encoder)
    except Exception as e:
        logging.warning('Exception occured: {}'.format(e))
    finally:
        # Stop the rpc thread, otherwise the non-daemon thread keeps the
        # process alive after the stdin loop has ended.
        rpcserver.shutdown()
        rpcserver.server_close()
        trpc.join()


if __name__ == '__main__':
    main()