12 from exabgp.bgp.message import Message
13 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 class ExaStorage(dict):
17 """Thread safe dictionary
19 The object will serve as thread safe data storage.
20 It should be used with "with" statement.
22 The content of thet dict may be changed dynamically, but i'll use it as
24 "counters": { <msg type>: count, ...}
25 "messages": { <msg type>: [hex_string1, ]}
29 """Thread safe dictionary init"""
30 super(ExaStorage, self).__init__()
31 self._lock = threading.Lock()
34 """Entry point of "with" statement"""
38 def __exit__(self, type_, value, traceback):
39 """End point of "with" statement"""
44 """Handler for SimpleXMLRPCServer."""
46 def __init__(self, storage):
50 :storage: thread safe dict
52 self.storage = storage
54 def _write(self, text):
55 """Pass commands from rpc server towards exabgp
60 logging.debug("Command towards exabgp: {}".format(text))
61 sys.stdout.write(text)
62 sys.stdout.write("\n")
64 logging.debug("Connand flushed: {}.".format(text))
66 def get_counter(self, msg_type):
70 :msg_type: message type which counter should be returned
74 logging.debug("get_counter rpc called, storage {}".format(self.storage))
75 with self.storage as s:
76 if "counters" not in s:
78 cnt = 0 if msg_type not in s["counters"] else s["counters"][msg_type]
81 def clean_counter(self, msg_type):
85 :msg_type: message type which counter should be cleaned
88 logging.debug("clean_counter rpc called, storage {}".format(self.storage))
89 with self.storage as s:
90 if "counters" not in s:
92 if msg_type in s["counters"]:
93 del s["counters"][msg_type]
95 def get_message(self, msg_type):
96 """Gets last received message
99 :msg_type: message type which counter should be returned
104 "get_message {} rpc called, storage {}".format(msg_type, self.storage)
106 with self.storage as s:
107 if "messages" not in s:
109 msg = None if msg_type not in s["messages"] else s["messages"][msg_type]
112 def clean_message(self, msg_type):
113 """Removes stored message
116 :msg_type: message type which message should be cleaned
119 logging.debug("clean_message rpc called, storage {}".format(self.storage))
120 with self.storage as s:
121 if "messages" not in s:
123 if msg_type in s["messages"]:
124 del s["messages"][msg_type]
127 def execute(self, exabgp_cmd):
128 """Execite given command on exabgp
133 logging.info("executing: {}.".format(exabgp_cmd))
134 self._write(exabgp_cmd)
137 def decode_message(header, body):
141 :header: hexstring of the header
142 :body: hexstring of the body
144 :msg_type: message type
145 :msg: None (in the future some decoded data)
147 headbin = binascii.unhexlify(header)
149 msg_type = ord(headbin[18])
155 def _increment_counter(storage, key):
156 """Increments the counter for a message
162 if "counters" not in s:
164 if key not in s["counters"]:
165 s["counters"][key] = 1
167 s["counters"][key] += 1
170 def _store_last_received_message(storage, key, msg):
171 """Stores message under key.
177 if "messages" not in s:
179 s["messages"][key] = msg
182 def handle_open(storage, msg):
183 """Handles received bgp open message
185 - incements open counter
188 :msg: hex string of open body
190 logging.debug("Handling Open with storage {}".format(storage))
191 _increment_counter(storage, "open")
194 def handle_keepalive(storage, msg):
195 """Handles received bgp keepalive message
197 - incements keepalive counter
200 :msg: hex string of message body (in fact it is None)
202 logging.debug("Handling KeepAlive with storage {}".format(storage))
203 _increment_counter(storage, "keepalive")
206 def handle_update(storage, msg):
207 """Handles received bgp update message
209 - incements update counter
212 :msg: hex string of update body
214 logging.debug("Handling Update with storage {}".format(storage))
215 _increment_counter(storage, "update")
218 def handle_route_refresh(storage, msg):
219 """Handles received bgp route refresh message
221 - incements route refresh counter
224 :msg: hex string of route refresh body
226 logging.debug("Handling Route Refresh with storage {}".format(storage))
227 _increment_counter(storage, "route_refresh")
230 def handle_json_update(storage, jdata):
231 """Handles received json parsed bgp update message
233 - incements update counter
236 :jdata: json formated data of update message
238 logging.debug("Handling Json Update with storage {}".format(storage))
239 _increment_counter(storage, "update")
240 _store_last_received_message(storage, "update", jdata)
243 def handle_json_state(storage, jdata):
244 """Handles received json state message
246 This is for future use. This information is not used/required/needed
250 :jdata: json formated data about connection/peer state
252 logging.debug("Handling Json State with storage {}".format(storage))
255 def handle_json_refresh(storage, jdata):
256 """Handles received json route refresh message
258 This is for future use. This information is not used/required/needed
262 :jdata: json formated data about connection/peer state
264 logging.debug("Handling Json State with storage {}".format(storage))
265 _increment_counter(storage, "route_refresh")
268 def exa_msg_handler(storage, data, encoder):
269 """Handles incomming messages"""
271 if encoder == "text":
272 if not ("neighbor" in data and "header" in data and "body" in data):
273 logging.debug("Ignoring received notification from exabgp: {}".format(data))
275 restr = "neighbor (?P<ip>[0-9,\\.]+) received (?P<mid>[0-9]+) header\
276 (?P<header>[0-9,A-F]+) body.?(?P<body>[0-9,A-F]+)?"
277 pat = re.compile(restr)
278 match = re.search(pat, data)
281 "Unexpected data in this part, only bgp message expected. Received: {}.".format(
286 msg_type, msg = decode_message(
287 match.groupdict()["header"], match.groupdict()["body"]
289 if msg_type == Message.CODE.KEEPALIVE:
290 handle_keepalive(storage, msg)
291 elif msg_type == Message.CODE.OPEN:
292 handle_open(storage, msg)
293 elif msg_type == Message.CODE.UPDATE:
294 handle_update(storage, msg)
295 elif msg_type == Message.CODE.ROUTE_REFRESH:
296 handle_route_refresh(storage, msg)
298 logging.warn("No handler function for msg_type: {}".format(msg_type))
299 elif encoder == "json":
301 jdata = json.loads(data)
303 logging.error("Unable to parse, expected json, received: {}.".format(data))
305 if jdata["type"] == "state":
306 logging.debug("State info received: {}.".format(data))
307 handle_json_state(storage, jdata)
308 elif jdata["type"] == "update":
309 logging.debug("Update info received: {}.".format(data))
310 handle_json_update(storage, jdata)
311 elif jdata["type"] == "notification":
312 logging.debug("Notification info received: {}.".format(data))
313 elif jdata["type"] == "refresh":
314 logging.debug("Route refresh received: {}.".format(data))
315 handle_json_refresh(storage, jdata)
317 logging.error("Unexpected type for data: {}".format(data))
319 logging.error("Ignoring received data, unknown encoder: {}".format(encoder))
323 """This script is used as i/o api for communication with exabgp
326 :*argv: unparsed cli arguments
328 In a separate thread an rpc server is started. This server will be used as an api towards the user.
329 Stdin and stdout are used for communication with exabgp.
332 parser = argparse.ArgumentParser(description="ExaBgp rpc server script")
336 help="Host where exabgp is running (default is 127.0.0.1)",
338 parser.add_argument("--loglevel", default=logging.DEBUG, help="Log level")
341 default="{}/exarpc.log".format(os.path.dirname(os.path.abspath(__file__))),
342 help="Log file name.",
344 parser.add_argument("--encoder", default="json", help="Exabgp encoder type")
345 in_args = parser.parse_args(*argv)
346 logging.basicConfig(filename=in_args.logfile, level=in_args.loglevel)
348 storage = ExaStorage()
349 rpcserver = SimpleXMLRPCServer((in_args.host, 8000), allow_none=True)
350 rpcserver.register_instance(Rpcs(storage))
351 trpc = threading.Thread(target=rpcserver.serve_forever)
354 epoll = select.epoll()
356 epoll.register(sys.__stdin__, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)
360 logging.debug("Epoll loop")
361 events = epoll.poll(10)
362 for fd, event_type in events:
363 logging.debug("Epoll returned: {},{}".format(fd, event_type))
364 if event_type != select.EPOLLIN:
365 raise Exception("Unexpected epoll event")
367 data = sys.stdin.readline()
368 logging.debug("Data recevied from exabgp: {}.".format(data))
369 exa_msg_handler(storage, data, in_args.encoder)
370 except Exception as e:
371 logging.warn("Exception occured: {}".format(e))
377 if __name__ == "__main__":