BGP application peer functional test suite
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 __author__ = "Vratko Polak"
15 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
16 __license__ = "Eclipse Public License v1.0"
17 __email__ = "vrpolak@cisco.com"
18
19 import argparse
20 import binascii
21 import ipaddr
22 import select
23 import socket
24 import time
25 import logging
26 import struct
27
28
def parse_arguments():
    """Use argparse to get arguments.

    Returns:
        :return: args object (argparse.Namespace) holding all options.
    Notes:
        Address options are converted by ipaddr.IPv4Address, numeric
        options by int. All four log level switches store into the same
        destination ("loglevel"), which defaults to logging.INFO.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = 'Amount of IP prefixes to generate. (negative means "infinite").'
    parser.add_argument("--amount", default=1, type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default=1, type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default=0, type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default=0, type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # The default has to be one of the choices (a plain string); a list
    # here would never compare equal to "single" or "separate".
    parser.add_argument("--updates", choices=["single", "separate"],
                        default="separate", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from. " +
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection. " +
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default=0, type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = ("Numeric IP Address to try to connect to. " +
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default=179, type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default=180, type=int, help=str_help)
    str_help = "Log level (--error, --warning, --info, --debug)"
    # Four mutually overriding switches writing into one destination.
    log_level_switches = (
        ("--error", logging.ERROR),
        ("--warning", logging.WARNING),
        ("--info", logging.INFO),
        ("--debug", logging.DEBUG),
    )
    for switch, level in log_level_switches:
        parser.add_argument(switch, dest="loglevel", action="store_const",
                            const=level, default=logging.INFO,
                            help=str_help)
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default=1000, type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
    arguments = parser.parse_args()
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
101
102
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.listen: wait for the peer to connect when true,
              otherwise initiate the connection
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port
    Returns:
        :return: connected socket.
    Notes:
        Sockets created here are closed again if the connection
        cannot be established; the original exception is propagated.
    """
    # bind needs single tuple as argument; socket does not speak ipaddr,
    # hence str()
    local_address = (str(arguments.myip), arguments.myport)
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listening_socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            listening_socket.bind(local_address)
            listening_socket.listen(1)
            bgp_socket, _ = listening_socket.accept()
            # TODO: Verify client IP is controller IP.
        finally:
            # Only one peer is served, so the listening socket is not
            # needed any more (and must not leak on failure either).
            listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            talking_socket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_REUSEADDR, 1)
            # bind to force specified address and port
            talking_socket.bind(local_address)
            talking_socket.connect((str(arguments.peerip),
                                    arguments.peerport))
        except Exception:
            talking_socket.close()
            raise
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
142
143
def get_short_int_from_message(message, offset=16):
    """Read an unsigned big-endian 2-byte number out of a message.

    Arguments:
        :message: given message
        :offset: position of the first (most significant) byte
    Returns:
        :return: the decoded short integer value.
    Notes:
        The default offset (16) is where the length field of a BGP
        message is located.
    """
    most_significant, least_significant = [
        ord(message[position]) for position in (offset, offset + 1)
    ]
    return (most_significant << 8) + least_significant
