File play.py was reworked to use threads
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 __author__ = "Vratko Polak"
15 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
16 __license__ = "Eclipse Public License v1.0"
17 __email__ = "vrpolak@cisco.com"
18
19 import argparse
20 import binascii
21 import ipaddr
22 import select
23 import socket
24 import time
25 import logging
26 import struct
27
28 import thread
29 from copy import deepcopy
30
31
def parse_arguments():
    """Use argparse to get arguments.

    Returns:
        :return: args object.
    Notes:
        Terminates with SystemExit(1) when --multiplicity is not positive.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = 'Amount of IP prefixes to generate. (negative means "infinite").'
    parser.add_argument("--amount", default=1, type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default=1, type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default=0, type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default=0, type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # The default has to be one of the (string) choices, not a list,
    # otherwise comparisons against the option value never match it.
    parser.add_argument("--updates", choices=["single", "separate"],
                        default="separate", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from. "
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection. "
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default=0, type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = ("Numeric IP Address to try to connect to. "
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default=179, type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default=180, type=int, help=str_help)
    # All four log level switches write into the same "loglevel" destination.
    str_help = "Log level (--error, --warning, --info, --debug)"
    parser.add_argument("--error", dest="loglevel", action="store_const",
                        const=logging.ERROR, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--warning", dest="loglevel", action="store_const",
                        const=logging.WARNING, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--info", dest="loglevel", action="store_const",
                        const=logging.INFO, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--debug", dest="loglevel", action="store_const",
                        const=logging.DEBUG, default=logging.INFO,
                        help=str_help)
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default=1000, type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
    str_help = "How many play utilities are to be started."
    parser.add_argument("--multiplicity", default=1, type=int, help=str_help)
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        # Single pre-formatted string prints identically as a statement
        # and as a function call.
        print("Multiplicity %s is not positive." % arguments.multiplicity)
        raise SystemExit(1)
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
109
110
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.listen: wait for the peer instead of connecting to it
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port
    Returns:
        :return: socket.
    Notes:
        Sockets created here are closed again when the connection attempt
        fails; the original exception is propagated to the caller.
    """
    # socket does not speak ipaddr, hence str();
    # bind needs a single tuple as argument
    local_address = (str(arguments.myip), arguments.myport)
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listening_socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            listening_socket.bind(local_address)
            listening_socket.listen(1)
            bgp_socket, _ = listening_socket.accept()
            # TODO: Verify client IP is controller IP.
        finally:
            # The listener is needed for one connection only; release it
            # on success and on failure alike.
            listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        bgp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            bgp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # bind to force specified address and port
            bgp_socket.bind(local_address)
            bgp_socket.connect((str(arguments.peerip), arguments.peerport))
        except Exception:
            # Do not leak the descriptor when bind/connect fails.
            bgp_socket.close()
            raise
    logger.info("Connected to ODL.")
    return bgp_socket
150
151
def get_short_int_from_message(message, offset=16):
    """Read a big-endian 2-byte unsigned number out of a message.

    Arguments:
        :message: given message
        :offset: position of the most significant byte inside the message
    Returns:
        :return: the decoded value (high byte * 256 + low byte).
    Notes:
        The default offset (16) is where the BGP message length field sits.
    """
    most_significant = ord(message[offset])
    least_significant = ord(message[offset + 1])
    return (most_significant << 8) | least_significant
167
168
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Arguments:
        :prefixes_hex: raw (binary) string of <length, prefix> tuples
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Raises:
        :struct.error: when the data is truncated or a prefix length
            exceeds 32 bits
    Notes:
        Each prefix is encoded using only the minimum number of octets
        needed to hold its bits, so shorter prefixes are zero-padded to
        four octets before being rendered in dotted form.
    """
    prefix_list = []
    offset = 0
    while offset < len(prefixes_hex):
        prefix_bit_len = struct.unpack_from("B", prefixes_hex, offset)[0]
        # number of octets carrying the prefix bits: ceil(bits / 8)
        prefix_len = (prefix_bit_len + 7) // 8
        prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
        # Pad by the number of omitted octets only; truncated input or
        # a prefix longer than 4 octets still fails in unpack below.
        padded_hex = prefix_hex + b"\x00" * (4 - prefix_len)
        prefix = ".".join(str(i) for i in struct.unpack("BBBB", padded_hex))
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
    return prefix_list
188
189
class MessageError(ValueError):
    """ValueError carrying the raw message that caused it.

    The message is kept in binary form and only hexlified when the
    error is rendered as a string.
    """

    def __init__(self, text, message, *args):
        """Initialisation.

        Arguments:
            :text: textual comment describing the problem
            :message: raw message which caused the error
            :args: any further values passed on to ValueError
        """
        super(MessageError, self).__init__(text, message, *args)
        self.text = text
        self.msg = message

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: the textual comment followed by the hexlified message
        Notes:
            A placeholder is shown instead of an empty message.
        """
        hexlified = binascii.hexlify(self.msg)
        shown = "(empty message)" if hexlified == "" else hexlified
        return self.text + ": " + shown
