Mvpn functional tests
[integration/test.git] / tools / fastbgp / play.py
index dfc4f0ef98af5b17ecd6520a326f68885d9131a0..4cc0fce66f23536dae04ec6e1e30c29d24849581 100755 (executable)
@@ -11,17 +11,19 @@ EXABGP in this type of scenario."""
 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
 # and is available at http://www.eclipse.org/legal/epl-v10.html
 
+from copy import deepcopy
+from SimpleXMLRPCServer import SimpleXMLRPCServer
 import argparse
 import binascii
 import ipaddr
+import logging
+import Queue
 import select
 import socket
-import time
-import logging
 import struct
-
 import thread
-from copy import deepcopy
+import threading
+import time
 
 
 __author__ = "Vratko Polak"
@@ -30,6 +32,25 @@ __license__ = "Eclipse Public License v1.0"
 __email__ = "vrpolak@cisco.com"
 
 
+class SafeDict(dict):
+    '''Thread safe dictionary
+
+    The object will serve as thread safe data storage.
+    It should be used with "with" statement.
+    '''
+
+    def __init__(self, * p_arg, ** n_arg):
+        super(SafeDict, self).__init__()
+        self._lock = threading.Lock()
+
+    def __enter__(self):
+        self._lock.acquire()
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self._lock.release()
+
+
 def parse_arguments():
     """Use argparse to get arguments,
 
@@ -131,6 +152,17 @@ def parse_arguments():
     parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
     str_help = "How many play utilities are to be started."
     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
+    str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
+Enabling this flag makes the script not decoding the update mesage, because of not\
+supported decoding for these elements."
+    parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
+    str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
+    Enabling this flag makes the script not decoding the update mesage, because of not\
+    supported decoding for these elements."
+    parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
+    parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
+    str_help = "Skipping well known attributes for update message"
+    parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
     arguments = parser.parse_args()
     if arguments.multiplicity < 1:
         print "Multiplicity", arguments.multiplicity, "is not positive."
@@ -326,6 +358,9 @@ class MessageGenerator(object):
         self.performance_threshold_default = args.threshold
         self.rfc4760 = args.rfc4760
         self.bgpls = args.bgpls
+        self.evpn = args.evpn
+        self.mvpn = args.mvpn
+        self.skipattr = args.skipattr
         # Default values when BGP-LS Attributes are used
         if self.bgpls:
             self.prefix_count_to_add_default = 1
@@ -346,7 +381,7 @@ class MessageGenerator(object):
                     self.prefix_count_to_add_default + 1)
         s2_slots = ((self.remaining_prefixes_threshold - 1) /
                     (self.prefix_count_to_add_default -
-                    self.prefix_count_to_del_default) + 1)
+                     self.prefix_count_to_del_default) + 1)
         # S1_First_Index = 0
         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
         s2_first_index = s1_slots * self.prefix_count_to_add_default
@@ -781,6 +816,40 @@ class MessageGenerator(object):
             )
             optional_parameters_hex += optional_parameter_hex
 
