From 50d572c715d37cb1fd852dc402bb31d6c530a18d Mon Sep 17 00:00:00 2001 From: Radovan Sajben Date: Mon, 21 Sep 2015 10:30:00 +0200 Subject: [PATCH] BGP Speaker tool & BGP performance suite functional enhancement - functions for BGP messages encoding according to RFC 4271 - configurable message generator via command-line parameters - support for logging and debugging - provides performance data for plotting Change-Id: Ifee756b0cd67bda5a2a931f19b70111f5fa123a3 Signed-off-by: Radovan Sajben --- .../bgpingest/singlepeer_changecount.robot | 43 +- .../bgpingest/singlepeer_prefixcount.robot | 42 +- csit/testplans/bgpcep-bgp-ingest-mixed.txt | 10 + .../bgpcep-bgp-ingest-straightforward.txt | 10 + tools/fastbgp/play.py | 1228 ++++++++++++++--- 5 files changed, 1134 insertions(+), 199 deletions(-) create mode 100644 csit/testplans/bgpcep-bgp-ingest-mixed.txt create mode 100644 csit/testplans/bgpcep-bgp-ingest-straightforward.txt diff --git a/csit/suites/bgpcep/bgpingest/singlepeer_changecount.robot b/csit/suites/bgpcep/bgpingest/singlepeer_changecount.robot index bc49feeb85..e3ee58c455 100644 --- a/csit/suites/bgpcep/bgpingest/singlepeer_changecount.robot +++ b/csit/suites/bgpcep/bgpingest/singlepeer_changecount.robot @@ -53,6 +53,15 @@ ${COUNT_CHANGE_COUNT} ${COUNT} ${CHECK_PERIOD} 1 ${CHECK_PERIOD_CHANGE_COUNT} ${CHECK_PERIOD} ${REPETITIONS_CHANGE_COUNT} 1 +${INSERT} 1 +${WITHDRAW} 0 +${PREFILL} 0 +${UPDATE} single +${BGP_TOOL_LOG_LEVEL} info +${CONTROLLER_LOG_LEVEL} INFO +${CONTROLLER_BGP_LOG_LEVEL} DEFAULT +${RESULTS_FILE_NAME} bgp.csv + # TODO: Option names can be better. ${last_change_count} -1 @@ -80,7 +89,7 @@ Start_Talking_BGP_Speaker [Documentation] Start Python speaker to connect to ODL. Store_Change_Count # Myport value is needed for checking whether connection at precise port was established. - BGPSpeaker.Start_BGP_Speaker --amount ${COUNT_CHANGE_COUNT} --myip=${TOOLS_SYSTEM_IP} --myport=${BGP_TOOL_PORT} --peerip=${ODL_SYSTEM_IP} --peerport=${ODL_BGP_PORT} + BGPSpeaker.Start_BGP_Speaker --amount ${COUNT_CHANGE_COUNT} --myip=${TOOLS_SYSTEM_IP} --myport=${BGP_TOOL_PORT} --peerip=${ODL_SYSTEM_IP} --peerport=${ODL_BGP_PORT} --insert=${INSERT} --withdraw=${WITHDRAW} --prefill ${PREFILL} --update ${UPDATE} --${BGP_TOOL_LOG_LEVEL} --results ${RESULTS_FILE_NAME} Wait_For_Stable_Talking_Ipv4_Topology [Documentation] Wait until example-ipv4-topology becomes stable. This is done by checking the change counter. @@ -100,6 +109,14 @@ Kill_Talking_BGP_Speaker # NOTE: It is still possible to remain failing fast, if both previous and this test have failed. [Teardown] FailFast.Do_Not_Start_Failing_If_This_Failed +Store_Results_For_Talking_BGP_Speaker + [Documentation] Store results for plotting + [Setup] SetupUtils.Setup_Test_With_Logging_And_Without_Fast_Failing + Store_File_To_Workspace totals-${RESULTS_FILE_NAME} totals-${RESULTS_FILE_NAME} + Store_File_To_Workspace performance-${RESULTS_FILE_NAME} performance-${RESULTS_FILE_NAME} + Store_File_To_Workspace totals-${RESULTS_FILE_NAME} changecount-talking-totals-${RESULTS_FILE_NAME} + Store_File_To_Workspace performance-${RESULTS_FILE_NAME} changecount-talking-performance-${RESULTS_FILE_NAME} + Wait_For_Stable_Ipv4_Topology_After_Talking [Documentation] Wait until example-ipv4-topology becomes stable again. [Tags] critical @@ -112,7 +129,7 @@ Check_For_Empty_Ipv4_Topology_After_Talking Start_Listening_BGP_Speaker [Documentation] Start Python speaker in listening mode. - BGPSpeaker.Start_BGP_Speaker --amount ${COUNT_CHANGE_COUNT} --listen --myip=${TOOLS_SYSTEM_IP} --myport=${BGP_TOOL_PORT} --peerip=${ODL_SYSTEM_IP} + BGPSpeaker.Start_BGP_Speaker --amount ${COUNT_CHANGE_COUNT} --listen --myip=${TOOLS_SYSTEM_IP} --myport=${BGP_TOOL_PORT} --peerip=${ODL_SYSTEM_IP} --insert=${INSERT} --withdraw=${WITHDRAW} --prefill ${PREFILL} --update ${UPDATE} --${BGP_TOOL_LOG_LEVEL} --results ${RESULTS_FILE_NAME} Reconfigure_ODL_To_Initiate_Connection [Documentation] Replace BGP peer config module, now with initiate-connection set to true. @@ -138,6 +155,14 @@ Kill_Listening_BGP_Speaker # NOTE: It is still possible to remain failing fast, if both previous and this test have failed. [Teardown] FailFast.Do_Not_Start_Failing_If_This_Failed +Store_Results_For_Listening_BGP_Speaker + [Documentation] Store results for plotting + [Setup] SetupUtils.Setup_Test_With_Logging_And_Without_Fast_Failing + Store_File_To_Workspace totals-${RESULTS_FILE_NAME} totals-${RESULTS_FILE_NAME} + Store_File_To_Workspace performance-${RESULTS_FILE_NAME} performance-${RESULTS_FILE_NAME} + Store_File_To_Workspace totals-${RESULTS_FILE_NAME} changecount-listening-totals-${RESULTS_FILE_NAME} + Store_File_To_Workspace performance-${RESULTS_FILE_NAME} changecount-listening-performance-${RESULTS_FILE_NAME} + Wait_For_Stable_Ipv4_Topology_After_Listening [Documentation] Wait until example-ipv4-topology becomes stable again. [Tags] critical @@ -180,6 +205,12 @@ Setup_Everything # TODO: Replace 20 with some formula from period and repetitions. Builtin.Set_Suite_Variable ${bgp_filling_timeout} ${count*3/10000+20} Builtin.Set_Suite_Variable ${bgp_emptying_timeout} ${bgp_filling_timeout*3/4} + # Timeout in case of doubled number of updates per iteration (per prefix count increase in the simplest mixed scenario) + Builtin.Run_Keyword_If '${UPDATE}' == 'mixed' Builtin.Set_Suite_Variable ${bgp_filling_timeout} ${count*6/10000+20} + KarafKeywords.Execute_Controller_Karaf_Command_On_Background log:set ${CONTROLLER_LOG_LEVEL} + KarafKeywords.Execute_Controller_Karaf_Command_On_Background log:set ${CONTROLLER_BGP_LOG_LEVEL} org.opendaylight.bgpcep + KarafKeywords.Execute_Controller_Karaf_Command_On_Background log:set ${CONTROLLER_BGP_LOG_LEVEL} org.opendaylight.protocol + Teardown_Everything [Documentation] Make sure Python tool was killed and tear down imported Resources. @@ -192,3 +223,11 @@ Store_Change_Count [Documentation] Get the count of changes from BGP change counter. Ignore error or store the value. ${status} ${count} = BuiltIn.Run_Keyword_And_Ignore_Error ChangeCounter.Get_Change_Count BuiltIn.Run_Keyword_If '${status}' == 'PASS' BuiltIn.Set_Suite_Variable ${last_change_count} ${count} + +Store_File_To_Workspace + [Arguments] ${src_file_name} ${dst_file_name} + [Documentation] Store the provided file from the SSH client to workspace. + ${files}= SSHLibrary.List Files In Directory . + ${output_log}= SSHLibrary.Execute_Command cat ${src_file_name} + BuiltIn.Log ${output_log} + Create File ${dst_file_name} ${output_log} diff --git a/csit/suites/bgpcep/bgpingest/singlepeer_prefixcount.robot b/csit/suites/bgpcep/bgpingest/singlepeer_prefixcount.robot index c97d2e5f9a..e99d186865 100644 --- a/csit/suites/bgpcep/bgpingest/singlepeer_prefixcount.robot +++ b/csit/suites/bgpcep/bgpingest/singlepeer_prefixcount.robot @@ -45,6 +45,15 @@ ${COUNT_PREFIX_COUNT} ${COUNT} ${CHECK_PERIOD} 1 ${CHECK_PERIOD_PREFIX_COUNT} ${CHECK_PERIOD} ${REPETITIONS_PREFIX_COUNT} 1 +${INSERT} 1 +${WITHDRAW} 0 +${PREFILL} 0 +${UPDATE} single +${BGP_TOOL_LOG_LEVEL} info +${CONTROLLER_LOG_LEVEL} INFO +${CONTROLLER_BGP_LOG_LEVEL} DEFAULT +${RESULTS_FILE_NAME} bgp.csv + # TODO: Option names can be better. ${last_prefix_count} -1 @@ -63,7 +72,7 @@ Reconfigure_ODL_To_Accept_Connection Start_Talking_BGP_Speaker [Documentation] Start Python speaker to connect to ODL. # Myport value is needed for checking whether connection at precise port was established. - BGPSpeaker.Start_BGP_Speaker --amount ${COUNT_PREFIX_COUNT} --myip=${TOOLS_SYSTEM_IP} --myport=${BGP_TOOL_PORT} --peerip=${ODL_SYSTEM_IP} --peerport=${ODL_BGP_PORT} + BGPSpeaker.Start_BGP_Speaker --amount ${COUNT_PREFIX_COUNT} --myip=${TOOLS_SYSTEM_IP} --myport=${BGP_TOOL_PORT} --peerip=${ODL_SYSTEM_IP} --peerport=${ODL_BGP_PORT} --insert=${INSERT} --withdraw=${WITHDRAW} --prefill ${PREFILL} --update ${UPDATE} --${BGP_TOOL_LOG_LEVEL} --results ${RESULTS_FILE_NAME} Wait_For_Stable_Talking_Ipv4_Topology [Documentation] Wait until example-ipv4-topology becomes stable. This is done by checking stability of prefix count. @@ -82,6 +91,14 @@ Kill_Talking_BGP_Speaker # NOTE: It is still possible to remain failing fast, if both previous and this test have failed. [Teardown] FailFast.Do_Not_Start_Failing_If_This_Failed +Store_Results_For_Talking_BGP_Speaker + [Documentation] Store results for plotting + [Setup] SetupUtils.Setup_Test_With_Logging_And_Without_Fast_Failing + Store_File_To_Workspace totals-${RESULTS_FILE_NAME} totals-${RESULTS_FILE_NAME} + Store_File_To_Workspace performance-${RESULTS_FILE_NAME} performance-${RESULTS_FILE_NAME} + Store_File_To_Workspace totals-${RESULTS_FILE_NAME} prefixcount-talking-totals-${RESULTS_FILE_NAME} + Store_File_To_Workspace performance-${RESULTS_FILE_NAME} prefixcount-talking-performance-${RESULTS_FILE_NAME} + Wait_For_Stable_Ipv4_Topology_After_Talking [Documentation] Wait until example-ipv4-topology becomes stable again. [Tags] critical @@ -97,7 +114,7 @@ Check_For_Empty_Ipv4_Topology_After_Talking Start_Listening_BGP_Speaker [Documentation] Start Python speaker in listening mode. - BGPSpeaker.Start_BGP_Speaker --amount ${COUNT_PREFIX_COUNT} --listen --myip=${TOOLS_SYSTEM_IP} --myport=${BGP_TOOL_PORT} --peerip=${ODL_SYSTEM_IP} + BGPSpeaker.Start_BGP_Speaker --amount ${COUNT_PREFIX_COUNT} --listen --myip=${TOOLS_SYSTEM_IP} --myport=${BGP_TOOL_PORT} --peerip=${ODL_SYSTEM_IP} --insert=${INSERT} --withdraw=${WITHDRAW} --prefill ${PREFILL} --update ${UPDATE} --${BGP_TOOL_LOG_LEVEL} --results ${RESULTS_FILE_NAME} Reconfigure_ODL_To_Initiate_Connection [Documentation] Replace BGP peer config module, now with initiate-connection set to true. @@ -121,6 +138,14 @@ Kill_Listening_BGP_Speaker # NOTE: It is still possible to remain failing fast, if both previous and this test have failed. [Teardown] FailFast.Do_Not_Start_Failing_If_This_Failed +Store_Results_For_Listening_BGP_Speaker + [Documentation] Store results for plotting + [Setup] SetupUtils.Setup_Test_With_Logging_And_Without_Fast_Failing + Store_File_To_Workspace totals-${RESULTS_FILE_NAME} totals-${RESULTS_FILE_NAME} + Store_File_To_Workspace performance-${RESULTS_FILE_NAME} performance-${RESULTS_FILE_NAME} + Store_File_To_Workspace totals-${RESULTS_FILE_NAME} prefixcount-listening-totals-${RESULTS_FILE_NAME} + Store_File_To_Workspace performance-${RESULTS_FILE_NAME} prefixcount-listening-performance-${RESULTS_FILE_NAME} + Wait_For_Stable_Ipv4_Topology_After_Listening [Documentation] Wait until example-ipv4-topology becomes stable again. [Tags] critical @@ -157,6 +182,11 @@ Setup_Everything # TODO: Replace 20 with some formula from period and repetitions. Builtin.Set_Suite_Variable ${bgp_filling_timeout} ${count*3/10000+20} Builtin.Set_Suite_Variable ${bgp_emptying_timeout} ${bgp_filling_timeout*3/4} + # Timeout in case of doubled number of updates per iteration (per prefix count increase in the simplest mixed scenario) + Builtin.Run_Keyword_If '${UPDATE}' == 'mixed' Builtin.Set_Suite_Variable ${bgp_filling_timeout} ${count*6/10000+20} + KarafKeywords.Execute_Controller_Karaf_Command_On_Background log:set ${CONTROLLER_LOG_LEVEL} + KarafKeywords.Execute_Controller_Karaf_Command_On_Background log:set ${CONTROLLER_BGP_LOG_LEVEL} org.opendaylight.bgpcep + KarafKeywords.Execute_Controller_Karaf_Command_On_Background log:set ${CONTROLLER_BGP_LOG_LEVEL} org.opendaylight.protocol Teardown_Everything [Documentation] Make sure Python tool was killed and tear down imported Resources. @@ -164,3 +194,11 @@ Teardown_Everything ConfigViaRestconf.Teardown_Config_Via_Restconf RequestsLibrary.Delete_All_Sessions SSHLibrary.Close_All_Connections + +Store_File_To_Workspace + [Arguments] ${src_file_name} ${dst_file_name} + [Documentation] Store the provided file from the SSH client to workspace. + ${files}= SSHLibrary.List Files In Directory . + ${output_log}= SSHLibrary.Execute_Command cat ${src_file_name} + BuiltIn.Log ${output_log} + Create File ${dst_file_name} ${output_log} diff --git a/csit/testplans/bgpcep-bgp-ingest-mixed.txt b/csit/testplans/bgpcep-bgp-ingest-mixed.txt new file mode 100644 index 0000000000..e9ceb48cb2 --- /dev/null +++ b/csit/testplans/bgpcep-bgp-ingest-mixed.txt @@ -0,0 +1,10 @@ +# Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License v1.0 which accompanies this distribution, +# and is available at http://www.eclipse.org/legal/epl-v10.html + +# Place the suites in run order: +integration/test/csit/suites/netconf/ready +integration/test/csit/suites/bgpcep/bgpingest/singlepeer_changecount.robot +integration/test/csit/suites/bgpcep/bgpingest/singlepeer_prefixcount.robot diff --git a/csit/testplans/bgpcep-bgp-ingest-straightforward.txt b/csit/testplans/bgpcep-bgp-ingest-straightforward.txt new file mode 100644 index 0000000000..e9ceb48cb2 --- /dev/null +++ b/csit/testplans/bgpcep-bgp-ingest-straightforward.txt @@ -0,0 +1,10 @@ +# Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License v1.0 which accompanies this distribution, +# and is available at http://www.eclipse.org/legal/epl-v10.html + +# Place the suites in run order: +integration/test/csit/suites/netconf/ready +integration/test/csit/suites/bgpcep/bgpingest/singlepeer_changecount.robot +integration/test/csit/suites/bgpcep/bgpingest/singlepeer_prefixcount.robot diff --git a/tools/fastbgp/play.py b/tools/fastbgp/play.py index 8b4d6dd291..c9dba81ecc 100644 --- a/tools/fastbgp/play.py +++ b/tools/fastbgp/play.py @@ -16,69 +16,138 @@ __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc." __license__ = "Eclipse Public License v1.0" __email__ = "vrpolak@cisco.com" - import argparse import binascii import ipaddr import select import socket import time +import logging +import struct def parse_arguments(): - """Use argparse to get arguments, return args object.""" + """Use argparse to get arguments, + + Returns: + :return: args object. + """ parser = argparse.ArgumentParser() # TODO: Should we use --argument-names-with-spaces? - str_help = "Autonomous System number use in the stream (current default as in ODL: 64496)." + str_help = "Autonomous System number use in the stream." parser.add_argument("--asnumber", default=64496, type=int, help=str_help) - # FIXME: We are acting as iBGP peer, we should mirror AS number from peer's open message. - str_help = "Amount of IP prefixes to generate. Negative number is taken as overflown positive." + # FIXME: We are acting as iBGP peer, + # we should mirror AS number from peer's open message. + str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")." parser.add_argument("--amount", default="1", type=int, help=str_help) - str_help = "The first IPv4 prefix to announce, given as numeric IPv4 address." - parser.add_argument("--firstprefix", default="8.0.1.0", type=ipaddr.IPv4Address, help=str_help) - str_help = "If present, this tool will be listening for connection, instead of initiating it." + str_help = "Maximum number of IP prefixes to be announced in one iteration" + parser.add_argument("--insert", default="1", type=int, help=str_help) + str_help = "Maximum number of IP prefixes to be withdrawn in one iteration" + parser.add_argument("--withdraw", default="0", type=int, help=str_help) + str_help = "The number of prefixes to process without withdrawals" + parser.add_argument("--prefill", default="0", type=int, help=str_help) + str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent" + parser.add_argument("--updates", choices=["single", "mixed"], + default=["mixed"], help=str_help) + str_help = "Base prefix IP address for prefix generation" + parser.add_argument("--firstprefix", default="8.0.1.0", + type=ipaddr.IPv4Address, help=str_help) + str_help = "The prefix length." + parser.add_argument("--prefixlen", default=28, type=int, help=str_help) + str_help = "Listen for connection, instead of initiating it." parser.add_argument("--listen", action="store_true", help=str_help) - str_help = "Numeric IP Address to bind to and derive BGP ID from. Default value only suitable for listening." - parser.add_argument("--myip", default="0.0.0.0", type=ipaddr.IPv4Address, help=str_help) - str_help = "TCP port to bind to when listening or initiating connection. Default only suitable for initiating." + str_help = ("Numeric IP Address to bind to and derive BGP ID from." + + "Default value only suitable for listening.") + parser.add_argument("--myip", default="0.0.0.0", + type=ipaddr.IPv4Address, help=str_help) + str_help = ("TCP port to bind to when listening or initiating connection." + + "Default only suitable for initiating.") parser.add_argument("--myport", default="0", type=int, help=str_help) str_help = "The IP of the next hop to be placed into the update messages." - parser.add_argument("--nexthop", default="192.0.2.1", type=ipaddr.IPv4Address, dest="nexthop", help=str_help) - str_help = "Numeric IP Address to try to connect to. Currently no effect in listening mode." - parser.add_argument("--peerip", default="127.0.0.2", type=ipaddr.IPv4Address, help=str_help) + parser.add_argument("--nexthop", default="192.0.2.1", + type=ipaddr.IPv4Address, dest="nexthop", help=str_help) + str_help = ("Numeric IP Address to try to connect to." + + "Currently no effect in listening mode.") + parser.add_argument("--peerip", default="127.0.0.2", + type=ipaddr.IPv4Address, help=str_help) str_help = "TCP port to try to connect to. No effect in listening mode." parser.add_argument("--peerport", default="179", type=int, help=str_help) - # TODO: The step between IP prefixes is currently hardcoded to 16. Should we make it configurable? - # Yes, the argument list above is sorted alphabetically. + str_help = "Local hold time." + parser.add_argument("--holdtime", default="180", type=int, help=str_help) + str_help = "Log level (--error, --warning, --info, --debug)" + parser.add_argument("--error", dest="loglevel", action="store_const", + const=logging.ERROR, default=logging.ERROR, + help=str_help) + parser.add_argument("--warning", dest="loglevel", action="store_const", + const=logging.WARNING, default=logging.ERROR, + help=str_help) + parser.add_argument("--info", dest="loglevel", action="store_const", + const=logging.INFO, default=logging.ERROR, + help=str_help) + parser.add_argument("--debug", dest="loglevel", action="store_const", + const=logging.DEBUG, default=logging.ERROR, + help=str_help) + str_help = "Trailing part of the csv result files for plotting purposes" + parser.add_argument("--results", default="bgp.csv", type=str, help=str_help) + str_help = "Minimum number of updates to reach to include result into csv." + parser.add_argument("--threshold", default="1000", type=int, help=str_help) arguments = parser.parse_args() # TODO: Are sanity checks (such as asnumber>=0) required? return arguments def establish_connection(arguments): - """Establish connection according to arguments, return socket.""" + """Establish connection to BGP peer. + + Arguments: + :arguments: following command-line argumets are used + - arguments.myip: local IP address + - arguments.myport: local port + - arguments.peerip: remote IP address + - arguments.peerport: remote port + Returns: + :return: socket. + """ if arguments.listen: - # print "DEBUG: connecting in the listening case." + logging.info("Connecting in the listening mode.") + logging.debug("Local IP address: " + str(arguments.myip)) + logging.debug("Local port: " + str(arguments.myport)) listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - listening_socket.bind((str(arguments.myip), arguments.myport)) # bind need single tuple as argument + # bind need single tuple as argument + listening_socket.bind((str(arguments.myip), arguments.myport)) listening_socket.listen(1) bgp_socket, _ = listening_socket.accept() # TODO: Verify client IP is cotroller IP. listening_socket.close() else: - # print "DEBUG: connecting in the talking case." + logging.info("Connecting in the talking mode.") + logging.debug("Local IP address: " + str(arguments.myip)) + logging.debug("Local port: " + str(arguments.myport)) + logging.debug("Remote IP address: " + str(arguments.peerip)) + logging.debug("Remote port: " + str(arguments.peerport)) talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - talking_socket.bind((str(arguments.myip), arguments.myport)) # bind to force specified address and port - talking_socket.connect((str(arguments.peerip), arguments.peerport)) # socket does not spead ipaddr, hence str() + # bind to force specified address and port + talking_socket.bind((str(arguments.myip), arguments.myport)) + # socket does not spead ipaddr, hence str() + talking_socket.connect((str(arguments.peerip), arguments.peerport)) bgp_socket = talking_socket - print "Connected to ODL." + logging.info("Connected to ODL.") return bgp_socket def get_short_int_from_message(message, offset=16): - """Extract 2-bytes number from packed string, default offset is for BGP message size.""" + """Extract 2-bytes number from provided message. + + Arguments: + :message: given message + :offset: offset of the short_int inside the message + Returns: + :return: required short_inf value. + Notes: + default offset value is the BGP message size offset. + """ high_byte_int = ord(message[offset]) low_byte_int = ord(message[offset + 1]) short_int = high_byte_int * 256 + low_byte_int @@ -86,21 +155,26 @@ def get_short_int_from_message(message, offset=16): class MessageError(ValueError): - """Value error with logging optimized for hexlified messages.""" + """Value error with logging optimized for hexlified messages. + """ def __init__(self, text, message, *args): - """Store and call super init for textual comment, store raw message which caused it.""" + """Initialisation. + + Store and call super init for textual comment, + store raw message which caused it. + """ self.text = text self.msg = message super(MessageError, self).__init__(text, message, *args) def __str__(self): - """ - Generate human readable error message + """Generate human readable error message. - Concatenate text comment, colon with space - and hexlified message. Use a placeholder string - if the message turns out to be empty. + Returns: + :return: human readable message as string + Notes: + Use a placeholder string if the message is to be empty. """ message = binascii.hexlify(self.msg) if message == "": @@ -109,198 +183,888 @@ class MessageError(ValueError): def read_open_message(bgp_socket): - """Receive message, perform some validation, return the raw message.""" + """Receive peer's OPEN message + + Arguments: + :bgp_socket: the socket to be read + Returns: + :return: received OPEN message. + Notes: + Performs just basic incomming message checks + """ msg_in = bgp_socket.recv(65535) # TODO: Is smaller buffer size safe? - # TODO: Is it possible for incoming open message to be split in more than one packet? + # TODO: Can the incoming open message be split in more than one packet? # Some validation. - if len(msg_in) < 37: # 37 is minimal length of open message with 4-byte AS number. - raise MessageError("Got something else than open with 4-byte AS number", msg_in) - # TODO: We could check BGP marker, but it is defined only later; decide what to do. + if len(msg_in) < 37: + # 37 is minimal length of open message with 4-byte AS number. + logging.error("Got something else than open with 4-byte AS number: " + + binascii.hexlify(msg_in)) + raise MessageError("Got something else than open with 4-byte AS number", + msg_in) + # TODO: We could check BGP marker, but it is defined only later; + # decide what to do. reported_length = get_short_int_from_message(msg_in) if len(msg_in) != reported_length: - raise MessageError("Message length is not " + reported_length + " in message", msg_in) - print "Open message received" + logging.error("Message length is not " + str(reported_length) + + " as stated in " + binascii.hexlify(msg_in)) + raise MessageError("Message length is not " + reported_length + + " as stated in ", msg_in) + logging.info("Open message received.") return msg_in class MessageGenerator(object): - """Class with methods returning messages and state holding configuration data required to do it properly.""" + """Class which generates messages, holds states and configuration values. + """ - # TODO: Define bgp marker as class (constant) variable. + # TODO: Define bgp marker as a class (constant) variable. def __init__(self, args): - """Initialize data according to command-line args.""" - # Various auxiliary variables. - # Hack: 4-byte AS number uses the same "int to packed" encoding as IPv4 addresses. - asnumber_4bytes = ipaddr.v4_int_to_packed(args.asnumber) - asnumber_2bytes = "\x5b\xa0" # AS_TRANS value, 23456 decadic. - if args.asnumber < 65536: # AS number is mappable to 2 bytes - asnumber_2bytes = asnumber_4bytes[2:4] - # From now on, attribute docsrings are used. - self.int_nextprefix = int(args.firstprefix) - """Prefix IP address for next update message, as integer.""" - self.updates_to_send = args.amount - """Number of update messages left to be sent.""" - # All information ready, so we can define messages. Mostly copied from play.py by Jozef Behran. - # The following attributes are constant. - self.bgp_marker = "\xFF" * 16 - """Every message starts with this, see rfc4271#section-4.1""" - self.keepalive_message = self.bgp_marker + ( - "\x00\x13" # Size - "\x04" # Type KEEPALIVE - ) - """KeepAlive message, see rfc4271#section-4.4""" + """Initialisation according to command-line args. + + Arguments: + :args: argsparser's Namespace object which contains command-line + options for MesageGenerator initialisation + Notes: + Calculates and stores default values used later on for + message geeration. + """ + self.total_prefix_amount = args.amount + # Number of update messages left to be sent. + self.remaining_prefixes = self.total_prefix_amount + + # New parameters initialisation + self.iteration = 0 + self.prefix_base_default = args.firstprefix + self.prefix_length_default = args.prefixlen + self.wr_prefixes_default = [] + self.nlri_prefixes_default = [] + self.version_default = 4 + self.my_autonomous_system_default = args.asnumber + self.hold_time_default = args.holdtime # Local hold time. + self.bgp_identifier_default = int(args.myip) + self.next_hop_default = args.nexthop + self.single_update_default = args.updates == "single" + self.randomize_updates_default = args.updates == "random" + self.prefix_count_to_add_default = args.insert + self.prefix_count_to_del_default = args.withdraw + if self.prefix_count_to_del_default < 0: + self.prefix_count_to_del_default = 0 + if not self.single_update_default and not self.prefix_count_to_del_default: + self.prefix_count_to_del_default = 1 + # we need some content for the 2nd UPDATE in the iteration + if self.prefix_count_to_add_default <= self.prefix_count_to_del_default: + # total number of prefixes must grow to avoid infinite test loop + self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1 + self.slot_size_default = self.prefix_count_to_add_default + self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill + self.results_file_name_default = args.results + self.performance_threshold_default = args.threshold + # Default values used for randomized part + s1_slots = ((self.total_prefix_amount - + self.remaining_prefixes_threshold - 1) / + self.prefix_count_to_add_default + 1) + s2_slots = ((self.remaining_prefixes_threshold - 1) / + (self.prefix_count_to_add_default - + self.prefix_count_to_del_default) + 1) + # S1_First_Index = 0 + # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1 + s2_first_index = s1_slots * self.prefix_count_to_add_default + s2_last_index = (s2_first_index + + s2_slots * (self.prefix_count_to_add_default - + self.prefix_count_to_del_default) - 1) + self.slot_gap_default = ((self.total_prefix_amount - + self.remaining_prefixes_threshold - 1) / + self.prefix_count_to_add_default + 1) + self.randomize_lowest_default = s2_first_index + self.randomize_highest_default = s2_last_index + + # Initialising counters + self.phase1_start_time = 0 + self.phase1_stop_time = 0 + self.phase2_start_time = 0 + self.phase2_stop_time = 0 + self.phase1_updates_sent = 0 + self.phase2_updates_sent = 0 + self.updates_sent = 0 + + # Needed for the MessageGenerator performance optimization + self.log_info = args.loglevel <= logging.INFO + self.log_debug = args.loglevel <= logging.DEBUG + + logging.info("Generator initialisation") + logging.info(" Target total number of prefixes to be introduced: " + + str(self.total_prefix_amount)) + logging.info(" Prefix base: " + str(self.prefix_base_default) + "/" + + str(self.prefix_length_default)) + logging.info(" My Autonomous System number: " + + str(self.my_autonomous_system_default)) + logging.info(" My Hold Time: " + str(self.hold_time_default)) + logging.info(" My BGP Identifier: " + str(self.bgp_identifier_default)) + logging.info(" Next Hop: " + str(self.next_hop_default)) + logging.info(" Prefix count to be inserted at once: " + + str(self.prefix_count_to_add_default)) + logging.info(" Prefix count to be withdrawn at once: " + + str(self.prefix_count_to_del_default)) + logging.info(" Fast pre-fill up to " + + str(self.total_prefix_amount - + self.remaining_prefixes_threshold) + " prefixes") + logging.info(" Remaining number of prefixes to be processed " + + "in parallel with withdrawals: " + + str(self.remaining_prefixes_threshold)) + logging.debug(" Prefix index range used after pre-fill procedure [" + + str(self.randomize_lowest_default) + ", " + + str(self.randomize_highest_default) + "]") + if self.single_update_default: + logging.info(" Common single UPDATE will be generated " + + "for both NLRI & WITHDRAWN lists") + else: + logging.info(" Two separate UPDATEs will be generated " + + "for each NLRI & WITHDRAWN lists") + if self.randomize_updates_default: + logging.info(" Generation of UPDATE messages will be randomized") + logging.info(" Let\"s go ...\n") + # TODO: Notification for hold timer expiration can be handy. - self.eor_message = self.bgp_marker + ( - "\x00\x17" # Size - "\x02" # Type (UPDATE) - "\x00\x00" # Withdrawn routes length (0) - "\x00\x00" # Total Path Attributes Length (0) - ) - """End-of-RIB marker, see rfc4724#section-2""" - self.update_message_without_prefix = self.bgp_marker + ( - "\x00\x30" # Size - "\x02" # Type (UPDATE) - "\x00\x00" # Withdrawn routes length (0) - "\x00\x14" # Total Path Attributes Length (20) - "\x40" # Flags ("Well-Known") - "\x01" # Type (ORIGIN) - "\x01" # Length (1) - "\x00" # Origin: IGP - "\x40" # Flags ("Well-Known") - "\x02" # Type (AS_PATH) - "\x06" # Length (6) - "\x02" # AS segment type (AS_SEQUENCE) - "\x01" # AS segment length (1) - + asnumber_4bytes + # AS segment (4 bytes) - "\x40" # Flags ("Well-Known") - "\x03" # Type (NEXT_HOP) - "\x04" # Length (4) - + args.nexthop.packed + # IP address of the next hop (4 bytes) - "\x1c" # IPv4 prefix length, see RFC 4271, page 20. This tool uses Network Mask: 255.255.255.240 - ) - """The IP address prefix (4 bytes) has to be appended to complete Update message, see rfc4271#section-4.3.""" - self.open_message = self.bgp_marker + ( - "\x00\x2d" # Size - "\x01" # Type (OPEN) - "\x04" # BGP Varsion (4) - + asnumber_2bytes + # My Autonomous System - # FIXME: The following hold time is hardcoded separately. Compute from initial hold_time value. - "\x00\xb4" # Hold Time (180) - + args.myip.packed + # BGP Identifer - "\x10" # Optional parameters length + + def store_results(self, file_name=None, threshold=None): + """ Stores specified results into files based on file_name value. + + Arguments: + :param file_name: Trailing (common) part of result file names + :param threshold: Minimum number of sent updates needed for each + result to be included into result csv file + (mainly needed because of the result accuracy) + Returns: + :return: n/a + """ + # default values handling + if file_name is None: + file_name = self.results_file_name_default + if threshold is None: + threshold = self.performance_threshold_default + # performance calculation + if self.phase1_updates_sent >= threshold: + totals1 = self.phase1_updates_sent + performance1 = int(self.phase1_updates_sent / + (self.phase1_stop_time - self.phase1_start_time)) + else: + totals1 = None + performance1 = None + if self.phase2_updates_sent >= threshold: + totals2 = self.phase2_updates_sent + performance2 = int(self.phase2_updates_sent / + (self.phase2_stop_time - self.phase2_start_time)) + else: + totals2 = None + performance2 = None + + logging.info("#" * 10 + " Final results " + "#" * 10) + logging.info("Number of iterations: " + str(self.iteration)) + logging.info("Number of UPDATE messages sent in the pre-fill phase: " + + str(self.phase1_updates_sent)) + logging.info("The pre-fill phase duration: " + + str(self.phase1_stop_time - self.phase1_start_time) + "s") + logging.info("Number of UPDATE messages sent in the 2nd test phase: " + + str(self.phase2_updates_sent)) + logging.info("The 2nd test phase duration: " + + str(self.phase2_stop_time - self.phase2_start_time) + "s") + logging.info("Threshold for performance reporting: " + str(threshold)) + + # making labels + phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) + + " route(s) per UPDATE") + if self.single_update_default: + phase2_label = "+" + (str(self.prefix_count_to_add_default) + + "/-" + str(self.prefix_count_to_del_default) + + " routes per UPDATE") + else: + phase2_label = "+" + (str(self.prefix_count_to_add_default) + + "/-" + str(self.prefix_count_to_del_default) + + " routes in two UPDATEs") + # collecting capacity and performance results + totals = {} + performance = {} + if totals1 is not None: + totals[phase1_label] = totals1 + performance[phase1_label] = performance1 + if totals2 is not None: + totals[phase2_label] = totals2 + performance[phase2_label] = performance2 + self.write_results_to_file(totals, "totals-" + file_name) + self.write_results_to_file(performance, "performance-" + file_name) + + def write_results_to_file(self, results, file_name): + """Writes results to the csv plot file consumable by Jenkins. + + Arguments: + :param file_name: Name of the (csv) file to be created + Returns: + :return: none + """ + first_line = "" + second_line = "" + f = open(file_name, "wt") + try: + for key in sorted(results): + first_line += key + ", " + second_line += str(results[key]) + ", " + first_line = first_line[:-2] + second_line = second_line[:-2] + f.write(first_line + "\n") + f.write(second_line + "\n") + logging.info("Performance results of message generator stored in " + + file_name + ':') + logging.info(" " + first_line) + logging.info(" " + second_line) + finally: + f.close() + + # Return pseudo-randomized (reproducible) index for selected range + def randomize_index(self, index, lowest=None, highest=None): + """Calculates pseudo-randomized index from selected range. + + Arguments: + :param index: input index + :param lowest: the lowes index from the randomized area + :param highest: the highest index from the randomized area + Returns: + :return: the (pseudo)randomized index + Notes: + Created just as a fame for future generator enhancement. + """ + # default values handling + if lowest is None: + lowest = self.randomize_lowest_default + if highest is None: + highest = self.randomize_highest_default + # randomize + if (index >= lowest) and (index <= highest): + # we are in the randomized range -> shuffle it inside + # the range (now just reverse the order) + new_index = highest - (index - lowest) + else: + # we are out of the randomized range -> nothing to do + new_index = index + return new_index + + # Get list of prefixes + def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None, + prefix_len=None, prefix_count=None, randomize=None): + """Generates list of IP address prefixes. + + Arguments: + :param slot_index: index of group of prefix addresses + :param slot_size: size of group of prefix addresses + in [number of included prefixes] + :param prefix_base: IP address of the first prefix + (slot_index = 0, prefix_index = 0) + :param prefix_len: length of the prefix in bites + (the same as size of netmask) + :param prefix_count: number of prefixes to be returned + from the specified slot + Returns: + :return: list of generated IP address prefixes + """ + # default values handling + if slot_size is None: + slot_size = self.slot_size_default + if prefix_base is None: + prefix_base = self.prefix_base_default + if prefix_len is None: + prefix_len = self.prefix_length_default + if prefix_count is None: + prefix_count = slot_size + if randomize is None: + randomize = self.randomize_updates_default + # generating list of prefixes + indexes = [] + prefixes = [] + prefix_gap = 2 ** (32 - prefix_len) + for i in range(prefix_count): + prefix_index = slot_index * slot_size + i + if randomize: + prefix_index = self.randomize_index(prefix_index) + indexes.append(prefix_index) + prefixes.append(prefix_base + prefix_index * prefix_gap) + if self.log_debug: + logging.debug(" Prefix slot index: " + str(slot_index)) + logging.debug(" Prefix slot size: " + str(slot_size)) + logging.debug(" Prefix count: " + str(prefix_count)) + logging.debug(" Prefix indexes: " + str(indexes)) + logging.debug(" Prefix list: " + str(prefixes)) + return prefixes + + def compose_update_message(self, prefix_count_to_add=None, + prefix_count_to_del=None): + """Composes an UPDATE message + + Arguments: + :param prefix_count_to_add: # of prefixes to put into NLRI list + :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list + Returns: + :return: encoded UPDATE message in HEX + Notes: + Optionally generates separate UPDATEs for NLRI and WITHDRAWN + lists or common message wich includes both prefix lists. + Updates global counters. + """ + # default values handling + if prefix_count_to_add is None: + prefix_count_to_add = self.prefix_count_to_add_default + if prefix_count_to_del is None: + prefix_count_to_del = self.prefix_count_to_del_default + # logging + if self.log_info and not (self.iteration % 1000): + logging.info("Iteration: " + str(self.iteration) + + " - total remaining prefixes: " + + str(self.remaining_prefixes)) + if self.log_debug: + logging.debug("#" * 10 + " Iteration: " + + str(self.iteration) + " " + "#" * 10) + logging.debug("Remaining prefixes: " + + str(self.remaining_prefixes)) + # scenario type & one-shot counter + straightforward_scenario = (self.remaining_prefixes > + self.remaining_prefixes_threshold) + if straightforward_scenario: + prefix_count_to_del = 0 + if self.log_debug: + logging.debug("--- STARAIGHTFORWARD SCENARIO ---") + if not self.phase1_start_time: + self.phase1_start_time = time.time() + else: + if self.log_debug: + logging.debug("--- COMBINED SCENARIO ---") + if not self.phase2_start_time: + self.phase2_start_time = time.time() + # tailor the number of prefixes if needed + prefix_count_to_add = (prefix_count_to_del + + min(prefix_count_to_add - prefix_count_to_del, + self.remaining_prefixes)) + # prefix slots selection for insertion and withdrawal + slot_index_to_add = self.iteration + slot_index_to_del = slot_index_to_add - self.slot_gap_default + # getting lists of prefixes for insertion in this iteration + if self.log_debug: + logging.debug("Prefixes to be inserted in this iteration:") + prefix_list_to_add = self.get_prefix_list(slot_index_to_add, + prefix_count=prefix_count_to_add) + # getting lists of prefixes for withdrawal in this iteration + if self.log_debug: + logging.debug("Prefixes to be withdrawn in this iteration:") + prefix_list_to_del = self.get_prefix_list(slot_index_to_del, + prefix_count=prefix_count_to_del) + # generating the mesage + if self.single_update_default: + # Send prefixes to be introduced and withdrawn + # in one UPDATE message + msg_out = self.update_message(wr_prefixes=prefix_list_to_del, + nlri_prefixes=prefix_list_to_add) + else: + # Send prefixes to be introduced and withdrawn + # in separate UPDATE messages (if needed) + msg_out = self.update_message(wr_prefixes=[], + nlri_prefixes=prefix_list_to_add) + if prefix_count_to_del: + msg_out += self.update_message(wr_prefixes=prefix_list_to_del, + nlri_prefixes=[]) + # updating counters - who knows ... maybe I am last time here ;) + if straightforward_scenario: + self.phase1_stop_time = time.time() + self.phase1_updates_sent = self.updates_sent + else: + self.phase2_stop_time = time.time() + self.phase2_updates_sent = (self.updates_sent - + self.phase1_updates_sent) + # updating totals for the next iteration + self.iteration += 1 + self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del) + # returning the encoded message + return msg_out + + # Section of message encoders + + def open_message(self, version=None, my_autonomous_system=None, + hold_time=None, bgp_identifier=None): + """Generates an OPEN Message (rfc4271#section-4.2) + + Arguments: + :param version: see the rfc4271#section-4.2 + :param my_autonomous_system: see the rfc4271#section-4.2 + :param hold_time: see the rfc4271#section-4.2 + :param bgp_identifier: see the rfc4271#section-4.2 + Returns: + :return: encoded OPEN message in HEX + """ + + # Default values handling + if version is None: + version = self.version_default + if my_autonomous_system is None: + my_autonomous_system = self.my_autonomous_system_default + if hold_time is None: + hold_time = self.hold_time_default + if bgp_identifier is None: + bgp_identifier = self.bgp_identifier_default + + # Marker + marker_hex = "\xFF" * 16 + + # Type + type = 1 + type_hex = struct.pack("B", type) + + # version + version_hex = struct.pack("B", version) + + # my_autonomous_system + # AS_TRANS value, 23456 decadic. + my_autonomous_system_2_bytes = 23456 + # AS number is mappable to 2 bytes + if my_autonomous_system < 65536: + my_autonomous_system_2_bytes = my_autonomous_system + my_autonomous_system_hex_2_bytes = struct.pack(">H", + my_autonomous_system) + + # Hold Time + hold_time_hex = struct.pack(">H", hold_time) + + # BGP Identifier + bgp_identifier_hex = struct.pack(">I", bgp_identifier) + + # Optional Parameters + optional_parameters_hex = ( "\x02" # Param type ("Capability Ad") "\x06" # Length (6 bytes) - "\x01" # Capability type (NLRI Unicast), see RFC 4760, secton 8 + "\x01" # Capability type (NLRI Unicast), + # see RFC 4760, secton 8 "\x04" # Capability value length "\x00\x01" # AFI (Ipv4) "\x00" # (reserved) "\x01" # SAFI (Unicast) + "\x02" # Param type ("Capability Ad") "\x06" # Length (6 bytes) - "\x41" # "32 bit AS Numbers Support" (see RFC 6793, section 3) + "\x41" # "32 bit AS Numbers Support" + # (see RFC 6793, section 3) "\x04" # Capability value length - + asnumber_4bytes # My AS in 32 bit format + # My AS in 32 bit format + + struct.pack(">I", my_autonomous_system) ) - """Open message, see rfc4271#section-4.2""" - # __init__ ends - - def compose_update_message(self): - """Return update message, prepare next prefix, decrease amount without checking it.""" - prefix_packed = ipaddr.v4_int_to_packed(self.int_nextprefix) - # print "DEBUG: prefix", self.int_nextprefix, "packed to", binascii.hexlify(prefix_packed) - msg_out = self.update_message_without_prefix + prefix_packed - self.int_nextprefix += 16 # Hardcoded, as open message specifies such netmask. - self.updates_to_send -= 1 - return msg_out + + # Optional Parameters Length + optional_parameters_length = len(optional_parameters_hex) + optional_parameters_length_hex = struct.pack("B", + optional_parameters_length) + + # Length (big-endian) + length = ( + len(marker_hex) + 2 + len(type_hex) + len(version_hex) + + len(my_autonomous_system_hex_2_bytes) + + len(hold_time_hex) + len(bgp_identifier_hex) + + len(optional_parameters_length_hex) + + len(optional_parameters_hex) + ) + length_hex = struct.pack(">H", length) + + # OPEN Message + message_hex = ( + marker_hex + + length_hex + + type_hex + + version_hex + + my_autonomous_system_hex_2_bytes + + hold_time_hex + + bgp_identifier_hex + + optional_parameters_length_hex + + optional_parameters_hex + ) + + if self.log_debug: + logging.debug("OPEN Message encoding") + logging.debug(" Marker=0x" + binascii.hexlify(marker_hex)) + logging.debug(" Length=" + str(length) + " (0x" + + binascii.hexlify(length_hex) + ")") + logging.debug(" Type=" + str(type) + " (0x" + + binascii.hexlify(type_hex) + ")") + logging.debug(" Version=" + str(version) + " (0x" + + binascii.hexlify(version_hex) + ")") + logging.debug(" My Autonomous System=" + + str(my_autonomous_system_2_bytes) + " (0x" + + binascii.hexlify(my_autonomous_system_hex_2_bytes) + + ")") + logging.debug(" Hold Time=" + str(hold_time) + " (0x" + + binascii.hexlify(hold_time_hex) + ")") + logging.debug(" BGP Identifier=" + str(bgp_identifier) + + " (0x" + binascii.hexlify(bgp_identifier_hex) + ")") + logging.debug(" Optional Parameters Length=" + + str(optional_parameters_length) + " (0x" + + binascii.hexlify(optional_parameters_length_hex) + + ")") + logging.debug(" Optional Parameters=0x" + + binascii.hexlify(optional_parameters_hex)) + logging.debug(" OPEN Message encoded: 0x" + + binascii.b2a_hex(message_hex)) + + return message_hex + + def update_message(self, wr_prefixes=None, nlri_prefixes=None, + wr_prefix_length=None, nlri_prefix_length=None, + my_autonomous_system=None, next_hop=None): + """Generates an UPDATE Message (rfc4271#section-4.3) + + Arguments: + :param wr_prefixes: see the rfc4271#section-4.3 + :param nlri_prefixes: see the rfc4271#section-4.3 + :param wr_prefix_length: see the rfc4271#section-4.3 + :param nlri_prefix_length: see the rfc4271#section-4.3 + :param my_autonomous_system: see the rfc4271#section-4.3 + :param next_hop: see the rfc4271#section-4.3 + Returns: + :return: encoded UPDATE message in HEX + """ + + # Default values handling + if wr_prefixes is None: + wr_prefixes = self.wr_prefixes_default + if nlri_prefixes is None: + nlri_prefixes = self.nlri_prefixes_default + if wr_prefix_length is None: + wr_prefix_length = self.prefix_length_default + if nlri_prefix_length is None: + nlri_prefix_length = self.prefix_length_default + if my_autonomous_system is None: + my_autonomous_system = self.my_autonomous_system_default + if next_hop is None: + next_hop = self.next_hop_default + + # Marker + marker_hex = "\xFF" * 16 + + # Type + type = 2 + type_hex = struct.pack("B", type) + + # Withdrawn Routes + bytes = ((wr_prefix_length - 1) / 8) + 1 + withdrawn_routes_hex = "" + for prefix in wr_prefixes: + withdrawn_route_hex = (struct.pack("B", wr_prefix_length) + + struct.pack(">I", int(prefix))[:bytes]) + withdrawn_routes_hex += withdrawn_route_hex + + # Withdrawn Routes Length + withdrawn_routes_length = len(withdrawn_routes_hex) + withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length) + + # TODO: to replace hardcoded string by encoding? + # Path Attributes + if nlri_prefixes != []: + path_attributes_hex = ( + "\x40" # Flags ("Well-Known") + "\x01" # Type (ORIGIN) + "\x01" # Length (1) + "\x00" # Origin: IGP + "\x40" # Flags ("Well-Known") + "\x02" # Type (AS_PATH) + "\x06" # Length (6) + "\x02" # AS segment type (AS_SEQUENCE) + "\x01" # AS segment length (1) + # AS segment (4 bytes) + + struct.pack(">I", my_autonomous_system) + + "\x40" # Flags ("Well-Known") + "\x03" # Type (NEXT_HOP) + "\x04" # Length (4) + # IP address of the next hop (4 bytes) + + struct.pack(">I", int(next_hop)) + ) + else: + path_attributes_hex = "" + + # Total Path Attributes Length + total_path_attributes_length = len(path_attributes_hex) + total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length) + + # Network Layer Reachability Information + bytes = ((nlri_prefix_length - 1) / 8) + 1 + nlri_hex = "" + for prefix in nlri_prefixes: + nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) + + struct.pack(">I", int(prefix))[:bytes]) + nlri_hex += nlri_prefix_hex + + # Length (big-endian) + length = ( + len(marker_hex) + 2 + len(type_hex) + + len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) + + len(total_path_attributes_length_hex) + len(path_attributes_hex) + + len(nlri_hex)) + length_hex = struct.pack(">H", length) + + # UPDATE Message + message_hex = ( + marker_hex + + length_hex + + type_hex + + withdrawn_routes_length_hex + + withdrawn_routes_hex + + total_path_attributes_length_hex + + path_attributes_hex + + nlri_hex + ) + + if self.log_debug: + logging.debug("UPDATE Message encoding") + logging.debug(" Marker=0x" + binascii.hexlify(marker_hex)) + logging.debug(" Length=" + str(length) + " (0x" + + binascii.hexlify(length_hex) + ")") + logging.debug(" Type=" + str(type) + " (0x" + + binascii.hexlify(type_hex) + ")") + logging.debug(" withdrawn_routes_length=" + + str(withdrawn_routes_length) + " (0x" + + binascii.hexlify(withdrawn_routes_length_hex) + ")") + logging.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" + + str(wr_prefix_length) + " (0x" + + binascii.hexlify(withdrawn_routes_hex) + ")") + logging.debug(" Total Path Attributes Length=" + + str(total_path_attributes_length) + " (0x" + + binascii.hexlify(total_path_attributes_length_hex) + + ")") + logging.debug(" Path Attributes=" + "(0x" + + binascii.hexlify(path_attributes_hex) + ")") + logging.debug(" Network Layer Reachability Information=" + + str(nlri_prefixes) + "/" + str(nlri_prefix_length) + + " (0x" + binascii.hexlify(nlri_hex) + ")") + logging.debug(" UPDATE Message encoded: 0x" + + binascii.b2a_hex(message_hex)) + + # updating counter + self.updates_sent += 1 + # returning encoded message + return message_hex + + def notification_message(self, error_code, error_subcode, data_hex=""): + """Generates a NOTIFICATION Message (rfc4271#section-4.5) + + Arguments: + :param error_code: see the rfc4271#section-4.5 + :param error_subcode: see the rfc4271#section-4.5 + :param data_hex: see the rfc4271#section-4.5 + Returns: + :return: encoded NOTIFICATION message in HEX + """ + + # Marker + marker_hex = "\xFF" * 16 + + # Type + type = 3 + type_hex = struct.pack("B", type) + + # Error Code + error_code_hex = struct.pack("B", error_code) + + # Error Subode + error_subcode_hex = struct.pack("B", error_subcode) + + # Length (big-endian) + length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) + + len(error_subcode_hex) + len(data_hex)) + length_hex = struct.pack(">H", length) + + # NOTIFICATION Message + message_hex = ( + marker_hex + + length_hex + + type_hex + + error_code_hex + + error_subcode_hex + + data_hex + ) + + if self.log_debug: + logging.debug("NOTIFICATION Message encoding") + logging.debug(" Marker=0x" + binascii.hexlify(marker_hex)) + logging.debug(" Length=" + str(length) + " (0x" + + binascii.hexlify(length_hex) + ")") + logging.debug(" Type=" + str(type) + " (0x" + + binascii.hexlify(type_hex) + ")") + logging.debug(" Error Code=" + str(error_code) + " (0x" + + binascii.hexlify(error_code_hex) + ")") + logging.debug(" Error Subode=" + str(error_subcode) + " (0x" + + binascii.hexlify(error_subcode_hex) + ")") + logging.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")") + logging.debug(" NOTIFICATION Message encoded: 0x" + + binascii.b2a_hex(message_hex)) + + return message_hex + + def keepalive_message(self): + """Generates a KEEP ALIVE Message (rfc4271#section-4.4) + + Returns: + :return: encoded KEEP ALIVE message in HEX + """ + + # Marker + marker_hex = "\xFF" * 16 + + # Type + type = 4 + type_hex = struct.pack("B", type) + + # Length (big-endian) + length = len(marker_hex) + 2 + len(type_hex) + length_hex = struct.pack(">H", length) + + # KEEP ALIVE Message + message_hex = ( + marker_hex + + length_hex + + type_hex + ) + + if self.log_debug: + logging.debug("KEEP ALIVE Message encoding") + logging.debug(" Marker=0x" + binascii.hexlify(marker_hex)) + logging.debug(" Length=" + str(length) + " (0x" + + binascii.hexlify(length_hex) + ")") + logging.debug(" Type=" + str(type) + " (0x" + + binascii.hexlify(type_hex) + ")") + logging.debug(" KEEP ALIVE Message encoded: 0x" + + binascii.b2a_hex(message_hex)) + + return message_hex class TimeTracker(object): - """Class for tracking timers, both for my keepalives and peer's hold time.""" + """Class for tracking timers, both for my keepalives and + peer's hold time. + """ def __init__(self, msg_in): - """Initialize config, based on hardcoded defaults and open message from peer.""" - # Note: Relative time is always named timedelta, to stress that (non-delta) time is absolute. + """Initialisation. based on defaults and OPEN message from peer. + + Arguments: + msg_in: the OPEN message received from peer. + """ + # Note: Relative time is always named timedelta, to stress that + # the (non-delta) time is absolute. self.report_timedelta = 1.0 # In seconds. TODO: Configurable? - """Upper bound for being stuck in the same state, we should at least report something before continuing.""" - # Negotiate the hold timer by taking the smaller of the 2 values (mine and the peer's). + # Upper bound for being stuck in the same state, we should + # at least report something before continuing. + # Negotiate the hold timer by taking the smaller + # of the 2 values (mine and the peer's). hold_timedelta = 180 # Not an attribute of self yet. - # TODO: Make the default value configurable, default value could mirror what peer said. + # TODO: Make the default value configurable, + # default value could mirror what peer said. peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22) if hold_timedelta > peer_hold_timedelta: hold_timedelta = peer_hold_timedelta if hold_timedelta != 0 and hold_timedelta < 3: + logging.error("Invalid hold timedelta value: " + str(hold_timedelta)) raise ValueError("Invalid hold timedelta value: ", hold_timedelta) - self.hold_timedelta = hold_timedelta # only now the final value is visible from outside - """If we do not hear from peer this long, we assume it has died.""" + self.hold_timedelta = hold_timedelta + # If we do not hear from peer this long, we assume it has died. self.keepalive_timedelta = int(hold_timedelta / 3.0) - """Upper limit for duration between messages, to avoid being declared dead.""" - self.snapshot_time = time.time() # The same as calling snapshot(), but also declares a field. - """Sometimes we need to store time. This is where to get the value from afterwards.""" - self.peer_hold_time = self.snapshot_time + self.hold_timedelta # time_keepalive may be too strict - """At this time point, peer will be declared dead.""" + # Upper limit for duration between messages, to avoid being + # declared to be dead. + # The same as calling snapshot(), but also declares a field. + self.snapshot_time = time.time() + # Sometimes we need to store time. This is where to get + # the value from afterwards. Time_keepalive may be too strict. + self.peer_hold_time = self.snapshot_time + self.hold_timedelta + # At this time point, peer will be declared dead. self.my_keepalive_time = None # to be set later - """At this point, we should be sending keepalive message.""" + # At this point, we should be sending keepalive message. def snapshot(self): """Store current time in instance data to use later.""" - self.snapshot_time = time.time() # Read as time before something interesting was called. + # Read as time before something interesting was called. + self.snapshot_time = time.time() def reset_peer_hold_time(self): """Move hold time to future as peer has just proven it still lives.""" self.peer_hold_time = time.time() + self.hold_timedelta - # Some methods could rely on self.snapshot_time, but it is better to require user to provide it explicitly. + # Some methods could rely on self.snapshot_time, but it is better + # to require user to provide it explicitly. def reset_my_keepalive_time(self, keepalive_time): - """Move KA timer to future based on given time from before sending.""" + """Calculate and set the next my KEEP ALIVE timeout time + + Arguments: + :keepalive_time: the initial value of the KEEP ALIVE timer + """ self.my_keepalive_time = keepalive_time + self.keepalive_timedelta def is_time_for_my_keepalive(self): + """Check for my KEEP ALIVE timeout occurence""" if self.hold_timedelta == 0: return False return self.snapshot_time >= self.my_keepalive_time def get_next_event_time(self): + """Set the time of the next expected or to be sent KEEP ALIVE""" if self.hold_timedelta == 0: return self.snapshot_time + 86400 return min(self.my_keepalive_time, self.peer_hold_time) def check_peer_hold_time(self, snapshot_time): """Raise error if nothing was read from peer until specified time.""" - if self.hold_timedelta != 0: # Hold time = 0 means keepalive checking off. - if snapshot_time > self.peer_hold_time: # time.time() may be too strict - raise RuntimeError("Peer has overstepped the hold timer.") # TODO: Include hold_timedelta? - # TODO: Add notification sending (attempt). That means move to write tracker. + # Hold time = 0 means keepalive checking off. + if self.hold_timedelta != 0: + # time.time() may be too strict + if snapshot_time > self.peer_hold_time: + logging.error("Peer has overstepped the hold timer.") + raise RuntimeError("Peer has overstepped the hold timer.") + # TODO: Include hold_timedelta? + # TODO: Add notification sending (attempt). That means + # move to write tracker. class ReadTracker(object): - """Class for tracking read of mesages chunk by chunk and for idle waiting.""" + """Class for tracking read of mesages chunk by chunk and + for idle waiting. + """ def __init__(self, bgp_socket, timer): - """Set initial state.""" + """The reader initialisation. + + Arguments: + bgp_socket: socket to be used for sending + timer: timer to be used for scheduling + """ # References to outside objects. self.socket = bgp_socket self.timer = timer - # Really new fields. + # BGP marker length plus length field length. self.header_length = 18 - """BGP marker length plus length field length.""" # TODO: make it class (constant) attribute + # TODO: make it class (constant) attribute + # Computation of where next chunk ends depends on whether + # we are beyond length field. self.reading_header = True - """Computation of where next chunk ends depends on whether we are beyond length field.""" + # Countdown towards next size computation. self.bytes_to_read = self.header_length - """Countdown towards next size computation.""" + # Incremental buffer for message under read. self.msg_in = "" - """Incremental buffer for message under read.""" def read_message_chunk(self): - """Read up to one message, do not return anything.""" - # TODO: We also could return the whole message, but currently nobody cares. + """Read up to one message + + Note: + Currently it does not return anything. + """ + # TODO: We could return the whole message, currently not needed. # We assume the socket is readable. chunk_message = self.socket.recv(self.bytes_to_read) self.msg_in += chunk_message self.bytes_to_read -= len(chunk_message) - if not self.bytes_to_read: # TODO: bytes_to_read < 0 is not possible, right? + # TODO: bytes_to_read < 0 is not possible, right? + if not self.bytes_to_read: # Finished reading a logical block. if self.reading_header: - # The logical block was a BGP header. Now we know size of message. + # The logical block was a BGP header. + # Now we know the size of the message. self.reading_header = False self.bytes_to_read = get_short_int_from_message(self.msg_in) else: # We have finished reading the body of the message. @@ -308,19 +1072,27 @@ class ReadTracker(object): self.timer.reset_peer_hold_time() # TODO: Do we want to count received messages? # This version ignores the received message. - # TODO: Should we do validation and exit on anything besides update or keepalive? + # TODO: Should we do validation and exit on anything + # besides update or keepalive? # Prepare state for reading another message. self.msg_in = "" self.reading_header = True self.bytes_to_read = self.header_length - # We should not act upon peer_hold_time if we are reading something right now. + # We should not act upon peer_hold_time if we are reading + # something right now. return def wait_for_read(self): - """When we know there are no more updates to send, we use this to avoid busy-wait.""" - # First, compute time to first predictable state change (or report event) + """Read message until timeout (next expected event). + + Note: + Used when no more updates has to be sent to avoid busy-wait. + Currently it does not return anything. + """ + # Compute time to the first predictable state change event_time = self.timer.get_next_event_time() - wait_timedelta = event_time - time.time() # snapshot_time would be imprecise + # snapshot_time would be imprecise + wait_timedelta = event_time - time.time() if wait_timedelta < 0: # The program got around to waiting to an event in "very near # future" so late that it became a "past" event, thus tell @@ -335,10 +1107,17 @@ class ReadTracker(object): class WriteTracker(object): - """Class tracking enqueueing messages and sending chunks of them.""" + """Class tracking enqueueing messages and sending chunks of them. + """ def __init__(self, bgp_socket, generator, timer): - """Set initial state.""" + """The writter initialisation. + + Arguments: + bgp_socket: socket to be used for sending + generator: generator to be used for message generation + timer: timer to be used for scheduling + """ # References to outside objects, self.socket = bgp_socket self.generator = generator @@ -350,25 +1129,34 @@ class WriteTracker(object): self.msg_out = "" def enqueue_message_for_sending(self, message): - """Change write state to include the message.""" + """Enqueue message and change state. + + Arguments: + message: message to be enqueued into the msg_out buffer + """ self.msg_out += message self.bytes_to_send += len(message) self.sending_message = True def send_message_chunk_is_whole(self): - """Perform actions related to sending (chunk of) message, return whether message was completed.""" + """Send enqueued data from msg_out buffer + + Returns: + :return: true if no remaining data to send + """ # We assume there is a msg_out to send and socket is writable. # print "going to send", repr(self.msg_out) self.timer.snapshot() bytes_sent = self.socket.send(self.msg_out) - self.msg_out = self.msg_out[bytes_sent:] # Forget the part of message that was sent. + # Forget the part of message that was sent. + self.msg_out = self.msg_out[bytes_sent:] self.bytes_to_send -= bytes_sent if not self.bytes_to_send: # TODO: Is it possible to hit negative bytes_to_send? self.sending_message = False # We should have reset hold timer on peer side. self.timer.reset_my_keepalive_time(self.timer.snapshot_time) - # Which means the possible reason for not prioritizing reads is gone. + # The possible reason for not prioritizing reads is gone. return True return False @@ -377,7 +1165,13 @@ class StateTracker(object): """Main loop has state so complex it warrants this separate class.""" def __init__(self, bgp_socket, generator, timer): - """Set the initial state according to existing socket and generator.""" + """The state tracker initialisation. + + Arguments: + bgp_socket: socket to be used for sending / receiving + generator: generator to be used for message generation + timer: timer to be used for scheduling + """ # References to outside objects. self.socket = bgp_socket self.generator = generator @@ -387,92 +1181,136 @@ class StateTracker(object): self.writer = WriteTracker(bgp_socket, generator, timer) # Prioritization state. self.prioritize_writing = False - """ - In general, we prioritize reading over writing. But in order to not get blocked by neverending reads, - we should check whether we are not risking running out of holdtime. - So in some situations, this field is set to True to attempt finishing sending a message, - after which this field resets back to False. - """ - # TODO: Alternative is to switch fairly between reading and writing (called round robin from now on). + # In general, we prioritize reading over writing. But in order + # not to get blocked by neverending reads, we should + # check whether we are not risking running out of holdtime. + # So in some situations, this field is set to True to attempt + # finishing sending a message, after which this field resets + # back to False. + # TODO: Alternative is to switch fairly between reading and + # writing (called round robin from now on). # Message counting is done in generator. def perform_one_loop_iteration(self): - """Calculate priority, resolve all ifs, call appropriate method, return to caller to repeat.""" + """ The main loop iteration + + Notes: + Calculates priority, resolves all conditions, calls + appropriate method and returns to caller to repeat. + """ self.timer.snapshot() if not self.prioritize_writing: if self.timer.is_time_for_my_keepalive(): if not self.writer.sending_message: # We need to schedule a keepalive ASAP. - self.writer.enqueue_message_for_sending(self.generator.keepalive_message) - # We are sending a message now, so prioritize finishing it. + self.writer.enqueue_message_for_sending(self.generator.keepalive_message()) + # We are sending a message now, so let's prioritize it. self.prioritize_writing = True - # Now we know what our priorities are, we have to check which actions are available. - # socket.socket() returns three lists, we store them to list of lists. - list_list = select.select([self.socket], [self.socket], [self.socket], self.timer.report_timedelta) + # Now we know what our priorities are, we have to check + # which actions are available. + # socket.socket() returns three lists, + # we store them to list of lists. + list_list = select.select([self.socket], [self.socket], [self.socket], + self.timer.report_timedelta) read_list, write_list, except_list = list_list - # Lists are unpacked, each is either [] or [self.socket], so we will test them as boolean. + # Lists are unpacked, each is either [] or [self.socket], + # so we will test them as boolean. if except_list: + logging.error("Exceptional state on the socket.") raise RuntimeError("Exceptional state on socket", self.socket) # We will do either read or write. if not (self.prioritize_writing and write_list): - # Either we have no reason to rush writes, or the socket is not writable. + # Either we have no reason to rush writes, + # or the socket is not writable. # We are focusing on reading here. if read_list: # there is something to read indeed - # In this case we want to read chunk of message and repeat the select, + # In this case we want to read chunk of message + # and repeat the select, self.reader.read_message_chunk() return # We were focusing on reading, but nothing to read was there. # Good time to check peer for hold timer. self.timer.check_peer_hold_time(self.timer.snapshot_time) - # Things are quiet on the read front, we can go on and attempt to write. + # Quiet on the read front, we can have attempt to write. if write_list: - # Either we really want to reset peer's view of our hold timer, or there was nothing to read. - if self.writer.sending_message: # We were in the middle of sending a message. - whole = self.writer.send_message_chunk_is_whole() # Was it the end of a message? - if self.prioritize_writing and whole: # We were pressed to send something and we did it. - self.prioritize_writing = False # We prioritize reading again. + # Either we really want to reset peer's view of our hold + # timer, or there was nothing to read. + # Were we in the middle of sending a message? + if self.writer.sending_message: + # Was it the end of a message? + whole = self.writer.send_message_chunk_is_whole() + # We were pressed to send something and we did it. + if self.prioritize_writing and whole: + # We prioritize reading again. + self.prioritize_writing = False return - # Finally, we can look if there is some update message for us to generate. - if self.generator.updates_to_send: + # Finally to check if still update messages to be generated. + if self.generator.remaining_prefixes: msg_out = self.generator.compose_update_message() - if not self.generator.updates_to_send: # We have just finished update generation, end-of-rib is due. - msg_out += self.generator.eor_message + if not self.generator.remaining_prefixes: + # We have just finished update generation, + # end-of-rib is due. + logging.info("All update messages generated.") + logging.info("Storing performance results.") + self.generator.store_results() + logging.info("Finally an END-OF-RIB is going to be sent.") + msg_out += self.generator.update_message(wr_prefixes=[], + nlri_prefixes=[]) self.writer.enqueue_message_for_sending(msg_out) - return # Attempt for the actual sending will be done in next iteration. + # Attempt for real sending to be done in next iteration. + return # Nothing to write anymore, except occasional keepalives. + logging.info("Everything has been done." + + "Now just waiting for possible incomming message.") # To avoid busy loop, we do idle waiting here. self.reader.wait_for_read() return # We can neither read nor write. - print "Input and output both blocked for", self.timer.report_timedelta, "seconds." - # FIXME: Are we sure select has been really waiting the whole period? + logging.warning("Input and output both blocked for " + + str(self.timer.report_timedelta) + " seconds.") + # FIXME: Are we sure select has been really waiting + # the whole period? return def main(): - """Establish BGP connection and enter main loop for sending updates.""" + """ One time initialisation and iterations looping. + + Notes: + Establish BGP connection and run iterations. + """ arguments = parse_arguments() + logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", + level=arguments.loglevel) bgp_socket = establish_connection(arguments) # Initial handshake phase. TODO: Can it be also moved to StateTracker? # Receive open message before sending anything. - # FIXME: Add parameter to send default open message first, to work with "you first" peers. + # FIXME: Add parameter to send default open message first, + # to work with "you first" peers. msg_in = read_open_message(bgp_socket) timer = TimeTracker(msg_in) generator = MessageGenerator(arguments) - msg_out = generator.open_message - # print "DEBUG: going to send open:", binascii.hexlify(msg_out) + msg_out = generator.open_message() + logging.debug("Sending the OPEN message: " + binascii.hexlify(msg_out)) # Send our open message to the peer. bgp_socket.send(msg_out) # Wait for confirming keepalive. # TODO: Surely in just one packet? - msg_in = bgp_socket.recv(19) # Using exact keepalive length to not see possible updates. - if msg_in != generator.keepalive_message: - raise MessageError("Open not confirmed by keepalive, instead got", msg_in) + # Using exact keepalive length to not to see possible updates. + msg_in = bgp_socket.recv(19) + if msg_in != generator.keepalive_message(): + logging.error("Open not confirmed by keepalive, instead got " + + binascii.hexlify(msg_in)) + raise MessageError("Open not confirmed by keepalive, instead got", + msg_in) timer.reset_peer_hold_time() # Send the keepalive to indicate the connection is accepted. timer.snapshot() # Remember this time. - bgp_socket.send(generator.keepalive_message) - timer.reset_my_keepalive_time(timer.snapshot_time) # Use the remembered time. + msg_out = generator.keepalive_message() + logging.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out)) + bgp_socket.send(msg_out) + # Use the remembered time. + timer.reset_my_keepalive_time(timer.snapshot_time) # End of initial handshake phase. state = StateTracker(bgp_socket, generator, timer) while True: # main reactor loop -- 2.36.6