BGP application peer performance suite
[integration/test.git] / tools / fastbgp / play.py
"""Utility for playing generated BGP data to ODL.

It needs to be run by a sudo-capable user when you want to use a port below
1024 as --myport. This utility is used to avoid the excessive waiting times
which EXABGP exhibits when used with huge routing tables, and also to avoid
the memory cost of EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 __author__ = "Vratko Polak"
15 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
16 __license__ = "Eclipse Public License v1.0"
17 __email__ = "vrpolak@cisco.com"
18
19 import argparse
20 import binascii
21 import ipaddr
22 import select
23 import socket
24 import time
25 import logging
26 import struct
27
28
def parse_arguments(argv=None):
    """Use argparse to get arguments.

    Arguments:
        :argv: optional list of argument strings to parse; when None,
            argparse reads the process command line (sys.argv[1:])
    Returns:
        :return: args object.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = ("Amount of IP prefixes to generate. "
                "(negative means \"infinite\").")
    parser.add_argument("--amount", default=1, type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default=1, type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default=0, type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default=0, type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # The default has to be a plain string so that it compares equal to
    # one of the choices (a list default never equals "separate").
    parser.add_argument("--updates", choices=["single", "separate"],
                        default="separate", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from. "
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection. "
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default=0, type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = ("Numeric IP Address to try to connect to. "
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default=179, type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default=180, type=int, help=str_help)
    # All four switches write to the same destination; INFO is the level
    # used when none of them is given.
    str_help = "Log level (--error, --warning, --info, --debug)"
    for flag, level in (("--error", logging.ERROR),
                        ("--warning", logging.WARNING),
                        ("--info", logging.INFO),
                        ("--debug", logging.DEBUG)):
        parser.add_argument(flag, dest="loglevel", action="store_const",
                            const=level, default=logging.INFO,
                            help=str_help)
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default=1000, type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
    arguments = parser.parse_args(argv)
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
101
102
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.listen: accept a connection instead of initiating it
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address (talking mode only)
            - arguments.peerport: remote port (talking mode only)
    Returns:
        :return: connected socket.
    Notes:
        A socket created here is closed again if setting it up fails,
        so a failed attempt does not leak a file descriptor.
    """
    # socket functions do not accept ipaddr objects, hence str()
    local_address = (str(arguments.myip), arguments.myport)
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listening_socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            # bind needs a single tuple as argument
            listening_socket.bind(local_address)
            listening_socket.listen(1)
            bgp_socket, _ = listening_socket.accept()
            # TODO: Verify client IP is controller IP.
        finally:
            # Only one peer is served, the listener is not needed anymore
            # (and must not stay open when bind/accept failed either).
            listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        connected = False
        try:
            talking_socket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_REUSEADDR, 1)
            # bind to force specified address and port
            talking_socket.bind(local_address)
            talking_socket.connect((str(arguments.peerip),
                                    arguments.peerport))
            connected = True
        finally:
            if not connected:
                talking_socket.close()
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
142
143
def get_short_int_from_message(message, offset=16):
    """Extract 2-bytes number from provided message.

    Arguments:
        :message: given message; a byte string, bytearray or text string
            whose characters stand for single bytes
        :offset: offset of the short_int inside the message
    Returns:
        :return: required short_int value (big-endian interpretation).
    Raises:
        :IndexError: when the message is shorter than offset + 2.
    Notes:
        default offset value is the BGP message size offset.
    """
    high_byte = message[offset]
    low_byte = message[offset + 1]
    # Indexing yields a one-character string for str messages but already
    # an integer for bytes/bytearray; normalise both to integers.
    if not isinstance(high_byte, int):
        high_byte = ord(high_byte)
    if not isinstance(low_byte, int):
        low_byte = ord(low_byte)
    return high_byte * 256 + low_byte
159
160
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Each encoded prefix is one length byte (in bits) followed by only as
    many address bytes as are needed to hold that many bits.

    Arguments:
        :prefixes_hex: byte string with the encoded prefixes
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Raises:
        :ValueError: when a prefix length exceeds 32 bits or the data
            ends in the middle of a prefix.
    """
    # bytearray gives integer items for both str (Python 2) and bytes.
    data = bytearray(prefixes_hex)
    prefix_list = []
    offset = 0
    while offset < len(data):
        prefix_bit_len = data[offset]
        if prefix_bit_len > 32:
            raise ValueError("IPv4 prefix length out of range: " +
                             str(prefix_bit_len))
        # number of address bytes present on the wire (0 for a /0 prefix)
        prefix_len = (prefix_bit_len + 7) // 8
        octets = list(data[offset + 1: offset + 1 + prefix_len])
        if len(octets) < prefix_len:
            raise ValueError("Truncated prefix at offset " + str(offset))
        # trailing address bytes are omitted on the wire, restore them
        octets += [0] * (4 - prefix_len)
        prefix = ".".join(str(octet) for octet in octets)
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
        offset += 1 + prefix_len
    return prefix_list
180
181
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.

        Arguments:
            :text: human readable description of the problem
            :message: raw (binary) message which caused the error
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: human readable message as string
        Notes:
            Use a placeholder string if the message is to be empty.
        """
        message = binascii.hexlify(self.msg)
        # hexlify returns bytes on Python 3; convert to text so that the
        # comparison and the concatenation below work on both versions.
        if not isinstance(message, str):
            message = message.decode("ascii")
        if message == "":
            message = "(empty message)"
        return self.text + ": " + message
207
208
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Raises:
        :MessageError: when the received data is shorter than a minimal
            OPEN message or its size differs from the length field.
    Notes:
        Performs just basic incoming message checks
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # Textual form of the message for logging (hexlify gives bytes on
    # Python 3, which cannot be concatenated with str).
    msg_hex = binascii.hexlify(msg_in)
    if not isinstance(msg_hex, str):
        msg_hex = msg_hex.decode("ascii")
    # Some validation.
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        error_text = "Got something else than open with 4-byte AS number"
        logger.error(error_text + ": " + msg_hex)
        raise MessageError(error_text, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # reported_length is an integer, it has to be converted before
        # being joined with the rest of the text.
        error_text = ("Message length is not " + str(reported_length) +
                      " as stated in ")
        logger.error(error_text + msg_hex)
        raise MessageError(error_text, msg_in)
    logger.info("Open message received.")
    return msg_in
237
238
239 class MessageGenerator(object):
240     """Class which generates messages, holds states and configuration values."""
241
242     # TODO: Define bgp marker as a class (constant) variable.
243     def __init__(self, args):
244         """Initialisation according to command-line args.
245
246         Arguments:
247             :args: argsparser's Namespace object which contains command-line
248                 options for MesageGenerator initialisation
249         Notes:
250             Calculates and stores default values used later on for
251             message geeration.
252         """
253         self.total_prefix_amount = args.amount
254         # Number of update messages left to be sent.
255         self.remaining_prefixes = self.total_prefix_amount
256
257         # New parameters initialisation
258         self.iteration = 0
259         self.prefix_base_default = args.firstprefix
260         self.prefix_length_default = args.prefixlen
261         self.wr_prefixes_default = []
262         self.nlri_prefixes_default = []
263         self.version_default = 4
264         self.my_autonomous_system_default = args.asnumber
265         self.hold_time_default = args.holdtime  # Local hold time.
266         self.bgp_identifier_default = int(args.myip)
267         self.next_hop_default = args.nexthop
268         self.single_update_default = args.updates == "single"
269         self.randomize_updates_default = args.updates == "random"
270         self.prefix_count_to_add_default = args.insert
271         self.prefix_count_to_del_default = args.withdraw
272         if self.prefix_count_to_del_default < 0:
273             self.prefix_count_to_del_default = 0
274         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
275             # total number of prefixes must grow to avoid infinite test loop
276             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
277         self.slot_size_default = self.prefix_count_to_add_default
278         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
279         self.results_file_name_default = args.results
280         self.performance_threshold_default = args.threshold
281         self.rfc4760 = args.rfc4760 == "yes"
282         # Default values used for randomized part
283         s1_slots = ((self.total_prefix_amount -
284                      self.remaining_prefixes_threshold - 1) /
285                     self.prefix_count_to_add_default + 1)
286         s2_slots = ((self.remaining_prefixes_threshold - 1) /
287                     (self.prefix_count_to_add_default -
288                     self.prefix_count_to_del_default) + 1)
289         # S1_First_Index = 0
290         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
291         s2_first_index = s1_slots * self.prefix_count_to_add_default
292         s2_last_index = (s2_first_index +
293                          s2_slots * (self.prefix_count_to_add_default -
294                                      self.prefix_count_to_del_default) - 1)
295         self.slot_gap_default = ((self.total_prefix_amount -
296                                   self.remaining_prefixes_threshold - 1) /
297                                  self.prefix_count_to_add_default + 1)
298         self.randomize_lowest_default = s2_first_index
299         self.randomize_highest_default = s2_last_index
300
301         # Initialising counters
302         self.phase1_start_time = 0
303         self.phase1_stop_time = 0
304         self.phase2_start_time = 0
305         self.phase2_stop_time = 0
306         self.phase1_updates_sent = 0
307         self.phase2_updates_sent = 0
308         self.updates_sent = 0
309
310         self.log_info = args.loglevel <= logging.INFO
311         self.log_debug = args.loglevel <= logging.DEBUG
312         """
313         Flags needed for the MessageGenerator performance optimization.
314         Calling logger methods each iteration even with proper log level set
315         slows down significantly the MessageGenerator performance.
316         Measured total generation time (1M updates, dry run, error log level):
317         - logging based on basic logger features: 36,2s
318         - logging based on advanced logger features (lazy logging): 21,2s
319         - conditional calling of logger methods enclosed inside condition: 8,6s
320         """
321
322         logger.info("Generator initialisation")
323         logger.info("  Target total number of prefixes to be introduced: " +
324                     str(self.total_prefix_amount))
325         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
326                     str(self.prefix_length_default))
327         logger.info("  My Autonomous System number: " +
328                     str(self.my_autonomous_system_default))
329         logger.info("  My Hold Time: " + str(self.hold_time_default))
330         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
331         logger.info("  Next Hop: " + str(self.next_hop_default))
332         logger.info("  Prefix count to be inserted at once: " +
333                     str(self.prefix_count_to_add_default))
334         logger.info("  Prefix count to be withdrawn at once: " +
335                     str(self.prefix_count_to_del_default))
336         logger.info("  Fast pre-fill up to " +
337                     str(self.total_prefix_amount -
338                         self.remaining_prefixes_threshold) + " prefixes")
339         logger.info("  Remaining number of prefixes to be processed " +
340                     "in parallel with withdrawals: " +
341                     str(self.remaining_prefixes_threshold))
342         logger.debug("  Prefix index range used after pre-fill procedure [" +
343                      str(self.randomize_lowest_default) + ", " +
344                      str(self.randomize_highest_default) + "]")
345         if self.single_update_default:
346             logger.info("  Common single UPDATE will be generated " +
347                         "for both NLRI & WITHDRAWN lists")
348         else:
349             logger.info("  Two separate UPDATEs will be generated " +
350                         "for each NLRI & WITHDRAWN lists")
351         if self.randomize_updates_default:
352             logger.info("  Generation of UPDATE messages will be randomized")
353         logger.info("  Let\'s go ...\n")
354
355         # TODO: Notification for hold timer expiration can be handy.
356
357     def store_results(self, file_name=None, threshold=None):
358         """ Stores specified results into files based on file_name value.
359
360         Arguments:
361             :param file_name: Trailing (common) part of result file names
362             :param threshold: Minimum number of sent updates needed for each
363                               result to be included into result csv file
364                               (mainly needed because of the result accuracy)
365         Returns:
366             :return: n/a
367         """
368         # default values handling
369         if file_name is None:
370             file_name = self.results_file_name_default
371         if threshold is None:
372             threshold = self.performance_threshold_default
373         # performance calculation
374         if self.phase1_updates_sent >= threshold:
375             totals1 = self.phase1_updates_sent
376             performance1 = int(self.phase1_updates_sent /
377                                (self.phase1_stop_time - self.phase1_start_time))
378         else:
379             totals1 = None
380             performance1 = None
381         if self.phase2_updates_sent >= threshold:
382             totals2 = self.phase2_updates_sent
383             performance2 = int(self.phase2_updates_sent /
384                                (self.phase2_stop_time - self.phase2_start_time))
385         else:
386             totals2 = None
387             performance2 = None
388
389         logger.info("#" * 10 + " Final results " + "#" * 10)
390         logger.info("Number of iterations: " + str(self.iteration))
391         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
392                     str(self.phase1_updates_sent))
393         logger.info("The pre-fill phase duration: " +
394                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
395         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
396                     str(self.phase2_updates_sent))
397         logger.info("The 2nd test phase duration: " +
398                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
399         logger.info("Threshold for performance reporting: " + str(threshold))
400
401         # making labels
402         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
403                         " route(s) per UPDATE")
404         if self.single_update_default:
405             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
406                                   "/-" + str(self.prefix_count_to_del_default) +
407                                   " routes per UPDATE")
408         else:
409             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
410                                   "/-" + str(self.prefix_count_to_del_default) +
411                                   " routes in two UPDATEs")
412         # collecting capacity and performance results
413         totals = {}
414         performance = {}
415         if totals1 is not None:
416             totals[phase1_label] = totals1
417             performance[phase1_label] = performance1
418         if totals2 is not None:
419             totals[phase2_label] = totals2
420             performance[phase2_label] = performance2
421         self.write_results_to_file(totals, "totals-" + file_name)
422         self.write_results_to_file(performance, "performance-" + file_name)
423
424     def write_results_to_file(self, results, file_name):
425         """Writes results to the csv plot file consumable by Jenkins.
426
427         Arguments:
428             :param file_name: Name of the (csv) file to be created
429         Returns:
430             :return: none
431         """
432         first_line = ""
433         second_line = ""
434         f = open(file_name, "wt")
435         try:
436             for key in sorted(results):
437                 first_line += key + ", "
438                 second_line += str(results[key]) + ", "
439             first_line = first_line[:-2]
440             second_line = second_line[:-2]
441             f.write(first_line + "\n")
442             f.write(second_line + "\n")
443             logger.info("Message generator performance results stored in " +
444                         file_name + ":")
445             logger.info("  " + first_line)
446             logger.info("  " + second_line)
447         finally:
448             f.close()
449
450     # Return pseudo-randomized (reproducible) index for selected range
451     def randomize_index(self, index, lowest=None, highest=None):
452         """Calculates pseudo-randomized index from selected range.
453
454         Arguments:
455             :param index: input index
456             :param lowest: the lowes index from the randomized area
457             :param highest: the highest index from the randomized area
458         Returns:
459             :return: the (pseudo)randomized index
460         Notes:
461             Created just as a fame for future generator enhancement.
462         """
463         # default values handling
464         if lowest is None:
465             lowest = self.randomize_lowest_default
466         if highest is None:
467             highest = self.randomize_highest_default
468         # randomize
469         if (index >= lowest) and (index <= highest):
470             # we are in the randomized range -> shuffle it inside
471             # the range (now just reverse the order)
472             new_index = highest - (index - lowest)
473         else:
474             # we are out of the randomized range -> nothing to do
475             new_index = index
476         return new_index
477
478     # Get list of prefixes
479     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
480                         prefix_len=None, prefix_count=None, randomize=None):
481         """Generates list of IP address prefixes.
482
483         Arguments:
484             :param slot_index: index of group of prefix addresses
485             :param slot_size: size of group of prefix addresses
486                 in [number of included prefixes]
487             :param prefix_base: IP address of the first prefix
488                 (slot_index = 0, prefix_index = 0)
489             :param prefix_len: length of the prefix in bites
490                 (the same as size of netmask)
491             :param prefix_count: number of prefixes to be returned
492                 from the specified slot
493         Returns:
494             :return: list of generated IP address prefixes
495         """
496         # default values handling
497         if slot_size is None:
498             slot_size = self.slot_size_default
499         if prefix_base is None:
500             prefix_base = self.prefix_base_default
501         if prefix_len is None:
502             prefix_len = self.prefix_length_default
503         if prefix_count is None:
504             prefix_count = slot_size
505         if randomize is None:
506             randomize = self.randomize_updates_default
507         # generating list of prefixes
508         indexes = []
509         prefixes = []
510         prefix_gap = 2 ** (32 - prefix_len)
511         for i in range(prefix_count):
512             prefix_index = slot_index * slot_size + i
513             if randomize:
514                 prefix_index = self.randomize_index(prefix_index)
515             indexes.append(prefix_index)
516             prefixes.append(prefix_base + prefix_index * prefix_gap)
517         if self.log_debug:
518             logger.debug("  Prefix slot index: " + str(slot_index))
519             logger.debug("  Prefix slot size: " + str(slot_size))
520             logger.debug("  Prefix count: " + str(prefix_count))
521             logger.debug("  Prefix indexes: " + str(indexes))
522             logger.debug("  Prefix list: " + str(prefixes))
523         return prefixes
524
525     def compose_update_message(self, prefix_count_to_add=None,
526                                prefix_count_to_del=None):
527         """Composes an UPDATE message
528
529         Arguments:
530             :param prefix_count_to_add: # of prefixes to put into NLRI list
531             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
532         Returns:
533             :return: encoded UPDATE message in HEX
534         Notes:
535             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
536             lists or common message wich includes both prefix lists.
537             Updates global counters.
538         """
539         # default values handling
540         if prefix_count_to_add is None:
541             prefix_count_to_add = self.prefix_count_to_add_default
542         if prefix_count_to_del is None:
543             prefix_count_to_del = self.prefix_count_to_del_default
544         # logging
545         if self.log_info and not (self.iteration % 1000):
546             logger.info("Iteration: " + str(self.iteration) +
547                         " - total remaining prefixes: " +
548                         str(self.remaining_prefixes))
549         if self.log_debug:
550             logger.debug("#" * 10 + " Iteration: " +
551                          str(self.iteration) + " " + "#" * 10)
552             logger.debug("Remaining prefixes: " +
553                          str(self.remaining_prefixes))
554         # scenario type & one-shot counter
555         straightforward_scenario = (self.remaining_prefixes >
556                                     self.remaining_prefixes_threshold)
557         if straightforward_scenario:
558             prefix_count_to_del = 0
559             if self.log_debug:
560                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
561             if not self.phase1_start_time:
562                 self.phase1_start_time = time.time()
563         else:
564             if self.log_debug:
565                 logger.debug("--- COMBINED SCENARIO ---")
566             if not self.phase2_start_time:
567                 self.phase2_start_time = time.time()
568         # tailor the number of prefixes if needed
569         prefix_count_to_add = (prefix_count_to_del +
570                                min(prefix_count_to_add - prefix_count_to_del,
571                                    self.remaining_prefixes))
572         # prefix slots selection for insertion and withdrawal
573         slot_index_to_add = self.iteration
574         slot_index_to_del = slot_index_to_add - self.slot_gap_default
575         # getting lists of prefixes for insertion in this iteration
576         if self.log_debug:
577             logger.debug("Prefixes to be inserted in this iteration:")
578         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
579                                                   prefix_count=prefix_count_to_add)
580         # getting lists of prefixes for withdrawal in this iteration
581         if self.log_debug:
582             logger.debug("Prefixes to be withdrawn in this iteration:")
583         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
584                                                   prefix_count=prefix_count_to_del)
585         # generating the mesage
586         if self.single_update_default:
587             # Send prefixes to be introduced and withdrawn
588             # in one UPDATE message
589             msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
590                                           nlri_prefixes=prefix_list_to_add)
591         else:
592             # Send prefixes to be introduced and withdrawn
593             # in separate UPDATE messages (if needed)
594             msg_out = self.update_message(wr_prefixes=[],
595                                           nlri_prefixes=prefix_list_to_add)
596             if prefix_count_to_del:
597                 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
598                                                nlri_prefixes=[])
599         # updating counters - who knows ... maybe I am last time here ;)
600         if straightforward_scenario:
601             self.phase1_stop_time = time.time()
602             self.phase1_updates_sent = self.updates_sent
603         else:
604             self.phase2_stop_time = time.time()
605             self.phase2_updates_sent = (self.updates_sent -
606                                         self.phase1_updates_sent)
607         # updating totals for the next iteration
608         self.iteration += 1
609         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
610         # returning the encoded message
611         return msg_out
612
613     # Section of message encoders
614
615     def open_message(self, version=None, my_autonomous_system=None,
616                      hold_time=None, bgp_identifier=None):
617         """Generates an OPEN Message (rfc4271#section-4.2)
618
619         Arguments:
620             :param version: see the rfc4271#section-4.2
621             :param my_autonomous_system: see the rfc4271#section-4.2
622             :param hold_time: see the rfc4271#section-4.2
623             :param bgp_identifier: see the rfc4271#section-4.2
624         Returns:
625             :return: encoded OPEN message in HEX
626         """
627
628         # Default values handling
629         if version is None:
630             version = self.version_default
631         if my_autonomous_system is None:
632             my_autonomous_system = self.my_autonomous_system_default
633         if hold_time is None:
634             hold_time = self.hold_time_default
635         if bgp_identifier is None:
636             bgp_identifier = self.bgp_identifier_default
637
638         # Marker
639         marker_hex = "\xFF" * 16
640
641         # Type
642         type = 1
643         type_hex = struct.pack("B", type)
644
645         # version
646         version_hex = struct.pack("B", version)
647
648         # my_autonomous_system
649         # AS_TRANS value, 23456 decadic.
650         my_autonomous_system_2_bytes = 23456
651         # AS number is mappable to 2 bytes
652         if my_autonomous_system < 65536:
653             my_autonomous_system_2_bytes = my_autonomous_system
654         my_autonomous_system_hex_2_bytes = struct.pack(">H",
655                                                        my_autonomous_system)
656
657         # Hold Time
658         hold_time_hex = struct.pack(">H", hold_time)
659
660         # BGP Identifier
661         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
662
663         # Optional Parameters
664         optional_parameters_hex = ""
665         if self.rfc4760:
666             optional_parameter_hex = (
667                 "\x02"  # Param type ("Capability Ad")
668                 "\x06"  # Length (6 bytes)
669                 "\x01"  # Capability type (NLRI Unicast),
670                         # see RFC 4760, secton 8
671                 "\x04"  # Capability value length
672                 "\x00\x01"  # AFI (Ipv4)
673                 "\x00"  # (reserved)
674                 "\x01"  # SAFI (Unicast)
675             )
676             optional_parameters_hex += optional_parameter_hex
677
678         optional_parameter_hex = (
679             "\x02"  # Param type ("Capability Ad")
680             "\x06"  # Length (6 bytes)
681             "\x41"  # "32 bit AS Numbers Support"
682                     # (see RFC 6793, section 3)
683             "\x04"  # Capability value length
684                     # My AS in 32 bit format
685             + struct.pack(">I", my_autonomous_system)
686         )
687         optional_parameters_hex += optional_parameter_hex
688
689         # Optional Parameters Length
690         optional_parameters_length = len(optional_parameters_hex)
691         optional_parameters_length_hex = struct.pack("B",
692                                                      optional_parameters_length)
693
694         # Length (big-endian)
695         length = (
696             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
697             len(my_autonomous_system_hex_2_bytes) +
698             len(hold_time_hex) + len(bgp_identifier_hex) +
699             len(optional_parameters_length_hex) +
700             len(optional_parameters_hex)
701         )
702         length_hex = struct.pack(">H", length)
703
704         # OPEN Message
705         message_hex = (
706             marker_hex +
707             length_hex +
708             type_hex +
709             version_hex +
710             my_autonomous_system_hex_2_bytes +
711             hold_time_hex +
712             bgp_identifier_hex +
713             optional_parameters_length_hex +
714             optional_parameters_hex
715         )
716
717         if self.log_debug:
718             logger.debug("OPEN message encoding")
719             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
720             logger.debug("  Length=" + str(length) + " (0x" +
721                          binascii.hexlify(length_hex) + ")")
722             logger.debug("  Type=" + str(type) + " (0x" +
723                          binascii.hexlify(type_hex) + ")")
724             logger.debug("  Version=" + str(version) + " (0x" +
725                          binascii.hexlify(version_hex) + ")")
726             logger.debug("  My Autonomous System=" +
727                          str(my_autonomous_system_2_bytes) + " (0x" +
728                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
729                          ")")
730             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
731                          binascii.hexlify(hold_time_hex) + ")")
732             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
733                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
734             logger.debug("  Optional Parameters Length=" +
735                          str(optional_parameters_length) + " (0x" +
736                          binascii.hexlify(optional_parameters_length_hex) +
737                          ")")
738             logger.debug("  Optional Parameters=0x" +
739                          binascii.hexlify(optional_parameters_hex))
740             logger.debug("OPEN message encoded: 0x%s",
741                          binascii.b2a_hex(message_hex))
742
743         return message_hex
744
745     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
746                        wr_prefix_length=None, nlri_prefix_length=None,
747                        my_autonomous_system=None, next_hop=None):
748         """Generates an UPDATE Message (rfc4271#section-4.3)
749
750         Arguments:
751             :param wr_prefixes: see the rfc4271#section-4.3
752             :param nlri_prefixes: see the rfc4271#section-4.3
753             :param wr_prefix_length: see the rfc4271#section-4.3
754             :param nlri_prefix_length: see the rfc4271#section-4.3
755             :param my_autonomous_system: see the rfc4271#section-4.3
756             :param next_hop: see the rfc4271#section-4.3
757         Returns:
758             :return: encoded UPDATE message in HEX
759         """
760
761         # Default values handling
762         if wr_prefixes is None:
763             wr_prefixes = self.wr_prefixes_default
764         if nlri_prefixes is None:
765             nlri_prefixes = self.nlri_prefixes_default
766         if wr_prefix_length is None:
767             wr_prefix_length = self.prefix_length_default
768         if nlri_prefix_length is None:
769             nlri_prefix_length = self.prefix_length_default
770         if my_autonomous_system is None:
771             my_autonomous_system = self.my_autonomous_system_default
772         if next_hop is None:
773             next_hop = self.next_hop_default
774
775         # Marker
776         marker_hex = "\xFF" * 16
777
778         # Type
779         type = 2
780         type_hex = struct.pack("B", type)
781
782         # Withdrawn Routes
783         bytes = ((wr_prefix_length - 1) / 8) + 1
784         withdrawn_routes_hex = ""
785         for prefix in wr_prefixes:
786             withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
787                                    struct.pack(">I", int(prefix))[:bytes])
788             withdrawn_routes_hex += withdrawn_route_hex
789
790         # Withdrawn Routes Length
791         withdrawn_routes_length = len(withdrawn_routes_hex)
792         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
793
794         # TODO: to replace hardcoded string by encoding?
795         # Path Attributes
796         if nlri_prefixes != []:
797             path_attributes_hex = (
798                 "\x40"  # Flags ("Well-Known")
799                 "\x01"  # Type (ORIGIN)
800                 "\x01"  # Length (1)
801                 "\x00"  # Origin: IGP
802                 "\x40"  # Flags ("Well-Known")
803                 "\x02"  # Type (AS_PATH)
804                 "\x06"  # Length (6)
805                 "\x02"  # AS segment type (AS_SEQUENCE)
806                 "\x01"  # AS segment length (1)
807                         # AS segment (4 bytes)
808                 + struct.pack(">I", my_autonomous_system) +
809                 "\x40"  # Flags ("Well-Known")
810                 "\x03"  # Type (NEXT_HOP)
811                 "\x04"  # Length (4)
812                         # IP address of the next hop (4 bytes)
813                 + struct.pack(">I", int(next_hop))
814             )
815         else:
816             path_attributes_hex = ""
817
818         # Total Path Attributes Length
819         total_path_attributes_length = len(path_attributes_hex)
820         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
821
822         # Network Layer Reachability Information
823         bytes = ((nlri_prefix_length - 1) / 8) + 1
824         nlri_hex = ""
825         for prefix in nlri_prefixes:
826             nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
827                                struct.pack(">I", int(prefix))[:bytes])
828             nlri_hex += nlri_prefix_hex
829
830         # Length (big-endian)
831         length = (
832             len(marker_hex) + 2 + len(type_hex) +
833             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
834             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
835             len(nlri_hex))
836         length_hex = struct.pack(">H", length)
837
838         # UPDATE Message
839         message_hex = (
840             marker_hex +
841             length_hex +
842             type_hex +
843             withdrawn_routes_length_hex +
844             withdrawn_routes_hex +
845             total_path_attributes_length_hex +
846             path_attributes_hex +
847             nlri_hex
848         )
849
850         if self.log_debug:
851             logger.debug("UPDATE message encoding")
852             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
853             logger.debug("  Length=" + str(length) + " (0x" +
854                          binascii.hexlify(length_hex) + ")")
855             logger.debug("  Type=" + str(type) + " (0x" +
856                          binascii.hexlify(type_hex) + ")")
857             logger.debug("  withdrawn_routes_length=" +
858                          str(withdrawn_routes_length) + " (0x" +
859                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
860             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
861                          str(wr_prefix_length) + " (0x" +
862                          binascii.hexlify(withdrawn_routes_hex) + ")")
863             logger.debug("  Total Path Attributes Length=" +
864                          str(total_path_attributes_length) + " (0x" +
865                          binascii.hexlify(total_path_attributes_length_hex) +
866                          ")")
867             logger.debug("  Path Attributes=" + "(0x" +
868                          binascii.hexlify(path_attributes_hex) + ")")
869             logger.debug("  Network Layer Reachability Information=" +
870                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
871                          " (0x" + binascii.hexlify(nlri_hex) + ")")
872             logger.debug("UPDATE message encoded: 0x" +
873                          binascii.b2a_hex(message_hex))
874
875         # updating counter
876         self.updates_sent += 1
877         # returning encoded message
878         return message_hex
879
880     def notification_message(self, error_code, error_subcode, data_hex=""):
881         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
882
883         Arguments:
884             :param error_code: see the rfc4271#section-4.5
885             :param error_subcode: see the rfc4271#section-4.5
886             :param data_hex: see the rfc4271#section-4.5
887         Returns:
888             :return: encoded NOTIFICATION message in HEX
889         """
890
891         # Marker
892         marker_hex = "\xFF" * 16
893
894         # Type
895         type = 3
896         type_hex = struct.pack("B", type)
897
898         # Error Code
899         error_code_hex = struct.pack("B", error_code)
900
901         # Error Subode
902         error_subcode_hex = struct.pack("B", error_subcode)
903
904         # Length (big-endian)
905         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
906                   len(error_subcode_hex) + len(data_hex))
907         length_hex = struct.pack(">H", length)
908
909         # NOTIFICATION Message
910         message_hex = (
911             marker_hex +
912             length_hex +
913             type_hex +
914             error_code_hex +
915             error_subcode_hex +
916             data_hex
917         )
918
919         if self.log_debug:
920             logger.debug("NOTIFICATION message encoding")
921             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
922             logger.debug("  Length=" + str(length) + " (0x" +
923                          binascii.hexlify(length_hex) + ")")
924             logger.debug("  Type=" + str(type) + " (0x" +
925                          binascii.hexlify(type_hex) + ")")
926             logger.debug("  Error Code=" + str(error_code) + " (0x" +
927                          binascii.hexlify(error_code_hex) + ")")
928             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
929                          binascii.hexlify(error_subcode_hex) + ")")
930             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
931             logger.debug("NOTIFICATION message encoded: 0x%s",
932                          binascii.b2a_hex(message_hex))
933
934         return message_hex
935
936     def keepalive_message(self):
937         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
938
939         Returns:
940             :return: encoded KEEP ALIVE message in HEX
941         """
942
943         # Marker
944         marker_hex = "\xFF" * 16
945
946         # Type
947         type = 4
948         type_hex = struct.pack("B", type)
949
950         # Length (big-endian)
951         length = len(marker_hex) + 2 + len(type_hex)
952         length_hex = struct.pack(">H", length)
953
954         # KEEP ALIVE Message
955         message_hex = (
956             marker_hex +
957             length_hex +
958             type_hex
959         )
960
961         if self.log_debug:
962             logger.debug("KEEP ALIVE message encoding")
963             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
964             logger.debug("  Length=" + str(length) + " (0x" +
965                          binascii.hexlify(length_hex) + ")")
966             logger.debug("  Type=" + str(type) + " (0x" +
967                          binascii.hexlify(type_hex) + ")")
968             logger.debug("KEEP ALIVE message encoded: 0x%s",
969                          binascii.b2a_hex(message_hex))
970
971         return message_hex
972
973
class TimeTracker(object):
    """Keeper of the session timers.

    Two deadlines are maintained: the time my next KEEP ALIVE is due
    and the time the peer's hold timer runs out.
    """

    def __init__(self, msg_in):
        """Negotiate the hold time and initialise the timers.

        Arguments:
            msg_in: the OPEN message received from peer.
        """
        # Naming note: a "timedelta" is a relative duration,
        # a "time" is an absolute time stamp.
        # Upper bound (in seconds) for being stuck in the same state;
        # something should be reported at least this often.
        # TODO: Configurable?
        self.report_timedelta = 1.0
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        my_hold_timedelta = 180
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        # The negotiated hold timer is the smaller of the 2 values
        # (mine and the peer's).
        if peer_hold_timedelta < my_hold_timedelta:
            hold_timedelta = peer_hold_timedelta
        else:
            hold_timedelta = my_hold_timedelta
        # Zero is accepted (it switches the timers off, see below),
        # any other value lower than 3 is refused.
        if hold_timedelta < 3 and hold_timedelta != 0:
            logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
            raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
        # Silence of the peer lasting this long means the peer has died.
        self.hold_timedelta = hold_timedelta
        # Upper limit for duration between my messages, so that
        # the peer does not declare me dead.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # Stored "now"; same effect as snapshot(), but the field
        # gets declared here.
        self.snapshot_time = time.time()
        # Time point at which the peer will be declared dead.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # Time point at which my keepalive should be sent;
        # set later by reset_my_keepalive_time().
        self.my_keepalive_time = None

    def snapshot(self):
        """Remember the current time for later use."""
        # To be read as the time just before something interesting
        # was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Postpone the peer hold deadline, the peer is provably alive."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Methods below could use self.snapshot_time implicitly, but asking
    # the caller for an explicit value is considered better.
    def reset_my_keepalive_time(self, keepalive_time):
        """Schedule my next KEEP ALIVE.

        Arguments:
            :keepalive_time: the time the keepalive interval starts at
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Tell whether my KEEP ALIVE is due at the snapshot time."""
        # Hold time = 0 means no keepalives are sent at all.
        return (self.hold_timedelta != 0 and
                self.snapshot_time >= self.my_keepalive_time)

    def get_next_event_time(self):
        """Return the time of the nearest timer deadline."""
        if self.hold_timedelta != 0:
            return min(self.my_keepalive_time, self.peer_hold_time)
        # Timers are off, report a deadline one day after the snapshot.
        return self.snapshot_time + 86400

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time."""
        # Hold time = 0 means keepalive checking off.
        # The caller supplies the time, time.time() may be too strict.
        if self.hold_timedelta != 0 and snapshot_time > self.peer_hold_time:
            logger.error("Peer has overstepped the hold timer.")
            raise RuntimeError("Peer has overstepped the hold timer.")
            # TODO: Include hold_timedelta?
            # TODO: Add notification sending (attempt). That means
            # move to write tracker.
1057
1058
1059 class ReadTracker(object):
1060     """Class for tracking read of mesages chunk by chunk and
1061     for idle waiting.
1062     """
1063
1064     def __init__(self, bgp_socket, timer):
1065         """The reader initialisation.
1066
1067         Arguments:
1068             bgp_socket: socket to be used for sending
1069             timer: timer to be used for scheduling
1070         """
1071         # References to outside objects.
1072         self.socket = bgp_socket
1073         self.timer = timer
1074         # BGP marker length plus length field length.
1075         self.header_length = 18
1076         # TODO: make it class (constant) attribute
1077         # Computation of where next chunk ends depends on whether
1078         # we are beyond length field.
1079         self.reading_header = True
1080         # Countdown towards next size computation.
1081         self.bytes_to_read = self.header_length
1082         # Incremental buffer for message under read.
1083         self.msg_in = ""
1084         # Initialising counters
1085         self.updates_received = 0
1086         self.prefixes_introduced = 0
1087         self.prefixes_withdrawn = 0
1088         self.rx_idle_time = 0
1089         self.rx_activity_detected = True
1090
1091     def read_message_chunk(self):
1092         """Read up to one message
1093
1094         Note:
1095             Currently it does not return anything.
1096         """
1097         # TODO: We could return the whole message, currently not needed.
1098         # We assume the socket is readable.
1099         chunk_message = self.socket.recv(self.bytes_to_read)
1100         self.msg_in += chunk_message
1101         self.bytes_to_read -= len(chunk_message)
1102         # TODO: bytes_to_read < 0 is not possible, right?
1103         if not self.bytes_to_read:
1104             # Finished reading a logical block.
1105             if self.reading_header:
1106                 # The logical block was a BGP header.
1107                 # Now we know the size of the message.
1108                 self.reading_header = False
1109                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1110                                       self.header_length)
1111             else:  # We have finished reading the body of the message.
1112                 # Peer has just proven it is still alive.
1113                 self.timer.reset_peer_hold_time()
1114                 # TODO: Do we want to count received messages?
1115                 # This version ignores the received message.
1116                 # TODO: Should we do validation and exit on anything
1117                 # besides update or keepalive?
1118                 # Prepare state for reading another message.
1119                 message_type_hex = self.msg_in[self.header_length]
1120                 if message_type_hex == "\x01":
1121                     logger.info("OPEN message received: 0x%s",
1122                                 binascii.b2a_hex(self.msg_in))
1123                 elif message_type_hex == "\x02":
1124                     logger.debug("UPDATE message received: 0x%s",
1125                                  binascii.b2a_hex(self.msg_in))
1126                     self.decode_update_message(self.msg_in)
1127                 elif message_type_hex == "\x03":
1128                     logger.info("NOTIFICATION message received: 0x%s",
1129                                 binascii.b2a_hex(self.msg_in))
1130                 elif message_type_hex == "\x04":
1131                     logger.info("KEEP ALIVE message received: 0x%s",
1132                                 binascii.b2a_hex(self.msg_in))
1133                 else:
1134                     logger.warning("Unexpected message received: 0x%s",
1135                                    binascii.b2a_hex(self.msg_in))
1136                 self.msg_in = ""
1137                 self.reading_header = True
1138                 self.bytes_to_read = self.header_length
1139         # We should not act upon peer_hold_time if we are reading
1140         # something right now.
1141         return
1142
1143     def decode_path_attributes(self, path_attributes_hex):
1144         """Decode the Path Attributes field (rfc4271#section-4.3)
1145
1146         Arguments:
1147             :path_attributes: path_attributes field to be decoded in hex
1148         Returns:
1149             :return: None
1150         """
1151         hex_to_decode = path_attributes_hex
1152
1153         while len(hex_to_decode):
1154             attr_flags_hex = hex_to_decode[0]
1155             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1156 #            attr_optional_bit = attr_flags & 128
1157 #            attr_transitive_bit = attr_flags & 64
1158 #            attr_partial_bit = attr_flags & 32
1159             attr_extended_length_bit = attr_flags & 16
1160
1161             attr_type_code_hex = hex_to_decode[1]
1162             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1163
1164             if attr_extended_length_bit:
1165                 attr_length_hex = hex_to_decode[2:4]
1166                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1167                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1168                 hex_to_decode = hex_to_decode[4 + attr_length:]
1169             else:
1170                 attr_length_hex = hex_to_decode[2]
1171                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1172                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1173                 hex_to_decode = hex_to_decode[3 + attr_length:]
1174
1175             if attr_type_code == 1:
1176                 logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)",
1177                              binascii.b2a_hex(attr_flags_hex))
1178                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1179             elif attr_type_code == 2:
1180                 logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)",
1181                              binascii.b2a_hex(attr_flags_hex))
1182                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1183             elif attr_type_code == 3:
1184                 logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)",
1185                              binascii.b2a_hex(attr_flags_hex))
1186                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1187             elif attr_type_code == 4:
1188                 logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)",
1189                              binascii.b2a_hex(attr_flags_hex))
1190                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1191             elif attr_type_code == 5:
1192                 logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)",
1193                              binascii.b2a_hex(attr_flags_hex))
1194                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1195             elif attr_type_code == 6:
1196                 logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)",
1197                              binascii.b2a_hex(attr_flags_hex))
1198                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1199             elif attr_type_code == 7:
1200                 logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)",
1201                              binascii.b2a_hex(attr_flags_hex))
1202                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1203             elif attr_type_code == 14:  # rfc4760#section-3
1204                 logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)",
1205                              binascii.b2a_hex(attr_flags_hex))
1206                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1207                 address_family_identifier_hex = attr_value_hex[0:2]
1208                 logger.debug("  Address Family Identifier = 0x%s",
1209                              binascii.b2a_hex(address_family_identifier_hex))
1210                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1211                 logger.debug("  Subsequent Address Family Identifier = 0x%s",
1212                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1213                 next_hop_netaddr_len_hex = attr_value_hex[3]
1214                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1215                 logger.debug("  Length of Next Hop Network Address = 0x%s (%s)",
1216                              binascii.b2a_hex(next_hop_netaddr_len_hex),
1217                              next_hop_netaddr_len)
1218                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1219                 logger.debug("  Network Address of Next Hop = 0x%s",
1220                              binascii.b2a_hex(next_hop_netaddr_hex))
1221                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1222                 logger.debug("  Reserved = 0x%s",
1223                              binascii.b2a_hex(reserved_hex))
1224                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1225                 logger.debug("  Network Layer Reachability Information = 0x%s",
1226                              binascii.b2a_hex(nlri_hex))
1227                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1228                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1229                 for prefix in nlri_prefix_list:
1230                     logger.debug("  nlri_prefix_received: %s", prefix)
1231                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1232             elif attr_type_code == 15:  # rfc4760#section-4
1233                 logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)",
1234                              binascii.b2a_hex(attr_flags_hex))
1235                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1236                 address_family_identifier_hex = attr_value_hex[0:2]
1237                 logger.debug("  Address Family Identifier = 0x%s",
1238                              binascii.b2a_hex(address_family_identifier_hex))
1239                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1240                 logger.debug("  Subsequent Address Family Identifier = 0x%s",
1241                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1242                 wd_hex = attr_value_hex[3:]
1243                 logger.debug("  Withdrawn Routes = 0x%s",
1244                              binascii.b2a_hex(wd_hex))
1245                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1246                 logger.debug("  Withdrawn routes prefix list: %s",
1247                              wdr_prefix_list)
1248                 for prefix in wdr_prefix_list:
1249                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1250                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1251             else:
1252                 logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code,
1253                              binascii.b2a_hex(attr_flags_hex))
1254                 logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1255         return None
1256
1257     def decode_update_message(self, msg):
1258         """Decode an UPDATE message (rfc4271#section-4.3)
1259
1260         Arguments:
1261             :msg: message to be decoded in hex
1262         Returns:
1263             :return: None
1264         """
1265         logger.debug("Decoding update message:")
1266         # message header - marker
1267         marker_hex = msg[:16]
1268         logger.debug("Message header marker: 0x%s",
1269                      binascii.b2a_hex(marker_hex))
1270         # message header - message length
1271         msg_length_hex = msg[16:18]
1272         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1273         logger.debug("Message lenght: 0x%s (%s)",
1274                      binascii.b2a_hex(msg_length_hex), msg_length)
1275         # message header - message type
1276         msg_type_hex = msg[18:19]
1277         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1278         if msg_type == 2:
1279             logger.debug("Message type: 0x%s (update)",
1280                          binascii.b2a_hex(msg_type_hex))
1281             # withdrawn routes length
1282             wdr_length_hex = msg[19:21]
1283             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1284             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1285                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1286             # withdrawn routes
1287             wdr_hex = msg[21:21 + wdr_length]
1288             logger.debug("Withdrawn routes: 0x%s",
1289                          binascii.b2a_hex(wdr_hex))
1290             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1291             logger.debug("Withdrawn routes prefix list: %s",
1292                          wdr_prefix_list)
1293             for prefix in wdr_prefix_list:
1294                 logger.debug("withdrawn_prefix_received: %s", prefix)
1295             # total path attribute length
1296             total_pa_length_offset = 21 + wdr_length
1297             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
1298             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1299             logger.debug("Total path attribute lenght: 0x%s (%s)",
1300                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1301             # path attributes
1302             pa_offset = total_pa_length_offset + 2
1303             pa_hex = msg[pa_offset:pa_offset+total_pa_length]
1304             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1305             self.decode_path_attributes(pa_hex)
1306             # network layer reachability information length
1307             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1308             logger.debug("Calculated NLRI length: %s", nlri_length)
1309             # network layer reachability information
1310             nlri_offset = pa_offset + total_pa_length
1311             nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
1312             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1313             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1314             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1315             for prefix in nlri_prefix_list:
1316                 logger.debug("nlri_prefix_received: %s", prefix)
1317             # Updating counters
1318             self.updates_received += 1
1319             self.prefixes_introduced += len(nlri_prefix_list)
1320             self.prefixes_withdrawn += len(wdr_prefix_list)
1321         else:
1322             logger.error("Unexpeced message type 0x%s in 0x%s",
1323                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1324
1325     def wait_for_read(self):
1326         """Read message until timeout (next expected event).
1327
1328         Note:
1329             Used when no more updates has to be sent to avoid busy-wait.
1330             Currently it does not return anything.
1331         """
1332         # Compute time to the first predictable state change
1333         event_time = self.timer.get_next_event_time()
1334         # snapshot_time would be imprecise
1335         wait_timedelta = min(event_time - time.time(), 10)
1336         if wait_timedelta < 0:
1337             # The program got around to waiting to an event in "very near
1338             # future" so late that it became a "past" event, thus tell
1339             # "select" to not wait at all. Passing negative timedelta to
1340             # select() would lead to either waiting forever (for -1) or
1341             # select.error("Invalid parameter") (for everything else).
1342             wait_timedelta = 0
1343         # And wait for event or something to read.
1344
1345         if not self.rx_activity_detected or not (self.updates_received % 100):
1346             # right time to write statistics to the log (not for every update and
1347             # not too frequently to avoid having large log files)
1348             logger.info("total_received_update_message_counter: %s",
1349                         self.updates_received)
1350             logger.info("total_received_nlri_prefix_counter: %s",
1351                         self.prefixes_introduced)
1352             logger.info("total_received_withdrawn_prefix_counter: %s",
1353                         self.prefixes_withdrawn)
1354
1355         start_time = time.time()
1356         select.select([self.socket], [], [self.socket], wait_timedelta)
1357         timedelta = time.time() - start_time
1358         self.rx_idle_time += timedelta
1359         self.rx_activity_detected = timedelta < 1
1360
1361         if not self.rx_activity_detected or not (self.updates_received % 100):
1362             # right time to write statistics to the log (not for every update and
1363             # not too frequently to avoid having large log files)
1364             logger.info("... idle for %.3fs", timedelta)
1365             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1366         return
1367
1368
class WriteTracker(object):
    """Outgoing buffer: messages are queued here and sent chunk by chunk."""

    def __init__(self, bgp_socket, generator, timer):
        """Remember the collaborators and start with an empty buffer.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # Objects owned by the caller.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Own state: pending data, its size, and a "something is pending" flag.
        self.msg_out = ""
        self.bytes_to_send = 0
        self.sending_message = False

    def enqueue_message_for_sending(self, message):
        """Append a message to the outgoing buffer and mark sending as active.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Push as much of the msg_out buffer to the socket as it accepts.

        The caller is expected to have checked that there is data to send
        and that the socket is writable.

        Returns:
            :return: True if the buffer got emptied, False if data remains
        """
        self.timer.snapshot()
        sent_count = self.socket.send(self.msg_out)
        # Keep only the part the socket did not take.
        self.msg_out = self.msg_out[sent_count:]
        self.bytes_to_send -= sent_count
        if self.bytes_to_send:
            # TODO: Is it possible to hit negative bytes_to_send?
            return False
        self.sending_message = False
        # A whole message went out, so the keepalive time is reset
        # using the snapshot taken just before the send.
        self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
        # The possible reason for not prioritizing reads is gone.
        return True
1421
1422
class StateTracker(object):
    """Holder of the main loop state: sub-trackers and read/write priority."""

    def __init__(self, bgp_socket, generator, timer):
        """Store the collaborators and create the reader and the writer.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # Objects owned by the caller.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Sub-trackers sharing the same socket and timer.
        self.reader = ReadTracker(bgp_socket, timer)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Reading is preferred over writing in general. To avoid being
        # blocked by neverending reads while our keepalive is due, this
        # flag is raised to get a message out first; it drops back to
        # False once that message is completely sent.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.
        self.prioritize_writing = False

    def perform_one_loop_iteration(self):
        """ Do one step of the main loop: a single read, write or wait.

        Notes:
            Decides the priority, asks select() what the socket allows,
            performs exactly one matching action and returns so the
            caller can repeat.
        """
        timer = self.timer
        writer = self.writer
        timer.snapshot()
        if not self.prioritize_writing and timer.is_time_for_my_keepalive():
            if not writer.sending_message:
                # Nothing is being sent, so a keepalive is queued ASAP.
                writer.enqueue_message_for_sending(self.generator.keepalive_message())
                logger.info("KEEP ALIVE is sent.")
            # Something is queued for sending now, so let's prioritize it.
            self.prioritize_writing = True
        # select.select() returns three lists; each of them is either []
        # or [self.socket], so they are tested as booleans below.
        sock = self.socket
        readable, writable, exceptional = select.select(
            [sock], [sock], [sock], timer.report_timedelta)
        if exceptional:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", sock)
        # Only one of read / write is performed per iteration.
        if not self.prioritize_writing or not writable:
            # Either there is no reason to rush writes, or the socket
            # is not writable, so reading gets its chance first.
            if readable:
                # Read a chunk of message and let the caller repeat.
                self.reader.read_message_chunk()
                return
            # Nothing to read; good time to check peer for hold timer.
            timer.check_peer_hold_time(timer.snapshot_time)
        if not writable:
            # We can neither read nor write.
            logger.warning("Input and output both blocked for " +
                           str(timer.report_timedelta) + " seconds.")
            # FIXME: Are we sure select has been really waiting
            # the whole period?
            return
        # Writing: either it is prioritized or there was nothing to read.
        if writer.sending_message:
            # Continue with the message already in progress.
            whole = writer.send_message_chunk_is_whole()
            if whole and self.prioritize_writing:
                # The urgent message is out, reading is preferred again.
                self.prioritize_writing = False
            return
        generator = self.generator
        if not generator.remaining_prefixes:
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # There are still update messages to be generated.
        msg_out = generator.compose_update_message()
        if not generator.remaining_prefixes:
            # That was the last update, so end-of-rib is due.
            logger.info("All update messages generated.")
            logger.info("Storing performance results.")
            generator.store_results()
            logger.info("Finally an END-OF-RIB is sent.")
            msg_out += generator.update_message(wr_prefixes=[],
                                                nlri_prefixes=[])
        # The real sending is attempted in the next iteration.
        writer.enqueue_message_for_sending(msg_out)
1532
1533
if __name__ == "__main__":
    # One time initialisation and iterations looping:
    # set up logging, establish the BGP connection, perform the
    # OPEN / KEEPALIVE handshake and then run loop iterations forever.
    arguments = parse_arguments()
    # "logger" is deliberately a module-level name here; the functions and
    # classes of this file use it as a global.
    logger = logging.getLogger("logger")
    log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler(arguments.logfile, mode="w")
    console_handler.setFormatter(log_formatter)
    file_handler.setFormatter(log_formatter)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.setLevel(arguments.loglevel)
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: %s", binascii.hexlify(msg_out))
    # Send our open message to the peer.
    bgp_socket.send(msg_out)
    # Wait for confirming keepalive.
    # recv() may return fewer bytes than requested, so keep reading until
    # the exact keepalive length (19 bytes) has arrived or the peer closes
    # the connection. Asking for exactly the missing amount makes sure that
    # possible following updates are not consumed here.
    msg_in = b""
    while len(msg_in) < 19:
        chunk = bgp_socket.recv(19 - len(msg_in))
        if not chunk:
            # Connection closed by peer; the comparison below reports it.
            break
        msg_in += chunk
    if msg_in != generator.keepalive_message():
        logger.error("Open not confirmed by keepalive, instead got %s",
                     binascii.hexlify(msg_in))
        raise MessageError("Open not confirmed by keepalive, instead got",
                           msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: %s", binascii.hexlify(msg_out))
    bgp_socket.send(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()