159
160
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Arguments:
        :prefixes_hex: raw byte string holding a sequence of
            <length, prefix> tuples; length is one octet giving the
            prefix length in bits, prefix occupies the minimum number
            of octets able to hold that many bits
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Raises:
        :ValueError: when a prefix is longer than 32 bits or the data
            end before all octets of a prefix were read
    """
    # bytearray yields integers when indexed, for any byte string type
    data = bytearray(prefixes_hex)
    prefix_list = []
    offset = 0
    while offset < len(data):
        prefix_bit_len = data[offset]
        if prefix_bit_len > 32:
            raise ValueError("IPv4 prefix length " + str(prefix_bit_len) +
                             " at offset " + str(offset) + " is above 32")
        # number of octets needed to carry prefix_bit_len bits
        prefix_len = (prefix_bit_len + 7) // 8
        octets = data[offset + 1: offset + 1 + prefix_len]
        if len(octets) < prefix_len:
            raise ValueError("Truncated prefix at offset " + str(offset))
        # Trailing octets which are not transmitted are zero.
        octets += bytearray(4 - prefix_len)
        prefix = ".".join(str(octet) for octet in octets)
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
    return prefix_list
180
181
class MessageError(ValueError):
    """ValueError carrying a raw message, rendered hexlified in str()."""

    def __init__(self, text, message, *args):
        """Remember the comment and the offending raw message.

        Arguments:
            :text: textual description of the problem
            :message: raw message which caused the error
            :args: any further values, passed to ValueError as they are
        """
        super(MessageError, self).__init__(text, message, *args)
        self.msg = message
        self.text = text

    def __str__(self):
        """Compose the human readable form of the error.

        Returns:
            :return: the text followed by the hexlified raw message
        Notes:
            A placeholder is shown instead of an empty message.
        """
        hexlified = binascii.hexlify(self.msg)
        shown = "(empty message)" if hexlified == "" else hexlified
        return ": ".join((self.text, shown))
207
208
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Raises:
        :MessageError: when the received data are shorter than 37 bytes
            or their size differs from the length stated in the header
    Notes:
        Performs just basic incoming message checks
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # Some validation.
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        error_text = "Got something else than open with 4-byte AS number"
        logger.error(error_text + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_text, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # reported_length is an integer, it has to be converted
        # before being joined with the rest of the text
        error_text = ("Message length is not " + str(reported_length) +
                      " as stated in ")
        logger.error(error_text + binascii.hexlify(msg_in))
        raise MessageError(error_text, msg_in)
    logger.info("Open message received.")
    return msg_in
237
238
239 class MessageGenerator(object):
240     """Class which generates messages, holds states and configuration values."""
241
242     # TODO: Define bgp marker as a class (constant) variable.
243     def __init__(self, args):
244         """Initialisation according to command-line args.
245
246         Arguments:
247             :args: argsparser's Namespace object which contains command-line
248                 options for MesageGenerator initialisation
249         Notes:
250             Calculates and stores default values used later on for
251             message geeration.
252         """
253         self.total_prefix_amount = args.amount
254         # Number of update messages left to be sent.
255         self.remaining_prefixes = self.total_prefix_amount
256
257         # New parameters initialisation
258         self.iteration = 0
259         self.prefix_base_default = args.firstprefix
260         self.prefix_length_default = args.prefixlen
261         self.wr_prefixes_default = []
262         self.nlri_prefixes_default = []
263         self.version_default = 4
264         self.my_autonomous_system_default = args.asnumber
265         self.hold_time_default = args.holdtime  # Local hold time.
266         self.bgp_identifier_default = int(args.myip)
267         self.next_hop_default = args.nexthop
268         self.single_update_default = args.updates == "single"
269         self.randomize_updates_default = args.updates == "random"
270         self.prefix_count_to_add_default = args.insert
271         self.prefix_count_to_del_default = args.withdraw
272         if self.prefix_count_to_del_default < 0:
273             self.prefix_count_to_del_default = 0
274         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
275             # total number of prefixes must grow to avoid infinite test loop
276             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
277         self.slot_size_default = self.prefix_count_to_add_default
278         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
279         self.results_file_name_default = args.results
280         self.performance_threshold_default = args.threshold
281         self.rfc4760 = args.rfc4760 == "yes"
282         # Default values used for randomized part
283         s1_slots = ((self.total_prefix_amount -
284                      self.remaining_prefixes_threshold - 1) /
285                     self.prefix_count_to_add_default + 1)
286         s2_slots = ((self.remaining_prefixes_threshold - 1) /
287                     (self.prefix_count_to_add_default -
288                     self.prefix_count_to_del_default) + 1)
289         # S1_First_Index = 0
290         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
291         s2_first_index = s1_slots * self.prefix_count_to_add_default
292         s2_last_index = (s2_first_index +
293                          s2_slots * (self.prefix_count_to_add_default -
294                                      self.prefix_count_to_del_default) - 1)
295         self.slot_gap_default = ((self.total_prefix_amount -
296                                   self.remaining_prefixes_threshold - 1) /
297                                  self.prefix_count_to_add_default + 1)
298         self.randomize_lowest_default = s2_first_index
299         self.randomize_highest_default = s2_last_index
300
301         # Initialising counters
302         self.phase1_start_time = 0
303         self.phase1_stop_time = 0
304         self.phase2_start_time = 0
305         self.phase2_stop_time = 0
306         self.phase1_updates_sent = 0
307         self.phase2_updates_sent = 0
308         self.updates_sent = 0
309
310         self.log_info = args.loglevel <= logging.INFO
311         self.log_debug = args.loglevel <= logging.DEBUG
312         """
313         Flags needed for the MessageGenerator performance optimization.
314         Calling logger methods each iteration even with proper log level set
315         slows down significantly the MessageGenerator performance.
316         Measured total generation time (1M updates, dry run, error log level):
317         - logging based on basic logger features: 36,2s
318         - logging based on advanced logger features (lazy logging): 21,2s
319         - conditional calling of logger methods enclosed inside condition: 8,6s
320         """
321
322         logger.info("Generator initialisation")
323         logger.info("  Target total number of prefixes to be introduced: " +
324                     str(self.total_prefix_amount))
325         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
326                     str(self.prefix_length_default))
327         logger.info("  My Autonomous System number: " +
328                     str(self.my_autonomous_system_default))
329         logger.info("  My Hold Time: " + str(self.hold_time_default))
330         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
331         logger.info("  Next Hop: " + str(self.next_hop_default))
332         logger.info("  Prefix count to be inserted at once: " +
333                     str(self.prefix_count_to_add_default))
334         logger.info("  Prefix count to be withdrawn at once: " +
335                     str(self.prefix_count_to_del_default))
336         logger.info("  Fast pre-fill up to " +
337                     str(self.total_prefix_amount -
338                         self.remaining_prefixes_threshold) + " prefixes")
339         logger.info("  Remaining number of prefixes to be processed " +
340                     "in parallel with withdrawals: " +
341                     str(self.remaining_prefixes_threshold))
342         logger.debug("  Prefix index range used after pre-fill procedure [" +
343                      str(self.randomize_lowest_default) + ", " +
344                      str(self.randomize_highest_default) + "]")
345         if self.single_update_default:
346             logger.info("  Common single UPDATE will be generated " +
347                         "for both NLRI & WITHDRAWN lists")
348         else:
349             logger.info("  Two separate UPDATEs will be generated " +
350                         "for each NLRI & WITHDRAWN lists")
351         if self.randomize_updates_default:
352             logger.info("  Generation of UPDATE messages will be randomized")
353         logger.info("  Let\'s go ...\n")
354
355         # TODO: Notification for hold timer expiration can be handy.
356
357     def store_results(self, file_name=None, threshold=None):
358         """ Stores specified results into files based on file_name value.
359
360         Arguments:
361             :param file_name: Trailing (common) part of result file names
362             :param threshold: Minimum number of sent updates needed for each
363                               result to be included into result csv file
364                               (mainly needed because of the result accuracy)
365         Returns:
366             :return: n/a
367         """
368         # default values handling
369         if file_name is None:
370             file_name = self.results_file_name_default
371         if threshold is None:
372             threshold = self.performance_threshold_default
373         # performance calculation
374         if self.phase1_updates_sent >= threshold:
375             totals1 = self.phase1_updates_sent
376             performance1 = int(self.phase1_updates_sent /
377                                (self.phase1_stop_time - self.phase1_start_time))
378         else:
379             totals1 = None
380             performance1 = None
381         if self.phase2_updates_sent >= threshold:
382             totals2 = self.phase2_updates_sent
383             performance2 = int(self.phase2_updates_sent /
384                                (self.phase2_stop_time - self.phase2_start_time))
385         else:
386             totals2 = None
387             performance2 = None
388
389         logger.info("#" * 10 + " Final results " + "#" * 10)
390         logger.info("Number of iterations: " + str(self.iteration))
391         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
392                     str(self.phase1_updates_sent))
393         logger.info("The pre-fill phase duration: " +
394                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
395         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
396                     str(self.phase2_updates_sent))
397         logger.info("The 2nd test phase duration: " +
398                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
399         logger.info("Threshold for performance reporting: " + str(threshold))
400
401         # making labels
402         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
403                         " route(s) per UPDATE")
404         if self.single_update_default:
405             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
406                                   "/-" + str(self.prefix_count_to_del_default) +
407                                   " routes per UPDATE")
408         else:
409             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
410                                   "/-" + str(self.prefix_count_to_del_default) +
411                                   " routes in two UPDATEs")
412         # collecting capacity and performance results
413         totals = {}
414         performance = {}
415         if totals1 is not None:
416             totals[phase1_label] = totals1
417             performance[phase1_label] = performance1
418         if totals2 is not None:
419             totals[phase2_label] = totals2
420             performance[phase2_label] = performance2
421         self.write_results_to_file(totals, "totals-" + file_name)
422         self.write_results_to_file(performance, "performance-" + file_name)
423
424     def write_results_to_file(self, results, file_name):
425         """Writes results to the csv plot file consumable by Jenkins.
426
427         Arguments:
428             :param file_name: Name of the (csv) file to be created
429         Returns:
430             :return: none
431         """
432         first_line = ""
433         second_line = ""
434         f = open(file_name, "wt")
435         try:
436             for key in sorted(results):
437                 first_line += key + ", "
438                 second_line += str(results[key]) + ", "
439             first_line = first_line[:-2]
440             second_line = second_line[:-2]
441             f.write(first_line + "\n")
442             f.write(second_line + "\n")
443             logger.info("Message generator performance results stored in " +
444                         file_name + ":")
445             logger.info("  " + first_line)
446             logger.info("  " + second_line)
447         finally:
448             f.close()
449
450     # Return pseudo-randomized (reproducible) index for selected range
451     def randomize_index(self, index, lowest=None, highest=None):
452         """Calculates pseudo-randomized index from selected range.
453
454         Arguments:
455             :param index: input index
456             :param lowest: the lowes index from the randomized area
457             :param highest: the highest index from the randomized area
458         Returns:
459             :return: the (pseudo)randomized index
460         Notes:
461             Created just as a fame for future generator enhancement.
462         """
463         # default values handling
464         if lowest is None:
465             lowest = self.randomize_lowest_default
466         if highest is None:
467             highest = self.randomize_highest_default
468         # randomize
469         if (index >= lowest) and (index <= highest):
470             # we are in the randomized range -> shuffle it inside
471             # the range (now just reverse the order)
472             new_index = highest - (index - lowest)
473         else:
474             # we are out of the randomized range -> nothing to do
475             new_index = index
476         return new_index
477
478     # Get list of prefixes
479     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
480                         prefix_len=None, prefix_count=None, randomize=None):
481         """Generates list of IP address prefixes.
482
483         Arguments:
484             :param slot_index: index of group of prefix addresses
485             :param slot_size: size of group of prefix addresses
486                 in [number of included prefixes]
487             :param prefix_base: IP address of the first prefix
488                 (slot_index = 0, prefix_index = 0)
489             :param prefix_len: length of the prefix in bites
490                 (the same as size of netmask)
491             :param prefix_count: number of prefixes to be returned
492                 from the specified slot
493         Returns:
494             :return: list of generated IP address prefixes
495         """
496         # default values handling
497         if slot_size is None:
498             slot_size = self.slot_size_default
499         if prefix_base is None:
500             prefix_base = self.prefix_base_default
501         if prefix_len is None:
502             prefix_len = self.prefix_length_default
503         if prefix_count is None:
504             prefix_count = slot_size
505         if randomize is None:
506             randomize = self.randomize_updates_default
507         # generating list of prefixes
508         indexes = []
509         prefixes = []
510         prefix_gap = 2 ** (32 - prefix_len)
511         for i in range(prefix_count):
512             prefix_index = slot_index * slot_size + i
513             if randomize:
514                 prefix_index = self.randomize_index(prefix_index)
515             indexes.append(prefix_index)
516             prefixes.append(prefix_base + prefix_index * prefix_gap)
517         if self.log_debug:
518             logger.debug("  Prefix slot index: " + str(slot_index))
519             logger.debug("  Prefix slot size: " + str(slot_size))
520             logger.debug("  Prefix count: " + str(prefix_count))
521             logger.debug("  Prefix indexes: " + str(indexes))
522             logger.debug("  Prefix list: " + str(prefixes))
523         return prefixes
524
525     def compose_update_message(self, prefix_count_to_add=None,
526                                prefix_count_to_del=None):
527         """Composes an UPDATE message
528
529         Arguments:
530             :param prefix_count_to_add: # of prefixes to put into NLRI list
531             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
532         Returns:
533             :return: encoded UPDATE message in HEX
534         Notes:
535             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
536             lists or common message wich includes both prefix lists.
537             Updates global counters.
538         """
539         # default values handling
540         if prefix_count_to_add is None:
541             prefix_count_to_add = self.prefix_count_to_add_default
542         if prefix_count_to_del is None:
543             prefix_count_to_del = self.prefix_count_to_del_default
544         # logging
545         if self.log_info and not (self.iteration % 1000):
546             logger.info("Iteration: " + str(self.iteration) +
547                         " - total remaining prefixes: " +
548                         str(self.remaining_prefixes))
549         if self.log_debug:
550             logger.debug("#" * 10 + " Iteration: " +
551                          str(self.iteration) + " " + "#" * 10)
552             logger.debug("Remaining prefixes: " +
553                          str(self.remaining_prefixes))
554         # scenario type & one-shot counter
555         straightforward_scenario = (self.remaining_prefixes >
556                                     self.remaining_prefixes_threshold)
557         if straightforward_scenario:
558             prefix_count_to_del = 0
559             if self.log_debug:
560                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
561             if not self.phase1_start_time:
562                 self.phase1_start_time = time.time()
563         else:
564             if self.log_debug:
565                 logger.debug("--- COMBINED SCENARIO ---")
566             if not self.phase2_start_time:
567                 self.phase2_start_time = time.time()
568         # tailor the number of prefixes if needed
569         prefix_count_to_add = (prefix_count_to_del +
570                                min(prefix_count_to_add - prefix_count_to_del,
571                                    self.remaining_prefixes))
572         # prefix slots selection for insertion and withdrawal
573         slot_index_to_add = self.iteration
574         slot_index_to_del = slot_index_to_add - self.slot_gap_default
575         # getting lists of prefixes for insertion in this iteration
576         if self.log_debug:
577             logger.debug("Prefixes to be inserted in this iteration:")
578         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
579                                                   prefix_count=prefix_count_to_add)
580         # getting lists of prefixes for withdrawal in this iteration
581         if self.log_debug:
582             logger.debug("Prefixes to be withdrawn in this iteration:")
583         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
584                                                   prefix_count=prefix_count_to_del)
585         # generating the mesage
586         if self.single_update_default:
587             # Send prefixes to be introduced and withdrawn
588             # in one UPDATE message
589             msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
590                                           nlri_prefixes=prefix_list_to_add)
591         else:
592             # Send prefixes to be introduced and withdrawn
593             # in separate UPDATE messages (if needed)
594             msg_out = self.update_message(wr_prefixes=[],
595                                           nlri_prefixes=prefix_list_to_add)
596             if prefix_count_to_del:
597                 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
598                                                nlri_prefixes=[])
599         # updating counters - who knows ... maybe I am last time here ;)
600         if straightforward_scenario:
601             self.phase1_stop_time = time.time()
602             self.phase1_updates_sent = self.updates_sent
603         else:
604             self.phase2_stop_time = time.time()
605             self.phase2_updates_sent = (self.updates_sent -
606                                         self.phase1_updates_sent)
607         # updating totals for the next iteration
608         self.iteration += 1
609         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
610         # returning the encoded message
611         return msg_out
612
613     # Section of message encoders
614
615     def open_message(self, version=None, my_autonomous_system=None,
616                      hold_time=None, bgp_identifier=None):
617         """Generates an OPEN Message (rfc4271#section-4.2)
618
619         Arguments:
620             :param version: see the rfc4271#section-4.2
621             :param my_autonomous_system: see the rfc4271#section-4.2
622             :param hold_time: see the rfc4271#section-4.2
623             :param bgp_identifier: see the rfc4271#section-4.2
624         Returns:
625             :return: encoded OPEN message in HEX
626         """
627
628         # Default values handling
629         if version is None:
630             version = self.version_default
631         if my_autonomous_system is None:
632             my_autonomous_system = self.my_autonomous_system_default
633         if hold_time is None:
634             hold_time = self.hold_time_default
635         if bgp_identifier is None:
636             bgp_identifier = self.bgp_identifier_default
637
638         # Marker
639         marker_hex = "\xFF" * 16
640
641         # Type
642         type = 1
643         type_hex = struct.pack("B", type)
644
645         # version
646         version_hex = struct.pack("B", version)
647
648         # my_autonomous_system
649         # AS_TRANS value, 23456 decadic.
650         my_autonomous_system_2_bytes = 23456
651         # AS number is mappable to 2 bytes
652         if my_autonomous_system < 65536:
653             my_autonomous_system_2_bytes = my_autonomous_system
654         my_autonomous_system_hex_2_bytes = struct.pack(">H",
655                                                        my_autonomous_system)
656
657         # Hold Time
658         hold_time_hex = struct.pack(">H", hold_time)
659
660         # BGP Identifier
661         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
662
663         # Optional Parameters
664         optional_parameters_hex = ""
665         if self.rfc4760:
666             optional_parameter_hex = (
667                 "\x02"  # Param type ("Capability Ad")
668                 "\x06"  # Length (6 bytes)
669                 "\x01"  # Capability type (NLRI Unicast),
670                         # see RFC 4760, secton 8
671                 "\x04"  # Capability value length
672                 "\x00\x01"  # AFI (Ipv4)
673                 "\x00"  # (reserved)
674                 "\x01"  # SAFI (Unicast)
675             )
676             optional_parameters_hex += optional_parameter_hex
677
678         optional_parameter_hex = (
679             "\x02"  # Param type ("Capability Ad")
680             "\x06"  # Length (6 bytes)
681             "\x41"  # "32 bit AS Numbers Support"
682                     # (see RFC 6793, section 3)
683             "\x04"  # Capability value length
684                     # My AS in 32 bit format
685             + struct.pack(">I", my_autonomous_system)
686         )
687         optional_parameters_hex += optional_parameter_hex
688
689         # Optional Parameters Length
690         optional_parameters_length = len(optional_parameters_hex)
691         optional_parameters_length_hex = struct.pack("B",
692                                                      optional_parameters_length)
693
694         # Length (big-endian)
695         length = (
696             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
697             len(my_autonomous_system_hex_2_bytes) +
698             len(hold_time_hex) + len(bgp_identifier_hex) +
699             len(optional_parameters_length_hex) +
700             len(optional_parameters_hex)
701         )
702         length_hex = struct.pack(">H", length)
703
704         # OPEN Message
705         message_hex = (
706             marker_hex +
707             length_hex +
708             type_hex +
709             version_hex +
710             my_autonomous_system_hex_2_bytes +
711             hold_time_hex +
712             bgp_identifier_hex +
713             optional_parameters_length_hex +
714             optional_parameters_hex
715         )
716
717         if self.log_debug:
718             logger.debug("OPEN message encoding")
719             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
720             logger.debug("  Length=" + str(length) + " (0x" +
721                          binascii.hexlify(length_hex) + ")")
722             logger.debug("  Type=" + str(type) + " (0x" +
723                          binascii.hexlify(type_hex) + ")")
724             logger.debug("  Version=" + str(version) + " (0x" +
725                          binascii.hexlify(version_hex) + ")")
726             logger.debug("  My Autonomous System=" +
727                          str(my_autonomous_system_2_bytes) + " (0x" +
728                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
729                          ")")
730             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
731                          binascii.hexlify(hold_time_hex) + ")")
732             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
733                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
734             logger.debug("  Optional Parameters Length=" +
735                          str(optional_parameters_length) + " (0x" +
736                          binascii.hexlify(optional_parameters_length_hex) +
737                          ")")
738             logger.debug("  Optional Parameters=0x" +
739                          binascii.hexlify(optional_parameters_hex))
740             logger.debug("OPEN message encoded: 0x%s",
741                          binascii.b2a_hex(message_hex))
742
743         return message_hex
744
745     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
746                        wr_prefix_length=None, nlri_prefix_length=None,
747                        my_autonomous_system=None, next_hop=None):
748         """Generates an UPDATE Message (rfc4271#section-4.3)
749
750         Arguments:
751             :param wr_prefixes: see the rfc4271#section-4.3
752             :param nlri_prefixes: see the rfc4271#section-4.3
753             :param wr_prefix_length: see the rfc4271#section-4.3
754             :param nlri_prefix_length: see the rfc4271#section-4.3
755             :param my_autonomous_system: see the rfc4271#section-4.3
756             :param next_hop: see the rfc4271#section-4.3
757         Returns:
758             :return: encoded UPDATE message in HEX
759         """
760
761         # Default values handling
762         if wr_prefixes is None:
763             wr_prefixes = self.wr_prefixes_default
764         if nlri_prefixes is None:
765             nlri_prefixes = self.nlri_prefixes_default
766         if wr_prefix_length is None:
767             wr_prefix_length = self.prefix_length_default
768         if nlri_prefix_length is None:
769             nlri_prefix_length = self.prefix_length_default
770         if my_autonomous_system is None:
771             my_autonomous_system = self.my_autonomous_system_default
772         if next_hop is None:
773             next_hop = self.next_hop_default
774
775         # Marker
776         marker_hex = "\xFF" * 16
777
778         # Type
779         type = 2
780         type_hex = struct.pack("B", type)
781
782         # Withdrawn Routes
783         bytes = ((wr_prefix_length - 1) / 8) + 1
784         withdrawn_routes_hex = ""
785         for prefix in wr_prefixes:
786             withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
787                                    struct.pack(">I", int(prefix))[:bytes])
788             withdrawn_routes_hex += withdrawn_route_hex
789
790         # Withdrawn Routes Length
791         withdrawn_routes_length = len(withdrawn_routes_hex)
792         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
793
794         # TODO: to replace hardcoded string by encoding?
795         # Path Attributes
796         if nlri_prefixes != []:
797             path_attributes_hex = (
798                 "\x40"  # Flags ("Well-Known")
799                 "\x01"  # Type (ORIGIN)
800                 "\x01"  # Length (1)
801                 "\x00"  # Origin: IGP
802                 "\x40"  # Flags ("Well-Known")
803                 "\x02"  # Type (AS_PATH)
804                 "\x06"  # Length (6)
805                 "\x02"  # AS segment type (AS_SEQUENCE)
806                 "\x01"  # AS segment length (1)
807                         # AS segment (4 bytes)
808                 + struct.pack(">I", my_autonomous_system) +
809                 "\x40"  # Flags ("Well-Known")
810                 "\x03"  # Type (NEXT_HOP)
811                 "\x04"  # Length (4)
812                         # IP address of the next hop (4 bytes)
813                 + struct.pack(">I", int(next_hop))
814             )
815         else:
816             path_attributes_hex = ""
817
818         # Total Path Attributes Length
819         total_path_attributes_length = len(path_attributes_hex)
820         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
821
822         # Network Layer Reachability Information
823         bytes = ((nlri_prefix_length - 1) / 8) + 1
824         nlri_hex = ""
825         for prefix in nlri_prefixes:
826             nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
827                                struct.pack(">I", int(prefix))[:bytes])
828             nlri_hex += nlri_prefix_hex
829
830         # Length (big-endian)
831         length = (
832             len(marker_hex) + 2 + len(type_hex) +
833             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
834             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
835             len(nlri_hex))
836         length_hex = struct.pack(">H", length)
837
838         # UPDATE Message
839         message_hex = (
840             marker_hex +
841             length_hex +
842             type_hex +
843             withdrawn_routes_length_hex +
844             withdrawn_routes_hex +
845             total_path_attributes_length_hex +
846             path_attributes_hex +
847             nlri_hex
848         )
849
850         if self.log_debug:
851             logger.debug("UPDATE message encoding")
852             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
853             logger.debug("  Length=" + str(length) + " (0x" +
854                          binascii.hexlify(length_hex) + ")")
855             logger.debug("  Type=" + str(type) + " (0x" +
856                          binascii.hexlify(type_hex) + ")")
857             logger.debug("  withdrawn_routes_length=" +
858                          str(withdrawn_routes_length) + " (0x" +
859                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
860             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
861                          str(wr_prefix_length) + " (0x" +
862                          binascii.hexlify(withdrawn_routes_hex) + ")")
863             logger.debug("  Total Path Attributes Length=" +
864                          str(total_path_attributes_length) + " (0x" +
865                          binascii.hexlify(total_path_attributes_length_hex) +
866                          ")")
867             logger.debug("  Path Attributes=" + "(0x" +
868                          binascii.hexlify(path_attributes_hex) + ")")
869             logger.debug("  Network Layer Reachability Information=" +
870                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
871                          " (0x" + binascii.hexlify(nlri_hex) + ")")
872             logger.debug("UPDATE message encoded: 0x" +
873                          binascii.b2a_hex(message_hex))
874
875         # updating counter
876         self.updates_sent += 1
877         # returning encoded message
878         return message_hex
879
880     def notification_message(self, error_code, error_subcode, data_hex=""):
881         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
882
883         Arguments:
884             :param error_code: see the rfc4271#section-4.5
885             :param error_subcode: see the rfc4271#section-4.5
886             :param data_hex: see the rfc4271#section-4.5
887         Returns:
888             :return: encoded NOTIFICATION message in HEX
889         """
890
891         # Marker
892         marker_hex = "\xFF" * 16
893
894         # Type
895         type = 3
896         type_hex = struct.pack("B", type)
897
898         # Error Code
899         error_code_hex = struct.pack("B", error_code)
900
901         # Error Subode
902         error_subcode_hex = struct.pack("B", error_subcode)
903
904         # Length (big-endian)
905         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
906                   len(error_subcode_hex) + len(data_hex))
907         length_hex = struct.pack(">H", length)
908
909         # NOTIFICATION Message
910         message_hex = (
911             marker_hex +
912             length_hex +
913             type_hex +
914             error_code_hex +
915             error_subcode_hex +
916             data_hex
917         )
918
919         if self.log_debug:
920             logger.debug("NOTIFICATION message encoding")
921             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
922             logger.debug("  Length=" + str(length) + " (0x" +
923                          binascii.hexlify(length_hex) + ")")
924             logger.debug("  Type=" + str(type) + " (0x" +
925                          binascii.hexlify(type_hex) + ")")
926             logger.debug("  Error Code=" + str(error_code) + " (0x" +
927                          binascii.hexlify(error_code_hex) + ")")
928             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
929                          binascii.hexlify(error_subcode_hex) + ")")
930             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
931             logger.debug("NOTIFICATION message encoded: 0x%s",
932                          binascii.b2a_hex(message_hex))
933
934         return message_hex
935
936     def keepalive_message(self):
937         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
938
939         Returns:
940             :return: encoded KEEP ALIVE message in HEX
941         """
942
943         # Marker
944         marker_hex = "\xFF" * 16
945
946         # Type
947         type = 4
948         type_hex = struct.pack("B", type)
949
950         # Length (big-endian)
951         length = len(marker_hex) + 2 + len(type_hex)
952         length_hex = struct.pack(">H", length)
953
954         # KEEP ALIVE Message
955         message_hex = (
956             marker_hex +
957             length_hex +
958             type_hex
959         )
960
961         if self.log_debug:
962             logger.debug("KEEP ALIVE message encoding")
963             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
964             logger.debug("  Length=" + str(length) + " (0x" +
965                          binascii.hexlify(length_hex) + ")")
966             logger.debug("  Type=" + str(type) + " (0x" +
967                          binascii.hexlify(type_hex) + ")")
968             logger.debug("KEEP ALIVE message encoded: 0x%s",
969                          binascii.b2a_hex(message_hex))
970
971         return message_hex
972
973
class TimeTracker(object):
    """Keeper of the timing state of one BGP session.

    It tracks two deadlines: the time when my next KEEP ALIVE
    is due and the time when the peer's hold timer runs out.
    """

    def __init__(self, msg_in):
        """Initialisation based on defaults and OPEN message from peer.

        Arguments:
            msg_in: the OPEN message received from peer.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.

        # Upper bound (in seconds) for being stuck in the same state,
        # we should at least report something before continuing.
        # TODO: Configurable?
        self.report_timedelta = 1.0

        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        my_hold_timedelta = 180
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        hold_timedelta = min(my_hold_timedelta, peer_hold_timedelta)
        # Zero switches the timers off, 1 and 2 are rejected.
        if hold_timedelta < 3 and hold_timedelta != 0:
            logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
            raise ValueError("Invalid hold timedelta value: ", hold_timedelta)

        # If we do not hear from peer this long, we assume it has died.
        self.hold_timedelta = hold_timedelta
        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)

        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        # The same as calling snapshot(), but also declares a field.
        now = time.time()
        self.snapshot_time = now
        # At this time point, peer will be declared dead.
        self.peer_hold_time = now + hold_timedelta
        # At this point, we should be sending keepalive message.
        self.my_keepalive_time = None  # to be set later

    def snapshot(self):
        """Remember the current time for later use."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Push the peer's deadline forward, the peer has shown it lives."""
        self.peer_hold_time = self.hold_timedelta + time.time()

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Schedule my next KEEP ALIVE.

        Arguments:
            :keepalive_time: the time point the keepalive interval
                is counted from
        """
        self.my_keepalive_time = self.keepalive_timedelta + keepalive_time

    def is_time_for_my_keepalive(self):
        """Tell whether my KEEP ALIVE was already due at the snapshot."""
        # Hold time = 0 means keepalives are never sent.
        keepalives_enabled = self.hold_timedelta != 0
        return (keepalives_enabled and
                self.snapshot_time >= self.my_keepalive_time)

    def get_next_event_time(self):
        """Return the time of the nearest of the two tracked deadlines."""
        if self.hold_timedelta != 0:
            return min(self.my_keepalive_time, self.peer_hold_time)
        # Timers are off, so report a time point 86400 seconds
        # after the snapshot instead.
        return self.snapshot_time + 86400

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time."""
        # Hold time = 0 means keepalive checking off.
        # time.time() may be too strict, hence the explicit argument.
        if self.hold_timedelta != 0 and snapshot_time > self.peer_hold_time:
            logger.error("Peer has overstepped the hold timer.")
            raise RuntimeError("Peer has overstepped the hold timer.")
            # TODO: Include hold_timedelta?
            # TODO: Add notification sending (attempt). That means
            # move to write tracker.
