1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with a sudo-able user when you want to use ports below 1024
4 as --myport. This utility is used to avoid the excessive waiting times which
5 EXABGP exhibits when used with huge routing tables, and also to avoid the
6 memory cost of EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
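# Example invocation (illustrative only; the values below are assumptions and
# should be adapted to the test environment):
#   python play.py --amount 10 --listen --myip 127.0.0.1 --myport 17900 --debug
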
14 import argparse
15 import binascii
16 import ipaddr
17 import select
18 import socket
19 import time
20 import logging
21 import struct
22
23 import thread
24 from copy import deepcopy
25
26
27 __author__ = "Vratko Polak"
28 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
29 __license__ = "Eclipse Public License v1.0"
30 __email__ = "vrpolak@cisco.com"
31
32
33 def parse_arguments():
34     """Use argparse to get arguments,
35
36     Returns:
37         :return: args object.
38     """
39     parser = argparse.ArgumentParser()
40     # TODO: Should we use --argument-names-with-spaces?
41     str_help = "Autonomous System number use in the stream."
42     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
43     # FIXME: We are acting as iBGP peer,
44     # we should mirror AS number from peer's open message.
45     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
46     parser.add_argument("--amount", default="1", type=int, help=str_help)
47     str_help = "Maximum number of IP prefixes to be announced in one iteration"
48     parser.add_argument("--insert", default="1", type=int, help=str_help)
49     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
50     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
51     str_help = "The number of prefixes to process without withdrawals"
52     parser.add_argument("--prefill", default="0", type=int, help=str_help)
53     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
54     parser.add_argument("--updates", choices=["single", "separate"],
55                         default=["separate"], help=str_help)
56     str_help = "Base prefix IP address for prefix generation"
57     parser.add_argument("--firstprefix", default="8.0.1.0",
58                         type=ipaddr.IPv4Address, help=str_help)
59     str_help = "The prefix length."
60     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
61     str_help = "Listen for connection, instead of initiating it."
62     parser.add_argument("--listen", action="store_true", help=str_help)
63     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
64                 "Default value only suitable for listening.")
65     parser.add_argument("--myip", default="0.0.0.0",
66                         type=ipaddr.IPv4Address, help=str_help)
67     str_help = ("TCP port to bind to when listening or initiating connection." +
68                 "Default only suitable for initiating.")
69     parser.add_argument("--myport", default="0", type=int, help=str_help)
70     str_help = "The IP of the next hop to be placed into the update messages."
71     parser.add_argument("--nexthop", default="192.0.2.1",
72                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
73     str_help = "Identifier of the route originator."
74     parser.add_argument("--originator", default=None,
75                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
76     str_help = "Cluster list item identifier."
77     parser.add_argument("--cluster", default=None,
78                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
79     str_help = ("Numeric IP Address to try to connect to." +
80                 "Currently no effect in listening mode.")
81     parser.add_argument("--peerip", default="127.0.0.2",
82                         type=ipaddr.IPv4Address, help=str_help)
83     str_help = "TCP port to try to connect to. No effect in listening mode."
84     parser.add_argument("--peerport", default="179", type=int, help=str_help)
85     str_help = "Local hold time."
86     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
87     str_help = "Log level (--error, --warning, --info, --debug)"
88     parser.add_argument("--error", dest="loglevel", action="store_const",
89                         const=logging.ERROR, default=logging.INFO,
90                         help=str_help)
91     parser.add_argument("--warning", dest="loglevel", action="store_const",
92                         const=logging.WARNING, default=logging.INFO,
93                         help=str_help)
94     parser.add_argument("--info", dest="loglevel", action="store_const",
95                         const=logging.INFO, default=logging.INFO,
96                         help=str_help)
97     parser.add_argument("--debug", dest="loglevel", action="store_const",
98                         const=logging.DEBUG, default=logging.INFO,
99                         help=str_help)
100     str_help = "Log file name"
101     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
102     str_help = "Trailing part of the csv result files for plotting purposes"
103     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
104     str_help = "Minimum number of updates to reach to include result into csv."
105     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
106     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
107     parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
108     str_help = "How many play utilities are to be started."
109     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
110     arguments = parser.parse_args()
111     if arguments.multiplicity < 1:
112         print "Multiplicity", arguments.multiplicity, "is not positive."
113         raise SystemExit(1)
114     # TODO: Are sanity checks (such as asnumber>=0) required?
115     return arguments
116
117
118 def establish_connection(arguments):
119     """Establish connection to BGP peer.
120
121     Arguments:
122         :arguments: the following command-line arguments are used
123             - arguments.myip: local IP address
124             - arguments.myport: local port
125             - arguments.peerip: remote IP address
126             - arguments.peerport: remote port
127     Returns:
128         :return: socket.
129     """
130     if arguments.listen:
131         logger.info("Connecting in the listening mode.")
132         logger.debug("Local IP address: " + str(arguments.myip))
133         logger.debug("Local port: " + str(arguments.myport))
134         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
135         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
136         # bind needs a single tuple as argument
137         listening_socket.bind((str(arguments.myip), arguments.myport))
138         listening_socket.listen(1)
139         bgp_socket, _ = listening_socket.accept()
140         # TODO: Verify client IP is controller IP.
141         listening_socket.close()
142     else:
143         logger.info("Connecting in the talking mode.")
144         logger.debug("Local IP address: " + str(arguments.myip))
145         logger.debug("Local port: " + str(arguments.myport))
146         logger.debug("Remote IP address: " + str(arguments.peerip))
147         logger.debug("Remote port: " + str(arguments.peerport))
148         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
150         # bind to force specified address and port
151         talking_socket.bind((str(arguments.myip), arguments.myport))
152         # socket does not accept ipaddr objects, hence str()
153         talking_socket.connect((str(arguments.peerip), arguments.peerport))
154         bgp_socket = talking_socket
155     logger.info("Connected to ODL.")
156     return bgp_socket
157
158
159 def get_short_int_from_message(message, offset=16):
160     """Extract 2-bytes number from provided message.
161
162     Arguments:
163         :message: given message
164         :offset: offset of the short_int inside the message
165     Returns:
166         :return: required short_int value.
167     Notes:
168         default offset value is the BGP message size offset.
169     """
170     high_byte_int = ord(message[offset])
171     low_byte_int = ord(message[offset + 1])
172     short_int = high_byte_int * 256 + low_byte_int
173     return short_int
174
175
176 def get_prefix_list_from_hex(prefixes_hex):
177     """Get decoded list of prefixes (rfc4271#section-4.3)
178
179     Arguments:
180         :prefixes_hex: list of prefixes to be decoded in hex
181     Returns:
182         :return: list of prefixes in the form of ip address (X.X.X.X/X)
183     """
184     prefix_list = []
185     offset = 0
186     while offset < len(prefixes_hex):
187         prefix_bit_len_hex = prefixes_hex[offset]
188         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
189         prefix_len = ((prefix_bit_len - 1) / 8) + 1
190         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
191         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
192         offset += 1 + prefix_len
193         prefix_list.append(prefix + "/" + str(prefix_bit_len))
194     return prefix_list
195
196
197 class MessageError(ValueError):
198     """Value error with logging optimized for hexlified messages."""
199
200     def __init__(self, text, message, *args):
201         """Initialisation.
202
203         Store and call super init for textual comment,
204         store raw message which caused it.
205         """
206         self.text = text
207         self.msg = message
208         super(MessageError, self).__init__(text, message, *args)
209
210     def __str__(self):
211         """Generate human readable error message.
212
213         Returns:
214             :return: human readable message as string
215         Notes:
216             Use a placeholder string if the message is empty.
217         """
218         message = binascii.hexlify(self.msg)
219         if message == "":
220             message = "(empty message)"
221         return self.text + ": " + message
222
223
224 def read_open_message(bgp_socket):
225     """Receive peer's OPEN message
226
227     Arguments:
228         :bgp_socket: the socket to be read
229     Returns:
230         :return: received OPEN message.
231     Notes:
232         Performs just basic incoming message checks.
233     """
234     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
235     # TODO: Can the incoming open message be split in more than one packet?
236     # Some validation.
237     if len(msg_in) < 37:
238         # 37 is minimal length of open message with 4-byte AS number.
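        # (16 marker + 2 length + 1 type + 1 version + 2 AS number +
        # 2 hold time + 4 BGP identifier + 1 optional parameters length +
        # 8 octets for the 4-octet AS capability parameter = 37 octets.)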
239         error_msg = (
240             "Message length (" + str(len(msg_in)) + ") is smaller than "
241             "minimal length of OPEN message with 4-byte AS number (37)"
242         )
243         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
244         raise MessageError(error_msg, msg_in)
245     # TODO: We could check BGP marker, but it is defined only later;
246     # decide what to do.
247     reported_length = get_short_int_from_message(msg_in)
248     if len(msg_in) != reported_length:
249         error_msg = (
250             "Expected message length (" + reported_length +
251             ") does not match actual length (" + str(len(msg_in)) + ")"
252         )
253         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
254         raise MessageError(error_msg, msg_in)
255     logger.info("Open message received.")
256     return msg_in
257
258
259 class MessageGenerator(object):
260     """Class which generates messages, holds states and configuration values."""
261
262     # TODO: Define bgp marker as a class (constant) variable.
263     def __init__(self, args):
264         """Initialisation according to command-line args.
265
266         Arguments:
267             :args: argparse Namespace object which contains command-line
268                 options for MessageGenerator initialisation
269         Notes:
270             Calculates and stores default values used later on for
271             message generation.
272         """
273         self.total_prefix_amount = args.amount
274         # Number of prefixes left to be introduced.
275         self.remaining_prefixes = self.total_prefix_amount
276
277         # New parameters initialisation
278         self.iteration = 0
279         self.prefix_base_default = args.firstprefix
280         self.prefix_length_default = args.prefixlen
281         self.wr_prefixes_default = []
282         self.nlri_prefixes_default = []
283         self.version_default = 4
284         self.my_autonomous_system_default = args.asnumber
285         self.hold_time_default = args.holdtime  # Local hold time.
286         self.bgp_identifier_default = int(args.myip)
287         self.next_hop_default = args.nexthop
288         self.originator_id_default = args.originator
289         self.cluster_list_item_default = args.cluster
290         self.single_update_default = args.updates == "single"
291         self.randomize_updates_default = args.updates == "random"
292         self.prefix_count_to_add_default = args.insert
293         self.prefix_count_to_del_default = args.withdraw
294         if self.prefix_count_to_del_default < 0:
295             self.prefix_count_to_del_default = 0
296         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
297             # total number of prefixes must grow to avoid infinite test loop
298             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
299         self.slot_size_default = self.prefix_count_to_add_default
300         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
301         self.results_file_name_default = args.results
302         self.performance_threshold_default = args.threshold
303         self.rfc4760 = args.rfc4760 == "yes"
304         # Default values used for randomized part
305         s1_slots = ((self.total_prefix_amount -
306                      self.remaining_prefixes_threshold - 1) /
307                     self.prefix_count_to_add_default + 1)
308         s2_slots = ((self.remaining_prefixes_threshold - 1) /
309                     (self.prefix_count_to_add_default -
310                     self.prefix_count_to_del_default) + 1)
311         # S1_First_Index = 0
312         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
313         s2_first_index = s1_slots * self.prefix_count_to_add_default
314         s2_last_index = (s2_first_index +
315                          s2_slots * (self.prefix_count_to_add_default -
316                                      self.prefix_count_to_del_default) - 1)
317         self.slot_gap_default = ((self.total_prefix_amount -
318                                   self.remaining_prefixes_threshold - 1) /
319                                  self.prefix_count_to_add_default + 1)
320         self.randomize_lowest_default = s2_first_index
321         self.randomize_highest_default = s2_last_index
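        # Worked example with assumed options --amount 10 --prefill 6
        # --insert 2 --withdraw 1: the threshold is 4, s1_slots is 3 and
        # s2_slots is 4, so prefix indexes 0..5 are pre-filled and indexes
        # 6..9 form the range that may later be randomized.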
322
323         # Initialising counters
324         self.phase1_start_time = 0
325         self.phase1_stop_time = 0
326         self.phase2_start_time = 0
327         self.phase2_stop_time = 0
328         self.phase1_updates_sent = 0
329         self.phase2_updates_sent = 0
330         self.updates_sent = 0
331
332         self.log_info = args.loglevel <= logging.INFO
333         self.log_debug = args.loglevel <= logging.DEBUG
334         """
335         Flags needed for the MessageGenerator performance optimization.
336         Calling logger methods on each iteration, even with the proper log
337         level set, slows down the MessageGenerator significantly.
338         Measured total generation time (1M updates, dry run, error log level):
339         - logging based on basic logger features: 36.2 s
340         - logging based on advanced logger features (lazy logging): 21.2 s
341         - conditional calling of logger methods enclosed inside a condition: 8.6 s
342         """
343
344         logger.info("Generator initialisation")
345         logger.info("  Target total number of prefixes to be introduced: " +
346                     str(self.total_prefix_amount))
347         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
348                     str(self.prefix_length_default))
349         logger.info("  My Autonomous System number: " +
350                     str(self.my_autonomous_system_default))
351         logger.info("  My Hold Time: " + str(self.hold_time_default))
352         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
353         logger.info("  Next Hop: " + str(self.next_hop_default))
354         logger.info("  Originator ID: " + str(self.originator_id_default))
355         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
356         logger.info("  Prefix count to be inserted at once: " +
357                     str(self.prefix_count_to_add_default))
358         logger.info("  Prefix count to be withdrawn at once: " +
359                     str(self.prefix_count_to_del_default))
360         logger.info("  Fast pre-fill up to " +
361                     str(self.total_prefix_amount -
362                         self.remaining_prefixes_threshold) + " prefixes")
363         logger.info("  Remaining number of prefixes to be processed " +
364                     "in parallel with withdrawals: " +
365                     str(self.remaining_prefixes_threshold))
366         logger.debug("  Prefix index range used after pre-fill procedure [" +
367                      str(self.randomize_lowest_default) + ", " +
368                      str(self.randomize_highest_default) + "]")
369         if self.single_update_default:
370             logger.info("  Common single UPDATE will be generated " +
371                         "for both NLRI & WITHDRAWN lists")
372         else:
373             logger.info("  Two separate UPDATEs will be generated " +
374                         "for each NLRI & WITHDRAWN lists")
375         if self.randomize_updates_default:
376             logger.info("  Generation of UPDATE messages will be randomized")
377         logger.info("  Let\'s go ...\n")
378
379         # TODO: Notification for hold timer expiration can be handy.
380
381     def store_results(self, file_name=None, threshold=None):
382         """ Stores specified results into files based on file_name value.
383
384         Arguments:
385             :param file_name: Trailing (common) part of result file names
386             :param threshold: Minimum number of sent updates needed for each
387                               result to be included into result csv file
388                               (mainly needed because of the result accuracy)
389         Returns:
390             :return: n/a
391         """
392         # default values handling
393         # TODO optimize default values handling (use e.g. dictionary.update() approach)
394         if file_name is None:
395             file_name = self.results_file_name_default
396         if threshold is None:
397             threshold = self.performance_threshold_default
398         # performance calculation
399         if self.phase1_updates_sent >= threshold:
400             totals1 = self.phase1_updates_sent
401             performance1 = int(self.phase1_updates_sent /
402                                (self.phase1_stop_time - self.phase1_start_time))
403         else:
404             totals1 = None
405             performance1 = None
406         if self.phase2_updates_sent >= threshold:
407             totals2 = self.phase2_updates_sent
408             performance2 = int(self.phase2_updates_sent /
409                                (self.phase2_stop_time - self.phase2_start_time))
410         else:
411             totals2 = None
412             performance2 = None
413
414         logger.info("#" * 10 + " Final results " + "#" * 10)
415         logger.info("Number of iterations: " + str(self.iteration))
416         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
417                     str(self.phase1_updates_sent))
418         logger.info("The pre-fill phase duration: " +
419                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
420         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
421                     str(self.phase2_updates_sent))
422         logger.info("The 2nd test phase duration: " +
423                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
424         logger.info("Threshold for performance reporting: " + str(threshold))
425
426         # making labels
427         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
428                         " route(s) per UPDATE")
429         if self.single_update_default:
430             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
431                                   "/-" + str(self.prefix_count_to_del_default) +
432                                   " routes per UPDATE")
433         else:
434             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
435                                   "/-" + str(self.prefix_count_to_del_default) +
436                                   " routes in two UPDATEs")
437         # collecting capacity and performance results
438         totals = {}
439         performance = {}
440         if totals1 is not None:
441             totals[phase1_label] = totals1
442             performance[phase1_label] = performance1
443         if totals2 is not None:
444             totals[phase2_label] = totals2
445             performance[phase2_label] = performance2
446         self.write_results_to_file(totals, "totals-" + file_name)
447         self.write_results_to_file(performance, "performance-" + file_name)
448
449     def write_results_to_file(self, results, file_name):
450         """Writes results to the csv plot file consumable by Jenkins.
451
452         Arguments:
453             :param file_name: Name of the (csv) file to be created
454         Returns:
455             :return: none
456         """
457         first_line = ""
458         second_line = ""
459         f = open(file_name, "wt")
460         try:
461             for key in sorted(results):
462                 first_line += key + ", "
463                 second_line += str(results[key]) + ", "
464             first_line = first_line[:-2]
465             second_line = second_line[:-2]
466             f.write(first_line + "\n")
467             f.write(second_line + "\n")
468             logger.info("Message generator performance results stored in " +
469                         file_name + ":")
470             logger.info("  " + first_line)
471             logger.info("  " + second_line)
472         finally:
473             f.close()
474
475     # Return pseudo-randomized (reproducible) index for selected range
476     def randomize_index(self, index, lowest=None, highest=None):
477         """Calculates pseudo-randomized index from selected range.
478
479         Arguments:
480             :param index: input index
481             :param lowest: the lowest index from the randomized area
482             :param highest: the highest index from the randomized area
483         Returns:
484             :return: the (pseudo)randomized index
485         Notes:
486             Created just as a frame for a future generator enhancement.
487         """
488         # default values handling
489         # TODO optimize default values handling (use e.g. dictionary.update() approach)
490         if lowest is None:
491             lowest = self.randomize_lowest_default
492         if highest is None:
493             highest = self.randomize_highest_default
494         # randomize
495         if (index >= lowest) and (index <= highest):
496             # we are in the randomized range -> shuffle it inside
497             # the range (now just reverse the order)
498             new_index = highest - (index - lowest)
499         else:
500             # we are out of the randomized range -> nothing to do
501             new_index = index
502         return new_index
503
504     # Get list of prefixes
505     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
506                         prefix_len=None, prefix_count=None, randomize=None):
507         """Generates list of IP address prefixes.
508
509         Arguments:
510             :param slot_index: index of group of prefix addresses
511             :param slot_size: size of group of prefix addresses
512                 in [number of included prefixes]
513             :param prefix_base: IP address of the first prefix
514                 (slot_index = 0, prefix_index = 0)
515             :param prefix_len: length of the prefix in bits
516                 (the same as size of netmask)
517             :param prefix_count: number of prefixes to be returned
518                 from the specified slot
519         Returns:
520             :return: list of generated IP address prefixes
521         """
522         # default values handling
523         # TODO optimize default values handling (use e.g. dictionary.update() approach)
524         if slot_size is None:
525             slot_size = self.slot_size_default
526         if prefix_base is None:
527             prefix_base = self.prefix_base_default
528         if prefix_len is None:
529             prefix_len = self.prefix_length_default
530         if prefix_count is None:
531             prefix_count = slot_size
532         if randomize is None:
533             randomize = self.randomize_updates_default
534         # generating list of prefixes
535         indexes = []
536         prefixes = []
537         prefix_gap = 2 ** (32 - prefix_len)
538         for i in range(prefix_count):
539             prefix_index = slot_index * slot_size + i
540             if randomize:
541                 prefix_index = self.randomize_index(prefix_index)
542             indexes.append(prefix_index)
543             prefixes.append(prefix_base + prefix_index * prefix_gap)
544         if self.log_debug:
545             logger.debug("  Prefix slot index: " + str(slot_index))
546             logger.debug("  Prefix slot size: " + str(slot_size))
547             logger.debug("  Prefix count: " + str(prefix_count))
548             logger.debug("  Prefix indexes: " + str(indexes))
549             logger.debug("  Prefix list: " + str(prefixes))
550         return prefixes
551
552     def compose_update_message(self, prefix_count_to_add=None,
553                                prefix_count_to_del=None):
554         """Composes an UPDATE message
555
556         Arguments:
557             :param prefix_count_to_add: # of prefixes to put into NLRI list
558             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
559         Returns:
560             :return: encoded UPDATE message in HEX
561         Notes:
562             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
563             lists or a common message which includes both prefix lists.
564             Updates global counters.
565         """
566         # default values handling
567         # TODO optimize default values handling (use e.g. dictionary.update() approach)
568         if prefix_count_to_add is None:
569             prefix_count_to_add = self.prefix_count_to_add_default
570         if prefix_count_to_del is None:
571             prefix_count_to_del = self.prefix_count_to_del_default
572         # logging
573         if self.log_info and not (self.iteration % 1000):
574             logger.info("Iteration: " + str(self.iteration) +
575                         " - total remaining prefixes: " +
576                         str(self.remaining_prefixes))
577         if self.log_debug:
578             logger.debug("#" * 10 + " Iteration: " +
579                          str(self.iteration) + " " + "#" * 10)
580             logger.debug("Remaining prefixes: " +
581                          str(self.remaining_prefixes))
582         # scenario type & one-shot counter
583         straightforward_scenario = (self.remaining_prefixes >
584                                     self.remaining_prefixes_threshold)
585         if straightforward_scenario:
586             prefix_count_to_del = 0
587             if self.log_debug:
588                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
589             if not self.phase1_start_time:
590                 self.phase1_start_time = time.time()
591         else:
592             if self.log_debug:
593                 logger.debug("--- COMBINED SCENARIO ---")
594             if not self.phase2_start_time:
595                 self.phase2_start_time = time.time()
596         # tailor the number of prefixes if needed
597         prefix_count_to_add = (prefix_count_to_del +
598                                min(prefix_count_to_add - prefix_count_to_del,
599                                    self.remaining_prefixes))
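        # For instance (assumed values), with prefix_count_to_add=5,
        # prefix_count_to_del=2 and only 1 remaining prefix, the line above
        # trims prefix_count_to_add to 2 + min(3, 1) = 3, so the net growth
        # never exceeds the remaining prefix budget.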
600         # prefix slots selection for insertion and withdrawal
601         slot_index_to_add = self.iteration
602         slot_index_to_del = slot_index_to_add - self.slot_gap_default
603         # getting lists of prefixes for insertion in this iteration
604         if self.log_debug:
605             logger.debug("Prefixes to be inserted in this iteration:")
606         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
607                                                   prefix_count=prefix_count_to_add)
608         # getting lists of prefixes for withdrawal in this iteration
609         if self.log_debug:
610             logger.debug("Prefixes to be withdrawn in this iteration:")
611         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
612                                                   prefix_count=prefix_count_to_del)
613         # generating the message
614         if self.single_update_default:
615             # Send prefixes to be introduced and withdrawn
616             # in one UPDATE message
617             msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
618                                           nlri_prefixes=prefix_list_to_add)
619         else:
620             # Send prefixes to be introduced and withdrawn
621             # in separate UPDATE messages (if needed)
622             msg_out = self.update_message(wr_prefixes=[],
623                                           nlri_prefixes=prefix_list_to_add)
624             if prefix_count_to_del:
625                 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
626                                                nlri_prefixes=[])
627         # updating counters - who knows ... maybe I am last time here ;)
628         if straightforward_scenario:
629             self.phase1_stop_time = time.time()
630             self.phase1_updates_sent = self.updates_sent
631         else:
632             self.phase2_stop_time = time.time()
633             self.phase2_updates_sent = (self.updates_sent -
634                                         self.phase1_updates_sent)
635         # updating totals for the next iteration
636         self.iteration += 1
637         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
638         # returning the encoded message
639         return msg_out
640
641     # Section of message encoders
642
643     def open_message(self, version=None, my_autonomous_system=None,
644                      hold_time=None, bgp_identifier=None):
645         """Generates an OPEN Message (rfc4271#section-4.2)
646
647         Arguments:
648             :param version: see the rfc4271#section-4.2
649             :param my_autonomous_system: see the rfc4271#section-4.2
650             :param hold_time: see the rfc4271#section-4.2
651             :param bgp_identifier: see the rfc4271#section-4.2
652         Returns:
653             :return: encoded OPEN message in HEX
654         """
655
656         # default values handling
657         # TODO optimize default values handling (use e.g. dictionary.update() approach)
658         if version is None:
659             version = self.version_default
660         if my_autonomous_system is None:
661             my_autonomous_system = self.my_autonomous_system_default
662         if hold_time is None:
663             hold_time = self.hold_time_default
664         if bgp_identifier is None:
665             bgp_identifier = self.bgp_identifier_default
666
667         # Marker
668         marker_hex = "\xFF" * 16
669
670         # Type
671         type = 1
672         type_hex = struct.pack("B", type)
673
674         # version
675         version_hex = struct.pack("B", version)
676
677         # my_autonomous_system
678         # AS_TRANS value, 23456 decimal, used when the AS number does not fit.
679         my_autonomous_system_2_bytes = 23456
680         # AS number is mappable to 2 bytes
681         if my_autonomous_system < 65536:
682             my_autonomous_system_2_bytes = my_autonomous_system
683         my_autonomous_system_hex_2_bytes = struct.pack(">H",
684                                                        my_autonomous_system_2_bytes)
685
686         # Hold Time
687         hold_time_hex = struct.pack(">H", hold_time)
688
689         # BGP Identifier
690         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
691
692         # Optional Parameters
693         optional_parameters_hex = ""
694         if self.rfc4760:
695             optional_parameter_hex = (
696                 "\x02"  # Param type ("Capability Ad")
697                 "\x06"  # Length (6 bytes)
698                 "\x01"  # Capability type (NLRI Unicast),
699                         # see RFC 4760, secton 8
700                 "\x04"  # Capability value length
701                 "\x00\x01"  # AFI (Ipv4)
702                 "\x00"  # (reserved)
703                 "\x01"  # SAFI (Unicast)
704             )
705             optional_parameters_hex += optional_parameter_hex
706
707         optional_parameter_hex = (
708             "\x02"  # Param type ("Capability Ad")
709             "\x06"  # Length (6 bytes)
710             "\x41"  # "32 bit AS Numbers Support"
711                     # (see RFC 6793, section 3)
712             "\x04"  # Capability value length
713         )
714         optional_parameter_hex += (
715             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
716         )
717         optional_parameters_hex += optional_parameter_hex
718
719         # Optional Parameters Length
720         optional_parameters_length = len(optional_parameters_hex)
721         optional_parameters_length_hex = struct.pack("B",
722                                                      optional_parameters_length)
723
724         # Length (big-endian)
725         length = (
726             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
727             len(my_autonomous_system_hex_2_bytes) +
728             len(hold_time_hex) + len(bgp_identifier_hex) +
729             len(optional_parameters_length_hex) +
730             len(optional_parameters_hex)
731         )
732         length_hex = struct.pack(">H", length)
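        # With the default options (rfc4760 capability included) this adds up
        # to 45 octets: 29 octets of fixed OPEN fields plus 16 octets of
        # optional capability parameters (8 + 8).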
733
734         # OPEN Message
735         message_hex = (
736             marker_hex +
737             length_hex +
738             type_hex +
739             version_hex +
740             my_autonomous_system_hex_2_bytes +
741             hold_time_hex +
742             bgp_identifier_hex +
743             optional_parameters_length_hex +
744             optional_parameters_hex
745         )
746
747         if self.log_debug:
748             logger.debug("OPEN message encoding")
749             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
750             logger.debug("  Length=" + str(length) + " (0x" +
751                          binascii.hexlify(length_hex) + ")")
752             logger.debug("  Type=" + str(type) + " (0x" +
753                          binascii.hexlify(type_hex) + ")")
754             logger.debug("  Version=" + str(version) + " (0x" +
755                          binascii.hexlify(version_hex) + ")")
756             logger.debug("  My Autonomous System=" +
757                          str(my_autonomous_system_2_bytes) + " (0x" +
758                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
759                          ")")
760             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
761                          binascii.hexlify(hold_time_hex) + ")")
762             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
763                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
764             logger.debug("  Optional Parameters Length=" +
765                          str(optional_parameters_length) + " (0x" +
766                          binascii.hexlify(optional_parameters_length_hex) +
767                          ")")
768             logger.debug("  Optional Parameters=0x" +
769                          binascii.hexlify(optional_parameters_hex))
770             logger.debug("OPEN message encoded: 0x%s",
771                          binascii.b2a_hex(message_hex))
772
773         return message_hex
774
775     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
776                        wr_prefix_length=None, nlri_prefix_length=None,
777                        my_autonomous_system=None, next_hop=None,
778                        originator_id=None, cluster_list_item=None):
779         """Generates an UPDATE Message (rfc4271#section-4.3)
780
781         Arguments:
782             :param wr_prefixes: see the rfc4271#section-4.3
783             :param nlri_prefixes: see the rfc4271#section-4.3
784             :param wr_prefix_length: see the rfc4271#section-4.3
785             :param nlri_prefix_length: see the rfc4271#section-4.3
786             :param my_autonomous_system: see the rfc4271#section-4.3
787             :param next_hop: see the rfc4271#section-4.3
788         Returns:
789             :return: encoded UPDATE message in HEX
790         """
791
792         # default values handling
793         # TODO optimize default values handling (use e.g. dictionary.update() approach)
794         if wr_prefixes is None:
795             wr_prefixes = self.wr_prefixes_default
796         if nlri_prefixes is None:
797             nlri_prefixes = self.nlri_prefixes_default
798         if wr_prefix_length is None:
799             wr_prefix_length = self.prefix_length_default
800         if nlri_prefix_length is None:
801             nlri_prefix_length = self.prefix_length_default
802         if my_autonomous_system is None:
803             my_autonomous_system = self.my_autonomous_system_default
804         if next_hop is None:
805             next_hop = self.next_hop_default
806         if originator_id is None:
807             originator_id = self.originator_id_default
808         if cluster_list_item is None:
809             cluster_list_item = self.cluster_list_item_default
810
811         # Marker
812         marker_hex = "\xFF" * 16
813
814         # Type
815         type = 2
816         type_hex = struct.pack("B", type)
817
818         # Withdrawn Routes
819         bytes = ((wr_prefix_length - 1) / 8) + 1
820         withdrawn_routes_hex = ""
821         for prefix in wr_prefixes:
822             withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
823                                    struct.pack(">I", int(prefix))[:bytes])
824             withdrawn_routes_hex += withdrawn_route_hex
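        # As an illustration, withdrawing 8.0.1.0 with wr_prefix_length=28
        # produces the five octets 1c 08 00 01 00 (one length octet followed
        # by four prefix octets).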
825
826         # Withdrawn Routes Length
827         withdrawn_routes_length = len(withdrawn_routes_hex)
828         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
829
830         # TODO: to replace hardcoded string by encoding?
831         # Path Attributes
832         path_attributes_hex = ""
833         if nlri_prefixes != []:
834             path_attributes_hex += (
835                 "\x40"  # Flags ("Well-Known")
836                 "\x01"  # Type (ORIGIN)
837                 "\x01"  # Length (1)
838                 "\x00"  # Origin: IGP
839             )
840             path_attributes_hex += (
841                 "\x40"  # Flags ("Well-Known")
842                 "\x02"  # Type (AS_PATH)
843                 "\x06"  # Length (6)
844                 "\x02"  # AS segment type (AS_SEQUENCE)
845                 "\x01"  # AS segment length (1)
846             )
847             my_as_hex = struct.pack(">I", my_autonomous_system)
848             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
849             path_attributes_hex += (
850                 "\x40"  # Flags ("Well-Known")
851                 "\x03"  # Type (NEXT_HOP)
852                 "\x04"  # Length (4)
853             )
854             next_hop_hex = struct.pack(">I", int(next_hop))
855             path_attributes_hex += (
856                 next_hop_hex  # IP address of the next hop (4 bytes)
857             )
858             if originator_id is not None:
859                 path_attributes_hex += (
860                     "\x80"  # Flags ("Optional, non-transitive")
861                     "\x09"  # Type (ORIGINATOR_ID)
862                     "\x04"  # Length (4)
863                 )           # ORIGINATOR_ID (4 bytes)
864                 path_attributes_hex += struct.pack(">I", int(originator_id))
865             if cluster_list_item is not None:
866                 path_attributes_hex += (
867                     "\x80"  # Flags ("Optional, non-transitive")
868                     "\x09"  # Type (CLUSTER_LIST)
869                     "\x04"  # Length (4)
870                 )           # one CLUSTER_LIST item (4 bytes)
871                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
872
873         # Total Path Attributes Length
874         total_path_attributes_length = len(path_attributes_hex)
875         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
876
877         # Network Layer Reachability Information
878         bytes = ((nlri_prefix_length - 1) / 8) + 1
879         nlri_hex = ""
880         for prefix in nlri_prefixes:
881             nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
882                                struct.pack(">I", int(prefix))[:bytes])
883             nlri_hex += nlri_prefix_hex
884
885         # Length (big-endian)
886         length = (
887             len(marker_hex) + 2 + len(type_hex) +
888             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
889             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
890             len(nlri_hex))
891         length_hex = struct.pack(">H", length)
892
893         # UPDATE Message
894         message_hex = (
895             marker_hex +
896             length_hex +
897             type_hex +
898             withdrawn_routes_length_hex +
899             withdrawn_routes_hex +
900             total_path_attributes_length_hex +
901             path_attributes_hex +
902             nlri_hex
903         )
904
905         if self.log_debug:
906             logger.debug("UPDATE message encoding")
907             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
908             logger.debug("  Length=" + str(length) + " (0x" +
909                          binascii.hexlify(length_hex) + ")")
910             logger.debug("  Type=" + str(type) + " (0x" +
911                          binascii.hexlify(type_hex) + ")")
912             logger.debug("  withdrawn_routes_length=" +
913                          str(withdrawn_routes_length) + " (0x" +
914                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
915             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
916                          str(wr_prefix_length) + " (0x" +
917                          binascii.hexlify(withdrawn_routes_hex) + ")")
918             if total_path_attributes_length:
919                 logger.debug("  Total Path Attributes Length=" +
920                              str(total_path_attributes_length) + " (0x" +
921                              binascii.hexlify(total_path_attributes_length_hex) + ")")
922                 logger.debug("  Path Attributes=" + "(0x" +
923                              binascii.hexlify(path_attributes_hex) + ")")
924                 logger.debug("    Origin=IGP")
925                 logger.debug("    AS path=" + str(my_autonomous_system))
926                 logger.debug("    Next hop=" + str(next_hop))
927                 if originator_id is not None:
928                     logger.debug("    Originator id=" + str(originator_id))
929                 if cluster_list_item is not None:
930                     logger.debug("    Cluster list=" + str(cluster_list_item))
931             logger.debug("  Network Layer Reachability Information=" +
932                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
933                          " (0x" + binascii.hexlify(nlri_hex) + ")")
934             logger.debug("UPDATE message encoded: 0x" +
935                          binascii.b2a_hex(message_hex))
936
937         # updating counter
938         self.updates_sent += 1
939         # returning encoded message
940         return message_hex
941
942     def notification_message(self, error_code, error_subcode, data_hex=""):
943         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
944
945         Arguments:
946             :param error_code: see the rfc4271#section-4.5
947             :param error_subcode: see the rfc4271#section-4.5
948             :param data_hex: see the rfc4271#section-4.5
949         Returns:
950             :return: encoded NOTIFICATION message in HEX
951         """
952
953         # Marker
954         marker_hex = "\xFF" * 16
955
956         # Type
957         type = 3
958         type_hex = struct.pack("B", type)
959
960         # Error Code
961         error_code_hex = struct.pack("B", error_code)
962
963         # Error Subcode
964         error_subcode_hex = struct.pack("B", error_subcode)
965
966         # Length (big-endian)
967         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
968                   len(error_subcode_hex) + len(data_hex))
969         length_hex = struct.pack(">H", length)
970
971         # NOTIFICATION Message
972         message_hex = (
973             marker_hex +
974             length_hex +
975             type_hex +
976             error_code_hex +
977             error_subcode_hex +
978             data_hex
979         )
980
981         if self.log_debug:
982             logger.debug("NOTIFICATION message encoding")
983             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
984             logger.debug("  Length=" + str(length) + " (0x" +
985                          binascii.hexlify(length_hex) + ")")
986             logger.debug("  Type=" + str(type) + " (0x" +
987                          binascii.hexlify(type_hex) + ")")
988             logger.debug("  Error Code=" + str(error_code) + " (0x" +
989                          binascii.hexlify(error_code_hex) + ")")
990             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
991                          binascii.hexlify(error_subcode_hex) + ")")
992             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
993             logger.debug("NOTIFICATION message encoded: 0x%s",
994                          binascii.b2a_hex(message_hex))
995
996         return message_hex
997
998     def keepalive_message(self):
999         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1000
1001         Returns:
1002             :return: encoded KEEP ALIVE message in HEX
1003         """
1004
1005         # Marker
1006         marker_hex = "\xFF" * 16
1007
1008         # Type
1009         type = 4
1010         type_hex = struct.pack("B", type)
1011
1012         # Length (big-endian)
1013         length = len(marker_hex) + 2 + len(type_hex)
1014         length_hex = struct.pack(">H", length)
1015
1016         # KEEP ALIVE Message
1017         message_hex = (
1018             marker_hex +
1019             length_hex +
1020             type_hex
1021         )
1022
1023         if self.log_debug:
1024             logger.debug("KEEP ALIVE message encoding")
1025             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1026             logger.debug("  Length=" + str(length) + " (0x" +
1027                          binascii.hexlify(length_hex) + ")")
1028             logger.debug("  Type=" + str(type) + " (0x" +
1029                          binascii.hexlify(type_hex) + ")")
1030             logger.debug("KEEP ALIVE message encoded: 0x%s",
1031                          binascii.b2a_hex(message_hex))
1032
1033         return message_hex
1034
1035
1036 class TimeTracker(object):
1037     """Class for tracking timers, both for my keepalives and
1038     peer's hold time.
1039     """
1040
1041     def __init__(self, msg_in):
1042         """Initialisation. based on defaults and OPEN message from peer.
1043
1044         Arguments:
1045             msg_in: the OPEN message received from peer.
1046         """
1047         # Note: Relative time is always named timedelta, to stress that
1048         # the (non-delta) time is absolute.
1049         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1050         # Upper bound for being stuck in the same state; we should
1051         # at least report something before continuing.
1052         # Negotiate the hold timer by taking the smaller
1053         # of the 2 values (mine and the peer's).
1054         hold_timedelta = 180  # Not an attribute of self yet.
1055         # TODO: Make the default value configurable,
1056         # default value could mirror what peer said.
1057         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1058         if hold_timedelta > peer_hold_timedelta:
1059             hold_timedelta = peer_hold_timedelta
1060         if hold_timedelta != 0 and hold_timedelta < 3:
1061             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1062             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1063         self.hold_timedelta = hold_timedelta
1064         # If we do not hear from peer this long, we assume it has died.
1065         self.keepalive_timedelta = int(hold_timedelta / 3.0)
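        # Example: if the peer advertised a hold time of 90 seconds, the
        # negotiated hold_timedelta above is min(180, 90) = 90 and keepalives
        # are scheduled every 30 seconds.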
1066         # Upper limit for duration between messages, to avoid being
1067         # declared to be dead.
1068         # The same as calling snapshot(), but also declares a field.
1069         self.snapshot_time = time.time()
1070         # Sometimes we need to store time. This is where to get
1071         # the value from afterwards. Time_keepalive may be too strict.
1072         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1073         # At this time point, peer will be declared dead.
1074         self.my_keepalive_time = None  # to be set later
1075         # At this point, we should be sending keepalive message.
1076
1077     def snapshot(self):
1078         """Store current time in instance data to use later."""
1079         # Read as time before something interesting was called.
1080         self.snapshot_time = time.time()
1081
1082     def reset_peer_hold_time(self):
1083         """Move hold time to future as peer has just proven it still lives."""
1084         self.peer_hold_time = time.time() + self.hold_timedelta
1085
1086     # Some methods could rely on self.snapshot_time, but it is better
1087     # to require user to provide it explicitly.
1088     def reset_my_keepalive_time(self, keepalive_time):
1089         """Calculate and set the next my KEEP ALIVE timeout time
1090
1091         Arguments:
1092             :keepalive_time: the initial value of the KEEP ALIVE timer
1093         """
1094         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1095
1096     def is_time_for_my_keepalive(self):
1097         """Check for my KEEP ALIVE timeout occurence"""
1098         if self.hold_timedelta == 0:
1099             return False
1100         return self.snapshot_time >= self.my_keepalive_time
1101
1102     def get_next_event_time(self):
1103         """Set the time of the next expected or to be sent KEEP ALIVE"""
1104         if self.hold_timedelta == 0:
1105             return self.snapshot_time + 86400
1106         return min(self.my_keepalive_time, self.peer_hold_time)
1107
1108     def check_peer_hold_time(self, snapshot_time):
1109         """Raise error if nothing was read from peer until specified time."""
1110         # Hold time = 0 means keepalive checking off.
1111         if self.hold_timedelta != 0:
1112             # time.time() may be too strict
1113             if snapshot_time > self.peer_hold_time:
1114                 logger.error("Peer has overstepped the hold timer.")
1115                 raise RuntimeError("Peer has overstepped the hold timer.")
1116                 # TODO: Include hold_timedelta?
1117                 # TODO: Add notification sending (attempt). That means
1118                 # move to write tracker.
1119
1120
1121 class ReadTracker(object):
1122     """Class for tracking read of mesages chunk by chunk and
1123     for idle waiting.
1124     """
1125
1126     def __init__(self, bgp_socket, timer):
1127         """The reader initialisation.
1128
1129         Arguments:
1130             bgp_socket: socket to be used for sending
1131             timer: timer to be used for scheduling
1132         """
1133         # References to outside objects.
1134         self.socket = bgp_socket
1135         self.timer = timer
1136         # BGP marker length plus length field length.
1137         self.header_length = 18
1138         # TODO: make it class (constant) attribute
1139         # Computation of where next chunk ends depends on whether
1140         # we are beyond length field.
1141         self.reading_header = True
1142         # Countdown towards next size computation.
1143         self.bytes_to_read = self.header_length
1144         # Incremental buffer for message under read.
1145         self.msg_in = ""
1146         # Initialising counters
1147         self.updates_received = 0
1148         self.prefixes_introduced = 0
1149         self.prefixes_withdrawn = 0
1150         self.rx_idle_time = 0
1151         self.rx_activity_detected = True
1152
1153     def read_message_chunk(self):
1154         """Read up to one message
1155
1156         Note:
1157             Currently it does not return anything.
1158         """
1159         # TODO: We could return the whole message, currently not needed.
1160         # We assume the socket is readable.
1161         chunk_message = self.socket.recv(self.bytes_to_read)
1162         self.msg_in += chunk_message
1163         self.bytes_to_read -= len(chunk_message)
1164         # TODO: bytes_to_read < 0 is not possible, right?
1165         if not self.bytes_to_read:
1166             # Finished reading a logical block.
1167             if self.reading_header:
1168                 # The logical block was a BGP header.
1169                 # Now we know the size of the message.
1170                 self.reading_header = False
1171                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1172                                       self.header_length)
1173             else:  # We have finished reading the body of the message.
1174                 # Peer has just proven it is still alive.
1175                 self.timer.reset_peer_hold_time()
1176                 # TODO: Do we want to count received messages?
1177                 # This version ignores the received message.
1178                 # TODO: Should we do validation and exit on anything
1179                 # besides update or keepalive?
1180                 # Prepare state for reading another message.
1181                 message_type_hex = self.msg_in[self.header_length]
1182                 if message_type_hex == "\x01":
1183                     logger.info("OPEN message received: 0x%s",
1184                                 binascii.b2a_hex(self.msg_in))
1185                 elif message_type_hex == "\x02":
1186                     logger.debug("UPDATE message received: 0x%s",
1187                                  binascii.b2a_hex(self.msg_in))
1188                     self.decode_update_message(self.msg_in)
1189                 elif message_type_hex == "\x03":
1190                     logger.info("NOTIFICATION message received: 0x%s",
1191                                 binascii.b2a_hex(self.msg_in))
1192                 elif message_type_hex == "\x04":
1193                     logger.info("KEEP ALIVE message received: 0x%s",
1194                                 binascii.b2a_hex(self.msg_in))
1195                 else:
1196                     logger.warning("Unexpected message received: 0x%s",
1197                                    binascii.b2a_hex(self.msg_in))
                 # Prepare state for reading another message.
1198                 self.msg_in = ""
1199                 self.reading_header = True
1200                 self.bytes_to_read = self.header_length
1201         # We should not act upon peer_hold_time if we are reading
1202         # something right now.
1203         return
1204
1205     def decode_path_attributes(self, path_attributes_hex):
1206         """Decode the Path Attributes field (rfc4271#section-4.3)
1207
1208         Arguments:
1209             :path_attributes_hex: path attributes field to be decoded in hex
1210         Returns:
1211             :return: None
1212         """
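             # Each path attribute is a TLV (RFC 4271, section 4.3): an attribute
             # flags octet, an attribute type octet, a one-octet length (or a
             # two-octet length when the Extended Length flag bit is set),
             # followed by the attribute value itself.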
1213         hex_to_decode = path_attributes_hex
1214
1215         while len(hex_to_decode):
1216             attr_flags_hex = hex_to_decode[0]
1217             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1218 #            attr_optional_bit = attr_flags & 128
1219 #            attr_transitive_bit = attr_flags & 64
1220 #            attr_partial_bit = attr_flags & 32
1221             attr_extended_length_bit = attr_flags & 16
1222
1223             attr_type_code_hex = hex_to_decode[1]
1224             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1225
1226             if attr_extended_length_bit:
1227                 attr_length_hex = hex_to_decode[2:4]
1228                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1229                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1230                 hex_to_decode = hex_to_decode[4 + attr_length:]
1231             else:
1232                 attr_length_hex = hex_to_decode[2]
1233                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1234                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1235                 hex_to_decode = hex_to_decode[3 + attr_length:]
1236
1237             if attr_type_code == 1:
1238                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1239                              binascii.b2a_hex(attr_flags_hex))
1240                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1241             elif attr_type_code == 2:
1242                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1243                              binascii.b2a_hex(attr_flags_hex))
1244                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1245             elif attr_type_code == 3:
1246                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1247                              binascii.b2a_hex(attr_flags_hex))
1248                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1249             elif attr_type_code == 4:
1250                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1251                              binascii.b2a_hex(attr_flags_hex))
1252                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1253             elif attr_type_code == 5:
1254                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1255                              binascii.b2a_hex(attr_flags_hex))
1256                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1257             elif attr_type_code == 6:
1258                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1259                              binascii.b2a_hex(attr_flags_hex))
1260                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1261             elif attr_type_code == 7:
1262                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1263                              binascii.b2a_hex(attr_flags_hex))
1264                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1265             elif attr_type_code == 9:  # rfc4456#section-8
1266                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1267                              binascii.b2a_hex(attr_flags_hex))
1268                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1269             elif attr_type_code == 10:  # rfc4456#section-8
1270                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1271                              binascii.b2a_hex(attr_flags_hex))
1272                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1273             elif attr_type_code == 14:  # rfc4760#section-3
1274                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1275                              binascii.b2a_hex(attr_flags_hex))
1276                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1277                 address_family_identifier_hex = attr_value_hex[0:2]
1278                 logger.debug("  Address Family Identifier=0x%s",
1279                              binascii.b2a_hex(address_family_identifier_hex))
1280                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1281                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1282                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1283                 next_hop_netaddr_len_hex = attr_value_hex[3]
1284                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1285                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1286                              next_hop_netaddr_len,
1287                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1288                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
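                     # The decoding below assumes an IPv4 next hop (exactly 4 octets);
                     # struct.unpack("BBBB", ...) would raise struct.error otherwise.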
1289                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1290                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1291                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1292                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1293                 logger.debug("  Reserved=0x%s",
1294                              binascii.b2a_hex(reserved_hex))
1295                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1296                 logger.debug("  Network Layer Reachability Information=0x%s",
1297                              binascii.b2a_hex(nlri_hex))
1298                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1299                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1300                 for prefix in nlri_prefix_list:
1301                     logger.debug("  nlri_prefix_received: %s", prefix)
1302                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1303             elif attr_type_code == 15:  # rfc4760#section-4
1304                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1305                              binascii.b2a_hex(attr_flags_hex))
1306                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1307                 address_family_identifier_hex = attr_value_hex[0:2]
1308                 logger.debug("  Address Family Identifier=0x%s",
1309                              binascii.b2a_hex(address_family_identifier_hex))
1310                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1311                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1312                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1313                 wd_hex = attr_value_hex[3:]
1314                 logger.debug("  Withdrawn Routes=0x%s",
1315                              binascii.b2a_hex(wd_hex))
1316                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1317                 logger.debug("  Withdrawn routes prefix list: %s",
1318                              wdr_prefix_list)
1319                 for prefix in wdr_prefix_list:
1320                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1321                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1322             else:
1323                 logger.debug("Unknown attribute type=%s (flags:0x%s)", attr_type_code,
1324                              binascii.b2a_hex(attr_flags_hex))
1325                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1326         return None
1327
1328     def decode_update_message(self, msg):
1329         """Decode an UPDATE message (rfc4271#section-4.3)
1330
1331         Arguments:
1332             :msg: message to be decoded in hex
1333         Returns:
1334             :return: None
1335         """
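             # UPDATE layout (RFC 4271, section 4.3): the common 19-octet header,
             # a 2-octet Withdrawn Routes Length, the Withdrawn Routes field,
             # a 2-octet Total Path Attribute Length, the Path Attributes field,
             # and the NLRI field, which occupies the rest of the message.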
1336         logger.debug("Decoding update message:")
1337         # message header - marker
1338         marker_hex = msg[:16]
1339         logger.debug("Message header marker: 0x%s",
1340                      binascii.b2a_hex(marker_hex))
1341         # message header - message length
1342         msg_length_hex = msg[16:18]
1343         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1344         logger.debug("Message length: 0x%s (%s)",
1345                      binascii.b2a_hex(msg_length_hex), msg_length)
1346         # message header - message type
1347         msg_type_hex = msg[18:19]
1348         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1349         if msg_type == 2:
1350             logger.debug("Message type: 0x%s (update)",
1351                          binascii.b2a_hex(msg_type_hex))
1352             # withdrawn routes length
1353             wdr_length_hex = msg[19:21]
1354             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1355             logger.debug("Withdrawn routes length: 0x%s (%s)",
1356                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1357             # withdrawn routes
1358             wdr_hex = msg[21:21 + wdr_length]
1359             logger.debug("Withdrawn routes: 0x%s",
1360                          binascii.b2a_hex(wdr_hex))
1361             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1362             logger.debug("Withdrawn routes prefix list: %s",
1363                          wdr_prefix_list)
1364             for prefix in wdr_prefix_list:
1365                 logger.debug("withdrawn_prefix_received: %s", prefix)
1366             # total path attribute length
1367             total_pa_length_offset = 21 + wdr_length
1368             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1369             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1370             logger.debug("Total path attribute length: 0x%s (%s)",
1371                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1372             # path attributes
1373             pa_offset = total_pa_length_offset + 2
1374             pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1375             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1376             self.decode_path_attributes(pa_hex)
1377             # network layer reachability information length
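                 # RFC 4271: NLRI length = message length - 23
                 #           - total path attribute length - withdrawn routes length,
                 # where 23 = 19-octet header + the two 2-octet length fields.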
1378             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1379             logger.debug("Calculated NLRI length: %s", nlri_length)
1380             # network layer reachability information
1381             nlri_offset = pa_offset + total_pa_length
1382             nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1383             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1384             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1385             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1386             for prefix in nlri_prefix_list:
1387                 logger.debug("nlri_prefix_received: %s", prefix)
1388             # Updating counters
1389             self.updates_received += 1
1390             self.prefixes_introduced += len(nlri_prefix_list)
1391             self.prefixes_withdrawn += len(wdr_prefix_list)
1392         else:
1393             logger.error("Unexpected message type 0x%s in 0x%s",
1394                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1395
1396     def wait_for_read(self):
1397         """Wait until something can be read or until the next expected event (timeout).
1398
1399         Note:
1400             Used when no more updates have to be sent, to avoid busy-waiting.
1401             Currently it does not return anything.
1402         """
1403         # Compute time to the first predictable state change
1404         event_time = self.timer.get_next_event_time()
1405         # snapshot_time would be imprecise
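             # Cap the wait at 10 seconds so that select() below returns
             # periodically even when no timer event is near.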
1406         wait_timedelta = min(event_time - time.time(), 10)
1407         if wait_timedelta < 0:
1408             # The program got around to waiting for an event in the "very
1409             # near future" so late that it is already in the past, so tell
1410             # select() not to wait at all. Passing a negative timedelta to
1411             # select() would lead to either waiting forever (for -1) or
1412             # select.error("Invalid parameter") (for everything else).
1413             wait_timedelta = 0
1414         # And wait for event or something to read.
1415
1416         if not self.rx_activity_detected or not (self.updates_received % 100):
1417             # A suitable time to write statistics to the log (not on every
1418             # update, and not too frequently, to keep log files small).
1419             logger.info("total_received_update_message_counter: %s",
1420                         self.updates_received)
1421             logger.info("total_received_nlri_prefix_counter: %s",
1422                         self.prefixes_introduced)
1423             logger.info("total_received_withdrawn_prefix_counter: %s",
1424                         self.prefixes_withdrawn)
1425
1426         start_time = time.time()
1427         select.select([self.socket], [], [self.socket], wait_timedelta)
1428         timedelta = time.time() - start_time
1429         self.rx_idle_time += timedelta
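             # A select() wait that returned in under one second counts as
             # read-side activity.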
1430         self.rx_activity_detected = timedelta < 1
1431
1432         if not self.rx_activity_detected or not (self.updates_received % 100):
1433             # A suitable time to write statistics to the log (not on every
1434             # update, and not too frequently, to keep log files small).
1435             logger.info("... idle for %.3fs", timedelta)
1436             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1437         return
1438
1439
1440 class WriteTracker(object):
1441     """Class for tracking enqueued messages and sending them in chunks."""
1442
1443     def __init__(self, bgp_socket, generator, timer):
1444         """The writer initialisation.
1445
1446         Arguments:
1447             bgp_socket: socket to be used for sending
1448             generator: generator to be used for message generation
1449             timer: timer to be used for scheduling
1450         """
1451         # References to outside objects.
1452         self.socket = bgp_socket
1453         self.generator = generator
1454         self.timer = timer
1455         # This tracker's own state fields.
1456         # TODO: Would attribute docstrings add anything substantial?
1457         self.sending_message = False
1458         self.bytes_to_send = 0
1459         self.msg_out = ""
1460
1461     def enqueue_message_for_sending(self, message):
1462         """Enqueue message and change state.
1463
1464         Arguments:
1465             message: message to be enqueued into the msg_out buffer
1466         """
1467         self.msg_out += message
1468         self.bytes_to_send += len(message)
1469         self.sending_message = True
1470
1471     def send_message_chunk_is_whole(self):
1472         """Send enqueued data from msg_out buffer
1473
1474         Returns:
1475             :return: true if no remaining data to send
1476         """
1477         # We assume there is a msg_out to send and socket is writable.
1478         # print "going to send", repr(self.msg_out)
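             # socket.send() may transmit only part of the buffer; whatever is
             # left stays queued in msg_out until the socket is writable again.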
1479         self.timer.snapshot()
1480         bytes_sent = self.socket.send(self.msg_out)
1481         # Forget the part of message that was sent.
1482         self.msg_out = self.msg_out[bytes_sent:]
1483         self.bytes_to_send -= bytes_sent
1484         if not self.bytes_to_send:
1485             # TODO: Is it possible to hit negative bytes_to_send?
1486             self.sending_message = False
1487             # We should have reset hold timer on peer side.
1488             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1489             # The possible reason for not prioritizing reads is gone.
1490             return True
1491         return False
1492
1493
1494 class StateTracker(object):
1495     """Main loop has state so complex it warrants this separate class."""
1496
1497     def __init__(self, bgp_socket, generator, timer):
1498         """The state tracker initialisation.
1499
1500         Arguments:
1501             bgp_socket: socket to be used for sending / receiving
1502             generator: generator to be used for message generation
1503             timer: timer to be used for scheduling
1504         """
1505         # References to outside objects.
1506         self.socket = bgp_socket
1507         self.generator = generator
1508         self.timer = timer
1509         # Sub-trackers.
1510         self.reader = ReadTracker(bgp_socket, timer)
1511         self.writer = WriteTracker(bgp_socket, generator, timer)
1512         # Prioritization state.
1513         self.prioritize_writing = False
1514         # In general, we prioritize reading over writing. But in order
1515         # not to get blocked by never-ending reads, we have to check
1516         # whether we are at risk of running out of hold time.
1517         # So in some situations, this field is set to True to attempt
1518         # to finish sending a message, after which the field resets
1519         # back to False.
1520         # TODO: Alternative is to switch fairly between reading and
1521         # writing (called round robin from now on).
1522         # Message counting is done in generator.
1523
1524     def perform_one_loop_iteration(self):
1525         """The main loop iteration.
1526
1527         Notes:
1528             Calculates priority, resolves all conditions, calls
1529             appropriate method and returns to caller to repeat.
1530         """
1531         self.timer.snapshot()
1532         if not self.prioritize_writing:
1533             if self.timer.is_time_for_my_keepalive():
1534                 if not self.writer.sending_message:
1535                     # We need to schedule a keepalive ASAP.
1536                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1537                     logger.info("KEEP ALIVE is sent.")
1538                 # We are sending a message now, so let's prioritize it.
1539                 self.prioritize_writing = True
1540         # Now we know what our priorities are, we have to check
1541         # which actions are available.
1542         # select.select() returns three lists,
1543         # we store them in a list of lists.
1544         list_list = select.select([self.socket], [self.socket], [self.socket],
1545                                   self.timer.report_timedelta)
1546         read_list, write_list, except_list = list_list
1547         # Lists are unpacked, each is either [] or [self.socket],
1548         # so we will test them as boolean.
1549         if except_list:
1550             logger.error("Exceptional state on the socket.")
1551             raise RuntimeError("Exceptional state on socket", self.socket)
1552         # We will do either read or write.
1553         if not (self.prioritize_writing and write_list):
1554             # Either we have no reason to rush writes,
1555             # or the socket is not writable.
1556             # We are focusing on reading here.
1557             if read_list:  # there is something to read indeed
1558                 # In this case we want to read chunk of message
1559                 # and repeat the select,
1560                 self.reader.read_message_chunk()
1561                 return
1562             # We were focusing on reading, but nothing to read was there.
1563             # Good time to check peer for hold timer.
1564             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1565             # Quiet on the read front, so we can attempt to write.
1566         if write_list:
1567             # Either we really want to reset peer's view of our hold
1568             # timer, or there was nothing to read.
1569             # Were we in the middle of sending a message?
1570             if self.writer.sending_message:
1571                 # Was it the end of a message?
1572                 whole = self.writer.send_message_chunk_is_whole()
1573                 # We were pressed to send something and we did it.
1574                 if self.prioritize_writing and whole:
1575                     # We prioritize reading again.
1576                     self.prioritize_writing = False
1577                 return
1578             # Finally, check whether there are still update messages to be generated.
1579             if self.generator.remaining_prefixes:
1580                 msg_out = self.generator.compose_update_message()
1581                 if not self.generator.remaining_prefixes:
1582                     # We have just finished update generation,
1583                     # end-of-rib is due.
1584                     logger.info("All update messages generated.")
1585                     logger.info("Storing performance results.")
1586                     self.generator.store_results()
1587                     logger.info("Finally an END-OF-RIB is sent.")
1588                     msg_out += self.generator.update_message(wr_prefixes=[],
1589                                                              nlri_prefixes=[])
1590                 self.writer.enqueue_message_for_sending(msg_out)
1591                 # Attempt for real sending to be done in next iteration.
1592                 return
1593             # Nothing to write anymore.
1594             # To avoid a busy loop, we do idle waiting here.
1595             self.reader.wait_for_read()
1596             return
1597         # We can neither read nor write.
1598         logger.warning("Input and output both blocked for " +
1599                        str(self.timer.report_timedelta) + " seconds.")
1600         # FIXME: Are we sure select has been really waiting
1601         # the whole period?
1602         return
1603
1604
1605 def create_logger(loglevel, logfile):
1606     """Create logger object
1607
1608     Arguments:
1609         :loglevel: log level
1610         :logfile: log file name
1611     Returns:
1612         :return: logger object
1613     """
1614     logger = logging.getLogger("logger")
1615     log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1616     console_handler = logging.StreamHandler()
1617     file_handler = logging.FileHandler(logfile, mode="w")
1618     console_handler.setFormatter(log_formatter)
1619     file_handler.setFormatter(log_formatter)
1620     logger.addHandler(console_handler)
1621     logger.addHandler(file_handler)
1622     logger.setLevel(loglevel)
1623     return logger
1624
1625
1626 def job(arguments):
1627     """One-time initialisation and the main iteration loop.
1628     Notes:
1629         Establish BGP connection and run iterations.
1630
1631     Arguments:
1632         :arguments: Command line arguments
1633     Returns:
1634         :return: None
1635     """
1636     bgp_socket = establish_connection(arguments)
1637     # Initial handshake phase. TODO: Can it also be moved to StateTracker?
1638     # Receive open message before sending anything.
1639     # FIXME: Add parameter to send default open message first,
1640     # to work with "you first" peers.
1641     msg_in = read_open_message(bgp_socket)
1642     timer = TimeTracker(msg_in)
1643     generator = MessageGenerator(arguments)
1644     msg_out = generator.open_message()
1645     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1646     # Send our open message to the peer.
1647     bgp_socket.send(msg_out)
1648     # Wait for confirming keepalive.
1649     # TODO: Surely in just one packet?
1650     # Use the exact KEEPALIVE length (19 octets, header only) so as not to read any following updates.
1651     msg_in = bgp_socket.recv(19)
1652     if msg_in != generator.keepalive_message():
1653         error_msg = "Open not confirmed by keepalive, instead got"
1654         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1655         raise MessageError(error_msg, msg_in)
1656     timer.reset_peer_hold_time()
1657     # Send the keepalive to indicate the connection is accepted.
1658     timer.snapshot()  # Remember this time.
1659     msg_out = generator.keepalive_message()
1660     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1661     bgp_socket.send(msg_out)
1662     # Use the remembered time.
1663     timer.reset_my_keepalive_time(timer.snapshot_time)
1664     # End of initial handshake phase.
1665     state = StateTracker(bgp_socket, generator, timer)
1666     while True:  # main reactor loop
1667         state.perform_one_loop_iteration()
1668
1669
1670 def threaded_job(arguments):
1671     """Run the job in one or more threads.
1672
1673     Arguments:
1674         :arguments: Command line arguments
1675     Returns:
1676         :return: None
1677     """
1678     amount_left = arguments.amount
1679     utils_left = arguments.multiplicity
1680     prefix_current = arguments.firstprefix
1681     myip_current = arguments.myip
1682     thread_args = []
1683
1684     while 1:
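             # For positive integers, (n - 1) / k + 1 is ceiling division
             # (Python 2's / truncates for ints).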
1685         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
1686         amount_left -= amount_per_util
1687         utils_left -= 1
1688
1689         args = deepcopy(arguments)
1690         args.amount = amount_per_util
1691         args.firstprefix = prefix_current
1692         args.myip = myip_current
1693         thread_args.append(args)
1694
1695         if not utils_left:
1696             break
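             # Advance the base prefix past the block just assigned; the step of
             # 16 addresses per prefix appears to assume the default /28 prefix length.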
1697         prefix_current += amount_per_util * 16
1698         myip_current += 1
1699
1700     try:
1701         # Create threads
1702         for t in thread_args:
1703             thread.start_new_thread(job, (t,))
1704     except Exception:
1705         print "Error: unable to start thread."
1706         raise SystemExit(2)
1707
1708     # Keep the main thread alive; the worker threads do the actual work.
1709     while 1:
1710         time.sleep(5)
1711
1712
1713 if __name__ == "__main__":
1714     arguments = parse_arguments()
1715     logger = create_logger(arguments.loglevel, arguments.logfile)
1716     threaded_job(arguments)