X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tools%2Ffastbgp%2Fplay.py;h=dfc4f0ef98af5b17ecd6520a326f68885d9131a0;hb=c1e47dcd89e9a06f125ffb4e7debb66386994f79;hp=c9dba81ecca07754c7b1b37a626f4d36e6ea20af;hpb=50d572c715d37cb1fd852dc402bb31d6c530a18d;p=integration%2Ftest.git diff --git a/tools/fastbgp/play.py b/tools/fastbgp/play.py old mode 100644 new mode 100755 index c9dba81ecc..dfc4f0ef98 --- a/tools/fastbgp/play.py +++ b/tools/fastbgp/play.py @@ -11,11 +11,6 @@ EXABGP in this type of scenario.""" # terms of the Eclipse Public License v1.0 which accompanies this distribution, # and is available at http://www.eclipse.org/legal/epl-v10.html -__author__ = "Vratko Polak" -__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc." -__license__ = "Eclipse Public License v1.0" -__email__ = "vrpolak@cisco.com" - import argparse import binascii import ipaddr @@ -25,6 +20,15 @@ import time import logging import struct +import thread +from copy import deepcopy + + +__author__ = "Vratko Polak" +__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc." +__license__ = "Eclipse Public License v1.0" +__email__ = "vrpolak@cisco.com" + def parse_arguments(): """Use argparse to get arguments, @@ -47,8 +51,8 @@ def parse_arguments(): str_help = "The number of prefixes to process without withdrawals" parser.add_argument("--prefill", default="0", type=int, help=str_help) str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent" - parser.add_argument("--updates", choices=["single", "mixed"], - default=["mixed"], help=str_help) + parser.add_argument("--updates", choices=["single", "separate"], + default=["separate"], help=str_help) str_help = "Base prefix IP address for prefix generation" parser.add_argument("--firstprefix", default="8.0.1.0", type=ipaddr.IPv4Address, help=str_help) @@ -66,6 +70,12 @@ def parse_arguments(): str_help = "The IP of the next hop to be placed into the update messages." parser.add_argument("--nexthop", default="192.0.2.1", type=ipaddr.IPv4Address, dest="nexthop", help=str_help) + str_help = "Identifier of the route originator." + parser.add_argument("--originator", default=None, + type=ipaddr.IPv4Address, dest="originator", help=str_help) + str_help = "Cluster list item identifier." + parser.add_argument("--cluster", default=None, + type=ipaddr.IPv4Address, dest="cluster", help=str_help) str_help = ("Numeric IP Address to try to connect to." 
+ "Currently no effect in listening mode.") parser.add_argument("--peerip", default="127.0.0.2", @@ -76,22 +86,55 @@ def parse_arguments(): parser.add_argument("--holdtime", default="180", type=int, help=str_help) str_help = "Log level (--error, --warning, --info, --debug)" parser.add_argument("--error", dest="loglevel", action="store_const", - const=logging.ERROR, default=logging.ERROR, + const=logging.ERROR, default=logging.INFO, help=str_help) parser.add_argument("--warning", dest="loglevel", action="store_const", - const=logging.WARNING, default=logging.ERROR, + const=logging.WARNING, default=logging.INFO, help=str_help) parser.add_argument("--info", dest="loglevel", action="store_const", - const=logging.INFO, default=logging.ERROR, + const=logging.INFO, default=logging.INFO, help=str_help) parser.add_argument("--debug", dest="loglevel", action="store_const", - const=logging.DEBUG, default=logging.ERROR, + const=logging.DEBUG, default=logging.INFO, help=str_help) + str_help = "Log file name" + parser.add_argument("--logfile", default="bgp_peer.log", help=str_help) str_help = "Trailing part of the csv result files for plotting purposes" parser.add_argument("--results", default="bgp.csv", type=str, help=str_help) str_help = "Minimum number of updates to reach to include result into csv." parser.add_argument("--threshold", default="1000", type=int, help=str_help) + str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported" + parser.add_argument("--rfc4760", default=True, type=bool, help=str_help) + str_help = "Link-State NLRI supported" + parser.add_argument("--bgpls", default=False, type=bool, help=str_help) + str_help = "Link-State NLRI: Identifier" + parser.add_argument("-lsid", default="1", type=int, help=str_help) + str_help = "Link-State NLRI: Tunnel ID" + parser.add_argument("-lstid", default="1", type=int, help=str_help) + str_help = "Link-State NLRI: LSP ID" + parser.add_argument("-lspid", default="1", type=int, help=str_help) + str_help = "Link-State NLRI: IPv4 Tunnel Sender Address" + parser.add_argument("--lstsaddr", default="1.2.3.4", + type=ipaddr.IPv4Address, help=str_help) + str_help = "Link-State NLRI: IPv4 Tunnel End Point Address" + parser.add_argument("--lsteaddr", default="5.6.7.8", + type=ipaddr.IPv4Address, help=str_help) + str_help = "Link-State NLRI: Identifier Step" + parser.add_argument("-lsidstep", default="1", type=int, help=str_help) + str_help = "Link-State NLRI: Tunnel ID Step" + parser.add_argument("-lstidstep", default="2", type=int, help=str_help) + str_help = "Link-State NLRI: LSP ID Step" + parser.add_argument("-lspidstep", default="4", type=int, help=str_help) + str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step" + parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help) + str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step" + parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help) + str_help = "How many play utilities are to be started." + parser.add_argument("--multiplicity", default="1", type=int, help=str_help) arguments = parser.parse_args() + if arguments.multiplicity < 1: + print "Multiplicity", arguments.multiplicity, "is not positive." + raise SystemExit(1) # TODO: Are sanity checks (such as asnumber>=0) required? return arguments @@ -109,9 +152,9 @@ def establish_connection(arguments): :return: socket. 
""" if arguments.listen: - logging.info("Connecting in the listening mode.") - logging.debug("Local IP address: " + str(arguments.myip)) - logging.debug("Local port: " + str(arguments.myport)) + logger.info("Connecting in the listening mode.") + logger.debug("Local IP address: " + str(arguments.myip)) + logger.debug("Local port: " + str(arguments.myport)) listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # bind need single tuple as argument @@ -121,11 +164,11 @@ def establish_connection(arguments): # TODO: Verify client IP is cotroller IP. listening_socket.close() else: - logging.info("Connecting in the talking mode.") - logging.debug("Local IP address: " + str(arguments.myip)) - logging.debug("Local port: " + str(arguments.myport)) - logging.debug("Remote IP address: " + str(arguments.peerip)) - logging.debug("Remote port: " + str(arguments.peerport)) + logger.info("Connecting in the talking mode.") + logger.debug("Local IP address: " + str(arguments.myip)) + logger.debug("Local port: " + str(arguments.myport)) + logger.debug("Remote IP address: " + str(arguments.peerip)) + logger.debug("Remote port: " + str(arguments.peerport)) talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # bind to force specified address and port @@ -133,7 +176,7 @@ def establish_connection(arguments): # socket does not spead ipaddr, hence str() talking_socket.connect((str(arguments.peerip), arguments.peerport)) bgp_socket = talking_socket - logging.info("Connected to ODL.") + logger.info("Connected to ODL.") return bgp_socket @@ -154,9 +197,29 @@ def get_short_int_from_message(message, offset=16): return short_int -class MessageError(ValueError): - """Value error with logging optimized for hexlified messages. +def get_prefix_list_from_hex(prefixes_hex): + """Get decoded list of prefixes (rfc4271#section-4.3) + + Arguments: + :prefixes_hex: list of prefixes to be decoded in hex + Returns: + :return: list of prefixes in the form of ip address (X.X.X.X/X) """ + prefix_list = [] + offset = 0 + while offset < len(prefixes_hex): + prefix_bit_len_hex = prefixes_hex[offset] + prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16) + prefix_len = ((prefix_bit_len - 1) / 8) + 1 + prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len] + prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex)) + offset += 1 + prefix_len + prefix_list.append(prefix + "/" + str(prefix_bit_len)) + return prefix_list + + +class MessageError(ValueError): + """Value error with logging optimized for hexlified messages.""" def __init__(self, text, message, *args): """Initialisation. @@ -197,25 +260,28 @@ def read_open_message(bgp_socket): # Some validation. if len(msg_in) < 37: # 37 is minimal length of open message with 4-byte AS number. - logging.error("Got something else than open with 4-byte AS number: " + - binascii.hexlify(msg_in)) - raise MessageError("Got something else than open with 4-byte AS number", - msg_in) + error_msg = ( + "Message length (" + str(len(msg_in)) + ") is smaller than " + "minimal length of OPEN message with 4-byte AS number (37)" + ) + logger.error(error_msg + ": " + binascii.hexlify(msg_in)) + raise MessageError(error_msg, msg_in) # TODO: We could check BGP marker, but it is defined only later; # decide what to do. 
reported_length = get_short_int_from_message(msg_in) if len(msg_in) != reported_length: - logging.error("Message length is not " + str(reported_length) + - " as stated in " + binascii.hexlify(msg_in)) - raise MessageError("Message length is not " + reported_length + - " as stated in ", msg_in) - logging.info("Open message received.") + error_msg = ( + "Expected message length (" + reported_length + + ") does not match actual length (" + str(len(msg_in)) + ")" + ) + logger.error(error_msg + binascii.hexlify(msg_in)) + raise MessageError(error_msg, msg_in) + logger.info("Open message received.") return msg_in class MessageGenerator(object): - """Class which generates messages, holds states and configuration values. - """ + """Class which generates messages, holds states and configuration values.""" # TODO: Define bgp marker as a class (constant) variable. def __init__(self, args): @@ -243,15 +309,14 @@ class MessageGenerator(object): self.hold_time_default = args.holdtime # Local hold time. self.bgp_identifier_default = int(args.myip) self.next_hop_default = args.nexthop + self.originator_id_default = args.originator + self.cluster_list_item_default = args.cluster self.single_update_default = args.updates == "single" self.randomize_updates_default = args.updates == "random" self.prefix_count_to_add_default = args.insert self.prefix_count_to_del_default = args.withdraw if self.prefix_count_to_del_default < 0: self.prefix_count_to_del_default = 0 - if not self.single_update_default and not self.prefix_count_to_del_default: - self.prefix_count_to_del_default = 1 - # we need some content for the 2nd UPDATE in the iteration if self.prefix_count_to_add_default <= self.prefix_count_to_del_default: # total number of prefixes must grow to avoid infinite test loop self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1 @@ -259,6 +324,22 @@ class MessageGenerator(object): self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill self.results_file_name_default = args.results self.performance_threshold_default = args.threshold + self.rfc4760 = args.rfc4760 + self.bgpls = args.bgpls + # Default values when BGP-LS Attributes are used + if self.bgpls: + self.prefix_count_to_add_default = 1 + self.prefix_count_to_del_default = 0 + self.ls_nlri_default = {"Identifier": args.lsid, + "TunnelID": args.lstid, + "LSPID": args.lspid, + "IPv4TunnelSenderAddress": args.lstsaddr, + "IPv4TunnelEndPointAddress": args.lsteaddr} + self.lsid_step = args.lsidstep + self.lstid_step = args.lstidstep + self.lspid_step = args.lspidstep + self.lstsaddr_step = args.lstsaddrstep + self.lsteaddr_step = args.lsteaddrstep # Default values used for randomized part s1_slots = ((self.total_prefix_amount - self.remaining_prefixes_threshold - 1) / @@ -277,7 +358,6 @@ class MessageGenerator(object): self.prefix_count_to_add_default + 1) self.randomize_lowest_default = s2_first_index self.randomize_highest_default = s2_last_index - # Initialising counters self.phase1_start_time = 0 self.phase1_stop_time = 0 @@ -287,42 +367,52 @@ class MessageGenerator(object): self.phase2_updates_sent = 0 self.updates_sent = 0 - # Needed for the MessageGenerator performance optimization self.log_info = args.loglevel <= logging.INFO self.log_debug = args.loglevel <= logging.DEBUG + """ + Flags needed for the MessageGenerator performance optimization. + Calling logger methods each iteration even with proper log level set + slows down significantly the MessageGenerator performance. 
+ Measured total generation time (1M updates, dry run, error log level): + - logging based on basic logger features: 36,2s + - logging based on advanced logger features (lazy logging): 21,2s + - conditional calling of logger methods enclosed inside condition: 8,6s + """ - logging.info("Generator initialisation") - logging.info(" Target total number of prefixes to be introduced: " + - str(self.total_prefix_amount)) - logging.info(" Prefix base: " + str(self.prefix_base_default) + "/" + - str(self.prefix_length_default)) - logging.info(" My Autonomous System number: " + - str(self.my_autonomous_system_default)) - logging.info(" My Hold Time: " + str(self.hold_time_default)) - logging.info(" My BGP Identifier: " + str(self.bgp_identifier_default)) - logging.info(" Next Hop: " + str(self.next_hop_default)) - logging.info(" Prefix count to be inserted at once: " + - str(self.prefix_count_to_add_default)) - logging.info(" Prefix count to be withdrawn at once: " + - str(self.prefix_count_to_del_default)) - logging.info(" Fast pre-fill up to " + - str(self.total_prefix_amount - - self.remaining_prefixes_threshold) + " prefixes") - logging.info(" Remaining number of prefixes to be processed " + - "in parallel with withdrawals: " + - str(self.remaining_prefixes_threshold)) - logging.debug(" Prefix index range used after pre-fill procedure [" + - str(self.randomize_lowest_default) + ", " + - str(self.randomize_highest_default) + "]") + logger.info("Generator initialisation") + logger.info(" Target total number of prefixes to be introduced: " + + str(self.total_prefix_amount)) + logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" + + str(self.prefix_length_default)) + logger.info(" My Autonomous System number: " + + str(self.my_autonomous_system_default)) + logger.info(" My Hold Time: " + str(self.hold_time_default)) + logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default)) + logger.info(" Next Hop: " + str(self.next_hop_default)) + logger.info(" Originator ID: " + str(self.originator_id_default)) + logger.info(" Cluster list: " + str(self.cluster_list_item_default)) + logger.info(" Prefix count to be inserted at once: " + + str(self.prefix_count_to_add_default)) + logger.info(" Prefix count to be withdrawn at once: " + + str(self.prefix_count_to_del_default)) + logger.info(" Fast pre-fill up to " + + str(self.total_prefix_amount - + self.remaining_prefixes_threshold) + " prefixes") + logger.info(" Remaining number of prefixes to be processed " + + "in parallel with withdrawals: " + + str(self.remaining_prefixes_threshold)) + logger.debug(" Prefix index range used after pre-fill procedure [" + + str(self.randomize_lowest_default) + ", " + + str(self.randomize_highest_default) + "]") if self.single_update_default: - logging.info(" Common single UPDATE will be generated " + - "for both NLRI & WITHDRAWN lists") + logger.info(" Common single UPDATE will be generated " + + "for both NLRI & WITHDRAWN lists") else: - logging.info(" Two separate UPDATEs will be generated " + - "for each NLRI & WITHDRAWN lists") + logger.info(" Two separate UPDATEs will be generated " + + "for each NLRI & WITHDRAWN lists") if self.randomize_updates_default: - logging.info(" Generation of UPDATE messages will be randomized") - logging.info(" Let\"s go ...\n") + logger.info(" Generation of UPDATE messages will be randomized") + logger.info(" Let\'s go ...\n") # TODO: Notification for hold timer expiration can be handy. 
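# Editor's note: a minimal standalone sketch (not part of the patch) of the
# guarded-logging pattern whose timings are quoted in the docstring above.
# The flag is computed once, so per-iteration logger calls (and their string
# formatting) are skipped entirely when the level is not enabled; names such
# as "sketch_logger" are illustrative only.
import logging

sketch_logger = logging.getLogger("sketch")
sketch_logger.addHandler(logging.StreamHandler())
sketch_logger.setLevel(logging.INFO)

log_debug = sketch_logger.isEnabledFor(logging.DEBUG)  # computed once, like self.log_debug
prefixes = ["8.0.1.0/28", "8.0.1.16/28"]

sketch_logger.debug("Prefix list: " + str(prefixes))   # basic: argument string always built
sketch_logger.debug("Prefix list: %s", prefixes)       # lazy: formatting deferred to the logger
if log_debug:                                          # guarded: the call itself is skipped
    sketch_logger.debug("Prefix list: %s", prefixes)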
@@ -338,6 +428,7 @@ class MessageGenerator(object): :return: n/a """ # default values handling + # TODO optimize default values handling (use e.g. dicionary.update() approach) if file_name is None: file_name = self.results_file_name_default if threshold is None: @@ -358,17 +449,17 @@ class MessageGenerator(object): totals2 = None performance2 = None - logging.info("#" * 10 + " Final results " + "#" * 10) - logging.info("Number of iterations: " + str(self.iteration)) - logging.info("Number of UPDATE messages sent in the pre-fill phase: " + - str(self.phase1_updates_sent)) - logging.info("The pre-fill phase duration: " + - str(self.phase1_stop_time - self.phase1_start_time) + "s") - logging.info("Number of UPDATE messages sent in the 2nd test phase: " + - str(self.phase2_updates_sent)) - logging.info("The 2nd test phase duration: " + - str(self.phase2_stop_time - self.phase2_start_time) + "s") - logging.info("Threshold for performance reporting: " + str(threshold)) + logger.info("#" * 10 + " Final results " + "#" * 10) + logger.info("Number of iterations: " + str(self.iteration)) + logger.info("Number of UPDATE messages sent in the pre-fill phase: " + + str(self.phase1_updates_sent)) + logger.info("The pre-fill phase duration: " + + str(self.phase1_stop_time - self.phase1_start_time) + "s") + logger.info("Number of UPDATE messages sent in the 2nd test phase: " + + str(self.phase2_updates_sent)) + logger.info("The 2nd test phase duration: " + + str(self.phase2_stop_time - self.phase2_start_time) + "s") + logger.info("Threshold for performance reporting: " + str(threshold)) # making labels phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) + @@ -412,10 +503,10 @@ class MessageGenerator(object): second_line = second_line[:-2] f.write(first_line + "\n") f.write(second_line + "\n") - logging.info("Performance results of message generator stored in " + - file_name + ':') - logging.info(" " + first_line) - logging.info(" " + second_line) + logger.info("Message generator performance results stored in " + + file_name + ":") + logger.info(" " + first_line) + logger.info(" " + second_line) finally: f.close() @@ -433,6 +524,7 @@ class MessageGenerator(object): Created just as a fame for future generator enhancement. """ # default values handling + # TODO optimize default values handling (use e.g. dicionary.update() approach) if lowest is None: lowest = self.randomize_lowest_default if highest is None: @@ -447,7 +539,27 @@ class MessageGenerator(object): new_index = index return new_index - # Get list of prefixes + def get_ls_nlri_values(self, index): + """Generates LS-NLRI parameters. 
+ http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03 + + Arguments: + :param index: index (iteration) + Returns: + :return: dictionary of LS NLRI parameters and values + """ + # generating list of LS NLRI parameters + identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step + ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step + tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step + lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step + ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step + ls_nlri_values = {"Identifier": identifier, + "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address, + "TunnelID": tunnel_id, "LSPID": lsp_id, + "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address} + return ls_nlri_values + def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None, prefix_len=None, prefix_count=None, randomize=None): """Generates list of IP address prefixes. @@ -466,6 +578,7 @@ class MessageGenerator(object): :return: list of generated IP address prefixes """ # default values handling + # TODO optimize default values handling (use e.g. dicionary.update() approach) if slot_size is None: slot_size = self.slot_size_default if prefix_base is None: @@ -487,11 +600,11 @@ class MessageGenerator(object): indexes.append(prefix_index) prefixes.append(prefix_base + prefix_index * prefix_gap) if self.log_debug: - logging.debug(" Prefix slot index: " + str(slot_index)) - logging.debug(" Prefix slot size: " + str(slot_size)) - logging.debug(" Prefix count: " + str(prefix_count)) - logging.debug(" Prefix indexes: " + str(indexes)) - logging.debug(" Prefix list: " + str(prefixes)) + logger.debug(" Prefix slot index: " + str(slot_index)) + logger.debug(" Prefix slot size: " + str(slot_size)) + logger.debug(" Prefix count: " + str(prefix_count)) + logger.debug(" Prefix indexes: " + str(indexes)) + logger.debug(" Prefix list: " + str(prefixes)) return prefixes def compose_update_message(self, prefix_count_to_add=None, @@ -509,32 +622,33 @@ class MessageGenerator(object): Updates global counters. """ # default values handling + # TODO optimize default values handling (use e.g. 
dicionary.update() approach) if prefix_count_to_add is None: prefix_count_to_add = self.prefix_count_to_add_default if prefix_count_to_del is None: prefix_count_to_del = self.prefix_count_to_del_default # logging if self.log_info and not (self.iteration % 1000): - logging.info("Iteration: " + str(self.iteration) + - " - total remaining prefixes: " + - str(self.remaining_prefixes)) + logger.info("Iteration: " + str(self.iteration) + + " - total remaining prefixes: " + + str(self.remaining_prefixes)) if self.log_debug: - logging.debug("#" * 10 + " Iteration: " + - str(self.iteration) + " " + "#" * 10) - logging.debug("Remaining prefixes: " + - str(self.remaining_prefixes)) + logger.debug("#" * 10 + " Iteration: " + + str(self.iteration) + " " + "#" * 10) + logger.debug("Remaining prefixes: " + + str(self.remaining_prefixes)) # scenario type & one-shot counter straightforward_scenario = (self.remaining_prefixes > self.remaining_prefixes_threshold) if straightforward_scenario: prefix_count_to_del = 0 if self.log_debug: - logging.debug("--- STARAIGHTFORWARD SCENARIO ---") + logger.debug("--- STARAIGHTFORWARD SCENARIO ---") if not self.phase1_start_time: self.phase1_start_time = time.time() else: if self.log_debug: - logging.debug("--- COMBINED SCENARIO ---") + logger.debug("--- COMBINED SCENARIO ---") if not self.phase2_start_time: self.phase2_start_time = time.time() # tailor the number of prefixes if needed @@ -546,28 +660,34 @@ class MessageGenerator(object): slot_index_to_del = slot_index_to_add - self.slot_gap_default # getting lists of prefixes for insertion in this iteration if self.log_debug: - logging.debug("Prefixes to be inserted in this iteration:") + logger.debug("Prefixes to be inserted in this iteration:") prefix_list_to_add = self.get_prefix_list(slot_index_to_add, prefix_count=prefix_count_to_add) # getting lists of prefixes for withdrawal in this iteration if self.log_debug: - logging.debug("Prefixes to be withdrawn in this iteration:") + logger.debug("Prefixes to be withdrawn in this iteration:") prefix_list_to_del = self.get_prefix_list(slot_index_to_del, prefix_count=prefix_count_to_del) - # generating the mesage - if self.single_update_default: - # Send prefixes to be introduced and withdrawn - # in one UPDATE message - msg_out = self.update_message(wr_prefixes=prefix_list_to_del, - nlri_prefixes=prefix_list_to_add) + # generating the UPDATE mesage with LS-NLRI only + if self.bgpls: + ls_nlri = self.get_ls_nlri_values(self.iteration) + msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[], + **ls_nlri) else: - # Send prefixes to be introduced and withdrawn - # in separate UPDATE messages (if needed) - msg_out = self.update_message(wr_prefixes=[], - nlri_prefixes=prefix_list_to_add) - if prefix_count_to_del: - msg_out += self.update_message(wr_prefixes=prefix_list_to_del, - nlri_prefixes=[]) + # generating the UPDATE message with prefix lists + if self.single_update_default: + # Send prefixes to be introduced and withdrawn + # in one UPDATE message + msg_out = self.update_message(wr_prefixes=prefix_list_to_del, + nlri_prefixes=prefix_list_to_add) + else: + # Send prefixes to be introduced and withdrawn + # in separate UPDATE messages (if needed) + msg_out = self.update_message(wr_prefixes=[], + nlri_prefixes=prefix_list_to_add) + if prefix_count_to_del: + msg_out += self.update_message(wr_prefixes=prefix_list_to_del, + nlri_prefixes=[]) # updating counters - who knows ... 
maybe I am last time here ;) if straightforward_scenario: self.phase1_stop_time = time.time() @@ -597,7 +717,8 @@ class MessageGenerator(object): :return: encoded OPEN message in HEX """ - # Default values handling + # default values handling + # TODO optimize default values handling (use e.g. dicionary.update() approach) if version is None: version = self.version_default if my_autonomous_system is None: @@ -633,24 +754,44 @@ class MessageGenerator(object): bgp_identifier_hex = struct.pack(">I", bgp_identifier) # Optional Parameters - optional_parameters_hex = ( - "\x02" # Param type ("Capability Ad") - "\x06" # Length (6 bytes) - "\x01" # Capability type (NLRI Unicast), - # see RFC 4760, secton 8 - "\x04" # Capability value length - "\x00\x01" # AFI (Ipv4) - "\x00" # (reserved) - "\x01" # SAFI (Unicast) + optional_parameters_hex = "" + if self.rfc4760: + optional_parameter_hex = ( + "\x02" # Param type ("Capability Ad") + "\x06" # Length (6 bytes) + "\x01" # Capability type (NLRI Unicast), + # see RFC 4760, secton 8 + "\x04" # Capability value length + "\x00\x01" # AFI (Ipv4) + "\x00" # (reserved) + "\x01" # SAFI (Unicast) + ) + optional_parameters_hex += optional_parameter_hex + + if self.bgpls: + optional_parameter_hex = ( + "\x02" # Param type ("Capability Ad") + "\x06" # Length (6 bytes) + "\x01" # Capability type (NLRI Unicast), + # see RFC 4760, secton 8 + "\x04" # Capability value length + "\x40\x04" # AFI (BGP-LS) + "\x00" # (reserved) + "\x47" # SAFI (BGP-LS) + ) + optional_parameters_hex += optional_parameter_hex + optional_parameter_hex = ( "\x02" # Param type ("Capability Ad") "\x06" # Length (6 bytes) "\x41" # "32 bit AS Numbers Support" # (see RFC 6793, section 3) "\x04" # Capability value length - # My AS in 32 bit format - + struct.pack(">I", my_autonomous_system) ) + optional_parameter_hex += ( + struct.pack(">I", my_autonomous_system) # My AS in 32 bit format + ) + optional_parameters_hex += optional_parameter_hex # Optional Parameters Length optional_parameters_length = len(optional_parameters_hex) @@ -681,36 +822,38 @@ class MessageGenerator(object): ) if self.log_debug: - logging.debug("OPEN Message encoding") - logging.debug(" Marker=0x" + binascii.hexlify(marker_hex)) - logging.debug(" Length=" + str(length) + " (0x" + - binascii.hexlify(length_hex) + ")") - logging.debug(" Type=" + str(type) + " (0x" + - binascii.hexlify(type_hex) + ")") - logging.debug(" Version=" + str(version) + " (0x" + - binascii.hexlify(version_hex) + ")") - logging.debug(" My Autonomous System=" + - str(my_autonomous_system_2_bytes) + " (0x" + - binascii.hexlify(my_autonomous_system_hex_2_bytes) + - ")") - logging.debug(" Hold Time=" + str(hold_time) + " (0x" + - binascii.hexlify(hold_time_hex) + ")") - logging.debug(" BGP Identifier=" + str(bgp_identifier) + - " (0x" + binascii.hexlify(bgp_identifier_hex) + ")") - logging.debug(" Optional Parameters Length=" + - str(optional_parameters_length) + " (0x" + - binascii.hexlify(optional_parameters_length_hex) + - ")") - logging.debug(" Optional Parameters=0x" + - binascii.hexlify(optional_parameters_hex)) - logging.debug(" OPEN Message encoded: 0x" + - binascii.b2a_hex(message_hex)) + logger.debug("OPEN message encoding") + logger.debug(" Marker=0x" + binascii.hexlify(marker_hex)) + logger.debug(" Length=" + str(length) + " (0x" + + binascii.hexlify(length_hex) + ")") + logger.debug(" Type=" + str(type) + " (0x" + + binascii.hexlify(type_hex) + ")") + logger.debug(" Version=" + str(version) + " (0x" + + binascii.hexlify(version_hex) + ")") + 
logger.debug(" My Autonomous System=" + + str(my_autonomous_system_2_bytes) + " (0x" + + binascii.hexlify(my_autonomous_system_hex_2_bytes) + + ")") + logger.debug(" Hold Time=" + str(hold_time) + " (0x" + + binascii.hexlify(hold_time_hex) + ")") + logger.debug(" BGP Identifier=" + str(bgp_identifier) + + " (0x" + binascii.hexlify(bgp_identifier_hex) + ")") + logger.debug(" Optional Parameters Length=" + + str(optional_parameters_length) + " (0x" + + binascii.hexlify(optional_parameters_length_hex) + + ")") + logger.debug(" Optional Parameters=0x" + + binascii.hexlify(optional_parameters_hex)) + logger.debug("OPEN message encoded: 0x%s", + binascii.b2a_hex(message_hex)) return message_hex def update_message(self, wr_prefixes=None, nlri_prefixes=None, wr_prefix_length=None, nlri_prefix_length=None, - my_autonomous_system=None, next_hop=None): + my_autonomous_system=None, next_hop=None, + originator_id=None, cluster_list_item=None, + end_of_rib=False, **ls_nlri_params): """Generates an UPDATE Message (rfc4271#section-4.3) Arguments: @@ -724,7 +867,8 @@ class MessageGenerator(object): :return: encoded UPDATE message in HEX """ - # Default values handling + # default values handling + # TODO optimize default values handling (use e.g. dicionary.update() approach) if wr_prefixes is None: wr_prefixes = self.wr_prefixes_default if nlri_prefixes is None: @@ -737,6 +881,12 @@ class MessageGenerator(object): my_autonomous_system = self.my_autonomous_system_default if next_hop is None: next_hop = self.next_hop_default + if originator_id is None: + originator_id = self.originator_id_default + if cluster_list_item is None: + cluster_list_item = self.cluster_list_item_default + ls_nlri = self.ls_nlri_default.copy() + ls_nlri.update(ls_nlri_params) # Marker marker_hex = "\xFF" * 16 @@ -746,12 +896,13 @@ class MessageGenerator(object): type_hex = struct.pack("B", type) # Withdrawn Routes - bytes = ((wr_prefix_length - 1) / 8) + 1 withdrawn_routes_hex = "" - for prefix in wr_prefixes: - withdrawn_route_hex = (struct.pack("B", wr_prefix_length) + - struct.pack(">I", int(prefix))[:bytes]) - withdrawn_routes_hex += withdrawn_route_hex + if not self.bgpls: + bytes = ((wr_prefix_length - 1) / 8) + 1 + for prefix in wr_prefixes: + withdrawn_route_hex = (struct.pack("B", wr_prefix_length) + + struct.pack(">I", int(prefix))[:bytes]) + withdrawn_routes_hex += withdrawn_route_hex # Withdrawn Routes Length withdrawn_routes_length = len(withdrawn_routes_hex) @@ -759,39 +910,81 @@ class MessageGenerator(object): # TODO: to replace hardcoded string by encoding? 
# Path Attributes + path_attributes_hex = "" if nlri_prefixes != []: - path_attributes_hex = ( + path_attributes_hex += ( "\x40" # Flags ("Well-Known") "\x01" # Type (ORIGIN) "\x01" # Length (1) "\x00" # Origin: IGP + ) + path_attributes_hex += ( "\x40" # Flags ("Well-Known") "\x02" # Type (AS_PATH) "\x06" # Length (6) "\x02" # AS segment type (AS_SEQUENCE) "\x01" # AS segment length (1) - # AS segment (4 bytes) - + struct.pack(">I", my_autonomous_system) + + ) + my_as_hex = struct.pack(">I", my_autonomous_system) + path_attributes_hex += my_as_hex # AS segment (4 bytes) + path_attributes_hex += ( "\x40" # Flags ("Well-Known") "\x03" # Type (NEXT_HOP) "\x04" # Length (4) - # IP address of the next hop (4 bytes) - + struct.pack(">I", int(next_hop)) ) - else: - path_attributes_hex = "" + next_hop_hex = struct.pack(">I", int(next_hop)) + path_attributes_hex += ( + next_hop_hex # IP address of the next hop (4 bytes) + ) + if originator_id is not None: + path_attributes_hex += ( + "\x80" # Flags ("Optional, non-transitive") + "\x09" # Type (ORIGINATOR_ID) + "\x04" # Length (4) + ) # ORIGINATOR_ID (4 bytes) + path_attributes_hex += struct.pack(">I", int(originator_id)) + if cluster_list_item is not None: + path_attributes_hex += ( + "\x80" # Flags ("Optional, non-transitive") + "\x09" # Type (CLUSTER_LIST) + "\x04" # Length (4) + ) # one CLUSTER_LIST item (4 bytes) + path_attributes_hex += struct.pack(">I", int(cluster_list_item)) + + if self.bgpls and not end_of_rib: + path_attributes_hex += ( + "\x80" # Flags ("Optional, non-transitive") + "\x0e" # Type (MP_REACH_NLRI) + "\x22" # Length (34) + "\x40\x04" # AFI (BGP-LS) + "\x47" # SAFI (BGP-LS) + "\x04" # Next Hop Length (4) + ) + path_attributes_hex += struct.pack(">I", int(next_hop)) + path_attributes_hex += "\x00" # Reserved + path_attributes_hex += ( + "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI) + "\x00\x15" # LS-NLRI.TotalNLRILength (21) + "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE) + ) + path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"])) + path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"])) + path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"])) + path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"])) + path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"])) # Total Path Attributes Length total_path_attributes_length = len(path_attributes_hex) total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length) # Network Layer Reachability Information - bytes = ((nlri_prefix_length - 1) / 8) + 1 nlri_hex = "" - for prefix in nlri_prefixes: - nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) + - struct.pack(">I", int(prefix))[:bytes]) - nlri_hex += nlri_prefix_hex + if not self.bgpls: + bytes = ((nlri_prefix_length - 1) / 8) + 1 + for prefix in nlri_prefixes: + nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) + + struct.pack(">I", int(prefix))[:bytes]) + nlri_hex += nlri_prefix_hex # Length (big-endian) length = ( @@ -814,29 +1007,38 @@ class MessageGenerator(object): ) if self.log_debug: - logging.debug("UPDATE Message encoding") - logging.debug(" Marker=0x" + binascii.hexlify(marker_hex)) - logging.debug(" Length=" + str(length) + " (0x" + - binascii.hexlify(length_hex) + ")") - logging.debug(" Type=" + str(type) + " (0x" + - binascii.hexlify(type_hex) + ")") - logging.debug(" withdrawn_routes_length=" + - str(withdrawn_routes_length) + " (0x" + - binascii.hexlify(withdrawn_routes_length_hex) + ")") - 
logging.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" + - str(wr_prefix_length) + " (0x" + - binascii.hexlify(withdrawn_routes_hex) + ")") - logging.debug(" Total Path Attributes Length=" + - str(total_path_attributes_length) + " (0x" + - binascii.hexlify(total_path_attributes_length_hex) + - ")") - logging.debug(" Path Attributes=" + "(0x" + - binascii.hexlify(path_attributes_hex) + ")") - logging.debug(" Network Layer Reachability Information=" + - str(nlri_prefixes) + "/" + str(nlri_prefix_length) + - " (0x" + binascii.hexlify(nlri_hex) + ")") - logging.debug(" UPDATE Message encoded: 0x" + - binascii.b2a_hex(message_hex)) + logger.debug("UPDATE message encoding") + logger.debug(" Marker=0x" + binascii.hexlify(marker_hex)) + logger.debug(" Length=" + str(length) + " (0x" + + binascii.hexlify(length_hex) + ")") + logger.debug(" Type=" + str(type) + " (0x" + + binascii.hexlify(type_hex) + ")") + logger.debug(" withdrawn_routes_length=" + + str(withdrawn_routes_length) + " (0x" + + binascii.hexlify(withdrawn_routes_length_hex) + ")") + logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" + + str(wr_prefix_length) + " (0x" + + binascii.hexlify(withdrawn_routes_hex) + ")") + if total_path_attributes_length: + logger.debug(" Total Path Attributes Length=" + + str(total_path_attributes_length) + " (0x" + + binascii.hexlify(total_path_attributes_length_hex) + ")") + logger.debug(" Path Attributes=" + "(0x" + + binascii.hexlify(path_attributes_hex) + ")") + logger.debug(" Origin=IGP") + logger.debug(" AS path=" + str(my_autonomous_system)) + logger.debug(" Next hop=" + str(next_hop)) + if originator_id is not None: + logger.debug(" Originator id=" + str(originator_id)) + if cluster_list_item is not None: + logger.debug(" Cluster list=" + str(cluster_list_item)) + if self.bgpls: + logger.debug(" MP_REACH_NLRI: %s", ls_nlri) + logger.debug(" Network Layer Reachability Information=" + + str(nlri_prefixes) + "/" + str(nlri_prefix_length) + + " (0x" + binascii.hexlify(nlri_hex) + ")") + logger.debug("UPDATE message encoded: 0x" + + binascii.b2a_hex(message_hex)) # updating counter self.updates_sent += 1 @@ -883,19 +1085,19 @@ class MessageGenerator(object): ) if self.log_debug: - logging.debug("NOTIFICATION Message encoding") - logging.debug(" Marker=0x" + binascii.hexlify(marker_hex)) - logging.debug(" Length=" + str(length) + " (0x" + - binascii.hexlify(length_hex) + ")") - logging.debug(" Type=" + str(type) + " (0x" + - binascii.hexlify(type_hex) + ")") - logging.debug(" Error Code=" + str(error_code) + " (0x" + - binascii.hexlify(error_code_hex) + ")") - logging.debug(" Error Subode=" + str(error_subcode) + " (0x" + - binascii.hexlify(error_subcode_hex) + ")") - logging.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")") - logging.debug(" NOTIFICATION Message encoded: 0x" + - binascii.b2a_hex(message_hex)) + logger.debug("NOTIFICATION message encoding") + logger.debug(" Marker=0x" + binascii.hexlify(marker_hex)) + logger.debug(" Length=" + str(length) + " (0x" + + binascii.hexlify(length_hex) + ")") + logger.debug(" Type=" + str(type) + " (0x" + + binascii.hexlify(type_hex) + ")") + logger.debug(" Error Code=" + str(error_code) + " (0x" + + binascii.hexlify(error_code_hex) + ")") + logger.debug(" Error Subode=" + str(error_subcode) + " (0x" + + binascii.hexlify(error_subcode_hex) + ")") + logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")") + logger.debug("NOTIFICATION message encoded: 0x%s", + binascii.b2a_hex(message_hex)) return message_hex @@ -925,14 +1127,14 
@@ class MessageGenerator(object): ) if self.log_debug: - logging.debug("KEEP ALIVE Message encoding") - logging.debug(" Marker=0x" + binascii.hexlify(marker_hex)) - logging.debug(" Length=" + str(length) + " (0x" + - binascii.hexlify(length_hex) + ")") - logging.debug(" Type=" + str(type) + " (0x" + - binascii.hexlify(type_hex) + ")") - logging.debug(" KEEP ALIVE Message encoded: 0x" + - binascii.b2a_hex(message_hex)) + logger.debug("KEEP ALIVE message encoding") + logger.debug(" Marker=0x" + binascii.hexlify(marker_hex)) + logger.debug(" Length=" + str(length) + " (0x" + + binascii.hexlify(length_hex) + ")") + logger.debug(" Type=" + str(type) + " (0x" + + binascii.hexlify(type_hex) + ")") + logger.debug("KEEP ALIVE message encoded: 0x%s", + binascii.b2a_hex(message_hex)) return message_hex @@ -962,7 +1164,7 @@ class TimeTracker(object): if hold_timedelta > peer_hold_timedelta: hold_timedelta = peer_hold_timedelta if hold_timedelta != 0 and hold_timedelta < 3: - logging.error("Invalid hold timedelta value: " + str(hold_timedelta)) + logger.error("Invalid hold timedelta value: " + str(hold_timedelta)) raise ValueError("Invalid hold timedelta value: ", hold_timedelta) self.hold_timedelta = hold_timedelta # If we do not hear from peer this long, we assume it has died. @@ -1015,7 +1217,7 @@ class TimeTracker(object): if self.hold_timedelta != 0: # time.time() may be too strict if snapshot_time > self.peer_hold_time: - logging.error("Peer has overstepped the hold timer.") + logger.error("Peer has overstepped the hold timer.") raise RuntimeError("Peer has overstepped the hold timer.") # TODO: Include hold_timedelta? # TODO: Add notification sending (attempt). That means @@ -1047,6 +1249,12 @@ class ReadTracker(object): self.bytes_to_read = self.header_length # Incremental buffer for message under read. self.msg_in = "" + # Initialising counters + self.updates_received = 0 + self.prefixes_introduced = 0 + self.prefixes_withdrawn = 0 + self.rx_idle_time = 0 + self.rx_activity_detected = True def read_message_chunk(self): """Read up to one message @@ -1066,7 +1274,8 @@ class ReadTracker(object): # The logical block was a BGP header. # Now we know the size of the message. self.reading_header = False - self.bytes_to_read = get_short_int_from_message(self.msg_in) + self.bytes_to_read = (get_short_int_from_message(self.msg_in) - + self.header_length) else: # We have finished reading the body of the message. # Peer has just proven it is still alive. self.timer.reset_peer_hold_time() @@ -1075,6 +1284,23 @@ class ReadTracker(object): # TODO: Should we do validation and exit on anything # besides update or keepalive? # Prepare state for reading another message. + message_type_hex = self.msg_in[self.header_length] + if message_type_hex == "\x01": + logger.info("OPEN message received: 0x%s", + binascii.b2a_hex(self.msg_in)) + elif message_type_hex == "\x02": + logger.debug("UPDATE message received: 0x%s", + binascii.b2a_hex(self.msg_in)) + self.decode_update_message(self.msg_in) + elif message_type_hex == "\x03": + logger.info("NOTIFICATION message received: 0x%s", + binascii.b2a_hex(self.msg_in)) + elif message_type_hex == "\x04": + logger.info("KEEP ALIVE message received: 0x%s", + binascii.b2a_hex(self.msg_in)) + else: + logger.warning("Unexpected message received: 0x%s", + binascii.b2a_hex(self.msg_in)) self.msg_in = "" self.reading_header = True self.bytes_to_read = self.header_length @@ -1082,6 +1308,197 @@ class ReadTracker(object): # something right now. 
return + def decode_path_attributes(self, path_attributes_hex): + """Decode the Path Attributes field (rfc4271#section-4.3) + + Arguments: + :path_attributes: path_attributes field to be decoded in hex + Returns: + :return: None + """ + hex_to_decode = path_attributes_hex + + while len(hex_to_decode): + attr_flags_hex = hex_to_decode[0] + attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16) +# attr_optional_bit = attr_flags & 128 +# attr_transitive_bit = attr_flags & 64 +# attr_partial_bit = attr_flags & 32 + attr_extended_length_bit = attr_flags & 16 + + attr_type_code_hex = hex_to_decode[1] + attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16) + + if attr_extended_length_bit: + attr_length_hex = hex_to_decode[2:4] + attr_length = int(binascii.b2a_hex(attr_length_hex), 16) + attr_value_hex = hex_to_decode[4:4 + attr_length] + hex_to_decode = hex_to_decode[4 + attr_length:] + else: + attr_length_hex = hex_to_decode[2] + attr_length = int(binascii.b2a_hex(attr_length_hex), 16) + attr_value_hex = hex_to_decode[3:3 + attr_length] + hex_to_decode = hex_to_decode[3 + attr_length:] + + if attr_type_code == 1: + logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + elif attr_type_code == 2: + logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + elif attr_type_code == 3: + logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + elif attr_type_code == 4: + logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + elif attr_type_code == 5: + logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + elif attr_type_code == 6: + logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + elif attr_type_code == 7: + logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + elif attr_type_code == 9: # rfc4456#section-8 + logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + elif attr_type_code == 10: # rfc4456#section-8 + logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + elif attr_type_code == 14: # rfc4760#section-3 + logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + address_family_identifier_hex = attr_value_hex[0:2] + logger.debug(" Address Family Identifier=0x%s", + binascii.b2a_hex(address_family_identifier_hex)) + subsequent_address_family_identifier_hex = attr_value_hex[2] + logger.debug(" Subsequent Address Family Identifier=0x%s", + binascii.b2a_hex(subsequent_address_family_identifier_hex)) + next_hop_netaddr_len_hex = attr_value_hex[3] + 
next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16) + logger.debug(" Length of Next Hop Network Address=%s (0x%s)", + next_hop_netaddr_len, + binascii.b2a_hex(next_hop_netaddr_len_hex)) + next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len] + next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex)) + logger.debug(" Network Address of Next Hop=%s (0x%s)", + next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex)) + reserved_hex = attr_value_hex[4 + next_hop_netaddr_len] + logger.debug(" Reserved=0x%s", + binascii.b2a_hex(reserved_hex)) + nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:] + logger.debug(" Network Layer Reachability Information=0x%s", + binascii.b2a_hex(nlri_hex)) + nlri_prefix_list = get_prefix_list_from_hex(nlri_hex) + logger.debug(" NLRI prefix list: %s", nlri_prefix_list) + for prefix in nlri_prefix_list: + logger.debug(" nlri_prefix_received: %s", prefix) + self.prefixes_introduced += len(nlri_prefix_list) # update counter + elif attr_type_code == 15: # rfc4760#section-4 + logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + address_family_identifier_hex = attr_value_hex[0:2] + logger.debug(" Address Family Identifier=0x%s", + binascii.b2a_hex(address_family_identifier_hex)) + subsequent_address_family_identifier_hex = attr_value_hex[2] + logger.debug(" Subsequent Address Family Identifier=0x%s", + binascii.b2a_hex(subsequent_address_family_identifier_hex)) + wd_hex = attr_value_hex[3:] + logger.debug(" Withdrawn Routes=0x%s", + binascii.b2a_hex(wd_hex)) + wdr_prefix_list = get_prefix_list_from_hex(wd_hex) + logger.debug(" Withdrawn routes prefix list: %s", + wdr_prefix_list) + for prefix in wdr_prefix_list: + logger.debug(" withdrawn_prefix_received: %s", prefix) + self.prefixes_withdrawn += len(wdr_prefix_list) # update counter + else: + logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code, + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + return None + + def decode_update_message(self, msg): + """Decode an UPDATE message (rfc4271#section-4.3) + + Arguments: + :msg: message to be decoded in hex + Returns: + :return: None + """ + logger.debug("Decoding update message:") + # message header - marker + marker_hex = msg[:16] + logger.debug("Message header marker: 0x%s", + binascii.b2a_hex(marker_hex)) + # message header - message length + msg_length_hex = msg[16:18] + msg_length = int(binascii.b2a_hex(msg_length_hex), 16) + logger.debug("Message lenght: 0x%s (%s)", + binascii.b2a_hex(msg_length_hex), msg_length) + # message header - message type + msg_type_hex = msg[18:19] + msg_type = int(binascii.b2a_hex(msg_type_hex), 16) + if msg_type == 2: + logger.debug("Message type: 0x%s (update)", + binascii.b2a_hex(msg_type_hex)) + # withdrawn routes length + wdr_length_hex = msg[19:21] + wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16) + logger.debug("Withdrawn routes lenght: 0x%s (%s)", + binascii.b2a_hex(wdr_length_hex), wdr_length) + # withdrawn routes + wdr_hex = msg[21:21 + wdr_length] + logger.debug("Withdrawn routes: 0x%s", + binascii.b2a_hex(wdr_hex)) + wdr_prefix_list = get_prefix_list_from_hex(wdr_hex) + logger.debug("Withdrawn routes prefix list: %s", + wdr_prefix_list) + for prefix in wdr_prefix_list: + logger.debug("withdrawn_prefix_received: %s", prefix) + # total path attribute 
length + total_pa_length_offset = 21 + wdr_length + total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2] + total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16) + logger.debug("Total path attribute lenght: 0x%s (%s)", + binascii.b2a_hex(total_pa_length_hex), total_pa_length) + # path attributes + pa_offset = total_pa_length_offset + 2 + pa_hex = msg[pa_offset:pa_offset + total_pa_length] + logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex)) + self.decode_path_attributes(pa_hex) + # network layer reachability information length + nlri_length = msg_length - 23 - total_pa_length - wdr_length + logger.debug("Calculated NLRI length: %s", nlri_length) + # network layer reachability information + nlri_offset = pa_offset + total_pa_length + nlri_hex = msg[nlri_offset:nlri_offset + nlri_length] + logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex)) + nlri_prefix_list = get_prefix_list_from_hex(nlri_hex) + logger.debug("NLRI prefix list: %s", nlri_prefix_list) + for prefix in nlri_prefix_list: + logger.debug("nlri_prefix_received: %s", prefix) + # Updating counters + self.updates_received += 1 + self.prefixes_introduced += len(nlri_prefix_list) + self.prefixes_withdrawn += len(wdr_prefix_list) + else: + logger.error("Unexpeced message type 0x%s in 0x%s", + binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg)) + def wait_for_read(self): """Read message until timeout (next expected event). @@ -1092,7 +1509,7 @@ class ReadTracker(object): # Compute time to the first predictable state change event_time = self.timer.get_next_event_time() # snapshot_time would be imprecise - wait_timedelta = event_time - time.time() + wait_timedelta = min(event_time - time.time(), 10) if wait_timedelta < 0: # The program got around to waiting to an event in "very near # future" so late that it became a "past" event, thus tell @@ -1101,14 +1518,33 @@ class ReadTracker(object): # select.error("Invalid parameter") (for everything else). wait_timedelta = 0 # And wait for event or something to read. + + if not self.rx_activity_detected or not (self.updates_received % 100): + # right time to write statistics to the log (not for every update and + # not too frequently to avoid having large log files) + logger.info("total_received_update_message_counter: %s", + self.updates_received) + logger.info("total_received_nlri_prefix_counter: %s", + self.prefixes_introduced) + logger.info("total_received_withdrawn_prefix_counter: %s", + self.prefixes_withdrawn) + + start_time = time.time() select.select([self.socket], [], [self.socket], wait_timedelta) - # Not checking anything, that will be done in next iteration. + timedelta = time.time() - start_time + self.rx_idle_time += timedelta + self.rx_activity_detected = timedelta < 1 + + if not self.rx_activity_detected or not (self.updates_received % 100): + # right time to write statistics to the log (not for every update and + # not too frequently to avoid having large log files) + logger.info("... idle for %.3fs", timedelta) + logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time) return class WriteTracker(object): - """Class tracking enqueueing messages and sending chunks of them. - """ + """Class tracking enqueueing messages and sending chunks of them.""" def __init__(self, bgp_socket, generator, timer): """The writter initialisation. @@ -1204,6 +1640,7 @@ class StateTracker(object): if not self.writer.sending_message: # We need to schedule a keepalive ASAP. 
self.writer.enqueue_message_for_sending(self.generator.keepalive_message()) + logger.info("KEEP ALIVE is sent.") # We are sending a message now, so let's prioritize it. self.prioritize_writing = True # Now we know what our priorities are, we have to check @@ -1216,7 +1653,7 @@ class StateTracker(object): # Lists are unpacked, each is either [] or [self.socket], # so we will test them as boolean. if except_list: - logging.error("Exceptional state on the socket.") + logger.error("Exceptional state on the socket.") raise RuntimeError("Exceptional state on socket", self.socket) # We will do either read or write. if not (self.prioritize_writing and write_list): @@ -1250,38 +1687,59 @@ class StateTracker(object): if not self.generator.remaining_prefixes: # We have just finished update generation, # end-of-rib is due. - logging.info("All update messages generated.") - logging.info("Storing performance results.") + logger.info("All update messages generated.") + logger.info("Storing performance results.") self.generator.store_results() - logging.info("Finally an END-OF-RIB is going to be sent.") + logger.info("Finally an END-OF-RIB is sent.") msg_out += self.generator.update_message(wr_prefixes=[], - nlri_prefixes=[]) + nlri_prefixes=[], + end_of_rib=True) self.writer.enqueue_message_for_sending(msg_out) # Attempt for real sending to be done in next iteration. return - # Nothing to write anymore, except occasional keepalives. - logging.info("Everything has been done." + - "Now just waiting for possible incomming message.") + # Nothing to write anymore. # To avoid busy loop, we do idle waiting here. self.reader.wait_for_read() return # We can neither read nor write. - logging.warning("Input and output both blocked for " + - str(self.timer.report_timedelta) + " seconds.") + logger.warning("Input and output both blocked for " + + str(self.timer.report_timedelta) + " seconds.") # FIXME: Are we sure select has been really waiting # the whole period? return -def main(): - """ One time initialisation and iterations looping. +def create_logger(loglevel, logfile): + """Create logger object + Arguments: + :loglevel: log level + :logfile: log file name + Returns: + :return: logger object + """ + logger = logging.getLogger("logger") + log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s") + console_handler = logging.StreamHandler() + file_handler = logging.FileHandler(logfile, mode="w") + console_handler.setFormatter(log_formatter) + file_handler.setFormatter(log_formatter) + logger.addHandler(console_handler) + logger.addHandler(file_handler) + logger.setLevel(loglevel) + return logger + + +def job(arguments): + """One time initialisation and iterations looping. Notes: Establish BGP connection and run iterations. + + Arguments: + :arguments: Command line arguments + Returns: + :return: None """ - arguments = parse_arguments() - logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", - level=arguments.loglevel) bgp_socket = establish_connection(arguments) # Initial handshake phase. TODO: Can it be also moved to StateTracker? # Receive open message before sending anything. @@ -1291,7 +1749,7 @@ def main(): timer = TimeTracker(msg_in) generator = MessageGenerator(arguments) msg_out = generator.open_message() - logging.debug("Sending the OPEN message: " + binascii.hexlify(msg_out)) + logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out)) # Send our open message to the peer. bgp_socket.send(msg_out) # Wait for confirming keepalive. 
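# Editor's note: illustrative sketch (not part of the patch). The confirming
# KEEPALIVE awaited below is always exactly 19 bytes (rfc4271#section-4.4):
# a 16-byte all-ones marker, a 2-byte length of 19 and a 1-byte type of 4,
# which is why the handshake code reads it with recv(19) and compares for
# equality against generator.keepalive_message().
import struct

keepalive_msg = b"\xFF" * 16 + struct.pack(">H", 19) + struct.pack("B", 4)
assert len(keepalive_msg) == 19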
@@ -1299,15 +1757,14 @@ def main(): # Using exact keepalive length to not to see possible updates. msg_in = bgp_socket.recv(19) if msg_in != generator.keepalive_message(): - logging.error("Open not confirmed by keepalive, instead got " + - binascii.hexlify(msg_in)) - raise MessageError("Open not confirmed by keepalive, instead got", - msg_in) + error_msg = "Open not confirmed by keepalive, instead got" + logger.error(error_msg + ": " + binascii.hexlify(msg_in)) + raise MessageError(error_msg, msg_in) timer.reset_peer_hold_time() # Send the keepalive to indicate the connection is accepted. timer.snapshot() # Remember this time. msg_out = generator.keepalive_message() - logging.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out)) + logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out)) bgp_socket.send(msg_out) # Use the remembered time. timer.reset_my_keepalive_time(timer.snapshot_time) @@ -1316,5 +1773,51 @@ def main(): while True: # main reactor loop state.perform_one_loop_iteration() + +def threaded_job(arguments): + """Run the job threaded + + Arguments: + :arguments: Command line arguments + Returns: + :return: None + """ + amount_left = arguments.amount + utils_left = arguments.multiplicity + prefix_current = arguments.firstprefix + myip_current = arguments.myip + thread_args = [] + + while 1: + amount_per_util = (amount_left - 1) / utils_left + 1 # round up + amount_left -= amount_per_util + utils_left -= 1 + + args = deepcopy(arguments) + args.amount = amount_per_util + args.firstprefix = prefix_current + args.myip = myip_current + thread_args.append(args) + + if not utils_left: + break + prefix_current += amount_per_util * 16 + myip_current += 1 + + try: + # Create threads + for t in thread_args: + thread.start_new_thread(job, (t,)) + except Exception: + print "Error: unable to start thread." + raise SystemExit(2) + + # Work remains forever + while 1: + time.sleep(5) + + if __name__ == "__main__": - main() + arguments = parse_arguments() + logger = create_logger(arguments.loglevel, arguments.logfile) + threaded_job(arguments)
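# Editor's note: a minimal standalone sketch (not part of the patch) of the
# work partitioning performed by threaded_job() above. The figures (100000
# prefixes, multiplicity 4, the start addresses) are made-up illustration
# values, and the ipaddress module (stdlib in Python 3) stands in for the
# ipaddr module used by the script itself.
import ipaddress

amount_left = 100000
utils_left = 4
prefix_current = ipaddress.IPv4Address(u"8.0.1.0")
myip_current = ipaddress.IPv4Address(u"127.0.0.1")

while utils_left:
    amount_per_util = (amount_left - 1) // utils_left + 1   # round up, as in threaded_job()
    amount_left -= amount_per_util
    utils_left -= 1
    print("amount=%s firstprefix=%s myip=%s"
          % (amount_per_util, prefix_current, myip_current))
    prefix_current += amount_per_util * 16   # stride of 16 addresses per prefix, as hard-coded in threaded_job()
    myip_current += 1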