#!/usr/bin/env python
"""I/O api script for exabgp with an XML-RPC front end.

Lines received from exabgp on stdin are counted (and, for json updates,
stored) in a thread safe storage. An XML-RPC server running in a separate
thread exposes that storage and allows commands to be written to exabgp via
stdout.
"""

import argparse
import binascii
import json
import logging
import os
import re
import select
import sys
import threading

from exabgp.bgp.message import Message

try:  # Python 2 module name
    from SimpleXMLRPCServer import SimpleXMLRPCServer
except ImportError:  # Python 3 module name
    from xmlrpc.server import SimpleXMLRPCServer


# Line printed by exabgp for a received bgp message when the "text" encoder
# is used. The body group is optional because it may be absent.
_TEXT_MSG_RE = re.compile(
    r"neighbor (?P<ip>[0-9,\.]+) received (?P<mid>[0-9]+) "
    r"header (?P<header>[0-9,A-F]+) body.?(?P<body>[0-9,A-F]+)?"
)


class ExaStorage(dict):
    """Thread safe dictionary

    The object will serve as thread safe data storage.
    It should be used with "with" statement.

    The content of the dict may be changed dynamically, but it is used as
    {
        "counters": {<msg type>: count, ...},
        "messages": {<msg type>: <last stored message>, ...},
    }

    The lock is a plain threading.Lock (not reentrant), so "with" blocks on
    the same storage must not be nested within one thread.
    """

    def __init__(self):
        """Thread safe dictionary init"""
        super(ExaStorage, self).__init__()
        self._lock = threading.Lock()

    def __enter__(self):
        """Entry point of "with" statement - acquires the lock"""
        self._lock.acquire()
        return self

    def __exit__(self, type_, value, traceback):
        """End point of "with" statement - releases the lock"""
        self._lock.release()


class Rpcs(object):
    """Handler for SimpleXMLRPCServer."""

    def __init__(self, storage):
        """Init method

        Arguments:
            :storage: thread safe dict
        """
        self.storage = storage

    def _write(self, text):
        """Pass commands from rpc server towards exabgp

        The command is written to stdout followed by a newline and flushed.

        Arguments:
            :text: exabgp command
        """
        logging.debug("Command towards exabgp: {}".format(text))
        sys.stdout.write(text)
        sys.stdout.write("\n")
        sys.stdout.flush()
        logging.debug("Connand flushed: {}.".format(text))

    def get_counter(self, msg_type):
        """Gets counter value

        Arguments:
            :msg_type: message type which counter should be returned
        Returns:
            :cnt: counter value, 0 if nothing was counted for msg_type
        """
        logging.debug("get_counter rpc called, storage {}".format(self.storage))
        with self.storage as s:
            return s.get("counters", {}).get(msg_type, 0)

    def clean_counter(self, msg_type):
        """Cleans counter

        Arguments:
            :msg_type: message type which counter should be cleaned
        """
        logging.debug("clean_counter rpc called, storage {}".format(self.storage))
        with self.storage as s:
            s.get("counters", {}).pop(msg_type, None)

    def get_message(self, msg_type):
        """Gets last received message

        Arguments:
            :msg_type: message type which message should be returned
        Returns:
            :msg: stored message, None if nothing is stored for msg_type
        """
        logging.debug(
            "get_message {} rpc called, storage {}".format(msg_type, self.storage)
        )
        with self.storage as s:
            return s.get("messages", {}).get(msg_type)

    def clean_message(self, msg_type):
        """Removes stored message

        Arguments:
            :msg_type: message type which message should be cleaned
        """
        logging.debug("clean_message rpc called, storage {}".format(self.storage))
        with self.storage as s:
            s.get("messages", {}).pop(msg_type, None)

    def execute(self, exabgp_cmd):
        """Execute given command on exabgp

        Arguments:
            :exabgp_cmd: command
        """
        logging.info("executing: {}.".format(exabgp_cmd))
        self._write(exabgp_cmd)


