Step 2: Move test folder to root
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 __author__ = "Vratko Polak"
15 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
16 __license__ = "Eclipse Public License v1.0"
17 __email__ = "vrpolak@cisco.com"
18
19
20 import argparse
21 import binascii
22 import ipaddr
23 import select
24 import socket
25 import time
26
27
28 def parse_arguments():
29     """Use argparse to get arguments, return args object."""
30     parser = argparse.ArgumentParser()
31     # TODO: Should we use --argument-names-with-spaces?
32     str_help = 'Autonomous System number use in the stream (current default as in ODL: 64496).'
33     parser.add_argument('--asnumber', default=64496, type=int, help=str_help)
34     # FIXME: We are acting as iBGP peer, we should mirror AS number from peer's open message.
35     str_help = 'Amount of IP prefixes to generate. Negative number means "practically infinite".'
36     parser.add_argument('--amount', default='1', type=int, help=str_help)
37     str_help = 'The first IPv4 prefix to announce, given as numeric IPv4 address.'
38     parser.add_argument('--firstprefix', default='8.0.1.0', type=ipaddr.IPv4Address, help=str_help)
39     str_help = 'If present, this tool will be listening for connection, instead of initiating it.'
40     parser.add_argument('--listen', action='store_true', help=str_help)
41     str_help = 'Numeric IP Address to bind to and derive BGP ID from. Default value only suitable for listening.'
42     parser.add_argument('--myip', default='0.0.0.0', type=ipaddr.IPv4Address, help=str_help)
43     str_help = 'TCP port to bind to when listening or initiating connection. Default only suitable for initiating.'
44     parser.add_argument('--myport', default='0', type=int, help=str_help)
45     str_help = 'The IP of the next hop to be placed into the update messages.'
46     parser.add_argument('--nexthop', default='192.0.2.1', type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
47     str_help = 'Numeric IP Address to try to connect to. Currently no effect in listening mode.'
48     parser.add_argument('--peerip', default='127.0.0.2', type=ipaddr.IPv4Address, help=str_help)
49     str_help = 'TCP port to try to connect to. No effect in listening mode.'
50     parser.add_argument('--peerport', default='179', type=int, help=str_help)
51     # TODO: The step between IP previxes is currently hardcoded to 16. Should we make it configurable?
52     # Yes, the argument list above is sorted alphabetically.
53     arguments = parser.parse_args()
54     # TODO: Are sanity checks (such as asnumber>=0) required?
55     return arguments
56
57
58 def establish_connection(arguments):
59     """Establish connection according to arguments, return socket."""
60     if arguments.listen:
61         # print "DEBUG: connecting in the listening case."
62         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
63         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
64         listening_socket.bind((str(arguments.myip), arguments.myport))  # bind need single tuple as argument
65         listening_socket.listen(1)
66         bgp_socket, _ = listening_socket.accept()
67         # TODO: Verify client IP is cotroller IP.
68         listening_socket.close()
69     else:
70         # print "DEBUG: connecting in the talking case."
71         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
72         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
73         talking_socket.bind((str(arguments.myip), arguments.myport))  # bind to force specified address and port
74         talking_socket.connect((str(arguments.peerip), arguments.peerport))  # socket does not spead ipaddr, hence str()
75         bgp_socket = talking_socket
76     print 'Connected to ODL.'
77     return bgp_socket
78
79
80 def get_short_int_from_message(message, offset=16):
81     """Extract 2-bytes number from packed string, default offset is for BGP message size."""
82     high_byte_int = ord(message[offset])
83     low_byte_int = ord(message[offset + 1])
84     short_int = high_byte_int * 256 + low_byte_int
85     return short_int
86
87
88 class MessageError(ValueError):
89     """Value error with logging optimized for hexlified messages."""
90
91     def __init__(self, text, message, *args):
92         """Store and call super init for textual comment, store raw message which caused it."""
93         self.text = text
94         self.msg = message
95         super(MessageError, self).__init__(text, message, *args)
96
97     def __str__(self):
98         """
99         Generate human readable error message
100
101         Concatenate text comment, colon with space
102         and hexlified message. Use a placeholder string
103         if the message turns out to be empty.
104         """
105         message = binascii.hexlify(self.msg)
106         if message == "":
107             message = "(empty message)"
108         return self.text + ': ' + message
109
110
111 def read_open_message(bgp_socket):
112     """Receive message, perform some validation, return the raw message."""
113     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
114     # TODO: Is it possible for incoming open message to be split in more than one packet?
115     # Some validation.
116     if len(msg_in) < 37:  # 37 is minimal length of open message with 4-byte AS number.
117         raise MessageError("Got something else than open with 4-byte AS number", msg_in)
118     # TODO: We could check BGP marker, but it is defined only later; decide what to do.
119     reported_length = get_short_int_from_message(msg_in)
120     if len(msg_in) != reported_length:
121         raise MessageError("Message length is not " + reported_length + " in message", msg_in)
122     print "Open message received"
123     return msg_in
124
125
126 class MessageGenerator(object):
127     """Class with methods returning messages and state holding configuration data required to do it properly."""
128
129     # TODO: Define bgp marker as class (constant) variable.
130     def __init__(self, args):
131         """Initialize data according to command-line args."""
132         # Various auxiliary variables.
133         # Hack: 4-byte AS number uses the same "int to packed" encoding as IPv4 addresses.
134         asnumber_4bytes = ipaddr.v4_int_to_packed(args.asnumber)
135         asnumber_2bytes = "\x5b\xa0"  # AS_TRANS value, 23456 decadic.
136         if args.asnumber < 65536:  # AS number is mappable to 2 bytes
137             asnumber_2bytes = asnumber_4bytes[2:4]
138         # From now on, attribute docsrings are used.
139         self.int_nextprefix = int(args.firstprefix)
140         """Prefix IP address for next update message, as integer."""
141         self.updates_to_send = args.amount
142         """Number of update messages left to be sent."""
143         # All information ready, so we can define messages. Mostly copied from play.py by Jozef Behran.
144         # The following attributes are constant.
145         self.bgp_marker = "\xFF" * 16
146         """Every message starts with this, see rfc4271#section-4.1"""
147         self.keepalive_message = self.bgp_marker + (
148             "\x00\x13"  # Size
149             "\x04"  # Type KEEPALIVE
150         )
151         """KeepAlive message, see rfc4271#section-4.4"""
152         # TODO: Notification for hold timer expiration can be handy.
153         self.eor_message = self.bgp_marker + (
154             "\x00\x17"  # Size
155             "\x02"  # Type (UPDATE)
156             "\x00\x00"  # Withdrawn routes length (0)
157             "\x00\x00"  # Total Path Attributes Length (0)
158         )
159         """End-of-RIB marker, see rfc4724#section-2"""
160         self.update_message_without_prefix = self.bgp_marker + (
161             "\x00\x30"  # Size
162             "\x02"  # Type (UPDATE)
163             "\x00\x00"  # Withdrawn routes length (0)
164             "\x00\x14"  # Total Path Attributes Length (20)
165             "\x40"  # Flags ("Well-Known")
166             "\x01"  # Type (ORIGIN)
167             "\x01"  # Length (1)
168             "\x00"  # Origin: IGP
169             "\x40"  # Flags ("Well-Known")
170             "\x02"  # Type (AS_PATH)
171             "\x06"  # Length (6)
172             "\x02"  # AS segment type (AS_SEQUENCE)
173             "\x01"  # AS segment length (1)
174             + asnumber_4bytes +  # AS segment (4 bytes)
175             "\x40"  # Flags ("Well-Known")
176             "\x03"  # Type (NEXT_HOP)
177             "\x04"  # Length (4)
178             + args.nexthop.packed +  # IP address of the next hop (4 bytes)
179             "\x1c"  # IPv4 prefix length, see RFC 4271, page 20. This tool uses Network Mask: 255.255.255.240
180         )
181         """The IP address prefix (4 bytes) has to be appended to complete Update message, see rfc4271#section-4.3."""
182         self.open_message = self.bgp_marker + (
183             "\x00\x2d"  # Size
184             "\x01"  # Type (OPEN)
185             "\x04"  # BGP Varsion (4)
186             + asnumber_2bytes +  # My Autonomous System
187             # FIXME: The following hold time is hardcoded separately. Compute from initial hold_time value.
188             "\x00\xb4"  # Hold Time (180)
189             + args.myip.packed +  # BGP Identifer
190             "\x10"  # Optional parameters length
191             "\x02"  # Param type ("Capability Ad")
192             "\x06"  # Length (6 bytes)
193             "\x01"  # Capability type (NLRI Unicast), see RFC 4760, secton 8
194             "\x04"  # Capability value length
195             "\x00\x01"  # AFI (Ipv4)
196             "\x00"  # (reserved)
197             "\x01"  # SAFI (Unicast)
198             "\x02"  # Param type ("Capability Ad")
199             "\x06"  # Length (6 bytes)
200             "\x41"  # "32 bit AS Numbers Support" (see RFC 6793, section 3)
201             "\x04"  # Capability value length
202             + asnumber_4bytes  # My AS in 32 bit format
203         )
204         """Open message, see rfc4271#section-4.2"""
205         # __init__ ends
206
207     def compose_update_message(self):
208         """Return update message, prepare next prefix, decrease amount without checking it."""
209         prefix_packed = ipaddr.v4_int_to_packed(self.int_nextprefix)
210         # print "DEBUG: prefix", self.int_nextprefix, "packed to", binascii.hexlify(prefix_packed)
211         msg_out = self.update_message_without_prefix + prefix_packed
212         self.int_nextprefix += 16  # Hardcoded, as open message specifies such netmask.
213         self.updates_to_send -= 1
214         return msg_out
215
216
217 class TimeTracker(object):
218     """Class for tracking timers, both for my keepalives and peer's hold time."""
219
220     def __init__(self, msg_in):
221         """Initialize config, based on hardcoded defaults and open message from peer."""
222         # Note: Relative time is always named timedelta, to stress that (non-delta) time is absolute.
223         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
224         """Upper bound for being stuck in the same state, we should at least report something before continuing."""
225         # Negotiate the hold timer by taking the smaller of the 2 values (mine and the peer's).
226         hold_timedelta = 240  # Not an attribute of self yet.
227         # TODO: Make the default value configurable, default value could mirror what peer said.
228         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
229         if hold_timedelta > peer_hold_timedelta:
230             hold_timedelta = peer_hold_timedelta
231         if hold_timedelta != 0 and hold_timedelta < 3:
232             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
233         self.hold_timedelta = hold_timedelta  # only now the final value is visible from outside
234         """If we do not hear from peer this long, we assume it has died."""
235         self.keepalive_timedelta = int(hold_timedelta / 3.0)
236         """Upper limit for duration between messages, to avoid being declared dead."""
237         self.snapshot_time = time.time()  # The same as calling snapshot(), but also declares a field.
238         """Sometimes we need to store time. This is where to get the value from afterwards."""
239         self.peer_hold_time = self.snapshot_time + self.hold_timedelta  # time_keepalive may be too strict
240         """At this time point, peer will be declared dead."""
241         self.my_keepalive_time = None  # to be set later
242         """At this point, we should be sending keepalive message."""
243
244     def snapshot(self):
245         """Store current time in instance data to use later."""
246         self.snapshot_time = time.time()  # Read as time before something interesting was called.
247
248     def reset_peer_hold_time(self):
249         """Move hold time to future as peer has just proven it still lives."""
250         self.peer_hold_time = time.time() + self.hold_timedelta
251
252     # Some methods could rely on self.snapshot_time, but it is better to require user to provide it explicitly.
253     def reset_my_keepalive_time(self, keepalive_time):
254         """Move KA timer to future based on given time from before sending."""
255         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
256
257     def is_time_for_my_keepalive(self):
258         if self.hold_timedelta == 0:
259             return False
260         return self.snapshot_time >= self.my_keepalive_time
261
262     def get_next_event_time(self):
263         if self.hold_timedelta == 0:
264             return self.snapshot_time + 86400
265         return min(self.my_keepalive_time, self.peer_hold_time)
266
267     def check_peer_hold_time(self, snapshot_time):
268         """Raise error if nothing was read from peer until specified time."""
269         if self.hold_timedelta != 0:  # Hold time = 0 means keepalive checking off.
270             if snapshot_time > self.peer_hold_time:  # time.time() may be too strict
271                 raise RuntimeError("Peer has overstepped the hold timer.")  # TODO: Include hold_timedelta?
272                 # TODO: Add notification sending (attempt). That means move to write tracker.
273
274
275 class ReadTracker(object):
276     """Class for tracking read of mesages chunk by chunk and for idle waiting."""
277
278     def __init__(self, bgp_socket, timer):
279         """Set initial state."""
280         # References to outside objects.
281         self.socket = bgp_socket
282         self.timer = timer
283         # Really new fields.
284         self.header_length = 18
285         """BGP marker length plus length field length."""  # TODO: make it class (constant) attribute
286         self.reading_header = True
287         """Computation of where next chunk ends depends on whether we are beyond length field."""
288         self.bytes_to_read = self.header_length
289         """Countdown towards next size computation."""
290         self.msg_in = ""
291         """Incremental buffer for message under read."""
292
293     def read_message_chunk(self):
294         """Read up to one message, do not return anything."""
295         # TODO: We also could return the whole message, but currently nobody cares.
296         # We assume the socket is readable.
297         chunk_message = self.socket.recv(self.bytes_to_read)
298         self.msg_in += chunk_message
299         self.bytes_to_read -= len(chunk_message)
300         if not self.bytes_to_read:  # TODO: bytes_to_read < 0 is not possible, right?
301             # Finished reading a logical block.
302             if self.reading_header:
303                 # The logical block was a BGP header. Now we know size of message.
304                 self.reading_header = False
305                 self.bytes_to_read = get_short_int_from_message(self.msg_in)
306             else:  # We have finished reading the body of the message.
307                 # Peer has just proven it is still alive.
308                 self.timer.reset_peer_hold_time()
309                 # TODO: Do we want to count received messages?
310                 # This version ignores the received message.
311                 # TODO: Should we do validation and exit on anything besides update or keepalive?
312                 # Prepare state for reading another message.
313                 self.msg_in = ""
314                 self.reading_header = True
315                 self.bytes_to_read = self.header_length
316         # We should not act upon peer_hold_time if we are reading something right now.
317         return
318
319     def wait_for_read(self):
320         """When we know there are no more updates to send, we use this to avoid busy-wait."""
321         # First, compute time to first predictable state change (or report event)
322         event_time = self.timer.get_next_event_time()
323         wait_timedelta = event_time - time.time()  # snapshot_time would be imprecise
324         if wait_timedelta < 0:
325             # The program got around to waiting to an event in "very near
326             # future" so late that it became a "past" event, thus tell
327             # "select" to not wait at all. Passing negative timedelta to
328             # select() would lead to either waiting forever (for -1) or
329             # select.error("Invalid parameter") (for everything else).
330             wait_timedelta = 0
331         # And wait for event or something to read.
332         select.select([self.socket], [], [self.socket], wait_timedelta)
333         # Not checking anything, that will be done in next iteration.
334         return
335
336
337 class WriteTracker(object):
338     """Class tracking enqueueing messages and sending chunks of them."""
339
340     def __init__(self, bgp_socket, generator, timer):
341         """Set initial state."""
342         # References to outside objects,
343         self.socket = bgp_socket
344         self.generator = generator
345         self.timer = timer
346         # Really new fields.
347         # TODO: Would attribute docstrings add anything substantial?
348         self.sending_message = False
349         self.bytes_to_send = 0
350         self.msg_out = ""
351
352     def enqueue_message_for_sending(self, message):
353         """Change write state to include the message."""
354         self.msg_out += message
355         self.bytes_to_send += len(message)
356         self.sending_message = True
357
358     def send_message_chunk_is_whole(self):
359         """Perform actions related to sending (chunk of) message, return whether message was completed."""
360         # We assume there is a msg_out to send and socket is writable.
361         # print 'going to send', repr(self.msg_out)
362         self.timer.snapshot()
363         bytes_sent = self.socket.send(self.msg_out)
364         self.msg_out = self.msg_out[bytes_sent:]  # Forget the part of message that was sent.
365         self.bytes_to_send -= bytes_sent
366         if not self.bytes_to_send:
367             # TODO: Is it possible to hit negative bytes_to_send?
368             self.sending_message = False
369             # We should have reset hold timer on peer side.
370             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
371             # Which means the possible reason for not prioritizing reads is gone.
372             return True
373         return False
374
375
376 class StateTracker(object):
377     """Main loop has state so complex it warrants this separate class."""
378
379     def __init__(self, bgp_socket, generator, timer):
380         """Set the initial state according to existing socket and generator."""
381         # References to outside objects.
382         self.socket = bgp_socket
383         self.generator = generator
384         self.timer = timer
385         # Sub-trackers.
386         self.reader = ReadTracker(bgp_socket, timer)
387         self.writer = WriteTracker(bgp_socket, generator, timer)
388         # Prioritization state.
389         self.prioritize_writing = False
390         """
391         In general, we prioritize reading over writing. But in order to not get blocked by neverending reads,
392         we should check whether we are not risking running out of holdtime.
393         So in some situations, this field is set to True to attempt finishing sending a message,
394         after which this field resets back to False.
395         """
396         # TODO: Alternative is to switch fairly between reading and writing (called round robin from now on).
397         # Message counting is done in generator.
398
399     def perform_one_loop_iteration(self):
400         """Calculate priority, resolve all ifs, call appropriate method, return to caller to repeat."""
401         self.timer.snapshot()
402         if not self.prioritize_writing:
403             if self.timer.is_time_for_my_keepalive():
404                 if not self.writer.sending_message:
405                     # We need to schedule a keepalive ASAP.
406                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message)
407                 # We are sending a message now, so prioritize finishing it.
408                 self.prioritize_writing = True
409         # Now we know what our priorities are, we have to check which actions are available.
410         # socket.socket() returns three lists, we store them to list of lists.
411         list_list = select.select([self.socket], [self.socket], [self.socket], self.timer.report_timedelta)
412         read_list, write_list, except_list = list_list
413         # Lists are unpacked, each is either [] or [self.socket], so we will test them as boolean.
414         if except_list:
415             raise RuntimeError("Exceptional state on socket", self.socket)
416         # We will do either read or write.
417         if not (self.prioritize_writing and write_list):
418             # Either we have no reason to rush writes, or the socket is not writable.
419             # We are focusing on reading here.
420             if read_list:  # there is something to read indeed
421                 # In this case we want to read chunk of message and repeat the select,
422                 self.reader.read_message_chunk()
423                 return
424             # We were focusing on reading, but nothing to read was there.
425             # Good time to check peer for hold timer.
426             self.timer.check_peer_hold_time(self.timer.snapshot_time)
427             # Things are quiet on the read front, we can go on and attempt to write.
428         if write_list:
429             # Either we really want to reset peer's view of our hold timer, or there was nothing to read.
430             if self.writer.sending_message:  # We were in the middle of sending a message.
431                 whole = self.writer.send_message_chunk_is_whole()  # Was it the end of a message?
432                 if self.prioritize_writing and whole:  # We were pressed to send something and we did it.
433                     self.prioritize_writing = False  # We prioritize reading again.
434                 return
435             # Finally, we can look if there is some update message for us to generate.
436             if self.generator.updates_to_send:
437                 msg_out = self.generator.compose_update_message()
438                 if not self.generator.updates_to_send:  # We have just finished update generation, end-of-rib is due.
439                     msg_out += self.generator.eor_message
440                 self.writer.enqueue_message_for_sending(msg_out)
441                 return  # Attempt for the actual sending will be done in next iteration.
442             # Nothing to write anymore, except occasional keepalives.
443             # To avoid busy loop, we do idle waiting here.
444             self.reader.wait_for_read()
445             return
446         # We can neither read nor write.
447         print 'Input and output both blocked for', self.timer.report_timedelta, 'seconds.'
448         # FIXME: Are we sure select has been really waiting the whole period?
449         return
450
451
452 def main():
453     """Establish BGP connection and enter main loop for sending updates."""
454     arguments = parse_arguments()
455     bgp_socket = establish_connection(arguments)
456     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
457     # Receive open message before sending anything.
458     # FIXME: Add parameter to send default open message first, to work with "you first" peers.
459     msg_in = read_open_message(bgp_socket)
460     timer = TimeTracker(msg_in)
461     generator = MessageGenerator(arguments)
462     msg_out = generator.open_message
463     # print "DEBUG: going to send open:", binascii.hexlify(msg_out)
464     # Send our open message to the peer.
465     bgp_socket.send(msg_out)
466     # Wait for confirming keepalive.
467     # TODO: Surely in just one packet?
468     msg_in = bgp_socket.recv(19)  # Using exact keepalive length to not see possible updates.
469     if msg_in != generator.keepalive_message:
470         raise MessageError("Open not confirmed by keepalive, instead got", msg_in)
471     timer.reset_peer_hold_time()
472     # Send the keepalive to indicate the connection is accepted.
473     timer.snapshot()  # Remember this time.
474     bgp_socket.send(generator.keepalive_message)
475     timer.reset_my_keepalive_time(timer.snapshot_time)  # Use the remembered time.
476     # End of initial handshake phase.
477     state = StateTracker(bgp_socket, generator, timer)
478     while True:  # main reactor loop
479         state.perform_one_loop_iteration()
480
481 if __name__ == "__main__":
482     main()