class SafeDict(dict):
- '''Thread safe dictionary
+ """Thread safe dictionary
The object will serve as thread safe data storage.
It should be used with "with" statement.
- '''
+ """
- def __init__(self, * p_arg, ** n_arg):
+ def __init__(self, *p_arg, **n_arg):
super(SafeDict, self).__init__()
self._lock = threading.Lock()
parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
# FIXME: We are acting as iBGP peer,
# we should mirror AS number from peer's open message.
- str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
+ str_help = "Amount of IP prefixes to generate. (negative means " "infinite" ")."
parser.add_argument("--amount", default="1", type=int, help=str_help)
str_help = "Rpc server port."
parser.add_argument("--port", default="8000", type=int, help=str_help)
str_help = "The number of prefixes to process without withdrawals"
parser.add_argument("--prefill", default="0", type=int, help=str_help)
str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
- parser.add_argument("--updates", choices=["single", "separate"],
- default=["separate"], help=str_help)
+ parser.add_argument(
+ "--updates", choices=["single", "separate"], default=["separate"], help=str_help
+ )
str_help = "Base prefix IP address for prefix generation"
- parser.add_argument("--firstprefix", default="8.0.1.0",
- type=ipaddr.IPv4Address, help=str_help)
+ parser.add_argument(
+ "--firstprefix", default="8.0.1.0", type=ipaddr.IPv4Address, help=str_help
+ )
str_help = "The prefix length."
parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
str_help = "Listen for connection, instead of initiating it."
parser.add_argument("--listen", action="store_true", help=str_help)
- str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
- "Default value only suitable for listening.")
- parser.add_argument("--myip", default="0.0.0.0",
- type=ipaddr.IPv4Address, help=str_help)
- str_help = ("TCP port to bind to when listening or initiating connection." +
- "Default only suitable for initiating.")
+ str_help = (
+ "Numeric IP Address to bind to and derive BGP ID from."
+ + "Default value only suitable for listening."
+ )
+ parser.add_argument(
+ "--myip", default="0.0.0.0", type=ipaddr.IPv4Address, help=str_help
+ )
+ str_help = (
+ "TCP port to bind to when listening or initiating connection."
+ + "Default only suitable for initiating."
+ )
parser.add_argument("--myport", default="0", type=int, help=str_help)
str_help = "The IP of the next hop to be placed into the update messages."
- parser.add_argument("--nexthop", default="192.0.2.1",
- type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
+ parser.add_argument(
+ "--nexthop",
+ default="192.0.2.1",
+ type=ipaddr.IPv4Address,
+ dest="nexthop",
+ help=str_help,
+ )
str_help = "Identifier of the route originator."
- parser.add_argument("--originator", default=None,
- type=ipaddr.IPv4Address, dest="originator", help=str_help)
+ parser.add_argument(
+ "--originator",
+ default=None,
+ type=ipaddr.IPv4Address,
+ dest="originator",
+ help=str_help,
+ )
str_help = "Cluster list item identifier."
- parser.add_argument("--cluster", default=None,
- type=ipaddr.IPv4Address, dest="cluster", help=str_help)
- str_help = ("Numeric IP Address to try to connect to." +
- "Currently no effect in listening mode.")
- parser.add_argument("--peerip", default="127.0.0.2",
- type=ipaddr.IPv4Address, help=str_help)
+ parser.add_argument(
+ "--cluster",
+ default=None,
+ type=ipaddr.IPv4Address,
+ dest="cluster",
+ help=str_help,
+ )
+ str_help = (
+ "Numeric IP Address to try to connect to."
+ + "Currently no effect in listening mode."
+ )
+ parser.add_argument(
+ "--peerip", default="127.0.0.2", type=ipaddr.IPv4Address, help=str_help
+ )
str_help = "TCP port to try to connect to. No effect in listening mode."
parser.add_argument("--peerport", default="179", type=int, help=str_help)
str_help = "Local hold time."
parser.add_argument("--holdtime", default="180", type=int, help=str_help)
str_help = "Log level (--error, --warning, --info, --debug)"
- parser.add_argument("--error", dest="loglevel", action="store_const",
- const=logging.ERROR, default=logging.INFO,
- help=str_help)
- parser.add_argument("--warning", dest="loglevel", action="store_const",
- const=logging.WARNING, default=logging.INFO,
- help=str_help)
- parser.add_argument("--info", dest="loglevel", action="store_const",
- const=logging.INFO, default=logging.INFO,
- help=str_help)
- parser.add_argument("--debug", dest="loglevel", action="store_const",
- const=logging.DEBUG, default=logging.INFO,
- help=str_help)
+ parser.add_argument(
+ "--error",
+ dest="loglevel",
+ action="store_const",
+ const=logging.ERROR,
+ default=logging.INFO,
+ help=str_help,
+ )
+ parser.add_argument(
+ "--warning",
+ dest="loglevel",
+ action="store_const",
+ const=logging.WARNING,
+ default=logging.INFO,
+ help=str_help,
+ )
+ parser.add_argument(
+ "--info",
+ dest="loglevel",
+ action="store_const",
+ const=logging.INFO,
+ default=logging.INFO,
+ help=str_help,
+ )
+ parser.add_argument(
+ "--debug",
+ dest="loglevel",
+ action="store_const",
+ const=logging.DEBUG,
+ default=logging.INFO,
+ help=str_help,
+ )
str_help = "Log file name"
parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
str_help = "Trailing part of the csv result files for plotting purposes"
str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
str_help = "Using peerip instead of myip for xmlrpc server"
- parser.add_argument("--usepeerip", default=False, action="store_true", help=str_help)
+ parser.add_argument(
+ "--usepeerip", default=False, action="store_true", help=str_help
+ )
str_help = "Link-State NLRI supported"
parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
str_help = "Link-State NLRI: Identifier"
str_help = "Link-State NLRI: LSP ID"
parser.add_argument("-lspid", default="1", type=int, help=str_help)
str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
- parser.add_argument("--lstsaddr", default="1.2.3.4",
- type=ipaddr.IPv4Address, help=str_help)
+ parser.add_argument(
+ "--lstsaddr", default="1.2.3.4", type=ipaddr.IPv4Address, help=str_help
+ )
str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
- parser.add_argument("--lsteaddr", default="5.6.7.8",
- type=ipaddr.IPv4Address, help=str_help)
+ parser.add_argument(
+ "--lsteaddr", default="5.6.7.8", type=ipaddr.IPv4Address, help=str_help
+ )
str_help = "Link-State NLRI: Identifier Step"
parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
str_help = "Link-State NLRI: Tunnel ID Step"
str_help = "Open message includes L3VPN-MULTICAST arguments.\
Enabling this flag makes the script not decoding the update mesage, because of not\
supported decoding for these elements."
- parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
- str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding."
+ parser.add_argument(
+ "--l3vpn_mcast", default=False, action="store_true", help=str_help
+ )
+ str_help = (
+ "Open message includes L3VPN-UNICAST arguments, without message decoding."
+ )
parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
- parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help)
+ parser.add_argument(
+ "--rt_constrain", default=False, action="store_true", help=str_help
+ )
str_help = "Open message includes ipv6-unicast family, without message decoding."
parser.add_argument("--ipv6", default=False, action="store_true", help=str_help)
str_help = "Add all supported families without message decoding."
prefix_bit_len_hex = prefixes_hex[offset]
prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
prefix_len = ((prefix_bit_len - 1) / 8) + 1
- prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
+ prefix_hex = prefixes_hex[offset + 1 : offset + 1 + prefix_len]
prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
offset += 1 + prefix_len
prefix_list.append(prefix + "/" + str(prefix_bit_len))
reported_length = get_short_int_from_message(msg_in)
if len(msg_in) != reported_length:
error_msg = (
- "Expected message length (" + reported_length +
- ") does not match actual length (" + str(len(msg_in)) + ")"
+ "Expected message length ("
+ + reported_length
+ + ") does not match actual length ("
+ + str(len(msg_in))
+ + ")"
)
logger.error(error_msg + binascii.hexlify(msg_in))
raise MessageError(error_msg, msg_in)
if self.bgpls:
self.prefix_count_to_add_default = 1
self.prefix_count_to_del_default = 0
- self.ls_nlri_default = {"Identifier": args.lsid,
- "TunnelID": args.lstid,
- "LSPID": args.lspid,
- "IPv4TunnelSenderAddress": args.lstsaddr,
- "IPv4TunnelEndPointAddress": args.lsteaddr}
+ self.ls_nlri_default = {
+ "Identifier": args.lsid,
+ "TunnelID": args.lstid,
+ "LSPID": args.lspid,
+ "IPv4TunnelSenderAddress": args.lstsaddr,
+ "IPv4TunnelEndPointAddress": args.lsteaddr,
+ }
self.lsid_step = args.lsidstep
self.lstid_step = args.lstidstep
self.lspid_step = args.lspidstep
self.lstsaddr_step = args.lstsaddrstep
self.lsteaddr_step = args.lsteaddrstep
# Default values used for randomized part
- s1_slots = ((self.total_prefix_amount -
- self.remaining_prefixes_threshold - 1) /
- self.prefix_count_to_add_default + 1)
- s2_slots = (
- (self.remaining_prefixes_threshold - 1)
- / (self.prefix_count_to_add_default - self.prefix_count_to_del_default)
- + 1
- )
+ s1_slots = (
+ self.total_prefix_amount - self.remaining_prefixes_threshold - 1
+ ) / self.prefix_count_to_add_default + 1
+ s2_slots = (self.remaining_prefixes_threshold - 1) / (
+ self.prefix_count_to_add_default - self.prefix_count_to_del_default
+ ) + 1
# S1_First_Index = 0
# S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
s2_first_index = s1_slots * self.prefix_count_to_add_default
- s2_last_index = (s2_first_index +
- s2_slots * (self.prefix_count_to_add_default -
- self.prefix_count_to_del_default) - 1)
- self.slot_gap_default = ((self.total_prefix_amount -
- self.remaining_prefixes_threshold - 1) /
- self.prefix_count_to_add_default + 1)
+ s2_last_index = (
+ s2_first_index
+ + s2_slots
+ * (self.prefix_count_to_add_default - self.prefix_count_to_del_default)
+ - 1
+ )
+ self.slot_gap_default = (
+ self.total_prefix_amount - self.remaining_prefixes_threshold - 1
+ ) / self.prefix_count_to_add_default + 1
self.randomize_lowest_default = s2_first_index
self.randomize_highest_default = s2_last_index
# Initialising counters
"""
logger.info("Generator initialisation")
- logger.info(" Target total number of prefixes to be introduced: " +
- str(self.total_prefix_amount))
- logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
- str(self.prefix_length_default))
- logger.info(" My Autonomous System number: " +
- str(self.my_autonomous_system_default))
+ logger.info(
+ " Target total number of prefixes to be introduced: "
+ + str(self.total_prefix_amount)
+ )
+ logger.info(
+ " Prefix base: "
+ + str(self.prefix_base_default)
+ + "/"
+ + str(self.prefix_length_default)
+ )
+ logger.info(
+ " My Autonomous System number: " + str(self.my_autonomous_system_default)
+ )
logger.info(" My Hold Time: " + str(self.hold_time_default))
logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
logger.info(" Next Hop: " + str(self.next_hop_default))
logger.info(" Originator ID: " + str(self.originator_id_default))
logger.info(" Cluster list: " + str(self.cluster_list_item_default))
- logger.info(" Prefix count to be inserted at once: " +
- str(self.prefix_count_to_add_default))
- logger.info(" Prefix count to be withdrawn at once: " +
- str(self.prefix_count_to_del_default))
- logger.info(" Fast pre-fill up to " +
- str(self.total_prefix_amount -
- self.remaining_prefixes_threshold) + " prefixes")
- logger.info(" Remaining number of prefixes to be processed " +
- "in parallel with withdrawals: " +
- str(self.remaining_prefixes_threshold))
- logger.debug(" Prefix index range used after pre-fill procedure [" +
- str(self.randomize_lowest_default) + ", " +
- str(self.randomize_highest_default) + "]")
+ logger.info(
+ " Prefix count to be inserted at once: "
+ + str(self.prefix_count_to_add_default)
+ )
+ logger.info(
+ " Prefix count to be withdrawn at once: "
+ + str(self.prefix_count_to_del_default)
+ )
+ logger.info(
+ " Fast pre-fill up to "
+ + str(self.total_prefix_amount - self.remaining_prefixes_threshold)
+ + " prefixes"
+ )
+ logger.info(
+ " Remaining number of prefixes to be processed "
+ + "in parallel with withdrawals: "
+ + str(self.remaining_prefixes_threshold)
+ )
+ logger.debug(
+ " Prefix index range used after pre-fill procedure ["
+ + str(self.randomize_lowest_default)
+ + ", "
+ + str(self.randomize_highest_default)
+ + "]"
+ )
if self.single_update_default:
- logger.info(" Common single UPDATE will be generated " +
- "for both NLRI & WITHDRAWN lists")
+ logger.info(
+ " Common single UPDATE will be generated "
+ + "for both NLRI & WITHDRAWN lists"
+ )
else:
- logger.info(" Two separate UPDATEs will be generated " +
- "for each NLRI & WITHDRAWN lists")
+ logger.info(
+ " Two separate UPDATEs will be generated "
+ + "for each NLRI & WITHDRAWN lists"
+ )
if self.randomize_updates_default:
logger.info(" Generation of UPDATE messages will be randomized")
- logger.info(" Let\'s go ...\n")
+ logger.info(" Let's go ...\n")
# TODO: Notification for hold timer expiration can be handy.
# performance calculation
if self.phase1_updates_sent >= threshold:
totals1 = self.phase1_updates_sent
- performance1 = int(self.phase1_updates_sent /
- (self.phase1_stop_time - self.phase1_start_time))
+ performance1 = int(
+ self.phase1_updates_sent
+ / (self.phase1_stop_time - self.phase1_start_time)
+ )
else:
totals1 = None
performance1 = None
if self.phase2_updates_sent >= threshold:
totals2 = self.phase2_updates_sent
- performance2 = int(self.phase2_updates_sent /
- (self.phase2_stop_time - self.phase2_start_time))
+ performance2 = int(
+ self.phase2_updates_sent
+ / (self.phase2_stop_time - self.phase2_start_time)
+ )
else:
totals2 = None
performance2 = None
logger.info("#" * 10 + " Final results " + "#" * 10)
logger.info("Number of iterations: " + str(self.iteration))
- logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
- str(self.phase1_updates_sent))
- logger.info("The pre-fill phase duration: " +
- str(self.phase1_stop_time - self.phase1_start_time) + "s")
- logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
- str(self.phase2_updates_sent))
- logger.info("The 2nd test phase duration: " +
- str(self.phase2_stop_time - self.phase2_start_time) + "s")
+ logger.info(
+ "Number of UPDATE messages sent in the pre-fill phase: "
+ + str(self.phase1_updates_sent)
+ )
+ logger.info(
+ "The pre-fill phase duration: "
+ + str(self.phase1_stop_time - self.phase1_start_time)
+ + "s"
+ )
+ logger.info(
+ "Number of UPDATE messages sent in the 2nd test phase: "
+ + str(self.phase2_updates_sent)
+ )
+ logger.info(
+ "The 2nd test phase duration: "
+ + str(self.phase2_stop_time - self.phase2_start_time)
+ + "s"
+ )
logger.info("Threshold for performance reporting: " + str(threshold))
# making labels
- phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
- " route(s) per UPDATE")
+ phase1_label = (
+ "pre-fill " + str(self.prefix_count_to_add_default) + " route(s) per UPDATE"
+ )
if self.single_update_default:
- phase2_label = "+" + (str(self.prefix_count_to_add_default) +
- "/-" + str(self.prefix_count_to_del_default) +
- " routes per UPDATE")
+ phase2_label = "+" + (
+ str(self.prefix_count_to_add_default)
+ + "/-"
+ + str(self.prefix_count_to_del_default)
+ + " routes per UPDATE"
+ )
else:
- phase2_label = "+" + (str(self.prefix_count_to_add_default) +
- "/-" + str(self.prefix_count_to_del_default) +
- " routes in two UPDATEs")
+ phase2_label = "+" + (
+ str(self.prefix_count_to_add_default)
+ + "/-"
+ + str(self.prefix_count_to_del_default)
+ + " routes in two UPDATEs"
+ )
# collecting capacity and performance results
totals = {}
performance = {}
second_line = second_line[:-2]
f.write(first_line + "\n")
f.write(second_line + "\n")
- logger.info("Message generator performance results stored in " +
- file_name + ":")
+ logger.info(
+ "Message generator performance results stored in " + file_name + ":"
+ )
logger.info(" " + first_line)
logger.info(" " + second_line)
finally:
"""
# generating list of LS NLRI parameters
identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
- ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
+ ipv4_tunnel_sender_address = (
+ self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
+ )
tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
- ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
- ls_nlri_values = {"Identifier": identifier,
- "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
- "TunnelID": tunnel_id, "LSPID": lsp_id,
- "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
+ ipv4_tunnel_endpoint_address = (
+ self.ls_nlri_default["IPv4TunnelEndPointAddress"]
+ + index / self.lsteaddr_step
+ )
+ ls_nlri_values = {
+ "Identifier": identifier,
+ "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
+ "TunnelID": tunnel_id,
+ "LSPID": lsp_id,
+ "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address,
+ }
return ls_nlri_values
- def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
- prefix_len=None, prefix_count=None, randomize=None):
+ def get_prefix_list(
+ self,
+ slot_index,
+ slot_size=None,
+ prefix_base=None,
+ prefix_len=None,
+ prefix_count=None,
+ randomize=None,
+ ):
"""Generates list of IP address prefixes.
Arguments:
logger.debug(" Prefix list: " + str(prefixes))
return prefixes
- def compose_update_message(self, prefix_count_to_add=None,
- prefix_count_to_del=None):
+ def compose_update_message(
+ self, prefix_count_to_add=None, prefix_count_to_del=None
+ ):
"""Composes an UPDATE message
Arguments:
prefix_count_to_del = self.prefix_count_to_del_default
# logging
if self.log_info and not (self.iteration % 1000):
- logger.info("Iteration: " + str(self.iteration) +
- " - total remaining prefixes: " +
- str(self.remaining_prefixes))
+ logger.info(
+ "Iteration: "
+ + str(self.iteration)
+ + " - total remaining prefixes: "
+ + str(self.remaining_prefixes)
+ )
if self.log_debug:
- logger.debug("#" * 10 + " Iteration: " +
- str(self.iteration) + " " + "#" * 10)
- logger.debug("Remaining prefixes: " +
- str(self.remaining_prefixes))
+ logger.debug(
+ "#" * 10 + " Iteration: " + str(self.iteration) + " " + "#" * 10
+ )
+ logger.debug("Remaining prefixes: " + str(self.remaining_prefixes))
# scenario type & one-shot counter
- straightforward_scenario = (self.remaining_prefixes >
- self.remaining_prefixes_threshold)
+ straightforward_scenario = (
+ self.remaining_prefixes > self.remaining_prefixes_threshold
+ )
if straightforward_scenario:
prefix_count_to_del = 0
if self.log_debug:
if not self.phase2_start_time:
self.phase2_start_time = time.time()
# tailor the number of prefixes if needed
- prefix_count_to_add = (
- prefix_count_to_del
- + min(prefix_count_to_add - prefix_count_to_del, self.remaining_prefixes)
+ prefix_count_to_add = prefix_count_to_del + min(
+ prefix_count_to_add - prefix_count_to_del, self.remaining_prefixes
)
# prefix slots selection for insertion and withdrawal
slot_index_to_add = self.iteration
# getting lists of prefixes for insertion in this iteration
if self.log_debug:
logger.debug("Prefixes to be inserted in this iteration:")
- prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
- prefix_count=prefix_count_to_add)
+ prefix_list_to_add = self.get_prefix_list(
+ slot_index_to_add, prefix_count=prefix_count_to_add
+ )
# getting lists of prefixes for withdrawal in this iteration
if self.log_debug:
logger.debug("Prefixes to be withdrawn in this iteration:")
- prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
- prefix_count=prefix_count_to_del)
+ prefix_list_to_del = self.get_prefix_list(
+ slot_index_to_del, prefix_count=prefix_count_to_del
+ )
# generating the UPDATE mesage with LS-NLRI only
if self.bgpls:
ls_nlri = self.get_ls_nlri_values(self.iteration)
- msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
- **ls_nlri)
+ msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[], **ls_nlri)
else:
# generating the UPDATE message with prefix lists
if self.single_update_default:
# Send prefixes to be introduced and withdrawn
# in one UPDATE message
- msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
- nlri_prefixes=prefix_list_to_add)
+ msg_out = self.update_message(
+ wr_prefixes=prefix_list_to_del, nlri_prefixes=prefix_list_to_add
+ )
else:
# Send prefixes to be introduced and withdrawn
# in separate UPDATE messages (if needed)
- msg_out = self.update_message(wr_prefixes=[],
- nlri_prefixes=prefix_list_to_add)
+ msg_out = self.update_message(
+ wr_prefixes=[], nlri_prefixes=prefix_list_to_add
+ )
if prefix_count_to_del:
- msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
- nlri_prefixes=[])
+ msg_out += self.update_message(
+ wr_prefixes=prefix_list_to_del, nlri_prefixes=[]
+ )
# updating counters - who knows ... maybe I am last time here ;)
if straightforward_scenario:
self.phase1_stop_time = time.time()
self.phase1_updates_sent = self.updates_sent
else:
self.phase2_stop_time = time.time()
- self.phase2_updates_sent = (self.updates_sent -
- self.phase1_updates_sent)
+ self.phase2_updates_sent = self.updates_sent - self.phase1_updates_sent
# updating totals for the next iteration
self.iteration += 1
- self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
+ self.remaining_prefixes -= prefix_count_to_add - prefix_count_to_del
# returning the encoded message
return msg_out
# Section of message encoders
- def open_message(self, version=None, my_autonomous_system=None,
- hold_time=None, bgp_identifier=None):
+ def open_message(
+ self,
+ version=None,
+ my_autonomous_system=None,
+ hold_time=None,
+ bgp_identifier=None,
+ ):
"""Generates an OPEN Message (rfc4271#section-4.2)
Arguments:
# AS number is mappable to 2 bytes
if my_autonomous_system < 65536:
my_autonomous_system_2_bytes = my_autonomous_system
- my_autonomous_system_hex_2_bytes = struct.pack(">H",
- my_autonomous_system)
+ my_autonomous_system_hex_2_bytes = struct.pack(">H", my_autonomous_system)
# Hold Time
hold_time_hex = struct.pack(">H", hold_time)
"\x02" # Param type ("Capability Ad")
"\x06" # Length (6 bytes)
"\x01" # Capability type (NLRI Unicast),
- # see RFC 4760, secton 8
+ # see RFC 4760, secton 8
"\x04" # Capability value length
"\x00\x01" # AFI (Ipv4)
"\x00" # (reserved)
"\x02" # Param type ("Capability Ad")
"\x06" # Length (6 bytes)
"\x01" # Capability type (NLRI Unicast),
- # see RFC 4760, secton 8
+ # see RFC 4760, secton 8
"\x04" # Capability value length
"\x40\x04" # AFI (BGP-LS)
"\x00" # (reserved)
"\x02" # Param type ("Capability Ad")
"\x06" # Length (6 bytes)
"\x41" # "32 bit AS Numbers Support"
- # (see RFC 6793, section 3)
+ # (see RFC 6793, section 3)
"\x04" # Capability value length
)
- optional_parameter_hex += (
- struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
- )
+ optional_parameter_hex += struct.pack(
+ ">I", my_autonomous_system
+ ) # My AS in 32 bit format
optional_parameters_hex += optional_parameter_hex
if self.grace != 8:
b = list(bin(self.grace)[2:])
b = b + [0] * (3 - len(b))
length = "\x08"
- if b[1] == '1':
+ if b[1] == "1":
restart_flag = "\x80\x05"
else:
restart_flag = "\x00\x05"
- if b[2] == '1':
+ if b[2] == "1":
ipv4_flag = "\x80"
else:
ipv4_flag = "\x00"
- if b[0] == '1':
+ if b[0] == "1":
ll_gr = "\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
length = "\x11"
else:
# "\x47\x07\x00\x01\x01\x00\x00\x00\x1e" ipv4 ll-graceful-restart capability, timer 30sec
# ll-gr turned on when grace is between 4-7
optional_parameter_hex = "\x02{}\x40\x06{}\x00\x01\x01{}{}".format(
- length, restart_flag, ipv4_flag, ll_gr)
+ length, restart_flag, ipv4_flag, ll_gr
+ )
optional_parameters_hex += optional_parameter_hex
# Optional Parameters Length
optional_parameters_length = len(optional_parameters_hex)
- optional_parameters_length_hex = struct.pack("B",
- optional_parameters_length)
+ optional_parameters_length_hex = struct.pack("B", optional_parameters_length)
# Length (big-endian)
length = (
- len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
- len(my_autonomous_system_hex_2_bytes) +
- len(hold_time_hex) + len(bgp_identifier_hex) +
- len(optional_parameters_length_hex) +
- len(optional_parameters_hex)
+ len(marker_hex)
+ + 2
+ + len(type_hex)
+ + len(version_hex)
+ + len(my_autonomous_system_hex_2_bytes)
+ + len(hold_time_hex)
+ + len(bgp_identifier_hex)
+ + len(optional_parameters_length_hex)
+ + len(optional_parameters_hex)
)
length_hex = struct.pack(">H", length)
# OPEN Message
message_hex = (
- marker_hex +
- length_hex +
- type_hex +
- version_hex +
- my_autonomous_system_hex_2_bytes +
- hold_time_hex +
- bgp_identifier_hex +
- optional_parameters_length_hex +
- optional_parameters_hex
+ marker_hex
+ + length_hex
+ + type_hex
+ + version_hex
+ + my_autonomous_system_hex_2_bytes
+ + hold_time_hex
+ + bgp_identifier_hex
+ + optional_parameters_length_hex
+ + optional_parameters_hex
)
if self.log_debug:
logger.debug("OPEN message encoding")
logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
- logger.debug(" Length=" + str(length) + " (0x" +
- binascii.hexlify(length_hex) + ")")
- logger.debug(" Type=" + str(type) + " (0x" +
- binascii.hexlify(type_hex) + ")")
- logger.debug(" Version=" + str(version) + " (0x" +
- binascii.hexlify(version_hex) + ")")
- logger.debug(" My Autonomous System=" +
- str(my_autonomous_system_2_bytes) + " (0x" +
- binascii.hexlify(my_autonomous_system_hex_2_bytes) +
- ")")
- logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
- binascii.hexlify(hold_time_hex) + ")")
- logger.debug(" BGP Identifier=" + str(bgp_identifier) +
- " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
- logger.debug(" Optional Parameters Length=" +
- str(optional_parameters_length) + " (0x" +
- binascii.hexlify(optional_parameters_length_hex) +
- ")")
- logger.debug(" Optional Parameters=0x" +
- binascii.hexlify(optional_parameters_hex))
- logger.debug("OPEN message encoded: 0x%s",
- binascii.b2a_hex(message_hex))
+ logger.debug(
+ " Length=" + str(length) + " (0x" + binascii.hexlify(length_hex) + ")"
+ )
+ logger.debug(
+ " Type=" + str(type) + " (0x" + binascii.hexlify(type_hex) + ")"
+ )
+ logger.debug(
+ " Version="
+ + str(version)
+ + " (0x"
+ + binascii.hexlify(version_hex)
+ + ")"
+ )
+ logger.debug(
+ " My Autonomous System="
+ + str(my_autonomous_system_2_bytes)
+ + " (0x"
+ + binascii.hexlify(my_autonomous_system_hex_2_bytes)
+ + ")"
+ )
+ logger.debug(
+ " Hold Time="
+ + str(hold_time)
+ + " (0x"
+ + binascii.hexlify(hold_time_hex)
+ + ")"
+ )
+ logger.debug(
+ " BGP Identifier="
+ + str(bgp_identifier)
+ + " (0x"
+ + binascii.hexlify(bgp_identifier_hex)
+ + ")"
+ )
+ logger.debug(
+ " Optional Parameters Length="
+ + str(optional_parameters_length)
+ + " (0x"
+ + binascii.hexlify(optional_parameters_length_hex)
+ + ")"
+ )
+ logger.debug(
+ " Optional Parameters=0x" + binascii.hexlify(optional_parameters_hex)
+ )
+ logger.debug("OPEN message encoded: 0x%s", binascii.b2a_hex(message_hex))
return message_hex
- def update_message(self, wr_prefixes=None, nlri_prefixes=None,
- wr_prefix_length=None, nlri_prefix_length=None,
- my_autonomous_system=None, next_hop=None,
- originator_id=None, cluster_list_item=None,
- end_of_rib=False, **ls_nlri_params):
+ def update_message(
+ self,
+ wr_prefixes=None,
+ nlri_prefixes=None,
+ wr_prefix_length=None,
+ nlri_prefix_length=None,
+ my_autonomous_system=None,
+ next_hop=None,
+ originator_id=None,
+ cluster_list_item=None,
+ end_of_rib=False,
+ **ls_nlri_params
+ ):
"""Generates an UPDATE Message (rfc4271#section-4.3)
Arguments:
if not self.bgpls:
bytes = ((wr_prefix_length - 1) / 8) + 1
for prefix in wr_prefixes:
- withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
- struct.pack(">I", int(prefix))[:bytes])
+ withdrawn_route_hex = (
+ struct.pack("B", wr_prefix_length)
+ + struct.pack(">I", int(prefix))[:bytes]
+ )
withdrawn_routes_hex += withdrawn_route_hex
# Withdrawn Routes Length
"\x04" # Length (4)
)
next_hop_hex = struct.pack(">I", int(next_hop))
- path_attributes_hex += (
- next_hop_hex # IP address of the next hop (4 bytes)
- )
+ path_attributes_hex += next_hop_hex # IP address of the next hop (4 bytes)
if originator_id is not None:
path_attributes_hex += (
"\x80" # Flags ("Optional, non-transitive")
"\x09" # Type (ORIGINATOR_ID)
"\x04" # Length (4)
- ) # ORIGINATOR_ID (4 bytes)
+ ) # ORIGINATOR_ID (4 bytes)
path_attributes_hex += struct.pack(">I", int(originator_id))
if cluster_list_item is not None:
path_attributes_hex += (
"\x80" # Flags ("Optional, non-transitive")
"\x0a" # Type (CLUSTER_LIST)
"\x04" # Length (4)
- ) # one CLUSTER_LIST item (4 bytes)
+ ) # one CLUSTER_LIST item (4 bytes)
path_attributes_hex += struct.pack(">I", int(cluster_list_item))
if self.bgpls and not end_of_rib:
"\x04" # Next Hop Length (4)
)
path_attributes_hex += struct.pack(">I", int(next_hop))
- path_attributes_hex += "\x00" # Reserved
+ path_attributes_hex += "\x00" # Reserved
path_attributes_hex += (
"\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
"\x00\x15" # LS-NLRI.TotalNLRILength (21)
- "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
+ "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
)
path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
- path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
+ path_attributes_hex += struct.pack(
+ ">I", int(ls_nlri["IPv4TunnelSenderAddress"])
+ )
path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
- path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
+ path_attributes_hex += struct.pack(
+ ">I", int(ls_nlri["IPv4TunnelEndPointAddress"])
+ )
# Total Path Attributes Length
total_path_attributes_length = len(path_attributes_hex)
- total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
+ total_path_attributes_length_hex = struct.pack(
+ ">H", total_path_attributes_length
+ )
# Network Layer Reachability Information
nlri_hex = ""
if not self.bgpls:
bytes = ((nlri_prefix_length - 1) / 8) + 1
for prefix in nlri_prefixes:
- nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
- struct.pack(">I", int(prefix))[:bytes])
+ nlri_prefix_hex = (
+ struct.pack("B", nlri_prefix_length)
+ + struct.pack(">I", int(prefix))[:bytes]
+ )
nlri_hex += nlri_prefix_hex
# Length (big-endian)
length = (
- len(marker_hex) + 2 + len(type_hex) +
- len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
- len(total_path_attributes_length_hex) + len(path_attributes_hex) +
- len(nlri_hex))
+ len(marker_hex)
+ + 2
+ + len(type_hex)
+ + len(withdrawn_routes_length_hex)
+ + len(withdrawn_routes_hex)
+ + len(total_path_attributes_length_hex)
+ + len(path_attributes_hex)
+ + len(nlri_hex)
+ )
length_hex = struct.pack(">H", length)
# UPDATE Message
message_hex = (
- marker_hex +
- length_hex +
- type_hex +
- withdrawn_routes_length_hex +
- withdrawn_routes_hex +
- total_path_attributes_length_hex +
- path_attributes_hex +
- nlri_hex
+ marker_hex
+ + length_hex
+ + type_hex
+ + withdrawn_routes_length_hex
+ + withdrawn_routes_hex
+ + total_path_attributes_length_hex
+ + path_attributes_hex
+ + nlri_hex
)
if self.grace != 8 and self.grace != 0 and end_of_rib:
- message_hex = (marker_hex + binascii.unhexlify("00170200000000"))
+ message_hex = marker_hex + binascii.unhexlify("00170200000000")
if self.log_debug:
logger.debug("UPDATE message encoding")
logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
- logger.debug(" Length=" + str(length) + " (0x" +
- binascii.hexlify(length_hex) + ")")
- logger.debug(" Type=" + str(type) + " (0x" +
- binascii.hexlify(type_hex) + ")")
- logger.debug(" withdrawn_routes_length=" +
- str(withdrawn_routes_length) + " (0x" +
- binascii.hexlify(withdrawn_routes_length_hex) + ")")
- logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
- str(wr_prefix_length) + " (0x" +
- binascii.hexlify(withdrawn_routes_hex) + ")")
+ logger.debug(
+ " Length=" + str(length) + " (0x" + binascii.hexlify(length_hex) + ")"
+ )
+ logger.debug(
+ " Type=" + str(type) + " (0x" + binascii.hexlify(type_hex) + ")"
+ )
+ logger.debug(
+ " withdrawn_routes_length="
+ + str(withdrawn_routes_length)
+ + " (0x"
+ + binascii.hexlify(withdrawn_routes_length_hex)
+ + ")"
+ )
+ logger.debug(
+ " Withdrawn_Routes="
+ + str(wr_prefixes)
+ + "/"
+ + str(wr_prefix_length)
+ + " (0x"
+ + binascii.hexlify(withdrawn_routes_hex)
+ + ")"
+ )
if total_path_attributes_length:
- logger.debug(" Total Path Attributes Length=" +
- str(total_path_attributes_length) + " (0x" +
- binascii.hexlify(total_path_attributes_length_hex) + ")")
- logger.debug(" Path Attributes=" + "(0x" +
- binascii.hexlify(path_attributes_hex) + ")")
+ logger.debug(
+ " Total Path Attributes Length="
+ + str(total_path_attributes_length)
+ + " (0x"
+ + binascii.hexlify(total_path_attributes_length_hex)
+ + ")"
+ )
+ logger.debug(
+ " Path Attributes="
+ + "(0x"
+ + binascii.hexlify(path_attributes_hex)
+ + ")"
+ )
logger.debug(" Origin=IGP")
logger.debug(" AS path=" + str(my_autonomous_system))
logger.debug(" Next hop=" + str(next_hop))
logger.debug(" Cluster list=" + str(cluster_list_item))
if self.bgpls:
logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
- logger.debug(" Network Layer Reachability Information=" +
- str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
- " (0x" + binascii.hexlify(nlri_hex) + ")")
- logger.debug("UPDATE message encoded: 0x" +
- binascii.b2a_hex(message_hex))
+ logger.debug(
+ " Network Layer Reachability Information="
+ + str(nlri_prefixes)
+ + "/"
+ + str(nlri_prefix_length)
+ + " (0x"
+ + binascii.hexlify(nlri_hex)
+ + ")"
+ )
+ logger.debug("UPDATE message encoded: 0x" + binascii.b2a_hex(message_hex))
# updating counter
self.updates_sent += 1
error_subcode_hex = struct.pack("B", error_subcode)
# Length (big-endian)
- length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
- len(error_subcode_hex) + len(data_hex))
+ length = (
+ len(marker_hex)
+ + 2
+ + len(type_hex)
+ + len(error_code_hex)
+ + len(error_subcode_hex)
+ + len(data_hex)
+ )
length_hex = struct.pack(">H", length)
# NOTIFICATION Message
message_hex = (
- marker_hex +
- length_hex +
- type_hex +
- error_code_hex +
- error_subcode_hex +
- data_hex
+ marker_hex
+ + length_hex
+ + type_hex
+ + error_code_hex
+ + error_subcode_hex
+ + data_hex
)
if self.log_debug:
logger.debug("NOTIFICATION message encoding")
logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
- logger.debug(" Length=" + str(length) + " (0x" +
- binascii.hexlify(length_hex) + ")")
- logger.debug(" Type=" + str(type) + " (0x" +
- binascii.hexlify(type_hex) + ")")
- logger.debug(" Error Code=" + str(error_code) + " (0x" +
- binascii.hexlify(error_code_hex) + ")")
- logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
- binascii.hexlify(error_subcode_hex) + ")")
+ logger.debug(
+ " Length=" + str(length) + " (0x" + binascii.hexlify(length_hex) + ")"
+ )
+ logger.debug(
+ " Type=" + str(type) + " (0x" + binascii.hexlify(type_hex) + ")"
+ )
+ logger.debug(
+ " Error Code="
+ + str(error_code)
+ + " (0x"
+ + binascii.hexlify(error_code_hex)
+ + ")"
+ )
+ logger.debug(
+ " Error Subode="
+ + str(error_subcode)
+ + " (0x"
+ + binascii.hexlify(error_subcode_hex)
+ + ")"
+ )
logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
- logger.debug("NOTIFICATION message encoded: 0x%s",
- binascii.b2a_hex(message_hex))
+ logger.debug(
+ "NOTIFICATION message encoded: 0x%s", binascii.b2a_hex(message_hex)
+ )
return message_hex
length_hex = struct.pack(">H", length)
# KEEP ALIVE Message
- message_hex = (
- marker_hex +
- length_hex +
- type_hex
- )
+ message_hex = marker_hex + length_hex + type_hex
if self.log_debug:
logger.debug("KEEP ALIVE message encoding")
logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
- logger.debug(" Length=" + str(length) + " (0x" +
- binascii.hexlify(length_hex) + ")")
- logger.debug(" Type=" + str(type) + " (0x" +
- binascii.hexlify(type_hex) + ")")
- logger.debug("KEEP ALIVE message encoded: 0x%s",
- binascii.b2a_hex(message_hex))
+ logger.debug(
+ " Length=" + str(length) + " (0x" + binascii.hexlify(length_hex) + ")"
+ )
+ logger.debug(
+ " Type=" + str(type) + " (0x" + binascii.hexlify(type_hex) + ")"
+ )
+ logger.debug(
+ "KEEP ALIVE message encoded: 0x%s", binascii.b2a_hex(message_hex)
+ )
return message_hex
for idle waiting.
"""
- def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
- l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
- ipv6=False, grace=8, wait_for_read=10):
+ def __init__(
+ self,
+ bgp_socket,
+ timer,
+ storage,
+ evpn=False,
+ mvpn=False,
+ l3vpn_mcast=False,
+ allf=False,
+ l3vpn=False,
+ rt_constrain=False,
+ ipv6=False,
+ grace=8,
+ wait_for_read=10,
+ ):
"""The reader initialisation.
Arguments:
# The logical block was a BGP header.
# Now we know the size of the message.
self.reading_header = False
- self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
- self.header_length)
+ self.bytes_to_read = (
+ get_short_int_from_message(self.msg_in) - self.header_length
+ )
else: # We have finished reading the body of the message.
# Peer has just proven it is still alive.
self.timer.reset_peer_hold_time()
# Prepare state for reading another message.
message_type_hex = self.msg_in[self.header_length]
if message_type_hex == "\x01":
- logger.info("OPEN message received: 0x%s",
- binascii.b2a_hex(self.msg_in))
+ logger.info(
+ "OPEN message received: 0x%s", binascii.b2a_hex(self.msg_in)
+ )
elif message_type_hex == "\x02":
- logger.debug("UPDATE message received: 0x%s",
- binascii.b2a_hex(self.msg_in))
+ logger.debug(
+ "UPDATE message received: 0x%s", binascii.b2a_hex(self.msg_in)
+ )
self.decode_update_message(self.msg_in)
elif message_type_hex == "\x03":
- logger.info("NOTIFICATION message received: 0x%s",
- binascii.b2a_hex(self.msg_in))
+ logger.info(
+ "NOTIFICATION message received: 0x%s",
+ binascii.b2a_hex(self.msg_in),
+ )
elif message_type_hex == "\x04":
- logger.info("KEEP ALIVE message received: 0x%s",
- binascii.b2a_hex(self.msg_in))
+ logger.info(
+ "KEEP ALIVE message received: 0x%s",
+ binascii.b2a_hex(self.msg_in),
+ )
else:
- logger.warning("Unexpected message received: 0x%s",
- binascii.b2a_hex(self.msg_in))
+ logger.warning(
+ "Unexpected message received: 0x%s",
+ binascii.b2a_hex(self.msg_in),
+ )
self.msg_in = ""
self.reading_header = True
self.bytes_to_read = self.header_length
while len(hex_to_decode):
attr_flags_hex = hex_to_decode[0]
attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
-# attr_optional_bit = attr_flags & 128
-# attr_transitive_bit = attr_flags & 64
-# attr_partial_bit = attr_flags & 32
+ # attr_optional_bit = attr_flags & 128
+ # attr_transitive_bit = attr_flags & 64
+ # attr_partial_bit = attr_flags & 32
attr_extended_length_bit = attr_flags & 16
attr_type_code_hex = hex_to_decode[1]
if attr_extended_length_bit:
attr_length_hex = hex_to_decode[2:4]
attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
- attr_value_hex = hex_to_decode[4:4 + attr_length]
- hex_to_decode = hex_to_decode[4 + attr_length:]
+ attr_value_hex = hex_to_decode[4 : 4 + attr_length]
+ hex_to_decode = hex_to_decode[4 + attr_length :]
else:
attr_length_hex = hex_to_decode[2]
attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
- attr_value_hex = hex_to_decode[3:3 + attr_length]
- hex_to_decode = hex_to_decode[3 + attr_length:]
+ attr_value_hex = hex_to_decode[3 : 3 + attr_length]
+ hex_to_decode = hex_to_decode[3 + attr_length :]
if attr_type_code == 1:
- logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
- binascii.b2a_hex(attr_flags_hex))
+ logger.debug(
+ "Attribute type=1 (ORIGIN, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex),
+ )
logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 2:
- logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
- binascii.b2a_hex(attr_flags_hex))
+ logger.debug(
+ "Attribute type=2 (AS_PATH, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex),
+ )
logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 3:
- logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
- binascii.b2a_hex(attr_flags_hex))
+ logger.debug(
+ "Attribute type=3 (NEXT_HOP, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex),
+ )
logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 4:
- logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
- binascii.b2a_hex(attr_flags_hex))
+ logger.debug(
+ "Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex),
+ )
logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 5:
- logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
- binascii.b2a_hex(attr_flags_hex))
+ logger.debug(
+ "Attribute type=5 (LOCAL_PREF, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex),
+ )
logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 6:
- logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
- binascii.b2a_hex(attr_flags_hex))
+ logger.debug(
+ "Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex),
+ )
logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 7:
- logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
- binascii.b2a_hex(attr_flags_hex))
+ logger.debug(
+ "Attribute type=7 (AGGREGATOR, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex),
+ )
logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 9: # rfc4456#section-8
- logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
- binascii.b2a_hex(attr_flags_hex))
+ logger.debug(
+ "Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex),
+ )
logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 10: # rfc4456#section-8
- logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
- binascii.b2a_hex(attr_flags_hex))
+ logger.debug(
+ "Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex),
+ )
logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 14: # rfc4760#section-3
- logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
- binascii.b2a_hex(attr_flags_hex))
+ logger.debug(
+ "Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex),
+ )
logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
address_family_identifier_hex = attr_value_hex[0:2]
- logger.debug(" Address Family Identifier=0x%s",
- binascii.b2a_hex(address_family_identifier_hex))
+ logger.debug(
+ " Address Family Identifier=0x%s",
+ binascii.b2a_hex(address_family_identifier_hex),
+ )
subsequent_address_family_identifier_hex = attr_value_hex[2]
- logger.debug(" Subsequent Address Family Identifier=0x%s",
- binascii.b2a_hex(subsequent_address_family_identifier_hex))
+ logger.debug(
+ " Subsequent Address Family Identifier=0x%s",
+ binascii.b2a_hex(subsequent_address_family_identifier_hex),
+ )
next_hop_netaddr_len_hex = attr_value_hex[3]
- next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
- logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
- next_hop_netaddr_len,
- binascii.b2a_hex(next_hop_netaddr_len_hex))
- next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
- next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
- logger.debug(" Network Address of Next Hop=%s (0x%s)",
- next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
+ next_hop_netaddr_len = int(
+ binascii.b2a_hex(next_hop_netaddr_len_hex), 16
+ )
+ logger.debug(
+ " Length of Next Hop Network Address=%s (0x%s)",
+ next_hop_netaddr_len,
+ binascii.b2a_hex(next_hop_netaddr_len_hex),
+ )
+ next_hop_netaddr_hex = attr_value_hex[4 : 4 + next_hop_netaddr_len]
+ next_hop_netaddr = ".".join(
+ str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex)
+ )
+ logger.debug(
+ " Network Address of Next Hop=%s (0x%s)",
+ next_hop_netaddr,
+ binascii.b2a_hex(next_hop_netaddr_hex),
+ )
reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
- logger.debug(" Reserved=0x%s",
- binascii.b2a_hex(reserved_hex))
- nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
- logger.debug(" Network Layer Reachability Information=0x%s",
- binascii.b2a_hex(nlri_hex))
+ logger.debug(" Reserved=0x%s", binascii.b2a_hex(reserved_hex))
+ nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1 :]
+ logger.debug(
+ " Network Layer Reachability Information=0x%s",
+ binascii.b2a_hex(nlri_hex),
+ )
nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
for prefix in nlri_prefix_list:
logger.debug(" nlri_prefix_received: %s", prefix)
self.prefixes_introduced += len(nlri_prefix_list) # update counter
elif attr_type_code == 15: # rfc4760#section-4
- logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
- binascii.b2a_hex(attr_flags_hex))
+ logger.debug(
+ "Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex),
+ )
logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
address_family_identifier_hex = attr_value_hex[0:2]
- logger.debug(" Address Family Identifier=0x%s",
- binascii.b2a_hex(address_family_identifier_hex))
+ logger.debug(
+ " Address Family Identifier=0x%s",
+ binascii.b2a_hex(address_family_identifier_hex),
+ )
subsequent_address_family_identifier_hex = attr_value_hex[2]
- logger.debug(" Subsequent Address Family Identifier=0x%s",
- binascii.b2a_hex(subsequent_address_family_identifier_hex))
+ logger.debug(
+ " Subsequent Address Family Identifier=0x%s",
+ binascii.b2a_hex(subsequent_address_family_identifier_hex),
+ )
wd_hex = attr_value_hex[3:]
- logger.debug(" Withdrawn Routes=0x%s",
- binascii.b2a_hex(wd_hex))
+ logger.debug(" Withdrawn Routes=0x%s", binascii.b2a_hex(wd_hex))
wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
- logger.debug(" Withdrawn routes prefix list: %s",
- wdr_prefix_list)
+ logger.debug(" Withdrawn routes prefix list: %s", wdr_prefix_list)
for prefix in wdr_prefix_list:
logger.debug(" withdrawn_prefix_received: %s", prefix)
self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
else:
- logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
- binascii.b2a_hex(attr_flags_hex))
- logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
+ logger.debug(
+ "Unknown attribute type=%s, flags:0x%s)",
+ attr_type_code,
+ binascii.b2a_hex(attr_flags_hex),
+ )
+ logger.debug(
+ "Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex)
+ )
return None
def decode_update_message(self, msg):
logger.debug("Decoding update message:")
# message header - marker
marker_hex = msg[:16]
- logger.debug("Message header marker: 0x%s",
- binascii.b2a_hex(marker_hex))
+ logger.debug("Message header marker: 0x%s", binascii.b2a_hex(marker_hex))
# message header - message length
msg_length_hex = msg[16:18]
msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
- logger.debug("Message lenght: 0x%s (%s)",
- binascii.b2a_hex(msg_length_hex), msg_length)
+ logger.debug(
+ "Message lenght: 0x%s (%s)", binascii.b2a_hex(msg_length_hex), msg_length
+ )
# message header - message type
msg_type_hex = msg[18:19]
msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
with self.storage as stor:
# this will replace the previously stored message
- stor['update'] = binascii.hexlify(msg)
+ stor["update"] = binascii.hexlify(msg)
logger.debug("Evpn {}".format(self.evpn))
if self.evpn:
logger.debug("Graceful-restart {}".format(self.grace))
if self.grace != 8:
- logger.debug("Skipping update decoding due to graceful-restart data expected")
+ logger.debug(
+ "Skipping update decoding due to graceful-restart data expected"
+ )
return
logger.debug("Mvpn {}".format(self.mvpn))
logger.debug("Route-Target-Constrain {}".format(self.rt_constrain))
if self.rt_constrain:
- logger.debug("Skipping update decoding due to Route-Target-Constrain data expected")
+ logger.debug(
+ "Skipping update decoding due to Route-Target-Constrain data expected"
+ )
return
logger.debug("Ipv6-Unicast {}".format(self.ipv6))
return
if msg_type == 2:
- logger.debug("Message type: 0x%s (update)",
- binascii.b2a_hex(msg_type_hex))
+ logger.debug("Message type: 0x%s (update)", binascii.b2a_hex(msg_type_hex))
# withdrawn routes length
wdr_length_hex = msg[19:21]
wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
- logger.debug("Withdrawn routes lenght: 0x%s (%s)",
- binascii.b2a_hex(wdr_length_hex), wdr_length)
+ logger.debug(
+ "Withdrawn routes lenght: 0x%s (%s)",
+ binascii.b2a_hex(wdr_length_hex),
+ wdr_length,
+ )
# withdrawn routes
- wdr_hex = msg[21:21 + wdr_length]
- logger.debug("Withdrawn routes: 0x%s",
- binascii.b2a_hex(wdr_hex))
+ wdr_hex = msg[21 : 21 + wdr_length]
+ logger.debug("Withdrawn routes: 0x%s", binascii.b2a_hex(wdr_hex))
wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
- logger.debug("Withdrawn routes prefix list: %s",
- wdr_prefix_list)
+ logger.debug("Withdrawn routes prefix list: %s", wdr_prefix_list)
for prefix in wdr_prefix_list:
logger.debug("withdrawn_prefix_received: %s", prefix)
# total path attribute length
total_pa_length_offset = 21 + wdr_length
- total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
+ total_pa_length_hex = msg[
+ total_pa_length_offset : total_pa_length_offset + 2
+ ]
total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
- logger.debug("Total path attribute lenght: 0x%s (%s)",
- binascii.b2a_hex(total_pa_length_hex), total_pa_length)
+ logger.debug(
+ "Total path attribute lenght: 0x%s (%s)",
+ binascii.b2a_hex(total_pa_length_hex),
+ total_pa_length,
+ )
# path attributes
pa_offset = total_pa_length_offset + 2
- pa_hex = msg[pa_offset:pa_offset + total_pa_length]
+ pa_hex = msg[pa_offset : pa_offset + total_pa_length]
logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
self.decode_path_attributes(pa_hex)
# network layer reachability information length
logger.debug("Calculated NLRI length: %s", nlri_length)
# network layer reachability information
nlri_offset = pa_offset + total_pa_length
- nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
+ nlri_hex = msg[nlri_offset : nlri_offset + nlri_length]
logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
logger.debug("NLRI prefix list: %s", nlri_prefix_list)
self.prefixes_introduced += len(nlri_prefix_list)
self.prefixes_withdrawn += len(wdr_prefix_list)
else:
- logger.error("Unexpeced message type 0x%s in 0x%s",
- binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
+ logger.error(
+ "Unexpeced message type 0x%s in 0x%s",
+ binascii.b2a_hex(msg_type_hex),
+ binascii.b2a_hex(msg),
+ )
def wait_for_read(self):
"""Read message until timeout (next expected event).
if not self.rx_activity_detected or not (self.updates_received % 100):
# right time to write statistics to the log (not for every update and
# not too frequently to avoid having large log files)
- logger.info("total_received_update_message_counter: %s",
- self.updates_received)
- logger.info("total_received_nlri_prefix_counter: %s",
- self.prefixes_introduced)
- logger.info("total_received_withdrawn_prefix_counter: %s",
- self.prefixes_withdrawn)
+ logger.info(
+ "total_received_update_message_counter: %s", self.updates_received
+ )
+ logger.info(
+ "total_received_nlri_prefix_counter: %s", self.prefixes_introduced
+ )
+ logger.info(
+ "total_received_withdrawn_prefix_counter: %s", self.prefixes_withdrawn
+ )
start_time = time.time()
select.select([self.socket], [], [self.socket], wait_timedelta)
self.generator = generator
self.timer = timer
# Sub-trackers.
- self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
- l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
- rt_constrain=cliargs.rt_constrain, ipv6=cliargs.ipv6, grace=cliargs.grace,
- wait_for_read=cliargs.wfr)
+ self.reader = ReadTracker(
+ bgp_socket,
+ timer,
+ storage,
+ evpn=cliargs.evpn,
+ mvpn=cliargs.mvpn,
+ l3vpn_mcast=cliargs.l3vpn_mcast,
+ l3vpn=cliargs.l3vpn,
+ allf=cliargs.allf,
+ rt_constrain=cliargs.rt_constrain,
+ ipv6=cliargs.ipv6,
+ grace=cliargs.grace,
+ wait_for_read=cliargs.wfr,
+ )
self.writer = WriteTracker(bgp_socket, generator, timer)
# Prioritization state.
self.prioritize_writing = False
if self.timer.is_time_for_my_keepalive():
if not self.writer.sending_message:
# We need to schedule a keepalive ASAP.
- self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
+ self.writer.enqueue_message_for_sending(
+ self.generator.keepalive_message()
+ )
logger.info("KEEP ALIVE is sent.")
# We are sending a message now, so let's prioritize it.
self.prioritize_writing = True
# which actions are available.
# socket.socket() returns three lists,
# we store them to list of lists.
- list_list = select.select([self.socket], [self.socket], [self.socket],
- self.timer.report_timedelta)
+ list_list = select.select(
+ [self.socket], [self.socket], [self.socket], self.timer.report_timedelta
+ )
read_list, write_list, except_list = list_list
# Lists are unpacked, each is either [] or [self.socket],
# so we will test them as boolean.
logger.info("Storing performance results.")
self.generator.store_results()
logger.info("Finally an END-OF-RIB is sent.")
- msg_out += self.generator.update_message(wr_prefixes=[],
- nlri_prefixes=[],
- end_of_rib=True)
+ msg_out += self.generator.update_message(
+ wr_prefixes=[], nlri_prefixes=[], end_of_rib=True
+ )
self.writer.enqueue_message_for_sending(msg_out)
# Attempt for real sending to be done in next iteration.
return
self.reader.wait_for_read()
return
# We can neither read nor write.
- logger.warning("Input and output both blocked for " +
- str(self.timer.report_timedelta) + " seconds.")
+ logger.warning(
+ "Input and output both blocked for "
+ + str(self.timer.report_timedelta)
+ + " seconds."
+ )
# FIXME: Are we sure select has been really waiting
# the whole period?
return
:return: logger object
"""
logger = logging.getLogger("logger")
- log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
+ log_formatter = logging.Formatter(
+ "%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s"
+ )
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler(logfile, mode="w")
console_handler.setFormatter(log_formatter)
# to work with "you first" peers.
msg_in = read_open_message(bgp_socket)
logger.info(binascii.hexlify(msg_in))
- storage['open'] = binascii.hexlify(msg_in)
+ storage["open"] = binascii.hexlify(msg_in)
timer = TimeTracker(msg_in)
generator = MessageGenerator(arguments)
msg_out = generator.open_message()
class Rpcs:
- '''Handler for SimpleXMLRPCServer'''
+ """Handler for SimpleXMLRPCServer"""
def __init__(self, sendqueue, storage):
- '''Init method
+ """Init method
Arguments:
:sendqueue: queue for data to be sent towards odl
:storage: thread safe dict
- '''
+ """
self.queue = sendqueue
self.storage = storage
def send(self, text):
- '''Data to be sent
+ """Data to be sent
Arguments:
:text: hes string of the data to be sent
- '''
+ """
self.queue.put(text)
- def get(self, text=''):
- '''Reads data form the storage
+ def get(self, text=""):
+ """Reads data form the storage
- returns stored data or an empty string, at the moment only
'update' is stored
:text: a key to the storage to get the data
Returns:
:data: stored data
- '''
+ """
with self.storage as stor:
- return stor.get(text, '')
+ return stor.get(text, "")
- def clean(self, text=''):
- '''Cleans data form the storage
+ def clean(self, text=""):
+ """Cleans data form the storage
Arguments:
:text: a key to the storage to clean the data
- '''
+ """
with self.storage as stor:
if text in stor:
del stor[text]