def decode_message(header, body):
    """Decodes message

    Arguments:
        :header: hexstring of the header
        :body: hexstring of the body (currently unused)
    Returns:
        :msg_type: message type, the integer value of header byte 18
        :msg: None (in the future some decoded data)
    Raises:
        TypeError or ValueError if header is not a valid hex string,
        IndexError if header is shorter than 19 bytes.
    """
    headbin = binascii.unhexlify(header)
    # bytearray indexing yields an int on both Python 2 and Python 3,
    # unlike ord(headbin[18]) which fails on Python 3 bytes.
    msg_type = bytearray(headbin)[18]
    msg = None
    return msg_type, msg


def _increment_counter(storage, key):
    """Increments the counter for a message

    Arguments:
        :storage: thread safe dict
        :key: message type
    """
    with storage as s:
        counters = s.setdefault("counters", {})
        counters[key] = counters.get(key, 0) + 1


def _store_last_received_message(storage, key, msg):
    """Stores message under key, replacing any previously stored one.

    Arguments:
        :storage: thread safe dict
        :key: message type
        :msg: message to be stored
    """
    with storage as s:
        s.setdefault("messages", {})[key] = msg


def handle_open(storage, msg):
    """Handles received bgp open message
    - increments open counter

    Arguments:
        :storage: thread safe dict
        :msg: hex string of open body
    """
    logging.debug("Handling Open with storage {}".format(storage))
    _increment_counter(storage, "open")


def handle_keepalive(storage, msg):
    """Handles received bgp keepalive message
    - increments keepalive counter

    Arguments:
        :storage: thread safe dict
        :msg: hex string of message body (in fact it is None)
    """
    logging.debug("Handling KeepAlive with storage {}".format(storage))
    _increment_counter(storage, "keepalive")


def handle_update(storage, msg):
    """Handles received bgp update message
    - increments update counter

    Arguments:
        :storage: thread safe dict
        :msg: hex string of update body
    """
    logging.debug("Handling Update with storage {}".format(storage))
    _increment_counter(storage, "update")


def handle_route_refresh(storage, msg):
    """Handles received bgp route refresh message
    - increments route refresh counter

    Arguments:
        :storage: thread safe dict
        :msg: hex string of route refresh body
    """
    logging.debug("Handling Route Refresh with storage {}".format(storage))
    _increment_counter(storage, "route_refresh")


def handle_json_update(storage, jdata):
    """Handles received json parsed bgp update message
    - increments update counter
    - stores the message as the last received update

    Arguments:
        :storage: thread safe dict
        :jdata: json formatted data of update message
    """
    logging.debug("Handling Json Update with storage {}".format(storage))
    _increment_counter(storage, "update")
    _store_last_received_message(storage, "update", jdata)


def handle_json_state(storage, jdata):
    """Handles received json state message

    This is for future use. This information is not used/required/needed
    at the moment.

    Arguments:
        :storage: thread safe dict
        :jdata: json formatted data about connection/peer state
    """
    logging.debug("Handling Json State with storage {}".format(storage))


def handle_json_refresh(storage, jdata):
    """Handles received json route refresh message
    - increments route refresh counter

    Arguments:
        :storage: thread safe dict
        :jdata: json formatted data of route refresh message
    """
    logging.debug("Handling Json Refresh with storage {}".format(storage))
    _increment_counter(storage, "route_refresh")


def _handle_text_message(storage, data):
    """Handles one line received from exabgp using the text encoder

    Arguments:
        :storage: thread safe dict
        :data: line received from exabgp
    """
    if not all(word in data for word in ("neighbor", "header", "body")):
        logging.debug("Ignoring received notification from exabgp: {}".format(data))
        return
    match = _TEXT_MSG_RE.search(data)
    if match is None:
        logging.warning(
            "Unexpected data in this part, only bgp message expected. Received: {}.".format(
                data
            )
        )
        return
    try:
        msg_type, msg = decode_message(match.group("header"), match.group("body"))
    except (TypeError, ValueError, IndexError) as err:
        # A malformed header must not terminate the main loop; skip the line
        # the same way unparsable json input is skipped.
        logging.warning(
            "Unable to decode bgp message header ({}). Received: {}.".format(err, data)
        )
        return
    if msg_type == Message.CODE.KEEPALIVE:
        handle_keepalive(storage, msg)
    elif msg_type == Message.CODE.OPEN:
        handle_open(storage, msg)
    elif msg_type == Message.CODE.UPDATE:
        handle_update(storage, msg)
    elif msg_type == Message.CODE.ROUTE_REFRESH:
        handle_route_refresh(storage, msg)
    else:
        logging.warning("No handler function for msg_type: {}".format(msg_type))


