# terms of the Eclipse Public License v1.0 which accompanies this distribution,
# and is available at http://www.eclipse.org/legal/epl-v10.html
-__author__ = "Vratko Polak"
-__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
-__license__ = "Eclipse Public License v1.0"
-__email__ = "vrpolak@cisco.com"
-
+from copy import deepcopy
+from SimpleXMLRPCServer import SimpleXMLRPCServer
import argparse
import binascii
import ipaddr
+import logging
+import Queue
import select
import socket
-import time
-import logging
import struct
+import thread
+import threading
+import time
+
+
+__author__ = "Vratko Polak"
+__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
+__license__ = "Eclipse Public License v1.0"
+__email__ = "vrpolak@cisco.com"
+
+
+class SafeDict(dict):
+ '''Thread safe dictionary
+
+ The object will serve as thread safe data storage.
+ It should be used with "with" statement.
+ '''
+
+ def __init__(self, * p_arg, ** n_arg):
+ super(SafeDict, self).__init__()
+ self._lock = threading.Lock()
+
+ def __enter__(self):
+ self._lock.acquire()
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self._lock.release()
def parse_arguments():
# we should mirror AS number from peer's open message.
str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
parser.add_argument("--amount", default="1", type=int, help=str_help)
+ str_help = "Rpc server port."
+ parser.add_argument("--port", default="8000", type=int, help=str_help)
str_help = "Maximum number of IP prefixes to be announced in one iteration"
parser.add_argument("--insert", default="1", type=int, help=str_help)
str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
str_help = "The IP of the next hop to be placed into the update messages."
parser.add_argument("--nexthop", default="192.0.2.1",
type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
+ str_help = "Identifier of the route originator."
+ parser.add_argument("--originator", default=None,
+ type=ipaddr.IPv4Address, dest="originator", help=str_help)
+ str_help = "Cluster list item identifier."
+ parser.add_argument("--cluster", default=None,
+ type=ipaddr.IPv4Address, dest="cluster", help=str_help)
str_help = ("Numeric IP Address to try to connect to." +
"Currently no effect in listening mode.")
parser.add_argument("--peerip", default="127.0.0.2",
str_help = "Minimum number of updates to reach to include result into csv."
parser.add_argument("--threshold", default="1000", type=int, help=str_help)
str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
- parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
+ parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
+ str_help = "Using peerip instead of myip for xmlrpc server"
+ parser.add_argument("--usepeerip", default=False, action="store_true", help=str_help)
+ str_help = "Link-State NLRI supported"
+ parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
+ str_help = "Link-State NLRI: Identifier"
+ parser.add_argument("-lsid", default="1", type=int, help=str_help)
+ str_help = "Link-State NLRI: Tunnel ID"
+ parser.add_argument("-lstid", default="1", type=int, help=str_help)
+ str_help = "Link-State NLRI: LSP ID"
+ parser.add_argument("-lspid", default="1", type=int, help=str_help)
+ str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
+ parser.add_argument("--lstsaddr", default="1.2.3.4",
+ type=ipaddr.IPv4Address, help=str_help)
+ str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
+ parser.add_argument("--lsteaddr", default="5.6.7.8",
+ type=ipaddr.IPv4Address, help=str_help)
+ str_help = "Link-State NLRI: Identifier Step"
+ parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
+ str_help = "Link-State NLRI: Tunnel ID Step"
+ parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
+ str_help = "Link-State NLRI: LSP ID Step"
+ parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
+ str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
+ parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
+ str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
+ parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
+ str_help = "How many play utilities are to be started."
+ parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
+ str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
+ Enabling this flag makes the script not decoding the update mesage, because of not\
+ supported decoding for these elements."
+ parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
+ str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
+ Enabling this flag makes the script not decoding the update mesage, because of not\
+ supported decoding for these elements."
+ parser.add_argument("--grace", default="8", type=int, help=str_help)
+ str_help = "Open message includes Graceful-restart capability, containing AFI/SAFIS:\
+ IPV4-Unicast, IPV6-Unicast, BGP-LS\
+ Enabling this flag makes the script not decoding the update mesage, because of not\
+ supported decoding for these elements."
+ parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
+ str_help = "Open message includes L3VPN-MULTICAST arguments.\
+ Enabling this flag makes the script not decoding the update mesage, because of not\
+ supported decoding for these elements."
+ parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
+ str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding."
+ parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
+ str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
+ parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help)
+ str_help = "Open message includes ipv6-unicast family, without message decoding."
+ parser.add_argument("--ipv6", default=False, action="store_true", help=str_help)
+ str_help = "Add all supported families without message decoding."
+ parser.add_argument("--allf", default=False, action="store_true", help=str_help)
+ parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
+ str_help = "Skipping well known attributes for update message"
+ parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
arguments = parser.parse_args()
+ if arguments.multiplicity < 1:
+ print "Multiplicity", arguments.multiplicity, "is not positive."
+ raise SystemExit(1)
# TODO: Are sanity checks (such as asnumber>=0) required?
return arguments
"""Establish connection to BGP peer.
Arguments:
- :arguments: following command-line argumets are used
+ :arguments: following command-line arguments are used
- arguments.myip: local IP address
- arguments.myport: local port
- arguments.peerip: remote IP address
# Some validation.
if len(msg_in) < 37:
# 37 is minimal length of open message with 4-byte AS number.
- logger.error("Got something else than open with 4-byte AS number: " +
- binascii.hexlify(msg_in))
- raise MessageError("Got something else than open with 4-byte AS number", msg_in)
+ error_msg = (
+ "Message length (" + str(len(msg_in)) + ") is smaller than "
+ "minimal length of OPEN message with 4-byte AS number (37)"
+ )
+ logger.error(error_msg + ": " + binascii.hexlify(msg_in))
+ raise MessageError(error_msg, msg_in)
# TODO: We could check BGP marker, but it is defined only later;
# decide what to do.
reported_length = get_short_int_from_message(msg_in)
if len(msg_in) != reported_length:
- logger.error("Message length is not " + str(reported_length) +
- " as stated in " + binascii.hexlify(msg_in))
- raise MessageError("Message length is not " + reported_length +
- " as stated in ", msg_in)
+ error_msg = (
+ "Expected message length (" + reported_length +
+ ") does not match actual length (" + str(len(msg_in)) + ")"
+ )
+ logger.error(error_msg + binascii.hexlify(msg_in))
+ raise MessageError(error_msg, msg_in)
logger.info("Open message received.")
return msg_in
options for MesageGenerator initialisation
Notes:
Calculates and stores default values used later on for
- message geeration.
+ message generation.
"""
self.total_prefix_amount = args.amount
# Number of update messages left to be sent.
self.remaining_prefixes = self.total_prefix_amount
# New parameters initialisation
+ self.port = args.port
self.iteration = 0
self.prefix_base_default = args.firstprefix
self.prefix_length_default = args.prefixlen
self.hold_time_default = args.holdtime # Local hold time.
self.bgp_identifier_default = int(args.myip)
self.next_hop_default = args.nexthop
+ self.originator_id_default = args.originator
+ self.cluster_list_item_default = args.cluster
self.single_update_default = args.updates == "single"
self.randomize_updates_default = args.updates == "random"
self.prefix_count_to_add_default = args.insert
self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
self.results_file_name_default = args.results
self.performance_threshold_default = args.threshold
- self.rfc4760 = args.rfc4760 == "yes"
+ self.rfc4760 = args.rfc4760
+ self.bgpls = args.bgpls
+ self.evpn = args.evpn
+ self.mvpn = args.mvpn
+ self.l3vpn_mcast = args.l3vpn_mcast
+ self.l3vpn = args.l3vpn
+ self.rt_constrain = args.rt_constrain
+ self.ipv6 = args.ipv6
+ self.allf = args.allf
+ self.skipattr = args.skipattr
+ self.grace = args.grace
+ # Default values when BGP-LS Attributes are used
+ if self.bgpls:
+ self.prefix_count_to_add_default = 1
+ self.prefix_count_to_del_default = 0
+ self.ls_nlri_default = {"Identifier": args.lsid,
+ "TunnelID": args.lstid,
+ "LSPID": args.lspid,
+ "IPv4TunnelSenderAddress": args.lstsaddr,
+ "IPv4TunnelEndPointAddress": args.lsteaddr}
+ self.lsid_step = args.lsidstep
+ self.lstid_step = args.lstidstep
+ self.lspid_step = args.lspidstep
+ self.lstsaddr_step = args.lstsaddrstep
+ self.lsteaddr_step = args.lsteaddrstep
# Default values used for randomized part
s1_slots = ((self.total_prefix_amount -
self.remaining_prefixes_threshold - 1) /
self.prefix_count_to_add_default + 1)
s2_slots = ((self.remaining_prefixes_threshold - 1) /
(self.prefix_count_to_add_default -
- self.prefix_count_to_del_default) + 1)
+ self.prefix_count_to_del_default) + 1)
# S1_First_Index = 0
# S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
s2_first_index = s1_slots * self.prefix_count_to_add_default
self.prefix_count_to_add_default + 1)
self.randomize_lowest_default = s2_first_index
self.randomize_highest_default = s2_last_index
-
# Initialising counters
self.phase1_start_time = 0
self.phase1_stop_time = 0
logger.info(" My Hold Time: " + str(self.hold_time_default))
logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
logger.info(" Next Hop: " + str(self.next_hop_default))
+ logger.info(" Originator ID: " + str(self.originator_id_default))
+ logger.info(" Cluster list: " + str(self.cluster_list_item_default))
logger.info(" Prefix count to be inserted at once: " +
str(self.prefix_count_to_add_default))
logger.info(" Prefix count to be withdrawn at once: " +
:return: n/a
"""
# default values handling
+ # TODO optimize default values handling (use e.g. dicionary.update() approach)
if file_name is None:
file_name = self.results_file_name_default
if threshold is None:
Created just as a fame for future generator enhancement.
"""
# default values handling
+ # TODO optimize default values handling (use e.g. dicionary.update() approach)
if lowest is None:
lowest = self.randomize_lowest_default
if highest is None:
new_index = index
return new_index
- # Get list of prefixes
+ def get_ls_nlri_values(self, index):
+ """Generates LS-NLRI parameters.
+ http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
+
+ Arguments:
+ :param index: index (iteration)
+ Returns:
+ :return: dictionary of LS NLRI parameters and values
+ """
+ # generating list of LS NLRI parameters
+ identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
+ ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
+ tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
+ lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
+ ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
+ ls_nlri_values = {"Identifier": identifier,
+ "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
+ "TunnelID": tunnel_id, "LSPID": lsp_id,
+ "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
+ return ls_nlri_values
+
def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
prefix_len=None, prefix_count=None, randomize=None):
"""Generates list of IP address prefixes.
:return: list of generated IP address prefixes
"""
# default values handling
+ # TODO optimize default values handling (use e.g. dicionary.update() approach)
if slot_size is None:
slot_size = self.slot_size_default
if prefix_base is None:
Updates global counters.
"""
# default values handling
+ # TODO optimize default values handling (use e.g. dicionary.update() approach)
if prefix_count_to_add is None:
prefix_count_to_add = self.prefix_count_to_add_default
if prefix_count_to_del is None:
logger.debug("Prefixes to be withdrawn in this iteration:")
prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
prefix_count=prefix_count_to_del)
- # generating the mesage
- if self.single_update_default:
- # Send prefixes to be introduced and withdrawn
- # in one UPDATE message
- msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
- nlri_prefixes=prefix_list_to_add)
+ # generating the UPDATE mesage with LS-NLRI only
+ if self.bgpls:
+ ls_nlri = self.get_ls_nlri_values(self.iteration)
+ msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
+ **ls_nlri)
else:
- # Send prefixes to be introduced and withdrawn
- # in separate UPDATE messages (if needed)
- msg_out = self.update_message(wr_prefixes=[],
- nlri_prefixes=prefix_list_to_add)
- if prefix_count_to_del:
- msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
- nlri_prefixes=[])
+ # generating the UPDATE message with prefix lists
+ if self.single_update_default:
+ # Send prefixes to be introduced and withdrawn
+ # in one UPDATE message
+ msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
+ nlri_prefixes=prefix_list_to_add)
+ else:
+ # Send prefixes to be introduced and withdrawn
+ # in separate UPDATE messages (if needed)
+ msg_out = self.update_message(wr_prefixes=[],
+ nlri_prefixes=prefix_list_to_add)
+ if prefix_count_to_del:
+ msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
+ nlri_prefixes=[])
# updating counters - who knows ... maybe I am last time here ;)
if straightforward_scenario:
self.phase1_stop_time = time.time()
:return: encoded OPEN message in HEX
"""
- # Default values handling
+ # default values handling
+ # TODO optimize default values handling (use e.g. dicionary.update() approach)
if version is None:
version = self.version_default
if my_autonomous_system is None:
# Optional Parameters
optional_parameters_hex = ""
- if self.rfc4760:
+ if self.rfc4760 or self.allf:
optional_parameter_hex = (
"\x02" # Param type ("Capability Ad")
"\x06" # Length (6 bytes)
)
optional_parameters_hex += optional_parameter_hex
+ if self.ipv6 or self.allf:
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Multiprotocol extetension capability,
+ "\x04" # Capability value length
+ "\x00\x02" # AFI (IPV6)
+ "\x00" # (reserved)
+ "\x01" # SAFI (UNICAST)
+ )
+ optional_parameters_hex += optional_parameter_hex
+
+ if self.bgpls or self.allf:
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Capability type (NLRI Unicast),
+ # see RFC 4760, secton 8
+ "\x04" # Capability value length
+ "\x40\x04" # AFI (BGP-LS)
+ "\x00" # (reserved)
+ "\x47" # SAFI (BGP-LS)
+ )
+ optional_parameters_hex += optional_parameter_hex
+
+ if self.evpn or self.allf:
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Multiprotocol extetension capability,
+ "\x04" # Capability value length
+ "\x00\x19" # AFI (L2-VPN)
+ "\x00" # (reserved)
+ "\x46" # SAFI (EVPN)
+ )
+ optional_parameters_hex += optional_parameter_hex
+
+ if self.mvpn or self.allf:
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Multiprotocol extetension capability,
+ "\x04" # Capability value length
+ "\x00\x01" # AFI (IPV4)
+ "\x00" # (reserved)
+ "\x05" # SAFI (MCAST-VPN)
+ )
+ optional_parameters_hex += optional_parameter_hex
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Multiprotocol extetension capability,
+ "\x04" # Capability value length
+ "\x00\x02" # AFI (IPV6)
+ "\x00" # (reserved)
+ "\x05" # SAFI (MCAST-VPN)
+ )
+ optional_parameters_hex += optional_parameter_hex
+
+ if self.l3vpn_mcast or self.allf:
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Multiprotocol extetension capability,
+ "\x04" # Capability value length
+ "\x00\x01" # AFI (IPV4)
+ "\x00" # (reserved)
+ "\x81" # SAFI (L3VPN-MCAST)
+ )
+ optional_parameters_hex += optional_parameter_hex
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Multiprotocol extetension capability,
+ "\x04" # Capability value length
+ "\x00\x02" # AFI (IPV6)
+ "\x00" # (reserved)
+ "\x81" # SAFI (L3VPN-MCAST)
+ )
+ optional_parameters_hex += optional_parameter_hex
+
+ if self.l3vpn or self.allf:
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Multiprotocol extetension capability,
+ "\x04" # Capability value length
+ "\x00\x01" # AFI (IPV4)
+ "\x00" # (reserved)
+ "\x80" # SAFI (L3VPN-UNICAST)
+ )
+ optional_parameters_hex += optional_parameter_hex
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Multiprotocol extetension capability,
+ "\x04" # Capability value length
+ "\x00\x02" # AFI (IPV6)
+ "\x00" # (reserved)
+ "\x80" # SAFI (L3VPN-UNICAST)
+ )
+ optional_parameters_hex += optional_parameter_hex
+
+ if self.rt_constrain or self.allf:
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Multiprotocol extetension capability,
+ "\x04" # Capability value length
+ "\x00\x01" # AFI (IPV4)
+ "\x00" # (reserved)
+ "\x84" # SAFI (ROUTE-TARGET-CONSTRAIN)
+ )
+ optional_parameters_hex += optional_parameter_hex
+
optional_parameter_hex = (
"\x02" # Param type ("Capability Ad")
"\x06" # Length (6 bytes)
"\x41" # "32 bit AS Numbers Support"
# (see RFC 6793, section 3)
"\x04" # Capability value length
- # My AS in 32 bit format
- + struct.pack(">I", my_autonomous_system)
+ )
+ optional_parameter_hex += (
+ struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
)
optional_parameters_hex += optional_parameter_hex
+ if self.grace != 8:
+ b = list(bin(self.grace)[2:])
+ b = b + [0] * (3 - len(b))
+ length = "\x08"
+ if b[1] == '1':
+ restart_flag = "\x80\x05"
+ else:
+ restart_flag = "\x00\x05"
+ if b[2] == '1':
+ ipv4_flag = "\x80"
+ else:
+ ipv4_flag = "\x00"
+ if b[0] == '1':
+ ll_gr = "\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
+ length = "\x11"
+ else:
+ ll_gr = ""
+ logger.debug("Grace parameters list: {}".format(b))
+ # "\x02" Param type ("Capability Ad")
+ # :param length: Length of whole message
+ # "\x40" Graceful-restart capability
+ # "\x06" Length (6 bytes)
+ # "\x00" Restart Flag (customizable - turned on when grace == 2,3,6,7)
+ # "\x05" Restart timer (5sec)
+ # "\x00\x01" AFI (IPV4)
+ # "\x01" SAFI (Unicast)
+ # "\x00" Ipv4 Flag (customizable - turned on when grace == 1,3,5,7)
+ # "\x47\x07\x00\x01\x01\x00\x00\x00\x1e" ipv4 ll-graceful-restart capability, timer 30sec
+ # ll-gr turned on when grace is between 4-7
+ optional_parameter_hex = "\x02{}\x40\x06{}\x00\x01\x01{}{}".format(
+ length, restart_flag, ipv4_flag, ll_gr)
+ optional_parameters_hex += optional_parameter_hex
+
# Optional Parameters Length
optional_parameters_length = len(optional_parameters_hex)
optional_parameters_length_hex = struct.pack("B",
def update_message(self, wr_prefixes=None, nlri_prefixes=None,
wr_prefix_length=None, nlri_prefix_length=None,
- my_autonomous_system=None, next_hop=None):
+ my_autonomous_system=None, next_hop=None,
+ originator_id=None, cluster_list_item=None,
+ end_of_rib=False, **ls_nlri_params):
"""Generates an UPDATE Message (rfc4271#section-4.3)
Arguments:
:return: encoded UPDATE message in HEX
"""
- # Default values handling
+ # default values handling
+ # TODO optimize default values handling (use e.g. dicionary.update() approach)
if wr_prefixes is None:
wr_prefixes = self.wr_prefixes_default
if nlri_prefixes is None:
my_autonomous_system = self.my_autonomous_system_default
if next_hop is None:
next_hop = self.next_hop_default
+ if originator_id is None:
+ originator_id = self.originator_id_default
+ if cluster_list_item is None:
+ cluster_list_item = self.cluster_list_item_default
+ ls_nlri = self.ls_nlri_default.copy()
+ ls_nlri.update(ls_nlri_params)
# Marker
marker_hex = "\xFF" * 16
type_hex = struct.pack("B", type)
# Withdrawn Routes
- bytes = ((wr_prefix_length - 1) / 8) + 1
withdrawn_routes_hex = ""
- for prefix in wr_prefixes:
- withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
- struct.pack(">I", int(prefix))[:bytes])
- withdrawn_routes_hex += withdrawn_route_hex
+ if not self.bgpls:
+ bytes = ((wr_prefix_length - 1) / 8) + 1
+ for prefix in wr_prefixes:
+ withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
+ struct.pack(">I", int(prefix))[:bytes])
+ withdrawn_routes_hex += withdrawn_route_hex
# Withdrawn Routes Length
withdrawn_routes_length = len(withdrawn_routes_hex)
# TODO: to replace hardcoded string by encoding?
# Path Attributes
- if nlri_prefixes != []:
- path_attributes_hex = (
+ path_attributes_hex = ""
+ if not self.skipattr:
+ path_attributes_hex += (
"\x40" # Flags ("Well-Known")
"\x01" # Type (ORIGIN)
"\x01" # Length (1)
"\x00" # Origin: IGP
+ )
+ path_attributes_hex += (
"\x40" # Flags ("Well-Known")
"\x02" # Type (AS_PATH)
"\x06" # Length (6)
"\x02" # AS segment type (AS_SEQUENCE)
"\x01" # AS segment length (1)
- # AS segment (4 bytes)
- + struct.pack(">I", my_autonomous_system) +
+ )
+ my_as_hex = struct.pack(">I", my_autonomous_system)
+ path_attributes_hex += my_as_hex # AS segment (4 bytes)
+ path_attributes_hex += (
+ "\x40" # Flags ("Well-Known")
+ "\x05" # Type (LOCAL_PREF)
+ "\x04" # Length (4)
+ "\x00\x00\x00\x64" # (100)
+ )
+ if nlri_prefixes != []:
+ path_attributes_hex += (
"\x40" # Flags ("Well-Known")
"\x03" # Type (NEXT_HOP)
"\x04" # Length (4)
- # IP address of the next hop (4 bytes)
- + struct.pack(">I", int(next_hop))
)
- else:
- path_attributes_hex = ""
+ next_hop_hex = struct.pack(">I", int(next_hop))
+ path_attributes_hex += (
+ next_hop_hex # IP address of the next hop (4 bytes)
+ )
+ if originator_id is not None:
+ path_attributes_hex += (
+ "\x80" # Flags ("Optional, non-transitive")
+ "\x09" # Type (ORIGINATOR_ID)
+ "\x04" # Length (4)
+ ) # ORIGINATOR_ID (4 bytes)
+ path_attributes_hex += struct.pack(">I", int(originator_id))
+ if cluster_list_item is not None:
+ path_attributes_hex += (
+ "\x80" # Flags ("Optional, non-transitive")
+ "\x0a" # Type (CLUSTER_LIST)
+ "\x04" # Length (4)
+ ) # one CLUSTER_LIST item (4 bytes)
+ path_attributes_hex += struct.pack(">I", int(cluster_list_item))
+
+ if self.bgpls and not end_of_rib:
+ path_attributes_hex += (
+ "\x80" # Flags ("Optional, non-transitive")
+ "\x0e" # Type (MP_REACH_NLRI)
+ "\x22" # Length (34)
+ "\x40\x04" # AFI (BGP-LS)
+ "\x47" # SAFI (BGP-LS)
+ "\x04" # Next Hop Length (4)
+ )
+ path_attributes_hex += struct.pack(">I", int(next_hop))
+ path_attributes_hex += "\x00" # Reserved
+ path_attributes_hex += (
+ "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
+ "\x00\x15" # LS-NLRI.TotalNLRILength (21)
+ "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
+ )
+ path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
+ path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
+ path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
+ path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
+ path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
# Total Path Attributes Length
total_path_attributes_length = len(path_attributes_hex)
total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
# Network Layer Reachability Information
- bytes = ((nlri_prefix_length - 1) / 8) + 1
nlri_hex = ""
- for prefix in nlri_prefixes:
- nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
- struct.pack(">I", int(prefix))[:bytes])
- nlri_hex += nlri_prefix_hex
+ if not self.bgpls:
+ bytes = ((nlri_prefix_length - 1) / 8) + 1
+ for prefix in nlri_prefixes:
+ nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
+ struct.pack(">I", int(prefix))[:bytes])
+ nlri_hex += nlri_prefix_hex
# Length (big-endian)
length = (
nlri_hex
)
+ if self.grace != 8 and self.grace != 0 and end_of_rib:
+ message_hex = (marker_hex + binascii.unhexlify("00170200000000"))
+
if self.log_debug:
logger.debug("UPDATE message encoding")
logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
str(wr_prefix_length) + " (0x" +
binascii.hexlify(withdrawn_routes_hex) + ")")
- logger.debug(" Total Path Attributes Length=" +
- str(total_path_attributes_length) + " (0x" +
- binascii.hexlify(total_path_attributes_length_hex) +
- ")")
- logger.debug(" Path Attributes=" + "(0x" +
- binascii.hexlify(path_attributes_hex) + ")")
+ if total_path_attributes_length:
+ logger.debug(" Total Path Attributes Length=" +
+ str(total_path_attributes_length) + " (0x" +
+ binascii.hexlify(total_path_attributes_length_hex) + ")")
+ logger.debug(" Path Attributes=" + "(0x" +
+ binascii.hexlify(path_attributes_hex) + ")")
+ logger.debug(" Origin=IGP")
+ logger.debug(" AS path=" + str(my_autonomous_system))
+ logger.debug(" Next hop=" + str(next_hop))
+ if originator_id is not None:
+ logger.debug(" Originator id=" + str(originator_id))
+ if cluster_list_item is not None:
+ logger.debug(" Cluster list=" + str(cluster_list_item))
+ if self.bgpls:
+ logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
logger.debug(" Network Layer Reachability Information=" +
str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
" (0x" + binascii.hexlify(nlri_hex) + ")")
for idle waiting.
"""
- def __init__(self, bgp_socket, timer):
+ def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
+ l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
+ ipv6=False, grace=8, wait_for_read=10):
"""The reader initialisation.
Arguments:
bgp_socket: socket to be used for sending
timer: timer to be used for scheduling
+ storage: thread safe dict
+ evpn: flag that evpn functionality is tested
+ mvpn: flag that mvpn functionality is tested
+ grace: flag that grace-restart functionality is tested
+ l3vpn_mcast: flag that l3vpn_mcast functionality is tested
+ l3vpn: flag that l3vpn unicast functionality is tested
+ rt_constrain: flag that rt-constrain functionality is tested
+ allf: flag for all family testing.
"""
# References to outside objects.
self.socket = bgp_socket
self.prefixes_introduced = 0
self.prefixes_withdrawn = 0
self.rx_idle_time = 0
+ self.rx_activity_detected = True
+ self.storage = storage
+ self.evpn = evpn
+ self.mvpn = mvpn
+ self.l3vpn_mcast = l3vpn_mcast
+ self.l3vpn = l3vpn
+ self.rt_constrain = rt_constrain
+ self.ipv6 = ipv6
+ self.allf = allf
+ self.wfr = wait_for_read
+ self.grace = grace
def read_message_chunk(self):
"""Read up to one message
hex_to_decode = hex_to_decode[3 + attr_length:]
if attr_type_code == 1:
- logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)",
+ logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
binascii.b2a_hex(attr_flags_hex))
- logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 2:
- logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)",
+ logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
binascii.b2a_hex(attr_flags_hex))
- logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 3:
- logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)",
+ logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
binascii.b2a_hex(attr_flags_hex))
- logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 4:
- logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)",
+ logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
binascii.b2a_hex(attr_flags_hex))
- logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 5:
- logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)",
+ logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
binascii.b2a_hex(attr_flags_hex))
- logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 6:
- logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)",
+ logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
binascii.b2a_hex(attr_flags_hex))
- logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 7:
- logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)",
+ logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
+ elif attr_type_code == 9: # rfc4456#section-8
+ logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
+ binascii.b2a_hex(attr_flags_hex))
+ logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
+ elif attr_type_code == 10: # rfc4456#section-8
+ logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
binascii.b2a_hex(attr_flags_hex))
- logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
elif attr_type_code == 14: # rfc4760#section-3
- logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)",
+ logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
binascii.b2a_hex(attr_flags_hex))
- logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
address_family_identifier_hex = attr_value_hex[0:2]
- logger.debug(" Address Family Identifier = 0x%s",
+ logger.debug(" Address Family Identifier=0x%s",
binascii.b2a_hex(address_family_identifier_hex))
subsequent_address_family_identifier_hex = attr_value_hex[2]
- logger.debug(" Subsequent Address Family Identifier = 0x%s",
+ logger.debug(" Subsequent Address Family Identifier=0x%s",
binascii.b2a_hex(subsequent_address_family_identifier_hex))
next_hop_netaddr_len_hex = attr_value_hex[3]
next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
- logger.debug(" Length of Next Hop Network Address = 0x%s (%s)",
- binascii.b2a_hex(next_hop_netaddr_len_hex),
- next_hop_netaddr_len)
+ logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
+ next_hop_netaddr_len,
+ binascii.b2a_hex(next_hop_netaddr_len_hex))
next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
- logger.debug(" Network Address of Next Hop = 0x%s",
- binascii.b2a_hex(next_hop_netaddr_hex))
+ next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
+ logger.debug(" Network Address of Next Hop=%s (0x%s)",
+ next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
- logger.debug(" Reserved = 0x%s",
+ logger.debug(" Reserved=0x%s",
binascii.b2a_hex(reserved_hex))
nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
- logger.debug(" Network Layer Reachability Information = 0x%s",
+ logger.debug(" Network Layer Reachability Information=0x%s",
binascii.b2a_hex(nlri_hex))
nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
logger.debug(" nlri_prefix_received: %s", prefix)
self.prefixes_introduced += len(nlri_prefix_list) # update counter
elif attr_type_code == 15: # rfc4760#section-4
- logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)",
+ logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
binascii.b2a_hex(attr_flags_hex))
- logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
address_family_identifier_hex = attr_value_hex[0:2]
- logger.debug(" Address Family Identifier = 0x%s",
+ logger.debug(" Address Family Identifier=0x%s",
binascii.b2a_hex(address_family_identifier_hex))
subsequent_address_family_identifier_hex = attr_value_hex[2]
- logger.debug(" Subsequent Address Family Identifier = 0x%s",
+ logger.debug(" Subsequent Address Family Identifier=0x%s",
binascii.b2a_hex(subsequent_address_family_identifier_hex))
wd_hex = attr_value_hex[3:]
- logger.debug(" Withdrawn Routes = 0x%s",
+ logger.debug(" Withdrawn Routes=0x%s",
binascii.b2a_hex(wd_hex))
wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
logger.debug(" Withdrawn routes prefix list: %s",
logger.debug(" withdrawn_prefix_received: %s", prefix)
self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
else:
- logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code,
+ logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
binascii.b2a_hex(attr_flags_hex))
- logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
+ logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
return None
def decode_update_message(self, msg):
# message header - message type
msg_type_hex = msg[18:19]
msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
+
+ with self.storage as stor:
+ # this will replace the previously stored message
+ stor['update'] = binascii.hexlify(msg)
+
+ logger.debug("Evpn {}".format(self.evpn))
+ if self.evpn:
+ logger.debug("Skipping update decoding due to evpn data expected")
+ return
+
+ logger.debug("Graceful-restart {}".format(self.grace))
+ if self.grace != 8:
+ logger.debug("Skipping update decoding due to graceful-restart data expected")
+ return
+
+ logger.debug("Mvpn {}".format(self.mvpn))
+ if self.mvpn:
+ logger.debug("Skipping update decoding due to mvpn data expected")
+ return
+
+ logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
+ if self.l3vpn_mcast:
+ logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
+ return
+
+ logger.debug("L3vpn-unicast {}".format(self.l3vpn))
+ if self.l3vpn_mcast:
+ logger.debug("Skipping update decoding due to l3vpn-unicast data expected")
+ return
+
+ logger.debug("Route-Target-Constrain {}".format(self.rt_constrain))
+ if self.rt_constrain:
+ logger.debug("Skipping update decoding due to Route-Target-Constrain data expected")
+ return
+
+ logger.debug("Ipv6-Unicast {}".format(self.ipv6))
+ if self.ipv6:
+ logger.debug("Skipping update decoding due to Ipv6 data expected")
+ return
+
+ logger.debug("Allf {}".format(self.allf))
+ if self.allf:
+ logger.debug("Skipping update decoding")
+ return
+
if msg_type == 2:
logger.debug("Message type: 0x%s (update)",
binascii.b2a_hex(msg_type_hex))
logger.debug("withdrawn_prefix_received: %s", prefix)
# total path attribute length
total_pa_length_offset = 21 + wdr_length
- total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
+ total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
logger.debug("Total path attribute lenght: 0x%s (%s)",
binascii.b2a_hex(total_pa_length_hex), total_pa_length)
# path attributes
pa_offset = total_pa_length_offset + 2
- pa_hex = msg[pa_offset:pa_offset+total_pa_length]
+ pa_hex = msg[pa_offset:pa_offset + total_pa_length]
logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
self.decode_path_attributes(pa_hex)
# network layer reachability information length
logger.debug("Calculated NLRI length: %s", nlri_length)
# network layer reachability information
nlri_offset = pa_offset + total_pa_length
- nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
+ nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
logger.debug("NLRI prefix list: %s", nlri_prefix_list)
# Compute time to the first predictable state change
event_time = self.timer.get_next_event_time()
# snapshot_time would be imprecise
- wait_timedelta = min(event_time - time.time(), 10)
+ wait_timedelta = min(event_time - time.time(), self.wfr)
if wait_timedelta < 0:
# The program got around to waiting to an event in "very near
# future" so late that it became a "past" event, thus tell
wait_timedelta = 0
# And wait for event or something to read.
- logger.info("total_received_update_message_counter: %s",
- self.updates_received)
- logger.info("total_received_nlri_prefix_counter: %s",
- self.prefixes_introduced)
- logger.info("total_received_withdrawn_prefix_counter: %s",
- self.prefixes_withdrawn)
+ if not self.rx_activity_detected or not (self.updates_received % 100):
+ # right time to write statistics to the log (not for every update and
+ # not too frequently to avoid having large log files)
+ logger.info("total_received_update_message_counter: %s",
+ self.updates_received)
+ logger.info("total_received_nlri_prefix_counter: %s",
+ self.prefixes_introduced)
+ logger.info("total_received_withdrawn_prefix_counter: %s",
+ self.prefixes_withdrawn)
start_time = time.time()
select.select([self.socket], [], [self.socket], wait_timedelta)
timedelta = time.time() - start_time
self.rx_idle_time += timedelta
+ self.rx_activity_detected = timedelta < 1
- logger.info("... idle for %.3fs", timedelta)
- logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
+ if not self.rx_activity_detected or not (self.updates_received % 100):
+ # right time to write statistics to the log (not for every update and
+ # not too frequently to avoid having large log files)
+ logger.info("... idle for %.3fs", timedelta)
+ logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
return
class StateTracker(object):
"""Main loop has state so complex it warrants this separate class."""
- def __init__(self, bgp_socket, generator, timer):
+ def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
"""The state tracker initialisation.
Arguments:
bgp_socket: socket to be used for sending / receiving
generator: generator to be used for message generation
timer: timer to be used for scheduling
+ inqueue: user initiated messages queue
+ storage: thread safe dict to store data for the rpc server
+ cliargs: cli args from the user
"""
# References to outside objects.
self.socket = bgp_socket
self.generator = generator
self.timer = timer
# Sub-trackers.
- self.reader = ReadTracker(bgp_socket, timer)
+ self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
+ l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
+ rt_constrain=cliargs.rt_constrain, ipv6=cliargs.ipv6, grace=cliargs.grace,
+ wait_for_read=cliargs.wfr)
self.writer = WriteTracker(bgp_socket, generator, timer)
# Prioritization state.
self.prioritize_writing = False
# TODO: Alternative is to switch fairly between reading and
# writing (called round robin from now on).
# Message counting is done in generator.
+ self.inqueue = inqueue
def perform_one_loop_iteration(self):
""" The main loop iteration
logger.info("KEEP ALIVE is sent.")
# We are sending a message now, so let's prioritize it.
self.prioritize_writing = True
+
+ try:
+ msg = self.inqueue.get_nowait()
+ logger.info("Received message: {}".format(msg))
+ msgbin = binascii.unhexlify(msg)
+ self.writer.enqueue_message_for_sending(msgbin)
+ except Queue.Empty:
+ pass
# Now we know what our priorities are, we have to check
# which actions are available.
# socket.socket() returns three lists,
self.generator.store_results()
logger.info("Finally an END-OF-RIB is sent.")
msg_out += self.generator.update_message(wr_prefixes=[],
- nlri_prefixes=[])
+ nlri_prefixes=[],
+ end_of_rib=True)
self.writer.enqueue_message_for_sending(msg_out)
# Attempt for real sending to be done in next iteration.
return
return
-if __name__ == "__main__":
- """ One time initialisation and iterations looping.
+def create_logger(loglevel, logfile):
+ """Create logger object
- Notes:
- Establish BGP connection and run iterations.
+ Arguments:
+ :loglevel: log level
+ :logfile: log file name
+ Returns:
+ :return: logger object
"""
- arguments = parse_arguments()
logger = logging.getLogger("logger")
- log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
+ log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
console_handler = logging.StreamHandler()
- file_handler = logging.FileHandler(arguments.logfile, mode="w")
+ file_handler = logging.FileHandler(logfile, mode="w")
console_handler.setFormatter(log_formatter)
file_handler.setFormatter(log_formatter)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
- logger.setLevel(arguments.loglevel)
+ logger.setLevel(loglevel)
+ return logger
+
+
+def job(arguments, inqueue, storage):
+ """One time initialisation and iterations looping.
+ Notes:
+ Establish BGP connection and run iterations.
+
+ Arguments:
+ :arguments: Command line arguments
+ :inqueue: Data to be sent from play.py
+ :storage: Shared dict for rpc server
+ Returns:
+ :return: None
+ """
bgp_socket = establish_connection(arguments)
# Initial handshake phase. TODO: Can it be also moved to StateTracker?
# Receive open message before sending anything.
# FIXME: Add parameter to send default open message first,
# to work with "you first" peers.
msg_in = read_open_message(bgp_socket)
+ logger.info(binascii.hexlify(msg_in))
+ storage['open'] = binascii.hexlify(msg_in)
timer = TimeTracker(msg_in)
generator = MessageGenerator(arguments)
msg_out = generator.open_message()
# Using exact keepalive length to not to see possible updates.
msg_in = bgp_socket.recv(19)
if msg_in != generator.keepalive_message():
- logger.error("Open not confirmed by keepalive, instead got " +
- binascii.hexlify(msg_in))
- raise MessageError("Open not confirmed by keepalive, instead got",
- msg_in)
+ error_msg = "Open not confirmed by keepalive, instead got"
+ logger.error(error_msg + ": " + binascii.hexlify(msg_in))
+ raise MessageError(error_msg, msg_in)
timer.reset_peer_hold_time()
# Send the keepalive to indicate the connection is accepted.
timer.snapshot() # Remember this time.
# Use the remembered time.
timer.reset_my_keepalive_time(timer.snapshot_time)
# End of initial handshake phase.
- state = StateTracker(bgp_socket, generator, timer)
+ state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
while True: # main reactor loop
state.perform_one_loop_iteration()
+
+
+class Rpcs:
+ '''Handler for SimpleXMLRPCServer'''
+
+ def __init__(self, sendqueue, storage):
+ '''Init method
+
+ Arguments:
+ :sendqueue: queue for data to be sent towards odl
+ :storage: thread safe dict
+ '''
+ self.queue = sendqueue
+ self.storage = storage
+
+ def send(self, text):
+ '''Data to be sent
+
+ Arguments:
+ :text: hes string of the data to be sent
+ '''
+ self.queue.put(text)
+
+ def get(self, text=''):
+ '''Reads data form the storage
+
+ - returns stored data or an empty string, at the moment only
+ 'update' is stored
+
+ Arguments:
+ :text: a key to the storage to get the data
+ Returns:
+ :data: stored data
+ '''
+ with self.storage as stor:
+ return stor.get(text, '')
+
+ def clean(self, text=''):
+ '''Cleans data form the storage
+
+ Arguments:
+ :text: a key to the storage to clean the data
+ '''
+ with self.storage as stor:
+ if text in stor:
+ del stor[text]
+
+
+def threaded_job(arguments):
+ """Run the job threaded
+
+ Arguments:
+ :arguments: Command line arguments
+ Returns:
+ :return: None
+ """
+ amount_left = arguments.amount
+ utils_left = arguments.multiplicity
+ prefix_current = arguments.firstprefix
+ myip_current = arguments.myip
+ port = arguments.port
+ thread_args = []
+ rpcqueue = Queue.Queue()
+ storage = SafeDict()
+
+ while 1:
+ amount_per_util = (amount_left - 1) / utils_left + 1 # round up
+ amount_left -= amount_per_util
+ utils_left -= 1
+
+ args = deepcopy(arguments)
+ args.amount = amount_per_util
+ args.firstprefix = prefix_current
+ args.myip = myip_current
+ thread_args.append(args)
+
+ if not utils_left:
+ break
+ prefix_current += amount_per_util * 16
+ myip_current += 1
+
+ try:
+ # Create threads
+ for t in thread_args:
+ thread.start_new_thread(job, (t, rpcqueue, storage))
+ except Exception:
+ print "Error: unable to start thread."
+ raise SystemExit(2)
+
+ if arguments.usepeerip:
+ ip = arguments.peerip
+ else:
+ ip = arguments.myip
+ rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
+ rpcserver.register_instance(Rpcs(rpcqueue, storage))
+ rpcserver.serve_forever()
+
+
+if __name__ == "__main__":
+ arguments = parse_arguments()
+ logger = create_logger(arguments.loglevel, arguments.logfile)
+ threaded_job(arguments)