Updated code to match new rules
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 import argparse
15 import binascii
16 import ipaddr
17 import select
18 import socket
19 import time
20 import logging
21 import struct
22
23 import thread
24 from copy import deepcopy
25
26
27 __author__ = "Vratko Polak"
28 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
29 __license__ = "Eclipse Public License v1.0"
30 __email__ = "vrpolak@cisco.com"
31
32
33 def parse_arguments():
34     """Use argparse to get arguments,
35
36     Returns:
37         :return: args object.
38     """
39     parser = argparse.ArgumentParser()
40     # TODO: Should we use --argument-names-with-spaces?
41     str_help = "Autonomous System number use in the stream."
42     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
43     # FIXME: We are acting as iBGP peer,
44     # we should mirror AS number from peer's open message.
45     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
46     parser.add_argument("--amount", default="1", type=int, help=str_help)
47     str_help = "Maximum number of IP prefixes to be announced in one iteration"
48     parser.add_argument("--insert", default="1", type=int, help=str_help)
49     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
50     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
51     str_help = "The number of prefixes to process without withdrawals"
52     parser.add_argument("--prefill", default="0", type=int, help=str_help)
53     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
54     parser.add_argument("--updates", choices=["single", "separate"],
55                         default=["separate"], help=str_help)
56     str_help = "Base prefix IP address for prefix generation"
57     parser.add_argument("--firstprefix", default="8.0.1.0",
58                         type=ipaddr.IPv4Address, help=str_help)
59     str_help = "The prefix length."
60     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
61     str_help = "Listen for connection, instead of initiating it."
62     parser.add_argument("--listen", action="store_true", help=str_help)
63     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
64                 "Default value only suitable for listening.")
65     parser.add_argument("--myip", default="0.0.0.0",
66                         type=ipaddr.IPv4Address, help=str_help)
67     str_help = ("TCP port to bind to when listening or initiating connection." +
68                 "Default only suitable for initiating.")
69     parser.add_argument("--myport", default="0", type=int, help=str_help)
70     str_help = "The IP of the next hop to be placed into the update messages."
71     parser.add_argument("--nexthop", default="192.0.2.1",
72                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
73     str_help = ("Numeric IP Address to try to connect to." +
74                 "Currently no effect in listening mode.")
75     parser.add_argument("--peerip", default="127.0.0.2",
76                         type=ipaddr.IPv4Address, help=str_help)
77     str_help = "TCP port to try to connect to. No effect in listening mode."
78     parser.add_argument("--peerport", default="179", type=int, help=str_help)
79     str_help = "Local hold time."
80     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
81     str_help = "Log level (--error, --warning, --info, --debug)"
82     parser.add_argument("--error", dest="loglevel", action="store_const",
83                         const=logging.ERROR, default=logging.INFO,
84                         help=str_help)
85     parser.add_argument("--warning", dest="loglevel", action="store_const",
86                         const=logging.WARNING, default=logging.INFO,
87                         help=str_help)
88     parser.add_argument("--info", dest="loglevel", action="store_const",
89                         const=logging.INFO, default=logging.INFO,
90                         help=str_help)
91     parser.add_argument("--debug", dest="loglevel", action="store_const",
92                         const=logging.DEBUG, default=logging.INFO,
93                         help=str_help)
94     str_help = "Log file name"
95     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
96     str_help = "Trailing part of the csv result files for plotting purposes"
97     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
98     str_help = "Minimum number of updates to reach to include result into csv."
99     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
100     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
101     parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
102     str_help = "How many play utilities are to be started."
103     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
104     arguments = parser.parse_args()
105     if arguments.multiplicity < 1:
106         print "Multiplicity", arguments.multiplicity, "is not positive."
107         raise SystemExit(1)
108     # TODO: Are sanity checks (such as asnumber>=0) required?
109     return arguments
110
111
112 def establish_connection(arguments):
113     """Establish connection to BGP peer.
114
115     Arguments:
116         :arguments: following command-line argumets are used
117             - arguments.myip: local IP address
118             - arguments.myport: local port
119             - arguments.peerip: remote IP address
120             - arguments.peerport: remote port
121     Returns:
122         :return: socket.
123     """
124     if arguments.listen:
125         logger.info("Connecting in the listening mode.")
126         logger.debug("Local IP address: " + str(arguments.myip))
127         logger.debug("Local port: " + str(arguments.myport))
128         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
129         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
130         # bind need single tuple as argument
131         listening_socket.bind((str(arguments.myip), arguments.myport))
132         listening_socket.listen(1)
133         bgp_socket, _ = listening_socket.accept()
134         # TODO: Verify client IP is cotroller IP.
135         listening_socket.close()
136     else:
137         logger.info("Connecting in the talking mode.")
138         logger.debug("Local IP address: " + str(arguments.myip))
139         logger.debug("Local port: " + str(arguments.myport))
140         logger.debug("Remote IP address: " + str(arguments.peerip))
141         logger.debug("Remote port: " + str(arguments.peerport))
142         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
143         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
144         # bind to force specified address and port
145         talking_socket.bind((str(arguments.myip), arguments.myport))
146         # socket does not spead ipaddr, hence str()
147         talking_socket.connect((str(arguments.peerip), arguments.peerport))
148         bgp_socket = talking_socket
149     logger.info("Connected to ODL.")
150     return bgp_socket
151
152
153 def get_short_int_from_message(message, offset=16):
154     """Extract 2-bytes number from provided message.
155
156     Arguments:
157         :message: given message
158         :offset: offset of the short_int inside the message
159     Returns:
160         :return: required short_inf value.
161     Notes:
162         default offset value is the BGP message size offset.
163     """
164     high_byte_int = ord(message[offset])
165     low_byte_int = ord(message[offset + 1])
166     short_int = high_byte_int * 256 + low_byte_int
167     return short_int
168
169
170 def get_prefix_list_from_hex(prefixes_hex):
171     """Get decoded list of prefixes (rfc4271#section-4.3)
172
173     Arguments:
174         :prefixes_hex: list of prefixes to be decoded in hex
175     Returns:
176         :return: list of prefixes in the form of ip address (X.X.X.X/X)
177     """
178     prefix_list = []
179     offset = 0
180     while offset < len(prefixes_hex):
181         prefix_bit_len_hex = prefixes_hex[offset]
182         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
183         prefix_len = ((prefix_bit_len - 1) / 8) + 1
184         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
185         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
186         offset += 1 + prefix_len
187         prefix_list.append(prefix + "/" + str(prefix_bit_len))
188     return prefix_list
189
190
191 class MessageError(ValueError):
192     """Value error with logging optimized for hexlified messages."""
193
194     def __init__(self, text, message, *args):
195         """Initialisation.
196
197         Store and call super init for textual comment,
198         store raw message which caused it.
199         """
200         self.text = text
201         self.msg = message
202         super(MessageError, self).__init__(text, message, *args)
203
204     def __str__(self):
205         """Generate human readable error message.
206
207         Returns:
208             :return: human readable message as string
209         Notes:
210             Use a placeholder string if the message is to be empty.
211         """
212         message = binascii.hexlify(self.msg)
213         if message == "":
214             message = "(empty message)"
215         return self.text + ": " + message
216
217
218 def read_open_message(bgp_socket):
219     """Receive peer's OPEN message
220
221     Arguments:
222         :bgp_socket: the socket to be read
223     Returns:
224         :return: received OPEN message.
225     Notes:
226         Performs just basic incomming message checks
227     """
228     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
229     # TODO: Can the incoming open message be split in more than one packet?
230     # Some validation.
231     if len(msg_in) < 37:
232         # 37 is minimal length of open message with 4-byte AS number.
233         logger.error("Got something else than open with 4-byte AS number: " +
234                      binascii.hexlify(msg_in))
235         raise MessageError("Got something else than open with 4-byte AS number", msg_in)
236     # TODO: We could check BGP marker, but it is defined only later;
237     # decide what to do.
238     reported_length = get_short_int_from_message(msg_in)
239     if len(msg_in) != reported_length:
240         logger.error("Message length is not " + str(reported_length) +
241                      " as stated in " + binascii.hexlify(msg_in))
242         raise MessageError("Message length is not " + reported_length +
243                            " as stated in ", msg_in)
244     logger.info("Open message received.")
245     return msg_in
246
247
248 class MessageGenerator(object):
249     """Class which generates messages, holds states and configuration values."""
250
251     # TODO: Define bgp marker as a class (constant) variable.
252     def __init__(self, args):
253         """Initialisation according to command-line args.
254
255         Arguments:
256             :args: argsparser's Namespace object which contains command-line
257                 options for MesageGenerator initialisation
258         Notes:
259             Calculates and stores default values used later on for
260             message geeration.
261         """
262         self.total_prefix_amount = args.amount
263         # Number of update messages left to be sent.
264         self.remaining_prefixes = self.total_prefix_amount
265
266         # New parameters initialisation
267         self.iteration = 0
268         self.prefix_base_default = args.firstprefix
269         self.prefix_length_default = args.prefixlen
270         self.wr_prefixes_default = []
271         self.nlri_prefixes_default = []
272         self.version_default = 4
273         self.my_autonomous_system_default = args.asnumber
274         self.hold_time_default = args.holdtime  # Local hold time.
275         self.bgp_identifier_default = int(args.myip)
276         self.next_hop_default = args.nexthop
277         self.single_update_default = args.updates == "single"
278         self.randomize_updates_default = args.updates == "random"
279         self.prefix_count_to_add_default = args.insert
280         self.prefix_count_to_del_default = args.withdraw
281         if self.prefix_count_to_del_default < 0:
282             self.prefix_count_to_del_default = 0
283         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
284             # total number of prefixes must grow to avoid infinite test loop
285             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
286         self.slot_size_default = self.prefix_count_to_add_default
287         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
288         self.results_file_name_default = args.results
289         self.performance_threshold_default = args.threshold
290         self.rfc4760 = args.rfc4760 == "yes"
291         # Default values used for randomized part
292         s1_slots = ((self.total_prefix_amount -
293                      self.remaining_prefixes_threshold - 1) /
294                     self.prefix_count_to_add_default + 1)
295         s2_slots = ((self.remaining_prefixes_threshold - 1) /
296                     (self.prefix_count_to_add_default -
297                     self.prefix_count_to_del_default) + 1)
298         # S1_First_Index = 0
299         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
300         s2_first_index = s1_slots * self.prefix_count_to_add_default
301         s2_last_index = (s2_first_index +
302                          s2_slots * (self.prefix_count_to_add_default -
303                                      self.prefix_count_to_del_default) - 1)
304         self.slot_gap_default = ((self.total_prefix_amount -
305                                   self.remaining_prefixes_threshold - 1) /
306                                  self.prefix_count_to_add_default + 1)
307         self.randomize_lowest_default = s2_first_index
308         self.randomize_highest_default = s2_last_index
309
310         # Initialising counters
311         self.phase1_start_time = 0
312         self.phase1_stop_time = 0
313         self.phase2_start_time = 0
314         self.phase2_stop_time = 0
315         self.phase1_updates_sent = 0
316         self.phase2_updates_sent = 0
317         self.updates_sent = 0
318
319         self.log_info = args.loglevel <= logging.INFO
320         self.log_debug = args.loglevel <= logging.DEBUG
321         """
322         Flags needed for the MessageGenerator performance optimization.
323         Calling logger methods each iteration even with proper log level set
324         slows down significantly the MessageGenerator performance.
325         Measured total generation time (1M updates, dry run, error log level):
326         - logging based on basic logger features: 36,2s
327         - logging based on advanced logger features (lazy logging): 21,2s
328         - conditional calling of logger methods enclosed inside condition: 8,6s
329         """
330
331         logger.info("Generator initialisation")
332         logger.info("  Target total number of prefixes to be introduced: " +
333                     str(self.total_prefix_amount))
334         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
335                     str(self.prefix_length_default))
336         logger.info("  My Autonomous System number: " +
337                     str(self.my_autonomous_system_default))
338         logger.info("  My Hold Time: " + str(self.hold_time_default))
339         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
340         logger.info("  Next Hop: " + str(self.next_hop_default))
341         logger.info("  Prefix count to be inserted at once: " +
342                     str(self.prefix_count_to_add_default))
343         logger.info("  Prefix count to be withdrawn at once: " +
344                     str(self.prefix_count_to_del_default))
345         logger.info("  Fast pre-fill up to " +
346                     str(self.total_prefix_amount -
347                         self.remaining_prefixes_threshold) + " prefixes")
348         logger.info("  Remaining number of prefixes to be processed " +
349                     "in parallel with withdrawals: " +
350                     str(self.remaining_prefixes_threshold))
351         logger.debug("  Prefix index range used after pre-fill procedure [" +
352                      str(self.randomize_lowest_default) + ", " +
353                      str(self.randomize_highest_default) + "]")
354         if self.single_update_default:
355             logger.info("  Common single UPDATE will be generated " +
356                         "for both NLRI & WITHDRAWN lists")
357         else:
358             logger.info("  Two separate UPDATEs will be generated " +
359                         "for each NLRI & WITHDRAWN lists")
360         if self.randomize_updates_default:
361             logger.info("  Generation of UPDATE messages will be randomized")
362         logger.info("  Let\'s go ...\n")
363
364         # TODO: Notification for hold timer expiration can be handy.
365
366     def store_results(self, file_name=None, threshold=None):
367         """ Stores specified results into files based on file_name value.
368
369         Arguments:
370             :param file_name: Trailing (common) part of result file names
371             :param threshold: Minimum number of sent updates needed for each
372                               result to be included into result csv file
373                               (mainly needed because of the result accuracy)
374         Returns:
375             :return: n/a
376         """
377         # default values handling
378         if file_name is None:
379             file_name = self.results_file_name_default
380         if threshold is None:
381             threshold = self.performance_threshold_default
382         # performance calculation
383         if self.phase1_updates_sent >= threshold:
384             totals1 = self.phase1_updates_sent
385             performance1 = int(self.phase1_updates_sent /
386                                (self.phase1_stop_time - self.phase1_start_time))
387         else:
388             totals1 = None
389             performance1 = None
390         if self.phase2_updates_sent >= threshold:
391             totals2 = self.phase2_updates_sent
392             performance2 = int(self.phase2_updates_sent /
393                                (self.phase2_stop_time - self.phase2_start_time))
394         else:
395             totals2 = None
396             performance2 = None
397
398         logger.info("#" * 10 + " Final results " + "#" * 10)
399         logger.info("Number of iterations: " + str(self.iteration))
400         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
401                     str(self.phase1_updates_sent))
402         logger.info("The pre-fill phase duration: " +
403                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
404         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
405                     str(self.phase2_updates_sent))
406         logger.info("The 2nd test phase duration: " +
407                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
408         logger.info("Threshold for performance reporting: " + str(threshold))
409
410         # making labels
411         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
412                         " route(s) per UPDATE")
413         if self.single_update_default:
414             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
415                                   "/-" + str(self.prefix_count_to_del_default) +
416                                   " routes per UPDATE")
417         else:
418             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
419                                   "/-" + str(self.prefix_count_to_del_default) +
420                                   " routes in two UPDATEs")
421         # collecting capacity and performance results
422         totals = {}
423         performance = {}
424         if totals1 is not None:
425             totals[phase1_label] = totals1
426             performance[phase1_label] = performance1
427         if totals2 is not None:
428             totals[phase2_label] = totals2
429             performance[phase2_label] = performance2
430         self.write_results_to_file(totals, "totals-" + file_name)
431         self.write_results_to_file(performance, "performance-" + file_name)
432
433     def write_results_to_file(self, results, file_name):
434         """Writes results to the csv plot file consumable by Jenkins.
435
436         Arguments:
437             :param file_name: Name of the (csv) file to be created
438         Returns:
439             :return: none
440         """
441         first_line = ""
442         second_line = ""
443         f = open(file_name, "wt")
444         try:
445             for key in sorted(results):
446                 first_line += key + ", "
447                 second_line += str(results[key]) + ", "
448             first_line = first_line[:-2]
449             second_line = second_line[:-2]
450             f.write(first_line + "\n")
451             f.write(second_line + "\n")
452             logger.info("Message generator performance results stored in " +
453                         file_name + ":")
454             logger.info("  " + first_line)
455             logger.info("  " + second_line)
456         finally:
457             f.close()
458
459     # Return pseudo-randomized (reproducible) index for selected range
460     def randomize_index(self, index, lowest=None, highest=None):
461         """Calculates pseudo-randomized index from selected range.
462
463         Arguments:
464             :param index: input index
465             :param lowest: the lowes index from the randomized area
466             :param highest: the highest index from the randomized area
467         Returns:
468             :return: the (pseudo)randomized index
469         Notes:
470             Created just as a fame for future generator enhancement.
471         """
472         # default values handling
473         if lowest is None:
474             lowest = self.randomize_lowest_default
475         if highest is None:
476             highest = self.randomize_highest_default
477         # randomize
478         if (index >= lowest) and (index <= highest):
479             # we are in the randomized range -> shuffle it inside
480             # the range (now just reverse the order)
481             new_index = highest - (index - lowest)
482         else:
483             # we are out of the randomized range -> nothing to do
484             new_index = index
485         return new_index
486
487     # Get list of prefixes
488     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
489                         prefix_len=None, prefix_count=None, randomize=None):
490         """Generates list of IP address prefixes.
491
492         Arguments:
493             :param slot_index: index of group of prefix addresses
494             :param slot_size: size of group of prefix addresses
495                 in [number of included prefixes]
496             :param prefix_base: IP address of the first prefix
497                 (slot_index = 0, prefix_index = 0)
498             :param prefix_len: length of the prefix in bites
499                 (the same as size of netmask)
500             :param prefix_count: number of prefixes to be returned
501                 from the specified slot
502         Returns:
503             :return: list of generated IP address prefixes
504         """
505         # default values handling
506         if slot_size is None:
507             slot_size = self.slot_size_default
508         if prefix_base is None:
509             prefix_base = self.prefix_base_default
510         if prefix_len is None:
511             prefix_len = self.prefix_length_default
512         if prefix_count is None:
513             prefix_count = slot_size
514         if randomize is None:
515             randomize = self.randomize_updates_default
516         # generating list of prefixes
517         indexes = []
518         prefixes = []
519         prefix_gap = 2 ** (32 - prefix_len)
520         for i in range(prefix_count):
521             prefix_index = slot_index * slot_size + i
522             if randomize:
523                 prefix_index = self.randomize_index(prefix_index)
524             indexes.append(prefix_index)
525             prefixes.append(prefix_base + prefix_index * prefix_gap)
526         if self.log_debug:
527             logger.debug("  Prefix slot index: " + str(slot_index))
528             logger.debug("  Prefix slot size: " + str(slot_size))
529             logger.debug("  Prefix count: " + str(prefix_count))
530             logger.debug("  Prefix indexes: " + str(indexes))
531             logger.debug("  Prefix list: " + str(prefixes))
532         return prefixes
533
534     def compose_update_message(self, prefix_count_to_add=None,
535                                prefix_count_to_del=None):
536         """Composes an UPDATE message
537
538         Arguments:
539             :param prefix_count_to_add: # of prefixes to put into NLRI list
540             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
541         Returns:
542             :return: encoded UPDATE message in HEX
543         Notes:
544             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
545             lists or common message wich includes both prefix lists.
546             Updates global counters.
547         """
548         # default values handling
549         if prefix_count_to_add is None:
550             prefix_count_to_add = self.prefix_count_to_add_default
551         if prefix_count_to_del is None:
552             prefix_count_to_del = self.prefix_count_to_del_default
553         # logging
554         if self.log_info and not (self.iteration % 1000):
555             logger.info("Iteration: " + str(self.iteration) +
556                         " - total remaining prefixes: " +
557                         str(self.remaining_prefixes))
558         if self.log_debug:
559             logger.debug("#" * 10 + " Iteration: " +
560                          str(self.iteration) + " " + "#" * 10)
561             logger.debug("Remaining prefixes: " +
562                          str(self.remaining_prefixes))
563         # scenario type & one-shot counter
564         straightforward_scenario = (self.remaining_prefixes >
565                                     self.remaining_prefixes_threshold)
566         if straightforward_scenario:
567             prefix_count_to_del = 0
568             if self.log_debug:
569                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
570             if not self.phase1_start_time:
571                 self.phase1_start_time = time.time()
572         else:
573             if self.log_debug:
574                 logger.debug("--- COMBINED SCENARIO ---")
575             if not self.phase2_start_time:
576                 self.phase2_start_time = time.time()
577         # tailor the number of prefixes if needed
578         prefix_count_to_add = (prefix_count_to_del +
579                                min(prefix_count_to_add - prefix_count_to_del,
580                                    self.remaining_prefixes))
581         # prefix slots selection for insertion and withdrawal
582         slot_index_to_add = self.iteration
583         slot_index_to_del = slot_index_to_add - self.slot_gap_default
584         # getting lists of prefixes for insertion in this iteration
585         if self.log_debug:
586             logger.debug("Prefixes to be inserted in this iteration:")
587         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
588                                                   prefix_count=prefix_count_to_add)
589         # getting lists of prefixes for withdrawal in this iteration
590         if self.log_debug:
591             logger.debug("Prefixes to be withdrawn in this iteration:")
592         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
593                                                   prefix_count=prefix_count_to_del)
594         # generating the mesage
595         if self.single_update_default:
596             # Send prefixes to be introduced and withdrawn
597             # in one UPDATE message
598             msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
599                                           nlri_prefixes=prefix_list_to_add)
600         else:
601             # Send prefixes to be introduced and withdrawn
602             # in separate UPDATE messages (if needed)
603             msg_out = self.update_message(wr_prefixes=[],
604                                           nlri_prefixes=prefix_list_to_add)
605             if prefix_count_to_del:
606                 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
607                                                nlri_prefixes=[])
608         # updating counters - who knows ... maybe I am last time here ;)
609         if straightforward_scenario:
610             self.phase1_stop_time = time.time()
611             self.phase1_updates_sent = self.updates_sent
612         else:
613             self.phase2_stop_time = time.time()
614             self.phase2_updates_sent = (self.updates_sent -
615                                         self.phase1_updates_sent)
616         # updating totals for the next iteration
617         self.iteration += 1
618         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
619         # returning the encoded message
620         return msg_out
621
622     # Section of message encoders
623
624     def open_message(self, version=None, my_autonomous_system=None,
625                      hold_time=None, bgp_identifier=None):
626         """Generates an OPEN Message (rfc4271#section-4.2)
627
628         Arguments:
629             :param version: see the rfc4271#section-4.2
630             :param my_autonomous_system: see the rfc4271#section-4.2
631             :param hold_time: see the rfc4271#section-4.2
632             :param bgp_identifier: see the rfc4271#section-4.2
633         Returns:
634             :return: encoded OPEN message in HEX
635         """
636
637         # Default values handling
638         if version is None:
639             version = self.version_default
640         if my_autonomous_system is None:
641             my_autonomous_system = self.my_autonomous_system_default
642         if hold_time is None:
643             hold_time = self.hold_time_default
644         if bgp_identifier is None:
645             bgp_identifier = self.bgp_identifier_default
646
647         # Marker
648         marker_hex = "\xFF" * 16
649
650         # Type
651         type = 1
652         type_hex = struct.pack("B", type)
653
654         # version
655         version_hex = struct.pack("B", version)
656
657         # my_autonomous_system
658         # AS_TRANS value, 23456 decadic.
659         my_autonomous_system_2_bytes = 23456
660         # AS number is mappable to 2 bytes
661         if my_autonomous_system < 65536:
662             my_autonomous_system_2_bytes = my_autonomous_system
663         my_autonomous_system_hex_2_bytes = struct.pack(">H",
664                                                        my_autonomous_system)
665
666         # Hold Time
667         hold_time_hex = struct.pack(">H", hold_time)
668
669         # BGP Identifier
670         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
671
672         # Optional Parameters
673         optional_parameters_hex = ""
674         if self.rfc4760:
675             optional_parameter_hex = (
676                 "\x02"  # Param type ("Capability Ad")
677                 "\x06"  # Length (6 bytes)
678                 "\x01"  # Capability type (NLRI Unicast),
679                         # see RFC 4760, secton 8
680                 "\x04"  # Capability value length
681                 "\x00\x01"  # AFI (Ipv4)
682                 "\x00"  # (reserved)
683                 "\x01"  # SAFI (Unicast)
684             )
685             optional_parameters_hex += optional_parameter_hex
686
687         optional_parameter_hex = (
688             "\x02"  # Param type ("Capability Ad")
689             "\x06"  # Length (6 bytes)
690             "\x41"  # "32 bit AS Numbers Support"
691                     # (see RFC 6793, section 3)
692             "\x04"  # Capability value length
693         )
694         optional_parameter_hex += (
695             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
696         )
697         optional_parameters_hex += optional_parameter_hex
698
699         # Optional Parameters Length
700         optional_parameters_length = len(optional_parameters_hex)
701         optional_parameters_length_hex = struct.pack("B",
702                                                      optional_parameters_length)
703
704         # Length (big-endian)
705         length = (
706             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
707             len(my_autonomous_system_hex_2_bytes) +
708             len(hold_time_hex) + len(bgp_identifier_hex) +
709             len(optional_parameters_length_hex) +
710             len(optional_parameters_hex)
711         )
712         length_hex = struct.pack(">H", length)
713
714         # OPEN Message
715         message_hex = (
716             marker_hex +
717             length_hex +
718             type_hex +
719             version_hex +
720             my_autonomous_system_hex_2_bytes +
721             hold_time_hex +
722             bgp_identifier_hex +
723             optional_parameters_length_hex +
724             optional_parameters_hex
725         )
726
727         if self.log_debug:
728             logger.debug("OPEN message encoding")
729             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
730             logger.debug("  Length=" + str(length) + " (0x" +
731                          binascii.hexlify(length_hex) + ")")
732             logger.debug("  Type=" + str(type) + " (0x" +
733                          binascii.hexlify(type_hex) + ")")
734             logger.debug("  Version=" + str(version) + " (0x" +
735                          binascii.hexlify(version_hex) + ")")
736             logger.debug("  My Autonomous System=" +
737                          str(my_autonomous_system_2_bytes) + " (0x" +
738                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
739                          ")")
740             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
741                          binascii.hexlify(hold_time_hex) + ")")
742             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
743                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
744             logger.debug("  Optional Parameters Length=" +
745                          str(optional_parameters_length) + " (0x" +
746                          binascii.hexlify(optional_parameters_length_hex) +
747                          ")")
748             logger.debug("  Optional Parameters=0x" +
749                          binascii.hexlify(optional_parameters_hex))
750             logger.debug("OPEN message encoded: 0x%s",
751                          binascii.b2a_hex(message_hex))
752
753         return message_hex
754
755     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
756                        wr_prefix_length=None, nlri_prefix_length=None,
757                        my_autonomous_system=None, next_hop=None):
758         """Generates an UPDATE Message (rfc4271#section-4.3)
759
760         Arguments:
761             :param wr_prefixes: see the rfc4271#section-4.3
762             :param nlri_prefixes: see the rfc4271#section-4.3
763             :param wr_prefix_length: see the rfc4271#section-4.3
764             :param nlri_prefix_length: see the rfc4271#section-4.3
765             :param my_autonomous_system: see the rfc4271#section-4.3
766             :param next_hop: see the rfc4271#section-4.3
767         Returns:
768             :return: encoded UPDATE message in HEX
769         """
770
771         # Default values handling
772         if wr_prefixes is None:
773             wr_prefixes = self.wr_prefixes_default
774         if nlri_prefixes is None:
775             nlri_prefixes = self.nlri_prefixes_default
776         if wr_prefix_length is None:
777             wr_prefix_length = self.prefix_length_default
778         if nlri_prefix_length is None:
779             nlri_prefix_length = self.prefix_length_default
780         if my_autonomous_system is None:
781             my_autonomous_system = self.my_autonomous_system_default
782         if next_hop is None:
783             next_hop = self.next_hop_default
784
785         # Marker
786         marker_hex = "\xFF" * 16
787
788         # Type
789         type = 2
790         type_hex = struct.pack("B", type)
791
792         # Withdrawn Routes
793         bytes = ((wr_prefix_length - 1) / 8) + 1
794         withdrawn_routes_hex = ""
795         for prefix in wr_prefixes:
796             withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
797                                    struct.pack(">I", int(prefix))[:bytes])
798             withdrawn_routes_hex += withdrawn_route_hex
799
800         # Withdrawn Routes Length
801         withdrawn_routes_length = len(withdrawn_routes_hex)
802         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
803
804         # TODO: to replace hardcoded string by encoding?
805         # Path Attributes
806         if nlri_prefixes != []:
807             path_attributes_hex = (
808                 "\x40"  # Flags ("Well-Known")
809                 "\x01"  # Type (ORIGIN)
810                 "\x01"  # Length (1)
811                 "\x00"  # Origin: IGP
812                 "\x40"  # Flags ("Well-Known")
813                 "\x02"  # Type (AS_PATH)
814                 "\x06"  # Length (6)
815                 "\x02"  # AS segment type (AS_SEQUENCE)
816                 "\x01"  # AS segment length (1)
817             )
818             my_AS = struct.pack(">I", my_autonomous_system)
819             path_attributes_hex += my_AS  # AS segment (4 bytes)
820             path_attributes_hex += (
821                 "\x40"  # Flags ("Well-Known")
822                 "\x03"  # Type (NEXT_HOP)
823                 "\x04"  # Length (4)
824             )
825             next_hop = struct.pack(">I", int(next_hop))
826             path_attributes_hex += (
827                 next_hop  # IP address of the next hop (4 bytes)
828             )
829         else:
830             path_attributes_hex = ""
831
832         # Total Path Attributes Length
833         total_path_attributes_length = len(path_attributes_hex)
834         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
835
836         # Network Layer Reachability Information
837         bytes = ((nlri_prefix_length - 1) / 8) + 1
838         nlri_hex = ""
839         for prefix in nlri_prefixes:
840             nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
841                                struct.pack(">I", int(prefix))[:bytes])
842             nlri_hex += nlri_prefix_hex
843
844         # Length (big-endian)
845         length = (
846             len(marker_hex) + 2 + len(type_hex) +
847             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
848             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
849             len(nlri_hex))
850         length_hex = struct.pack(">H", length)
851
852         # UPDATE Message
853         message_hex = (
854             marker_hex +
855             length_hex +
856             type_hex +
857             withdrawn_routes_length_hex +
858             withdrawn_routes_hex +
859             total_path_attributes_length_hex +
860             path_attributes_hex +
861             nlri_hex
862         )
863
864         if self.log_debug:
865             logger.debug("UPDATE message encoding")
866             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
867             logger.debug("  Length=" + str(length) + " (0x" +
868                          binascii.hexlify(length_hex) + ")")
869             logger.debug("  Type=" + str(type) + " (0x" +
870                          binascii.hexlify(type_hex) + ")")
871             logger.debug("  withdrawn_routes_length=" +
872                          str(withdrawn_routes_length) + " (0x" +
873                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
874             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
875                          str(wr_prefix_length) + " (0x" +
876                          binascii.hexlify(withdrawn_routes_hex) + ")")
877             logger.debug("  Total Path Attributes Length=" +
878                          str(total_path_attributes_length) + " (0x" +
879                          binascii.hexlify(total_path_attributes_length_hex) +
880                          ")")
881             logger.debug("  Path Attributes=" + "(0x" +
882                          binascii.hexlify(path_attributes_hex) + ")")
883             logger.debug("  Network Layer Reachability Information=" +
884                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
885                          " (0x" + binascii.hexlify(nlri_hex) + ")")
886             logger.debug("UPDATE message encoded: 0x" +
887                          binascii.b2a_hex(message_hex))
888
889         # updating counter
890         self.updates_sent += 1
891         # returning encoded message
892         return message_hex
893
894     def notification_message(self, error_code, error_subcode, data_hex=""):
895         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
896
897         Arguments:
898             :param error_code: see the rfc4271#section-4.5
899             :param error_subcode: see the rfc4271#section-4.5
900             :param data_hex: see the rfc4271#section-4.5
901         Returns:
902             :return: encoded NOTIFICATION message in HEX
903         """
904
905         # Marker
906         marker_hex = "\xFF" * 16
907
908         # Type
909         type = 3
910         type_hex = struct.pack("B", type)
911
912         # Error Code
913         error_code_hex = struct.pack("B", error_code)
914
915         # Error Subode
916         error_subcode_hex = struct.pack("B", error_subcode)
917
918         # Length (big-endian)
919         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
920                   len(error_subcode_hex) + len(data_hex))
921         length_hex = struct.pack(">H", length)
922
923         # NOTIFICATION Message
924         message_hex = (
925             marker_hex +
926             length_hex +
927             type_hex +
928             error_code_hex +
929             error_subcode_hex +
930             data_hex
931         )
932
933         if self.log_debug:
934             logger.debug("NOTIFICATION message encoding")
935             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
936             logger.debug("  Length=" + str(length) + " (0x" +
937                          binascii.hexlify(length_hex) + ")")
938             logger.debug("  Type=" + str(type) + " (0x" +
939                          binascii.hexlify(type_hex) + ")")
940             logger.debug("  Error Code=" + str(error_code) + " (0x" +
941                          binascii.hexlify(error_code_hex) + ")")
942             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
943                          binascii.hexlify(error_subcode_hex) + ")")
944             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
945             logger.debug("NOTIFICATION message encoded: 0x%s",
946                          binascii.b2a_hex(message_hex))
947
948         return message_hex
949
950     def keepalive_message(self):
951         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
952
953         Returns:
954             :return: encoded KEEP ALIVE message in HEX
955         """
956
957         # Marker
958         marker_hex = "\xFF" * 16
959
960         # Type
961         type = 4
962         type_hex = struct.pack("B", type)
963
964         # Length (big-endian)
965         length = len(marker_hex) + 2 + len(type_hex)
966         length_hex = struct.pack(">H", length)
967
968         # KEEP ALIVE Message
969         message_hex = (
970             marker_hex +
971             length_hex +
972             type_hex
973         )
974
975         if self.log_debug:
976             logger.debug("KEEP ALIVE message encoding")
977             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
978             logger.debug("  Length=" + str(length) + " (0x" +
979                          binascii.hexlify(length_hex) + ")")
980             logger.debug("  Type=" + str(type) + " (0x" +
981                          binascii.hexlify(type_hex) + ")")
982             logger.debug("KEEP ALIVE message encoded: 0x%s",
983                          binascii.b2a_hex(message_hex))
984
985         return message_hex
986
987
988 class TimeTracker(object):
989     """Class for tracking timers, both for my keepalives and
990     peer's hold time.
991     """
992
993     def __init__(self, msg_in):
994         """Initialisation. based on defaults and OPEN message from peer.
995
996         Arguments:
997             msg_in: the OPEN message received from peer.
998         """
999         # Note: Relative time is always named timedelta, to stress that
1000         # the (non-delta) time is absolute.
1001         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1002         # Upper bound for being stuck in the same state, we should
1003         # at least report something before continuing.
1004         # Negotiate the hold timer by taking the smaller
1005         # of the 2 values (mine and the peer's).
1006         hold_timedelta = 180  # Not an attribute of self yet.
1007         # TODO: Make the default value configurable,
1008         # default value could mirror what peer said.
1009         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1010         if hold_timedelta > peer_hold_timedelta:
1011             hold_timedelta = peer_hold_timedelta
1012         if hold_timedelta != 0 and hold_timedelta < 3:
1013             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1014             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1015         self.hold_timedelta = hold_timedelta
1016         # If we do not hear from peer this long, we assume it has died.
1017         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1018         # Upper limit for duration between messages, to avoid being
1019         # declared to be dead.
1020         # The same as calling snapshot(), but also declares a field.
1021         self.snapshot_time = time.time()
1022         # Sometimes we need to store time. This is where to get
1023         # the value from afterwards. Time_keepalive may be too strict.
1024         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1025         # At this time point, peer will be declared dead.
1026         self.my_keepalive_time = None  # to be set later
1027         # At this point, we should be sending keepalive message.
1028
1029     def snapshot(self):
1030         """Store current time in instance data to use later."""
1031         # Read as time before something interesting was called.
1032         self.snapshot_time = time.time()
1033
1034     def reset_peer_hold_time(self):
1035         """Move hold time to future as peer has just proven it still lives."""
1036         self.peer_hold_time = time.time() + self.hold_timedelta
1037
1038     # Some methods could rely on self.snapshot_time, but it is better
1039     # to require user to provide it explicitly.
1040     def reset_my_keepalive_time(self, keepalive_time):
1041         """Calculate and set the next my KEEP ALIVE timeout time
1042
1043         Arguments:
1044             :keepalive_time: the initial value of the KEEP ALIVE timer
1045         """
1046         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1047
1048     def is_time_for_my_keepalive(self):
1049         """Check for my KEEP ALIVE timeout occurence"""
1050         if self.hold_timedelta == 0:
1051             return False
1052         return self.snapshot_time >= self.my_keepalive_time
1053
1054     def get_next_event_time(self):
1055         """Set the time of the next expected or to be sent KEEP ALIVE"""
1056         if self.hold_timedelta == 0:
1057             return self.snapshot_time + 86400
1058         return min(self.my_keepalive_time, self.peer_hold_time)
1059
1060     def check_peer_hold_time(self, snapshot_time):
1061         """Raise error if nothing was read from peer until specified time."""
1062         # Hold time = 0 means keepalive checking off.
1063         if self.hold_timedelta != 0:
1064             # time.time() may be too strict
1065             if snapshot_time > self.peer_hold_time:
1066                 logger.error("Peer has overstepped the hold timer.")
1067                 raise RuntimeError("Peer has overstepped the hold timer.")
1068                 # TODO: Include hold_timedelta?
1069                 # TODO: Add notification sending (attempt). That means
1070                 # move to write tracker.
1071
1072
1073 class ReadTracker(object):
1074     """Class for tracking read of mesages chunk by chunk and
1075     for idle waiting.
1076     """
1077
1078     def __init__(self, bgp_socket, timer):
1079         """The reader initialisation.
1080
1081         Arguments:
1082             bgp_socket: socket to be used for sending
1083             timer: timer to be used for scheduling
1084         """
1085         # References to outside objects.
1086         self.socket = bgp_socket
1087         self.timer = timer
1088         # BGP marker length plus length field length.
1089         self.header_length = 18
1090         # TODO: make it class (constant) attribute
1091         # Computation of where next chunk ends depends on whether
1092         # we are beyond length field.
1093         self.reading_header = True
1094         # Countdown towards next size computation.
1095         self.bytes_to_read = self.header_length
1096         # Incremental buffer for message under read.
1097         self.msg_in = ""
1098         # Initialising counters
1099         self.updates_received = 0
1100         self.prefixes_introduced = 0
1101         self.prefixes_withdrawn = 0
1102         self.rx_idle_time = 0
1103         self.rx_activity_detected = True
1104
1105     def read_message_chunk(self):
1106         """Read up to one message
1107
1108         Note:
1109             Currently it does not return anything.
1110         """
1111         # TODO: We could return the whole message, currently not needed.
1112         # We assume the socket is readable.
1113         chunk_message = self.socket.recv(self.bytes_to_read)
1114         self.msg_in += chunk_message
1115         self.bytes_to_read -= len(chunk_message)
1116         # TODO: bytes_to_read < 0 is not possible, right?
1117         if not self.bytes_to_read:
1118             # Finished reading a logical block.
1119             if self.reading_header:
1120                 # The logical block was a BGP header.
1121                 # Now we know the size of the message.
1122                 self.reading_header = False
1123                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1124                                       self.header_length)
1125             else:  # We have finished reading the body of the message.
1126                 # Peer has just proven it is still alive.
1127                 self.timer.reset_peer_hold_time()
1128                 # TODO: Do we want to count received messages?
1129                 # This version ignores the received message.
1130                 # TODO: Should we do validation and exit on anything
1131                 # besides update or keepalive?
1132                 # Prepare state for reading another message.
1133                 message_type_hex = self.msg_in[self.header_length]
1134                 if message_type_hex == "\x01":
1135                     logger.info("OPEN message received: 0x%s",
1136                                 binascii.b2a_hex(self.msg_in))
1137                 elif message_type_hex == "\x02":
1138                     logger.debug("UPDATE message received: 0x%s",
1139                                  binascii.b2a_hex(self.msg_in))
1140                     self.decode_update_message(self.msg_in)
1141                 elif message_type_hex == "\x03":
1142                     logger.info("NOTIFICATION message received: 0x%s",
1143                                 binascii.b2a_hex(self.msg_in))
1144                 elif message_type_hex == "\x04":
1145                     logger.info("KEEP ALIVE message received: 0x%s",
1146                                 binascii.b2a_hex(self.msg_in))
1147                 else:
1148                     logger.warning("Unexpected message received: 0x%s",
1149                                    binascii.b2a_hex(self.msg_in))
1150                 self.msg_in = ""
1151                 self.reading_header = True
1152                 self.bytes_to_read = self.header_length
1153         # We should not act upon peer_hold_time if we are reading
1154         # something right now.
1155         return
1156
1157     def decode_path_attributes(self, path_attributes_hex):
1158         """Decode the Path Attributes field (rfc4271#section-4.3)
1159
1160         Arguments:
1161             :path_attributes: path_attributes field to be decoded in hex
1162         Returns:
1163             :return: None
1164         """
1165         hex_to_decode = path_attributes_hex
1166
1167         while len(hex_to_decode):
1168             attr_flags_hex = hex_to_decode[0]
1169             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1170 #            attr_optional_bit = attr_flags & 128
1171 #            attr_transitive_bit = attr_flags & 64
1172 #            attr_partial_bit = attr_flags & 32
1173             attr_extended_length_bit = attr_flags & 16
1174
1175             attr_type_code_hex = hex_to_decode[1]
1176             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1177
1178             if attr_extended_length_bit:
1179                 attr_length_hex = hex_to_decode[2:4]
1180                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1181                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1182                 hex_to_decode = hex_to_decode[4 + attr_length:]
1183             else:
1184                 attr_length_hex = hex_to_decode[2]
1185                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1186                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1187                 hex_to_decode = hex_to_decode[3 + attr_length:]
1188
1189             if attr_type_code == 1:
1190                 logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)",
1191                              binascii.b2a_hex(attr_flags_hex))
1192                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1193             elif attr_type_code == 2:
1194                 logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)",
1195                              binascii.b2a_hex(attr_flags_hex))
1196                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1197             elif attr_type_code == 3:
1198                 logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)",
1199                              binascii.b2a_hex(attr_flags_hex))
1200                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1201             elif attr_type_code == 4:
1202                 logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)",
1203                              binascii.b2a_hex(attr_flags_hex))
1204                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1205             elif attr_type_code == 5:
1206                 logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)",
1207                              binascii.b2a_hex(attr_flags_hex))
1208                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1209             elif attr_type_code == 6:
1210                 logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)",
1211                              binascii.b2a_hex(attr_flags_hex))
1212                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1213             elif attr_type_code == 7:
1214                 logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)",
1215                              binascii.b2a_hex(attr_flags_hex))
1216                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1217             elif attr_type_code == 14:  # rfc4760#section-3
1218                 logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)",
1219                              binascii.b2a_hex(attr_flags_hex))
1220                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1221                 address_family_identifier_hex = attr_value_hex[0:2]
1222                 logger.debug("  Address Family Identifier = 0x%s",
1223                              binascii.b2a_hex(address_family_identifier_hex))
1224                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1225                 logger.debug("  Subsequent Address Family Identifier = 0x%s",
1226                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1227                 next_hop_netaddr_len_hex = attr_value_hex[3]
1228                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1229                 logger.debug("  Length of Next Hop Network Address = 0x%s (%s)",
1230                              binascii.b2a_hex(next_hop_netaddr_len_hex),
1231                              next_hop_netaddr_len)
1232                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1233                 logger.debug("  Network Address of Next Hop = 0x%s",
1234                              binascii.b2a_hex(next_hop_netaddr_hex))
1235                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1236                 logger.debug("  Reserved = 0x%s",
1237                              binascii.b2a_hex(reserved_hex))
1238                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1239                 logger.debug("  Network Layer Reachability Information = 0x%s",
1240                              binascii.b2a_hex(nlri_hex))
1241                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1242                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1243                 for prefix in nlri_prefix_list:
1244                     logger.debug("  nlri_prefix_received: %s", prefix)
1245                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1246             elif attr_type_code == 15:  # rfc4760#section-4
1247                 logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)",
1248                              binascii.b2a_hex(attr_flags_hex))
1249                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1250                 address_family_identifier_hex = attr_value_hex[0:2]
1251                 logger.debug("  Address Family Identifier = 0x%s",
1252                              binascii.b2a_hex(address_family_identifier_hex))
1253                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1254                 logger.debug("  Subsequent Address Family Identifier = 0x%s",
1255                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1256                 wd_hex = attr_value_hex[3:]
1257                 logger.debug("  Withdrawn Routes = 0x%s",
1258                              binascii.b2a_hex(wd_hex))
1259                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1260                 logger.debug("  Withdrawn routes prefix list: %s",
1261                              wdr_prefix_list)
1262                 for prefix in wdr_prefix_list:
1263                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1264                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1265             else:
1266                 logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code,
1267                              binascii.b2a_hex(attr_flags_hex))
1268                 logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1269         return None
1270
1271     def decode_update_message(self, msg):
1272         """Decode an UPDATE message (rfc4271#section-4.3)
1273
1274         Arguments:
1275             :msg: message to be decoded in hex
1276         Returns:
1277             :return: None
1278         """
1279         logger.debug("Decoding update message:")
1280         # message header - marker
1281         marker_hex = msg[:16]
1282         logger.debug("Message header marker: 0x%s",
1283                      binascii.b2a_hex(marker_hex))
1284         # message header - message length
1285         msg_length_hex = msg[16:18]
1286         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1287         logger.debug("Message lenght: 0x%s (%s)",
1288                      binascii.b2a_hex(msg_length_hex), msg_length)
1289         # message header - message type
1290         msg_type_hex = msg[18:19]
1291         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1292         if msg_type == 2:
1293             logger.debug("Message type: 0x%s (update)",
1294                          binascii.b2a_hex(msg_type_hex))
1295             # withdrawn routes length
1296             wdr_length_hex = msg[19:21]
1297             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1298             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1299                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1300             # withdrawn routes
1301             wdr_hex = msg[21:21 + wdr_length]
1302             logger.debug("Withdrawn routes: 0x%s",
1303                          binascii.b2a_hex(wdr_hex))
1304             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1305             logger.debug("Withdrawn routes prefix list: %s",
1306                          wdr_prefix_list)
1307             for prefix in wdr_prefix_list:
1308                 logger.debug("withdrawn_prefix_received: %s", prefix)
1309             # total path attribute length
1310             total_pa_length_offset = 21 + wdr_length
1311             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
1312             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1313             logger.debug("Total path attribute lenght: 0x%s (%s)",
1314                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1315             # path attributes
1316             pa_offset = total_pa_length_offset + 2
1317             pa_hex = msg[pa_offset:pa_offset+total_pa_length]
1318             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1319             self.decode_path_attributes(pa_hex)
1320             # network layer reachability information length
1321             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1322             logger.debug("Calculated NLRI length: %s", nlri_length)
1323             # network layer reachability information
1324             nlri_offset = pa_offset + total_pa_length
1325             nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
1326             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1327             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1328             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1329             for prefix in nlri_prefix_list:
1330                 logger.debug("nlri_prefix_received: %s", prefix)
1331             # Updating counters
1332             self.updates_received += 1
1333             self.prefixes_introduced += len(nlri_prefix_list)
1334             self.prefixes_withdrawn += len(wdr_prefix_list)
1335         else:
1336             logger.error("Unexpeced message type 0x%s in 0x%s",
1337                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1338
1339     def wait_for_read(self):
1340         """Read message until timeout (next expected event).
1341
1342         Note:
1343             Used when no more updates has to be sent to avoid busy-wait.
1344             Currently it does not return anything.
1345         """
1346         # Compute time to the first predictable state change
1347         event_time = self.timer.get_next_event_time()
1348         # snapshot_time would be imprecise
1349         wait_timedelta = min(event_time - time.time(), 10)
1350         if wait_timedelta < 0:
1351             # The program got around to waiting to an event in "very near
1352             # future" so late that it became a "past" event, thus tell
1353             # "select" to not wait at all. Passing negative timedelta to
1354             # select() would lead to either waiting forever (for -1) or
1355             # select.error("Invalid parameter") (for everything else).
1356             wait_timedelta = 0
1357         # And wait for event or something to read.
1358
1359         if not self.rx_activity_detected or not (self.updates_received % 100):
1360             # right time to write statistics to the log (not for every update and
1361             # not too frequently to avoid having large log files)
1362             logger.info("total_received_update_message_counter: %s",
1363                         self.updates_received)
1364             logger.info("total_received_nlri_prefix_counter: %s",
1365                         self.prefixes_introduced)
1366             logger.info("total_received_withdrawn_prefix_counter: %s",
1367                         self.prefixes_withdrawn)
1368
1369         start_time = time.time()
1370         select.select([self.socket], [], [self.socket], wait_timedelta)
1371         timedelta = time.time() - start_time
1372         self.rx_idle_time += timedelta
1373         self.rx_activity_detected = timedelta < 1
1374
1375         if not self.rx_activity_detected or not (self.updates_received % 100):
1376             # right time to write statistics to the log (not for every update and
1377             # not too frequently to avoid having large log files)
1378             logger.info("... idle for %.3fs", timedelta)
1379             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1380         return
1381
1382
1383 class WriteTracker(object):
1384     """Class tracking enqueueing messages and sending chunks of them."""
1385
1386     def __init__(self, bgp_socket, generator, timer):
1387         """The writter initialisation.
1388
1389         Arguments:
1390             bgp_socket: socket to be used for sending
1391             generator: generator to be used for message generation
1392             timer: timer to be used for scheduling
1393         """
1394         # References to outside objects,
1395         self.socket = bgp_socket
1396         self.generator = generator
1397         self.timer = timer
1398         # Really new fields.
1399         # TODO: Would attribute docstrings add anything substantial?
1400         self.sending_message = False
1401         self.bytes_to_send = 0
1402         self.msg_out = ""
1403
1404     def enqueue_message_for_sending(self, message):
1405         """Enqueue message and change state.
1406
1407         Arguments:
1408             message: message to be enqueued into the msg_out buffer
1409         """
1410         self.msg_out += message
1411         self.bytes_to_send += len(message)
1412         self.sending_message = True
1413
1414     def send_message_chunk_is_whole(self):
1415         """Send enqueued data from msg_out buffer
1416
1417         Returns:
1418             :return: true if no remaining data to send
1419         """
1420         # We assume there is a msg_out to send and socket is writable.
1421         # print "going to send", repr(self.msg_out)
1422         self.timer.snapshot()
1423         bytes_sent = self.socket.send(self.msg_out)
1424         # Forget the part of message that was sent.
1425         self.msg_out = self.msg_out[bytes_sent:]
1426         self.bytes_to_send -= bytes_sent
1427         if not self.bytes_to_send:
1428             # TODO: Is it possible to hit negative bytes_to_send?
1429             self.sending_message = False
1430             # We should have reset hold timer on peer side.
1431             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1432             # The possible reason for not prioritizing reads is gone.
1433             return True
1434         return False
1435
1436
1437 class StateTracker(object):
1438     """Main loop has state so complex it warrants this separate class."""
1439
1440     def __init__(self, bgp_socket, generator, timer):
1441         """The state tracker initialisation.
1442
1443         Arguments:
1444             bgp_socket: socket to be used for sending / receiving
1445             generator: generator to be used for message generation
1446             timer: timer to be used for scheduling
1447         """
1448         # References to outside objects.
1449         self.socket = bgp_socket
1450         self.generator = generator
1451         self.timer = timer
1452         # Sub-trackers.
1453         self.reader = ReadTracker(bgp_socket, timer)
1454         self.writer = WriteTracker(bgp_socket, generator, timer)
1455         # Prioritization state.
1456         self.prioritize_writing = False
1457         # In general, we prioritize reading over writing. But in order
1458         # not to get blocked by neverending reads, we should
1459         # check whether we are not risking running out of holdtime.
1460         # So in some situations, this field is set to True to attempt
1461         # finishing sending a message, after which this field resets
1462         # back to False.
1463         # TODO: Alternative is to switch fairly between reading and
1464         # writing (called round robin from now on).
1465         # Message counting is done in generator.
1466
1467     def perform_one_loop_iteration(self):
1468         """ The main loop iteration
1469
1470         Notes:
1471             Calculates priority, resolves all conditions, calls
1472             appropriate method and returns to caller to repeat.
1473         """
1474         self.timer.snapshot()
1475         if not self.prioritize_writing:
1476             if self.timer.is_time_for_my_keepalive():
1477                 if not self.writer.sending_message:
1478                     # We need to schedule a keepalive ASAP.
1479                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1480                     logger.info("KEEP ALIVE is sent.")
1481                 # We are sending a message now, so let's prioritize it.
1482                 self.prioritize_writing = True
1483         # Now we know what our priorities are, we have to check
1484         # which actions are available.
1485         # socket.socket() returns three lists,
1486         # we store them to list of lists.
1487         list_list = select.select([self.socket], [self.socket], [self.socket],
1488                                   self.timer.report_timedelta)
1489         read_list, write_list, except_list = list_list
1490         # Lists are unpacked, each is either [] or [self.socket],
1491         # so we will test them as boolean.
1492         if except_list:
1493             logger.error("Exceptional state on the socket.")
1494             raise RuntimeError("Exceptional state on socket", self.socket)
1495         # We will do either read or write.
1496         if not (self.prioritize_writing and write_list):
1497             # Either we have no reason to rush writes,
1498             # or the socket is not writable.
1499             # We are focusing on reading here.
1500             if read_list:  # there is something to read indeed
1501                 # In this case we want to read chunk of message
1502                 # and repeat the select,
1503                 self.reader.read_message_chunk()
1504                 return
1505             # We were focusing on reading, but nothing to read was there.
1506             # Good time to check peer for hold timer.
1507             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1508             # Quiet on the read front, we can have attempt to write.
1509         if write_list:
1510             # Either we really want to reset peer's view of our hold
1511             # timer, or there was nothing to read.
1512             # Were we in the middle of sending a message?
1513             if self.writer.sending_message:
1514                 # Was it the end of a message?
1515                 whole = self.writer.send_message_chunk_is_whole()
1516                 # We were pressed to send something and we did it.
1517                 if self.prioritize_writing and whole:
1518                     # We prioritize reading again.
1519                     self.prioritize_writing = False
1520                 return
1521             # Finally to check if still update messages to be generated.
1522             if self.generator.remaining_prefixes:
1523                 msg_out = self.generator.compose_update_message()
1524                 if not self.generator.remaining_prefixes:
1525                     # We have just finished update generation,
1526                     # end-of-rib is due.
1527                     logger.info("All update messages generated.")
1528                     logger.info("Storing performance results.")
1529                     self.generator.store_results()
1530                     logger.info("Finally an END-OF-RIB is sent.")
1531                     msg_out += self.generator.update_message(wr_prefixes=[],
1532                                                              nlri_prefixes=[])
1533                 self.writer.enqueue_message_for_sending(msg_out)
1534                 # Attempt for real sending to be done in next iteration.
1535                 return
1536             # Nothing to write anymore.
1537             # To avoid busy loop, we do idle waiting here.
1538             self.reader.wait_for_read()
1539             return
1540         # We can neither read nor write.
1541         logger.warning("Input and output both blocked for " +
1542                        str(self.timer.report_timedelta) + " seconds.")
1543         # FIXME: Are we sure select has been really waiting
1544         # the whole period?
1545         return
1546
1547
1548 def create_logger(loglevel, logfile):
1549     """Create logger object
1550
1551     Arguments:
1552         :loglevel: log level
1553         :logfile: log file name
1554     Returns:
1555         :return: logger object
1556     """
1557     logger = logging.getLogger("logger")
1558     log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
1559     console_handler = logging.StreamHandler()
1560     file_handler = logging.FileHandler(logfile, mode="w")
1561     console_handler.setFormatter(log_formatter)
1562     file_handler.setFormatter(log_formatter)
1563     logger.addHandler(console_handler)
1564     logger.addHandler(file_handler)
1565     logger.setLevel(loglevel)
1566     return logger
1567
1568
1569 def job(arguments):
1570     """One time initialisation and iterations looping.
1571     Notes:
1572         Establish BGP connection and run iterations.
1573
1574     Arguments:
1575         :arguments: Command line arguments
1576     Returns:
1577         :return: None
1578     """
1579     bgp_socket = establish_connection(arguments)
1580     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1581     # Receive open message before sending anything.
1582     # FIXME: Add parameter to send default open message first,
1583     # to work with "you first" peers.
1584     msg_in = read_open_message(bgp_socket)
1585     timer = TimeTracker(msg_in)
1586     generator = MessageGenerator(arguments)
1587     msg_out = generator.open_message()
1588     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1589     # Send our open message to the peer.
1590     bgp_socket.send(msg_out)
1591     # Wait for confirming keepalive.
1592     # TODO: Surely in just one packet?
1593     # Using exact keepalive length to not to see possible updates.
1594     msg_in = bgp_socket.recv(19)
1595     if msg_in != generator.keepalive_message():
1596         logger.error("Open not confirmed by keepalive, instead got " +
1597                      binascii.hexlify(msg_in))
1598         raise MessageError("Open not confirmed by keepalive, instead got",
1599                            msg_in)
1600     timer.reset_peer_hold_time()
1601     # Send the keepalive to indicate the connection is accepted.
1602     timer.snapshot()  # Remember this time.
1603     msg_out = generator.keepalive_message()
1604     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1605     bgp_socket.send(msg_out)
1606     # Use the remembered time.
1607     timer.reset_my_keepalive_time(timer.snapshot_time)
1608     # End of initial handshake phase.
1609     state = StateTracker(bgp_socket, generator, timer)
1610     while True:  # main reactor loop
1611         state.perform_one_loop_iteration()
1612
1613
1614 def threaded_job(arguments):
1615     """Run the job threaded
1616
1617     Arguments:
1618         :arguments: Command line arguments
1619     Returns:
1620         :return: None
1621     """
1622     amount_left = arguments.amount
1623     utils_left = arguments.multiplicity
1624     prefix_current = arguments.firstprefix
1625     myip_current = arguments.myip
1626     thread_args = []
1627
1628     while 1:
1629         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
1630         amount_left -= amount_per_util
1631         utils_left -= 1
1632
1633         args = deepcopy(arguments)
1634         args.amount = amount_per_util
1635         args.firstprefix = prefix_current
1636         args.myip = myip_current
1637         thread_args.append(args)
1638
1639         if not utils_left:
1640             break
1641         prefix_current += amount_per_util * 16
1642         myip_current += 1
1643
1644     try:
1645         # Create threads
1646         for t in thread_args:
1647             thread.start_new_thread(job, (t,))
1648     except Exception:
1649         print "Error: unable to start thread."
1650         raise SystemExit(2)
1651
1652     # Work remains forever
1653     while 1:
1654         time.sleep(5)
1655
1656
1657 if __name__ == "__main__":
1658     arguments = parse_arguments()
1659     logger = create_logger(arguments.loglevel, arguments.logfile)
1660     if arguments.multiplicity > 1:
1661         threaded_job(arguments)
1662     else:
1663         job(arguments)