def _handle_json_message(storage, data):
    """Handles one line received from exabgp using the json encoder

    Arguments:
        :storage: thread safe dict
        :data: line received from exabgp
    """
    try:
        jdata = json.loads(data)
    except Exception:
        logging.error("Unable to parse, expected json, received: {}.".format(data))
        return
    # Valid json which is not an object, or has no "type" key, is reported
    # through the "unexpected type" branch instead of raising.
    msg_kind = jdata.get("type") if isinstance(jdata, dict) else None
    if msg_kind == "state":
        logging.debug("State info received: {}.".format(data))
        handle_json_state(storage, jdata)
    elif msg_kind == "update":
        logging.debug("Update info received: {}.".format(data))
        handle_json_update(storage, jdata)
    elif msg_kind == "notification":
        logging.debug("Notification info received: {}.".format(data))
    elif msg_kind == "refresh":
        logging.debug("Route refresh received: {}.".format(data))
        handle_json_refresh(storage, jdata)
    else:
        logging.error("Unexpected type for data: {}".format(data))


def exa_msg_handler(storage, data, encoder):
    """Handles incoming messages

    Arguments:
        :storage: thread safe dict
        :data: line received from exabgp
        :encoder: exabgp encoder type, "text" or "json"
    """
    if encoder == "text":
        _handle_text_message(storage, data)
    elif encoder == "json":
        _handle_json_message(storage, data)
    else:
        logging.error("Ignoring received data, unknown encoder: {}".format(encoder))


def main(*argv):
    """This script is used as i/o api for communication with exabgp

    Arguments:
        :*argv: unparsed cli arguments

    In a separate thread an rpc server is started. This server will be used
    as an api towards the user. Stdin and stdout are used for communication
    with exabgp.
    """
    parser = argparse.ArgumentParser(description="ExaBgp rpc server script")
    parser.add_argument(
        "--host",
        default="127.0.0.1",
        help="Host where exabgp is running (default is 127.0.0.1)",
    )
    parser.add_argument(
        "--port",
        default=8000,
        type=int,
        help="Port the rpc server listens on (default is 8000)",
    )
    parser.add_argument("--loglevel", default=logging.DEBUG, help="Log level")
    parser.add_argument(
        "--logfile",
        default="{}/exarpc.log".format(os.path.dirname(os.path.abspath(__file__))),
        help="Log file name.",
    )
    parser.add_argument("--encoder", default="json", help="Exabgp encoder type")
    in_args = parser.parse_args(*argv)
    logging.basicConfig(filename=in_args.logfile, level=in_args.loglevel)

    storage = ExaStorage()
    rpcserver = SimpleXMLRPCServer((in_args.host, in_args.port), allow_none=True)
    rpcserver.register_instance(Rpcs(storage))

    trpc = threading.Thread(target=rpcserver.serve_forever)
    trpc.start()

    epoll = None
    try:
        # Set up inside the try block so that a failure here still shuts the
        # rpc thread down instead of leaving the process hanging on it.
        epoll = select.epoll()
        epoll.register(
            sys.__stdin__, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP
        )
        while True:
            logging.debug("Epoll loop")
            events = epoll.poll(10)
            for fd, event_type in events:
                logging.debug("Epoll returned: {},{}".format(fd, event_type))
                if event_type != select.EPOLLIN:
                    raise Exception("Unexpected epoll event")
                data = sys.stdin.readline()
                logging.debug("Data recevied from exabgp: {}.".format(data))
                exa_msg_handler(storage, data, in_args.encoder)
    except Exception as e:
        logging.warning("Exception occured: {}".format(e))
    finally:
        if epoll is not None:
            epoll.close()
        rpcserver.shutdown()
        trpc.join()
        rpcserver.server_close()


if __name__ == "__main__":
    main()