__license__ = "Eclipse Public License v1.0"
__email__ = "vrpolak@cisco.com"
-
import argparse
import binascii
import ipaddr
import select
import socket
import time
+import logging
+import struct
def parse_arguments():
- """Use argparse to get arguments, return args object."""
+ """Use argparse to get arguments,
+
+ Returns:
+ :return: args object.
+ """
parser = argparse.ArgumentParser()
# TODO: Should we use --argument-names-with-spaces?
- str_help = "Autonomous System number use in the stream (current default as in ODL: 64496)."
+ str_help = "Autonomous System number use in the stream."
parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
- # FIXME: We are acting as iBGP peer, we should mirror AS number from peer's open message.
- str_help = "Amount of IP prefixes to generate. Negative number is taken as overflown positive."
+ # FIXME: We are acting as iBGP peer,
+ # we should mirror AS number from peer's open message.
+ str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
parser.add_argument("--amount", default="1", type=int, help=str_help)
- str_help = "The first IPv4 prefix to announce, given as numeric IPv4 address."
- parser.add_argument("--firstprefix", default="8.0.1.0", type=ipaddr.IPv4Address, help=str_help)
- str_help = "If present, this tool will be listening for connection, instead of initiating it."
+ str_help = "Maximum number of IP prefixes to be announced in one iteration"
+ parser.add_argument("--insert", default="1", type=int, help=str_help)
+ str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
+ parser.add_argument("--withdraw", default="0", type=int, help=str_help)
+ str_help = "The number of prefixes to process without withdrawals"
+ parser.add_argument("--prefill", default="0", type=int, help=str_help)
+ str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
+ parser.add_argument("--updates", choices=["single", "mixed"],
+ default=["mixed"], help=str_help)
+ str_help = "Base prefix IP address for prefix generation"
+ parser.add_argument("--firstprefix", default="8.0.1.0",
+ type=ipaddr.IPv4Address, help=str_help)
+ str_help = "The prefix length."
+ parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
+ str_help = "Listen for connection, instead of initiating it."
parser.add_argument("--listen", action="store_true", help=str_help)
- str_help = "Numeric IP Address to bind to and derive BGP ID from. Default value only suitable for listening."
- parser.add_argument("--myip", default="0.0.0.0", type=ipaddr.IPv4Address, help=str_help)
- str_help = "TCP port to bind to when listening or initiating connection. Default only suitable for initiating."
+ str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
+ "Default value only suitable for listening.")
+ parser.add_argument("--myip", default="0.0.0.0",
+ type=ipaddr.IPv4Address, help=str_help)
+ str_help = ("TCP port to bind to when listening or initiating connection." +
+ "Default only suitable for initiating.")
parser.add_argument("--myport", default="0", type=int, help=str_help)
str_help = "The IP of the next hop to be placed into the update messages."
- parser.add_argument("--nexthop", default="192.0.2.1", type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
- str_help = "Numeric IP Address to try to connect to. Currently no effect in listening mode."
- parser.add_argument("--peerip", default="127.0.0.2", type=ipaddr.IPv4Address, help=str_help)
+ parser.add_argument("--nexthop", default="192.0.2.1",
+ type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
+ str_help = ("Numeric IP Address to try to connect to." +
+ "Currently no effect in listening mode.")
+ parser.add_argument("--peerip", default="127.0.0.2",
+ type=ipaddr.IPv4Address, help=str_help)
str_help = "TCP port to try to connect to. No effect in listening mode."
parser.add_argument("--peerport", default="179", type=int, help=str_help)
- # TODO: The step between IP prefixes is currently hardcoded to 16. Should we make it configurable?
- # Yes, the argument list above is sorted alphabetically.
+ str_help = "Local hold time."
+ parser.add_argument("--holdtime", default="180", type=int, help=str_help)
+ str_help = "Log level (--error, --warning, --info, --debug)"
+ parser.add_argument("--error", dest="loglevel", action="store_const",
+ const=logging.ERROR, default=logging.ERROR,
+ help=str_help)
+ parser.add_argument("--warning", dest="loglevel", action="store_const",
+ const=logging.WARNING, default=logging.ERROR,
+ help=str_help)
+ parser.add_argument("--info", dest="loglevel", action="store_const",
+ const=logging.INFO, default=logging.ERROR,
+ help=str_help)
+ parser.add_argument("--debug", dest="loglevel", action="store_const",
+ const=logging.DEBUG, default=logging.ERROR,
+ help=str_help)
+ str_help = "Trailing part of the csv result files for plotting purposes"
+ parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
+ str_help = "Minimum number of updates to reach to include result into csv."
+ parser.add_argument("--threshold", default="1000", type=int, help=str_help)
arguments = parser.parse_args()
# TODO: Are sanity checks (such as asnumber>=0) required?
return arguments
def establish_connection(arguments):
- """Establish connection according to arguments, return socket."""
+ """Establish connection to BGP peer.
+
+ Arguments:
+ :arguments: following command-line argumets are used
+ - arguments.myip: local IP address
+ - arguments.myport: local port
+ - arguments.peerip: remote IP address
+ - arguments.peerport: remote port
+ Returns:
+ :return: socket.
+ """
if arguments.listen:
- # print "DEBUG: connecting in the listening case."
+ logging.info("Connecting in the listening mode.")
+ logging.debug("Local IP address: " + str(arguments.myip))
+ logging.debug("Local port: " + str(arguments.myport))
listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- listening_socket.bind((str(arguments.myip), arguments.myport)) # bind need single tuple as argument
+ # bind need single tuple as argument
+ listening_socket.bind((str(arguments.myip), arguments.myport))
listening_socket.listen(1)
bgp_socket, _ = listening_socket.accept()
# TODO: Verify client IP is cotroller IP.
listening_socket.close()
else:
- # print "DEBUG: connecting in the talking case."
+ logging.info("Connecting in the talking mode.")
+ logging.debug("Local IP address: " + str(arguments.myip))
+ logging.debug("Local port: " + str(arguments.myport))
+ logging.debug("Remote IP address: " + str(arguments.peerip))
+ logging.debug("Remote port: " + str(arguments.peerport))
talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- talking_socket.bind((str(arguments.myip), arguments.myport)) # bind to force specified address and port
- talking_socket.connect((str(arguments.peerip), arguments.peerport)) # socket does not spead ipaddr, hence str()
+ # bind to force specified address and port
+ talking_socket.bind((str(arguments.myip), arguments.myport))
+ # socket does not spead ipaddr, hence str()
+ talking_socket.connect((str(arguments.peerip), arguments.peerport))
bgp_socket = talking_socket
- print "Connected to ODL."
+ logging.info("Connected to ODL.")
return bgp_socket
def get_short_int_from_message(message, offset=16):
- """Extract 2-bytes number from packed string, default offset is for BGP message size."""
+ """Extract 2-bytes number from provided message.
+
+ Arguments:
+ :message: given message
+ :offset: offset of the short_int inside the message
+ Returns:
+ :return: required short_inf value.
+ Notes:
+ default offset value is the BGP message size offset.
+ """
high_byte_int = ord(message[offset])
low_byte_int = ord(message[offset + 1])
short_int = high_byte_int * 256 + low_byte_int
class MessageError(ValueError):
- """Value error with logging optimized for hexlified messages."""
+ """Value error with logging optimized for hexlified messages.
+ """
def __init__(self, text, message, *args):
- """Store and call super init for textual comment, store raw message which caused it."""
+ """Initialisation.
+
+ Store and call super init for textual comment,
+ store raw message which caused it.
+ """
self.text = text
self.msg = message
super(MessageError, self).__init__(text, message, *args)
def __str__(self):
- """
- Generate human readable error message
+ """Generate human readable error message.
- Concatenate text comment, colon with space
- and hexlified message. Use a placeholder string
- if the message turns out to be empty.
+ Returns:
+ :return: human readable message as string
+ Notes:
+ Use a placeholder string if the message is to be empty.
"""
message = binascii.hexlify(self.msg)
if message == "":
def read_open_message(bgp_socket):
- """Receive message, perform some validation, return the raw message."""
+ """Receive peer's OPEN message
+
+ Arguments:
+ :bgp_socket: the socket to be read
+ Returns:
+ :return: received OPEN message.
+ Notes:
+ Performs just basic incomming message checks
+ """
msg_in = bgp_socket.recv(65535) # TODO: Is smaller buffer size safe?
- # TODO: Is it possible for incoming open message to be split in more than one packet?
+ # TODO: Can the incoming open message be split in more than one packet?
# Some validation.
- if len(msg_in) < 37: # 37 is minimal length of open message with 4-byte AS number.
- raise MessageError("Got something else than open with 4-byte AS number", msg_in)
- # TODO: We could check BGP marker, but it is defined only later; decide what to do.
+ if len(msg_in) < 37:
+ # 37 is minimal length of open message with 4-byte AS number.
+ logging.error("Got something else than open with 4-byte AS number: " +
+ binascii.hexlify(msg_in))
+ raise MessageError("Got something else than open with 4-byte AS number",
+ msg_in)
+ # TODO: We could check BGP marker, but it is defined only later;
+ # decide what to do.
reported_length = get_short_int_from_message(msg_in)
if len(msg_in) != reported_length:
- raise MessageError("Message length is not " + reported_length + " in message", msg_in)
- print "Open message received"
+ logging.error("Message length is not " + str(reported_length) +
+ " as stated in " + binascii.hexlify(msg_in))
+ raise MessageError("Message length is not " + reported_length +
+ " as stated in ", msg_in)
+ logging.info("Open message received.")
return msg_in
class MessageGenerator(object):
- """Class with methods returning messages and state holding configuration data required to do it properly."""
+ """Class which generates messages, holds states and configuration values.
+ """
- # TODO: Define bgp marker as class (constant) variable.
+ # TODO: Define bgp marker as a class (constant) variable.
def __init__(self, args):
- """Initialize data according to command-line args."""
- # Various auxiliary variables.
- # Hack: 4-byte AS number uses the same "int to packed" encoding as IPv4 addresses.
- asnumber_4bytes = ipaddr.v4_int_to_packed(args.asnumber)
- asnumber_2bytes = "\x5b\xa0" # AS_TRANS value, 23456 decadic.
- if args.asnumber < 65536: # AS number is mappable to 2 bytes
- asnumber_2bytes = asnumber_4bytes[2:4]
- # From now on, attribute docsrings are used.
- self.int_nextprefix = int(args.firstprefix)
- """Prefix IP address for next update message, as integer."""
- self.updates_to_send = args.amount
- """Number of update messages left to be sent."""
- # All information ready, so we can define messages. Mostly copied from play.py by Jozef Behran.
- # The following attributes are constant.
- self.bgp_marker = "\xFF" * 16
- """Every message starts with this, see rfc4271#section-4.1"""
- self.keepalive_message = self.bgp_marker + (
- "\x00\x13" # Size
- "\x04" # Type KEEPALIVE
- )
- """KeepAlive message, see rfc4271#section-4.4"""
+ """Initialisation according to command-line args.
+
+ Arguments:
+ :args: argsparser's Namespace object which contains command-line
+ options for MesageGenerator initialisation
+ Notes:
+ Calculates and stores default values used later on for
+ message geeration.
+ """
+ self.total_prefix_amount = args.amount
+ # Number of update messages left to be sent.
+ self.remaining_prefixes = self.total_prefix_amount
+
+ # New parameters initialisation
+ self.iteration = 0
+ self.prefix_base_default = args.firstprefix
+ self.prefix_length_default = args.prefixlen
+ self.wr_prefixes_default = []
+ self.nlri_prefixes_default = []
+ self.version_default = 4
+ self.my_autonomous_system_default = args.asnumber
+ self.hold_time_default = args.holdtime # Local hold time.
+ self.bgp_identifier_default = int(args.myip)
+ self.next_hop_default = args.nexthop
+ self.single_update_default = args.updates == "single"
+ self.randomize_updates_default = args.updates == "random"
+ self.prefix_count_to_add_default = args.insert
+ self.prefix_count_to_del_default = args.withdraw
+ if self.prefix_count_to_del_default < 0:
+ self.prefix_count_to_del_default = 0
+ if not self.single_update_default and not self.prefix_count_to_del_default:
+ self.prefix_count_to_del_default = 1
+ # we need some content for the 2nd UPDATE in the iteration
+ if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
+ # total number of prefixes must grow to avoid infinite test loop
+ self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
+ self.slot_size_default = self.prefix_count_to_add_default
+ self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
+ self.results_file_name_default = args.results
+ self.performance_threshold_default = args.threshold
+ # Default values used for randomized part
+ s1_slots = ((self.total_prefix_amount -
+ self.remaining_prefixes_threshold - 1) /
+ self.prefix_count_to_add_default + 1)
+ s2_slots = ((self.remaining_prefixes_threshold - 1) /
+ (self.prefix_count_to_add_default -
+ self.prefix_count_to_del_default) + 1)
+ # S1_First_Index = 0
+ # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
+ s2_first_index = s1_slots * self.prefix_count_to_add_default
+ s2_last_index = (s2_first_index +
+ s2_slots * (self.prefix_count_to_add_default -
+ self.prefix_count_to_del_default) - 1)
+ self.slot_gap_default = ((self.total_prefix_amount -
+ self.remaining_prefixes_threshold - 1) /
+ self.prefix_count_to_add_default + 1)
+ self.randomize_lowest_default = s2_first_index
+ self.randomize_highest_default = s2_last_index
+
+ # Initialising counters
+ self.phase1_start_time = 0
+ self.phase1_stop_time = 0
+ self.phase2_start_time = 0
+ self.phase2_stop_time = 0
+ self.phase1_updates_sent = 0
+ self.phase2_updates_sent = 0
+ self.updates_sent = 0
+
+ # Needed for the MessageGenerator performance optimization
+ self.log_info = args.loglevel <= logging.INFO
+ self.log_debug = args.loglevel <= logging.DEBUG
+
+ logging.info("Generator initialisation")
+ logging.info(" Target total number of prefixes to be introduced: " +
+ str(self.total_prefix_amount))
+ logging.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
+ str(self.prefix_length_default))
+ logging.info(" My Autonomous System number: " +
+ str(self.my_autonomous_system_default))
+ logging.info(" My Hold Time: " + str(self.hold_time_default))
+ logging.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
+ logging.info(" Next Hop: " + str(self.next_hop_default))
+ logging.info(" Prefix count to be inserted at once: " +
+ str(self.prefix_count_to_add_default))
+ logging.info(" Prefix count to be withdrawn at once: " +
+ str(self.prefix_count_to_del_default))
+ logging.info(" Fast pre-fill up to " +
+ str(self.total_prefix_amount -
+ self.remaining_prefixes_threshold) + " prefixes")
+ logging.info(" Remaining number of prefixes to be processed " +
+ "in parallel with withdrawals: " +
+ str(self.remaining_prefixes_threshold))
+ logging.debug(" Prefix index range used after pre-fill procedure [" +
+ str(self.randomize_lowest_default) + ", " +
+ str(self.randomize_highest_default) + "]")
+ if self.single_update_default:
+ logging.info(" Common single UPDATE will be generated " +
+ "for both NLRI & WITHDRAWN lists")
+ else:
+ logging.info(" Two separate UPDATEs will be generated " +
+ "for each NLRI & WITHDRAWN lists")
+ if self.randomize_updates_default:
+ logging.info(" Generation of UPDATE messages will be randomized")
+ logging.info(" Let\"s go ...\n")
+
# TODO: Notification for hold timer expiration can be handy.
- self.eor_message = self.bgp_marker + (
- "\x00\x17" # Size
- "\x02" # Type (UPDATE)
- "\x00\x00" # Withdrawn routes length (0)
- "\x00\x00" # Total Path Attributes Length (0)
- )
- """End-of-RIB marker, see rfc4724#section-2"""
- self.update_message_without_prefix = self.bgp_marker + (
- "\x00\x30" # Size
- "\x02" # Type (UPDATE)
- "\x00\x00" # Withdrawn routes length (0)
- "\x00\x14" # Total Path Attributes Length (20)
- "\x40" # Flags ("Well-Known")
- "\x01" # Type (ORIGIN)
- "\x01" # Length (1)
- "\x00" # Origin: IGP
- "\x40" # Flags ("Well-Known")
- "\x02" # Type (AS_PATH)
- "\x06" # Length (6)
- "\x02" # AS segment type (AS_SEQUENCE)
- "\x01" # AS segment length (1)
- + asnumber_4bytes + # AS segment (4 bytes)
- "\x40" # Flags ("Well-Known")
- "\x03" # Type (NEXT_HOP)
- "\x04" # Length (4)
- + args.nexthop.packed + # IP address of the next hop (4 bytes)
- "\x1c" # IPv4 prefix length, see RFC 4271, page 20. This tool uses Network Mask: 255.255.255.240
- )
- """The IP address prefix (4 bytes) has to be appended to complete Update message, see rfc4271#section-4.3."""
- self.open_message = self.bgp_marker + (
- "\x00\x2d" # Size
- "\x01" # Type (OPEN)
- "\x04" # BGP Varsion (4)
- + asnumber_2bytes + # My Autonomous System
- # FIXME: The following hold time is hardcoded separately. Compute from initial hold_time value.
- "\x00\xb4" # Hold Time (180)
- + args.myip.packed + # BGP Identifer
- "\x10" # Optional parameters length
+
+ def store_results(self, file_name=None, threshold=None):
+ """ Stores specified results into files based on file_name value.
+
+ Arguments:
+ :param file_name: Trailing (common) part of result file names
+ :param threshold: Minimum number of sent updates needed for each
+ result to be included into result csv file
+ (mainly needed because of the result accuracy)
+ Returns:
+ :return: n/a
+ """
+ # default values handling
+ if file_name is None:
+ file_name = self.results_file_name_default
+ if threshold is None:
+ threshold = self.performance_threshold_default
+ # performance calculation
+ if self.phase1_updates_sent >= threshold:
+ totals1 = self.phase1_updates_sent
+ performance1 = int(self.phase1_updates_sent /
+ (self.phase1_stop_time - self.phase1_start_time))
+ else:
+ totals1 = None
+ performance1 = None
+ if self.phase2_updates_sent >= threshold:
+ totals2 = self.phase2_updates_sent
+ performance2 = int(self.phase2_updates_sent /
+ (self.phase2_stop_time - self.phase2_start_time))
+ else:
+ totals2 = None
+ performance2 = None
+
+ logging.info("#" * 10 + " Final results " + "#" * 10)
+ logging.info("Number of iterations: " + str(self.iteration))
+ logging.info("Number of UPDATE messages sent in the pre-fill phase: " +
+ str(self.phase1_updates_sent))
+ logging.info("The pre-fill phase duration: " +
+ str(self.phase1_stop_time - self.phase1_start_time) + "s")
+ logging.info("Number of UPDATE messages sent in the 2nd test phase: " +
+ str(self.phase2_updates_sent))
+ logging.info("The 2nd test phase duration: " +
+ str(self.phase2_stop_time - self.phase2_start_time) + "s")
+ logging.info("Threshold for performance reporting: " + str(threshold))
+
+ # making labels
+ phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
+ " route(s) per UPDATE")
+ if self.single_update_default:
+ phase2_label = "+" + (str(self.prefix_count_to_add_default) +
+ "/-" + str(self.prefix_count_to_del_default) +
+ " routes per UPDATE")
+ else:
+ phase2_label = "+" + (str(self.prefix_count_to_add_default) +
+ "/-" + str(self.prefix_count_to_del_default) +
+ " routes in two UPDATEs")
+ # collecting capacity and performance results
+ totals = {}
+ performance = {}
+ if totals1 is not None:
+ totals[phase1_label] = totals1
+ performance[phase1_label] = performance1
+ if totals2 is not None:
+ totals[phase2_label] = totals2
+ performance[phase2_label] = performance2
+ self.write_results_to_file(totals, "totals-" + file_name)
+ self.write_results_to_file(performance, "performance-" + file_name)
+
+ def write_results_to_file(self, results, file_name):
+ """Writes results to the csv plot file consumable by Jenkins.
+
+ Arguments:
+ :param file_name: Name of the (csv) file to be created
+ Returns:
+ :return: none
+ """
+ first_line = ""
+ second_line = ""
+ f = open(file_name, "wt")
+ try:
+ for key in sorted(results):
+ first_line += key + ", "
+ second_line += str(results[key]) + ", "
+ first_line = first_line[:-2]
+ second_line = second_line[:-2]
+ f.write(first_line + "\n")
+ f.write(second_line + "\n")
+ logging.info("Performance results of message generator stored in " +
+ file_name + ':')
+ logging.info(" " + first_line)
+ logging.info(" " + second_line)
+ finally:
+ f.close()
+
+ # Return pseudo-randomized (reproducible) index for selected range
+ def randomize_index(self, index, lowest=None, highest=None):
+ """Calculates pseudo-randomized index from selected range.
+
+ Arguments:
+ :param index: input index
+ :param lowest: the lowes index from the randomized area
+ :param highest: the highest index from the randomized area
+ Returns:
+ :return: the (pseudo)randomized index
+ Notes:
+ Created just as a fame for future generator enhancement.
+ """
+ # default values handling
+ if lowest is None:
+ lowest = self.randomize_lowest_default
+ if highest is None:
+ highest = self.randomize_highest_default
+ # randomize
+ if (index >= lowest) and (index <= highest):
+ # we are in the randomized range -> shuffle it inside
+ # the range (now just reverse the order)
+ new_index = highest - (index - lowest)
+ else:
+ # we are out of the randomized range -> nothing to do
+ new_index = index
+ return new_index
+
+ # Get list of prefixes
+ def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
+ prefix_len=None, prefix_count=None, randomize=None):
+ """Generates list of IP address prefixes.
+
+ Arguments:
+ :param slot_index: index of group of prefix addresses
+ :param slot_size: size of group of prefix addresses
+ in [number of included prefixes]
+ :param prefix_base: IP address of the first prefix
+ (slot_index = 0, prefix_index = 0)
+ :param prefix_len: length of the prefix in bites
+ (the same as size of netmask)
+ :param prefix_count: number of prefixes to be returned
+ from the specified slot
+ Returns:
+ :return: list of generated IP address prefixes
+ """
+ # default values handling
+ if slot_size is None:
+ slot_size = self.slot_size_default
+ if prefix_base is None:
+ prefix_base = self.prefix_base_default
+ if prefix_len is None:
+ prefix_len = self.prefix_length_default
+ if prefix_count is None:
+ prefix_count = slot_size
+ if randomize is None:
+ randomize = self.randomize_updates_default
+ # generating list of prefixes
+ indexes = []
+ prefixes = []
+ prefix_gap = 2 ** (32 - prefix_len)
+ for i in range(prefix_count):
+ prefix_index = slot_index * slot_size + i
+ if randomize:
+ prefix_index = self.randomize_index(prefix_index)
+ indexes.append(prefix_index)
+ prefixes.append(prefix_base + prefix_index * prefix_gap)
+ if self.log_debug:
+ logging.debug(" Prefix slot index: " + str(slot_index))
+ logging.debug(" Prefix slot size: " + str(slot_size))
+ logging.debug(" Prefix count: " + str(prefix_count))
+ logging.debug(" Prefix indexes: " + str(indexes))
+ logging.debug(" Prefix list: " + str(prefixes))
+ return prefixes
+
+ def compose_update_message(self, prefix_count_to_add=None,
+ prefix_count_to_del=None):
+ """Composes an UPDATE message
+
+ Arguments:
+ :param prefix_count_to_add: # of prefixes to put into NLRI list
+ :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
+ Returns:
+ :return: encoded UPDATE message in HEX
+ Notes:
+ Optionally generates separate UPDATEs for NLRI and WITHDRAWN
+ lists or common message wich includes both prefix lists.
+ Updates global counters.
+ """
+ # default values handling
+ if prefix_count_to_add is None:
+ prefix_count_to_add = self.prefix_count_to_add_default
+ if prefix_count_to_del is None:
+ prefix_count_to_del = self.prefix_count_to_del_default
+ # logging
+ if self.log_info and not (self.iteration % 1000):
+ logging.info("Iteration: " + str(self.iteration) +
+ " - total remaining prefixes: " +
+ str(self.remaining_prefixes))
+ if self.log_debug:
+ logging.debug("#" * 10 + " Iteration: " +
+ str(self.iteration) + " " + "#" * 10)
+ logging.debug("Remaining prefixes: " +
+ str(self.remaining_prefixes))
+ # scenario type & one-shot counter
+ straightforward_scenario = (self.remaining_prefixes >
+ self.remaining_prefixes_threshold)
+ if straightforward_scenario:
+ prefix_count_to_del = 0
+ if self.log_debug:
+ logging.debug("--- STARAIGHTFORWARD SCENARIO ---")
+ if not self.phase1_start_time:
+ self.phase1_start_time = time.time()
+ else:
+ if self.log_debug:
+ logging.debug("--- COMBINED SCENARIO ---")
+ if not self.phase2_start_time:
+ self.phase2_start_time = time.time()
+ # tailor the number of prefixes if needed
+ prefix_count_to_add = (prefix_count_to_del +
+ min(prefix_count_to_add - prefix_count_to_del,
+ self.remaining_prefixes))
+ # prefix slots selection for insertion and withdrawal
+ slot_index_to_add = self.iteration
+ slot_index_to_del = slot_index_to_add - self.slot_gap_default
+ # getting lists of prefixes for insertion in this iteration
+ if self.log_debug:
+ logging.debug("Prefixes to be inserted in this iteration:")
+ prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
+ prefix_count=prefix_count_to_add)
+ # getting lists of prefixes for withdrawal in this iteration
+ if self.log_debug:
+ logging.debug("Prefixes to be withdrawn in this iteration:")
+ prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
+ prefix_count=prefix_count_to_del)
+ # generating the mesage
+ if self.single_update_default:
+ # Send prefixes to be introduced and withdrawn
+ # in one UPDATE message
+ msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
+ nlri_prefixes=prefix_list_to_add)
+ else:
+ # Send prefixes to be introduced and withdrawn
+ # in separate UPDATE messages (if needed)
+ msg_out = self.update_message(wr_prefixes=[],
+ nlri_prefixes=prefix_list_to_add)
+ if prefix_count_to_del:
+ msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
+ nlri_prefixes=[])
+ # updating counters - who knows ... maybe I am last time here ;)
+ if straightforward_scenario:
+ self.phase1_stop_time = time.time()
+ self.phase1_updates_sent = self.updates_sent
+ else:
+ self.phase2_stop_time = time.time()
+ self.phase2_updates_sent = (self.updates_sent -
+ self.phase1_updates_sent)
+ # updating totals for the next iteration
+ self.iteration += 1
+ self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
+ # returning the encoded message
+ return msg_out
+
+ # Section of message encoders
+
+ def open_message(self, version=None, my_autonomous_system=None,
+ hold_time=None, bgp_identifier=None):
+ """Generates an OPEN Message (rfc4271#section-4.2)
+
+ Arguments:
+ :param version: see the rfc4271#section-4.2
+ :param my_autonomous_system: see the rfc4271#section-4.2
+ :param hold_time: see the rfc4271#section-4.2
+ :param bgp_identifier: see the rfc4271#section-4.2
+ Returns:
+ :return: encoded OPEN message in HEX
+ """
+
+ # Default values handling
+ if version is None:
+ version = self.version_default
+ if my_autonomous_system is None:
+ my_autonomous_system = self.my_autonomous_system_default
+ if hold_time is None:
+ hold_time = self.hold_time_default
+ if bgp_identifier is None:
+ bgp_identifier = self.bgp_identifier_default
+
+ # Marker
+ marker_hex = "\xFF" * 16
+
+ # Type
+ type = 1
+ type_hex = struct.pack("B", type)
+
+ # version
+ version_hex = struct.pack("B", version)
+
+ # my_autonomous_system
+ # AS_TRANS value, 23456 decadic.
+ my_autonomous_system_2_bytes = 23456
+ # AS number is mappable to 2 bytes
+ if my_autonomous_system < 65536:
+ my_autonomous_system_2_bytes = my_autonomous_system
+ my_autonomous_system_hex_2_bytes = struct.pack(">H",
+ my_autonomous_system)
+
+ # Hold Time
+ hold_time_hex = struct.pack(">H", hold_time)
+
+ # BGP Identifier
+ bgp_identifier_hex = struct.pack(">I", bgp_identifier)
+
+ # Optional Parameters
+ optional_parameters_hex = (
"\x02" # Param type ("Capability Ad")
"\x06" # Length (6 bytes)
- "\x01" # Capability type (NLRI Unicast), see RFC 4760, secton 8
+ "\x01" # Capability type (NLRI Unicast),
+ # see RFC 4760, secton 8
"\x04" # Capability value length
"\x00\x01" # AFI (Ipv4)
"\x00" # (reserved)
"\x01" # SAFI (Unicast)
+
"\x02" # Param type ("Capability Ad")
"\x06" # Length (6 bytes)
- "\x41" # "32 bit AS Numbers Support" (see RFC 6793, section 3)
+ "\x41" # "32 bit AS Numbers Support"
+ # (see RFC 6793, section 3)
"\x04" # Capability value length
- + asnumber_4bytes # My AS in 32 bit format
+ # My AS in 32 bit format
+ + struct.pack(">I", my_autonomous_system)
)
- """Open message, see rfc4271#section-4.2"""
- # __init__ ends
-
- def compose_update_message(self):
- """Return update message, prepare next prefix, decrease amount without checking it."""
- prefix_packed = ipaddr.v4_int_to_packed(self.int_nextprefix)
- # print "DEBUG: prefix", self.int_nextprefix, "packed to", binascii.hexlify(prefix_packed)
- msg_out = self.update_message_without_prefix + prefix_packed
- self.int_nextprefix += 16 # Hardcoded, as open message specifies such netmask.
- self.updates_to_send -= 1
- return msg_out
+
+ # Optional Parameters Length
+ optional_parameters_length = len(optional_parameters_hex)
+ optional_parameters_length_hex = struct.pack("B",
+ optional_parameters_length)
+
+ # Length (big-endian)
+ length = (
+ len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
+ len(my_autonomous_system_hex_2_bytes) +
+ len(hold_time_hex) + len(bgp_identifier_hex) +
+ len(optional_parameters_length_hex) +
+ len(optional_parameters_hex)
+ )
+ length_hex = struct.pack(">H", length)
+
+ # OPEN Message
+ message_hex = (
+ marker_hex +
+ length_hex +
+ type_hex +
+ version_hex +
+ my_autonomous_system_hex_2_bytes +
+ hold_time_hex +
+ bgp_identifier_hex +
+ optional_parameters_length_hex +
+ optional_parameters_hex
+ )
+
+ if self.log_debug:
+ logging.debug("OPEN Message encoding")
+ logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
+ logging.debug(" Length=" + str(length) + " (0x" +
+ binascii.hexlify(length_hex) + ")")
+ logging.debug(" Type=" + str(type) + " (0x" +
+ binascii.hexlify(type_hex) + ")")
+ logging.debug(" Version=" + str(version) + " (0x" +
+ binascii.hexlify(version_hex) + ")")
+ logging.debug(" My Autonomous System=" +
+ str(my_autonomous_system_2_bytes) + " (0x" +
+ binascii.hexlify(my_autonomous_system_hex_2_bytes) +
+ ")")
+ logging.debug(" Hold Time=" + str(hold_time) + " (0x" +
+ binascii.hexlify(hold_time_hex) + ")")
+ logging.debug(" BGP Identifier=" + str(bgp_identifier) +
+ " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
+ logging.debug(" Optional Parameters Length=" +
+ str(optional_parameters_length) + " (0x" +
+ binascii.hexlify(optional_parameters_length_hex) +
+ ")")
+ logging.debug(" Optional Parameters=0x" +
+ binascii.hexlify(optional_parameters_hex))
+ logging.debug(" OPEN Message encoded: 0x" +
+ binascii.b2a_hex(message_hex))
+
+ return message_hex
+
+ def update_message(self, wr_prefixes=None, nlri_prefixes=None,
+ wr_prefix_length=None, nlri_prefix_length=None,
+ my_autonomous_system=None, next_hop=None):
+ """Generates an UPDATE Message (rfc4271#section-4.3)
+
+ Arguments:
+ :param wr_prefixes: see the rfc4271#section-4.3
+ :param nlri_prefixes: see the rfc4271#section-4.3
+ :param wr_prefix_length: see the rfc4271#section-4.3
+ :param nlri_prefix_length: see the rfc4271#section-4.3
+ :param my_autonomous_system: see the rfc4271#section-4.3
+ :param next_hop: see the rfc4271#section-4.3
+ Returns:
+ :return: encoded UPDATE message in HEX
+ """
+
+ # Default values handling
+ if wr_prefixes is None:
+ wr_prefixes = self.wr_prefixes_default
+ if nlri_prefixes is None:
+ nlri_prefixes = self.nlri_prefixes_default
+ if wr_prefix_length is None:
+ wr_prefix_length = self.prefix_length_default
+ if nlri_prefix_length is None:
+ nlri_prefix_length = self.prefix_length_default
+ if my_autonomous_system is None:
+ my_autonomous_system = self.my_autonomous_system_default
+ if next_hop is None:
+ next_hop = self.next_hop_default
+
+ # Marker
+ marker_hex = "\xFF" * 16
+
+ # Type
+ type = 2
+ type_hex = struct.pack("B", type)
+
+ # Withdrawn Routes
+ bytes = ((wr_prefix_length - 1) / 8) + 1
+ withdrawn_routes_hex = ""
+ for prefix in wr_prefixes:
+ withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
+ struct.pack(">I", int(prefix))[:bytes])
+ withdrawn_routes_hex += withdrawn_route_hex
+
+ # Withdrawn Routes Length
+ withdrawn_routes_length = len(withdrawn_routes_hex)
+ withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
+
+ # TODO: to replace hardcoded string by encoding?
+ # Path Attributes
+ if nlri_prefixes != []:
+ path_attributes_hex = (
+ "\x40" # Flags ("Well-Known")
+ "\x01" # Type (ORIGIN)
+ "\x01" # Length (1)
+ "\x00" # Origin: IGP
+ "\x40" # Flags ("Well-Known")
+ "\x02" # Type (AS_PATH)
+ "\x06" # Length (6)
+ "\x02" # AS segment type (AS_SEQUENCE)
+ "\x01" # AS segment length (1)
+ # AS segment (4 bytes)
+ + struct.pack(">I", my_autonomous_system) +
+ "\x40" # Flags ("Well-Known")
+ "\x03" # Type (NEXT_HOP)
+ "\x04" # Length (4)
+ # IP address of the next hop (4 bytes)
+ + struct.pack(">I", int(next_hop))
+ )
+ else:
+ path_attributes_hex = ""
+
+ # Total Path Attributes Length
+ total_path_attributes_length = len(path_attributes_hex)
+ total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
+
+ # Network Layer Reachability Information
+ bytes = ((nlri_prefix_length - 1) / 8) + 1
+ nlri_hex = ""
+ for prefix in nlri_prefixes:
+ nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
+ struct.pack(">I", int(prefix))[:bytes])
+ nlri_hex += nlri_prefix_hex
+
+ # Length (big-endian)
+ length = (
+ len(marker_hex) + 2 + len(type_hex) +
+ len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
+ len(total_path_attributes_length_hex) + len(path_attributes_hex) +
+ len(nlri_hex))
+ length_hex = struct.pack(">H", length)
+
+ # UPDATE Message
+ message_hex = (
+ marker_hex +
+ length_hex +
+ type_hex +
+ withdrawn_routes_length_hex +
+ withdrawn_routes_hex +
+ total_path_attributes_length_hex +
+ path_attributes_hex +
+ nlri_hex
+ )
+
+ if self.log_debug:
+ logging.debug("UPDATE Message encoding")
+ logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
+ logging.debug(" Length=" + str(length) + " (0x" +
+ binascii.hexlify(length_hex) + ")")
+ logging.debug(" Type=" + str(type) + " (0x" +
+ binascii.hexlify(type_hex) + ")")
+ logging.debug(" withdrawn_routes_length=" +
+ str(withdrawn_routes_length) + " (0x" +
+ binascii.hexlify(withdrawn_routes_length_hex) + ")")
+ logging.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
+ str(wr_prefix_length) + " (0x" +
+ binascii.hexlify(withdrawn_routes_hex) + ")")
+ logging.debug(" Total Path Attributes Length=" +
+ str(total_path_attributes_length) + " (0x" +
+ binascii.hexlify(total_path_attributes_length_hex) +
+ ")")
+ logging.debug(" Path Attributes=" + "(0x" +
+ binascii.hexlify(path_attributes_hex) + ")")
+ logging.debug(" Network Layer Reachability Information=" +
+ str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
+ " (0x" + binascii.hexlify(nlri_hex) + ")")
+ logging.debug(" UPDATE Message encoded: 0x" +
+ binascii.b2a_hex(message_hex))
+
+ # updating counter
+ self.updates_sent += 1
+ # returning encoded message
+ return message_hex
+
+ def notification_message(self, error_code, error_subcode, data_hex=""):
+ """Generates a NOTIFICATION Message (rfc4271#section-4.5)
+
+ Arguments:
+ :param error_code: see the rfc4271#section-4.5
+ :param error_subcode: see the rfc4271#section-4.5
+ :param data_hex: see the rfc4271#section-4.5
+ Returns:
+ :return: encoded NOTIFICATION message in HEX
+ """
+
+ # Marker
+ marker_hex = "\xFF" * 16
+
+ # Type
+ type = 3
+ type_hex = struct.pack("B", type)
+
+ # Error Code
+ error_code_hex = struct.pack("B", error_code)
+
+ # Error Subode
+ error_subcode_hex = struct.pack("B", error_subcode)
+
+ # Length (big-endian)
+ length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
+ len(error_subcode_hex) + len(data_hex))
+ length_hex = struct.pack(">H", length)
+
+ # NOTIFICATION Message
+ message_hex = (
+ marker_hex +
+ length_hex +
+ type_hex +
+ error_code_hex +
+ error_subcode_hex +
+ data_hex
+ )
+
+ if self.log_debug:
+ logging.debug("NOTIFICATION Message encoding")
+ logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
+ logging.debug(" Length=" + str(length) + " (0x" +
+ binascii.hexlify(length_hex) + ")")
+ logging.debug(" Type=" + str(type) + " (0x" +
+ binascii.hexlify(type_hex) + ")")
+ logging.debug(" Error Code=" + str(error_code) + " (0x" +
+ binascii.hexlify(error_code_hex) + ")")
+ logging.debug(" Error Subode=" + str(error_subcode) + " (0x" +
+ binascii.hexlify(error_subcode_hex) + ")")
+ logging.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
+ logging.debug(" NOTIFICATION Message encoded: 0x" +
+ binascii.b2a_hex(message_hex))
+
+ return message_hex
+
+ def keepalive_message(self):
+ """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
+
+ Returns:
+ :return: encoded KEEP ALIVE message in HEX
+ """
+
+ # Marker
+ marker_hex = "\xFF" * 16
+
+ # Type
+ type = 4
+ type_hex = struct.pack("B", type)
+
+ # Length (big-endian)
+ length = len(marker_hex) + 2 + len(type_hex)
+ length_hex = struct.pack(">H", length)
+
+ # KEEP ALIVE Message
+ message_hex = (
+ marker_hex +
+ length_hex +
+ type_hex
+ )
+
+ if self.log_debug:
+ logging.debug("KEEP ALIVE Message encoding")
+ logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
+ logging.debug(" Length=" + str(length) + " (0x" +
+ binascii.hexlify(length_hex) + ")")
+ logging.debug(" Type=" + str(type) + " (0x" +
+ binascii.hexlify(type_hex) + ")")
+ logging.debug(" KEEP ALIVE Message encoded: 0x" +
+ binascii.b2a_hex(message_hex))
+
+ return message_hex
class TimeTracker(object):
- """Class for tracking timers, both for my keepalives and peer's hold time."""
+ """Class for tracking timers, both for my keepalives and
+ peer's hold time.
+ """
def __init__(self, msg_in):
- """Initialize config, based on hardcoded defaults and open message from peer."""
- # Note: Relative time is always named timedelta, to stress that (non-delta) time is absolute.
+ """Initialisation. based on defaults and OPEN message from peer.
+
+ Arguments:
+ msg_in: the OPEN message received from peer.
+ """
+ # Note: Relative time is always named timedelta, to stress that
+ # the (non-delta) time is absolute.
self.report_timedelta = 1.0 # In seconds. TODO: Configurable?
- """Upper bound for being stuck in the same state, we should at least report something before continuing."""
- # Negotiate the hold timer by taking the smaller of the 2 values (mine and the peer's).
+ # Upper bound for being stuck in the same state, we should
+ # at least report something before continuing.
+ # Negotiate the hold timer by taking the smaller
+ # of the 2 values (mine and the peer's).
hold_timedelta = 180 # Not an attribute of self yet.
- # TODO: Make the default value configurable, default value could mirror what peer said.
+ # TODO: Make the default value configurable,
+ # default value could mirror what peer said.
peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
if hold_timedelta > peer_hold_timedelta:
hold_timedelta = peer_hold_timedelta
if hold_timedelta != 0 and hold_timedelta < 3:
+ logging.error("Invalid hold timedelta value: " + str(hold_timedelta))
raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
- self.hold_timedelta = hold_timedelta # only now the final value is visible from outside
- """If we do not hear from peer this long, we assume it has died."""
+ self.hold_timedelta = hold_timedelta
+ # If we do not hear from peer this long, we assume it has died.
self.keepalive_timedelta = int(hold_timedelta / 3.0)
- """Upper limit for duration between messages, to avoid being declared dead."""
- self.snapshot_time = time.time() # The same as calling snapshot(), but also declares a field.
- """Sometimes we need to store time. This is where to get the value from afterwards."""
- self.peer_hold_time = self.snapshot_time + self.hold_timedelta # time_keepalive may be too strict
- """At this time point, peer will be declared dead."""
+ # Upper limit for duration between messages, to avoid being
+ # declared to be dead.
+ # The same as calling snapshot(), but also declares a field.
+ self.snapshot_time = time.time()
+ # Sometimes we need to store time. This is where to get
+ # the value from afterwards. Time_keepalive may be too strict.
+ self.peer_hold_time = self.snapshot_time + self.hold_timedelta
+ # At this time point, peer will be declared dead.
self.my_keepalive_time = None # to be set later
- """At this point, we should be sending keepalive message."""
+ # At this point, we should be sending keepalive message.
def snapshot(self):
"""Store current time in instance data to use later."""
- self.snapshot_time = time.time() # Read as time before something interesting was called.
+ # Read as time before something interesting was called.
+ self.snapshot_time = time.time()
def reset_peer_hold_time(self):
"""Move hold time to future as peer has just proven it still lives."""
self.peer_hold_time = time.time() + self.hold_timedelta
- # Some methods could rely on self.snapshot_time, but it is better to require user to provide it explicitly.
+ # Some methods could rely on self.snapshot_time, but it is better
+ # to require user to provide it explicitly.
def reset_my_keepalive_time(self, keepalive_time):
- """Move KA timer to future based on given time from before sending."""
+ """Calculate and set the next my KEEP ALIVE timeout time
+
+ Arguments:
+ :keepalive_time: the initial value of the KEEP ALIVE timer
+ """
self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
def is_time_for_my_keepalive(self):
+ """Check for my KEEP ALIVE timeout occurence"""
if self.hold_timedelta == 0:
return False
return self.snapshot_time >= self.my_keepalive_time
def get_next_event_time(self):
+ """Set the time of the next expected or to be sent KEEP ALIVE"""
if self.hold_timedelta == 0:
return self.snapshot_time + 86400
return min(self.my_keepalive_time, self.peer_hold_time)
def check_peer_hold_time(self, snapshot_time):
"""Raise error if nothing was read from peer until specified time."""
- if self.hold_timedelta != 0: # Hold time = 0 means keepalive checking off.
- if snapshot_time > self.peer_hold_time: # time.time() may be too strict
- raise RuntimeError("Peer has overstepped the hold timer.") # TODO: Include hold_timedelta?
- # TODO: Add notification sending (attempt). That means move to write tracker.
+ # Hold time = 0 means keepalive checking off.
+ if self.hold_timedelta != 0:
+ # time.time() may be too strict
+ if snapshot_time > self.peer_hold_time:
+ logging.error("Peer has overstepped the hold timer.")
+ raise RuntimeError("Peer has overstepped the hold timer.")
+ # TODO: Include hold_timedelta?
+ # TODO: Add notification sending (attempt). That means
+ # move to write tracker.
class ReadTracker(object):
- """Class for tracking read of mesages chunk by chunk and for idle waiting."""
+ """Class for tracking read of mesages chunk by chunk and
+ for idle waiting.
+ """
def __init__(self, bgp_socket, timer):
- """Set initial state."""
+ """The reader initialisation.
+
+ Arguments:
+ bgp_socket: socket to be used for sending
+ timer: timer to be used for scheduling
+ """
# References to outside objects.
self.socket = bgp_socket
self.timer = timer
- # Really new fields.
+ # BGP marker length plus length field length.
self.header_length = 18
- """BGP marker length plus length field length.""" # TODO: make it class (constant) attribute
+ # TODO: make it class (constant) attribute
+ # Computation of where next chunk ends depends on whether
+ # we are beyond length field.
self.reading_header = True
- """Computation of where next chunk ends depends on whether we are beyond length field."""
+ # Countdown towards next size computation.
self.bytes_to_read = self.header_length
- """Countdown towards next size computation."""
+ # Incremental buffer for message under read.
self.msg_in = ""
- """Incremental buffer for message under read."""
def read_message_chunk(self):
- """Read up to one message, do not return anything."""
- # TODO: We also could return the whole message, but currently nobody cares.
+ """Read up to one message
+
+ Note:
+ Currently it does not return anything.
+ """
+ # TODO: We could return the whole message, currently not needed.
# We assume the socket is readable.
chunk_message = self.socket.recv(self.bytes_to_read)
self.msg_in += chunk_message
self.bytes_to_read -= len(chunk_message)
- if not self.bytes_to_read: # TODO: bytes_to_read < 0 is not possible, right?
+ # TODO: bytes_to_read < 0 is not possible, right?
+ if not self.bytes_to_read:
# Finished reading a logical block.
if self.reading_header:
- # The logical block was a BGP header. Now we know size of message.
+ # The logical block was a BGP header.
+ # Now we know the size of the message.
self.reading_header = False
self.bytes_to_read = get_short_int_from_message(self.msg_in)
else: # We have finished reading the body of the message.
self.timer.reset_peer_hold_time()
# TODO: Do we want to count received messages?
# This version ignores the received message.
- # TODO: Should we do validation and exit on anything besides update or keepalive?
+ # TODO: Should we do validation and exit on anything
+ # besides update or keepalive?
# Prepare state for reading another message.
self.msg_in = ""
self.reading_header = True
self.bytes_to_read = self.header_length
- # We should not act upon peer_hold_time if we are reading something right now.
+ # We should not act upon peer_hold_time if we are reading
+ # something right now.
return
def wait_for_read(self):
- """When we know there are no more updates to send, we use this to avoid busy-wait."""
- # First, compute time to first predictable state change (or report event)
+ """Read message until timeout (next expected event).
+
+ Note:
+ Used when no more updates has to be sent to avoid busy-wait.
+ Currently it does not return anything.
+ """
+ # Compute time to the first predictable state change
event_time = self.timer.get_next_event_time()
- wait_timedelta = event_time - time.time() # snapshot_time would be imprecise
+ # snapshot_time would be imprecise
+ wait_timedelta = event_time - time.time()
if wait_timedelta < 0:
# The program got around to waiting to an event in "very near
# future" so late that it became a "past" event, thus tell
class WriteTracker(object):
- """Class tracking enqueueing messages and sending chunks of them."""
+ """Class tracking enqueueing messages and sending chunks of them.
+ """
def __init__(self, bgp_socket, generator, timer):
- """Set initial state."""
+ """The writter initialisation.
+
+ Arguments:
+ bgp_socket: socket to be used for sending
+ generator: generator to be used for message generation
+ timer: timer to be used for scheduling
+ """
# References to outside objects,
self.socket = bgp_socket
self.generator = generator
self.msg_out = ""
def enqueue_message_for_sending(self, message):
- """Change write state to include the message."""
+ """Enqueue message and change state.
+
+ Arguments:
+ message: message to be enqueued into the msg_out buffer
+ """
self.msg_out += message
self.bytes_to_send += len(message)
self.sending_message = True
def send_message_chunk_is_whole(self):
- """Perform actions related to sending (chunk of) message, return whether message was completed."""
+ """Send enqueued data from msg_out buffer
+
+ Returns:
+ :return: true if no remaining data to send
+ """
# We assume there is a msg_out to send and socket is writable.
# print "going to send", repr(self.msg_out)
self.timer.snapshot()
bytes_sent = self.socket.send(self.msg_out)
- self.msg_out = self.msg_out[bytes_sent:] # Forget the part of message that was sent.
+ # Forget the part of message that was sent.
+ self.msg_out = self.msg_out[bytes_sent:]
self.bytes_to_send -= bytes_sent
if not self.bytes_to_send:
# TODO: Is it possible to hit negative bytes_to_send?
self.sending_message = False
# We should have reset hold timer on peer side.
self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
- # Which means the possible reason for not prioritizing reads is gone.
+ # The possible reason for not prioritizing reads is gone.
return True
return False
"""Main loop has state so complex it warrants this separate class."""
def __init__(self, bgp_socket, generator, timer):
- """Set the initial state according to existing socket and generator."""
+ """The state tracker initialisation.
+
+ Arguments:
+ bgp_socket: socket to be used for sending / receiving
+ generator: generator to be used for message generation
+ timer: timer to be used for scheduling
+ """
# References to outside objects.
self.socket = bgp_socket
self.generator = generator
self.writer = WriteTracker(bgp_socket, generator, timer)
# Prioritization state.
self.prioritize_writing = False
- """
- In general, we prioritize reading over writing. But in order to not get blocked by neverending reads,
- we should check whether we are not risking running out of holdtime.
- So in some situations, this field is set to True to attempt finishing sending a message,
- after which this field resets back to False.
- """
- # TODO: Alternative is to switch fairly between reading and writing (called round robin from now on).
+ # In general, we prioritize reading over writing. But in order
+ # not to get blocked by neverending reads, we should
+ # check whether we are not risking running out of holdtime.
+ # So in some situations, this field is set to True to attempt
+ # finishing sending a message, after which this field resets
+ # back to False.
+ # TODO: Alternative is to switch fairly between reading and
+ # writing (called round robin from now on).
# Message counting is done in generator.
def perform_one_loop_iteration(self):
- """Calculate priority, resolve all ifs, call appropriate method, return to caller to repeat."""
+ """ The main loop iteration
+
+ Notes:
+ Calculates priority, resolves all conditions, calls
+ appropriate method and returns to caller to repeat.
+ """
self.timer.snapshot()
if not self.prioritize_writing:
if self.timer.is_time_for_my_keepalive():
if not self.writer.sending_message:
# We need to schedule a keepalive ASAP.
- self.writer.enqueue_message_for_sending(self.generator.keepalive_message)
- # We are sending a message now, so prioritize finishing it.
+ self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
+ # We are sending a message now, so let's prioritize it.
self.prioritize_writing = True
- # Now we know what our priorities are, we have to check which actions are available.
- # socket.socket() returns three lists, we store them to list of lists.
- list_list = select.select([self.socket], [self.socket], [self.socket], self.timer.report_timedelta)
+ # Now we know what our priorities are, we have to check
+ # which actions are available.
+ # socket.socket() returns three lists,
+ # we store them to list of lists.
+ list_list = select.select([self.socket], [self.socket], [self.socket],
+ self.timer.report_timedelta)
read_list, write_list, except_list = list_list
- # Lists are unpacked, each is either [] or [self.socket], so we will test them as boolean.
+ # Lists are unpacked, each is either [] or [self.socket],
+ # so we will test them as boolean.
if except_list:
+ logging.error("Exceptional state on the socket.")
raise RuntimeError("Exceptional state on socket", self.socket)
# We will do either read or write.
if not (self.prioritize_writing and write_list):
- # Either we have no reason to rush writes, or the socket is not writable.
+ # Either we have no reason to rush writes,
+ # or the socket is not writable.
# We are focusing on reading here.
if read_list: # there is something to read indeed
- # In this case we want to read chunk of message and repeat the select,
+ # In this case we want to read chunk of message
+ # and repeat the select,
self.reader.read_message_chunk()
return
# We were focusing on reading, but nothing to read was there.
# Good time to check peer for hold timer.
self.timer.check_peer_hold_time(self.timer.snapshot_time)
- # Things are quiet on the read front, we can go on and attempt to write.
+ # Quiet on the read front, we can have attempt to write.
if write_list:
- # Either we really want to reset peer's view of our hold timer, or there was nothing to read.
- if self.writer.sending_message: # We were in the middle of sending a message.
- whole = self.writer.send_message_chunk_is_whole() # Was it the end of a message?
- if self.prioritize_writing and whole: # We were pressed to send something and we did it.
- self.prioritize_writing = False # We prioritize reading again.
+ # Either we really want to reset peer's view of our hold
+ # timer, or there was nothing to read.
+ # Were we in the middle of sending a message?
+ if self.writer.sending_message:
+ # Was it the end of a message?
+ whole = self.writer.send_message_chunk_is_whole()
+ # We were pressed to send something and we did it.
+ if self.prioritize_writing and whole:
+ # We prioritize reading again.
+ self.prioritize_writing = False
return
- # Finally, we can look if there is some update message for us to generate.
- if self.generator.updates_to_send:
+ # Finally to check if still update messages to be generated.
+ if self.generator.remaining_prefixes:
msg_out = self.generator.compose_update_message()
- if not self.generator.updates_to_send: # We have just finished update generation, end-of-rib is due.
- msg_out += self.generator.eor_message
+ if not self.generator.remaining_prefixes:
+ # We have just finished update generation,
+ # end-of-rib is due.
+ logging.info("All update messages generated.")
+ logging.info("Storing performance results.")
+ self.generator.store_results()
+ logging.info("Finally an END-OF-RIB is going to be sent.")
+ msg_out += self.generator.update_message(wr_prefixes=[],
+ nlri_prefixes=[])
self.writer.enqueue_message_for_sending(msg_out)
- return # Attempt for the actual sending will be done in next iteration.
+ # Attempt for real sending to be done in next iteration.
+ return
# Nothing to write anymore, except occasional keepalives.
+ logging.info("Everything has been done." +
+ "Now just waiting for possible incomming message.")
# To avoid busy loop, we do idle waiting here.
self.reader.wait_for_read()
return
# We can neither read nor write.
- print "Input and output both blocked for", self.timer.report_timedelta, "seconds."
- # FIXME: Are we sure select has been really waiting the whole period?
+ logging.warning("Input and output both blocked for " +
+ str(self.timer.report_timedelta) + " seconds.")
+ # FIXME: Are we sure select has been really waiting
+ # the whole period?
return
def main():
- """Establish BGP connection and enter main loop for sending updates."""
+ """ One time initialisation and iterations looping.
+
+ Notes:
+ Establish BGP connection and run iterations.
+ """
arguments = parse_arguments()
+ logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s",
+ level=arguments.loglevel)
bgp_socket = establish_connection(arguments)
# Initial handshake phase. TODO: Can it be also moved to StateTracker?
# Receive open message before sending anything.
- # FIXME: Add parameter to send default open message first, to work with "you first" peers.
+ # FIXME: Add parameter to send default open message first,
+ # to work with "you first" peers.
msg_in = read_open_message(bgp_socket)
timer = TimeTracker(msg_in)
generator = MessageGenerator(arguments)
- msg_out = generator.open_message
- # print "DEBUG: going to send open:", binascii.hexlify(msg_out)
+ msg_out = generator.open_message()
+ logging.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
# Send our open message to the peer.
bgp_socket.send(msg_out)
# Wait for confirming keepalive.
# TODO: Surely in just one packet?
- msg_in = bgp_socket.recv(19) # Using exact keepalive length to not see possible updates.
- if msg_in != generator.keepalive_message:
- raise MessageError("Open not confirmed by keepalive, instead got", msg_in)
+ # Using exact keepalive length to not to see possible updates.
+ msg_in = bgp_socket.recv(19)
+ if msg_in != generator.keepalive_message():
+ logging.error("Open not confirmed by keepalive, instead got " +
+ binascii.hexlify(msg_in))
+ raise MessageError("Open not confirmed by keepalive, instead got",
+ msg_in)
timer.reset_peer_hold_time()
# Send the keepalive to indicate the connection is accepted.
timer.snapshot() # Remember this time.
- bgp_socket.send(generator.keepalive_message)
- timer.reset_my_keepalive_time(timer.snapshot_time) # Use the remembered time.
+ msg_out = generator.keepalive_message()
+ logging.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
+ bgp_socket.send(msg_out)
+ # Use the remembered time.
+ timer.reset_my_keepalive_time(timer.snapshot_time)
# End of initial handshake phase.
state = StateTracker(bgp_socket, generator, timer)
while True: # main reactor loop