# terms of the Eclipse Public License v1.0 which accompanies this distribution,
# and is available at http://www.eclipse.org/legal/epl-v10.html
-__author__ = "Vratko Polak"
-__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
-__license__ = "Eclipse Public License v1.0"
-__email__ = "vrpolak@cisco.com"
-
import argparse
import binascii
import ipaddr
import logging
import struct
+import thread
+from copy import deepcopy
+
+
+__author__ = "Vratko Polak"
+__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
+__license__ = "Eclipse Public License v1.0"
+__email__ = "vrpolak@cisco.com"
+
def parse_arguments():
"""Use argparse to get arguments,
str_help = "The number of prefixes to process without withdrawals"
parser.add_argument("--prefill", default="0", type=int, help=str_help)
str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
- parser.add_argument("--updates", choices=["single", "mixed"],
- default=["mixed"], help=str_help)
+ parser.add_argument("--updates", choices=["single", "separate"],
+ default=["separate"], help=str_help)
str_help = "Base prefix IP address for prefix generation"
parser.add_argument("--firstprefix", default="8.0.1.0",
type=ipaddr.IPv4Address, help=str_help)
parser.add_argument("--holdtime", default="180", type=int, help=str_help)
str_help = "Log level (--error, --warning, --info, --debug)"
parser.add_argument("--error", dest="loglevel", action="store_const",
- const=logging.ERROR, default=logging.ERROR,
+ const=logging.ERROR, default=logging.INFO,
help=str_help)
parser.add_argument("--warning", dest="loglevel", action="store_const",
- const=logging.WARNING, default=logging.ERROR,
+ const=logging.WARNING, default=logging.INFO,
help=str_help)
parser.add_argument("--info", dest="loglevel", action="store_const",
- const=logging.INFO, default=logging.ERROR,
+ const=logging.INFO, default=logging.INFO,
help=str_help)
parser.add_argument("--debug", dest="loglevel", action="store_const",
- const=logging.DEBUG, default=logging.ERROR,
+ const=logging.DEBUG, default=logging.INFO,
help=str_help)
+ str_help = "Log file name"
+ parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
str_help = "Trailing part of the csv result files for plotting purposes"
parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
str_help = "Minimum number of updates to reach to include result into csv."
parser.add_argument("--threshold", default="1000", type=int, help=str_help)
+ str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
+ parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
+ str_help = "How many play utilities are to be started."
+ parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
arguments = parser.parse_args()
+ if arguments.multiplicity < 1:
+ print "Multiplicity", arguments.multiplicity, "is not positive."
+ raise SystemExit(1)
# TODO: Are sanity checks (such as asnumber>=0) required?
return arguments
:return: socket.
"""
if arguments.listen:
- logging.info("Connecting in the listening mode.")
- logging.debug("Local IP address: " + str(arguments.myip))
- logging.debug("Local port: " + str(arguments.myport))
+ logger.info("Connecting in the listening mode.")
+ logger.debug("Local IP address: " + str(arguments.myip))
+ logger.debug("Local port: " + str(arguments.myport))
listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# bind need single tuple as argument
# TODO: Verify client IP is cotroller IP.
listening_socket.close()
else:
- logging.info("Connecting in the talking mode.")
- logging.debug("Local IP address: " + str(arguments.myip))
- logging.debug("Local port: " + str(arguments.myport))
- logging.debug("Remote IP address: " + str(arguments.peerip))
- logging.debug("Remote port: " + str(arguments.peerport))
+ logger.info("Connecting in the talking mode.")
+ logger.debug("Local IP address: " + str(arguments.myip))
+ logger.debug("Local port: " + str(arguments.myport))
+ logger.debug("Remote IP address: " + str(arguments.peerip))
+ logger.debug("Remote port: " + str(arguments.peerport))
talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# bind to force specified address and port
# socket does not spead ipaddr, hence str()
talking_socket.connect((str(arguments.peerip), arguments.peerport))
bgp_socket = talking_socket
- logging.info("Connected to ODL.")
+ logger.info("Connected to ODL.")
return bgp_socket
return short_int
+def get_prefix_list_from_hex(prefixes_hex):
+ """Get decoded list of prefixes (rfc4271#section-4.3)
+
+ Arguments:
+ :prefixes_hex: list of prefixes to be decoded in hex
+ Returns:
+ :return: list of prefixes in the form of ip address (X.X.X.X/X)
+ """
+ prefix_list = []
+ offset = 0
+ while offset < len(prefixes_hex):
+ prefix_bit_len_hex = prefixes_hex[offset]
+ prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
+ prefix_len = ((prefix_bit_len - 1) / 8) + 1
+ prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
+ prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
+ offset += 1 + prefix_len
+ prefix_list.append(prefix + "/" + str(prefix_bit_len))
+ return prefix_list
+
+
class MessageError(ValueError):
"""Value error with logging optimized for hexlified messages."""
# Some validation.
if len(msg_in) < 37:
# 37 is minimal length of open message with 4-byte AS number.
- logging.error("Got something else than open with 4-byte AS number: " +
- binascii.hexlify(msg_in))
- raise MessageError("Got something else than open with 4-byte AS number",
- msg_in)
+ logger.error("Got something else than open with 4-byte AS number: " +
+ binascii.hexlify(msg_in))
+ raise MessageError("Got something else than open with 4-byte AS number", msg_in)
# TODO: We could check BGP marker, but it is defined only later;
# decide what to do.
reported_length = get_short_int_from_message(msg_in)
if len(msg_in) != reported_length:
- logging.error("Message length is not " + str(reported_length) +
- " as stated in " + binascii.hexlify(msg_in))
+ logger.error("Message length is not " + str(reported_length) +
+ " as stated in " + binascii.hexlify(msg_in))
raise MessageError("Message length is not " + reported_length +
" as stated in ", msg_in)
- logging.info("Open message received.")
+ logger.info("Open message received.")
return msg_in
self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
self.results_file_name_default = args.results
self.performance_threshold_default = args.threshold
+ self.rfc4760 = args.rfc4760 == "yes"
# Default values used for randomized part
s1_slots = ((self.total_prefix_amount -
self.remaining_prefixes_threshold - 1) /
- conditional calling of logger methods enclosed inside condition: 8,6s
"""
- logging.info("Generator initialisation")
- logging.info(" Target total number of prefixes to be introduced: " +
- str(self.total_prefix_amount))
- logging.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
- str(self.prefix_length_default))
- logging.info(" My Autonomous System number: " +
- str(self.my_autonomous_system_default))
- logging.info(" My Hold Time: " + str(self.hold_time_default))
- logging.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
- logging.info(" Next Hop: " + str(self.next_hop_default))
- logging.info(" Prefix count to be inserted at once: " +
- str(self.prefix_count_to_add_default))
- logging.info(" Prefix count to be withdrawn at once: " +
- str(self.prefix_count_to_del_default))
- logging.info(" Fast pre-fill up to " +
- str(self.total_prefix_amount -
- self.remaining_prefixes_threshold) + " prefixes")
- logging.info(" Remaining number of prefixes to be processed " +
- "in parallel with withdrawals: " +
- str(self.remaining_prefixes_threshold))
- logging.debug(" Prefix index range used after pre-fill procedure [" +
- str(self.randomize_lowest_default) + ", " +
- str(self.randomize_highest_default) + "]")
+ logger.info("Generator initialisation")
+ logger.info(" Target total number of prefixes to be introduced: " +
+ str(self.total_prefix_amount))
+ logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
+ str(self.prefix_length_default))
+ logger.info(" My Autonomous System number: " +
+ str(self.my_autonomous_system_default))
+ logger.info(" My Hold Time: " + str(self.hold_time_default))
+ logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
+ logger.info(" Next Hop: " + str(self.next_hop_default))
+ logger.info(" Prefix count to be inserted at once: " +
+ str(self.prefix_count_to_add_default))
+ logger.info(" Prefix count to be withdrawn at once: " +
+ str(self.prefix_count_to_del_default))
+ logger.info(" Fast pre-fill up to " +
+ str(self.total_prefix_amount -
+ self.remaining_prefixes_threshold) + " prefixes")
+ logger.info(" Remaining number of prefixes to be processed " +
+ "in parallel with withdrawals: " +
+ str(self.remaining_prefixes_threshold))
+ logger.debug(" Prefix index range used after pre-fill procedure [" +
+ str(self.randomize_lowest_default) + ", " +
+ str(self.randomize_highest_default) + "]")
if self.single_update_default:
- logging.info(" Common single UPDATE will be generated " +
- "for both NLRI & WITHDRAWN lists")
+ logger.info(" Common single UPDATE will be generated " +
+ "for both NLRI & WITHDRAWN lists")
else:
- logging.info(" Two separate UPDATEs will be generated " +
- "for each NLRI & WITHDRAWN lists")
+ logger.info(" Two separate UPDATEs will be generated " +
+ "for each NLRI & WITHDRAWN lists")
if self.randomize_updates_default:
- logging.info(" Generation of UPDATE messages will be randomized")
- logging.info(" Let\"s go ...\n")
+ logger.info(" Generation of UPDATE messages will be randomized")
+ logger.info(" Let\'s go ...\n")
# TODO: Notification for hold timer expiration can be handy.
totals2 = None
performance2 = None
- logging.info("#" * 10 + " Final results " + "#" * 10)
- logging.info("Number of iterations: " + str(self.iteration))
- logging.info("Number of UPDATE messages sent in the pre-fill phase: " +
- str(self.phase1_updates_sent))
- logging.info("The pre-fill phase duration: " +
- str(self.phase1_stop_time - self.phase1_start_time) + "s")
- logging.info("Number of UPDATE messages sent in the 2nd test phase: " +
- str(self.phase2_updates_sent))
- logging.info("The 2nd test phase duration: " +
- str(self.phase2_stop_time - self.phase2_start_time) + "s")
- logging.info("Threshold for performance reporting: " + str(threshold))
+ logger.info("#" * 10 + " Final results " + "#" * 10)
+ logger.info("Number of iterations: " + str(self.iteration))
+ logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
+ str(self.phase1_updates_sent))
+ logger.info("The pre-fill phase duration: " +
+ str(self.phase1_stop_time - self.phase1_start_time) + "s")
+ logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
+ str(self.phase2_updates_sent))
+ logger.info("The 2nd test phase duration: " +
+ str(self.phase2_stop_time - self.phase2_start_time) + "s")
+ logger.info("Threshold for performance reporting: " + str(threshold))
# making labels
phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
second_line = second_line[:-2]
f.write(first_line + "\n")
f.write(second_line + "\n")
- logging.info("Performance results of message generator stored in " +
- file_name + ':')
- logging.info(" " + first_line)
- logging.info(" " + second_line)
+ logger.info("Message generator performance results stored in " +
+ file_name + ":")
+ logger.info(" " + first_line)
+ logger.info(" " + second_line)
finally:
f.close()
indexes.append(prefix_index)
prefixes.append(prefix_base + prefix_index * prefix_gap)
if self.log_debug:
- logging.debug(" Prefix slot index: " + str(slot_index))
- logging.debug(" Prefix slot size: " + str(slot_size))
- logging.debug(" Prefix count: " + str(prefix_count))
- logging.debug(" Prefix indexes: " + str(indexes))
- logging.debug(" Prefix list: " + str(prefixes))
+ logger.debug(" Prefix slot index: " + str(slot_index))
+ logger.debug(" Prefix slot size: " + str(slot_size))
+ logger.debug(" Prefix count: " + str(prefix_count))
+ logger.debug(" Prefix indexes: " + str(indexes))
+ logger.debug(" Prefix list: " + str(prefixes))
return prefixes
def compose_update_message(self, prefix_count_to_add=None,
prefix_count_to_del = self.prefix_count_to_del_default
# logging
if self.log_info and not (self.iteration % 1000):
- logging.info("Iteration: " + str(self.iteration) +
- " - total remaining prefixes: " +
- str(self.remaining_prefixes))
+ logger.info("Iteration: " + str(self.iteration) +
+ " - total remaining prefixes: " +
+ str(self.remaining_prefixes))
if self.log_debug:
- logging.debug("#" * 10 + " Iteration: " +
- str(self.iteration) + " " + "#" * 10)
- logging.debug("Remaining prefixes: " +
- str(self.remaining_prefixes))
+ logger.debug("#" * 10 + " Iteration: " +
+ str(self.iteration) + " " + "#" * 10)
+ logger.debug("Remaining prefixes: " +
+ str(self.remaining_prefixes))
# scenario type & one-shot counter
straightforward_scenario = (self.remaining_prefixes >
self.remaining_prefixes_threshold)
if straightforward_scenario:
prefix_count_to_del = 0
if self.log_debug:
- logging.debug("--- STARAIGHTFORWARD SCENARIO ---")
+ logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
if not self.phase1_start_time:
self.phase1_start_time = time.time()
else:
if self.log_debug:
- logging.debug("--- COMBINED SCENARIO ---")
+ logger.debug("--- COMBINED SCENARIO ---")
if not self.phase2_start_time:
self.phase2_start_time = time.time()
# tailor the number of prefixes if needed
slot_index_to_del = slot_index_to_add - self.slot_gap_default
# getting lists of prefixes for insertion in this iteration
if self.log_debug:
- logging.debug("Prefixes to be inserted in this iteration:")
+ logger.debug("Prefixes to be inserted in this iteration:")
prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
prefix_count=prefix_count_to_add)
# getting lists of prefixes for withdrawal in this iteration
if self.log_debug:
- logging.debug("Prefixes to be withdrawn in this iteration:")
+ logger.debug("Prefixes to be withdrawn in this iteration:")
prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
prefix_count=prefix_count_to_del)
# generating the mesage
bgp_identifier_hex = struct.pack(">I", bgp_identifier)
# Optional Parameters
- optional_parameters_hex = (
- "\x02" # Param type ("Capability Ad")
- "\x06" # Length (6 bytes)
- "\x01" # Capability type (NLRI Unicast),
- # see RFC 4760, secton 8
- "\x04" # Capability value length
- "\x00\x01" # AFI (Ipv4)
- "\x00" # (reserved)
- "\x01" # SAFI (Unicast)
+ optional_parameters_hex = ""
+ if self.rfc4760:
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Capability type (NLRI Unicast),
+ # see RFC 4760, secton 8
+ "\x04" # Capability value length
+ "\x00\x01" # AFI (Ipv4)
+ "\x00" # (reserved)
+ "\x01" # SAFI (Unicast)
+ )
+ optional_parameters_hex += optional_parameter_hex
+ optional_parameter_hex = (
"\x02" # Param type ("Capability Ad")
"\x06" # Length (6 bytes)
"\x41" # "32 bit AS Numbers Support"
# (see RFC 6793, section 3)
"\x04" # Capability value length
- # My AS in 32 bit format
- + struct.pack(">I", my_autonomous_system)
)
+ optional_parameter_hex += (
+ struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
+ )
+ optional_parameters_hex += optional_parameter_hex
# Optional Parameters Length
optional_parameters_length = len(optional_parameters_hex)
)
if self.log_debug:
- logging.debug("OPEN Message encoding")
- logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
- logging.debug(" Length=" + str(length) + " (0x" +
- binascii.hexlify(length_hex) + ")")
- logging.debug(" Type=" + str(type) + " (0x" +
- binascii.hexlify(type_hex) + ")")
- logging.debug(" Version=" + str(version) + " (0x" +
- binascii.hexlify(version_hex) + ")")
- logging.debug(" My Autonomous System=" +
- str(my_autonomous_system_2_bytes) + " (0x" +
- binascii.hexlify(my_autonomous_system_hex_2_bytes) +
- ")")
- logging.debug(" Hold Time=" + str(hold_time) + " (0x" +
- binascii.hexlify(hold_time_hex) + ")")
- logging.debug(" BGP Identifier=" + str(bgp_identifier) +
- " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
- logging.debug(" Optional Parameters Length=" +
- str(optional_parameters_length) + " (0x" +
- binascii.hexlify(optional_parameters_length_hex) +
- ")")
- logging.debug(" Optional Parameters=0x" +
- binascii.hexlify(optional_parameters_hex))
- logging.debug(" OPEN Message encoded: 0x" +
- binascii.b2a_hex(message_hex))
+ logger.debug("OPEN message encoding")
+ logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
+ logger.debug(" Length=" + str(length) + " (0x" +
+ binascii.hexlify(length_hex) + ")")
+ logger.debug(" Type=" + str(type) + " (0x" +
+ binascii.hexlify(type_hex) + ")")
+ logger.debug(" Version=" + str(version) + " (0x" +
+ binascii.hexlify(version_hex) + ")")
+ logger.debug(" My Autonomous System=" +
+ str(my_autonomous_system_2_bytes) + " (0x" +
+ binascii.hexlify(my_autonomous_system_hex_2_bytes) +
+ ")")
+ logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
+ binascii.hexlify(hold_time_hex) + ")")
+ logger.debug(" BGP Identifier=" + str(bgp_identifier) +
+ " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
+ logger.debug(" Optional Parameters Length=" +
+ str(optional_parameters_length) + " (0x" +
+ binascii.hexlify(optional_parameters_length_hex) +
+ ")")
+ logger.debug(" Optional Parameters=0x" +
+ binascii.hexlify(optional_parameters_hex))
+ logger.debug("OPEN message encoded: 0x%s",
+ binascii.b2a_hex(message_hex))
return message_hex
"\x06" # Length (6)
"\x02" # AS segment type (AS_SEQUENCE)
"\x01" # AS segment length (1)
- # AS segment (4 bytes)
- + struct.pack(">I", my_autonomous_system) +
+ )
+ my_AS = struct.pack(">I", my_autonomous_system)
+ path_attributes_hex += my_AS # AS segment (4 bytes)
+ path_attributes_hex += (
"\x40" # Flags ("Well-Known")
"\x03" # Type (NEXT_HOP)
"\x04" # Length (4)
- # IP address of the next hop (4 bytes)
- + struct.pack(">I", int(next_hop))
+ )
+ next_hop = struct.pack(">I", int(next_hop))
+ path_attributes_hex += (
+ next_hop # IP address of the next hop (4 bytes)
)
else:
path_attributes_hex = ""
)
if self.log_debug:
- logging.debug("UPDATE Message encoding")
- logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
- logging.debug(" Length=" + str(length) + " (0x" +
- binascii.hexlify(length_hex) + ")")
- logging.debug(" Type=" + str(type) + " (0x" +
- binascii.hexlify(type_hex) + ")")
- logging.debug(" withdrawn_routes_length=" +
- str(withdrawn_routes_length) + " (0x" +
- binascii.hexlify(withdrawn_routes_length_hex) + ")")
- logging.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
- str(wr_prefix_length) + " (0x" +
- binascii.hexlify(withdrawn_routes_hex) + ")")
- logging.debug(" Total Path Attributes Length=" +
- str(total_path_attributes_length) + " (0x" +
- binascii.hexlify(total_path_attributes_length_hex) +
- ")")
- logging.debug(" Path Attributes=" + "(0x" +
- binascii.hexlify(path_attributes_hex) + ")")
- logging.debug(" Network Layer Reachability Information=" +
- str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
- " (0x" + binascii.hexlify(nlri_hex) + ")")
- logging.debug(" UPDATE Message encoded: 0x" +
- binascii.b2a_hex(message_hex))
+ logger.debug("UPDATE message encoding")
+ logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
+ logger.debug(" Length=" + str(length) + " (0x" +
+ binascii.hexlify(length_hex) + ")")
+ logger.debug(" Type=" + str(type) + " (0x" +
+ binascii.hexlify(type_hex) + ")")
+ logger.debug(" withdrawn_routes_length=" +
+ str(withdrawn_routes_length) + " (0x" +
+ binascii.hexlify(withdrawn_routes_length_hex) + ")")
+ logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
+ str(wr_prefix_length) + " (0x" +
+ binascii.hexlify(withdrawn_routes_hex) + ")")
+ logger.debug(" Total Path Attributes Length=" +
+ str(total_path_attributes_length) + " (0x" +
+ binascii.hexlify(total_path_attributes_length_hex) +
+ ")")
+ logger.debug(" Path Attributes=" + "(0x" +
+ binascii.hexlify(path_attributes_hex) + ")")
+ logger.debug(" Network Layer Reachability Information=" +
+ str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
+ " (0x" + binascii.hexlify(nlri_hex) + ")")
+ logger.debug("UPDATE message encoded: 0x" +
+ binascii.b2a_hex(message_hex))
# updating counter
self.updates_sent += 1
)
if self.log_debug:
- logging.debug("NOTIFICATION Message encoding")
- logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
- logging.debug(" Length=" + str(length) + " (0x" +
- binascii.hexlify(length_hex) + ")")
- logging.debug(" Type=" + str(type) + " (0x" +
- binascii.hexlify(type_hex) + ")")
- logging.debug(" Error Code=" + str(error_code) + " (0x" +
- binascii.hexlify(error_code_hex) + ")")
- logging.debug(" Error Subode=" + str(error_subcode) + " (0x" +
- binascii.hexlify(error_subcode_hex) + ")")
- logging.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
- logging.debug(" NOTIFICATION Message encoded: 0x" +
- binascii.b2a_hex(message_hex))
+ logger.debug("NOTIFICATION message encoding")
+ logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
+ logger.debug(" Length=" + str(length) + " (0x" +
+ binascii.hexlify(length_hex) + ")")
+ logger.debug(" Type=" + str(type) + " (0x" +
+ binascii.hexlify(type_hex) + ")")
+ logger.debug(" Error Code=" + str(error_code) + " (0x" +
+ binascii.hexlify(error_code_hex) + ")")
+ logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
+ binascii.hexlify(error_subcode_hex) + ")")
+ logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
+ logger.debug("NOTIFICATION message encoded: 0x%s",
+ binascii.b2a_hex(message_hex))
return message_hex
)
if self.log_debug:
- logging.debug("KEEP ALIVE Message encoding")
- logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
- logging.debug(" Length=" + str(length) + " (0x" +
- binascii.hexlify(length_hex) + ")")
- logging.debug(" Type=" + str(type) + " (0x" +
- binascii.hexlify(type_hex) + ")")
- logging.debug(" KEEP ALIVE Message encoded: 0x" +
- binascii.b2a_hex(message_hex))
+ logger.debug("KEEP ALIVE message encoding")
+ logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
+ logger.debug(" Length=" + str(length) + " (0x" +
+ binascii.hexlify(length_hex) + ")")
+ logger.debug(" Type=" + str(type) + " (0x" +
+ binascii.hexlify(type_hex) + ")")
+ logger.debug("KEEP ALIVE message encoded: 0x%s",
+ binascii.b2a_hex(message_hex))
return message_hex
if hold_timedelta > peer_hold_timedelta:
hold_timedelta = peer_hold_timedelta
if hold_timedelta != 0 and hold_timedelta < 3:
- logging.error("Invalid hold timedelta value: " + str(hold_timedelta))
+ logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
self.hold_timedelta = hold_timedelta
# If we do not hear from peer this long, we assume it has died.
if self.hold_timedelta != 0:
# time.time() may be too strict
if snapshot_time > self.peer_hold_time:
- logging.error("Peer has overstepped the hold timer.")
+ logger.error("Peer has overstepped the hold timer.")
raise RuntimeError("Peer has overstepped the hold timer.")
# TODO: Include hold_timedelta?
# TODO: Add notification sending (attempt). That means
self.bytes_to_read = self.header_length
# Incremental buffer for message under read.
self.msg_in = ""
+ # Initialising counters
+ self.updates_received = 0
+ self.prefixes_introduced = 0
+ self.prefixes_withdrawn = 0
+ self.rx_idle_time = 0
+ self.rx_activity_detected = True
def read_message_chunk(self):
"""Read up to one message
# The logical block was a BGP header.
# Now we know the size of the message.
self.reading_header = False
- self.bytes_to_read = get_short_int_from_message(self.msg_in)
+ self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
+ self.header_length)
else: # We have finished reading the body of the message.
# Peer has just proven it is still alive.
self.timer.reset_peer_hold_time()
# TODO: Should we do validation and exit on anything
# besides update or keepalive?
# Prepare state for reading another message.
- logging.debug("Message received: 0x%s", binascii.b2a_hex(self.msg_in))
+ message_type_hex = self.msg_in[self.header_length]
+ if message_type_hex == "\x01":
+ logger.info("OPEN message received: 0x%s",
+ binascii.b2a_hex(self.msg_in))
+ elif message_type_hex == "\x02":
+ logger.debug("UPDATE message received: 0x%s",
+ binascii.b2a_hex(self.msg_in))
+ self.decode_update_message(self.msg_in)
+ elif message_type_hex == "\x03":
+ logger.info("NOTIFICATION message received: 0x%s",
+ binascii.b2a_hex(self.msg_in))
+ elif message_type_hex == "\x04":
+ logger.info("KEEP ALIVE message received: 0x%s",
+ binascii.b2a_hex(self.msg_in))
+ else:
+ logger.warning("Unexpected message received: 0x%s",
+ binascii.b2a_hex(self.msg_in))
self.msg_in = ""
self.reading_header = True
self.bytes_to_read = self.header_length
# something right now.
return
+ def decode_path_attributes(self, path_attributes_hex):
+ """Decode the Path Attributes field (rfc4271#section-4.3)
+
+ Arguments:
+ :path_attributes: path_attributes field to be decoded in hex
+ Returns:
+ :return: None
+ """
+ hex_to_decode = path_attributes_hex
+
+ while len(hex_to_decode):
+ attr_flags_hex = hex_to_decode[0]
+ attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
+# attr_optional_bit = attr_flags & 128
+# attr_transitive_bit = attr_flags & 64
+# attr_partial_bit = attr_flags & 32
+ attr_extended_length_bit = attr_flags & 16
+
+ attr_type_code_hex = hex_to_decode[1]
+ attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
+
+ if attr_extended_length_bit:
+ attr_length_hex = hex_to_decode[2:4]
+ attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
+ attr_value_hex = hex_to_decode[4:4 + attr_length]
+ hex_to_decode = hex_to_decode[4 + attr_length:]
+ else:
+ attr_length_hex = hex_to_decode[2]
+ attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
+ attr_value_hex = hex_to_decode[3:3 + attr_length]
+ hex_to_decode = hex_to_decode[3 + attr_length:]
+
+ if attr_type_code == 1:
+ logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ elif attr_type_code == 2:
+ logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ elif attr_type_code == 3:
+ logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ elif attr_type_code == 4:
+ logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ elif attr_type_code == 5:
+ logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ elif attr_type_code == 6:
+ logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ elif attr_type_code == 7:
+ logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ elif attr_type_code == 14: # rfc4760#section-3
+ logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ address_family_identifier_hex = attr_value_hex[0:2]
+ logger.debug(" Address Family Identifier = 0x%s",
+ binascii.b2a_hex(address_family_identifier_hex))
+ subsequent_address_family_identifier_hex = attr_value_hex[2]
+ logger.debug(" Subsequent Address Family Identifier = 0x%s",
+ binascii.b2a_hex(subsequent_address_family_identifier_hex))
+ next_hop_netaddr_len_hex = attr_value_hex[3]
+ next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
+ logger.debug(" Length of Next Hop Network Address = 0x%s (%s)",
+ binascii.b2a_hex(next_hop_netaddr_len_hex),
+ next_hop_netaddr_len)
+ next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
+ logger.debug(" Network Address of Next Hop = 0x%s",
+ binascii.b2a_hex(next_hop_netaddr_hex))
+ reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
+ logger.debug(" Reserved = 0x%s",
+ binascii.b2a_hex(reserved_hex))
+ nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
+ logger.debug(" Network Layer Reachability Information = 0x%s",
+ binascii.b2a_hex(nlri_hex))
+ nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
+ logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
+ for prefix in nlri_prefix_list:
+ logger.debug(" nlri_prefix_received: %s", prefix)
+ self.prefixes_introduced += len(nlri_prefix_list) # update counter
+ elif attr_type_code == 15: # rfc4760#section-4
+ logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ address_family_identifier_hex = attr_value_hex[0:2]
+ logger.debug(" Address Family Identifier = 0x%s",
+ binascii.b2a_hex(address_family_identifier_hex))
+ subsequent_address_family_identifier_hex = attr_value_hex[2]
+ logger.debug(" Subsequent Address Family Identifier = 0x%s",
+ binascii.b2a_hex(subsequent_address_family_identifier_hex))
+ wd_hex = attr_value_hex[3:]
+ logger.debug(" Withdrawn Routes = 0x%s",
+ binascii.b2a_hex(wd_hex))
+ wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
+ logger.debug(" Withdrawn routes prefix list: %s",
+ wdr_prefix_list)
+ for prefix in wdr_prefix_list:
+ logger.debug(" withdrawn_prefix_received: %s", prefix)
+ self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
+ else:
+ logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code,
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ return None
+
+ def decode_update_message(self, msg):
+ """Decode an UPDATE message (rfc4271#section-4.3)
+
+ Arguments:
+ :msg: message to be decoded in hex
+ Returns:
+ :return: None
+ """
+ logger.debug("Decoding update message:")
+ # message header - marker
+ marker_hex = msg[:16]
+ logger.debug("Message header marker: 0x%s",
+ binascii.b2a_hex(marker_hex))
+ # message header - message length
+ msg_length_hex = msg[16:18]
+ msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
+ logger.debug("Message lenght: 0x%s (%s)",
+ binascii.b2a_hex(msg_length_hex), msg_length)
+ # message header - message type
+ msg_type_hex = msg[18:19]
+ msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
+ if msg_type == 2:
+ logger.debug("Message type: 0x%s (update)",
+ binascii.b2a_hex(msg_type_hex))
+ # withdrawn routes length
+ wdr_length_hex = msg[19:21]
+ wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
+ logger.debug("Withdrawn routes lenght: 0x%s (%s)",
+ binascii.b2a_hex(wdr_length_hex), wdr_length)
+ # withdrawn routes
+ wdr_hex = msg[21:21 + wdr_length]
+ logger.debug("Withdrawn routes: 0x%s",
+ binascii.b2a_hex(wdr_hex))
+ wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
+ logger.debug("Withdrawn routes prefix list: %s",
+ wdr_prefix_list)
+ for prefix in wdr_prefix_list:
+ logger.debug("withdrawn_prefix_received: %s", prefix)
+ # total path attribute length
+ total_pa_length_offset = 21 + wdr_length
+ total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
+ total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
+ logger.debug("Total path attribute lenght: 0x%s (%s)",
+ binascii.b2a_hex(total_pa_length_hex), total_pa_length)
+ # path attributes
+ pa_offset = total_pa_length_offset + 2
+ pa_hex = msg[pa_offset:pa_offset+total_pa_length]
+ logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
+ self.decode_path_attributes(pa_hex)
+ # network layer reachability information length
+ nlri_length = msg_length - 23 - total_pa_length - wdr_length
+ logger.debug("Calculated NLRI length: %s", nlri_length)
+ # network layer reachability information
+ nlri_offset = pa_offset + total_pa_length
+ nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
+ logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
+ nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
+ logger.debug("NLRI prefix list: %s", nlri_prefix_list)
+ for prefix in nlri_prefix_list:
+ logger.debug("nlri_prefix_received: %s", prefix)
+ # Updating counters
+ self.updates_received += 1
+ self.prefixes_introduced += len(nlri_prefix_list)
+ self.prefixes_withdrawn += len(wdr_prefix_list)
+ else:
+ logger.error("Unexpeced message type 0x%s in 0x%s",
+ binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
+
def wait_for_read(self):
"""Read message until timeout (next expected event).
# Compute time to the first predictable state change
event_time = self.timer.get_next_event_time()
# snapshot_time would be imprecise
- wait_timedelta = event_time - time.time()
+ wait_timedelta = min(event_time - time.time(), 10)
if wait_timedelta < 0:
# The program got around to waiting to an event in "very near
# future" so late that it became a "past" event, thus tell
# select.error("Invalid parameter") (for everything else).
wait_timedelta = 0
# And wait for event or something to read.
+
+ if not self.rx_activity_detected or not (self.updates_received % 100):
+ # right time to write statistics to the log (not for every update and
+ # not too frequently to avoid having large log files)
+ logger.info("total_received_update_message_counter: %s",
+ self.updates_received)
+ logger.info("total_received_nlri_prefix_counter: %s",
+ self.prefixes_introduced)
+ logger.info("total_received_withdrawn_prefix_counter: %s",
+ self.prefixes_withdrawn)
+
+ start_time = time.time()
select.select([self.socket], [], [self.socket], wait_timedelta)
- # Not checking anything, that will be done in next iteration.
+ timedelta = time.time() - start_time
+ self.rx_idle_time += timedelta
+ self.rx_activity_detected = timedelta < 1
+
+ if not self.rx_activity_detected or not (self.updates_received % 100):
+ # right time to write statistics to the log (not for every update and
+ # not too frequently to avoid having large log files)
+ logger.info("... idle for %.3fs", timedelta)
+ logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
return
if not self.writer.sending_message:
# We need to schedule a keepalive ASAP.
self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
+ logger.info("KEEP ALIVE is sent.")
# We are sending a message now, so let's prioritize it.
self.prioritize_writing = True
# Now we know what our priorities are, we have to check
# Lists are unpacked, each is either [] or [self.socket],
# so we will test them as boolean.
if except_list:
- logging.error("Exceptional state on the socket.")
+ logger.error("Exceptional state on the socket.")
raise RuntimeError("Exceptional state on socket", self.socket)
# We will do either read or write.
if not (self.prioritize_writing and write_list):
if not self.generator.remaining_prefixes:
# We have just finished update generation,
# end-of-rib is due.
- logging.info("All update messages generated.")
- logging.info("Storing performance results.")
+ logger.info("All update messages generated.")
+ logger.info("Storing performance results.")
self.generator.store_results()
- logging.info("Finally an END-OF-RIB is going to be sent.")
+ logger.info("Finally an END-OF-RIB is sent.")
msg_out += self.generator.update_message(wr_prefixes=[],
nlri_prefixes=[])
self.writer.enqueue_message_for_sending(msg_out)
# Attempt for real sending to be done in next iteration.
return
- # Nothing to write anymore, except occasional keepalives.
- logging.info("Everything has been done." +
- "Now just waiting for possible incomming message.")
+ # Nothing to write anymore.
# To avoid busy loop, we do idle waiting here.
self.reader.wait_for_read()
return
# We can neither read nor write.
- logging.warning("Input and output both blocked for " +
- str(self.timer.report_timedelta) + " seconds.")
+ logger.warning("Input and output both blocked for " +
+ str(self.timer.report_timedelta) + " seconds.")
# FIXME: Are we sure select has been really waiting
# the whole period?
return
-def main():
- """ One time initialisation and iterations looping.
+def create_logger(loglevel, logfile):
+ """Create logger object
+ Arguments:
+ :loglevel: log level
+ :logfile: log file name
+ Returns:
+ :return: logger object
+ """
+ logger = logging.getLogger("logger")
+ log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
+ console_handler = logging.StreamHandler()
+ file_handler = logging.FileHandler(logfile, mode="w")
+ console_handler.setFormatter(log_formatter)
+ file_handler.setFormatter(log_formatter)
+ logger.addHandler(console_handler)
+ logger.addHandler(file_handler)
+ logger.setLevel(loglevel)
+ return logger
+
+
+def job(arguments):
+ """One time initialisation and iterations looping.
Notes:
Establish BGP connection and run iterations.
+
+ Arguments:
+ :arguments: Command line arguments
+ Returns:
+ :return: None
"""
- arguments = parse_arguments()
- logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s",
- level=arguments.loglevel)
bgp_socket = establish_connection(arguments)
# Initial handshake phase. TODO: Can it be also moved to StateTracker?
# Receive open message before sending anything.
timer = TimeTracker(msg_in)
generator = MessageGenerator(arguments)
msg_out = generator.open_message()
- logging.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
+ logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
# Send our open message to the peer.
bgp_socket.send(msg_out)
# Wait for confirming keepalive.
# Using exact keepalive length to not to see possible updates.
msg_in = bgp_socket.recv(19)
if msg_in != generator.keepalive_message():
- logging.error("Open not confirmed by keepalive, instead got " +
- binascii.hexlify(msg_in))
+ logger.error("Open not confirmed by keepalive, instead got " +
+ binascii.hexlify(msg_in))
raise MessageError("Open not confirmed by keepalive, instead got",
msg_in)
timer.reset_peer_hold_time()
# Send the keepalive to indicate the connection is accepted.
timer.snapshot() # Remember this time.
msg_out = generator.keepalive_message()
- logging.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
+ logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
bgp_socket.send(msg_out)
# Use the remembered time.
timer.reset_my_keepalive_time(timer.snapshot_time)
while True: # main reactor loop
state.perform_one_loop_iteration()
+
+def threaded_job(arguments):
+ """Run the job threaded
+
+ Arguments:
+ :arguments: Command line arguments
+ Returns:
+ :return: None
+ """
+ amount_left = arguments.amount
+ utils_left = arguments.multiplicity
+ prefix_current = arguments.firstprefix
+ myip_current = arguments.myip
+ thread_args = []
+
+ while 1:
+ amount_per_util = (amount_left - 1) / utils_left + 1 # round up
+ amount_left -= amount_per_util
+ utils_left -= 1
+
+ args = deepcopy(arguments)
+ args.amount = amount_per_util
+ args.firstprefix = prefix_current
+ args.myip = myip_current
+ thread_args.append(args)
+
+ if not utils_left:
+ break
+ prefix_current += amount_per_util * 16
+ myip_current += 1
+
+ try:
+ # Create threads
+ for t in thread_args:
+ thread.start_new_thread(job, (t,))
+ except Exception:
+ print "Error: unable to start thread."
+ raise SystemExit(2)
+
+ # Work remains forever
+ while 1:
+ time.sleep(5)
+
+
if __name__ == "__main__":
- main()
+ arguments = parse_arguments()
+ logger = create_logger(arguments.loglevel, arguments.logfile)
+ if arguments.multiplicity > 1:
+ threaded_job(arguments)
+ else:
+ job(arguments)