X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tools%2Ffastbgp%2Fplay.py;h=2342224a3ca76b0befbbeed771e5f6017c590583;hb=b760b426641ff5af6f9259e6e21acd101e8f6e3d;hp=304bb8f1f52add7560c43893d58132ee775b2411;hpb=6543fa20aed2795cb984c2bfafea2de0e59ecaa0;p=integration%2Ftest.git

diff --git a/tools/fastbgp/play.py b/tools/fastbgp/play.py
index 304bb8f1f5..2342224a3c 100755
--- a/tools/fastbgp/play.py
+++ b/tools/fastbgp/play.py
@@ -11,11 +11,6 @@ EXABGP in this type of scenario."""
 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
 # and is available at http://www.eclipse.org/legal/epl-v10.html
 
-__author__ = "Vratko Polak"
-__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
-__license__ = "Eclipse Public License v1.0"
-__email__ = "vrpolak@cisco.com"
-
 import argparse
 import binascii
 import ipaddr
@@ -25,6 +20,15 @@ import time
 import logging
 import struct
 
+import thread
+from copy import deepcopy
+
+
+__author__ = "Vratko Polak"
+__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
+__license__ = "Eclipse Public License v1.0"
+__email__ = "vrpolak@cisco.com"
+
 
 def parse_arguments():
     """Use argparse to get arguments,
@@ -66,6 +70,12 @@ def parse_arguments():
     str_help = "The IP of the next hop to be placed into the update messages."
     parser.add_argument("--nexthop", default="192.0.2.1",
                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
+    str_help = "Identifier of the route originator."
+    parser.add_argument("--originator", default=None,
+                        type=ipaddr.IPv4Address, dest="originator", help=str_help)
+    str_help = "Cluster list item identifier."
+    parser.add_argument("--cluster", default=None,
+                        type=ipaddr.IPv4Address, dest="cluster", help=str_help)
     str_help = ("Numeric IP Address to try to connect to." +
                 "Currently no effect in listening mode.")
     parser.add_argument("--peerip", default="127.0.0.2",
@@ -95,7 +105,12 @@ def parse_arguments():
     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
     parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
+    str_help = "How many play utilities are to be started."
+    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
     arguments = parser.parse_args()
+    if arguments.multiplicity < 1:
+        print "Multiplicity", arguments.multiplicity, "is not positive."
+        raise SystemExit(1)
     # TODO: Are sanity checks (such as asnumber>=0) required?
     return arguments
 
@@ -265,6 +280,8 @@ class MessageGenerator(object):
         self.hold_time_default = args.holdtime  # Local hold time.
         self.bgp_identifier_default = int(args.myip)
         self.next_hop_default = args.nexthop
+        self.originator_id_default = args.originator
+        self.cluster_list_item_default = args.cluster
         self.single_update_default = args.updates == "single"
         self.randomize_updates_default = args.updates == "random"
         self.prefix_count_to_add_default = args.insert
@@ -329,6 +346,8 @@ class MessageGenerator(object):
         logger.info("  My Hold Time: " + str(self.hold_time_default))
         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
         logger.info("  Next Hop: " + str(self.next_hop_default))
+        logger.info("  Originator ID: " + str(self.originator_id_default))
+        logger.info("  Cluster list: " + str(self.cluster_list_item_default))
         logger.info("  Prefix count to be inserted at once: " +
                     str(self.prefix_count_to_add_default))
         logger.info("  Prefix count to be withdrawn at once: " +
@@ -366,6 +385,7 @@ class MessageGenerator(object):
         :return: n/a
         """
         # default values handling
+        # TODO optimize default values handling (use e.g. dictionary.update() approach)
         if file_name is None:
             file_name = self.results_file_name_default
         if threshold is None:
@@ -461,6 +481,7 @@ class MessageGenerator(object):
         Created just as a frame for future generator enhancement.
         """
         # default values handling
+        # TODO optimize default values handling (use e.g. dictionary.update() approach)
         if lowest is None:
             lowest = self.randomize_lowest_default
         if highest is None:
@@ -494,6 +515,7 @@ class MessageGenerator(object):
         :return: list of generated IP address prefixes
         """
         # default values handling
+        # TODO optimize default values handling (use e.g. dictionary.update() approach)
         if slot_size is None:
             slot_size = self.slot_size_default
         if prefix_base is None:
@@ -537,6 +559,7 @@ class MessageGenerator(object):
         Updates global counters.
         """
         # default values handling
+        # TODO optimize default values handling (use e.g. dictionary.update() approach)
         if prefix_count_to_add is None:
             prefix_count_to_add = self.prefix_count_to_add_default
         if prefix_count_to_del is None:
@@ -625,7 +648,8 @@ class MessageGenerator(object):
         :return: encoded OPEN message in HEX
         """
-        # Default values handling
+        # default values handling
+        # TODO optimize default values handling (use e.g. dictionary.update() approach)
         if version is None:
             version = self.version_default
         if my_autonomous_system is None:
@@ -681,8 +705,9 @@ class MessageGenerator(object):
             "\x41"  # "32 bit AS Numbers Support"
                     # (see RFC 6793, section 3)
             "\x04"  # Capability value length
-            # My AS in 32 bit format
-            + struct.pack(">I", my_autonomous_system)
+        )
+        optional_parameter_hex += (
+            struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
         )
         optional_parameters_hex += optional_parameter_hex
 
@@ -744,7 +769,8 @@ class MessageGenerator(object):
     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
                        wr_prefix_length=None, nlri_prefix_length=None,
-                       my_autonomous_system=None, next_hop=None):
+                       my_autonomous_system=None, next_hop=None,
+                       originator_id=None, cluster_list_item=None):
         """Generates an UPDATE Message (rfc4271#section-4.3)
 
         Arguments:
@@ -758,7 +784,8 @@
         :return: encoded UPDATE message in HEX
         """
-        # Default values handling
+        # default values handling
+        # TODO optimize default values handling (use e.g. dictionary.update() approach)
         if wr_prefixes is None:
             wr_prefixes = self.wr_prefixes_default
         if nlri_prefixes is None:
@@ -771,6 +798,10 @@
             my_autonomous_system = self.my_autonomous_system_default
         if next_hop is None:
             next_hop = self.next_hop_default
+        if originator_id is None:
+            originator_id = self.originator_id_default
+        if cluster_list_item is None:
+            cluster_list_item = self.cluster_list_item_default
 
         # Marker
         marker_hex = "\xFF" * 16
@@ -793,27 +824,46 @@ class MessageGenerator(object):
         # TODO: to replace hardcoded string by encoding?
 
         # Path Attributes
+        path_attributes_hex = ""
         if nlri_prefixes != []:
-            path_attributes_hex = (
+            path_attributes_hex += (
                 "\x40"  # Flags ("Well-Known")
                 "\x01"  # Type (ORIGIN)
                 "\x01"  # Length (1)
                 "\x00"  # Origin: IGP
+            )
+            path_attributes_hex += (
                 "\x40"  # Flags ("Well-Known")
                 "\x02"  # Type (AS_PATH)
                 "\x06"  # Length (6)
                 "\x02"  # AS segment type (AS_SEQUENCE)
                 "\x01"  # AS segment length (1)
-                # AS segment (4 bytes)
-                + struct.pack(">I", my_autonomous_system)
+
+            )
+            my_as_hex = struct.pack(">I", my_autonomous_system)
+            path_attributes_hex += my_as_hex  # AS segment (4 bytes)
+            path_attributes_hex += (
                 "\x40"  # Flags ("Well-Known")
                 "\x03"  # Type (NEXT_HOP)
                 "\x04"  # Length (4)
-                # IP address of the next hop (4 bytes)
-                + struct.pack(">I", int(next_hop))
             )
-        else:
-            path_attributes_hex = ""
+            next_hop_hex = struct.pack(">I", int(next_hop))
+            path_attributes_hex += (
+                next_hop_hex  # IP address of the next hop (4 bytes)
+            )
+            if originator_id is not None:
+                path_attributes_hex += (
+                    "\x80"  # Flags ("Optional, non-transitive")
+                    "\x09"  # Type (ORIGINATOR_ID)
+                    "\x04"  # Length (4)
+                )           # ORIGINATOR_ID (4 bytes)
+                path_attributes_hex += struct.pack(">I", int(originator_id))
+            if cluster_list_item is not None:
+                path_attributes_hex += (
+                    "\x80"  # Flags ("Optional, non-transitive")
+                    "\x0a"  # Type (CLUSTER_LIST)
+                    "\x04"  # Length (4)
+                )           # one CLUSTER_LIST item (4 bytes)
+                path_attributes_hex += struct.pack(">I", int(cluster_list_item))
 
         # Total Path Attributes Length
         total_path_attributes_length = len(path_attributes_hex)
@@ -860,12 +910,19 @@ class MessageGenerator(object):
         logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
                      str(wr_prefix_length) + " (0x" +
                      binascii.hexlify(withdrawn_routes_hex) + ")")
-        logger.debug("  Total Path Attributes Length=" +
-                     str(total_path_attributes_length) + " (0x" +
-                     binascii.hexlify(total_path_attributes_length_hex) +
-                     ")")
-        logger.debug("  Path Attributes=" + "(0x" +
-                     binascii.hexlify(path_attributes_hex) + ")")
+        if total_path_attributes_length:
+            logger.debug("  Total Path Attributes Length=" +
+                         str(total_path_attributes_length) + " (0x" +
+                         binascii.hexlify(total_path_attributes_length_hex) + ")")
+            logger.debug("  Path Attributes=" + "(0x" +
+                         binascii.hexlify(path_attributes_hex) + ")")
+            logger.debug("    Origin=IGP")
+            logger.debug("    AS path=" + str(my_autonomous_system))
+            logger.debug("    Next hop=" + str(next_hop))
+            if originator_id is not None:
+                logger.debug("    Originator id=" + str(originator_id))
+            if cluster_list_item is not None:
+                logger.debug("    Cluster list=" + str(cluster_list_item))
         logger.debug("  Network Layer Reachability Information=" +
                      str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
                      " (0x" + binascii.hexlify(nlri_hex) + ")")
@@ -1086,6 +1143,7 @@ class ReadTracker(object):
         self.prefixes_introduced = 0
        self.prefixes_withdrawn = 0
         self.rx_idle_time = 0
+        self.rx_activity_detected = True
 
     def read_message_chunk(self):
         """Read up to one message
@@ -1172,56 +1230,65 @@ class ReadTracker(object):
             hex_to_decode = hex_to_decode[3 + attr_length:]
             if attr_type_code == 1:
-                logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)",
+                logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 2:
-                logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)",
+                logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 3:
-                logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)",
+                logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 4:
-                logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)",
+                logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 5:
-                logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)",
+                logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 6:
-                logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)",
+                logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 7:
-                logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)",
+                logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
+                             binascii.b2a_hex(attr_flags_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
+            elif attr_type_code == 9:  # rfc4456#section-8
+                logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
+            elif attr_type_code == 10:  # rfc4456#section-8
+                logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
+                             binascii.b2a_hex(attr_flags_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
             elif attr_type_code == 14:  # rfc4760#section-3
-                logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)",
+                logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
                              binascii.b2a_hex(attr_flags_hex))
-                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
                 address_family_identifier_hex = attr_value_hex[0:2]
-                logger.debug("  Address Family Identifier = 0x%s",
+                logger.debug("  Address Family Identifier=0x%s",
                              binascii.b2a_hex(address_family_identifier_hex))
                 subsequent_address_family_identifier_hex = attr_value_hex[2]
-                logger.debug("  Subsequent Address Family Identifier = 0x%s",
+                logger.debug("  Subsequent Address Family Identifier=0x%s",
Family Identifier=0x%s", binascii.b2a_hex(subsequent_address_family_identifier_hex)) next_hop_netaddr_len_hex = attr_value_hex[3] next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16) - logger.debug(" Length of Next Hop Network Address = 0x%s (%s)", - binascii.b2a_hex(next_hop_netaddr_len_hex), - next_hop_netaddr_len) + logger.debug(" Length of Next Hop Network Address=%s (0x%s)", + next_hop_netaddr_len, + binascii.b2a_hex(next_hop_netaddr_len_hex)) next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len] - logger.debug(" Network Address of Next Hop = 0x%s", - binascii.b2a_hex(next_hop_netaddr_hex)) + next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex)) + logger.debug(" Network Address of Next Hop=%s (0x%s)", + next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex)) reserved_hex = attr_value_hex[4 + next_hop_netaddr_len] - logger.debug(" Reserved = 0x%s", + logger.debug(" Reserved=0x%s", binascii.b2a_hex(reserved_hex)) nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:] - logger.debug(" Network Layer Reachability Information = 0x%s", + logger.debug(" Network Layer Reachability Information=0x%s", binascii.b2a_hex(nlri_hex)) nlri_prefix_list = get_prefix_list_from_hex(nlri_hex) logger.debug(" NLRI prefix list: %s", nlri_prefix_list) @@ -1229,17 +1296,17 @@ class ReadTracker(object): logger.debug(" nlri_prefix_received: %s", prefix) self.prefixes_introduced += len(nlri_prefix_list) # update counter elif attr_type_code == 15: # rfc4760#section-4 - logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)", + logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)", binascii.b2a_hex(attr_flags_hex)) - logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) address_family_identifier_hex = attr_value_hex[0:2] - logger.debug(" Address Family Identifier = 0x%s", + logger.debug(" Address Family Identifier=0x%s", binascii.b2a_hex(address_family_identifier_hex)) subsequent_address_family_identifier_hex = attr_value_hex[2] - logger.debug(" Subsequent Address Family Identifier = 0x%s", + logger.debug(" Subsequent Address Family Identifier=0x%s", binascii.b2a_hex(subsequent_address_family_identifier_hex)) wd_hex = attr_value_hex[3:] - logger.debug(" Withdrawn Routes = 0x%s", + logger.debug(" Withdrawn Routes=0x%s", binascii.b2a_hex(wd_hex)) wdr_prefix_list = get_prefix_list_from_hex(wd_hex) logger.debug(" Withdrawn routes prefix list: %s", @@ -1248,9 +1315,9 @@ class ReadTracker(object): logger.debug(" withdrawn_prefix_received: %s", prefix) self.prefixes_withdrawn += len(wdr_prefix_list) # update counter else: - logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code, + logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code, binascii.b2a_hex(attr_flags_hex)) - logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) return None def decode_update_message(self, msg): @@ -1293,13 +1360,13 @@ class ReadTracker(object): logger.debug("withdrawn_prefix_received: %s", prefix) # total path attribute length total_pa_length_offset = 21 + wdr_length - total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2] + total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2] total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16) logger.debug("Total path attribute lenght: 
0x%s (%s)", binascii.b2a_hex(total_pa_length_hex), total_pa_length) # path attributes pa_offset = total_pa_length_offset + 2 - pa_hex = msg[pa_offset:pa_offset+total_pa_length] + pa_hex = msg[pa_offset:pa_offset + total_pa_length] logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex)) self.decode_path_attributes(pa_hex) # network layer reachability information length @@ -1307,7 +1374,7 @@ class ReadTracker(object): logger.debug("Calculated NLRI length: %s", nlri_length) # network layer reachability information nlri_offset = pa_offset + total_pa_length - nlri_hex = msg[nlri_offset:nlri_offset+nlri_length] + nlri_hex = msg[nlri_offset:nlri_offset + nlri_length] logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex)) nlri_prefix_list = get_prefix_list_from_hex(nlri_hex) logger.debug("NLRI prefix list: %s", nlri_prefix_list) @@ -1341,20 +1408,27 @@ class ReadTracker(object): wait_timedelta = 0 # And wait for event or something to read. - logger.info("total_received_update_message_counter: %s", - self.updates_received) - logger.info("total_received_nlri_prefix_counter: %s", - self.prefixes_introduced) - logger.info("total_received_withdrawn_prefix_counter: %s", - self.prefixes_withdrawn) + if not self.rx_activity_detected or not (self.updates_received % 100): + # right time to write statistics to the log (not for every update and + # not too frequently to avoid having large log files) + logger.info("total_received_update_message_counter: %s", + self.updates_received) + logger.info("total_received_nlri_prefix_counter: %s", + self.prefixes_introduced) + logger.info("total_received_withdrawn_prefix_counter: %s", + self.prefixes_withdrawn) start_time = time.time() select.select([self.socket], [], [self.socket], wait_timedelta) timedelta = time.time() - start_time self.rx_idle_time += timedelta + self.rx_activity_detected = timedelta < 1 - logger.info("... idle for %.3fs", timedelta) - logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time) + if not self.rx_activity_detected or not (self.updates_received % 100): + # right time to write statistics to the log (not for every update and + # not too frequently to avoid having large log files) + logger.info("... idle for %.3fs", timedelta) + logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time) return @@ -1523,22 +1597,37 @@ class StateTracker(object): return -if __name__ == "__main__": - """ One time initialisation and iterations looping. +def create_logger(loglevel, logfile): + """Create logger object - Notes: - Establish BGP connection and run iterations. + Arguments: + :loglevel: log level + :logfile: log file name + Returns: + :return: logger object """ - arguments = parse_arguments() logger = logging.getLogger("logger") - log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s") + log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s") console_handler = logging.StreamHandler() - file_handler = logging.FileHandler(arguments.logfile, mode="w") + file_handler = logging.FileHandler(logfile, mode="w") console_handler.setFormatter(log_formatter) file_handler.setFormatter(log_formatter) logger.addHandler(console_handler) logger.addHandler(file_handler) - logger.setLevel(arguments.loglevel) + logger.setLevel(loglevel) + return logger + + +def job(arguments): + """One time initialisation and iterations looping. + Notes: + Establish BGP connection and run iterations. 
+
+    Arguments:
+        :arguments: Command line arguments
+    Returns:
+        :return: None
+    """
     bgp_socket = establish_connection(arguments)
     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
     # Receive open message before sending anything.
@@ -1556,10 +1645,12 @@ if __name__ == "__main__":
     # Using exact keepalive length to not to see possible updates.
     msg_in = bgp_socket.recv(19)
     if msg_in != generator.keepalive_message():
-        logger.error("Open not confirmed by keepalive, instead got " +
-                     binascii.hexlify(msg_in))
-        raise MessageError("Open not confirmed by keepalive, instead got",
-                           msg_in)
+        error_msg = (
+            "Open not confirmed by keepalive, instead got " +
+            binascii.hexlify(msg_in)
+        )
+        logger.error(error_msg)
+        raise MessageError(error_msg)
     timer.reset_peer_hold_time()
     # Send the keepalive to indicate the connection is accepted.
     timer.snapshot()  # Remember this time.
@@ -1572,3 +1663,52 @@ if __name__ == "__main__":
     state = StateTracker(bgp_socket, generator, timer)
     while True:  # main reactor loop
         state.perform_one_loop_iteration()
+
+
+def threaded_job(arguments):
+    """Run the job threaded
+
+    Arguments:
+        :arguments: Command line arguments
+    Returns:
+        :return: None
+    """
+    amount_left = arguments.amount
+    utils_left = arguments.multiplicity
+    prefix_current = arguments.firstprefix
+    myip_current = arguments.myip
+    thread_args = []
+
+    while 1:
+        amount_per_util = (amount_left - 1) / utils_left + 1  # round up
+        amount_left -= amount_per_util
+        utils_left -= 1
+
+        args = deepcopy(arguments)
+        args.amount = amount_per_util
+        args.firstprefix = prefix_current
+        args.myip = myip_current
+        thread_args.append(args)
+
+        if not utils_left:
+            break
+        prefix_current += amount_per_util * 16
+        myip_current += 1
+
+    try:
+        # Create threads
+        for t in thread_args:
+            thread.start_new_thread(job, (t,))
+    except Exception:
+        print "Error: unable to start thread."
+        raise SystemExit(2)
+
+    # Work remains forever
+    while 1:
+        time.sleep(5)
+
+
+if __name__ == "__main__":
+    arguments = parse_arguments()
+    logger = create_logger(arguments.loglevel, arguments.logfile)
+    threaded_job(arguments)
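
A minimal standalone sketch of the wire layout produced by the new ORIGINATOR_ID and CLUSTER_LIST handling above: both are optional non-transitive path attributes (flags 0x80), types 9 and 10 per RFC 4456, each carrying a 4-octet identifier. The helper names and sample addresses below are illustrative only (not part of play.py), and the byte-string literals assume the same Python 2 environment that play.py itself targets.

# Illustrative helpers, not part of the patch.
import binascii
import socket


def originator_id_attribute(originator_ip):
    """Encode an ORIGINATOR_ID path attribute: flags 0x80, type 9, length 4."""
    return "\x80" "\x09" "\x04" + socket.inet_aton(originator_ip)


def cluster_list_attribute(cluster_ip):
    """Encode a CLUSTER_LIST path attribute (type 10) holding a single item."""
    return "\x80" "\x0a" "\x04" + socket.inet_aton(cluster_ip)


if __name__ == "__main__":
    print(binascii.hexlify(originator_id_attribute("192.0.2.1")))    # 800904c0000201
    print(binascii.hexlify(cluster_list_attribute("198.51.100.1")))  # 800a04c6336401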
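
The new threaded_job() splits the total workload across --multiplicity workers using round-up integer division, then moves each subsequent worker's first prefix forward by amount_per_util * 16 addresses and its source IP by one. A small self-contained sketch of that splitting rule follows; the function name and sample numbers are made up for illustration and are not taken from the patch.

def split_work(amount, multiplicity, prefix_int, myip_int):
    """Return one (amount, first_prefix, my_ip) tuple per worker, all as integers."""
    shares = []
    amount_left = amount
    utils_left = multiplicity
    while utils_left:
        amount_per_util = (amount_left - 1) // utils_left + 1  # round up, as in threaded_job()
        shares.append((amount_per_util, prefix_int, myip_int))
        amount_left -= amount_per_util
        utils_left -= 1
        prefix_int += amount_per_util * 16  # mirrors prefix_current += amount_per_util * 16
        myip_int += 1                       # mirrors myip_current += 1
    return shares


if __name__ == "__main__":
    # e.g. 10 updates over 3 workers -> shares of 4, 3 and 3
    print(split_work(10, 3, 0, 0))  # [(4, 0, 0), (3, 64, 1), (3, 112, 2)]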