Updated code to match new rules
[integration/test.git] / tools / fastbgp / play.py
index 304bb8f1f52add7560c43893d58132ee775b2411..cd0ce016ac5dc5a9aa6b10c8ffe30c5af5897f95 100755 (executable)
@@ -11,11 +11,6 @@ EXABGP in this type of scenario."""
 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
 # and is available at http://www.eclipse.org/legal/epl-v10.html
 
-__author__ = "Vratko Polak"
-__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
-__license__ = "Eclipse Public License v1.0"
-__email__ = "vrpolak@cisco.com"
-
 import argparse
 import binascii
 import ipaddr
@@ -25,6 +20,15 @@ import time
 import logging
 import struct
 
+import thread
+from copy import deepcopy
+
+
+__author__ = "Vratko Polak"
+__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
+__license__ = "Eclipse Public License v1.0"
+__email__ = "vrpolak@cisco.com"
+
 
 def parse_arguments():
     """Use argparse to get arguments,
@@ -95,7 +99,12 @@ def parse_arguments():
     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
     parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
+    str_help = "How many play utilities are to be started."
+    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
     arguments = parser.parse_args()
+    if arguments.multiplicity < 1:
+        print "Multiplicity", arguments.multiplicity, "is not positive."
+        raise SystemExit(1)
     # TODO: Are sanity checks (such as asnumber>=0) required?
     return arguments
 
@@ -681,8 +690,9 @@ class MessageGenerator(object):
             "\x41"  # "32 bit AS Numbers Support"
                     # (see RFC 6793, section 3)
             "\x04"  # Capability value length
-                    # My AS in 32 bit format
-            + struct.pack(">I", my_autonomous_system)
+        )
+        optional_parameter_hex += (
+            struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
         )
         optional_parameters_hex += optional_parameter_hex
 
@@ -804,13 +814,17 @@ class MessageGenerator(object):
                 "\x06"  # Length (6)
                 "\x02"  # AS segment type (AS_SEQUENCE)
                 "\x01"  # AS segment length (1)
-                        # AS segment (4 bytes)
-                + struct.pack(">I", my_autonomous_system) +
+            )
+            my_AS = struct.pack(">I", my_autonomous_system)
+            path_attributes_hex += my_AS  # AS segment (4 bytes)
+            path_attributes_hex += (
                 "\x40"  # Flags ("Well-Known")
                 "\x03"  # Type (NEXT_HOP)
                 "\x04"  # Length (4)
-                        # IP address of the next hop (4 bytes)
-                + struct.pack(">I", int(next_hop))
+            )
+            next_hop = struct.pack(">I", int(next_hop))
+            path_attributes_hex += (
+                next_hop  # IP address of the next hop (4 bytes)
             )
         else:
             path_attributes_hex = ""
@@ -1086,6 +1100,7 @@ class ReadTracker(object):
         self.prefixes_introduced = 0
         self.prefixes_withdrawn = 0
         self.rx_idle_time = 0
+        self.rx_activity_detected = True
 
     def read_message_chunk(self):
         """Read up to one message
@@ -1341,20 +1356,27 @@ class ReadTracker(object):
             wait_timedelta = 0
         # And wait for event or something to read.
 
-        logger.info("total_received_update_message_counter: %s",
-                    self.updates_received)
-        logger.info("total_received_nlri_prefix_counter: %s",
-                    self.prefixes_introduced)
-        logger.info("total_received_withdrawn_prefix_counter: %s",
-                    self.prefixes_withdrawn)
+        if not self.rx_activity_detected or not (self.updates_received % 100):
+            # right time to write statistics to the log (not for every update and
+            # not too frequently to avoid having large log files)
+            logger.info("total_received_update_message_counter: %s",
+                        self.updates_received)
+            logger.info("total_received_nlri_prefix_counter: %s",
+                        self.prefixes_introduced)
+            logger.info("total_received_withdrawn_prefix_counter: %s",
+                        self.prefixes_withdrawn)
 
         start_time = time.time()
         select.select([self.socket], [], [self.socket], wait_timedelta)
         timedelta = time.time() - start_time
         self.rx_idle_time += timedelta
+        self.rx_activity_detected = timedelta < 1
 
-        logger.info("... idle for %.3fs", timedelta)
-        logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
+        if not self.rx_activity_detected or not (self.updates_received % 100):
+            # right time to write statistics to the log (not for every update and
+            # not too frequently to avoid having large log files)
+            logger.info("... idle for %.3fs", timedelta)
+            logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
         return
 
 
@@ -1523,22 +1545,37 @@ class StateTracker(object):
         return
 
 
-if __name__ == "__main__":
-    """ One time initialisation and iterations looping.
+def create_logger(loglevel, logfile):
+    """Create logger object
 
-    Notes:
-        Establish BGP connection and run iterations.
+    Arguments:
+        :loglevel: log level
+        :logfile: log file name
+    Returns:
+        :return: logger object
     """
-    arguments = parse_arguments()
     logger = logging.getLogger("logger")
     log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
     console_handler = logging.StreamHandler()
-    file_handler = logging.FileHandler(arguments.logfile, mode="w")
+    file_handler = logging.FileHandler(logfile, mode="w")
     console_handler.setFormatter(log_formatter)
     file_handler.setFormatter(log_formatter)
     logger.addHandler(console_handler)
     logger.addHandler(file_handler)
-    logger.setLevel(arguments.loglevel)
+    logger.setLevel(loglevel)
+    return logger
+
+
+def job(arguments):
+    """One time initialisation and iterations looping.
+    Notes:
+        Establish BGP connection and run iterations.
+
+    Arguments:
+        :arguments: Command line arguments
+    Returns:
+        :return: None
+    """
     bgp_socket = establish_connection(arguments)
     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
     # Receive open message before sending anything.
@@ -1572,3 +1609,55 @@ if __name__ == "__main__":
     state = StateTracker(bgp_socket, generator, timer)
     while True:  # main reactor loop
         state.perform_one_loop_iteration()
+
+
+def threaded_job(arguments):
+    """Run the job threaded
+
+    Arguments:
+        :arguments: Command line arguments
+    Returns:
+        :return: None
+    """
+    amount_left = arguments.amount
+    utils_left = arguments.multiplicity
+    prefix_current = arguments.firstprefix
+    myip_current = arguments.myip
+    thread_args = []
+
+    while 1:
+        amount_per_util = (amount_left - 1) / utils_left + 1  # round up
+        amount_left -= amount_per_util
+        utils_left -= 1
+
+        args = deepcopy(arguments)
+        args.amount = amount_per_util
+        args.firstprefix = prefix_current
+        args.myip = myip_current
+        thread_args.append(args)
+
+        if not utils_left:
+            break
+        prefix_current += amount_per_util * 16
+        myip_current += 1
+
+    try:
+        # Create threads
+        for t in thread_args:
+            thread.start_new_thread(job, (t,))
+    except Exception:
+        print "Error: unable to start thread."
+        raise SystemExit(2)
+
+    # Work remains forever
+    while 1:
+        time.sleep(5)
+
+
+if __name__ == "__main__":
+    arguments = parse_arguments()
+    logger = create_logger(arguments.loglevel, arguments.logfile)
+    if arguments.multiplicity > 1:
+        threaded_job(arguments)
+    else:
+        job(arguments)