X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tools%2Ffastbgp%2Fplay.py;h=0a94983e147fbad9475a33979d7971dabd4b8103;hb=7ef504db242cbb8a108afbc20a9ec9881a64fbdc;hp=1d96f184b9fb7c747a1077ec96c87bdc7acd5ea0;hpb=abeae1864ac4b47c730b9deb6c66c68c40f77384;p=integration%2Ftest.git diff --git a/tools/fastbgp/play.py b/tools/fastbgp/play.py index 1d96f184b9..0a94983e14 100755 --- a/tools/fastbgp/play.py +++ b/tools/fastbgp/play.py @@ -11,17 +11,19 @@ EXABGP in this type of scenario.""" # terms of the Eclipse Public License v1.0 which accompanies this distribution, # and is available at http://www.eclipse.org/legal/epl-v10.html +from copy import deepcopy +from SimpleXMLRPCServer import SimpleXMLRPCServer import argparse import binascii import ipaddr +import logging +import Queue import select import socket -import time -import logging import struct - import thread -from copy import deepcopy +import threading +import time __author__ = "Vratko Polak" @@ -30,6 +32,25 @@ __license__ = "Eclipse Public License v1.0" __email__ = "vrpolak@cisco.com" +class SafeDict(dict): + '''Thread safe dictionary + + The object will serve as thread safe data storage. + It should be used with "with" statement. + ''' + + def __init__(self, * p_arg, ** n_arg): + super(SafeDict, self).__init__() + self._lock = threading.Lock() + + def __enter__(self): + self._lock.acquire() + return self + + def __exit__(self, type, value, traceback): + self._lock.release() + + def parse_arguments(): """Use argparse to get arguments, @@ -70,6 +91,12 @@ def parse_arguments(): str_help = "The IP of the next hop to be placed into the update messages." parser.add_argument("--nexthop", default="192.0.2.1", type=ipaddr.IPv4Address, dest="nexthop", help=str_help) + str_help = "Identifier of the route originator." + parser.add_argument("--originator", default=None, + type=ipaddr.IPv4Address, dest="originator", help=str_help) + str_help = "Cluster list item identifier." + parser.add_argument("--cluster", default=None, + type=ipaddr.IPv4Address, dest="cluster", help=str_help) str_help = ("Numeric IP Address to try to connect to." + "Currently no effect in listening mode.") parser.add_argument("--peerip", default="127.0.0.2", @@ -98,9 +125,54 @@ def parse_arguments(): str_help = "Minimum number of updates to reach to include result into csv." parser.add_argument("--threshold", default="1000", type=int, help=str_help) str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported" - parser.add_argument("--rfc4760", default="yes", type=str, help=str_help) + parser.add_argument("--rfc4760", default=True, type=bool, help=str_help) + str_help = "Link-State NLRI supported" + parser.add_argument("--bgpls", default=False, type=bool, help=str_help) + str_help = "Link-State NLRI: Identifier" + parser.add_argument("-lsid", default="1", type=int, help=str_help) + str_help = "Link-State NLRI: Tunnel ID" + parser.add_argument("-lstid", default="1", type=int, help=str_help) + str_help = "Link-State NLRI: LSP ID" + parser.add_argument("-lspid", default="1", type=int, help=str_help) + str_help = "Link-State NLRI: IPv4 Tunnel Sender Address" + parser.add_argument("--lstsaddr", default="1.2.3.4", + type=ipaddr.IPv4Address, help=str_help) + str_help = "Link-State NLRI: IPv4 Tunnel End Point Address" + parser.add_argument("--lsteaddr", default="5.6.7.8", + type=ipaddr.IPv4Address, help=str_help) + str_help = "Link-State NLRI: Identifier Step" + parser.add_argument("-lsidstep", default="1", type=int, help=str_help) + str_help = "Link-State NLRI: Tunnel ID Step" + parser.add_argument("-lstidstep", default="2", type=int, help=str_help) + str_help = "Link-State NLRI: LSP ID Step" + parser.add_argument("-lspidstep", default="4", type=int, help=str_help) + str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step" + parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help) + str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step" + parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help) str_help = "How many play utilities are to be started." parser.add_argument("--multiplicity", default="1", type=int, help=str_help) + str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\ + Enabling this flag makes the script not decoding the update mesage, because of not\ + supported decoding for these elements." + parser.add_argument("--evpn", default=False, action="store_true", help=str_help) + str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\ + Enabling this flag makes the script not decoding the update mesage, because of not\ + supported decoding for these elements." + parser.add_argument("--mvpn", default=False, action="store_true", help=str_help) + str_help = "Open message includes L3VPN-MULTICAST arguments.\ + Enabling this flag makes the script not decoding the update mesage, because of not\ + supported decoding for these elements." + parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help) + str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding." + parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help) + str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding." + parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help) + str_help = "Add all supported families without message decoding." + parser.add_argument("--allf", default=False, action="store_true", help=str_help) + parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout") + str_help = "Skipping well known attributes for update message" + parser.add_argument("--skipattr", default=False, action="store_true", help=str_help) arguments = parser.parse_args() if arguments.multiplicity < 1: print "Multiplicity", arguments.multiplicity, "is not positive." @@ -230,17 +302,22 @@ def read_open_message(bgp_socket): # Some validation. if len(msg_in) < 37: # 37 is minimal length of open message with 4-byte AS number. - logger.error("Got something else than open with 4-byte AS number: " + - binascii.hexlify(msg_in)) - raise MessageError("Got something else than open with 4-byte AS number", msg_in) + error_msg = ( + "Message length (" + str(len(msg_in)) + ") is smaller than " + "minimal length of OPEN message with 4-byte AS number (37)" + ) + logger.error(error_msg + ": " + binascii.hexlify(msg_in)) + raise MessageError(error_msg, msg_in) # TODO: We could check BGP marker, but it is defined only later; # decide what to do. reported_length = get_short_int_from_message(msg_in) if len(msg_in) != reported_length: - logger.error("Message length is not " + str(reported_length) + - " as stated in " + binascii.hexlify(msg_in)) - raise MessageError("Message length is not " + reported_length + - " as stated in ", msg_in) + error_msg = ( + "Expected message length (" + reported_length + + ") does not match actual length (" + str(len(msg_in)) + ")" + ) + logger.error(error_msg + binascii.hexlify(msg_in)) + raise MessageError(error_msg, msg_in) logger.info("Open message received.") return msg_in @@ -274,6 +351,8 @@ class MessageGenerator(object): self.hold_time_default = args.holdtime # Local hold time. self.bgp_identifier_default = int(args.myip) self.next_hop_default = args.nexthop + self.originator_id_default = args.originator + self.cluster_list_item_default = args.cluster self.single_update_default = args.updates == "single" self.randomize_updates_default = args.updates == "random" self.prefix_count_to_add_default = args.insert @@ -287,14 +366,36 @@ class MessageGenerator(object): self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill self.results_file_name_default = args.results self.performance_threshold_default = args.threshold - self.rfc4760 = args.rfc4760 == "yes" + self.rfc4760 = args.rfc4760 + self.bgpls = args.bgpls + self.evpn = args.evpn + self.mvpn = args.mvpn + self.l3vpn_mcast = args.l3vpn_mcast + self.l3vpn = args.l3vpn + self.rt_constrain = args.rt_constrain + self.allf = args.allf + self.skipattr = args.skipattr + # Default values when BGP-LS Attributes are used + if self.bgpls: + self.prefix_count_to_add_default = 1 + self.prefix_count_to_del_default = 0 + self.ls_nlri_default = {"Identifier": args.lsid, + "TunnelID": args.lstid, + "LSPID": args.lspid, + "IPv4TunnelSenderAddress": args.lstsaddr, + "IPv4TunnelEndPointAddress": args.lsteaddr} + self.lsid_step = args.lsidstep + self.lstid_step = args.lstidstep + self.lspid_step = args.lspidstep + self.lstsaddr_step = args.lstsaddrstep + self.lsteaddr_step = args.lsteaddrstep # Default values used for randomized part s1_slots = ((self.total_prefix_amount - self.remaining_prefixes_threshold - 1) / self.prefix_count_to_add_default + 1) s2_slots = ((self.remaining_prefixes_threshold - 1) / (self.prefix_count_to_add_default - - self.prefix_count_to_del_default) + 1) + self.prefix_count_to_del_default) + 1) # S1_First_Index = 0 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1 s2_first_index = s1_slots * self.prefix_count_to_add_default @@ -306,7 +407,6 @@ class MessageGenerator(object): self.prefix_count_to_add_default + 1) self.randomize_lowest_default = s2_first_index self.randomize_highest_default = s2_last_index - # Initialising counters self.phase1_start_time = 0 self.phase1_stop_time = 0 @@ -338,6 +438,8 @@ class MessageGenerator(object): logger.info(" My Hold Time: " + str(self.hold_time_default)) logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default)) logger.info(" Next Hop: " + str(self.next_hop_default)) + logger.info(" Originator ID: " + str(self.originator_id_default)) + logger.info(" Cluster list: " + str(self.cluster_list_item_default)) logger.info(" Prefix count to be inserted at once: " + str(self.prefix_count_to_add_default)) logger.info(" Prefix count to be withdrawn at once: " + @@ -375,6 +477,7 @@ class MessageGenerator(object): :return: n/a """ # default values handling + # TODO optimize default values handling (use e.g. dicionary.update() approach) if file_name is None: file_name = self.results_file_name_default if threshold is None: @@ -470,6 +573,7 @@ class MessageGenerator(object): Created just as a fame for future generator enhancement. """ # default values handling + # TODO optimize default values handling (use e.g. dicionary.update() approach) if lowest is None: lowest = self.randomize_lowest_default if highest is None: @@ -484,7 +588,27 @@ class MessageGenerator(object): new_index = index return new_index - # Get list of prefixes + def get_ls_nlri_values(self, index): + """Generates LS-NLRI parameters. + http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03 + + Arguments: + :param index: index (iteration) + Returns: + :return: dictionary of LS NLRI parameters and values + """ + # generating list of LS NLRI parameters + identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step + ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step + tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step + lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step + ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step + ls_nlri_values = {"Identifier": identifier, + "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address, + "TunnelID": tunnel_id, "LSPID": lsp_id, + "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address} + return ls_nlri_values + def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None, prefix_len=None, prefix_count=None, randomize=None): """Generates list of IP address prefixes. @@ -503,6 +627,7 @@ class MessageGenerator(object): :return: list of generated IP address prefixes """ # default values handling + # TODO optimize default values handling (use e.g. dicionary.update() approach) if slot_size is None: slot_size = self.slot_size_default if prefix_base is None: @@ -546,6 +671,7 @@ class MessageGenerator(object): Updates global counters. """ # default values handling + # TODO optimize default values handling (use e.g. dicionary.update() approach) if prefix_count_to_add is None: prefix_count_to_add = self.prefix_count_to_add_default if prefix_count_to_del is None: @@ -591,20 +717,26 @@ class MessageGenerator(object): logger.debug("Prefixes to be withdrawn in this iteration:") prefix_list_to_del = self.get_prefix_list(slot_index_to_del, prefix_count=prefix_count_to_del) - # generating the mesage - if self.single_update_default: - # Send prefixes to be introduced and withdrawn - # in one UPDATE message - msg_out = self.update_message(wr_prefixes=prefix_list_to_del, - nlri_prefixes=prefix_list_to_add) + # generating the UPDATE mesage with LS-NLRI only + if self.bgpls: + ls_nlri = self.get_ls_nlri_values(self.iteration) + msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[], + **ls_nlri) else: - # Send prefixes to be introduced and withdrawn - # in separate UPDATE messages (if needed) - msg_out = self.update_message(wr_prefixes=[], - nlri_prefixes=prefix_list_to_add) - if prefix_count_to_del: - msg_out += self.update_message(wr_prefixes=prefix_list_to_del, - nlri_prefixes=[]) + # generating the UPDATE message with prefix lists + if self.single_update_default: + # Send prefixes to be introduced and withdrawn + # in one UPDATE message + msg_out = self.update_message(wr_prefixes=prefix_list_to_del, + nlri_prefixes=prefix_list_to_add) + else: + # Send prefixes to be introduced and withdrawn + # in separate UPDATE messages (if needed) + msg_out = self.update_message(wr_prefixes=[], + nlri_prefixes=prefix_list_to_add) + if prefix_count_to_del: + msg_out += self.update_message(wr_prefixes=prefix_list_to_del, + nlri_prefixes=[]) # updating counters - who knows ... maybe I am last time here ;) if straightforward_scenario: self.phase1_stop_time = time.time() @@ -634,7 +766,8 @@ class MessageGenerator(object): :return: encoded OPEN message in HEX """ - # Default values handling + # default values handling + # TODO optimize default values handling (use e.g. dicionary.update() approach) if version is None: version = self.version_default if my_autonomous_system is None: @@ -671,7 +804,7 @@ class MessageGenerator(object): # Optional Parameters optional_parameters_hex = "" - if self.rfc4760: + if self.rfc4760 or self.allf: optional_parameter_hex = ( "\x02" # Param type ("Capability Ad") "\x06" # Length (6 bytes) @@ -684,6 +817,109 @@ class MessageGenerator(object): ) optional_parameters_hex += optional_parameter_hex + if self.bgpls or self.allf: + optional_parameter_hex = ( + "\x02" # Param type ("Capability Ad") + "\x06" # Length (6 bytes) + "\x01" # Capability type (NLRI Unicast), + # see RFC 4760, secton 8 + "\x04" # Capability value length + "\x40\x04" # AFI (BGP-LS) + "\x00" # (reserved) + "\x47" # SAFI (BGP-LS) + ) + optional_parameters_hex += optional_parameter_hex + + if self.evpn or self.allf: + optional_parameter_hex = ( + "\x02" # Param type ("Capability Ad") + "\x06" # Length (6 bytes) + "\x01" # Multiprotocol extetension capability, + "\x04" # Capability value length + "\x00\x19" # AFI (L2-VPN) + "\x00" # (reserved) + "\x46" # SAFI (EVPN) + ) + optional_parameters_hex += optional_parameter_hex + + if self.mvpn or self.allf: + optional_parameter_hex = ( + "\x02" # Param type ("Capability Ad") + "\x06" # Length (6 bytes) + "\x01" # Multiprotocol extetension capability, + "\x04" # Capability value length + "\x00\x01" # AFI (IPV4) + "\x00" # (reserved) + "\x05" # SAFI (MCAST-VPN) + ) + optional_parameters_hex += optional_parameter_hex + optional_parameter_hex = ( + "\x02" # Param type ("Capability Ad") + "\x06" # Length (6 bytes) + "\x01" # Multiprotocol extetension capability, + "\x04" # Capability value length + "\x00\x02" # AFI (IPV6) + "\x00" # (reserved) + "\x05" # SAFI (MCAST-VPN) + ) + optional_parameters_hex += optional_parameter_hex + + if self.l3vpn_mcast or self.allf: + optional_parameter_hex = ( + "\x02" # Param type ("Capability Ad") + "\x06" # Length (6 bytes) + "\x01" # Multiprotocol extetension capability, + "\x04" # Capability value length + "\x00\x01" # AFI (IPV4) + "\x00" # (reserved) + "\x81" # SAFI (L3VPN-MCAST) + ) + optional_parameters_hex += optional_parameter_hex + optional_parameter_hex = ( + "\x02" # Param type ("Capability Ad") + "\x06" # Length (6 bytes) + "\x01" # Multiprotocol extetension capability, + "\x04" # Capability value length + "\x00\x02" # AFI (IPV6) + "\x00" # (reserved) + "\x81" # SAFI (L3VPN-MCAST) + ) + optional_parameters_hex += optional_parameter_hex + + if self.l3vpn or self.allf: + optional_parameter_hex = ( + "\x02" # Param type ("Capability Ad") + "\x06" # Length (6 bytes) + "\x01" # Multiprotocol extetension capability, + "\x04" # Capability value length + "\x00\x01" # AFI (IPV4) + "\x00" # (reserved) + "\x80" # SAFI (L3VPN-UNICAST) + ) + optional_parameters_hex += optional_parameter_hex + optional_parameter_hex = ( + "\x02" # Param type ("Capability Ad") + "\x06" # Length (6 bytes) + "\x01" # Multiprotocol extetension capability, + "\x04" # Capability value length + "\x00\x02" # AFI (IPV6) + "\x00" # (reserved) + "\x80" # SAFI (L3VPN-UNICAST) + ) + optional_parameters_hex += optional_parameter_hex + + if self.rt_constrain or self.allf: + optional_parameter_hex = ( + "\x02" # Param type ("Capability Ad") + "\x06" # Length (6 bytes) + "\x01" # Multiprotocol extetension capability, + "\x04" # Capability value length + "\x00\x01" # AFI (IPV4) + "\x00" # (reserved) + "\x84" # SAFI (ROUTE-TARGET-CONSTRAIN) + ) + optional_parameters_hex += optional_parameter_hex + optional_parameter_hex = ( "\x02" # Param type ("Capability Ad") "\x06" # Length (6 bytes) @@ -754,7 +990,9 @@ class MessageGenerator(object): def update_message(self, wr_prefixes=None, nlri_prefixes=None, wr_prefix_length=None, nlri_prefix_length=None, - my_autonomous_system=None, next_hop=None): + my_autonomous_system=None, next_hop=None, + originator_id=None, cluster_list_item=None, + end_of_rib=False, **ls_nlri_params): """Generates an UPDATE Message (rfc4271#section-4.3) Arguments: @@ -768,7 +1006,8 @@ class MessageGenerator(object): :return: encoded UPDATE message in HEX """ - # Default values handling + # default values handling + # TODO optimize default values handling (use e.g. dicionary.update() approach) if wr_prefixes is None: wr_prefixes = self.wr_prefixes_default if nlri_prefixes is None: @@ -781,6 +1020,12 @@ class MessageGenerator(object): my_autonomous_system = self.my_autonomous_system_default if next_hop is None: next_hop = self.next_hop_default + if originator_id is None: + originator_id = self.originator_id_default + if cluster_list_item is None: + cluster_list_item = self.cluster_list_item_default + ls_nlri = self.ls_nlri_default.copy() + ls_nlri.update(ls_nlri_params) # Marker marker_hex = "\xFF" * 16 @@ -790,12 +1035,13 @@ class MessageGenerator(object): type_hex = struct.pack("B", type) # Withdrawn Routes - bytes = ((wr_prefix_length - 1) / 8) + 1 withdrawn_routes_hex = "" - for prefix in wr_prefixes: - withdrawn_route_hex = (struct.pack("B", wr_prefix_length) + - struct.pack(">I", int(prefix))[:bytes]) - withdrawn_routes_hex += withdrawn_route_hex + if not self.bgpls: + bytes = ((wr_prefix_length - 1) / 8) + 1 + for prefix in wr_prefixes: + withdrawn_route_hex = (struct.pack("B", wr_prefix_length) + + struct.pack(">I", int(prefix))[:bytes]) + withdrawn_routes_hex += withdrawn_route_hex # Withdrawn Routes Length withdrawn_routes_length = len(withdrawn_routes_hex) @@ -803,12 +1049,15 @@ class MessageGenerator(object): # TODO: to replace hardcoded string by encoding? # Path Attributes - if nlri_prefixes != []: - path_attributes_hex = ( + path_attributes_hex = "" + if not self.skipattr: + path_attributes_hex += ( "\x40" # Flags ("Well-Known") "\x01" # Type (ORIGIN) "\x01" # Length (1) "\x00" # Origin: IGP + ) + path_attributes_hex += ( "\x40" # Flags ("Well-Known") "\x02" # Type (AS_PATH) "\x06" # Length (6) @@ -817,6 +1066,13 @@ class MessageGenerator(object): ) my_as_hex = struct.pack(">I", my_autonomous_system) path_attributes_hex += my_as_hex # AS segment (4 bytes) + path_attributes_hex += ( + "\x40" # Flags ("Well-Known") + "\x05" # Type (LOCAL_PREF) + "\x04" # Length (4) + "\x00\x00\x00\x64" # (100) + ) + if nlri_prefixes != []: path_attributes_hex += ( "\x40" # Flags ("Well-Known") "\x03" # Type (NEXT_HOP) @@ -826,20 +1082,55 @@ class MessageGenerator(object): path_attributes_hex += ( next_hop_hex # IP address of the next hop (4 bytes) ) - else: - path_attributes_hex = "" + if originator_id is not None: + path_attributes_hex += ( + "\x80" # Flags ("Optional, non-transitive") + "\x09" # Type (ORIGINATOR_ID) + "\x04" # Length (4) + ) # ORIGINATOR_ID (4 bytes) + path_attributes_hex += struct.pack(">I", int(originator_id)) + if cluster_list_item is not None: + path_attributes_hex += ( + "\x80" # Flags ("Optional, non-transitive") + "\x0a" # Type (CLUSTER_LIST) + "\x04" # Length (4) + ) # one CLUSTER_LIST item (4 bytes) + path_attributes_hex += struct.pack(">I", int(cluster_list_item)) + + if self.bgpls and not end_of_rib: + path_attributes_hex += ( + "\x80" # Flags ("Optional, non-transitive") + "\x0e" # Type (MP_REACH_NLRI) + "\x22" # Length (34) + "\x40\x04" # AFI (BGP-LS) + "\x47" # SAFI (BGP-LS) + "\x04" # Next Hop Length (4) + ) + path_attributes_hex += struct.pack(">I", int(next_hop)) + path_attributes_hex += "\x00" # Reserved + path_attributes_hex += ( + "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI) + "\x00\x15" # LS-NLRI.TotalNLRILength (21) + "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE) + ) + path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"])) + path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"])) + path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"])) + path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"])) + path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"])) # Total Path Attributes Length total_path_attributes_length = len(path_attributes_hex) total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length) # Network Layer Reachability Information - bytes = ((nlri_prefix_length - 1) / 8) + 1 nlri_hex = "" - for prefix in nlri_prefixes: - nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) + - struct.pack(">I", int(prefix))[:bytes]) - nlri_hex += nlri_prefix_hex + if not self.bgpls: + bytes = ((nlri_prefix_length - 1) / 8) + 1 + for prefix in nlri_prefixes: + nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) + + struct.pack(">I", int(prefix))[:bytes]) + nlri_hex += nlri_prefix_hex # Length (big-endian) length = ( @@ -874,12 +1165,21 @@ class MessageGenerator(object): logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" + str(wr_prefix_length) + " (0x" + binascii.hexlify(withdrawn_routes_hex) + ")") - logger.debug(" Total Path Attributes Length=" + - str(total_path_attributes_length) + " (0x" + - binascii.hexlify(total_path_attributes_length_hex) + - ")") - logger.debug(" Path Attributes=" + "(0x" + - binascii.hexlify(path_attributes_hex) + ")") + if total_path_attributes_length: + logger.debug(" Total Path Attributes Length=" + + str(total_path_attributes_length) + " (0x" + + binascii.hexlify(total_path_attributes_length_hex) + ")") + logger.debug(" Path Attributes=" + "(0x" + + binascii.hexlify(path_attributes_hex) + ")") + logger.debug(" Origin=IGP") + logger.debug(" AS path=" + str(my_autonomous_system)) + logger.debug(" Next hop=" + str(next_hop)) + if originator_id is not None: + logger.debug(" Originator id=" + str(originator_id)) + if cluster_list_item is not None: + logger.debug(" Cluster list=" + str(cluster_list_item)) + if self.bgpls: + logger.debug(" MP_REACH_NLRI: %s", ls_nlri) logger.debug(" Network Layer Reachability Information=" + str(nlri_prefixes) + "/" + str(nlri_prefix_length) + " (0x" + binascii.hexlify(nlri_hex) + ")") @@ -1075,12 +1375,21 @@ class ReadTracker(object): for idle waiting. """ - def __init__(self, bgp_socket, timer): + def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False, + l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False, + wait_for_read=10): """The reader initialisation. Arguments: bgp_socket: socket to be used for sending timer: timer to be used for scheduling + storage: thread safe dict + evpn: flag that evpn functionality is tested + mvpn: flag that mvpn functionality is tested + l3vpn_mcast: flag that l3vpn_mcast functionality is tested + l3vpn: flag that l3vpn unicast functionality is tested + rt_constrain: flag that rt-constrain functionality is tested + allf: flag for all family testing. """ # References to outside objects. self.socket = bgp_socket @@ -1101,6 +1410,14 @@ class ReadTracker(object): self.prefixes_withdrawn = 0 self.rx_idle_time = 0 self.rx_activity_detected = True + self.storage = storage + self.evpn = evpn + self.mvpn = mvpn + self.l3vpn_mcast = l3vpn_mcast + self.l3vpn = l3vpn + self.rt_constrain = rt_constrain + self.allf = allf + self.wfr = wait_for_read def read_message_chunk(self): """Read up to one message @@ -1187,56 +1504,65 @@ class ReadTracker(object): hex_to_decode = hex_to_decode[3 + attr_length:] if attr_type_code == 1: - logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)", + logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)", binascii.b2a_hex(attr_flags_hex)) - logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) elif attr_type_code == 2: - logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)", + logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)", binascii.b2a_hex(attr_flags_hex)) - logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) elif attr_type_code == 3: - logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)", + logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)", binascii.b2a_hex(attr_flags_hex)) - logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) elif attr_type_code == 4: - logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)", + logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)", binascii.b2a_hex(attr_flags_hex)) - logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) elif attr_type_code == 5: - logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)", + logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)", binascii.b2a_hex(attr_flags_hex)) - logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) elif attr_type_code == 6: - logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)", + logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)", binascii.b2a_hex(attr_flags_hex)) - logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) elif attr_type_code == 7: - logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)", + logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + elif attr_type_code == 9: # rfc4456#section-8 + logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)", + binascii.b2a_hex(attr_flags_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) + elif attr_type_code == 10: # rfc4456#section-8 + logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)", binascii.b2a_hex(attr_flags_hex)) - logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) elif attr_type_code == 14: # rfc4760#section-3 - logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)", + logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)", binascii.b2a_hex(attr_flags_hex)) - logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) address_family_identifier_hex = attr_value_hex[0:2] - logger.debug(" Address Family Identifier = 0x%s", + logger.debug(" Address Family Identifier=0x%s", binascii.b2a_hex(address_family_identifier_hex)) subsequent_address_family_identifier_hex = attr_value_hex[2] - logger.debug(" Subsequent Address Family Identifier = 0x%s", + logger.debug(" Subsequent Address Family Identifier=0x%s", binascii.b2a_hex(subsequent_address_family_identifier_hex)) next_hop_netaddr_len_hex = attr_value_hex[3] next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16) - logger.debug(" Length of Next Hop Network Address = 0x%s (%s)", - binascii.b2a_hex(next_hop_netaddr_len_hex), - next_hop_netaddr_len) + logger.debug(" Length of Next Hop Network Address=%s (0x%s)", + next_hop_netaddr_len, + binascii.b2a_hex(next_hop_netaddr_len_hex)) next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len] - logger.debug(" Network Address of Next Hop = 0x%s", - binascii.b2a_hex(next_hop_netaddr_hex)) + next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex)) + logger.debug(" Network Address of Next Hop=%s (0x%s)", + next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex)) reserved_hex = attr_value_hex[4 + next_hop_netaddr_len] - logger.debug(" Reserved = 0x%s", + logger.debug(" Reserved=0x%s", binascii.b2a_hex(reserved_hex)) nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:] - logger.debug(" Network Layer Reachability Information = 0x%s", + logger.debug(" Network Layer Reachability Information=0x%s", binascii.b2a_hex(nlri_hex)) nlri_prefix_list = get_prefix_list_from_hex(nlri_hex) logger.debug(" NLRI prefix list: %s", nlri_prefix_list) @@ -1244,17 +1570,17 @@ class ReadTracker(object): logger.debug(" nlri_prefix_received: %s", prefix) self.prefixes_introduced += len(nlri_prefix_list) # update counter elif attr_type_code == 15: # rfc4760#section-4 - logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)", + logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)", binascii.b2a_hex(attr_flags_hex)) - logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) address_family_identifier_hex = attr_value_hex[0:2] - logger.debug(" Address Family Identifier = 0x%s", + logger.debug(" Address Family Identifier=0x%s", binascii.b2a_hex(address_family_identifier_hex)) subsequent_address_family_identifier_hex = attr_value_hex[2] - logger.debug(" Subsequent Address Family Identifier = 0x%s", + logger.debug(" Subsequent Address Family Identifier=0x%s", binascii.b2a_hex(subsequent_address_family_identifier_hex)) wd_hex = attr_value_hex[3:] - logger.debug(" Withdrawn Routes = 0x%s", + logger.debug(" Withdrawn Routes=0x%s", binascii.b2a_hex(wd_hex)) wdr_prefix_list = get_prefix_list_from_hex(wd_hex) logger.debug(" Withdrawn routes prefix list: %s", @@ -1263,9 +1589,9 @@ class ReadTracker(object): logger.debug(" withdrawn_prefix_received: %s", prefix) self.prefixes_withdrawn += len(wdr_prefix_list) # update counter else: - logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code, + logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code, binascii.b2a_hex(attr_flags_hex)) - logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex)) + logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex)) return None def decode_update_message(self, msg): @@ -1289,6 +1615,41 @@ class ReadTracker(object): # message header - message type msg_type_hex = msg[18:19] msg_type = int(binascii.b2a_hex(msg_type_hex), 16) + + with self.storage as stor: + # this will replace the previously stored message + stor['update'] = binascii.hexlify(msg) + + logger.debug("Evpn {}".format(self.evpn)) + if self.evpn: + logger.debug("Skipping update decoding due to evpn data expected") + return + + logger.debug("Mvpn {}".format(self.mvpn)) + if self.mvpn: + logger.debug("Skipping update decoding due to mvpn data expected") + return + + logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast)) + if self.l3vpn_mcast: + logger.debug("Skipping update decoding due to l3vpn_mcast data expected") + return + + logger.debug("L3vpn-unicast {}".format(self.l3vpn)) + if self.l3vpn_mcast: + logger.debug("Skipping update decoding due to l3vpn-unicast data expected") + return + + logger.debug("Route-Target-Constrain {}".format(self.l3vpn_mcast)) + if self.l3vpn_mcast: + logger.debug("Skipping update decoding due to Route-Target-Constrain data expected") + return + + logger.debug("Allf {}".format(self.allf)) + if self.allf: + logger.debug("Skipping update decoding") + return + if msg_type == 2: logger.debug("Message type: 0x%s (update)", binascii.b2a_hex(msg_type_hex)) @@ -1308,13 +1669,13 @@ class ReadTracker(object): logger.debug("withdrawn_prefix_received: %s", prefix) # total path attribute length total_pa_length_offset = 21 + wdr_length - total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2] + total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2] total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16) logger.debug("Total path attribute lenght: 0x%s (%s)", binascii.b2a_hex(total_pa_length_hex), total_pa_length) # path attributes pa_offset = total_pa_length_offset + 2 - pa_hex = msg[pa_offset:pa_offset+total_pa_length] + pa_hex = msg[pa_offset:pa_offset + total_pa_length] logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex)) self.decode_path_attributes(pa_hex) # network layer reachability information length @@ -1322,7 +1683,7 @@ class ReadTracker(object): logger.debug("Calculated NLRI length: %s", nlri_length) # network layer reachability information nlri_offset = pa_offset + total_pa_length - nlri_hex = msg[nlri_offset:nlri_offset+nlri_length] + nlri_hex = msg[nlri_offset:nlri_offset + nlri_length] logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex)) nlri_prefix_list = get_prefix_list_from_hex(nlri_hex) logger.debug("NLRI prefix list: %s", nlri_prefix_list) @@ -1346,7 +1707,7 @@ class ReadTracker(object): # Compute time to the first predictable state change event_time = self.timer.get_next_event_time() # snapshot_time would be imprecise - wait_timedelta = min(event_time - time.time(), 10) + wait_timedelta = min(event_time - time.time(), self.wfr) if wait_timedelta < 0: # The program got around to waiting to an event in "very near # future" so late that it became a "past" event, thus tell @@ -1437,20 +1798,25 @@ class WriteTracker(object): class StateTracker(object): """Main loop has state so complex it warrants this separate class.""" - def __init__(self, bgp_socket, generator, timer): + def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs): """The state tracker initialisation. Arguments: bgp_socket: socket to be used for sending / receiving generator: generator to be used for message generation timer: timer to be used for scheduling + inqueue: user initiated messages queue + storage: thread safe dict to store data for the rpc server + cliargs: cli args from the user """ # References to outside objects. self.socket = bgp_socket self.generator = generator self.timer = timer # Sub-trackers. - self.reader = ReadTracker(bgp_socket, timer) + self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn, + l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf, + rt_constrain=cliargs.rt_constrain, wait_for_read=cliargs.wfr) self.writer = WriteTracker(bgp_socket, generator, timer) # Prioritization state. self.prioritize_writing = False @@ -1463,6 +1829,7 @@ class StateTracker(object): # TODO: Alternative is to switch fairly between reading and # writing (called round robin from now on). # Message counting is done in generator. + self.inqueue = inqueue def perform_one_loop_iteration(self): """ The main loop iteration @@ -1480,6 +1847,14 @@ class StateTracker(object): logger.info("KEEP ALIVE is sent.") # We are sending a message now, so let's prioritize it. self.prioritize_writing = True + + try: + msg = self.inqueue.get_nowait() + logger.info("Received message: {}".format(msg)) + msgbin = binascii.unhexlify(msg) + self.writer.enqueue_message_for_sending(msgbin) + except Queue.Empty: + pass # Now we know what our priorities are, we have to check # which actions are available. # socket.socket() returns three lists, @@ -1529,7 +1904,8 @@ class StateTracker(object): self.generator.store_results() logger.info("Finally an END-OF-RIB is sent.") msg_out += self.generator.update_message(wr_prefixes=[], - nlri_prefixes=[]) + nlri_prefixes=[], + end_of_rib=True) self.writer.enqueue_message_for_sending(msg_out) # Attempt for real sending to be done in next iteration. return @@ -1555,7 +1931,7 @@ def create_logger(loglevel, logfile): :return: logger object """ logger = logging.getLogger("logger") - log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s") + log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s") console_handler = logging.StreamHandler() file_handler = logging.FileHandler(logfile, mode="w") console_handler.setFormatter(log_formatter) @@ -1566,13 +1942,15 @@ def create_logger(loglevel, logfile): return logger -def job(arguments): +def job(arguments, inqueue, storage): """One time initialisation and iterations looping. Notes: Establish BGP connection and run iterations. Arguments: :arguments: Command line arguments + :inqueue: Data to be sent from play.py + :storage: Shared dict for rpc server Returns: :return: None """ @@ -1593,10 +1971,9 @@ def job(arguments): # Using exact keepalive length to not to see possible updates. msg_in = bgp_socket.recv(19) if msg_in != generator.keepalive_message(): - logger.error("Open not confirmed by keepalive, instead got " + - binascii.hexlify(msg_in)) - raise MessageError("Open not confirmed by keepalive, instead got", - msg_in) + error_msg = "Open not confirmed by keepalive, instead got" + logger.error(error_msg + ": " + binascii.hexlify(msg_in)) + raise MessageError(error_msg, msg_in) timer.reset_peer_hold_time() # Send the keepalive to indicate the connection is accepted. timer.snapshot() # Remember this time. @@ -1606,11 +1983,57 @@ def job(arguments): # Use the remembered time. timer.reset_my_keepalive_time(timer.snapshot_time) # End of initial handshake phase. - state = StateTracker(bgp_socket, generator, timer) + state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments) while True: # main reactor loop state.perform_one_loop_iteration() +class Rpcs: + '''Handler for SimpleXMLRPCServer''' + + def __init__(self, sendqueue, storage): + '''Init method + + Arguments: + :sendqueue: queue for data to be sent towards odl + :storage: thread safe dict + ''' + self.queue = sendqueue + self.storage = storage + + def send(self, text): + '''Data to be sent + + Arguments: + :text: hes string of the data to be sent + ''' + self.queue.put(text) + + def get(self, text=''): + '''Reads data form the storage + + - returns stored data or an empty string, at the moment only + 'update' is stored + + Arguments: + :text: a key to the storage to get the data + Returns: + :data: stored data + ''' + with self.storage as stor: + return stor.get(text, '') + + def clean(self, text=''): + '''Cleans data form the storage + + Arguments: + :text: a key to the storage to clean the data + ''' + with self.storage as stor: + if text in stor: + del stor[text] + + def threaded_job(arguments): """Run the job threaded @@ -1624,6 +2047,8 @@ def threaded_job(arguments): prefix_current = arguments.firstprefix myip_current = arguments.myip thread_args = [] + rpcqueue = Queue.Queue() + storage = SafeDict() while 1: amount_per_util = (amount_left - 1) / utils_left + 1 # round up @@ -1644,20 +2069,17 @@ def threaded_job(arguments): try: # Create threads for t in thread_args: - thread.start_new_thread(job, (t,)) + thread.start_new_thread(job, (t, rpcqueue, storage)) except Exception: print "Error: unable to start thread." raise SystemExit(2) - # Work remains forever - while 1: - time.sleep(5) + rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True) + rpcserver.register_instance(Rpcs(rpcqueue, storage)) + rpcserver.serve_forever() if __name__ == "__main__": arguments = parse_arguments() logger = create_logger(arguments.loglevel, arguments.logfile) - if arguments.multiplicity > 1: - threaded_job(arguments) - else: - job(arguments) + threaded_job(arguments)