215
216
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Raises:
        :MessageError: when the received data is shorter than 37 bytes
            or its size differs from the length stated in its header
    Notes:
        Performs just basic incoming message checks
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # Some validation.
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        error_text = "Got something else than open with 4-byte AS number"
        logger.error(error_text + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_text, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # reported_length is a number, it has to be converted explicitly
        # before being glued into the error text.
        error_text = ("Message length is not " + str(reported_length) +
                      " as stated in ")
        logger.error(error_text + binascii.hexlify(msg_in))
        raise MessageError(error_text, msg_in)
    logger.info("Open message received.")
    return msg_in
245
246
247 class MessageGenerator(object):
248     """Class which generates messages, holds states and configuration values."""
249
250     # TODO: Define bgp marker as a class (constant) variable.
251     def __init__(self, args):
252         """Initialisation according to command-line args.
253
254         Arguments:
255             :args: argsparser's Namespace object which contains command-line
256                 options for MesageGenerator initialisation
257         Notes:
258             Calculates and stores default values used later on for
259             message geeration.
260         """
261         self.total_prefix_amount = args.amount
262         # Number of update messages left to be sent.
263         self.remaining_prefixes = self.total_prefix_amount
264
265         # New parameters initialisation
266         self.iteration = 0
267         self.prefix_base_default = args.firstprefix
268         self.prefix_length_default = args.prefixlen
269         self.wr_prefixes_default = []
270         self.nlri_prefixes_default = []
271         self.version_default = 4
272         self.my_autonomous_system_default = args.asnumber
273         self.hold_time_default = args.holdtime  # Local hold time.
274         self.bgp_identifier_default = int(args.myip)
275         self.next_hop_default = args.nexthop
276         self.single_update_default = args.updates == "single"
277         self.randomize_updates_default = args.updates == "random"
278         self.prefix_count_to_add_default = args.insert
279         self.prefix_count_to_del_default = args.withdraw
280         if self.prefix_count_to_del_default < 0:
281             self.prefix_count_to_del_default = 0
282         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
283             # total number of prefixes must grow to avoid infinite test loop
284             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
285         self.slot_size_default = self.prefix_count_to_add_default
286         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
287         self.results_file_name_default = args.results
288         self.performance_threshold_default = args.threshold
289         self.rfc4760 = args.rfc4760 == "yes"
290         # Default values used for randomized part
291         s1_slots = ((self.total_prefix_amount -
292                      self.remaining_prefixes_threshold - 1) /
293                     self.prefix_count_to_add_default + 1)
294         s2_slots = ((self.remaining_prefixes_threshold - 1) /
295                     (self.prefix_count_to_add_default -
296                     self.prefix_count_to_del_default) + 1)
297         # S1_First_Index = 0
298         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
299         s2_first_index = s1_slots * self.prefix_count_to_add_default
300         s2_last_index = (s2_first_index +
301                          s2_slots * (self.prefix_count_to_add_default -
302                                      self.prefix_count_to_del_default) - 1)
303         self.slot_gap_default = ((self.total_prefix_amount -
304                                   self.remaining_prefixes_threshold - 1) /
305                                  self.prefix_count_to_add_default + 1)
306         self.randomize_lowest_default = s2_first_index
307         self.randomize_highest_default = s2_last_index
308
309         # Initialising counters
310         self.phase1_start_time = 0
311         self.phase1_stop_time = 0
312         self.phase2_start_time = 0
313         self.phase2_stop_time = 0
314         self.phase1_updates_sent = 0
315         self.phase2_updates_sent = 0
316         self.updates_sent = 0
317
318         self.log_info = args.loglevel <= logging.INFO
319         self.log_debug = args.loglevel <= logging.DEBUG
320         """
321         Flags needed for the MessageGenerator performance optimization.
322         Calling logger methods each iteration even with proper log level set
323         slows down significantly the MessageGenerator performance.
324         Measured total generation time (1M updates, dry run, error log level):
325         - logging based on basic logger features: 36,2s
326         - logging based on advanced logger features (lazy logging): 21,2s
327         - conditional calling of logger methods enclosed inside condition: 8,6s
328         """
329
330         logger.info("Generator initialisation")
331         logger.info("  Target total number of prefixes to be introduced: " +
332                     str(self.total_prefix_amount))
333         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
334                     str(self.prefix_length_default))
335         logger.info("  My Autonomous System number: " +
336                     str(self.my_autonomous_system_default))
337         logger.info("  My Hold Time: " + str(self.hold_time_default))
338         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
339         logger.info("  Next Hop: " + str(self.next_hop_default))
340         logger.info("  Prefix count to be inserted at once: " +
341                     str(self.prefix_count_to_add_default))
342         logger.info("  Prefix count to be withdrawn at once: " +
343                     str(self.prefix_count_to_del_default))
344         logger.info("  Fast pre-fill up to " +
345                     str(self.total_prefix_amount -
346                         self.remaining_prefixes_threshold) + " prefixes")
347         logger.info("  Remaining number of prefixes to be processed " +
348                     "in parallel with withdrawals: " +
349                     str(self.remaining_prefixes_threshold))
350         logger.debug("  Prefix index range used after pre-fill procedure [" +
351                      str(self.randomize_lowest_default) + ", " +
352                      str(self.randomize_highest_default) + "]")
353         if self.single_update_default:
354             logger.info("  Common single UPDATE will be generated " +
355                         "for both NLRI & WITHDRAWN lists")
356         else:
357             logger.info("  Two separate UPDATEs will be generated " +
358                         "for each NLRI & WITHDRAWN lists")
359         if self.randomize_updates_default:
360             logger.info("  Generation of UPDATE messages will be randomized")
361         logger.info("  Let\'s go ...\n")
362
363         # TODO: Notification for hold timer expiration can be handy.
364
365     def store_results(self, file_name=None, threshold=None):
366         """ Stores specified results into files based on file_name value.
367
368         Arguments:
369             :param file_name: Trailing (common) part of result file names
370             :param threshold: Minimum number of sent updates needed for each
371                               result to be included into result csv file
372                               (mainly needed because of the result accuracy)
373         Returns:
374             :return: n/a
375         """
376         # default values handling
377         if file_name is None:
378             file_name = self.results_file_name_default
379         if threshold is None:
380             threshold = self.performance_threshold_default
381         # performance calculation
382         if self.phase1_updates_sent >= threshold:
383             totals1 = self.phase1_updates_sent
384             performance1 = int(self.phase1_updates_sent /
385                                (self.phase1_stop_time - self.phase1_start_time))
386         else:
387             totals1 = None
388             performance1 = None
389         if self.phase2_updates_sent >= threshold:
390             totals2 = self.phase2_updates_sent
391             performance2 = int(self.phase2_updates_sent /
392                                (self.phase2_stop_time - self.phase2_start_time))
393         else:
394             totals2 = None
395             performance2 = None
396
397         logger.info("#" * 10 + " Final results " + "#" * 10)
398         logger.info("Number of iterations: " + str(self.iteration))
399         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
400                     str(self.phase1_updates_sent))
401         logger.info("The pre-fill phase duration: " +
402                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
403         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
404                     str(self.phase2_updates_sent))
405         logger.info("The 2nd test phase duration: " +
406                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
407         logger.info("Threshold for performance reporting: " + str(threshold))
408
409         # making labels
410         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
411                         " route(s) per UPDATE")
412         if self.single_update_default:
413             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
414                                   "/-" + str(self.prefix_count_to_del_default) +
415                                   " routes per UPDATE")
416         else:
417             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
418                                   "/-" + str(self.prefix_count_to_del_default) +
419                                   " routes in two UPDATEs")
420         # collecting capacity and performance results
421         totals = {}
422         performance = {}
423         if totals1 is not None:
424             totals[phase1_label] = totals1
425             performance[phase1_label] = performance1
426         if totals2 is not None:
427             totals[phase2_label] = totals2
428             performance[phase2_label] = performance2
429         self.write_results_to_file(totals, "totals-" + file_name)
430         self.write_results_to_file(performance, "performance-" + file_name)
431
432     def write_results_to_file(self, results, file_name):
433         """Writes results to the csv plot file consumable by Jenkins.
434
435         Arguments:
436             :param file_name: Name of the (csv) file to be created
437         Returns:
438             :return: none
439         """
440         first_line = ""
441         second_line = ""
442         f = open(file_name, "wt")
443         try:
444             for key in sorted(results):
445                 first_line += key + ", "
446                 second_line += str(results[key]) + ", "
447             first_line = first_line[:-2]
448             second_line = second_line[:-2]
449             f.write(first_line + "\n")
450             f.write(second_line + "\n")
451             logger.info("Message generator performance results stored in " +
452                         file_name + ":")
453             logger.info("  " + first_line)
454             logger.info("  " + second_line)
455         finally:
456             f.close()
457
458     # Return pseudo-randomized (reproducible) index for selected range
459     def randomize_index(self, index, lowest=None, highest=None):
460         """Calculates pseudo-randomized index from selected range.
461
462         Arguments:
463             :param index: input index
464             :param lowest: the lowes index from the randomized area
465             :param highest: the highest index from the randomized area
466         Returns:
467             :return: the (pseudo)randomized index
468         Notes:
469             Created just as a fame for future generator enhancement.
470         """
471         # default values handling
472         if lowest is None:
473             lowest = self.randomize_lowest_default
474         if highest is None:
475             highest = self.randomize_highest_default
476         # randomize
477         if (index >= lowest) and (index <= highest):
478             # we are in the randomized range -> shuffle it inside
479             # the range (now just reverse the order)
480             new_index = highest - (index - lowest)
481         else:
482             # we are out of the randomized range -> nothing to do
483             new_index = index
484         return new_index
485
486     # Get list of prefixes
487     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
488                         prefix_len=None, prefix_count=None, randomize=None):
489         """Generates list of IP address prefixes.
490
491         Arguments:
492             :param slot_index: index of group of prefix addresses
493             :param slot_size: size of group of prefix addresses
494                 in [number of included prefixes]
495             :param prefix_base: IP address of the first prefix
496                 (slot_index = 0, prefix_index = 0)
497             :param prefix_len: length of the prefix in bites
498                 (the same as size of netmask)
499             :param prefix_count: number of prefixes to be returned
500                 from the specified slot
501         Returns:
502             :return: list of generated IP address prefixes
503         """
504         # default values handling
505         if slot_size is None:
506             slot_size = self.slot_size_default
507         if prefix_base is None:
508             prefix_base = self.prefix_base_default
509         if prefix_len is None:
510             prefix_len = self.prefix_length_default
511         if prefix_count is None:
512             prefix_count = slot_size
513         if randomize is None:
514             randomize = self.randomize_updates_default
515         # generating list of prefixes
516         indexes = []
517         prefixes = []
518         prefix_gap = 2 ** (32 - prefix_len)
519         for i in range(prefix_count):
520             prefix_index = slot_index * slot_size + i
521             if randomize:
522                 prefix_index = self.randomize_index(prefix_index)
523             indexes.append(prefix_index)
524             prefixes.append(prefix_base + prefix_index * prefix_gap)
525         if self.log_debug:
526             logger.debug("  Prefix slot index: " + str(slot_index))
527             logger.debug("  Prefix slot size: " + str(slot_size))
528             logger.debug("  Prefix count: " + str(prefix_count))
529             logger.debug("  Prefix indexes: " + str(indexes))
530             logger.debug("  Prefix list: " + str(prefixes))
531         return prefixes
532
    def compose_update_message(self, prefix_count_to_add=None,
                               prefix_count_to_del=None):
        """Composes the UPDATE message(s) for the current iteration.

        Arguments:
            :param prefix_count_to_add: # of prefixes to put into NLRI list
                (defaults to self.prefix_count_to_add_default)
            :param prefix_count_to_del: # of prefixes to put into WITHDRAWN
                list (defaults to self.prefix_count_to_del_default)
        Returns:
            :return: the value(s) returned by self.update_message(); in the
                separate-updates mode the NLRI and WITHDRAWN messages are
                joined with "+="
        Notes:
            Optionally generates separate UPDATEs for NLRI and WITHDRAWN
            lists or common message which includes both prefix lists.
            Updates the instance counters: iteration, remaining_prefixes
            and the start/stop times and update counts of the phase
            the iteration belongs to.
        """
        # default values handling
        if prefix_count_to_add is None:
            prefix_count_to_add = self.prefix_count_to_add_default
        if prefix_count_to_del is None:
            prefix_count_to_del = self.prefix_count_to_del_default
        # logging (info level: a progress line once per 1000 iterations only)
        if self.log_info and not (self.iteration % 1000):
            logger.info("Iteration: " + str(self.iteration) +
                        " - total remaining prefixes: " +
                        str(self.remaining_prefixes))
        if self.log_debug:
            logger.debug("#" * 10 + " Iteration: " +
                         str(self.iteration) + " " + "#" * 10)
            logger.debug("Remaining prefixes: " +
                         str(self.remaining_prefixes))
        # scenario type & one-shot counter
        # While more prefixes remain than the threshold, nothing is
        # withdrawn (phase 1); afterwards withdrawals take part (phase 2).
        straightforward_scenario = (self.remaining_prefixes >
                                    self.remaining_prefixes_threshold)
        if straightforward_scenario:
            prefix_count_to_del = 0
            if self.log_debug:
                logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
            # start time is recorded only while it still holds a falsy value
            if not self.phase1_start_time:
                self.phase1_start_time = time.time()
        else:
            if self.log_debug:
                logger.debug("--- COMBINED SCENARIO ---")
            # start time is recorded only while it still holds a falsy value
            if not self.phase2_start_time:
                self.phase2_start_time = time.time()
        # tailor the number of prefixes if needed
        # (the net growth add - del is capped by the remaining prefixes)
        prefix_count_to_add = (prefix_count_to_del +
                               min(prefix_count_to_add - prefix_count_to_del,
                                   self.remaining_prefixes))
        # prefix slots selection for insertion and withdrawal
        # (the withdrawn slot lags slot_gap_default slots behind)
        slot_index_to_add = self.iteration
        slot_index_to_del = slot_index_to_add - self.slot_gap_default
        # getting lists of prefixes for insertion in this iteration
        if self.log_debug:
            logger.debug("Prefixes to be inserted in this iteration:")
        prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
                                                  prefix_count=prefix_count_to_add)
        # getting lists of prefixes for withdrawal in this iteration
        if self.log_debug:
            logger.debug("Prefixes to be withdrawn in this iteration:")
        prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
                                                  prefix_count=prefix_count_to_del)
        # generating the message
        # NOTE(review): update_message() is defined outside of this view;
        # presumably it encodes one UPDATE and maintains self.updates_sent,
        # which this method only reads - verify.
        if self.single_update_default:
            # Send prefixes to be introduced and withdrawn
            # in one UPDATE message
            msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
                                          nlri_prefixes=prefix_list_to_add)
        else:
            # Send prefixes to be introduced and withdrawn
            # in separate UPDATE messages (if needed)
            msg_out = self.update_message(wr_prefixes=[],
                                          nlri_prefixes=prefix_list_to_add)
            if prefix_count_to_del:
                msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
                                               nlri_prefixes=[])
        # updating counters - who knows ... maybe I am last time here ;)
        # (stop time and count of the current phase are overwritten each
        # iteration, so they hold the values of its latest iteration)
        if straightforward_scenario:
            self.phase1_stop_time = time.time()
            self.phase1_updates_sent = self.updates_sent
        else:
            self.phase2_stop_time = time.time()
            self.phase2_updates_sent = (self.updates_sent -
                                        self.phase1_updates_sent)
        # updating totals for the next iteration
        self.iteration += 1
        self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
        # returning the encoded message
        return msg_out
