Arguments:
:text: exabgp command
"""
- logging.debug('Command towards exabgp: {}'.format(text))
+ logging.debug("Command towards exabgp: {}".format(text))
sys.stdout.write(text)
sys.stdout.write("\n")
sys.stdout.flush()
- logging.debug('Connand flushed: {}.'.format(text))
+ logging.debug("Connand flushed: {}.".format(text))
def get_counter(self, msg_type):
"""Gets counter value
Returns:
:cnt: counter value
"""
- logging.debug('get_counter rpc called, storage {}'.format(self.storage))
+ logging.debug("get_counter rpc called, storage {}".format(self.storage))
with self.storage as s:
- if 'counters' not in s:
+ if "counters" not in s:
return 0
- cnt = 0 if msg_type not in s['counters'] else s['counters'][msg_type]
+ cnt = 0 if msg_type not in s["counters"] else s["counters"][msg_type]
return cnt
def clean_counter(self, msg_type):
:msg_type: message type which counter should be cleaned
"""
- logging.debug('clean_counter rpc called, storage {}'.format(self.storage))
+ logging.debug("clean_counter rpc called, storage {}".format(self.storage))
with self.storage as s:
- if 'counters' not in s:
+ if "counters" not in s:
return
- if msg_type in s['counters']:
- del s['counters'][msg_type]
+ if msg_type in s["counters"]:
+ del s["counters"][msg_type]
def get_message(self, msg_type):
"""Gets last received message
Returns:
:msg: message
"""
- logging.debug('get_message {} rpc called, storage {}'.format(msg_type, self.storage))
+ logging.debug(
+ "get_message {} rpc called, storage {}".format(msg_type, self.storage)
+ )
with self.storage as s:
- if 'messages' not in s:
+ if "messages" not in s:
return None
- msg = None if msg_type not in s['messages'] else s['messages'][msg_type]
+ msg = None if msg_type not in s["messages"] else s["messages"][msg_type]
return msg
def clean_message(self, msg_type):
:msg_type: message type which message should be cleaned
"""
- logging.debug('clean_message rpc called, storage {}'.format(self.storage))
+ logging.debug("clean_message rpc called, storage {}".format(self.storage))
with self.storage as s:
- if 'messages' not in s:
+ if "messages" not in s:
return
- if msg_type in s['messages']:
- del s['messages'][msg_type]
+ if msg_type in s["messages"]:
+ del s["messages"][msg_type]
return
def execute(self, exabgp_cmd):
Arguments:
:exabgp_cmd: command
"""
- logging.info('executing: {}.'.format(exabgp_cmd))
+ logging.info("executing: {}.".format(exabgp_cmd))
self._write(exabgp_cmd)
:key: message type
"""
with storage as s:
- if 'counters' not in s:
- s['counters'] = {}
- if key not in s['counters']:
- s['counters'][key] = 1
+ if "counters" not in s:
+ s["counters"] = {}
+ if key not in s["counters"]:
+ s["counters"][key] = 1
else:
- s['counters'][key] += 1
+ s["counters"][key] += 1
def _store_last_received_message(storage, key, msg):
:key: message type
"""
with storage as s:
- if 'messages' not in s:
- s['messages'] = {}
- s['messages'][key] = msg
+ if "messages" not in s:
+ s["messages"] = {}
+ s["messages"][key] = msg
def handle_open(storage, msg):
Arguments:
:msg: hex string of open body
"""
- logging.debug('Handling Open with storage {}'.format(storage))
- _increment_counter(storage, 'open')
+ logging.debug("Handling Open with storage {}".format(storage))
+ _increment_counter(storage, "open")
def handle_keepalive(storage, msg):
Arguments:
:msg: hex string of message body (in fact it is None)
"""
- logging.debug('Handling KeepAlive with storage {}'.format(storage))
- _increment_counter(storage, 'keepalive')
+ logging.debug("Handling KeepAlive with storage {}".format(storage))
+ _increment_counter(storage, "keepalive")
def handle_update(storage, msg):
Arguments:
:msg: hex string of update body
"""
- logging.debug('Handling Update with storage {}'.format(storage))
- _increment_counter(storage, 'update')
+ logging.debug("Handling Update with storage {}".format(storage))
+ _increment_counter(storage, "update")
def handle_route_refresh(storage, msg):
Arguments:
:msg: hex string of route refresh body
"""
- logging.debug('Handling Route Refresh with storage {}'.format(storage))
- _increment_counter(storage, 'route_refresh')
+ logging.debug("Handling Route Refresh with storage {}".format(storage))
+ _increment_counter(storage, "route_refresh")
def handle_json_update(storage, jdata):
Arguments:
:jdata: json formated data of update message
"""
- logging.debug('Handling Json Update with storage {}'.format(storage))
- _increment_counter(storage, 'update')
- _store_last_received_message(storage, 'update', jdata)
+ logging.debug("Handling Json Update with storage {}".format(storage))
+ _increment_counter(storage, "update")
+ _store_last_received_message(storage, "update", jdata)
def handle_json_state(storage, jdata):
Arguments:
:jdata: json formated data about connection/peer state
"""
- logging.debug('Handling Json State with storage {}'.format(storage))
+ logging.debug("Handling Json State with storage {}".format(storage))
def handle_json_refresh(storage, jdata):
Arguments:
:jdata: json formated data about connection/peer state
"""
- logging.debug('Handling Json State with storage {}'.format(storage))
- _increment_counter(storage, 'route_refresh')
+ logging.debug("Handling Json State with storage {}".format(storage))
+ _increment_counter(storage, "route_refresh")
def exa_msg_handler(storage, data, encoder):
"""Handles incomming messages"""
- if encoder == 'text':
- if not ('neighbor' in data and 'header' in data and 'body' in data):
- logging.debug('Ignoring received notification from exabgp: {}'.format(data))
+ if encoder == "text":
+ if not ("neighbor" in data and "header" in data and "body" in data):
+ logging.debug("Ignoring received notification from exabgp: {}".format(data))
return
- restr = 'neighbor (?P<ip>[0-9,\\.]+) received (?P<mid>[0-9]+) header\
- (?P<header>[0-9,A-F]+) body.?(?P<body>[0-9,A-F]+)?'
+ restr = "neighbor (?P<ip>[0-9,\\.]+) received (?P<mid>[0-9]+) header\
+ (?P<header>[0-9,A-F]+) body.?(?P<body>[0-9,A-F]+)?"
pat = re.compile(restr)
match = re.search(pat, data)
if match is None:
- logging.warn('Unexpected data in this part, only bgp message expected. Received: {}.'.format(data))
+ logging.warn(
+ "Unexpected data in this part, only bgp message expected. Received: {}.".format(
+ data
+ )
+ )
return
- msg_type, msg = decode_message(match.groupdict()['header'], match.groupdict()['body'])
+ msg_type, msg = decode_message(
+ match.groupdict()["header"], match.groupdict()["body"]
+ )
if msg_type == Message.CODE.KEEPALIVE:
handle_keepalive(storage, msg)
elif msg_type == Message.CODE.OPEN:
elif msg_type == Message.CODE.ROUTE_REFRESH:
handle_route_refresh(storage, msg)
else:
- logging.warn('No handler function for msg_type: {}'.format(msg_type))
- elif encoder == 'json':
+ logging.warn("No handler function for msg_type: {}".format(msg_type))
+ elif encoder == "json":
try:
jdata = json.loads(data)
except Exception:
- logging.error('Unable to parse, expected json, received: {}.'.format(data))
+ logging.error("Unable to parse, expected json, received: {}.".format(data))
return
- if jdata['type'] == 'state':
- logging.debug('State info received: {}.'.format(data))
+ if jdata["type"] == "state":
+ logging.debug("State info received: {}.".format(data))
handle_json_state(storage, jdata)
- elif jdata['type'] == 'update':
- logging.debug('Update info received: {}.'.format(data))
+ elif jdata["type"] == "update":
+ logging.debug("Update info received: {}.".format(data))
handle_json_update(storage, jdata)
- elif jdata['type'] == 'notification':
- logging.debug('Notification info received: {}.'.format(data))
- elif jdata['type'] == 'refresh':
- logging.debug('Route refresh received: {}.'.format(data))
+ elif jdata["type"] == "notification":
+ logging.debug("Notification info received: {}.".format(data))
+ elif jdata["type"] == "refresh":
+ logging.debug("Route refresh received: {}.".format(data))
handle_json_refresh(storage, jdata)
else:
- logging.error('Unexpected type for data: {}'.format(data))
+ logging.error("Unexpected type for data: {}".format(data))
else:
- logging.error('Ignoring received data, unknown encoder: {}'.format(encoder))
+ logging.error("Ignoring received data, unknown encoder: {}".format(encoder))
def main(*argv):
Stdin and stdout are used for communication with exabgp.
"""
- parser = argparse.ArgumentParser(description='ExaBgp rpc server script')
- parser.add_argument('--host', default='127.0.0.1', help='Host where exabgp is running (default is 127.0.0.1)')
- parser.add_argument('--loglevel', default=logging.DEBUG, help='Log level')
- parser.add_argument('--logfile', default='{}/exarpc.log'.format(os.path.dirname(os.path.abspath(__file__))),
- help='Log file name.')
- parser.add_argument('--encoder', default='json', help='Exabgp encoder type')
+ parser = argparse.ArgumentParser(description="ExaBgp rpc server script")
+ parser.add_argument(
+ "--host",
+ default="127.0.0.1",
+ help="Host where exabgp is running (default is 127.0.0.1)",
+ )
+ parser.add_argument("--loglevel", default=logging.DEBUG, help="Log level")
+ parser.add_argument(
+ "--logfile",
+ default="{}/exarpc.log".format(os.path.dirname(os.path.abspath(__file__))),
+ help="Log file name.",
+ )
+ parser.add_argument("--encoder", default="json", help="Exabgp encoder type")
in_args = parser.parse_args(*argv)
logging.basicConfig(filename=in_args.logfile, level=in_args.loglevel)
try:
while True:
- logging.debug('Epoll loop')
+ logging.debug("Epoll loop")
events = epoll.poll(10)
for fd, event_type in events:
- logging.debug('Epoll returned: {},{}'.format(fd, event_type))
+ logging.debug("Epoll returned: {},{}".format(fd, event_type))
if event_type != select.EPOLLIN:
- raise Exception('Unexpected epoll event')
+ raise Exception("Unexpected epoll event")
else:
data = sys.stdin.readline()
- logging.debug('Data recevied from exabgp: {}.'.format(data))
+ logging.debug("Data recevied from exabgp: {}.".format(data))
exa_msg_handler(storage, data, in_args.encoder)
except Exception as e:
- logging.warn('Exception occured: {}'.format(e))
+ logging.warn("Exception occured: {}".format(e))
finally:
rpcserver.shutdown()
trpc.join()
-if __name__ == '__main__':
+if __name__ == "__main__":
main()