Add functional suites for bgpcep using exabgp
[integration/test.git] / tools / exabgp_files / exarpc.py
diff --git a/tools/exabgp_files/exarpc.py b/tools/exabgp_files/exarpc.py
new file mode 100644 (file)
index 0000000..1d2a36c
--- /dev/null
@@ -0,0 +1,361 @@
+#!/usr/bin/env python
+
+import argparse
+import binascii
+import json
+import logging
+import os
+import re
+import select
+import sys
+import threading
+from exabgp.bgp.message import Message
+from SimpleXMLRPCServer import SimpleXMLRPCServer
+
+
+class ExaStorage(dict):
+    """Thread safe dictionary
+
+    The object will serve as thread safe data storage.
+    It should be used with "with" statement.
+
+    The content of thet dict may be changed dynamically, but i'll use it as
+    {
+      "counters": { <msg type>: count, ...}
+      "messages": { <msg type>: [hex_string1, ]}
+    """
+    def __init__(self):
+        """Thread safe dictionary init"""
+        super(ExaStorage, self).__init__()
+        self._lock = threading.Lock()
+
+    def __enter__(self):
+        """Entry point of "with" statement"""
+        self._lock.acquire()
+        return self
+
+    def __exit__(self, type_, value, traceback):
+        """End point of "with" statement"""
+        self._lock.release()
+
+
+class Rpcs(object):
+    """Handler for SimpleXMLRPCServer."""
+
+    def __init__(self, storage):
+        """Init method
+
+        Arguments:
+            :storage: thread safe dict
+        """
+        self.storage = storage
+
+    def _write(self, text):
+        """Pass commands from rpc server towards exabgp
+
+        Arguments:
+            :text: exabgp command
+        """
+        logging.debug('Command towards exabgp: {}'.format(text))
+        sys.stdout.write(text)
+        sys.stdout.write("\n")
+        sys.stdout.flush()
+        logging.debug('Connand flushed: {}.'.format(text))
+
+    def get_counter(self, msg_type):
+        """Gets counter value
+
+        Arguments:
+            :msg_type: message type which counter should be returned
+        Returns:
+            :cnt: counter value
+        """
+        logging.debug('get_counter rpc called, storage {}'.format(self.storage))
+        with self.storage as s:
+            if 'counters' not in s:
+                return 0
+            cnt = 0 if msg_type not in s['counters'] else s['counters'][msg_type]
+        return cnt
+
+    def clean_counter(self, msg_type):
+        """Cleans counter
+
+        Arguments:
+            :msg_type: message type which counter should be cleaned
+        """
+
+        logging.debug('clean_counter rpc called, storage {}'.format(self.storage))
+        with self.storage as s:
+            if 'counters' not in s:
+                return
+            if msg_type in s['counters']:
+                del s['counters'][msg_type]
+
+    def get_message(self, msg_type):
+        """Gets last received message
+
+        Arguments:
+            :msg_type: message type which counter should be returned
+        Returns:
+            :msg: message
+        """
+        logging.debug('get_message {} rpc called, storage {}'.format(msg_type, self.storage))
+        with self.storage as s:
+            if 'messages' not in s:
+                return None
+            msg = None if msg_type not in s['messages'] else s['messages'][msg_type]
+        return msg
+
+    def clean_message(self, msg_type):
+        """Removes stored message
+
+        Arguments:
+            :msg_type: message type which message should be cleaned
+        """
+
+        logging.debug('clean_message rpc called, storage {}'.format(self.storage))
+        with self.storage as s:
+            if 'messages' not in s:
+                return
+            if msg_type in s['messages']:
+                del s['messages'][msg_type]
+        return
+
+    def execute(self, exabgp_cmd):
+        """Execite given command on exabgp
+
+        Arguments:
+            :exabgp_cmd: command
+        """
+        logging.info('executing: {}.'.format(exabgp_cmd))
+        self._write(exabgp_cmd)
+
+
+def decode_message(header, body):
+    """Decodes message
+
+    Arguments:
+        :header: hexstring of the header
+        :body: hexstring of the body
+    Returns:
+        :msg_type: message type
+        :msg: None (in the future some decoded data)
+    """
+    headbin = binascii.unhexlify(header)
+
+    msg_type = ord(headbin[18])
+    msg = None
+
+    return msg_type, msg
+
+
+def _increment_counter(storage, key):
+    """Increments the counter for a message
+
+    Arguments:
+        :key: message type
+    """
+    with storage as s:
+        if 'counters' not in s:
+            s['counters'] = {}
+        if key not in s['counters']:
+            s['counters'][key] = 1
+        else:
+            s['counters'][key] += 1
+
+
+def _store_last_received_message(storage, key, msg):
+    """Stores message under key.
+
+    Arguments:
+        :key: message type
+    """
+    with storage as s:
+        if 'messages' not in s:
+            s['messages'] = {}
+        s['messages'][key] = msg
+
+
+def handle_open(storage, msg):
+    """Handles received bgp open message
+
+    - incements open counter
+
+    Arguments:
+        :msg: hex string of open body
+    """
+    logging.debug('Handling Open with storage {}'.format(storage))
+    _increment_counter(storage, 'open')
+
+
+def handle_keepalive(storage, msg):
+    """Handles received bgp keepalive message
+
+    - incements keepalive counter
+
+    Arguments:
+        :msg: hex string of message body (in fact it is None)
+    """
+    logging.debug('Handling KeepAlive with storage {}'.format(storage))
+    _increment_counter(storage, 'keepalive')
+
+
+def handle_update(storage, msg):
+    """Handles received bgp update message
+
+    - incements update counter
+
+    Arguments:
+        :msg: hex string of update body
+    """
+    logging.debug('Handling Update with storage {}'.format(storage))
+    _increment_counter(storage, 'update')
+
+
+def handle_route_refresh(storage, msg):
+    """Handles received bgp route refresh message
+
+    - incements route refresh counter
+
+    Arguments:
+        :msg: hex string of route refresh body
+    """
+    logging.debug('Handling Route Refresh with storage {}'.format(storage))
+    _increment_counter(storage, 'route_refresh')
+
+
+def handle_json_update(storage, jdata):
+    """Handles received json parsed bgp update message
+
+    - incements update counter
+
+    Arguments:
+        :jdata: json formated data of update message
+    """
+    logging.debug('Handling Json Update with storage {}'.format(storage))
+    _increment_counter(storage, 'update')
+    _store_last_received_message(storage, 'update', jdata)
+
+
+def handle_json_state(storage, jdata):
+    """Handles received json state message
+
+    This is for future use. This information is not used/required/needed
+    at the moment.
+
+    Arguments:
+        :jdata: json formated data about connection/peer state
+    """
+    logging.debug('Handling Json State with storage {}'.format(storage))
+
+
+def handle_json_refresh(storage, jdata):
+    """Handles received json route refresh message
+
+    This is for future use. This information is not used/required/needed
+    at the moment.
+
+    Arguments:
+        :jdata: json formated data about connection/peer state
+    """
+    logging.debug('Handling Json State with storage {}'.format(storage))
+    _increment_counter(storage, 'route_refresh')
+
+
+def exa_msg_handler(storage, data, encoder):
+    """Handles incomming messages"""
+
+    if encoder == 'text':
+        if not ('neighbor' in data and 'header' in data and 'body' in data):
+            logging.debug('Ignoring received notification from exabgp: {}'.format(data))
+            return
+        restr = 'neighbor (?P<ip>[0-9,\\.]+) received (?P<mid>[0-9]+) header\
+ (?P<header>[0-9,A-F]+) body.?(?P<body>[0-9,A-F]+)?'
+        pat = re.compile(restr)
+        match = re.search(pat, data)
+        if match is None:
+            logging.warn('Unexpected data in this part, only bgp message expected. Received: {}.'.format(data))
+            return
+        msg_type, msg = decode_message(match.groupdict()['header'], match.groupdict()['body'])
+        if msg_type == Message.CODE.KEEPALIVE:
+            handle_keepalive(storage, msg)
+        elif msg_type == Message.CODE.OPEN:
+            handle_open(storage, msg)
+        elif msg_type == Message.CODE.UPDATE:
+            handle_update(storage, msg)
+        elif msg_type == Message.CODE.ROUTE_REFRESH:
+            handle_route_refresh(storage, msg)
+        else:
+            logging.warn('No handler function for msg_type: {}'.format(msg_type))
+    elif encoder == 'json':
+        try:
+            jdata = json.loads(data)
+        except Exception:
+            logging.error('Unable to parse, expected json, received: {}.'.format(data))
+            return
+        if jdata['type'] == 'state':
+            logging.debug('State info received: {}.'.format(data))
+            handle_json_state(storage, jdata)
+        elif jdata['type'] == 'update':
+            logging.debug('Update info received: {}.'.format(data))
+            handle_json_update(storage, jdata)
+        elif jdata['type'] == 'notification':
+            logging.debug('Notification info received: {}.'.format(data))
+        elif jdata['type'] == 'refresh':
+            logging.debug('Route refresh received: {}.'.format(data))
+            handle_json_refresh(storage, jdata)
+        else:
+            logging.error('Unexpected type for data: {}'.format(data))
+    else:
+        logging.error('Ignoring received data, unknown encoder: {}'.format(encoder))
+
+
+def main(*argv):
+    """This script is used as i/o api for communication with exabgp
+
+    Arguments:
+        :*argv: unparsed cli arguments
+
+    In a separate thread an rpc server is started. This server will be used as an api towards the user.
+    Stdin and stdout are used for communication with exabgp.
+    """
+
+    parser = argparse.ArgumentParser(description='ExaBgp rpc server script')
+    parser.add_argument('--host', default='127.0.0.1', help='Host where exabgp is running (default is 127.0.0.1)')
+    parser.add_argument('--loglevel', default=logging.DEBUG, help='Log level')
+    parser.add_argument('--logfile', default='{}/exarpc.log'.format(os.path.dirname(os.path.abspath(__file__))),
+                        help='Log file name.')
+    parser.add_argument('--encoder', default='json', help='Exabgp encoder type')
+    in_args = parser.parse_args(*argv)
+    logging.basicConfig(filename=in_args.logfile, level=in_args.loglevel)
+
+    storage = ExaStorage()
+    rpcserver = SimpleXMLRPCServer((in_args.host, 8000), allow_none=True)
+    rpcserver.register_instance(Rpcs(storage))
+    trpc = threading.Thread(target=rpcserver.serve_forever)
+    trpc.start()
+
+    epoll = select.epoll()
+
+    epoll.register(sys.__stdin__, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)
+
+    try:
+        while True:
+            logging.debug('Epoll loop')
+            events = epoll.poll(10)
+            for fd, event_type in events:
+                logging.debug('Epoll returned: {},{}'.format(fd, event_type))
+                if event_type != select.EPOLLIN:
+                    raise Exception('Unexpected epoll event')
+                else:
+                    data = sys.stdin.readline()
+                    logging.debug('Data recevied from exabgp: {}.'.format(data))
+                    exa_msg_handler(storage, data, in_args.encoder)
+    except Exception as e:
+        logging.warn('Exception occured: {}'.format(e))
+    finally:
+        rpcserver.shutdown()
+        trpc.join()
+
+if __name__ == '__main__':
+    main()