+        if self.evpn:
+            optional_parameter_hex = (
+                "\x02"  # Param type ("Capability Ad")
+                "\x06"  # Length (6 bytes)
+                "\x01"  # Multiprotocol extetension capability,
+                "\x04"  # Capability value length
+                "\x00\x19"  # AFI (L2-VPN)
+                "\x00"  # (reserved)
+                "\x46"  # SAFI (EVPN)
+            )
+            optional_parameters_hex += optional_parameter_hex
+
+        if self.mvpn:
+            optional_parameter_hex = (
+                "\x02"  # Param type ("Capability Ad")
+                "\x06"  # Length (6 bytes)
+                "\x01"  # Multiprotocol extetension capability,
+                "\x04"  # Capability value length
+                "\x00\x01"  # AFI (IPV4)
+                "\x00"  # (reserved)
+                "\x05"  # SAFI (MCAST-VPN)
+            )
+            optional_parameters_hex += optional_parameter_hex
+            optional_parameter_hex = (
+                "\x02"  # Param type ("Capability Ad")
+                "\x06"  # Length (6 bytes)
+                "\x01"  # Multiprotocol extetension capability,
+                "\x04"  # Capability value length
+                "\x00\x02"  # AFI (IPV6)
+                "\x00"  # (reserved)
+                "\x05"  # SAFI (MCAST-VPN)
+            )
+            optional_parameters_hex += optional_parameter_hex
+
         optional_parameter_hex = (
             "\x02"  # Param type ("Capability Ad")
             "\x06"  # Length (6 bytes)
@@ -911,7 +980,7 @@ class MessageGenerator(object):
         # TODO: to replace hardcoded string by encoding?
         # Path Attributes
         path_attributes_hex = ""
-        if nlri_prefixes != []:
+        if not self.skipattr:
             path_attributes_hex += (
                 "\x40"  # Flags ("Well-Known")
                 "\x01"  # Type (ORIGIN)
@@ -927,6 +996,13 @@ class MessageGenerator(object):
             )
             my_as_hex = struct.pack(">I", my_autonomous_system)
             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
+            path_attributes_hex += (
+                "\x40"  # Flags ("Well-Known")
+                "\x05"  # Type (LOCAL_PREF)
+                "\x04"  # Length (4)
+                "\x00\x00\x00\x64"  # (100)
+            )
+        if nlri_prefixes != []:
             path_attributes_hex += (
                 "\x40"  # Flags ("Well-Known")
                 "\x03"  # Type (NEXT_HOP)
@@ -946,7 +1022,7 @@ class MessageGenerator(object):
             if cluster_list_item is not None:
                 path_attributes_hex += (
                     "\x80"  # Flags ("Optional, non-transitive")
-                    "\x09"  # Type (CLUSTER_LIST)
+                    "\x0a"  # Type (CLUSTER_LIST)
                     "\x04"  # Length (4)
                 )           # one CLUSTER_LIST item (4 bytes)
                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
@@ -1229,12 +1305,15 @@ class ReadTracker(object):
     for idle waiting.
     """
 
-    def __init__(self, bgp_socket, timer):
+    def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False, wait_for_read=10):
         """The reader initialisation.
 
         Arguments:
             bgp_socket: socket to be used for sending
             timer: timer to be used for scheduling
+            storage: thread safe dict
+            evpn: flag that evpn functionality is tested
+            mvpn: flag that mvpn functionality is tested
         """
         # References to outside objects.
         self.socket = bgp_socket
@@ -1255,6 +1334,10 @@ class ReadTracker(object):
         self.prefixes_withdrawn = 0
         self.rx_idle_time = 0
         self.rx_activity_detected = True
+        self.storage = storage
+        self.evpn = evpn
+        self.mvpn = mvpn
+        self.wfr = wait_for_read
 
     def read_message_chunk(self):
         """Read up to one message
@@ -1264,6 +1347,7 @@ class ReadTracker(object):
         """
         # TODO: We could return the whole message, currently not needed.
         # We assume the socket is readable.
+        logger.info("READING MESSAGE")
         chunk_message = self.socket.recv(self.bytes_to_read)
         self.msg_in += chunk_message
         self.bytes_to_read -= len(chunk_message)
@@ -1452,6 +1536,21 @@ class ReadTracker(object):
         # message header - message type
         msg_type_hex = msg[18:19]
         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
+
+        with self.storage as stor:
+            # this will replace the previously stored message
+            stor['update'] = binascii.hexlify(msg)
+
+        logger.debug("Evpn {}".format(self.evpn))
+        if self.evpn:
+            logger.debug("Skipping update decoding due to evpn data expected")
+            return
+
+        logger.debug("Mvpn {}".format(self.mvpn))
+        if self.mvpn:
+            logger.debug("Skipping update decoding due to mvpn data expected")
+            return
+
         if msg_type == 2:
             logger.debug("Message type: 0x%s (update)",
                          binascii.b2a_hex(msg_type_hex))
@@ -1509,7 +1608,7 @@ class ReadTracker(object):
         # Compute time to the first predictable state change
         event_time = self.timer.get_next_event_time()
         # snapshot_time would be imprecise
-        wait_timedelta = min(event_time - time.time(), 10)
+        wait_timedelta = min(event_time - time.time(), self.wfr)
         if wait_timedelta < 0:
             # The program got around to waiting to an event in "very near
             # future" so late that it became a "past" event, thus tell
@@ -1600,20 +1699,24 @@ class WriteTracker(object):
 class StateTracker(object):
     """Main loop has state so complex it warrants this separate class."""
 
-    def __init__(self, bgp_socket, generator, timer):
+    def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
         """The state tracker initialisation.
 
         Arguments:
             bgp_socket: socket to be used for sending / receiving
             generator: generator to be used for message generation
             timer: timer to be used for scheduling
+            inqueue: user initiated messages queue
+            storage: thread safe dict to store data for the rpc server
+            cliargs: cli args from the user
         """
         # References to outside objects.
         self.socket = bgp_socket
         self.generator = generator
         self.timer = timer
         # Sub-trackers.
-        self.reader = ReadTracker(bgp_socket, timer)
+        self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn,
+                                  mvpn=cliargs.mvpn, wait_for_read=cliargs.wfr)
         self.writer = WriteTracker(bgp_socket, generator, timer)
         # Prioritization state.
         self.prioritize_writing = False
@@ -1626,6 +1729,7 @@ class StateTracker(object):
         # TODO: Alternative is to switch fairly between reading and
         # writing (called round robin from now on).
         # Message counting is done in generator.
+        self.inqueue = inqueue
 
     def perform_one_loop_iteration(self):
         """ The main loop iteration
@@ -1643,6 +1747,14 @@ class StateTracker(object):
                     logger.info("KEEP ALIVE is sent.")
                 # We are sending a message now, so let's prioritize it.
                 self.prioritize_writing = True
+
+        try:
+            msg = self.inqueue.get_nowait()
+            logger.info("Received message: {}".format(msg))
+            msgbin = binascii.unhexlify(msg)
+            self.writer.enqueue_message_for_sending(msgbin)
+        except Queue.Empty:
+            pass
         # Now we know what our priorities are, we have to check
         # which actions are available.
         # socket.socket() returns three lists,
@@ -1730,13 +1842,15 @@ def create_logger(loglevel, logfile):
     return logger
 
 
-def job(arguments):
+def job(arguments, inqueue, storage):
     """One time initialisation and iterations looping.
     Notes:
         Establish BGP connection and run iterations.
 
     Arguments:
         :arguments: Command line arguments
+        :inqueue: Data to be sent from play.py
+        :storage: Shared dict for rpc server
     Returns:
         :return: None
     """
@@ -1769,11 +1883,57 @@ def job(arguments):
     # Use the remembered time.
     timer.reset_my_keepalive_time(timer.snapshot_time)
     # End of initial handshake phase.
-    state = StateTracker(bgp_socket, generator, timer)
+    state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
     while True:  # main reactor loop
         state.perform_one_loop_iteration()
 
 
+class Rpcs:
+    '''Handler for SimpleXMLRPCServer'''
+
+    def __init__(self, sendqueue, storage):
+        '''Init method
+
+        Arguments:
+            :sendqueue: queue for data to be sent towards odl
+            :storage: thread safe dict
+        '''
+        self.queue = sendqueue
+        self.storage = storage
+
+    def send(self, text):
+        '''Data to be sent
+
+        Arguments:
+            :text: hes string of the data to be sent
+        '''
+        self.queue.put(text)
+
+    def get(self, text=''):
+        '''Reads data form the storage
+
+        - returns stored data or an empty string, at the moment only
+          'update' is stored
+
+        Arguments:
+            :text: a key to the storage to get the data
+        Returns:
+            :data: stored data
+        '''
+        with self.storage as stor:
+            return stor.get(text, '')
+
+    def clean(self, text=''):
+        '''Cleans data form the storage
+
+        Arguments:
+            :text: a key to the storage to clean the data
+        '''
+        with self.storage as stor:
+            if text in stor:
+                del stor[text]
+
+
 def threaded_job(arguments):
     """Run the job threaded
 
@@ -1787,6 +1947,8 @@ def threaded_job(arguments):
     prefix_current = arguments.firstprefix
     myip_current = arguments.myip
     thread_args = []
+    rpcqueue = Queue.Queue()
+    storage = SafeDict()
 
     while 1:
         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
@@ -1807,14 +1969,14 @@ def threaded_job(arguments):
     try:
         # Create threads
         for t in thread_args:
-            thread.start_new_thread(job, (t,))
+            thread.start_new_thread(job, (t, rpcqueue, storage))
     except Exception:
         print "Error: unable to start thread."
         raise SystemExit(2)
 
-    # Work remains forever
-    while 1:
-        time.sleep(5)
+    rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
+    rpcserver.register_instance(Rpcs(rpcqueue, storage))
+    rpcserver.serve_forever()
 
 
 if __name__ == "__main__":