Do not emit raw binary data after unconfirmed open
[integration/test.git] / tools / fastbgp / play.py
index 304bb8f1f52add7560c43893d58132ee775b2411..2342224a3ca76b0befbbeed771e5f6017c590583 100755 (executable)
@@ -11,11 +11,6 @@ EXABGP in this type of scenario."""
 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
 # and is available at http://www.eclipse.org/legal/epl-v10.html
 
-__author__ = "Vratko Polak"
-__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
-__license__ = "Eclipse Public License v1.0"
-__email__ = "vrpolak@cisco.com"
-
 import argparse
 import binascii
 import ipaddr
@@ -25,6 +20,15 @@ import time
 import logging
 import struct
 
+import thread
+from copy import deepcopy
+
+
+__author__ = "Vratko Polak"
+__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
+__license__ = "Eclipse Public License v1.0"
+__email__ = "vrpolak@cisco.com"
+
 
 def parse_arguments():
     """Use argparse to get arguments,
@@ -66,6 +70,12 @@ def parse_arguments():
     str_help = "The IP of the next hop to be placed into the update messages."
     parser.add_argument("--nexthop", default="192.0.2.1",
                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
+    str_help = "Identifier of the route originator."
+    parser.add_argument("--originator", default=None,
+                        type=ipaddr.IPv4Address, dest="originator", help=str_help)
+    str_help = "Cluster list item identifier."
+    parser.add_argument("--cluster", default=None,
+                        type=ipaddr.IPv4Address, dest="cluster", help=str_help)
     str_help = ("Numeric IP Address to try to connect to." +
                 "Currently no effect in listening mode.")
     parser.add_argument("--peerip", default="127.0.0.2",
@@ -95,7 +105,12 @@ def parse_arguments():
     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
     parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
+    str_help = "How many play utilities are to be started."
+    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
     arguments = parser.parse_args()
+    if arguments.multiplicity < 1:
+        print "Multiplicity", arguments.multiplicity, "is not positive."
+        raise SystemExit(1)
     # TODO: Are sanity checks (such as asnumber>=0) required?
     return arguments
 
@@ -265,6 +280,8 @@ class MessageGenerator(object):
         self.hold_time_default = args.holdtime  # Local hold time.
         self.bgp_identifier_default = int(args.myip)
         self.next_hop_default = args.nexthop
+        self.originator_id_default = args.originator
+        self.cluster_list_item_default = args.cluster
         self.single_update_default = args.updates == "single"
         self.randomize_updates_default = args.updates == "random"
         self.prefix_count_to_add_default = args.insert
@@ -329,6 +346,8 @@ class MessageGenerator(object):
         logger.info("  My Hold Time: " + str(self.hold_time_default))
         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
         logger.info("  Next Hop: " + str(self.next_hop_default))
+        logger.info("  Originator ID: " + str(self.originator_id_default))
+        logger.info("  Cluster list: " + str(self.cluster_list_item_default))
         logger.info("  Prefix count to be inserted at once: " +
                     str(self.prefix_count_to_add_default))
         logger.info("  Prefix count to be withdrawn at once: " +
@@ -366,6 +385,7 @@ class MessageGenerator(object):
             :return: n/a
         """
         # default values handling
+        # TODO optimize default values handling (use e.g. dicionary.update() approach)
         if file_name is None:
             file_name = self.results_file_name_default
         if threshold is None:
@@ -461,6 +481,7 @@ class MessageGenerator(object):
             Created just as a fame for future generator enhancement.
         """
         # default values handling
+        # TODO optimize default values handling (use e.g. dicionary.update() approach)
         if lowest is None:
             lowest = self.randomize_lowest_default
         if highest is None:
@@ -494,6 +515,7 @@ class MessageGenerator(object):
             :return: list of generated IP address prefixes
         """
         # default values handling
+        # TODO optimize default values handling (use e.g. dicionary.update() approach)
         if slot_size is None:
             slot_size = self.slot_size_default
         if prefix_base is None:
@@ -537,6 +559,7 @@ class MessageGenerator(object):
             Updates global counters.
         """
         # default values handling
+        # TODO optimize default values handling (use e.g. dicionary.update() approach)
         if prefix_count_to_add is None:
             prefix_count_to_add = self.prefix_count_to_add_default
         if prefix_count_to_del is None:
@@ -625,7 +648,8 @@ class MessageGenerator(object):
             :return: encoded OPEN message in HEX
         """
 
-        # Default values handling
+        # default values handling
+        # TODO optimize default values handling (use e.g. dicionary.update() approach)
         if version is None:
             version = self.version_default
         if my_autonomous_system is None:
@@ -681,8 +705,9 @@ class MessageGenerator(object):
             "\x41"  # "32 bit AS Numbers Support"
                     # (see RFC 6793, section 3)
             "\x04"  # Capability value length
-                    # My AS in 32 bit format
-            + struct.pack(">I", my_autonomous_system)
+        )
+        optional_parameter_hex += (
+            struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
         )
         optional_parameters_hex += optional_parameter_hex
 
@@ -744,7 +769,8 @@ class MessageGenerator(object):
 
     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
                        wr_prefix_length=None, nlri_prefix_length=None,
-                       my_autonomous_system=None, next_hop=None):
+                       my_autonomous_system=None, next_hop=None,
+                       originator_id=None, cluster_list_item=None):
         """Generates an UPDATE Message (rfc4271#section-4.3)
 
         Arguments:
@@ -758,7 +784,8 @@ class MessageGenerator(object):
             :return: encoded UPDATE message in HEX
         """
 
-        # Default values handling
+        # default values handling
+        # TODO optimize default values handling (use e.g. dicionary.update() approach)
         if wr_prefixes is None:
             wr_prefixes = self.wr_prefixes_default
         if nlri_prefixes is None:
@@ -771,6 +798,10 @@ class MessageGenerator(object):
             my_autonomous_system = self.my_autonomous_system_default
         if next_hop is None:
             next_hop = self.next_hop_default
+        if originator_id is None:
+            originator_id = self.originator_id_default
+        if cluster_list_item is None:
+            cluster_list_item = self.cluster_list_item_default
 
         # Marker
         marker_hex = "\xFF" * 16
@@ -793,27 +824,46 @@ class MessageGenerator(object):
 
         # TODO: to replace hardcoded string by encoding?
         # Path Attributes
+        path_attributes_hex = ""
         if nlri_prefixes != []:
-            path_attributes_hex = (
+            path_attributes_hex += (
                 "\x40"  # Flags ("Well-Known")
                 "\x01"  # Type (ORIGIN)
                 "\x01"  # Length (1)
                 "\x00"  # Origin: IGP
+            )
+            path_attributes_hex += (
                 "\x40"  # Flags ("Well-Known")
                 "\x02"  # Type (AS_PATH)
                 "\x06"  # Length (6)
                 "\x02"  # AS segment type (AS_SEQUENCE)
                 "\x01"  # AS segment length (1)
-                        # AS segment (4 bytes)
-                + struct.pack(">I", my_autonomous_system) +
+            )
+            my_as_hex = struct.pack(">I", my_autonomous_system)
+            path_attributes_hex += my_as_hex  # AS segment (4 bytes)
+            path_attributes_hex += (
                 "\x40"  # Flags ("Well-Known")
                 "\x03"  # Type (NEXT_HOP)
                 "\x04"  # Length (4)
-                        # IP address of the next hop (4 bytes)
-                + struct.pack(">I", int(next_hop))
             )
-        else:
-            path_attributes_hex = ""
+            next_hop_hex = struct.pack(">I", int(next_hop))
+            path_attributes_hex += (
+                next_hop_hex  # IP address of the next hop (4 bytes)
+            )
+            if originator_id is not None:
+                path_attributes_hex += (
+                    "\x80"  # Flags ("Optional, non-transitive")
+                    "\x09"  # Type (ORIGINATOR_ID)
+                    "\x04"  # Length (4)
+                )           # ORIGINATOR_ID (4 bytes)
+                path_attributes_hex += struct.pack(">I", int(originator_id))
+            if cluster_list_item is not None:
+                path_attributes_hex += (
+                    "\x80"  # Flags ("Optional, non-transitive")
+                    "\x09"  # Type (CLUSTER_LIST)
+                    "\x04"  # Length (4)
+                )           # one CLUSTER_LIST item (4 bytes)
+                path_attributes_hex += struct.pack(">I", int(cluster_list_item))
 
         # Total Path Attributes Length
         total_path_attributes_length = len(path_attributes_hex)
@@ -860,12 +910,19 @@ class MessageGenerator(object):
             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
                          str(wr_prefix_length) + " (0x" +
                          binascii.hexlify(withdrawn_routes_hex) + ")")
-            logger.debug("  Total Path Attributes Length=" +
-                         str(total_path_attributes_length) + " (0x" +
-                         binascii.hexlify(total_path_attributes_length_hex) +
-                         ")")
-            logger.debug("  Path Attributes=" + "(0x" +
-                         binascii.hexlify(path_attributes_hex) + ")")
+            if total_path_attributes_length:
+                logger.debug("  Total Path Attributes Length=" +
+                             str(total_path_attributes_length) + " (0x" +
+                             binascii.hexlify(total_path_attributes_length_hex) + ")")
+                logger.debug("  Path Attributes=" + "(0x" +
+                             binascii.hexlify(path_attributes_hex) + ")")
+                logger.debug("    Origin=IGP")
+                logger.debug("    AS path=" + str(my_autonomous_system))
+                logger.debug("    Next hop=" + str(next_hop))
+                if originator_id is not None:
+                    logger.debug("    Originator id=" + str(originator_id))
+                if cluster_list_item is not None:
+                    logger.debug("    Cluster list=" + str(cluster_list_item))
             logger.debug("  Network Layer Reachability Information=" +
                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
                          " (0x" + binascii.hexlify(nlri_hex) + ")")
@@ -1086,6 +1143,7 @@ class ReadTracker(object):
         self.prefixes_introduced = 0
         self.prefixes_withdrawn = 0
         self.rx_idle_time = 0
+        self.rx_activity_detected = True
 
     def read_message_chunk(self):
         """Read up to one message
@@ -1172,56 +1230,65 @@ class ReadTracker(object):
                 hex_to_decode = hex_to_decode[3 + attr_length:]
 
             if attr_type_code == 1:
-                logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)",
+                logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 2:
-                logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)",
+                logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 3:
-                logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)",
+                logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 4:
-                logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)",
+                logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 5:
-                logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)",
+                logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 6:
-                logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)",
+                logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 7:
-                logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)",
+                logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
+                             binascii.b2a_hex(attr_flags_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
+            elif attr_type_code == 9:  # rfc4456#section-8
+                logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
+            elif attr_type_code == 10:  # rfc4456#section-8
+                logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
+                             binascii.b2a_hex(attr_flags_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 14:  # rfc4760#section-3
-                logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)",
+                logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
                 address_family_identifier_hex = attr_value_hex[0:2]
-                logger.debug("  Address Family Identifier = 0x%s",
+                logger.debug("  Address Family Identifier=0x%s",
                              binascii.b2a_hex(address_family_identifier_hex))
                 subsequent_address_family_identifier_hex = attr_value_hex[2]
-                logger.debug("  Subsequent Address Family Identifier = 0x%s",
+                logger.debug("  Subsequent Address Family Identifier=0x%s",
                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
                 next_hop_netaddr_len_hex = attr_value_hex[3]
                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
-                logger.debug("  Length of Next Hop Network Address = 0x%s (%s)",
-                             binascii.b2a_hex(next_hop_netaddr_len_hex),
-                             next_hop_netaddr_len)
+                logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
+                             next_hop_netaddr_len,
+                             binascii.b2a_hex(next_hop_netaddr_len_hex))
                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
-                logger.debug("  Network Address of Next Hop = 0x%s",
-                             binascii.b2a_hex(next_hop_netaddr_hex))
+                next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
+                logger.debug("  Network Address of Next Hop=%s (0x%s)",
+                             next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
-                logger.debug("  Reserved = 0x%s",
+                logger.debug("  Reserved=0x%s",
                              binascii.b2a_hex(reserved_hex))
                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
-                logger.debug("  Network Layer Reachability Information = 0x%s",
+                logger.debug("  Network Layer Reachability Information=0x%s",
                              binascii.b2a_hex(nlri_hex))
                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
@@ -1229,17 +1296,17 @@ class ReadTracker(object):
                     logger.debug("  nlri_prefix_received: %s", prefix)
                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
             elif attr_type_code == 15:  # rfc4760#section-4
-                logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)",
+                logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
                 address_family_identifier_hex = attr_value_hex[0:2]
-                logger.debug("  Address Family Identifier = 0x%s",
+                logger.debug("  Address Family Identifier=0x%s",
                              binascii.b2a_hex(address_family_identifier_hex))
                 subsequent_address_family_identifier_hex = attr_value_hex[2]
-                logger.debug("  Subsequent Address Family Identifier = 0x%s",
+                logger.debug("  Subsequent Address Family Identifier=0x%s",
                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
                 wd_hex = attr_value_hex[3:]
-                logger.debug("  Withdrawn Routes = 0x%s",
+                logger.debug("  Withdrawn Routes=0x%s",
                              binascii.b2a_hex(wd_hex))
                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
                 logger.debug("  Withdrawn routes prefix list: %s",
@@ -1248,9 +1315,9 @@ class ReadTracker(object):
                     logger.debug("  withdrawn_prefix_received: %s", prefix)
                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
             else:
-                logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code,
+                logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
         return None
 
     def decode_update_message(self, msg):
@@ -1293,13 +1360,13 @@ class ReadTracker(object):
                 logger.debug("withdrawn_prefix_received: %s", prefix)
             # total path attribute length
             total_pa_length_offset = 21 + wdr_length
-            total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
+            total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
             logger.debug("Total path attribute lenght: 0x%s (%s)",
                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
             # path attributes
             pa_offset = total_pa_length_offset + 2
-            pa_hex = msg[pa_offset:pa_offset+total_pa_length]
+            pa_hex = msg[pa_offset:pa_offset + total_pa_length]
             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
             self.decode_path_attributes(pa_hex)
             # network layer reachability information length
@@ -1307,7 +1374,7 @@ class ReadTracker(object):
             logger.debug("Calculated NLRI length: %s", nlri_length)
             # network layer reachability information
             nlri_offset = pa_offset + total_pa_length
-            nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
+            nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
@@ -1341,20 +1408,27 @@ class ReadTracker(object):
             wait_timedelta = 0
         # And wait for event or something to read.
 
-        logger.info("total_received_update_message_counter: %s",
-                    self.updates_received)
-        logger.info("total_received_nlri_prefix_counter: %s",
-                    self.prefixes_introduced)
-        logger.info("total_received_withdrawn_prefix_counter: %s",
-                    self.prefixes_withdrawn)
+        if not self.rx_activity_detected or not (self.updates_received % 100):
+            # right time to write statistics to the log (not for every update and
+            # not too frequently to avoid having large log files)
+            logger.info("total_received_update_message_counter: %s",
+                        self.updates_received)
+            logger.info("total_received_nlri_prefix_counter: %s",
+                        self.prefixes_introduced)
+            logger.info("total_received_withdrawn_prefix_counter: %s",
+                        self.prefixes_withdrawn)
 
         start_time = time.time()
         select.select([self.socket], [], [self.socket], wait_timedelta)
         timedelta = time.time() - start_time
         self.rx_idle_time += timedelta
+        self.rx_activity_detected = timedelta < 1
 
-        logger.info("... idle for %.3fs", timedelta)
-        logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
+        if not self.rx_activity_detected or not (self.updates_received % 100):
+            # right time to write statistics to the log (not for every update and
+            # not too frequently to avoid having large log files)
+            logger.info("... idle for %.3fs", timedelta)
+            logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
         return
 
 
@@ -1523,22 +1597,37 @@ class StateTracker(object):
         return
 
 
-if __name__ == "__main__":
-    """ One time initialisation and iterations looping.
+def create_logger(loglevel, logfile):
+    """Create logger object
 
-    Notes:
-        Establish BGP connection and run iterations.
+    Arguments:
+        :loglevel: log level
+        :logfile: log file name
+    Returns:
+        :return: logger object
     """
-    arguments = parse_arguments()
     logger = logging.getLogger("logger")
-    log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
+    log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
     console_handler = logging.StreamHandler()
-    file_handler = logging.FileHandler(arguments.logfile, mode="w")
+    file_handler = logging.FileHandler(logfile, mode="w")
     console_handler.setFormatter(log_formatter)
     file_handler.setFormatter(log_formatter)
     logger.addHandler(console_handler)
     logger.addHandler(file_handler)
-    logger.setLevel(arguments.loglevel)
+    logger.setLevel(loglevel)
+    return logger
+
+
+def job(arguments):
+    """One time initialisation and iterations looping.
+    Notes:
+        Establish BGP connection and run iterations.
+
+    Arguments:
+        :arguments: Command line arguments
+    Returns:
+        :return: None
+    """
     bgp_socket = establish_connection(arguments)
     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
     # Receive open message before sending anything.
@@ -1556,10 +1645,12 @@ if __name__ == "__main__":
     # Using exact keepalive length to not to see possible updates.
     msg_in = bgp_socket.recv(19)
     if msg_in != generator.keepalive_message():
-        logger.error("Open not confirmed by keepalive, instead got " +
-                     binascii.hexlify(msg_in))
-        raise MessageError("Open not confirmed by keepalive, instead got",
-                           msg_in)
+        error_msg = (
+            "Open not confirmed by keepalive, instead got " +
+            binascii.hexlify(msg_in)
+        )
+        logger.error(error_msg)
+        raise MessageError(error_msg)
     timer.reset_peer_hold_time()
     # Send the keepalive to indicate the connection is accepted.
     timer.snapshot()  # Remember this time.
@@ -1572,3 +1663,52 @@ if __name__ == "__main__":
     state = StateTracker(bgp_socket, generator, timer)
     while True:  # main reactor loop
         state.perform_one_loop_iteration()
+
+
+def threaded_job(arguments):
+    """Run the job threaded
+
+    Arguments:
+        :arguments: Command line arguments
+    Returns:
+        :return: None
+    """
+    amount_left = arguments.amount
+    utils_left = arguments.multiplicity
+    prefix_current = arguments.firstprefix
+    myip_current = arguments.myip
+    thread_args = []
+
+    while 1:
+        amount_per_util = (amount_left - 1) / utils_left + 1  # round up
+        amount_left -= amount_per_util
+        utils_left -= 1
+
+        args = deepcopy(arguments)
+        args.amount = amount_per_util
+        args.firstprefix = prefix_current
+        args.myip = myip_current
+        thread_args.append(args)
+
+        if not utils_left:
+            break
+        prefix_current += amount_per_util * 16
+        myip_current += 1
+
+    try:
+        # Create threads
+        for t in thread_args:
+            thread.start_new_thread(job, (t,))
+    except Exception:
+        print "Error: unable to start thread."
+        raise SystemExit(2)
+
+    # Work remains forever
+    while 1:
+        time.sleep(5)
+
+
+if __name__ == "__main__":
+    arguments = parse_arguments()
+    logger = create_logger(arguments.loglevel, arguments.logfile)
+    threaded_job(arguments)