1057
1058
1059 class ReadTracker(object):
1060     """Class for tracking read of mesages chunk by chunk and
1061     for idle waiting.
1062     """
1063
1064     def __init__(self, bgp_socket, timer):
1065         """The reader initialisation.
1066
1067         Arguments:
1068             bgp_socket: socket to be used for sending
1069             timer: timer to be used for scheduling
1070         """
1071         # References to outside objects.
1072         self.socket = bgp_socket
1073         self.timer = timer
1074         # BGP marker length plus length field length.
1075         self.header_length = 18
1076         # TODO: make it class (constant) attribute
1077         # Computation of where next chunk ends depends on whether
1078         # we are beyond length field.
1079         self.reading_header = True
1080         # Countdown towards next size computation.
1081         self.bytes_to_read = self.header_length
1082         # Incremental buffer for message under read.
1083         self.msg_in = ""
1084         # Initialising counters
1085         self.updates_received = 0
1086         self.prefixes_introduced = 0
1087         self.prefixes_withdrawn = 0
1088         self.rx_idle_time = 0
1089
1090     def read_message_chunk(self):
1091         """Read up to one message
1092
1093         Note:
1094             Currently it does not return anything.
1095         """
1096         # TODO: We could return the whole message, currently not needed.
1097         # We assume the socket is readable.
1098         chunk_message = self.socket.recv(self.bytes_to_read)
1099         self.msg_in += chunk_message
1100         self.bytes_to_read -= len(chunk_message)
1101         # TODO: bytes_to_read < 0 is not possible, right?
1102         if not self.bytes_to_read:
1103             # Finished reading a logical block.
1104             if self.reading_header:
1105                 # The logical block was a BGP header.
1106                 # Now we know the size of the message.
1107                 self.reading_header = False
1108                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1109                                       self.header_length)
1110             else:  # We have finished reading the body of the message.
1111                 # Peer has just proven it is still alive.
1112                 self.timer.reset_peer_hold_time()
1113                 # TODO: Do we want to count received messages?
1114                 # This version ignores the received message.
1115                 # TODO: Should we do validation and exit on anything
1116                 # besides update or keepalive?
1117                 # Prepare state for reading another message.
1118                 message_type_hex = self.msg_in[self.header_length]
1119                 if message_type_hex == "\x01":
1120                     logger.info("OPEN message received: 0x%s",
1121                                 binascii.b2a_hex(self.msg_in))
1122                 elif message_type_hex == "\x02":
1123                     logger.debug("UPDATE message received: 0x%s",
1124                                  binascii.b2a_hex(self.msg_in))
1125                     self.decode_update_message(self.msg_in)
1126                 elif message_type_hex == "\x03":
1127                     logger.info("NOTIFICATION message received: 0x%s",
1128                                 binascii.b2a_hex(self.msg_in))
1129                 elif message_type_hex == "\x04":
1130                     logger.info("KEEP ALIVE message received: 0x%s",
1131                                 binascii.b2a_hex(self.msg_in))
1132                 else:
1133                     logger.warning("Unexpected message received: 0x%s",
1134                                    binascii.b2a_hex(self.msg_in))
1135                 self.msg_in = ""
1136                 self.reading_header = True
1137                 self.bytes_to_read = self.header_length
1138         # We should not act upon peer_hold_time if we are reading
1139         # something right now.
1140         return
1141
1142     def decode_path_attributes(self, path_attributes_hex):
1143         """Decode the Path Attributes field (rfc4271#section-4.3)
1144
1145         Arguments:
1146             :path_attributes: path_attributes field to be decoded in hex
1147         Returns:
1148             :return: None
1149         """
1150         hex_to_decode = path_attributes_hex
1151
1152         while len(hex_to_decode):
1153             attr_flags_hex = hex_to_decode[0]
1154             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1155 #            attr_optional_bit = attr_flags & 128
1156 #            attr_transitive_bit = attr_flags & 64
1157 #            attr_partial_bit = attr_flags & 32
1158             attr_extended_length_bit = attr_flags & 16
1159
1160             attr_type_code_hex = hex_to_decode[1]
1161             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1162
1163             if attr_extended_length_bit:
1164                 attr_length_hex = hex_to_decode[2:4]
1165                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1166                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1167                 hex_to_decode = hex_to_decode[4 + attr_length:]
1168             else:
1169                 attr_length_hex = hex_to_decode[2]
1170                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1171                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1172                 hex_to_decode = hex_to_decode[3 + attr_length:]
1173
1174             if attr_type_code == 1:
1175                 logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)",
1176                              binascii.b2a_hex(attr_flags_hex))
1177                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1178             elif attr_type_code == 2:
1179                 logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)",
1180                              binascii.b2a_hex(attr_flags_hex))
1181                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1182             elif attr_type_code == 3:
1183                 logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)",
1184                              binascii.b2a_hex(attr_flags_hex))
1185                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1186             elif attr_type_code == 4:
1187                 logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)",
1188                              binascii.b2a_hex(attr_flags_hex))
1189                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1190             elif attr_type_code == 5:
1191                 logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)",
1192                              binascii.b2a_hex(attr_flags_hex))
1193                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1194             elif attr_type_code == 6:
1195                 logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)",
1196                              binascii.b2a_hex(attr_flags_hex))
1197                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1198             elif attr_type_code == 7:
1199                 logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)",
1200                              binascii.b2a_hex(attr_flags_hex))
1201                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1202             elif attr_type_code == 14:  # rfc4760#section-3
1203                 logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)",
1204                              binascii.b2a_hex(attr_flags_hex))
1205                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1206                 address_family_identifier_hex = attr_value_hex[0:2]
1207                 logger.debug("  Address Family Identifier = 0x%s",
1208                              binascii.b2a_hex(address_family_identifier_hex))
1209                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1210                 logger.debug("  Subsequent Address Family Identifier = 0x%s",
1211                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1212                 next_hop_netaddr_len_hex = attr_value_hex[3]
1213                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1214                 logger.debug("  Length of Next Hop Network Address = 0x%s (%s)",
1215                              binascii.b2a_hex(next_hop_netaddr_len_hex),
1216                              next_hop_netaddr_len)
1217                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1218                 logger.debug("  Network Address of Next Hop = 0x%s",
1219                              binascii.b2a_hex(next_hop_netaddr_hex))
1220                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1221                 logger.debug("  Reserved = 0x%s",
1222                              binascii.b2a_hex(reserved_hex))
1223                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1224                 logger.debug("  Network Layer Reachability Information = 0x%s",
1225                              binascii.b2a_hex(nlri_hex))
1226                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1227                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1228                 for prefix in nlri_prefix_list:
1229                     logger.debug("  nlri_prefix_received: %s", prefix)
1230                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1231             elif attr_type_code == 15:  # rfc4760#section-4
1232                 logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)",
1233                              binascii.b2a_hex(attr_flags_hex))
1234                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1235                 address_family_identifier_hex = attr_value_hex[0:2]
1236                 logger.debug("  Address Family Identifier = 0x%s",
1237                              binascii.b2a_hex(address_family_identifier_hex))
1238                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1239                 logger.debug("  Subsequent Address Family Identifier = 0x%s",
1240                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1241                 wd_hex = attr_value_hex[3:]
1242                 logger.debug("  Withdrawn Routes = 0x%s",
1243                              binascii.b2a_hex(wd_hex))
1244                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1245                 logger.debug("  Withdrawn routes prefix list: %s",
1246                              wdr_prefix_list)
1247                 for prefix in wdr_prefix_list:
1248                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1249                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1250             else:
1251                 logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code,
1252                              binascii.b2a_hex(attr_flags_hex))
1253                 logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1254         return None
1255
1256     def decode_update_message(self, msg):
1257         """Decode an UPDATE message (rfc4271#section-4.3)
1258
1259         Arguments:
1260             :msg: message to be decoded in hex
1261         Returns:
1262             :return: None
1263         """
1264         logger.debug("Decoding update message:")
1265         # message header - marker
1266         marker_hex = msg[:16]
1267         logger.debug("Message header marker: 0x%s",
1268                      binascii.b2a_hex(marker_hex))
1269         # message header - message length
1270         msg_length_hex = msg[16:18]
1271         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1272         logger.debug("Message lenght: 0x%s (%s)",
1273                      binascii.b2a_hex(msg_length_hex), msg_length)
1274         # message header - message type
1275         msg_type_hex = msg[18:19]
1276         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1277         if msg_type == 2:
1278             logger.debug("Message type: 0x%s (update)",
1279                          binascii.b2a_hex(msg_type_hex))
1280             # withdrawn routes length
1281             wdr_length_hex = msg[19:21]
1282             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1283             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1284                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1285             # withdrawn routes
1286             wdr_hex = msg[21:21 + wdr_length]
1287             logger.debug("Withdrawn routes: 0x%s",
1288                          binascii.b2a_hex(wdr_hex))
1289             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1290             logger.debug("Withdrawn routes prefix list: %s",
1291                          wdr_prefix_list)
1292             for prefix in wdr_prefix_list:
1293                 logger.debug("withdrawn_prefix_received: %s", prefix)
1294             # total path attribute length
1295             total_pa_length_offset = 21 + wdr_length
1296             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
1297             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1298             logger.debug("Total path attribute lenght: 0x%s (%s)",
1299                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1300             # path attributes
1301             pa_offset = total_pa_length_offset + 2
1302             pa_hex = msg[pa_offset:pa_offset+total_pa_length]
1303             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1304             self.decode_path_attributes(pa_hex)
1305             # network layer reachability information length
1306             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1307             logger.debug("Calculated NLRI length: %s", nlri_length)
1308             # network layer reachability information
1309             nlri_offset = pa_offset + total_pa_length
1310             nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
1311             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1312             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1313             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1314             for prefix in nlri_prefix_list:
1315                 logger.debug("nlri_prefix_received: %s", prefix)
1316             # Updating counters
1317             self.updates_received += 1
1318             self.prefixes_introduced += len(nlri_prefix_list)
1319             self.prefixes_withdrawn += len(wdr_prefix_list)
1320         else:
1321             logger.error("Unexpeced message type 0x%s in 0x%s",
1322                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1323
1324     def wait_for_read(self):
1325         """Read message until timeout (next expected event).
1326
1327         Note:
1328             Used when no more updates has to be sent to avoid busy-wait.
1329             Currently it does not return anything.
1330         """
1331         # Compute time to the first predictable state change
1332         event_time = self.timer.get_next_event_time()
1333         # snapshot_time would be imprecise
1334         wait_timedelta = min(event_time - time.time(), 10)
1335         if wait_timedelta < 0:
1336             # The program got around to waiting to an event in "very near
1337             # future" so late that it became a "past" event, thus tell
1338             # "select" to not wait at all. Passing negative timedelta to
1339             # select() would lead to either waiting forever (for -1) or
1340             # select.error("Invalid parameter") (for everything else).
1341             wait_timedelta = 0
1342         # And wait for event or something to read.
1343
1344         logger.info("total_received_update_message_counter: %s",
1345                     self.updates_received)
1346         logger.info("total_received_nlri_prefix_counter: %s",
1347                     self.prefixes_introduced)
1348         logger.info("total_received_withdrawn_prefix_counter: %s",
1349                     self.prefixes_withdrawn)
1350
1351         start_time = time.time()
1352         select.select([self.socket], [], [self.socket], wait_timedelta)
1353         timedelta = time.time() - start_time
1354         self.rx_idle_time += timedelta
1355
1356         logger.info("... idle for %.3fs", timedelta)
1357         logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1358         return
1359
1360
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them."""

    def __init__(self, bgp_socket, generator, timer):
        """The writer initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects,
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Really new fields.
        # True while there are enqueued bytes not yet handed to the socket.
        self.sending_message = False
        # Number of bytes still waiting in msg_out.
        self.bytes_to_send = 0
        # Outgoing buffer. Messages are binary data given to socket.send(),
        # so the buffer is a byte string from the very start; a text
        # literal here would make the first "+=" fail wherever text and
        # bytes are distinct types.
        self.msg_out = b""

    def enqueue_message_for_sending(self, message):
        """Enqueue message and change state.

        Arguments:
            message: byte string to be appended to the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send enqueued data from msg_out buffer

        One call does one socket.send(); the socket decides how much
        of the buffer it accepts.

        Returns:
            :return: true if no remaining data to send
        """
        # We assume there is a msg_out to send and socket is writable.
        self.timer.snapshot()
        bytes_sent = self.socket.send(self.msg_out)
        # Forget the part of message that was sent.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if not self.bytes_to_send:
            # NOTE(review): bytes_to_send should never go negative as long
            # as it is only changed together with msg_out by the methods
            # of this class - confirm nothing else writes to it.
            self.sending_message = False
            # We should have reset hold timer on peer side.
            self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
            # The possible reason for not prioritizing reads is gone.
            return True
        return False
1413
1414
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class."""

    def __init__(self, bgp_socket, generator, timer):
        """Remember the shared objects and create the sub-trackers.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # Objects owned by the caller.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # One tracker per direction, both working on the same socket.
        self.reader = ReadTracker(bgp_socket, timer)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Reading normally wins over writing. To not get starved by
        # neverending reads (and so run out of holdtime), this flag is
        # raised when a keepalive is due; it stays raised until the
        # message in progress is completely sent, then it drops again.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.
        self.prioritize_writing = False

    def _compose_next_batch(self):
        """Compose next update, followed by END-OF-RIB if it was the last.

        Returns:
            :return: data to be enqueued for sending
        """
        generator = self.generator
        batch = generator.compose_update_message()
        if generator.remaining_prefixes:
            return batch
        # We have just finished update generation, end-of-rib is due.
        logger.info("All update messages generated.")
        logger.info("Storing performance results.")
        generator.store_results()
        logger.info("Finally an END-OF-RIB is sent.")
        return batch + generator.update_message(wr_prefixes=[],
                                                nlri_prefixes=[])

    def perform_one_loop_iteration(self):
        """ The main loop iteration

        Notes:
            Decides between reading and writing, performs exactly one
            such action (or an idle wait) and returns, expecting
            the caller to call again.
        """
        timer = self.timer
        writer = self.writer
        timer.snapshot()
        if not self.prioritize_writing and timer.is_time_for_my_keepalive():
            if not writer.sending_message:
                # Nothing in progress, so a keepalive is scheduled ASAP.
                writer.enqueue_message_for_sending(
                    self.generator.keepalive_message())
                logger.info("KEEP ALIVE is sent.")
            # Either way there is a message to push out, rush it.
            self.prioritize_writing = True
        # Priorities are settled, now ask which actions are possible.
        # select.select() gives three lists, each of them
        # is either [] or [self.socket], so they are tested as boolean.
        watched = [self.socket]
        readable, writable, exceptional = select.select(
            watched, watched, watched, timer.report_timedelta)
        if exceptional:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # Exactly one of read / write is done below.
        if not self.prioritize_writing or not writable:
            # No reason to rush writes, or writing is not possible,
            # so reading gets its chance first.
            if readable:
                # Read one chunk and let the caller repeat the select.
                self.reader.read_message_chunk()
                return
            # Nothing arrived; good time to check peer for hold timer.
            timer.check_peer_hold_time(timer.snapshot_time)
        if not writable:
            # We can neither read nor write.
            logger.warning("Input and output both blocked for " +
                           str(timer.report_timedelta) + " seconds.")
            # FIXME: Are we sure select has been really waiting
            # the whole period?
            return
        # From here on the socket is writable, and either we are pressed
        # to reset peer's view of our hold timer, or nothing was readable.
        if writer.sending_message:
            # Continue with the message in progress.
            finished = writer.send_message_chunk_is_whole()
            if self.prioritize_writing and finished:
                # The pressing message is out, reads have priority again.
                self.prioritize_writing = False
            return
        if not self.generator.remaining_prefixes:
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # There are still updates to generate; the real sending
        # is attempted in the next iteration.
        writer.enqueue_message_for_sending(self._compose_next_batch())
1524
1525
if __name__ == "__main__":
    # One time initialisation and iterations looping:
    # set up logging, establish the BGP connection, exchange
    # OPEN / KEEPALIVE with the peer and then run the iterations forever.
    arguments = parse_arguments()
    # The logger is a module-level name used by the rest of this file;
    # it writes both to the console and to the (overwritten) log file.
    logger = logging.getLogger("logger")
    log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler(arguments.logfile, mode="w")
    console_handler.setFormatter(log_formatter)
    file_handler.setFormatter(log_formatter)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.setLevel(arguments.loglevel)
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: %s", binascii.hexlify(msg_out))
    # Send our open message to the peer. sendall() is used because plain
    # send() may accept only a part of the data and nothing here would
    # retry the rest.
    bgp_socket.sendall(msg_out)
    # Wait for confirming keepalive.
    # Using exact keepalive length to not to see possible updates.
    # The bytes may arrive in several pieces, so keep receiving until
    # the whole length is here or the peer closes the connection.
    keepalive_length = 19
    msg_in = b""
    while len(msg_in) < keepalive_length:
        chunk = bgp_socket.recv(keepalive_length - len(msg_in))
        if not chunk:
            # Connection closed early; the comparison below reports it.
            break
        msg_in += chunk
    if msg_in != generator.keepalive_message():
        logger.error("Open not confirmed by keepalive, instead got %s",
                     binascii.hexlify(msg_in))
        raise MessageError("Open not confirmed by keepalive, instead got",
                           msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: %s", binascii.hexlify(msg_out))
    bgp_socket.sendall(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()