620
621     # Section of message encoders
622
623     def open_message(self, version=None, my_autonomous_system=None,
624                      hold_time=None, bgp_identifier=None):
625         """Generates an OPEN Message (rfc4271#section-4.2)
626
627         Arguments:
628             :param version: see the rfc4271#section-4.2
629             :param my_autonomous_system: see the rfc4271#section-4.2
630             :param hold_time: see the rfc4271#section-4.2
631             :param bgp_identifier: see the rfc4271#section-4.2
632         Returns:
633             :return: encoded OPEN message in HEX
634         """
635
636         # Default values handling
637         if version is None:
638             version = self.version_default
639         if my_autonomous_system is None:
640             my_autonomous_system = self.my_autonomous_system_default
641         if hold_time is None:
642             hold_time = self.hold_time_default
643         if bgp_identifier is None:
644             bgp_identifier = self.bgp_identifier_default
645
646         # Marker
647         marker_hex = "\xFF" * 16
648
649         # Type
650         type = 1
651         type_hex = struct.pack("B", type)
652
653         # version
654         version_hex = struct.pack("B", version)
655
656         # my_autonomous_system
657         # AS_TRANS value, 23456 decadic.
658         my_autonomous_system_2_bytes = 23456
659         # AS number is mappable to 2 bytes
660         if my_autonomous_system < 65536:
661             my_autonomous_system_2_bytes = my_autonomous_system
662         my_autonomous_system_hex_2_bytes = struct.pack(">H",
663                                                        my_autonomous_system)
664
665         # Hold Time
666         hold_time_hex = struct.pack(">H", hold_time)
667
668         # BGP Identifier
669         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
670
671         # Optional Parameters
672         optional_parameters_hex = ""
673         if self.rfc4760:
674             optional_parameter_hex = (
675                 "\x02"  # Param type ("Capability Ad")
676                 "\x06"  # Length (6 bytes)
677                 "\x01"  # Capability type (NLRI Unicast),
678                         # see RFC 4760, secton 8
679                 "\x04"  # Capability value length
680                 "\x00\x01"  # AFI (Ipv4)
681                 "\x00"  # (reserved)
682                 "\x01"  # SAFI (Unicast)
683             )
684             optional_parameters_hex += optional_parameter_hex
685
686         optional_parameter_hex = (
687             "\x02"  # Param type ("Capability Ad")
688             "\x06"  # Length (6 bytes)
689             "\x41"  # "32 bit AS Numbers Support"
690                     # (see RFC 6793, section 3)
691             "\x04"  # Capability value length
692                     # My AS in 32 bit format
693             + struct.pack(">I", my_autonomous_system)
694         )
695         optional_parameters_hex += optional_parameter_hex
696
697         # Optional Parameters Length
698         optional_parameters_length = len(optional_parameters_hex)
699         optional_parameters_length_hex = struct.pack("B",
700                                                      optional_parameters_length)
701
702         # Length (big-endian)
703         length = (
704             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
705             len(my_autonomous_system_hex_2_bytes) +
706             len(hold_time_hex) + len(bgp_identifier_hex) +
707             len(optional_parameters_length_hex) +
708             len(optional_parameters_hex)
709         )
710         length_hex = struct.pack(">H", length)
711
712         # OPEN Message
713         message_hex = (
714             marker_hex +
715             length_hex +
716             type_hex +
717             version_hex +
718             my_autonomous_system_hex_2_bytes +
719             hold_time_hex +
720             bgp_identifier_hex +
721             optional_parameters_length_hex +
722             optional_parameters_hex
723         )
724
725         if self.log_debug:
726             logger.debug("OPEN message encoding")
727             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
728             logger.debug("  Length=" + str(length) + " (0x" +
729                          binascii.hexlify(length_hex) + ")")
730             logger.debug("  Type=" + str(type) + " (0x" +
731                          binascii.hexlify(type_hex) + ")")
732             logger.debug("  Version=" + str(version) + " (0x" +
733                          binascii.hexlify(version_hex) + ")")
734             logger.debug("  My Autonomous System=" +
735                          str(my_autonomous_system_2_bytes) + " (0x" +
736                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
737                          ")")
738             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
739                          binascii.hexlify(hold_time_hex) + ")")
740             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
741                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
742             logger.debug("  Optional Parameters Length=" +
743                          str(optional_parameters_length) + " (0x" +
744                          binascii.hexlify(optional_parameters_length_hex) +
745                          ")")
746             logger.debug("  Optional Parameters=0x" +
747                          binascii.hexlify(optional_parameters_hex))
748             logger.debug("OPEN message encoded: 0x%s",
749                          binascii.b2a_hex(message_hex))
750
751         return message_hex
752
753     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
754                        wr_prefix_length=None, nlri_prefix_length=None,
755                        my_autonomous_system=None, next_hop=None):
756         """Generates an UPDATE Message (rfc4271#section-4.3)
757
758         Arguments:
759             :param wr_prefixes: see the rfc4271#section-4.3
760             :param nlri_prefixes: see the rfc4271#section-4.3
761             :param wr_prefix_length: see the rfc4271#section-4.3
762             :param nlri_prefix_length: see the rfc4271#section-4.3
763             :param my_autonomous_system: see the rfc4271#section-4.3
764             :param next_hop: see the rfc4271#section-4.3
765         Returns:
766             :return: encoded UPDATE message in HEX
767         """
768
769         # Default values handling
770         if wr_prefixes is None:
771             wr_prefixes = self.wr_prefixes_default
772         if nlri_prefixes is None:
773             nlri_prefixes = self.nlri_prefixes_default
774         if wr_prefix_length is None:
775             wr_prefix_length = self.prefix_length_default
776         if nlri_prefix_length is None:
777             nlri_prefix_length = self.prefix_length_default
778         if my_autonomous_system is None:
779             my_autonomous_system = self.my_autonomous_system_default
780         if next_hop is None:
781             next_hop = self.next_hop_default
782
783         # Marker
784         marker_hex = "\xFF" * 16
785
786         # Type
787         type = 2
788         type_hex = struct.pack("B", type)
789
790         # Withdrawn Routes
791         bytes = ((wr_prefix_length - 1) / 8) + 1
792         withdrawn_routes_hex = ""
793         for prefix in wr_prefixes:
794             withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
795                                    struct.pack(">I", int(prefix))[:bytes])
796             withdrawn_routes_hex += withdrawn_route_hex
797
798         # Withdrawn Routes Length
799         withdrawn_routes_length = len(withdrawn_routes_hex)
800         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
801
802         # TODO: to replace hardcoded string by encoding?
803         # Path Attributes
804         if nlri_prefixes != []:
805             path_attributes_hex = (
806                 "\x40"  # Flags ("Well-Known")
807                 "\x01"  # Type (ORIGIN)
808                 "\x01"  # Length (1)
809                 "\x00"  # Origin: IGP
810                 "\x40"  # Flags ("Well-Known")
811                 "\x02"  # Type (AS_PATH)
812                 "\x06"  # Length (6)
813                 "\x02"  # AS segment type (AS_SEQUENCE)
814                 "\x01"  # AS segment length (1)
815                         # AS segment (4 bytes)
816                 + struct.pack(">I", my_autonomous_system) +
817                 "\x40"  # Flags ("Well-Known")
818                 "\x03"  # Type (NEXT_HOP)
819                 "\x04"  # Length (4)
820                         # IP address of the next hop (4 bytes)
821                 + struct.pack(">I", int(next_hop))
822             )
823         else:
824             path_attributes_hex = ""
825
826         # Total Path Attributes Length
827         total_path_attributes_length = len(path_attributes_hex)
828         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
829
830         # Network Layer Reachability Information
831         bytes = ((nlri_prefix_length - 1) / 8) + 1
832         nlri_hex = ""
833         for prefix in nlri_prefixes:
834             nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
835                                struct.pack(">I", int(prefix))[:bytes])
836             nlri_hex += nlri_prefix_hex
837
838         # Length (big-endian)
839         length = (
840             len(marker_hex) + 2 + len(type_hex) +
841             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
842             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
843             len(nlri_hex))
844         length_hex = struct.pack(">H", length)
845
846         # UPDATE Message
847         message_hex = (
848             marker_hex +
849             length_hex +
850             type_hex +
851             withdrawn_routes_length_hex +
852             withdrawn_routes_hex +
853             total_path_attributes_length_hex +
854             path_attributes_hex +
855             nlri_hex
856         )
857
858         if self.log_debug:
859             logger.debug("UPDATE message encoding")
860             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
861             logger.debug("  Length=" + str(length) + " (0x" +
862                          binascii.hexlify(length_hex) + ")")
863             logger.debug("  Type=" + str(type) + " (0x" +
864                          binascii.hexlify(type_hex) + ")")
865             logger.debug("  withdrawn_routes_length=" +
866                          str(withdrawn_routes_length) + " (0x" +
867                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
868             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
869                          str(wr_prefix_length) + " (0x" +
870                          binascii.hexlify(withdrawn_routes_hex) + ")")
871             logger.debug("  Total Path Attributes Length=" +
872                          str(total_path_attributes_length) + " (0x" +
873                          binascii.hexlify(total_path_attributes_length_hex) +
874                          ")")
875             logger.debug("  Path Attributes=" + "(0x" +
876                          binascii.hexlify(path_attributes_hex) + ")")
877             logger.debug("  Network Layer Reachability Information=" +
878                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
879                          " (0x" + binascii.hexlify(nlri_hex) + ")")
880             logger.debug("UPDATE message encoded: 0x" +
881                          binascii.b2a_hex(message_hex))
882
883         # updating counter
884         self.updates_sent += 1
885         # returning encoded message
886         return message_hex
887
888     def notification_message(self, error_code, error_subcode, data_hex=""):
889         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
890
891         Arguments:
892             :param error_code: see the rfc4271#section-4.5
893             :param error_subcode: see the rfc4271#section-4.5
894             :param data_hex: see the rfc4271#section-4.5
895         Returns:
896             :return: encoded NOTIFICATION message in HEX
897         """
898
899         # Marker
900         marker_hex = "\xFF" * 16
901
902         # Type
903         type = 3
904         type_hex = struct.pack("B", type)
905
906         # Error Code
907         error_code_hex = struct.pack("B", error_code)
908
909         # Error Subode
910         error_subcode_hex = struct.pack("B", error_subcode)
911
912         # Length (big-endian)
913         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
914                   len(error_subcode_hex) + len(data_hex))
915         length_hex = struct.pack(">H", length)
916
917         # NOTIFICATION Message
918         message_hex = (
919             marker_hex +
920             length_hex +
921             type_hex +
922             error_code_hex +
923             error_subcode_hex +
924             data_hex
925         )
926
927         if self.log_debug:
928             logger.debug("NOTIFICATION message encoding")
929             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
930             logger.debug("  Length=" + str(length) + " (0x" +
931                          binascii.hexlify(length_hex) + ")")
932             logger.debug("  Type=" + str(type) + " (0x" +
933                          binascii.hexlify(type_hex) + ")")
934             logger.debug("  Error Code=" + str(error_code) + " (0x" +
935                          binascii.hexlify(error_code_hex) + ")")
936             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
937                          binascii.hexlify(error_subcode_hex) + ")")
938             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
939             logger.debug("NOTIFICATION message encoded: 0x%s",
940                          binascii.b2a_hex(message_hex))
941
942         return message_hex
943
944     def keepalive_message(self):
945         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
946
947         Returns:
948             :return: encoded KEEP ALIVE message in HEX
949         """
950
951         # Marker
952         marker_hex = "\xFF" * 16
953
954         # Type
955         type = 4
956         type_hex = struct.pack("B", type)
957
958         # Length (big-endian)
959         length = len(marker_hex) + 2 + len(type_hex)
960         length_hex = struct.pack(">H", length)
961
962         # KEEP ALIVE Message
963         message_hex = (
964             marker_hex +
965             length_hex +
966             type_hex
967         )
968
969         if self.log_debug:
970             logger.debug("KEEP ALIVE message encoding")
971             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
972             logger.debug("  Length=" + str(length) + " (0x" +
973                          binascii.hexlify(length_hex) + ")")
974             logger.debug("  Type=" + str(type) + " (0x" +
975                          binascii.hexlify(type_hex) + ")")
976             logger.debug("KEEP ALIVE message encoded: 0x%s",
977                          binascii.b2a_hex(message_hex))
978
979         return message_hex
980
981
class TimeTracker(object):
    """Keeper of the session timers: the schedule of my own keepalives
    and the deadline given by the hold time of the peer.
    """

    def __init__(self, msg_in):
        """Negotiate the hold time and start the timers.

        Arguments:
            msg_in: the OPEN message received from peer.
        Raises:
            ValueError: when the negotiated hold time is not zero
                but is shorter than 3 seconds.
        """
        # Naming note: a relative time always carries "timedelta" in its
        # name, to stress that a plain "time" is an absolute one.
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        my_hold_timedelta = 180
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        # The negotiated value is the smaller one of mine and the peer's.
        if peer_hold_timedelta < my_hold_timedelta:
            negotiated_timedelta = peer_hold_timedelta
        else:
            negotiated_timedelta = my_hold_timedelta
        # Zero switches the timers off, otherwise 3 seconds is the minimum.
        if negotiated_timedelta < 3 and negotiated_timedelta != 0:
            logger.error("Invalid hold timedelta value: " +
                         str(negotiated_timedelta))
            raise ValueError("Invalid hold timedelta value: ",
                             negotiated_timedelta)
        # If we do not hear from peer this long, we assume it has died.
        self.hold_timedelta = negotiated_timedelta
        # Upper limit for duration between my messages, to avoid being
        # declared to be dead.
        self.keepalive_timedelta = int(negotiated_timedelta / 3.0)
        # Stored time, to be used where calling time.time() again would be
        # too strict. The same as calling snapshot(), but it also
        # declares the field.
        self.snapshot_time = time.time()
        # At this time point, peer will be declared dead.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this time point, we should be sending keepalive message.
        self.my_keepalive_time = None  # to be set later

    def snapshot(self):
        """Remember the current time in instance data for later use."""
        # Typically the time just before something interesting is called.
        now = time.time()
        self.snapshot_time = now

    def reset_peer_hold_time(self):
        """Postpone the hold deadline, the peer has just shown it lives."""
        now = time.time()
        self.peer_hold_time = now + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Schedule the time my next KEEP ALIVE is due at

        Arguments:
            :keepalive_time: the time the keepalive interval starts at
        """
        next_keepalive = keepalive_time + self.keepalive_timedelta
        self.my_keepalive_time = next_keepalive

    def is_time_for_my_keepalive(self):
        """Tell whether my KEEP ALIVE was already due at the snapshot time"""
        # Hold time = 0 means no keepalives at all.
        keepalives_enabled = self.hold_timedelta != 0
        return (keepalives_enabled and
                self.snapshot_time >= self.my_keepalive_time)

    def get_next_event_time(self):
        """Return the time of the nearest timer event

        That is the earlier one of my keepalive time and the peer hold
        time, or one day after the snapshot time when the timers are off.
        """
        if self.hold_timedelta != 0:
            return min(self.my_keepalive_time, self.peer_hold_time)
        return self.snapshot_time + 86400

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time."""
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta == 0:
            return
        # The time is given by the caller, as time.time() may be too strict.
        if snapshot_time <= self.peer_hold_time:
            return
        logger.error("Peer has overstepped the hold timer.")
        raise RuntimeError("Peer has overstepped the hold timer.")
        # TODO: Include hold_timedelta?
        # TODO: Add notification sending (attempt). That means
        # move to write tracker.
