X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=test%2Ftools%2Ffastbgp%2Fplay.py;fp=test%2Ftools%2Ffastbgp%2Fplay.py;h=0000000000000000000000000000000000000000;hb=59e81c38620fa1b61e15771191e35771450b9499;hp=4412616675131f96a2ecf9fb0073fc5205a209a0;hpb=072f6e3a8d1bdf8f4c663843589c22d93ba07791;p=integration%2Ftest.git diff --git a/test/tools/fastbgp/play.py b/test/tools/fastbgp/play.py deleted file mode 100644 index 4412616675..0000000000 --- a/test/tools/fastbgp/play.py +++ /dev/null @@ -1,482 +0,0 @@ -"""Utility for playing generated BGP data to ODL. - -It needs to be run with sudo-able user when you want to use ports below 1024 -as --myip. This utility is used to avoid excessive waiting times which EXABGP -exhibits when used with huge router tables and also avoid the memory cost of -EXABGP in this type of scenario.""" - -# Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. -# -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution, -# and is available at http://www.eclipse.org/legal/epl-v10.html - -__author__ = "Vratko Polak" -__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc." -__license__ = "Eclipse Public License v1.0" -__email__ = "vrpolak@cisco.com" - - -import argparse -import binascii -import ipaddr -import select -import socket -import time - - -def parse_arguments(): - """Use argparse to get arguments, return args object.""" - parser = argparse.ArgumentParser() - # TODO: Should we use --argument-names-with-spaces? - str_help = 'Autonomous System number use in the stream (current default as in ODL: 64496).' - parser.add_argument('--asnumber', default=64496, type=int, help=str_help) - # FIXME: We are acting as iBGP peer, we should mirror AS number from peer's open message. - str_help = 'Amount of IP prefixes to generate. Negative number means "practically infinite".' - parser.add_argument('--amount', default='1', type=int, help=str_help) - str_help = 'The first IPv4 prefix to announce, given as numeric IPv4 address.' - parser.add_argument('--firstprefix', default='8.0.1.0', type=ipaddr.IPv4Address, help=str_help) - str_help = 'If present, this tool will be listening for connection, instead of initiating it.' - parser.add_argument('--listen', action='store_true', help=str_help) - str_help = 'Numeric IP Address to bind to and derive BGP ID from. Default value only suitable for listening.' - parser.add_argument('--myip', default='0.0.0.0', type=ipaddr.IPv4Address, help=str_help) - str_help = 'TCP port to bind to when listening or initiating connection. Default only suitable for initiating.' - parser.add_argument('--myport', default='0', type=int, help=str_help) - str_help = 'The IP of the next hop to be placed into the update messages.' - parser.add_argument('--nexthop', default='192.0.2.1', type=ipaddr.IPv4Address, dest="nexthop", help=str_help) - str_help = 'Numeric IP Address to try to connect to. Currently no effect in listening mode.' - parser.add_argument('--peerip', default='127.0.0.2', type=ipaddr.IPv4Address, help=str_help) - str_help = 'TCP port to try to connect to. No effect in listening mode.' - parser.add_argument('--peerport', default='179', type=int, help=str_help) - # TODO: The step between IP previxes is currently hardcoded to 16. Should we make it configurable? - # Yes, the argument list above is sorted alphabetically. - arguments = parser.parse_args() - # TODO: Are sanity checks (such as asnumber>=0) required? - return arguments - - -def establish_connection(arguments): - """Establish connection according to arguments, return socket.""" - if arguments.listen: - # print "DEBUG: connecting in the listening case." - listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - listening_socket.bind((str(arguments.myip), arguments.myport)) # bind need single tuple as argument - listening_socket.listen(1) - bgp_socket, _ = listening_socket.accept() - # TODO: Verify client IP is cotroller IP. - listening_socket.close() - else: - # print "DEBUG: connecting in the talking case." - talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - talking_socket.bind((str(arguments.myip), arguments.myport)) # bind to force specified address and port - talking_socket.connect((str(arguments.peerip), arguments.peerport)) # socket does not spead ipaddr, hence str() - bgp_socket = talking_socket - print 'Connected to ODL.' - return bgp_socket - - -def get_short_int_from_message(message, offset=16): - """Extract 2-bytes number from packed string, default offset is for BGP message size.""" - high_byte_int = ord(message[offset]) - low_byte_int = ord(message[offset + 1]) - short_int = high_byte_int * 256 + low_byte_int - return short_int - - -class MessageError(ValueError): - """Value error with logging optimized for hexlified messages.""" - - def __init__(self, text, message, *args): - """Store and call super init for textual comment, store raw message which caused it.""" - self.text = text - self.msg = message - super(MessageError, self).__init__(text, message, *args) - - def __str__(self): - """ - Generate human readable error message - - Concatenate text comment, colon with space - and hexlified message. Use a placeholder string - if the message turns out to be empty. - """ - message = binascii.hexlify(self.msg) - if message == "": - message = "(empty message)" - return self.text + ': ' + message - - -def read_open_message(bgp_socket): - """Receive message, perform some validation, return the raw message.""" - msg_in = bgp_socket.recv(65535) # TODO: Is smaller buffer size safe? - # TODO: Is it possible for incoming open message to be split in more than one packet? - # Some validation. - if len(msg_in) < 37: # 37 is minimal length of open message with 4-byte AS number. - raise MessageError("Got something else than open with 4-byte AS number", msg_in) - # TODO: We could check BGP marker, but it is defined only later; decide what to do. - reported_length = get_short_int_from_message(msg_in) - if len(msg_in) != reported_length: - raise MessageError("Message length is not " + reported_length + " in message", msg_in) - print "Open message received" - return msg_in - - -class MessageGenerator(object): - """Class with methods returning messages and state holding configuration data required to do it properly.""" - - # TODO: Define bgp marker as class (constant) variable. - def __init__(self, args): - """Initialize data according to command-line args.""" - # Various auxiliary variables. - # Hack: 4-byte AS number uses the same "int to packed" encoding as IPv4 addresses. - asnumber_4bytes = ipaddr.v4_int_to_packed(args.asnumber) - asnumber_2bytes = "\x5b\xa0" # AS_TRANS value, 23456 decadic. - if args.asnumber < 65536: # AS number is mappable to 2 bytes - asnumber_2bytes = asnumber_4bytes[2:4] - # From now on, attribute docsrings are used. - self.int_nextprefix = int(args.firstprefix) - """Prefix IP address for next update message, as integer.""" - self.updates_to_send = args.amount - """Number of update messages left to be sent.""" - # All information ready, so we can define messages. Mostly copied from play.py by Jozef Behran. - # The following attributes are constant. - self.bgp_marker = "\xFF" * 16 - """Every message starts with this, see rfc4271#section-4.1""" - self.keepalive_message = self.bgp_marker + ( - "\x00\x13" # Size - "\x04" # Type KEEPALIVE - ) - """KeepAlive message, see rfc4271#section-4.4""" - # TODO: Notification for hold timer expiration can be handy. - self.eor_message = self.bgp_marker + ( - "\x00\x17" # Size - "\x02" # Type (UPDATE) - "\x00\x00" # Withdrawn routes length (0) - "\x00\x00" # Total Path Attributes Length (0) - ) - """End-of-RIB marker, see rfc4724#section-2""" - self.update_message_without_prefix = self.bgp_marker + ( - "\x00\x30" # Size - "\x02" # Type (UPDATE) - "\x00\x00" # Withdrawn routes length (0) - "\x00\x14" # Total Path Attributes Length (20) - "\x40" # Flags ("Well-Known") - "\x01" # Type (ORIGIN) - "\x01" # Length (1) - "\x00" # Origin: IGP - "\x40" # Flags ("Well-Known") - "\x02" # Type (AS_PATH) - "\x06" # Length (6) - "\x02" # AS segment type (AS_SEQUENCE) - "\x01" # AS segment length (1) - + asnumber_4bytes + # AS segment (4 bytes) - "\x40" # Flags ("Well-Known") - "\x03" # Type (NEXT_HOP) - "\x04" # Length (4) - + args.nexthop.packed + # IP address of the next hop (4 bytes) - "\x1c" # IPv4 prefix length, see RFC 4271, page 20. This tool uses Network Mask: 255.255.255.240 - ) - """The IP address prefix (4 bytes) has to be appended to complete Update message, see rfc4271#section-4.3.""" - self.open_message = self.bgp_marker + ( - "\x00\x2d" # Size - "\x01" # Type (OPEN) - "\x04" # BGP Varsion (4) - + asnumber_2bytes + # My Autonomous System - # FIXME: The following hold time is hardcoded separately. Compute from initial hold_time value. - "\x00\xb4" # Hold Time (180) - + args.myip.packed + # BGP Identifer - "\x10" # Optional parameters length - "\x02" # Param type ("Capability Ad") - "\x06" # Length (6 bytes) - "\x01" # Capability type (NLRI Unicast), see RFC 4760, secton 8 - "\x04" # Capability value length - "\x00\x01" # AFI (Ipv4) - "\x00" # (reserved) - "\x01" # SAFI (Unicast) - "\x02" # Param type ("Capability Ad") - "\x06" # Length (6 bytes) - "\x41" # "32 bit AS Numbers Support" (see RFC 6793, section 3) - "\x04" # Capability value length - + asnumber_4bytes # My AS in 32 bit format - ) - """Open message, see rfc4271#section-4.2""" - # __init__ ends - - def compose_update_message(self): - """Return update message, prepare next prefix, decrease amount without checking it.""" - prefix_packed = ipaddr.v4_int_to_packed(self.int_nextprefix) - # print "DEBUG: prefix", self.int_nextprefix, "packed to", binascii.hexlify(prefix_packed) - msg_out = self.update_message_without_prefix + prefix_packed - self.int_nextprefix += 16 # Hardcoded, as open message specifies such netmask. - self.updates_to_send -= 1 - return msg_out - - -class TimeTracker(object): - """Class for tracking timers, both for my keepalives and peer's hold time.""" - - def __init__(self, msg_in): - """Initialize config, based on hardcoded defaults and open message from peer.""" - # Note: Relative time is always named timedelta, to stress that (non-delta) time is absolute. - self.report_timedelta = 1.0 # In seconds. TODO: Configurable? - """Upper bound for being stuck in the same state, we should at least report something before continuing.""" - # Negotiate the hold timer by taking the smaller of the 2 values (mine and the peer's). - hold_timedelta = 240 # Not an attribute of self yet. - # TODO: Make the default value configurable, default value could mirror what peer said. - peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22) - if hold_timedelta > peer_hold_timedelta: - hold_timedelta = peer_hold_timedelta - if hold_timedelta != 0 and hold_timedelta < 3: - raise ValueError("Invalid hold timedelta value: ", hold_timedelta) - self.hold_timedelta = hold_timedelta # only now the final value is visible from outside - """If we do not hear from peer this long, we assume it has died.""" - self.keepalive_timedelta = int(hold_timedelta / 3.0) - """Upper limit for duration between messages, to avoid being declared dead.""" - self.snapshot_time = time.time() # The same as calling snapshot(), but also declares a field. - """Sometimes we need to store time. This is where to get the value from afterwards.""" - self.peer_hold_time = self.snapshot_time + self.hold_timedelta # time_keepalive may be too strict - """At this time point, peer will be declared dead.""" - self.my_keepalive_time = None # to be set later - """At this point, we should be sending keepalive message.""" - - def snapshot(self): - """Store current time in instance data to use later.""" - self.snapshot_time = time.time() # Read as time before something interesting was called. - - def reset_peer_hold_time(self): - """Move hold time to future as peer has just proven it still lives.""" - self.peer_hold_time = time.time() + self.hold_timedelta - - # Some methods could rely on self.snapshot_time, but it is better to require user to provide it explicitly. - def reset_my_keepalive_time(self, keepalive_time): - """Move KA timer to future based on given time from before sending.""" - self.my_keepalive_time = keepalive_time + self.keepalive_timedelta - - def is_time_for_my_keepalive(self): - if self.hold_timedelta == 0: - return False - return self.snapshot_time >= self.my_keepalive_time - - def get_next_event_time(self): - if self.hold_timedelta == 0: - return self.snapshot_time + 86400 - return min(self.my_keepalive_time, self.peer_hold_time) - - def check_peer_hold_time(self, snapshot_time): - """Raise error if nothing was read from peer until specified time.""" - if self.hold_timedelta != 0: # Hold time = 0 means keepalive checking off. - if snapshot_time > self.peer_hold_time: # time.time() may be too strict - raise RuntimeError("Peer has overstepped the hold timer.") # TODO: Include hold_timedelta? - # TODO: Add notification sending (attempt). That means move to write tracker. - - -class ReadTracker(object): - """Class for tracking read of mesages chunk by chunk and for idle waiting.""" - - def __init__(self, bgp_socket, timer): - """Set initial state.""" - # References to outside objects. - self.socket = bgp_socket - self.timer = timer - # Really new fields. - self.header_length = 18 - """BGP marker length plus length field length.""" # TODO: make it class (constant) attribute - self.reading_header = True - """Computation of where next chunk ends depends on whether we are beyond length field.""" - self.bytes_to_read = self.header_length - """Countdown towards next size computation.""" - self.msg_in = "" - """Incremental buffer for message under read.""" - - def read_message_chunk(self): - """Read up to one message, do not return anything.""" - # TODO: We also could return the whole message, but currently nobody cares. - # We assume the socket is readable. - chunk_message = self.socket.recv(self.bytes_to_read) - self.msg_in += chunk_message - self.bytes_to_read -= len(chunk_message) - if not self.bytes_to_read: # TODO: bytes_to_read < 0 is not possible, right? - # Finished reading a logical block. - if self.reading_header: - # The logical block was a BGP header. Now we know size of message. - self.reading_header = False - self.bytes_to_read = get_short_int_from_message(self.msg_in) - else: # We have finished reading the body of the message. - # Peer has just proven it is still alive. - self.timer.reset_peer_hold_time() - # TODO: Do we want to count received messages? - # This version ignores the received message. - # TODO: Should we do validation and exit on anything besides update or keepalive? - # Prepare state for reading another message. - self.msg_in = "" - self.reading_header = True - self.bytes_to_read = self.header_length - # We should not act upon peer_hold_time if we are reading something right now. - return - - def wait_for_read(self): - """When we know there are no more updates to send, we use this to avoid busy-wait.""" - # First, compute time to first predictable state change (or report event) - event_time = self.timer.get_next_event_time() - wait_timedelta = event_time - time.time() # snapshot_time would be imprecise - if wait_timedelta < 0: - # The program got around to waiting to an event in "very near - # future" so late that it became a "past" event, thus tell - # "select" to not wait at all. Passing negative timedelta to - # select() would lead to either waiting forever (for -1) or - # select.error("Invalid parameter") (for everything else). - wait_timedelta = 0 - # And wait for event or something to read. - select.select([self.socket], [], [self.socket], wait_timedelta) - # Not checking anything, that will be done in next iteration. - return - - -class WriteTracker(object): - """Class tracking enqueueing messages and sending chunks of them.""" - - def __init__(self, bgp_socket, generator, timer): - """Set initial state.""" - # References to outside objects, - self.socket = bgp_socket - self.generator = generator - self.timer = timer - # Really new fields. - # TODO: Would attribute docstrings add anything substantial? - self.sending_message = False - self.bytes_to_send = 0 - self.msg_out = "" - - def enqueue_message_for_sending(self, message): - """Change write state to include the message.""" - self.msg_out += message - self.bytes_to_send += len(message) - self.sending_message = True - - def send_message_chunk_is_whole(self): - """Perform actions related to sending (chunk of) message, return whether message was completed.""" - # We assume there is a msg_out to send and socket is writable. - # print 'going to send', repr(self.msg_out) - self.timer.snapshot() - bytes_sent = self.socket.send(self.msg_out) - self.msg_out = self.msg_out[bytes_sent:] # Forget the part of message that was sent. - self.bytes_to_send -= bytes_sent - if not self.bytes_to_send: - # TODO: Is it possible to hit negative bytes_to_send? - self.sending_message = False - # We should have reset hold timer on peer side. - self.timer.reset_my_keepalive_time(self.timer.snapshot_time) - # Which means the possible reason for not prioritizing reads is gone. - return True - return False - - -class StateTracker(object): - """Main loop has state so complex it warrants this separate class.""" - - def __init__(self, bgp_socket, generator, timer): - """Set the initial state according to existing socket and generator.""" - # References to outside objects. - self.socket = bgp_socket - self.generator = generator - self.timer = timer - # Sub-trackers. - self.reader = ReadTracker(bgp_socket, timer) - self.writer = WriteTracker(bgp_socket, generator, timer) - # Prioritization state. - self.prioritize_writing = False - """ - In general, we prioritize reading over writing. But in order to not get blocked by neverending reads, - we should check whether we are not risking running out of holdtime. - So in some situations, this field is set to True to attempt finishing sending a message, - after which this field resets back to False. - """ - # TODO: Alternative is to switch fairly between reading and writing (called round robin from now on). - # Message counting is done in generator. - - def perform_one_loop_iteration(self): - """Calculate priority, resolve all ifs, call appropriate method, return to caller to repeat.""" - self.timer.snapshot() - if not self.prioritize_writing: - if self.timer.is_time_for_my_keepalive(): - if not self.writer.sending_message: - # We need to schedule a keepalive ASAP. - self.writer.enqueue_message_for_sending(self.generator.keepalive_message) - # We are sending a message now, so prioritize finishing it. - self.prioritize_writing = True - # Now we know what our priorities are, we have to check which actions are available. - # socket.socket() returns three lists, we store them to list of lists. - list_list = select.select([self.socket], [self.socket], [self.socket], self.timer.report_timedelta) - read_list, write_list, except_list = list_list - # Lists are unpacked, each is either [] or [self.socket], so we will test them as boolean. - if except_list: - raise RuntimeError("Exceptional state on socket", self.socket) - # We will do either read or write. - if not (self.prioritize_writing and write_list): - # Either we have no reason to rush writes, or the socket is not writable. - # We are focusing on reading here. - if read_list: # there is something to read indeed - # In this case we want to read chunk of message and repeat the select, - self.reader.read_message_chunk() - return - # We were focusing on reading, but nothing to read was there. - # Good time to check peer for hold timer. - self.timer.check_peer_hold_time(self.timer.snapshot_time) - # Things are quiet on the read front, we can go on and attempt to write. - if write_list: - # Either we really want to reset peer's view of our hold timer, or there was nothing to read. - if self.writer.sending_message: # We were in the middle of sending a message. - whole = self.writer.send_message_chunk_is_whole() # Was it the end of a message? - if self.prioritize_writing and whole: # We were pressed to send something and we did it. - self.prioritize_writing = False # We prioritize reading again. - return - # Finally, we can look if there is some update message for us to generate. - if self.generator.updates_to_send: - msg_out = self.generator.compose_update_message() - if not self.generator.updates_to_send: # We have just finished update generation, end-of-rib is due. - msg_out += self.generator.eor_message - self.writer.enqueue_message_for_sending(msg_out) - return # Attempt for the actual sending will be done in next iteration. - # Nothing to write anymore, except occasional keepalives. - # To avoid busy loop, we do idle waiting here. - self.reader.wait_for_read() - return - # We can neither read nor write. - print 'Input and output both blocked for', self.timer.report_timedelta, 'seconds.' - # FIXME: Are we sure select has been really waiting the whole period? - return - - -def main(): - """Establish BGP connection and enter main loop for sending updates.""" - arguments = parse_arguments() - bgp_socket = establish_connection(arguments) - # Initial handshake phase. TODO: Can it be also moved to StateTracker? - # Receive open message before sending anything. - # FIXME: Add parameter to send default open message first, to work with "you first" peers. - msg_in = read_open_message(bgp_socket) - timer = TimeTracker(msg_in) - generator = MessageGenerator(arguments) - msg_out = generator.open_message - # print "DEBUG: going to send open:", binascii.hexlify(msg_out) - # Send our open message to the peer. - bgp_socket.send(msg_out) - # Wait for confirming keepalive. - # TODO: Surely in just one packet? - msg_in = bgp_socket.recv(19) # Using exact keepalive length to not see possible updates. - if msg_in != generator.keepalive_message: - raise MessageError("Open not confirmed by keepalive, instead got", msg_in) - timer.reset_peer_hold_time() - # Send the keepalive to indicate the connection is accepted. - timer.snapshot() # Remember this time. - bgp_socket.send(generator.keepalive_message) - timer.reset_my_keepalive_time(timer.snapshot_time) # Use the remembered time. - # End of initial handshake phase. - state = StateTracker(bgp_socket, generator, timer) - while True: # main reactor loop - state.perform_one_loop_iteration() - -if __name__ == "__main__": - main()