1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with a sudo-able user when you want to use ports below 1024
4 as --myport. This utility is used to avoid the excessive waiting times which
5 EXABGP exhibits when used with huge routing tables, and also to avoid the memory
6 cost of EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
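# Example invocation (illustrative only; values are arbitrary, adjust addresses
# and ports to your environment, and use a sudo-able user when binding --myport
# to a port below 1024):
#   python play.py --amount 1000 --myip 127.0.0.1 --myport 17900 \
#       --peerip 127.0.0.2 --peerport 179 --insert 10 --withdraw 5 --prefill 100
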
14 import argparse
15 import binascii
16 import ipaddr
17 import select
18 import socket
19 import time
20 import logging
21 import struct
22
23 import thread
24 from copy import deepcopy
25
26
27 __author__ = "Vratko Polak"
28 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
29 __license__ = "Eclipse Public License v1.0"
30 __email__ = "vrpolak@cisco.com"
31
32
33 def parse_arguments():
34     """Use argparse to get arguments.
35
36     Returns:
37         :return: args object.
38     """
39     parser = argparse.ArgumentParser()
40     # TODO: Should we use --argument-names-with-spaces?
41     str_help = "Autonomous System number to be used in the stream."
42     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
43     # FIXME: We are acting as iBGP peer,
44     # we should mirror AS number from peer's open message.
45     str_help = "Amount of IP prefixes to generate (negative means \"infinite\")."
46     parser.add_argument("--amount", default="1", type=int, help=str_help)
47     str_help = "Maximum number of IP prefixes to be announced in one iteration"
48     parser.add_argument("--insert", default="1", type=int, help=str_help)
49     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
50     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
51     str_help = "The number of prefixes to process without withdrawals"
52     parser.add_argument("--prefill", default="0", type=int, help=str_help)
53     str_help = "Send NLRI and WITHDRAWN lists in single or separate UPDATEs"
54     parser.add_argument("--updates", choices=["single", "separate"],
55                         default="separate", help=str_help)
56     str_help = "Base prefix IP address for prefix generation"
57     parser.add_argument("--firstprefix", default="8.0.1.0",
58                         type=ipaddr.IPv4Address, help=str_help)
59     str_help = "The prefix length."
60     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
61     str_help = "Listen for connection, instead of initiating it."
62     parser.add_argument("--listen", action="store_true", help=str_help)
63     str_help = ("Numeric IP Address to bind to and derive BGP ID from. " +
64                 "Default value only suitable for listening.")
65     parser.add_argument("--myip", default="0.0.0.0",
66                         type=ipaddr.IPv4Address, help=str_help)
67     str_help = ("TCP port to bind to when listening or initiating connection. " +
68                 "Default only suitable for initiating.")
69     parser.add_argument("--myport", default="0", type=int, help=str_help)
70     str_help = "The IP of the next hop to be placed into the update messages."
71     parser.add_argument("--nexthop", default="192.0.2.1",
72                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
73     str_help = "Identifier of the route originator."
74     parser.add_argument("--originator", default=None,
75                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
76     str_help = "Cluster list item identifier."
77     parser.add_argument("--cluster", default=None,
78                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
79     str_help = ("Numeric IP Address to try to connect to. " +
80                 "Currently no effect in listening mode.")
81     parser.add_argument("--peerip", default="127.0.0.2",
82                         type=ipaddr.IPv4Address, help=str_help)
83     str_help = "TCP port to try to connect to. No effect in listening mode."
84     parser.add_argument("--peerport", default="179", type=int, help=str_help)
85     str_help = "Local hold time."
86     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
87     str_help = "Log level (--error, --warning, --info, --debug)"
88     parser.add_argument("--error", dest="loglevel", action="store_const",
89                         const=logging.ERROR, default=logging.INFO,
90                         help=str_help)
91     parser.add_argument("--warning", dest="loglevel", action="store_const",
92                         const=logging.WARNING, default=logging.INFO,
93                         help=str_help)
94     parser.add_argument("--info", dest="loglevel", action="store_const",
95                         const=logging.INFO, default=logging.INFO,
96                         help=str_help)
97     parser.add_argument("--debug", dest="loglevel", action="store_const",
98                         const=logging.DEBUG, default=logging.INFO,
99                         help=str_help)
100     str_help = "Log file name"
101     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
102     str_help = "Trailing part of the csv result files for plotting purposes"
103     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
104     str_help = "Minimum number of sent updates required to include a result in the csv."
105     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
106     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
107     parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
108     str_help = "How many play utilities are to be started."
109     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
110     arguments = parser.parse_args()
111     if arguments.multiplicity < 1:
112         print "Multiplicity", arguments.multiplicity, "is not positive."
113         raise SystemExit(1)
114     # TODO: Are sanity checks (such as asnumber>=0) required?
115     return arguments
116
117
118 def establish_connection(arguments):
119     """Establish connection to BGP peer.
120
121     Arguments:
122         :arguments: following command-line arguments are used
123             - arguments.myip: local IP address
124             - arguments.myport: local port
125             - arguments.peerip: remote IP address
126             - arguments.peerport: remote port
127     Returns:
128         :return: socket.
129     """
130     if arguments.listen:
131         logger.info("Connecting in the listening mode.")
132         logger.debug("Local IP address: " + str(arguments.myip))
133         logger.debug("Local port: " + str(arguments.myport))
134         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
135         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
136         # bind() needs a single tuple as its argument
137         listening_socket.bind((str(arguments.myip), arguments.myport))
138         listening_socket.listen(1)
139         bgp_socket, _ = listening_socket.accept()
140         # TODO: Verify client IP is the controller IP.
141         listening_socket.close()
142     else:
143         logger.info("Connecting in the talking mode.")
144         logger.debug("Local IP address: " + str(arguments.myip))
145         logger.debug("Local port: " + str(arguments.myport))
146         logger.debug("Remote IP address: " + str(arguments.peerip))
147         logger.debug("Remote port: " + str(arguments.peerport))
148         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
150         # bind to force specified address and port
151         talking_socket.bind((str(arguments.myip), arguments.myport))
152         # socket does not understand ipaddr objects, hence str()
153         talking_socket.connect((str(arguments.peerip), arguments.peerport))
154         bgp_socket = talking_socket
155     logger.info("Connected to ODL.")
156     return bgp_socket
157
158
159 def get_short_int_from_message(message, offset=16):
160     """Extract 2-bytes number from provided message.
161
162     Arguments:
163         :message: given message
164         :offset: offset of the short_int inside the message
165     Returns:
166         :return: required short_int value.
167     Notes:
168         default offset value is the BGP message size offset.
169     """
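    # Note: equivalent to struct.unpack(">H", message[offset:offset + 2])[0].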
170     high_byte_int = ord(message[offset])
171     low_byte_int = ord(message[offset + 1])
172     short_int = high_byte_int * 256 + low_byte_int
173     return short_int
174
175
176 def get_prefix_list_from_hex(prefixes_hex):
177     """Get decoded list of prefixes (rfc4271#section-4.3)
178
179     Arguments:
180         :prefixes_hex: list of prefixes to be decoded in hex
181     Returns:
182         :return: list of prefixes in the form of ip address (X.X.X.X/X)
183     """
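    # Illustrative example: the hex string "\x1c\x08\x00\x01\x00" decodes to
    # ["8.0.1.0/28"] (prefix bit length 28, so four prefix bytes follow).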
184     prefix_list = []
185     offset = 0
186     while offset < len(prefixes_hex):
187         prefix_bit_len_hex = prefixes_hex[offset]
188         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
189         prefix_len = ((prefix_bit_len - 1) / 8) + 1
190         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
191         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex.ljust(4, "\x00")))
192         offset += 1 + prefix_len
193         prefix_list.append(prefix + "/" + str(prefix_bit_len))
194     return prefix_list
195
196
197 class MessageError(ValueError):
198     """Value error with logging optimized for hexlified messages."""
199
200     def __init__(self, text, message, *args):
201         """Initialisation.
202
203         Store and call super init for textual comment,
204         store raw message which caused it.
205         """
206         self.text = text
207         self.msg = message
208         super(MessageError, self).__init__(text, message, *args)
209
210     def __str__(self):
211         """Generate human readable error message.
212
213         Returns:
214             :return: human readable message as string
215         Notes:
216             Use a placeholder string if the message is to be empty.
217         """
218         message = binascii.hexlify(self.msg)
219         if message == "":
220             message = "(empty message)"
221         return self.text + ": " + message
222
223
224 def read_open_message(bgp_socket):
225     """Receive peer's OPEN message
226
227     Arguments:
228         :bgp_socket: the socket to be read
229     Returns:
230         :return: received OPEN message.
231     Notes:
232         Performs just basic incoming message checks.
233     """
234     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
235     # TODO: Can the incoming open message be split in more than one packet?
236     # Some validation.
237     if len(msg_in) < 37:
238         # 37 is minimal length of open message with 4-byte AS number.
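        # (19-byte common header + 10 bytes of mandatory OPEN fields
        # + 8 bytes for the 4-octet AS number capability parameter)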
239         logger.error("Got something else than open with 4-byte AS number: " +
240                      binascii.hexlify(msg_in))
241         raise MessageError("Got something else than open with 4-byte AS number", msg_in)
242     # TODO: We could check BGP marker, but it is defined only later;
243     # decide what to do.
244     reported_length = get_short_int_from_message(msg_in)
245     if len(msg_in) != reported_length:
246         logger.error("Message length is not " + str(reported_length) +
247                      " as stated in " + binascii.hexlify(msg_in))
248         raise MessageError("Message length is not " + str(reported_length) +
249                            " as stated in", msg_in)
250     logger.info("Open message received.")
251     return msg_in
252
253
254 class MessageGenerator(object):
255     """Class which generates messages, holds states and configuration values."""
256
257     # TODO: Define bgp marker as a class (constant) variable.
258     def __init__(self, args):
259         """Initialisation according to command-line args.
260
261         Arguments:
262             :args: argparse's Namespace object which contains command-line
263                 options for MessageGenerator initialisation
264         Notes:
265             Calculates and stores default values used later on for
266             message generation.
267         """
268         self.total_prefix_amount = args.amount
269         # Number of update messages left to be sent.
270         self.remaining_prefixes = self.total_prefix_amount
271
272         # New parameters initialisation
273         self.iteration = 0
274         self.prefix_base_default = args.firstprefix
275         self.prefix_length_default = args.prefixlen
276         self.wr_prefixes_default = []
277         self.nlri_prefixes_default = []
278         self.version_default = 4
279         self.my_autonomous_system_default = args.asnumber
280         self.hold_time_default = args.holdtime  # Local hold time.
281         self.bgp_identifier_default = int(args.myip)
282         self.next_hop_default = args.nexthop
283         self.originator_id_default = args.originator
284         self.cluster_list_item_default = args.cluster
285         self.single_update_default = args.updates == "single"
286         self.randomize_updates_default = args.updates == "random"
287         self.prefix_count_to_add_default = args.insert
288         self.prefix_count_to_del_default = args.withdraw
289         if self.prefix_count_to_del_default < 0:
290             self.prefix_count_to_del_default = 0
291         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
292             # total number of prefixes must grow to avoid infinite test loop
293             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
294         self.slot_size_default = self.prefix_count_to_add_default
295         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
296         self.results_file_name_default = args.results
297         self.performance_threshold_default = args.threshold
298         self.rfc4760 = args.rfc4760 == "yes"
299         # Default values used for randomized part
300         s1_slots = ((self.total_prefix_amount -
301                      self.remaining_prefixes_threshold - 1) /
302                     self.prefix_count_to_add_default + 1)
303         s2_slots = ((self.remaining_prefixes_threshold - 1) /
304                     (self.prefix_count_to_add_default -
305                     self.prefix_count_to_del_default) + 1)
306         # S1_First_Index = 0
307         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
308         s2_first_index = s1_slots * self.prefix_count_to_add_default
309         s2_last_index = (s2_first_index +
310                          s2_slots * (self.prefix_count_to_add_default -
311                                      self.prefix_count_to_del_default) - 1)
312         self.slot_gap_default = ((self.total_prefix_amount -
313                                   self.remaining_prefixes_threshold - 1) /
314                                  self.prefix_count_to_add_default + 1)
315         self.randomize_lowest_default = s2_first_index
316         self.randomize_highest_default = s2_last_index
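        # Illustrative example (not the defaults): with --amount 10 --insert 2
        # --withdraw 1 --prefill 4 the threshold is 6, s1_slots is 2, s2_slots
        # is 6 and the randomized index range becomes [4, 9].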
317
318         # Initialising counters
319         self.phase1_start_time = 0
320         self.phase1_stop_time = 0
321         self.phase2_start_time = 0
322         self.phase2_stop_time = 0
323         self.phase1_updates_sent = 0
324         self.phase2_updates_sent = 0
325         self.updates_sent = 0
326
327         self.log_info = args.loglevel <= logging.INFO
328         self.log_debug = args.loglevel <= logging.DEBUG
329         """
330         Flags needed for the MessageGenerator performance optimization.
331         Calling logger methods on each iteration, even with the proper log
332         level set, significantly slows down the MessageGenerator.
333         Measured total generation time (1M updates, dry run, error log level):
334         - logging based on basic logger features: 36.2 s
335         - logging based on advanced logger features (lazy logging): 21.2 s
336         - conditional calls of logger methods enclosed in a condition: 8.6 s
337         """
338
339         logger.info("Generator initialisation")
340         logger.info("  Target total number of prefixes to be introduced: " +
341                     str(self.total_prefix_amount))
342         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
343                     str(self.prefix_length_default))
344         logger.info("  My Autonomous System number: " +
345                     str(self.my_autonomous_system_default))
346         logger.info("  My Hold Time: " + str(self.hold_time_default))
347         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
348         logger.info("  Next Hop: " + str(self.next_hop_default))
349         logger.info("  Originator ID: " + str(self.originator_id_default))
350         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
351         logger.info("  Prefix count to be inserted at once: " +
352                     str(self.prefix_count_to_add_default))
353         logger.info("  Prefix count to be withdrawn at once: " +
354                     str(self.prefix_count_to_del_default))
355         logger.info("  Fast pre-fill up to " +
356                     str(self.total_prefix_amount -
357                         self.remaining_prefixes_threshold) + " prefixes")
358         logger.info("  Remaining number of prefixes to be processed " +
359                     "in parallel with withdrawals: " +
360                     str(self.remaining_prefixes_threshold))
361         logger.debug("  Prefix index range used after pre-fill procedure [" +
362                      str(self.randomize_lowest_default) + ", " +
363                      str(self.randomize_highest_default) + "]")
364         if self.single_update_default:
365             logger.info("  Common single UPDATE will be generated " +
366                         "for both NLRI & WITHDRAWN lists")
367         else:
368             logger.info("  Two separate UPDATEs will be generated " +
369                         "for each NLRI & WITHDRAWN lists")
370         if self.randomize_updates_default:
371             logger.info("  Generation of UPDATE messages will be randomized")
372         logger.info("  Let\'s go ...\n")
373
374         # TODO: Notification for hold timer expiration can be handy.
375
376     def store_results(self, file_name=None, threshold=None):
377         """ Stores specified results into files based on file_name value.
378
379         Arguments:
380             :param file_name: Trailing (common) part of result file names
381             :param threshold: Minimum number of sent updates needed for each
382                               result to be included into result csv file
383                               (mainly needed because of the result accuracy)
384         Returns:
385             :return: n/a
386         """
387         # default values handling
388         # TODO optimize default values handling (use e.g. dictionary.update() approach)
389         if file_name is None:
390             file_name = self.results_file_name_default
391         if threshold is None:
392             threshold = self.performance_threshold_default
393         # performance calculation
394         if self.phase1_updates_sent >= threshold:
395             totals1 = self.phase1_updates_sent
396             performance1 = int(self.phase1_updates_sent /
397                                (self.phase1_stop_time - self.phase1_start_time))
398         else:
399             totals1 = None
400             performance1 = None
401         if self.phase2_updates_sent >= threshold:
402             totals2 = self.phase2_updates_sent
403             performance2 = int(self.phase2_updates_sent /
404                                (self.phase2_stop_time - self.phase2_start_time))
405         else:
406             totals2 = None
407             performance2 = None
408
409         logger.info("#" * 10 + " Final results " + "#" * 10)
410         logger.info("Number of iterations: " + str(self.iteration))
411         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
412                     str(self.phase1_updates_sent))
413         logger.info("The pre-fill phase duration: " +
414                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
415         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
416                     str(self.phase2_updates_sent))
417         logger.info("The 2nd test phase duration: " +
418                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
419         logger.info("Threshold for performance reporting: " + str(threshold))
420
421         # making labels
422         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
423                         " route(s) per UPDATE")
424         if self.single_update_default:
425             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
426                                   "/-" + str(self.prefix_count_to_del_default) +
427                                   " routes per UPDATE")
428         else:
429             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
430                                   "/-" + str(self.prefix_count_to_del_default) +
431                                   " routes in two UPDATEs")
432         # collecting capacity and performance results
433         totals = {}
434         performance = {}
435         if totals1 is not None:
436             totals[phase1_label] = totals1
437             performance[phase1_label] = performance1
438         if totals2 is not None:
439             totals[phase2_label] = totals2
440             performance[phase2_label] = performance2
441         self.write_results_to_file(totals, "totals-" + file_name)
442         self.write_results_to_file(performance, "performance-" + file_name)
443
444     def write_results_to_file(self, results, file_name):
445         """Writes results to the csv plot file consumable by Jenkins.
446
447         Arguments:
448             :param file_name: Name of the (csv) file to be created
449         Returns:
450             :return: none
451         """
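        # Illustrative example: results {"a": 1, "b": 2} produce a file with
        # the two lines "a, b" and "1, 2" (keys sorted, comma-separated).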
452         first_line = ""
453         second_line = ""
454         f = open(file_name, "wt")
455         try:
456             for key in sorted(results):
457                 first_line += key + ", "
458                 second_line += str(results[key]) + ", "
459             first_line = first_line[:-2]
460             second_line = second_line[:-2]
461             f.write(first_line + "\n")
462             f.write(second_line + "\n")
463             logger.info("Message generator performance results stored in " +
464                         file_name + ":")
465             logger.info("  " + first_line)
466             logger.info("  " + second_line)
467         finally:
468             f.close()
469
470     # Return pseudo-randomized (reproducible) index for selected range
471     def randomize_index(self, index, lowest=None, highest=None):
472         """Calculates pseudo-randomized index from selected range.
473
474         Arguments:
475             :param index: input index
476             :param lowest: the lowest index from the randomized area
477             :param highest: the highest index from the randomized area
478         Returns:
479             :return: the (pseudo)randomized index
480         Notes:
481             Created just as a frame for future generator enhancement.
482         """
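        # Illustrative example: with lowest=4 and highest=9, index 4 maps to 9,
        # 5 to 8, ..., 9 to 4; indexes outside [4, 9] are returned unchanged.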
483         # default values handling
484         # TODO optimize default values handling (use e.g. dictionary.update() approach)
485         if lowest is None:
486             lowest = self.randomize_lowest_default
487         if highest is None:
488             highest = self.randomize_highest_default
489         # randomize
490         if (index >= lowest) and (index <= highest):
491             # we are in the randomized range -> shuffle it inside
492             # the range (now just reverse the order)
493             new_index = highest - (index - lowest)
494         else:
495             # we are out of the randomized range -> nothing to do
496             new_index = index
497         return new_index
498
499     # Get list of prefixes
500     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
501                         prefix_len=None, prefix_count=None, randomize=None):
502         """Generates list of IP address prefixes.
503
504         Arguments:
505             :param slot_index: index of group of prefix addresses
506             :param slot_size: size of group of prefix addresses
507                 in [number of included prefixes]
508             :param prefix_base: IP address of the first prefix
509                 (slot_index = 0, prefix_index = 0)
510             :param prefix_len: length of the prefix in bits
511                 (the same as size of netmask)
512             :param prefix_count: number of prefixes to be returned
513                 from the specified slot
514         Returns:
515             :return: list of generated IP address prefixes
516         """
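        # Illustrative example (randomization off): slot_index=1, slot_size=2,
        # prefix_base=8.0.1.0 and prefix_len=28 give prefix indexes [2, 3] and
        # prefixes [8.0.1.32, 8.0.1.48], since the prefix gap is 16 addresses.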
517         # default values handling
518         # TODO optimize default values handling (use e.g. dictionary.update() approach)
519         if slot_size is None:
520             slot_size = self.slot_size_default
521         if prefix_base is None:
522             prefix_base = self.prefix_base_default
523         if prefix_len is None:
524             prefix_len = self.prefix_length_default
525         if prefix_count is None:
526             prefix_count = slot_size
527         if randomize is None:
528             randomize = self.randomize_updates_default
529         # generating list of prefixes
530         indexes = []
531         prefixes = []
532         prefix_gap = 2 ** (32 - prefix_len)
533         for i in range(prefix_count):
534             prefix_index = slot_index * slot_size + i
535             if randomize:
536                 prefix_index = self.randomize_index(prefix_index)
537             indexes.append(prefix_index)
538             prefixes.append(prefix_base + prefix_index * prefix_gap)
539         if self.log_debug:
540             logger.debug("  Prefix slot index: " + str(slot_index))
541             logger.debug("  Prefix slot size: " + str(slot_size))
542             logger.debug("  Prefix count: " + str(prefix_count))
543             logger.debug("  Prefix indexes: " + str(indexes))
544             logger.debug("  Prefix list: " + str(prefixes))
545         return prefixes
546
547     def compose_update_message(self, prefix_count_to_add=None,
548                                prefix_count_to_del=None):
549         """Composes an UPDATE message
550
551         Arguments:
552             :param prefix_count_to_add: # of prefixes to put into NLRI list
553             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
554         Returns:
555             :return: encoded UPDATE message in HEX
556         Notes:
557             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
558             lists or a common message which includes both prefix lists.
559             Updates global counters.
560         """
561         # default values handling
562         # TODO optimize default values handling (use e.g. dictionary.update() approach)
563         if prefix_count_to_add is None:
564             prefix_count_to_add = self.prefix_count_to_add_default
565         if prefix_count_to_del is None:
566             prefix_count_to_del = self.prefix_count_to_del_default
567         # logging
568         if self.log_info and not (self.iteration % 1000):
569             logger.info("Iteration: " + str(self.iteration) +
570                         " - total remaining prefixes: " +
571                         str(self.remaining_prefixes))
572         if self.log_debug:
573             logger.debug("#" * 10 + " Iteration: " +
574                          str(self.iteration) + " " + "#" * 10)
575             logger.debug("Remaining prefixes: " +
576                          str(self.remaining_prefixes))
577         # scenario type & one-shot counter
578         straightforward_scenario = (self.remaining_prefixes >
579                                     self.remaining_prefixes_threshold)
580         if straightforward_scenario:
581             prefix_count_to_del = 0
582             if self.log_debug:
583                 logger.debug("--- STRAIGHTFORWARD SCENARIO ---")
584             if not self.phase1_start_time:
585                 self.phase1_start_time = time.time()
586         else:
587             if self.log_debug:
588                 logger.debug("--- COMBINED SCENARIO ---")
589             if not self.phase2_start_time:
590                 self.phase2_start_time = time.time()
591         # tailor the number of prefixes if needed
592         prefix_count_to_add = (prefix_count_to_del +
593                                min(prefix_count_to_add - prefix_count_to_del,
594                                    self.remaining_prefixes))
595         # prefix slots selection for insertion and withdrawal
596         slot_index_to_add = self.iteration
597         slot_index_to_del = slot_index_to_add - self.slot_gap_default
598         # getting lists of prefixes for insertion in this iteration
599         if self.log_debug:
600             logger.debug("Prefixes to be inserted in this iteration:")
601         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
602                                                   prefix_count=prefix_count_to_add)
603         # getting lists of prefixes for withdrawal in this iteration
604         if self.log_debug:
605             logger.debug("Prefixes to be withdrawn in this iteration:")
606         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
607                                                   prefix_count=prefix_count_to_del)
608         # generating the message
609         if self.single_update_default:
610             # Send prefixes to be introduced and withdrawn
611             # in one UPDATE message
612             msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
613                                           nlri_prefixes=prefix_list_to_add)
614         else:
615             # Send prefixes to be introduced and withdrawn
616             # in separate UPDATE messages (if needed)
617             msg_out = self.update_message(wr_prefixes=[],
618                                           nlri_prefixes=prefix_list_to_add)
619             if prefix_count_to_del:
620                 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
621                                                nlri_prefixes=[])
622         # updating counters - who knows ... maybe this is the last time here ;)
623         if straightforward_scenario:
624             self.phase1_stop_time = time.time()
625             self.phase1_updates_sent = self.updates_sent
626         else:
627             self.phase2_stop_time = time.time()
628             self.phase2_updates_sent = (self.updates_sent -
629                                         self.phase1_updates_sent)
630         # updating totals for the next iteration
631         self.iteration += 1
632         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
633         # returning the encoded message
634         return msg_out
635
636     # Section of message encoders
637
638     def open_message(self, version=None, my_autonomous_system=None,
639                      hold_time=None, bgp_identifier=None):
640         """Generates an OPEN Message (rfc4271#section-4.2)
641
642         Arguments:
643             :param version: see the rfc4271#section-4.2
644             :param my_autonomous_system: see the rfc4271#section-4.2
645             :param hold_time: see the rfc4271#section-4.2
646             :param bgp_identifier: see the rfc4271#section-4.2
647         Returns:
648             :return: encoded OPEN message in HEX
649         """
650
651         # default values handling
652         # TODO optimize default values handling (use e.g. dictionary.update() approach)
653         if version is None:
654             version = self.version_default
655         if my_autonomous_system is None:
656             my_autonomous_system = self.my_autonomous_system_default
657         if hold_time is None:
658             hold_time = self.hold_time_default
659         if bgp_identifier is None:
660             bgp_identifier = self.bgp_identifier_default
661
662         # Marker
663         marker_hex = "\xFF" * 16
664
665         # Type
666         type = 1
667         type_hex = struct.pack("B", type)
668
669         # version
670         version_hex = struct.pack("B", version)
671
672         # my_autonomous_system
673         # AS_TRANS value, 23456 decimal.
674         my_autonomous_system_2_bytes = 23456
675         # AS number is mappable to 2 bytes
676         if my_autonomous_system < 65536:
677             my_autonomous_system_2_bytes = my_autonomous_system
678         my_autonomous_system_hex_2_bytes = struct.pack(">H",
679                                                        my_autonomous_system_2_bytes)
680
681         # Hold Time
682         hold_time_hex = struct.pack(">H", hold_time)
683
684         # BGP Identifier
685         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
686
687         # Optional Parameters
688         optional_parameters_hex = ""
689         if self.rfc4760:
690             optional_parameter_hex = (
691                 "\x02"  # Param type ("Capability Ad")
692                 "\x06"  # Length (6 bytes)
693                 "\x01"  # Capability type 1 (Multiprotocol Extensions),
694                         # see RFC 4760, section 8
695                 "\x04"  # Capability value length
696                 "\x00\x01"  # AFI (Ipv4)
697                 "\x00"  # (reserved)
698                 "\x01"  # SAFI (Unicast)
699             )
700             optional_parameters_hex += optional_parameter_hex
701
702         optional_parameter_hex = (
703             "\x02"  # Param type ("Capability Ad")
704             "\x06"  # Length (6 bytes)
705             "\x41"  # "32 bit AS Numbers Support"
706                     # (see RFC 6793, section 3)
707             "\x04"  # Capability value length
708         )
709         optional_parameter_hex += (
710             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
711         )
712         optional_parameters_hex += optional_parameter_hex
713
714         # Optional Parameters Length
715         optional_parameters_length = len(optional_parameters_hex)
716         optional_parameters_length_hex = struct.pack("B",
717                                                      optional_parameters_length)
718
719         # Length (big-endian)
720         length = (
721             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
722             len(my_autonomous_system_hex_2_bytes) +
723             len(hold_time_hex) + len(bgp_identifier_hex) +
724             len(optional_parameters_length_hex) +
725             len(optional_parameters_hex)
726         )
727         length_hex = struct.pack(">H", length)
728
729         # OPEN Message
730         message_hex = (
731             marker_hex +
732             length_hex +
733             type_hex +
734             version_hex +
735             my_autonomous_system_hex_2_bytes +
736             hold_time_hex +
737             bgp_identifier_hex +
738             optional_parameters_length_hex +
739             optional_parameters_hex
740         )
741
742         if self.log_debug:
743             logger.debug("OPEN message encoding")
744             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
745             logger.debug("  Length=" + str(length) + " (0x" +
746                          binascii.hexlify(length_hex) + ")")
747             logger.debug("  Type=" + str(type) + " (0x" +
748                          binascii.hexlify(type_hex) + ")")
749             logger.debug("  Version=" + str(version) + " (0x" +
750                          binascii.hexlify(version_hex) + ")")
751             logger.debug("  My Autonomous System=" +
752                          str(my_autonomous_system_2_bytes) + " (0x" +
753                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
754                          ")")
755             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
756                          binascii.hexlify(hold_time_hex) + ")")
757             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
758                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
759             logger.debug("  Optional Parameters Length=" +
760                          str(optional_parameters_length) + " (0x" +
761                          binascii.hexlify(optional_parameters_length_hex) +
762                          ")")
763             logger.debug("  Optional Parameters=0x" +
764                          binascii.hexlify(optional_parameters_hex))
765             logger.debug("OPEN message encoded: 0x%s",
766                          binascii.b2a_hex(message_hex))
767
768         return message_hex
769
770     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
771                        wr_prefix_length=None, nlri_prefix_length=None,
772                        my_autonomous_system=None, next_hop=None,
773                        originator_id=None, cluster_list_item=None):
774         """Generates an UPDATE Message (rfc4271#section-4.3)
775
776         Arguments:
777             :param wr_prefixes: see the rfc4271#section-4.3
778             :param nlri_prefixes: see the rfc4271#section-4.3
779             :param wr_prefix_length: see the rfc4271#section-4.3
780             :param nlri_prefix_length: see the rfc4271#section-4.3
781             :param my_autonomous_system: see the rfc4271#section-4.3
782             :param next_hop: see the rfc4271#section-4.3
783         Returns:
784             :return: encoded UPDATE message in HEX
785         """
786
787         # default values handling
788         # TODO optimize default values handling (use e.g. dictionary.update() approach)
789         if wr_prefixes is None:
790             wr_prefixes = self.wr_prefixes_default
791         if nlri_prefixes is None:
792             nlri_prefixes = self.nlri_prefixes_default
793         if wr_prefix_length is None:
794             wr_prefix_length = self.prefix_length_default
795         if nlri_prefix_length is None:
796             nlri_prefix_length = self.prefix_length_default
797         if my_autonomous_system is None:
798             my_autonomous_system = self.my_autonomous_system_default
799         if next_hop is None:
800             next_hop = self.next_hop_default
801         if originator_id is None:
802             originator_id = self.originator_id_default
803         if cluster_list_item is None:
804             cluster_list_item = self.cluster_list_item_default
805
806         # Marker
807         marker_hex = "\xFF" * 16
808
809         # Type
810         type = 2
811         type_hex = struct.pack("B", type)
812
813         # Withdrawn Routes
814         bytes = ((wr_prefix_length - 1) / 8) + 1
815         withdrawn_routes_hex = ""
816         for prefix in wr_prefixes:
817             withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
818                                    struct.pack(">I", int(prefix))[:bytes])
819             withdrawn_routes_hex += withdrawn_route_hex
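        # Illustrative example: prefix 8.0.1.0 with wr_prefix_length=28 encodes
        # to "\x1c\x08\x00\x01\x00" (one length byte + four prefix bytes).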
820
821         # Withdrawn Routes Length
822         withdrawn_routes_length = len(withdrawn_routes_hex)
823         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
824
825         # TODO: replace hardcoded strings with proper encoding?
826         # Path Attributes
827         path_attributes_hex = ""
828         if nlri_prefixes != []:
829             path_attributes_hex += (
830                 "\x40"  # Flags ("Well-Known")
831                 "\x01"  # Type (ORIGIN)
832                 "\x01"  # Length (1)
833                 "\x00"  # Origin: IGP
834             )
835             path_attributes_hex += (
836                 "\x40"  # Flags ("Well-Known")
837                 "\x02"  # Type (AS_PATH)
838                 "\x06"  # Length (6)
839                 "\x02"  # AS segment type (AS_SEQUENCE)
840                 "\x01"  # AS segment length (1)
841             )
842             my_as_hex = struct.pack(">I", my_autonomous_system)
843             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
844             path_attributes_hex += (
845                 "\x40"  # Flags ("Well-Known")
846                 "\x03"  # Type (NEXT_HOP)
847                 "\x04"  # Length (4)
848             )
849             next_hop_hex = struct.pack(">I", int(next_hop))
850             path_attributes_hex += (
851                 next_hop_hex  # IP address of the next hop (4 bytes)
852             )
853             if originator_id is not None:
854                 path_attributes_hex += (
855                     "\x80"  # Flags ("Optional, non-transitive")
856                     "\x09"  # Type (ORIGINATOR_ID)
857                     "\x04"  # Length (4)
858                 )           # ORIGINATOR_ID (4 bytes)
859                 path_attributes_hex += struct.pack(">I", int(originator_id))
860             if cluster_list_item is not None:
861                 path_attributes_hex += (
862                     "\x80"  # Flags ("Optional, non-transitive")
863                     "\x0a"  # Type (CLUSTER_LIST)
864                     "\x04"  # Length (4)
865                 )           # one CLUSTER_LIST item (4 bytes)
866                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
867
868         # Total Path Attributes Length
869         total_path_attributes_length = len(path_attributes_hex)
870         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
871
872         # Network Layer Reachability Information
873         bytes = ((nlri_prefix_length - 1) / 8) + 1
874         nlri_hex = ""
875         for prefix in nlri_prefixes:
876             nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
877                                struct.pack(">I", int(prefix))[:bytes])
878             nlri_hex += nlri_prefix_hex
879
880         # Length (big-endian)
881         length = (
882             len(marker_hex) + 2 + len(type_hex) +
883             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
884             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
885             len(nlri_hex))
886         length_hex = struct.pack(">H", length)
887
888         # UPDATE Message
889         message_hex = (
890             marker_hex +
891             length_hex +
892             type_hex +
893             withdrawn_routes_length_hex +
894             withdrawn_routes_hex +
895             total_path_attributes_length_hex +
896             path_attributes_hex +
897             nlri_hex
898         )
899
900         if self.log_debug:
901             logger.debug("UPDATE message encoding")
902             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
903             logger.debug("  Length=" + str(length) + " (0x" +
904                          binascii.hexlify(length_hex) + ")")
905             logger.debug("  Type=" + str(type) + " (0x" +
906                          binascii.hexlify(type_hex) + ")")
907             logger.debug("  withdrawn_routes_length=" +
908                          str(withdrawn_routes_length) + " (0x" +
909                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
910             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
911                          str(wr_prefix_length) + " (0x" +
912                          binascii.hexlify(withdrawn_routes_hex) + ")")
913             if total_path_attributes_length:
914                 logger.debug("  Total Path Attributes Length=" +
915                              str(total_path_attributes_length) + " (0x" +
916                              binascii.hexlify(total_path_attributes_length_hex) + ")")
917                 logger.debug("  Path Attributes=" + "(0x" +
918                              binascii.hexlify(path_attributes_hex) + ")")
919                 logger.debug("    Origin=IGP")
920                 logger.debug("    AS path=" + str(my_autonomous_system))
921                 logger.debug("    Next hop=" + str(next_hop))
922                 if originator_id is not None:
923                     logger.debug("    Originator id=" + str(originator_id))
924                 if cluster_list_item is not None:
925                     logger.debug("    Cluster list=" + str(cluster_list_item))
926             logger.debug("  Network Layer Reachability Information=" +
927                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
928                          " (0x" + binascii.hexlify(nlri_hex) + ")")
929             logger.debug("UPDATE message encoded: 0x" +
930                          binascii.b2a_hex(message_hex))
931
932         # updating counter
933         self.updates_sent += 1
934         # returning encoded message
935         return message_hex
936
937     def notification_message(self, error_code, error_subcode, data_hex=""):
938         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
939
940         Arguments:
941             :param error_code: see the rfc4271#section-4.5
942             :param error_subcode: see the rfc4271#section-4.5
943             :param data_hex: see the rfc4271#section-4.5
944         Returns:
945             :return: encoded NOTIFICATION message in HEX
946         """
947
948         # Marker
949         marker_hex = "\xFF" * 16
950
951         # Type
952         type = 3
953         type_hex = struct.pack("B", type)
954
955         # Error Code
956         error_code_hex = struct.pack("B", error_code)
957
958         # Error Subcode
959         error_subcode_hex = struct.pack("B", error_subcode)
960
961         # Length (big-endian)
962         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
963                   len(error_subcode_hex) + len(data_hex))
964         length_hex = struct.pack(">H", length)
965
966         # NOTIFICATION Message
967         message_hex = (
968             marker_hex +
969             length_hex +
970             type_hex +
971             error_code_hex +
972             error_subcode_hex +
973             data_hex
974         )
975
976         if self.log_debug:
977             logger.debug("NOTIFICATION message encoding")
978             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
979             logger.debug("  Length=" + str(length) + " (0x" +
980                          binascii.hexlify(length_hex) + ")")
981             logger.debug("  Type=" + str(type) + " (0x" +
982                          binascii.hexlify(type_hex) + ")")
983             logger.debug("  Error Code=" + str(error_code) + " (0x" +
984                          binascii.hexlify(error_code_hex) + ")")
985             logger.debug("  Error Subcode=" + str(error_subcode) + " (0x" +
986                          binascii.hexlify(error_subcode_hex) + ")")
987             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
988             logger.debug("NOTIFICATION message encoded: 0x%s",
989                          binascii.b2a_hex(message_hex))
990
991         return message_hex
992
993     def keepalive_message(self):
994         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
995
996         Returns:
997             :return: encoded KEEP ALIVE message in HEX
998         """
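        # Note: a KEEPALIVE always encodes to the same 19 bytes:
        # a 16-byte all-ones marker, length 0x0013 and type 0x04.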
999
1000         # Marker
1001         marker_hex = "\xFF" * 16
1002
1003         # Type
1004         type = 4
1005         type_hex = struct.pack("B", type)
1006
1007         # Length (big-endian)
1008         length = len(marker_hex) + 2 + len(type_hex)
1009         length_hex = struct.pack(">H", length)
1010
1011         # KEEP ALIVE Message
1012         message_hex = (
1013             marker_hex +
1014             length_hex +
1015             type_hex
1016         )
1017
1018         if self.log_debug:
1019             logger.debug("KEEP ALIVE message encoding")
1020             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1021             logger.debug("  Length=" + str(length) + " (0x" +
1022                          binascii.hexlify(length_hex) + ")")
1023             logger.debug("  Type=" + str(type) + " (0x" +
1024                          binascii.hexlify(type_hex) + ")")
1025             logger.debug("KEEP ALIVE message encoded: 0x%s",
1026                          binascii.b2a_hex(message_hex))
1027
1028         return message_hex
1029
1030
1031 class TimeTracker(object):
1032     """Class for tracking timers, both for my keepalives and
1033     peer's hold time.
1034     """
1035
1036     def __init__(self, msg_in):
1037         """Initialisation based on defaults and the OPEN message from the peer.
1038
1039         Arguments:
1040             msg_in: the OPEN message received from peer.
1041         """
1042         # Note: Relative time is always named timedelta, to stress that
1043         # the (non-delta) time is absolute.
1044         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1045         # Upper bound for being stuck in the same state, we should
1046         # at least report something before continuing.
1047         # Negotiate the hold timer by taking the smaller
1048         # of the 2 values (mine and the peer's).
1049         hold_timedelta = 180  # Not an attribute of self yet.
1050         # TODO: Make the default value configurable,
1051         # default value could mirror what peer said.
1052         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1053         if hold_timedelta > peer_hold_timedelta:
1054             hold_timedelta = peer_hold_timedelta
1055         if hold_timedelta != 0 and hold_timedelta < 3:
1056             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1057             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1058         self.hold_timedelta = hold_timedelta
1059         # If we do not hear from peer this long, we assume it has died.
1060         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1061         # Upper limit for duration between messages, to avoid being
1062         # declared to be dead.
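        # Illustrative example: if the peer advertises a hold time of 90 s,
        # the negotiated hold time is 90 s and keepalives are due every 30 s.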
1063         # The same as calling snapshot(), but also declares a field.
1064         self.snapshot_time = time.time()
1065         # Sometimes we need to store time. This is where to get
1066         # the value from afterwards. Time_keepalive may be too strict.
1067         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1068         # At this time point, peer will be declared dead.
1069         self.my_keepalive_time = None  # to be set later
1070         # At this point, we should be sending keepalive message.
1071
1072     def snapshot(self):
1073         """Store current time in instance data to use later."""
1074         # Read as time before something interesting was called.
1075         self.snapshot_time = time.time()
1076
1077     def reset_peer_hold_time(self):
1078         """Move hold time to future as peer has just proven it still lives."""
1079         self.peer_hold_time = time.time() + self.hold_timedelta
1080
1081     # Some methods could rely on self.snapshot_time, but it is better
1082     # to require user to provide it explicitly.
1083     def reset_my_keepalive_time(self, keepalive_time):
1084         """Calculate and set the time of my next KEEP ALIVE timeout.
1085
1086         Arguments:
1087             :keepalive_time: the initial value of the KEEP ALIVE timer
1088         """
1089         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1090
1091     def is_time_for_my_keepalive(self):
1092         """Check for my KEEP ALIVE timeout occurrence."""
1093         if self.hold_timedelta == 0:
1094             return False
1095         return self.snapshot_time >= self.my_keepalive_time
1096
1097     def get_next_event_time(self):
1098         """Return the time of the next expected or to-be-sent KEEP ALIVE."""
1099         if self.hold_timedelta == 0:
1100             return self.snapshot_time + 86400
1101         return min(self.my_keepalive_time, self.peer_hold_time)
1102
1103     def check_peer_hold_time(self, snapshot_time):
1104         """Raise error if nothing was read from peer until specified time."""
1105         # Hold time = 0 means keepalive checking off.
1106         if self.hold_timedelta != 0:
1107             # time.time() may be too strict
1108             if snapshot_time > self.peer_hold_time:
1109                 logger.error("Peer has overstepped the hold timer.")
1110                 raise RuntimeError("Peer has overstepped the hold timer.")
1111                 # TODO: Include hold_timedelta?
1112                 # TODO: Add notification sending (attempt). That means
1113                 # move to write tracker.
1114
1115
1116 class ReadTracker(object):
1117     """Class for tracking reads of messages chunk by chunk and
1118     for idle waiting.
1119     """
1120
1121     def __init__(self, bgp_socket, timer):
1122         """The reader initialisation.
1123
1124         Arguments:
1125             bgp_socket: socket to be used for receiving
1126             timer: timer to be used for scheduling
1127         """
1128         # References to outside objects.
1129         self.socket = bgp_socket
1130         self.timer = timer
1131         # BGP marker length plus length field length.
1132         self.header_length = 18
1133         # TODO: make it class (constant) attribute
1134         # Computation of where next chunk ends depends on whether
1135         # we are beyond length field.
1136         self.reading_header = True
1137         # Countdown towards next size computation.
1138         self.bytes_to_read = self.header_length
1139         # Incremental buffer for message under read.
1140         self.msg_in = ""
1141         # Initialising counters
1142         self.updates_received = 0
1143         self.prefixes_introduced = 0
1144         self.prefixes_withdrawn = 0
1145         self.rx_idle_time = 0
1146         self.rx_activity_detected = True
1147
1148     def read_message_chunk(self):
1149         """Read up to one message
1150
1151         Note:
1152             Currently it does not return anything.
1153         """
1154         # TODO: We could return the whole message, currently not needed.
1155         # We assume the socket is readable.
1156         chunk_message = self.socket.recv(self.bytes_to_read)
1157         self.msg_in += chunk_message
1158         self.bytes_to_read -= len(chunk_message)
1159         # TODO: bytes_to_read < 0 is not possible, right?
1160         if not self.bytes_to_read:
1161             # Finished reading a logical block.
1162             if self.reading_header:
1163                 # The logical block was a BGP header.
1164                 # Now we know the size of the message.
1165                 self.reading_header = False
1166                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1167                                       self.header_length)
1168             else:  # We have finished reading the body of the message.
1169                 # Peer has just proven it is still alive.
1170                 self.timer.reset_peer_hold_time()
1171                 # TODO: Do we want to count received messages?
1172                 # This version ignores the received message.
1173                 # TODO: Should we do validation and exit on anything
1174                 # besides update or keepalive?
1175                 # Prepare state for reading another message.
1176                 message_type_hex = self.msg_in[self.header_length]
1177                 if message_type_hex == "\x01":
1178                     logger.info("OPEN message received: 0x%s",
1179                                 binascii.b2a_hex(self.msg_in))
1180                 elif message_type_hex == "\x02":
1181                     logger.debug("UPDATE message received: 0x%s",
1182                                  binascii.b2a_hex(self.msg_in))
1183                     self.decode_update_message(self.msg_in)
1184                 elif message_type_hex == "\x03":
1185                     logger.info("NOTIFICATION message received: 0x%s",
1186                                 binascii.b2a_hex(self.msg_in))
1187                 elif message_type_hex == "\x04":
1188                     logger.info("KEEP ALIVE message received: 0x%s",
1189                                 binascii.b2a_hex(self.msg_in))
1190                 else:
1191                     logger.warning("Unexpected message received: 0x%s",
1192                                    binascii.b2a_hex(self.msg_in))
1193                 self.msg_in = ""
1194                 self.reading_header = True
1195                 self.bytes_to_read = self.header_length
1196         # We should not act upon peer_hold_time if we are reading
1197         # something right now.
1198         return
1199
1200     def decode_path_attributes(self, path_attributes_hex):
1201         """Decode the Path Attributes field (rfc4271#section-4.3)
1202
1203         Arguments:
1204             :path_attributes_hex: path attributes field to be decoded in hex
1205         Returns:
1206             :return: None
1207         """
1208         hex_to_decode = path_attributes_hex
1209
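             # Each attribute is encoded as flags (1 octet), type code
             # (1 octet), length (1 octet, or 2 octets when the extended
             # length flag bit 0x10 is set) and value. For example,
             # "\x40\x01\x01\x00" is ORIGIN: flags 0x40, type 1, length 1,
             # value 0 (IGP).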
1210         while hex_to_decode:
1211             attr_flags_hex = hex_to_decode[0]
1212             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1213 #            attr_optional_bit = attr_flags & 128
1214 #            attr_transitive_bit = attr_flags & 64
1215 #            attr_partial_bit = attr_flags & 32
1216             attr_extended_length_bit = attr_flags & 16
1217
1218             attr_type_code_hex = hex_to_decode[1]
1219             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1220
1221             if attr_extended_length_bit:
1222                 attr_length_hex = hex_to_decode[2:4]
1223                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1224                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1225                 hex_to_decode = hex_to_decode[4 + attr_length:]
1226             else:
1227                 attr_length_hex = hex_to_decode[2]
1228                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1229                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1230                 hex_to_decode = hex_to_decode[3 + attr_length:]
1231
1232             if attr_type_code == 1:
1233                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1234                              binascii.b2a_hex(attr_flags_hex))
1235                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1236             elif attr_type_code == 2:
1237                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1238                              binascii.b2a_hex(attr_flags_hex))
1239                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1240             elif attr_type_code == 3:
1241                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1242                              binascii.b2a_hex(attr_flags_hex))
1243                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1244             elif attr_type_code == 4:
1245                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1246                              binascii.b2a_hex(attr_flags_hex))
1247                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1248             elif attr_type_code == 5:
1249                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1250                              binascii.b2a_hex(attr_flags_hex))
1251                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1252             elif attr_type_code == 6:
1253                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1254                              binascii.b2a_hex(attr_flags_hex))
1255                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1256             elif attr_type_code == 7:
1257                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1258                              binascii.b2a_hex(attr_flags_hex))
1259                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1260             elif attr_type_code == 9:  # rfc4456#section-8
1261                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1262                              binascii.b2a_hex(attr_flags_hex))
1263                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1264             elif attr_type_code == 10:  # rfc4456#section-8
1265                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1266                              binascii.b2a_hex(attr_flags_hex))
1267                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1268             elif attr_type_code == 14:  # rfc4760#section-3
1269                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1270                              binascii.b2a_hex(attr_flags_hex))
1271                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
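                     # MP_REACH_NLRI value layout (rfc4760#section-3):
                     # AFI (2 octets), SAFI (1 octet), next hop length
                     # (1 octet), next hop, reserved (1 octet), NLRI.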
1272                 address_family_identifier_hex = attr_value_hex[0:2]
1273                 logger.debug("  Address Family Identifier=0x%s",
1274                              binascii.b2a_hex(address_family_identifier_hex))
1275                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1276                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1277                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1278                 next_hop_netaddr_len_hex = attr_value_hex[3]
1279                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1280                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1281                              next_hop_netaddr_len,
1282                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1283                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
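                     # Note: the unpacking below assumes an IPv4 (4-octet)
                     # next hop address.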
1284                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1285                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1286                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1287                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1288                 logger.debug("  Reserved=0x%s",
1289                              binascii.b2a_hex(reserved_hex))
1290                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1291                 logger.debug("  Network Layer Reachability Information=0x%s",
1292                              binascii.b2a_hex(nlri_hex))
1293                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1294                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1295                 for prefix in nlri_prefix_list:
1296                     logger.debug("  nlri_prefix_received: %s", prefix)
1297                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1298             elif attr_type_code == 15:  # rfc4760#section-4
1299                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1300                              binascii.b2a_hex(attr_flags_hex))
1301                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
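                     # MP_UNREACH_NLRI value layout (rfc4760#section-4):
                     # AFI (2 octets), SAFI (1 octet), withdrawn routes.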
1302                 address_family_identifier_hex = attr_value_hex[0:2]
1303                 logger.debug("  Address Family Identifier=0x%s",
1304                              binascii.b2a_hex(address_family_identifier_hex))
1305                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1306                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1307                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1308                 wd_hex = attr_value_hex[3:]
1309                 logger.debug("  Withdrawn Routes=0x%s",
1310                              binascii.b2a_hex(wd_hex))
1311                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1312                 logger.debug("  Withdrawn routes prefix list: %s",
1313                              wdr_prefix_list)
1314                 for prefix in wdr_prefix_list:
1315                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1316                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1317             else:
1318                 logger.debug("Unknown attribute type=%s (flags:0x%s)", attr_type_code,
1319                              binascii.b2a_hex(attr_flags_hex))
1320                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1321         return None
1322
1323     def decode_update_message(self, msg):
1324         """Decode an UPDATE message (rfc4271#section-4.3)
1325
1326         Arguments:
1327             :msg: message to be decoded in hex
1328         Returns:
1329             :return: None
1330         """
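             # UPDATE layout after the 19-octet header (rfc4271#section-4.3):
             # withdrawn routes length (2 octets), withdrawn routes,
             # total path attribute length (2 octets), path attributes,
             # and NLRI filling the rest of the message.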
1331         logger.debug("Decoding update message:")
1332         # message header - marker
1333         marker_hex = msg[:16]
1334         logger.debug("Message header marker: 0x%s",
1335                      binascii.b2a_hex(marker_hex))
1336         # message header - message length
1337         msg_length_hex = msg[16:18]
1338         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1339         logger.debug("Message length: 0x%s (%s)",
1340                      binascii.b2a_hex(msg_length_hex), msg_length)
1341         # message header - message type
1342         msg_type_hex = msg[18:19]
1343         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1344         if msg_type == 2:
1345             logger.debug("Message type: 0x%s (update)",
1346                          binascii.b2a_hex(msg_type_hex))
1347             # withdrawn routes length
1348             wdr_length_hex = msg[19:21]
1349             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1350             logger.debug("Withdrawn routes length: 0x%s (%s)",
1351                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1352             # withdrawn routes
1353             wdr_hex = msg[21:21 + wdr_length]
1354             logger.debug("Withdrawn routes: 0x%s",
1355                          binascii.b2a_hex(wdr_hex))
1356             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1357             logger.debug("Withdrawn routes prefix list: %s",
1358                          wdr_prefix_list)
1359             for prefix in wdr_prefix_list:
1360                 logger.debug("withdrawn_prefix_received: %s", prefix)
1361             # total path attribute length
1362             total_pa_length_offset = 21 + wdr_length
1363             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
1364             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1365             logger.debug("Total path attribute length: 0x%s (%s)",
1366                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1367             # path attributes
1368             pa_offset = total_pa_length_offset + 2
1369             pa_hex = msg[pa_offset:pa_offset+total_pa_length]
1370             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1371             self.decode_path_attributes(pa_hex)
1372             # network layer reachability information length
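                 # 23 octets = 19-octet header + 2-octet withdrawn routes
                 # length field + 2-octet total path attribute length field.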
1373             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1374             logger.debug("Calculated NLRI length: %s", nlri_length)
1375             # network layer reachability information
1376             nlri_offset = pa_offset + total_pa_length
1377             nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
1378             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1379             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1380             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1381             for prefix in nlri_prefix_list:
1382                 logger.debug("nlri_prefix_received: %s", prefix)
1383             # Updating counters
1384             self.updates_received += 1
1385             self.prefixes_introduced += len(nlri_prefix_list)
1386             self.prefixes_withdrawn += len(wdr_prefix_list)
1387         else:
1388             logger.error("Unexpected message type 0x%s in 0x%s",
1389                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1390
1391     def wait_for_read(self):
1392         """Wait until the socket is readable or the next expected event occurs.
1393
1394         Note:
1395             Used when no more updates have to be sent, to avoid busy-waiting.
1396             Currently it does not return anything.
1397         """
1398         # Compute time to the first predictable state change
1399         event_time = self.timer.get_next_event_time()
1400         # snapshot_time would be imprecise
1401         wait_timedelta = min(event_time - time.time(), 10)
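             # The wait is capped at 10 seconds so select() returns at least
             # that often, even when the next scheduled event is far away.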
1402         if wait_timedelta < 0:
1403             # The program got around to waiting for an event in the "very
1404             # near future" so late that it became a "past" event, thus tell
1405             # select() to not wait at all. Passing a negative timedelta to
1406             # select() would lead to either waiting forever (for -1) or
1407             # select.error("Invalid parameter") (for everything else).
1408             wait_timedelta = 0
1409         # And wait for event or something to read.
1410
1411         if not self.rx_activity_detected or not (self.updates_received % 100):
1412             # right time to write statistics to the log (not for every update and
1413             # not too frequently to avoid having large log files)
1414             logger.info("total_received_update_message_counter: %s",
1415                         self.updates_received)
1416             logger.info("total_received_nlri_prefix_counter: %s",
1417                         self.prefixes_introduced)
1418             logger.info("total_received_withdrawn_prefix_counter: %s",
1419                         self.prefixes_withdrawn)
1420
1421         start_time = time.time()
1422         select.select([self.socket], [], [self.socket], wait_timedelta)
1423         timedelta = time.time() - start_time
1424         self.rx_idle_time += timedelta
1425         self.rx_activity_detected = timedelta < 1
1426
1427         if not self.rx_activity_detected or not (self.updates_received % 100):
1428             # right time to write statistics to the log (not for every update and
1429             # not too frequently to avoid having large log files)
1430             logger.info("... idle for %.3fs", timedelta)
1431             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1432         return
1433
1434
1435 class WriteTracker(object):
1436     """Class tracking enqueueing messages and sending chunks of them."""
1437
1438     def __init__(self, bgp_socket, generator, timer):
1439         """The writer initialisation.
1440
1441         Arguments:
1442             bgp_socket: socket to be used for sending
1443             generator: generator to be used for message generation
1444             timer: timer to be used for scheduling
1445         """
1446         # References to outside objects.
1447         self.socket = bgp_socket
1448         self.generator = generator
1449         self.timer = timer
1450         # Really new fields.
1451         # TODO: Would attribute docstrings add anything substantial?
1452         self.sending_message = False
1453         self.bytes_to_send = 0
1454         self.msg_out = ""
1455
1456     def enqueue_message_for_sending(self, message):
1457         """Enqueue message and change state.
1458
1459         Arguments:
1460             message: message to be enqueued into the msg_out buffer
1461         """
1462         self.msg_out += message
1463         self.bytes_to_send += len(message)
1464         self.sending_message = True
1465
1466     def send_message_chunk_is_whole(self):
1467         """Send enqueued data from the msg_out buffer.
1468
1469         Returns:
1470             :return: true if no remaining data to send
1471         """
1472         # We assume there is a msg_out to send and socket is writable.
1473         # print "going to send", repr(self.msg_out)
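             # socket.send() may transmit only part of the buffer; the unsent
             # remainder is kept for the next writable event.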
1474         self.timer.snapshot()
1475         bytes_sent = self.socket.send(self.msg_out)
1476         # Forget the part of message that was sent.
1477         self.msg_out = self.msg_out[bytes_sent:]
1478         self.bytes_to_send -= bytes_sent
1479         if not self.bytes_to_send:
1480             # TODO: Is it possible to hit negative bytes_to_send?
1481             self.sending_message = False
1482             # We should have reset hold timer on peer side.
1483             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1484             # The possible reason for not prioritizing reads is gone.
1485             return True
1486         return False
1487
1488
1489 class StateTracker(object):
1490     """Main loop has state so complex it warrants this separate class."""
1491
1492     def __init__(self, bgp_socket, generator, timer):
1493         """The state tracker initialisation.
1494
1495         Arguments:
1496             bgp_socket: socket to be used for sending / receiving
1497             generator: generator to be used for message generation
1498             timer: timer to be used for scheduling
1499         """
1500         # References to outside objects.
1501         self.socket = bgp_socket
1502         self.generator = generator
1503         self.timer = timer
1504         # Sub-trackers.
1505         self.reader = ReadTracker(bgp_socket, timer)
1506         self.writer = WriteTracker(bgp_socket, generator, timer)
1507         # Prioritization state.
1508         self.prioritize_writing = False
1509         # In general, we prioritize reading over writing. But in order
1510         # not to get blocked by neverending reads, we should
1511         # check whether we are not risking running out of holdtime.
1512         # So in some situations, this field is set to True to attempt
1513         # finishing sending a message, after which this field resets
1514         # back to False.
1515         # TODO: Alternative is to switch fairly between reading and
1516         # writing (called round robin from now on).
1517         # Message counting is done in generator.
1518
1519     def perform_one_loop_iteration(self):
1520         """The main loop iteration.
1521
1522         Notes:
1523             Calculates priority, resolves all conditions, calls
1524             appropriate method and returns to caller to repeat.
1525         """
1526         self.timer.snapshot()
1527         if not self.prioritize_writing:
1528             if self.timer.is_time_for_my_keepalive():
1529                 if not self.writer.sending_message:
1530                     # We need to schedule a keepalive ASAP.
1531                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1532                     logger.info("KEEP ALIVE is sent.")
1533                 # We are sending a message now, so let's prioritize it.
1534                 self.prioritize_writing = True
1535         # Now we know what our priorities are, we have to check
1536         # which actions are available.
1537         # select.select() returns three lists,
1538         # we store them to list of lists.
1539         list_list = select.select([self.socket], [self.socket], [self.socket],
1540                                   self.timer.report_timedelta)
1541         read_list, write_list, except_list = list_list
1542         # Lists are unpacked, each is either [] or [self.socket],
1543         # so we will test them as boolean.
1544         if except_list:
1545             logger.error("Exceptional state on the socket.")
1546             raise RuntimeError("Exceptional state on socket", self.socket)
1547         # We will do either read or write.
1548         if not (self.prioritize_writing and write_list):
1549             # Either we have no reason to rush writes,
1550             # or the socket is not writable.
1551             # We are focusing on reading here.
1552             if read_list:  # there is something to read indeed
1553                 # In this case we want to read chunk of message
1554                 # and repeat the select,
1555                 self.reader.read_message_chunk()
1556                 return
1557             # We were focusing on reading, but nothing to read was there.
1558             # Good time to check peer for hold timer.
1559             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1560             # Quiet on the read front, we can attempt to write.
1561         if write_list:
1562             # Either we really want to reset peer's view of our hold
1563             # timer, or there was nothing to read.
1564             # Were we in the middle of sending a message?
1565             if self.writer.sending_message:
1566                 # Was it the end of a message?
1567                 whole = self.writer.send_message_chunk_is_whole()
1568                 # We were pressed to send something and we did it.
1569                 if self.prioritize_writing and whole:
1570                     # We prioritize reading again.
1571                     self.prioritize_writing = False
1572                 return
1573             # Finally, check whether there are still update messages to be generated.
1574             if self.generator.remaining_prefixes:
1575                 msg_out = self.generator.compose_update_message()
1576                 if not self.generator.remaining_prefixes:
1577                     # We have just finished update generation,
1578                     # end-of-rib is due.
1579                     logger.info("All update messages generated.")
1580                     logger.info("Storing performance results.")
1581                     self.generator.store_results()
1582                     logger.info("Finally an END-OF-RIB is sent.")
1583                     msg_out += self.generator.update_message(wr_prefixes=[],
1584                                                              nlri_prefixes=[])
1585                 self.writer.enqueue_message_for_sending(msg_out)
1586                 # Attempt for real sending to be done in next iteration.
1587                 return
1588             # Nothing to write anymore.
1589             # To avoid busy loop, we do idle waiting here.
1590             self.reader.wait_for_read()
1591             return
1592         # We can neither read nor write.
1593         logger.warning("Input and output both blocked for " +
1594                        str(self.timer.report_timedelta) + " seconds.")
1595         # FIXME: Are we sure select has been really waiting
1596         # the whole period?
1597         return
1598
1599
1600 def create_logger(loglevel, logfile):
1601     """Create the logger object.
1602
1603     Arguments:
1604         :loglevel: log level
1605         :logfile: log file name
1606     Returns:
1607         :return: logger object
1608     """
1609     logger = logging.getLogger("logger")
1610     log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1611     console_handler = logging.StreamHandler()
1612     file_handler = logging.FileHandler(logfile, mode="w")
1613     console_handler.setFormatter(log_formatter)
1614     file_handler.setFormatter(log_formatter)
1615     logger.addHandler(console_handler)
1616     logger.addHandler(file_handler)
1617     logger.setLevel(loglevel)
1618     return logger
1619
1620
1621 def job(arguments):
1622     """One-time initialisation and iteration looping.
1623     Notes:
1624         Establish BGP connection and run iterations.
1625
1626     Arguments:
1627         :arguments: Command line arguments
1628     Returns:
1629         :return: None
1630     """
1631     bgp_socket = establish_connection(arguments)
1632     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1633     # Receive open message before sending anything.
1634     # FIXME: Add parameter to send default open message first,
1635     # to work with "you first" peers.
1636     msg_in = read_open_message(bgp_socket)
1637     timer = TimeTracker(msg_in)
1638     generator = MessageGenerator(arguments)
1639     msg_out = generator.open_message()
1640     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1641     # Send our open message to the peer.
1642     bgp_socket.send(msg_out)
1643     # Wait for confirming keepalive.
1644     # TODO: Surely in just one packet?
1645     # Using the exact keepalive length so we do not read possible updates.
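         # A KEEPALIVE message is just the 19-octet header with type 4.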
1646     msg_in = bgp_socket.recv(19)
1647     if msg_in != generator.keepalive_message():
1648         logger.error("Open not confirmed by keepalive, instead got " +
1649                      binascii.hexlify(msg_in))
1650         raise MessageError("Open not confirmed by keepalive, instead got",
1651                            msg_in)
1652     timer.reset_peer_hold_time()
1653     # Send the keepalive to indicate the connection is accepted.
1654     timer.snapshot()  # Remember this time.
1655     msg_out = generator.keepalive_message()
1656     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1657     bgp_socket.send(msg_out)
1658     # Use the remembered time.
1659     timer.reset_my_keepalive_time(timer.snapshot_time)
1660     # End of initial handshake phase.
1661     state = StateTracker(bgp_socket, generator, timer)
1662     while True:  # main reactor loop
1663         state.perform_one_loop_iteration()
1664
1665
1666 def threaded_job(arguments):
1667     """Run the job in as many threads as given by the multiplicity argument.
1668
1669     Arguments:
1670         :arguments: Command line arguments
1671     Returns:
1672         :return: None
1673     """
1674     amount_left = arguments.amount
1675     utils_left = arguments.multiplicity
1676     prefix_current = arguments.firstprefix
1677     myip_current = arguments.myip
1678     thread_args = []
1679
1680     while True:
1681         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
1682         amount_left -= amount_per_util
1683         utils_left -= 1
1684
1685         args = deepcopy(arguments)
1686         args.amount = amount_per_util
1687         args.firstprefix = prefix_current
1688         args.myip = myip_current
1689         thread_args.append(args)
1690
1691         if not utils_left:
1692             break
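             # Advance the base prefix for the next worker; the step of 16
             # addresses per prefix appears to assume the default /28 prefix
             # length.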
1693         prefix_current += amount_per_util * 16
1694         myip_current += 1
1695
1696     try:
1697         # Create threads
1698         for t in thread_args:
1699             thread.start_new_thread(job, (t,))
1700     except Exception:
1701         print "Error: unable to start thread."
1702         raise SystemExit(2)
1703
1704     # Keep the main thread alive so the worker threads can keep running.
1705     while True:
1706         time.sleep(5)
1707
1708
1709 if __name__ == "__main__":
1710     arguments = parse_arguments()
1711     logger = create_logger(arguments.loglevel, arguments.logfile)
1712     threaded_job(arguments)