1065
1066
1067 class ReadTracker(object):
1068     """Class for tracking read of mesages chunk by chunk and
1069     for idle waiting.
1070     """
1071
1072     def __init__(self, bgp_socket, timer):
1073         """The reader initialisation.
1074
1075         Arguments:
1076             bgp_socket: socket to be used for sending
1077             timer: timer to be used for scheduling
1078         """
1079         # References to outside objects.
1080         self.socket = bgp_socket
1081         self.timer = timer
1082         # BGP marker length plus length field length.
1083         self.header_length = 18
1084         # TODO: make it class (constant) attribute
1085         # Computation of where next chunk ends depends on whether
1086         # we are beyond length field.
1087         self.reading_header = True
1088         # Countdown towards next size computation.
1089         self.bytes_to_read = self.header_length
1090         # Incremental buffer for message under read.
1091         self.msg_in = ""
1092         # Initialising counters
1093         self.updates_received = 0
1094         self.prefixes_introduced = 0
1095         self.prefixes_withdrawn = 0
1096         self.rx_idle_time = 0
1097         self.rx_activity_detected = True
1098
1099     def read_message_chunk(self):
1100         """Read up to one message
1101
1102         Note:
1103             Currently it does not return anything.
1104         """
1105         # TODO: We could return the whole message, currently not needed.
1106         # We assume the socket is readable.
1107         chunk_message = self.socket.recv(self.bytes_to_read)
1108         self.msg_in += chunk_message
1109         self.bytes_to_read -= len(chunk_message)
1110         # TODO: bytes_to_read < 0 is not possible, right?
1111         if not self.bytes_to_read:
1112             # Finished reading a logical block.
1113             if self.reading_header:
1114                 # The logical block was a BGP header.
1115                 # Now we know the size of the message.
1116                 self.reading_header = False
1117                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1118                                       self.header_length)
1119             else:  # We have finished reading the body of the message.
1120                 # Peer has just proven it is still alive.
1121                 self.timer.reset_peer_hold_time()
1122                 # TODO: Do we want to count received messages?
1123                 # This version ignores the received message.
1124                 # TODO: Should we do validation and exit on anything
1125                 # besides update or keepalive?
1126                 # Prepare state for reading another message.
1127                 message_type_hex = self.msg_in[self.header_length]
1128                 if message_type_hex == "\x01":
1129                     logger.info("OPEN message received: 0x%s",
1130                                 binascii.b2a_hex(self.msg_in))
1131                 elif message_type_hex == "\x02":
1132                     logger.debug("UPDATE message received: 0x%s",
1133                                  binascii.b2a_hex(self.msg_in))
1134                     self.decode_update_message(self.msg_in)
1135                 elif message_type_hex == "\x03":
1136                     logger.info("NOTIFICATION message received: 0x%s",
1137                                 binascii.b2a_hex(self.msg_in))
1138                 elif message_type_hex == "\x04":
1139                     logger.info("KEEP ALIVE message received: 0x%s",
1140                                 binascii.b2a_hex(self.msg_in))
1141                 else:
1142                     logger.warning("Unexpected message received: 0x%s",
1143                                    binascii.b2a_hex(self.msg_in))
1144                 self.msg_in = ""
1145                 self.reading_header = True
1146                 self.bytes_to_read = self.header_length
1147         # We should not act upon peer_hold_time if we are reading
1148         # something right now.
1149         return
1150
1151     def decode_path_attributes(self, path_attributes_hex):
1152         """Decode the Path Attributes field (rfc4271#section-4.3)
1153
1154         Arguments:
1155             :path_attributes: path_attributes field to be decoded in hex
1156         Returns:
1157             :return: None
1158         """
1159         hex_to_decode = path_attributes_hex
1160
1161         while len(hex_to_decode):
1162             attr_flags_hex = hex_to_decode[0]
1163             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1164 #            attr_optional_bit = attr_flags & 128
1165 #            attr_transitive_bit = attr_flags & 64
1166 #            attr_partial_bit = attr_flags & 32
1167             attr_extended_length_bit = attr_flags & 16
1168
1169             attr_type_code_hex = hex_to_decode[1]
1170             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1171
1172             if attr_extended_length_bit:
1173                 attr_length_hex = hex_to_decode[2:4]
1174                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1175                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1176                 hex_to_decode = hex_to_decode[4 + attr_length:]
1177             else:
1178                 attr_length_hex = hex_to_decode[2]
1179                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1180                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1181                 hex_to_decode = hex_to_decode[3 + attr_length:]
1182
1183             if attr_type_code == 1:
1184                 logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)",
1185                              binascii.b2a_hex(attr_flags_hex))
1186                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1187             elif attr_type_code == 2:
1188                 logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)",
1189                              binascii.b2a_hex(attr_flags_hex))
1190                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1191             elif attr_type_code == 3:
1192                 logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)",
1193                              binascii.b2a_hex(attr_flags_hex))
1194                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1195             elif attr_type_code == 4:
1196                 logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)",
1197                              binascii.b2a_hex(attr_flags_hex))
1198                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1199             elif attr_type_code == 5:
1200                 logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)",
1201                              binascii.b2a_hex(attr_flags_hex))
1202                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1203             elif attr_type_code == 6:
1204                 logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)",
1205                              binascii.b2a_hex(attr_flags_hex))
1206                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1207             elif attr_type_code == 7:
1208                 logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)",
1209                              binascii.b2a_hex(attr_flags_hex))
1210                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1211             elif attr_type_code == 14:  # rfc4760#section-3
1212                 logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)",
1213                              binascii.b2a_hex(attr_flags_hex))
1214                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1215                 address_family_identifier_hex = attr_value_hex[0:2]
1216                 logger.debug("  Address Family Identifier = 0x%s",
1217                              binascii.b2a_hex(address_family_identifier_hex))
1218                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1219                 logger.debug("  Subsequent Address Family Identifier = 0x%s",
1220                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1221                 next_hop_netaddr_len_hex = attr_value_hex[3]
1222                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1223                 logger.debug("  Length of Next Hop Network Address = 0x%s (%s)",
1224                              binascii.b2a_hex(next_hop_netaddr_len_hex),
1225                              next_hop_netaddr_len)
1226                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1227                 logger.debug("  Network Address of Next Hop = 0x%s",
1228                              binascii.b2a_hex(next_hop_netaddr_hex))
1229                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1230                 logger.debug("  Reserved = 0x%s",
1231                              binascii.b2a_hex(reserved_hex))
1232                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1233                 logger.debug("  Network Layer Reachability Information = 0x%s",
1234                              binascii.b2a_hex(nlri_hex))
1235                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1236                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1237                 for prefix in nlri_prefix_list:
1238                     logger.debug("  nlri_prefix_received: %s", prefix)
1239                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1240             elif attr_type_code == 15:  # rfc4760#section-4
1241                 logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)",
1242                              binascii.b2a_hex(attr_flags_hex))
1243                 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1244                 address_family_identifier_hex = attr_value_hex[0:2]
1245                 logger.debug("  Address Family Identifier = 0x%s",
1246                              binascii.b2a_hex(address_family_identifier_hex))
1247                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1248                 logger.debug("  Subsequent Address Family Identifier = 0x%s",
1249                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1250                 wd_hex = attr_value_hex[3:]
1251                 logger.debug("  Withdrawn Routes = 0x%s",
1252                              binascii.b2a_hex(wd_hex))
1253                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1254                 logger.debug("  Withdrawn routes prefix list: %s",
1255                              wdr_prefix_list)
1256                 for prefix in wdr_prefix_list:
1257                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1258                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1259             else:
1260                 logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code,
1261                              binascii.b2a_hex(attr_flags_hex))
1262                 logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1263         return None
1264
1265     def decode_update_message(self, msg):
1266         """Decode an UPDATE message (rfc4271#section-4.3)
1267
1268         Arguments:
1269             :msg: message to be decoded in hex
1270         Returns:
1271             :return: None
1272         """
1273         logger.debug("Decoding update message:")
1274         # message header - marker
1275         marker_hex = msg[:16]
1276         logger.debug("Message header marker: 0x%s",
1277                      binascii.b2a_hex(marker_hex))
1278         # message header - message length
1279         msg_length_hex = msg[16:18]
1280         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1281         logger.debug("Message lenght: 0x%s (%s)",
1282                      binascii.b2a_hex(msg_length_hex), msg_length)
1283         # message header - message type
1284         msg_type_hex = msg[18:19]
1285         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1286         if msg_type == 2:
1287             logger.debug("Message type: 0x%s (update)",
1288                          binascii.b2a_hex(msg_type_hex))
1289             # withdrawn routes length
1290             wdr_length_hex = msg[19:21]
1291             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1292             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1293                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1294             # withdrawn routes
1295             wdr_hex = msg[21:21 + wdr_length]
1296             logger.debug("Withdrawn routes: 0x%s",
1297                          binascii.b2a_hex(wdr_hex))
1298             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1299             logger.debug("Withdrawn routes prefix list: %s",
1300                          wdr_prefix_list)
1301             for prefix in wdr_prefix_list:
1302                 logger.debug("withdrawn_prefix_received: %s", prefix)
1303             # total path attribute length
1304             total_pa_length_offset = 21 + wdr_length
1305             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
1306             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1307             logger.debug("Total path attribute lenght: 0x%s (%s)",
1308                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1309             # path attributes
1310             pa_offset = total_pa_length_offset + 2
1311             pa_hex = msg[pa_offset:pa_offset+total_pa_length]
1312             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1313             self.decode_path_attributes(pa_hex)
1314             # network layer reachability information length
1315             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1316             logger.debug("Calculated NLRI length: %s", nlri_length)
1317             # network layer reachability information
1318             nlri_offset = pa_offset + total_pa_length
1319             nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
1320             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1321             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1322             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1323             for prefix in nlri_prefix_list:
1324                 logger.debug("nlri_prefix_received: %s", prefix)
1325             # Updating counters
1326             self.updates_received += 1
1327             self.prefixes_introduced += len(nlri_prefix_list)
1328             self.prefixes_withdrawn += len(wdr_prefix_list)
1329         else:
1330             logger.error("Unexpeced message type 0x%s in 0x%s",
1331                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1332
1333     def wait_for_read(self):
1334         """Read message until timeout (next expected event).
1335
1336         Note:
1337             Used when no more updates has to be sent to avoid busy-wait.
1338             Currently it does not return anything.
1339         """
1340         # Compute time to the first predictable state change
1341         event_time = self.timer.get_next_event_time()
1342         # snapshot_time would be imprecise
1343         wait_timedelta = min(event_time - time.time(), 10)
1344         if wait_timedelta < 0:
1345             # The program got around to waiting to an event in "very near
1346             # future" so late that it became a "past" event, thus tell
1347             # "select" to not wait at all. Passing negative timedelta to
1348             # select() would lead to either waiting forever (for -1) or
1349             # select.error("Invalid parameter") (for everything else).
1350             wait_timedelta = 0
1351         # And wait for event or something to read.
1352
1353         if not self.rx_activity_detected or not (self.updates_received % 100):
1354             # right time to write statistics to the log (not for every update and
1355             # not too frequently to avoid having large log files)
1356             logger.info("total_received_update_message_counter: %s",
1357                         self.updates_received)
1358             logger.info("total_received_nlri_prefix_counter: %s",
1359                         self.prefixes_introduced)
1360             logger.info("total_received_withdrawn_prefix_counter: %s",
1361                         self.prefixes_withdrawn)
1362
1363         start_time = time.time()
1364         select.select([self.socket], [], [self.socket], wait_timedelta)
1365         timedelta = time.time() - start_time
1366         self.rx_idle_time += timedelta
1367         self.rx_activity_detected = timedelta < 1
1368
1369         if not self.rx_activity_detected or not (self.updates_received % 100):
1370             # right time to write statistics to the log (not for every update and
1371             # not too frequently to avoid having large log files)
1372             logger.info("... idle for %.3fs", timedelta)
1373             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1374         return
1375
1376
class WriteTracker(object):
    """Outgoing buffer: collects messages and pushes them out in chunks."""

    def __init__(self, bgp_socket, generator, timer):
        """Remember collaborators and start with an empty buffer.

        Arguments:
            bgp_socket: socket the buffered data are sent to
            generator: message generator (only stored here)
            timer: timer used for snapshots and keepalive scheduling
        """
        # Collaborating objects owned by the caller.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Own state: the buffer, its size and the "something is pending" flag.
        self.sending_message = False
        self.bytes_to_send = 0
        self.msg_out = ""

    def enqueue_message_for_sending(self, message):
        """Append message to the buffer and mark sending as in progress.

        Arguments:
            message: data to be appended to the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send as much of the msg_out buffer as the socket accepts.

        The caller is expected to call this only when the buffer is not
        empty and the socket is writable.

        Returns:
            :return: True if the buffer got emptied, False otherwise
        """
        self.timer.snapshot()
        sent_count = self.socket.send(self.msg_out)
        # Keep only the part the socket has not accepted yet.
        self.msg_out = self.msg_out[sent_count:]
        self.bytes_to_send -= sent_count
        if self.bytes_to_send:
            # TODO: Is it possible to hit negative bytes_to_send?
            return False
        self.sending_message = False
        # A complete message went out, so the peer should have reset its
        # hold timer; schedule our next keepalive from the snapshot time.
        self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
        # The possible reason for not prioritizing reads is gone.
        return True
1429
1430
class StateTracker(object):
    """Keeper of the main loop state, which is too complex to live inline."""

    def __init__(self, bgp_socket, generator, timer):
        """Store collaborators and create the read and write sub-trackers.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # Collaborating objects owned by the caller.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Sub-trackers, one for each direction.
        self.reader = ReadTracker(bgp_socket, timer)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Reading normally wins over writing. To avoid being starved by
        # never-ending reads until our holdtime runs out at the peer, this
        # flag is raised when a keepalive is due; it stays up until
        # a complete message has been sent, then drops back to False.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.
        self.prioritize_writing = False

    def perform_one_loop_iteration(self):
        """Do one step of the main loop: a single read, write or wait.

        Notes:
            Decides whether writing has to be prioritized, asks select()
            what the socket allows, performs exactly one action and
            returns, so the caller can simply call this again.
        """
        self.timer.snapshot()
        if not self.prioritize_writing and self.timer.is_time_for_my_keepalive():
            if not self.writer.sending_message:
                # Nothing is being sent, so a keepalive has to go out ASAP.
                self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
                logger.info("KEEP ALIVE is sent.")
            # Either way there is a message in the buffer to be rushed out.
            self.prioritize_writing = True
        # select.select() returns three lists (readable, writable,
        # exceptional); each one is either [] or [self.socket], so they
        # are used as booleans below.
        readable, writable, exceptional = select.select(
            [self.socket], [self.socket], [self.socket],
            self.timer.report_timedelta)
        if exceptional:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        if not self.prioritize_writing or not writable:
            # No reason to rush writes, or writing is not possible:
            # reading gets the first chance.
            if readable:
                # Read one chunk and let the caller repeat the select.
                self.reader.read_message_chunk()
                return
            # Nothing arrived, good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
        if not writable:
            # We can neither read nor write.
            # FIXME: Are we sure select has been really waiting
            # the whole period?
            logger.warning("Input and output both blocked for " +
                           str(self.timer.report_timedelta) + " seconds.")
            return
        # Either our keepalive is pressing, or there was nothing to read.
        self._write_or_wait()

    def _write_or_wait(self):
        """Use a writable socket: send, prepare next update, or idle."""
        writer = self.writer
        if writer.sending_message:
            # Continue with the message in progress.
            finished = writer.send_message_chunk_is_whole()
            if finished and self.prioritize_writing:
                # The pressing message is out, reading is preferred again.
                self.prioritize_writing = False
            return
        generator = self.generator
        if not generator.remaining_prefixes:
            # Nothing to write anymore; idle waiting avoids a busy loop.
            self.reader.wait_for_read()
            return
        update = generator.compose_update_message()
        if not generator.remaining_prefixes:
            # That was the last update to generate, end-of-rib is due.
            logger.info("All update messages generated.")
            logger.info("Storing performance results.")
            generator.store_results()
            logger.info("Finally an END-OF-RIB is sent.")
            update += generator.update_message(wr_prefixes=[],
                                               nlri_prefixes=[])
        # Only enqueue here, real sending starts in the next iteration.
        writer.enqueue_message_for_sending(update)
1540
1541
def create_logger(loglevel, logfile):
    """Set up the logger named "logger" with console and file output.

    Both outputs share one format (time, level name, message). The log
    file is opened in "w" mode, so its previous content is discarded.

    Arguments:
        :loglevel: log level
        :logfile: log file name
    Returns:
        :return: logger object
    """
    log = logging.getLogger("logger")
    line_format = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
    # Both handlers are created before any of them is attached.
    outputs = (logging.StreamHandler(),
               logging.FileHandler(logfile, mode="w"))
    for handler in outputs:
        handler.setFormatter(line_format)
    for handler in outputs:
        log.addHandler(handler)
    log.setLevel(loglevel)
    return log
1561
1562
def job(arguments):
    """One time initialisation and iterations looping.

    Notes:
        Establish BGP connection, perform the initial handshake
        (peer's OPEN, our OPEN, peer's KEEPALIVE, our KEEPALIVE)
        and then run main loop iterations forever.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None (in fact the main loop never finishes normally)
    Raises:
        MessageError: when the peer does not confirm our OPEN message
            by a KEEPALIVE message
    """
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
    # Send our open message to the peer.
    bgp_socket.send(msg_out)
    # Wait for confirming keepalive.
    # Using exact keepalive length to not to see possible updates.
    msg_in = bgp_socket.recv(19)
    # TCP is a stream, so a single recv() may deliver only a part of the
    # 19 bytes. Keep reading until the message is complete or the peer
    # closes the connection (empty read); a short or different message
    # is then rejected by the comparison below.
    while 0 < len(msg_in) < 19:
        chunk = bgp_socket.recv(19 - len(msg_in))
        if not chunk:
            break
        msg_in += chunk
    if msg_in != generator.keepalive_message():
        logger.error("Open not confirmed by keepalive, instead got " +
                     binascii.hexlify(msg_in))
        raise MessageError("Open not confirmed by keepalive, instead got",
                           msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
    bgp_socket.send(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
1606
1607
def threaded_job(arguments):
    """Split the work between several threads, each running job().

    The requested amount of prefixes is divided between
    arguments.multiplicity workers (earlier workers get the rounded up
    share). Every worker gets its own copy of the arguments with its
    share as amount, its own first prefix (shifted by 16 for each prefix
    given to the previous workers) and its own myip (increased by one
    for each worker). After the threads are started, the calling thread
    sleeps forever.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None (in fact this function never returns)
    """
    remaining_amount = arguments.amount
    remaining_workers = arguments.multiplicity
    next_prefix = arguments.firstprefix
    next_myip = arguments.myip
    per_worker_args = []

    while True:
        # Share of this worker, rounded up. Integer (floor) division is
        # intended here, as both operands are integers.
        share = (remaining_amount - 1) // remaining_workers + 1
        remaining_amount -= share
        remaining_workers -= 1

        worker_args = deepcopy(arguments)
        worker_args.amount = share
        worker_args.firstprefix = next_prefix
        worker_args.myip = next_myip
        per_worker_args.append(worker_args)

        if remaining_workers == 0:
            break
        # Starting point for the next worker.
        next_prefix += share * 16
        next_myip += 1

    try:
        # One thread for each prepared set of arguments.
        for worker_args in per_worker_args:
            thread.start_new_thread(job, (worker_args,))
    except Exception:
        print("Error: unable to start thread.")
        raise SystemExit(2)

    # The workers do all the work; this thread only has to stay alive.
    while True:
        time.sleep(5)
1649
1650
if __name__ == "__main__":
    # Script entry point: parse the command line, create the module-wide
    # logger and run the job, threaded only if more than one instance
    # was requested.
    arguments = parse_arguments()
    logger = create_logger(arguments.loglevel, arguments.logfile)
    if arguments.multiplicity <= 1:
        job(arguments)
    else:
        threaded_job(arguments)