1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with a sudo-able user when you want to use ports below 1024
4 as --myport. This utility is used to avoid the excessive waiting times which EXABGP
5 exhibits when used with huge routing tables and also to avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 import argparse
15 import binascii
16 import ipaddr
17 import select
18 import socket
19 import time
20 import logging
21 import struct
22
23 import thread
24 from copy import deepcopy
25
26
27 __author__ = "Vratko Polak"
28 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
29 __license__ = "Eclipse Public License v1.0"
30 __email__ = "vrpolak@cisco.com"
31
32
33 def parse_arguments():
34     """Use argparse to get command-line arguments.
35
36     Returns:
37         :return: args object.
38     """
39     parser = argparse.ArgumentParser()
40     # TODO: Should we use --argument-names-with-spaces?
41     str_help = "Autonomous System number to use in the stream."
42     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
43     # FIXME: We are acting as iBGP peer,
44     # we should mirror AS number from peer's open message.
45     str_help = "Number of IP prefixes to generate (negative means \"infinite\")."
46     parser.add_argument("--amount", default="1", type=int, help=str_help)
47     str_help = "Maximum number of IP prefixes to be announced in one iteration"
48     parser.add_argument("--insert", default="1", type=int, help=str_help)
49     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
50     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
51     str_help = "The number of prefixes to process without withdrawals"
52     parser.add_argument("--prefill", default="0", type=int, help=str_help)
53     str_help = "Send the NLRI and WITHDRAWN lists in a single UPDATE or in two separate UPDATEs."
54     parser.add_argument("--updates", choices=["single", "separate"],
55                         default="separate", help=str_help)
56     str_help = "Base prefix IP address for prefix generation"
57     parser.add_argument("--firstprefix", default="8.0.1.0",
58                         type=ipaddr.IPv4Address, help=str_help)
59     str_help = "The prefix length."
60     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
61     str_help = "Listen for connection, instead of initiating it."
62     parser.add_argument("--listen", action="store_true", help=str_help)
63     str_help = ("Numeric IP Address to bind to and derive BGP ID from. " +
64                 "Default value only suitable for listening.")
65     parser.add_argument("--myip", default="0.0.0.0",
66                         type=ipaddr.IPv4Address, help=str_help)
67     str_help = ("TCP port to bind to when listening or initiating connection. " +
68                 "Default only suitable for initiating.")
69     parser.add_argument("--myport", default="0", type=int, help=str_help)
70     str_help = "The IP of the next hop to be placed into the update messages."
71     parser.add_argument("--nexthop", default="192.0.2.1",
72                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
73     str_help = "Identifier of the route originator."
74     parser.add_argument("--originator", default=None,
75                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
76     str_help = "Cluster list item identifier."
77     parser.add_argument("--cluster", default=None,
78                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
79     str_help = ("Numeric IP Address to try to connect to. " +
80                 "Currently no effect in listening mode.")
81     parser.add_argument("--peerip", default="127.0.0.2",
82                         type=ipaddr.IPv4Address, help=str_help)
83     str_help = "TCP port to try to connect to. No effect in listening mode."
84     parser.add_argument("--peerport", default="179", type=int, help=str_help)
85     str_help = "Local hold time."
86     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
87     str_help = "Log level (--error, --warning, --info, --debug)"
88     parser.add_argument("--error", dest="loglevel", action="store_const",
89                         const=logging.ERROR, default=logging.INFO,
90                         help=str_help)
91     parser.add_argument("--warning", dest="loglevel", action="store_const",
92                         const=logging.WARNING, default=logging.INFO,
93                         help=str_help)
94     parser.add_argument("--info", dest="loglevel", action="store_const",
95                         const=logging.INFO, default=logging.INFO,
96                         help=str_help)
97     parser.add_argument("--debug", dest="loglevel", action="store_const",
98                         const=logging.DEBUG, default=logging.INFO,
99                         help=str_help)
100     str_help = "Log file name"
101     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
102     str_help = "Trailing part of the csv result files for plotting purposes"
103     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
104     str_help = "Minimum number of sent updates required for a result to be included in the csv."
105     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
106     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
107     parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
108     str_help = "How many play utilities are to be started."
109     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
110     arguments = parser.parse_args()
111     if arguments.multiplicity < 1:
112         print "Multiplicity", arguments.multiplicity, "is not positive."
113         raise SystemExit(1)
114     # TODO: Are sanity checks (such as asnumber>=0) required?
115     return arguments
116
117
118 def establish_connection(arguments):
119     """Establish connection to BGP peer.
120
121     Arguments:
122         :arguments: the following command-line arguments are used
123             - arguments.myip: local IP address
124             - arguments.myport: local port
125             - arguments.peerip: remote IP address
126             - arguments.peerport: remote port
127     Returns:
128         :return: socket.
129     """
130     if arguments.listen:
131         logger.info("Connecting in the listening mode.")
132         logger.debug("Local IP address: " + str(arguments.myip))
133         logger.debug("Local port: " + str(arguments.myport))
134         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
135         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
136         # bind needs a single tuple as argument
137         listening_socket.bind((str(arguments.myip), arguments.myport))
138         listening_socket.listen(1)
139         bgp_socket, _ = listening_socket.accept()
140         # TODO: Verify client IP is the controller IP.
141         listening_socket.close()
142     else:
143         logger.info("Connecting in the talking mode.")
144         logger.debug("Local IP address: " + str(arguments.myip))
145         logger.debug("Local port: " + str(arguments.myport))
146         logger.debug("Remote IP address: " + str(arguments.peerip))
147         logger.debug("Remote port: " + str(arguments.peerport))
148         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
150         # bind to force specified address and port
151         talking_socket.bind((str(arguments.myip), arguments.myport))
152         # socket does not accept ipaddr objects, hence str()
153         talking_socket.connect((str(arguments.peerip), arguments.peerport))
154         bgp_socket = talking_socket
155     logger.info("Connected to ODL.")
156     return bgp_socket
157
158
159 def get_short_int_from_message(message, offset=16):
160     """Extract a 2-byte number from the provided message.
161
162     Arguments:
163         :message: given message
164         :offset: offset of the short_int inside the message
165     Returns:
166         :return: the requested short_int value.
167     Notes:
168         default offset value is the BGP message size offset.
169     """
170     high_byte_int = ord(message[offset])
171     low_byte_int = ord(message[offset + 1])
172     short_int = high_byte_int * 256 + low_byte_int
173     return short_int
174
175
176 def get_prefix_list_from_hex(prefixes_hex):
177     """Get decoded list of prefixes (rfc4271#section-4.3)
178
179     Arguments:
180         :prefixes_hex: list of prefixes to be decoded in hex
181     Returns:
182         :return: list of prefixes in the form of ip address (X.X.X.X/X)
183     """
184     prefix_list = []
185     offset = 0
186     while offset < len(prefixes_hex):
187         prefix_bit_len_hex = prefixes_hex[offset]
188         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
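        # Each prefix is encoded as a length byte followed by ceil(prefix_bit_len / 8)
        # significant address bytes, e.g. 4 bytes for the default /28 prefixes.
        # (The struct.unpack("BBBB", ...) below assumes full 4-byte prefixes.)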
189         prefix_len = ((prefix_bit_len - 1) / 8) + 1
190         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
191         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
192         offset += 1 + prefix_len
193         prefix_list.append(prefix + "/" + str(prefix_bit_len))
194     return prefix_list
195
196
197 class MessageError(ValueError):
198     """Value error with logging optimized for hexlified messages."""
199
200     def __init__(self, text, message, *args):
201         """Initialisation.
202
203         Store and call super init for textual comment,
204         store raw message which caused it.
205         """
206         self.text = text
207         self.msg = message
208         super(MessageError, self).__init__(text, message, *args)
209
210     def __str__(self):
211         """Generate human readable error message.
212
213         Returns:
214             :return: human readable message as string
215         Notes:
216             Use a placeholder string if the message is empty.
217         """
218         message = binascii.hexlify(self.msg)
219         if message == "":
220             message = "(empty message)"
221         return self.text + ": " + message
222
223
224 def read_open_message(bgp_socket):
225     """Receive peer's OPEN message
226
227     Arguments:
228         :bgp_socket: the socket to be read
229     Returns:
230         :return: received OPEN message.
231     Notes:
232         Performs just basic incoming message checks.
233     """
234     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
235     # TODO: Can the incoming open message be split in more than one packet?
236     # Some validation.
237     if len(msg_in) < 37:
238         # 37 is minimal length of open message with 4-byte AS number.
239         logger.error("Got something else than open with 4-byte AS number: " +
240                      binascii.hexlify(msg_in))
241         raise MessageError("Got something else than open with 4-byte AS number", msg_in)
242     # TODO: We could check BGP marker, but it is defined only later;
243     # decide what to do.
244     reported_length = get_short_int_from_message(msg_in)
245     if len(msg_in) != reported_length:
246         logger.error("Message length is not " + str(reported_length) +
247                      " as stated in " + binascii.hexlify(msg_in))
248         raise MessageError("Message length is not " + str(reported_length) +
249                            " as stated in", msg_in)
250     logger.info("Open message received.")
251     return msg_in
252
253
254 class MessageGenerator(object):
255     """Class which generates messages, holds states and configuration values."""
256
257     # TODO: Define bgp marker as a class (constant) variable.
258     def __init__(self, args):
259         """Initialisation according to command-line args.
260
261         Arguments:
262             :args: argsparser's Namespace object which contains command-line
263                 options for MessageGenerator initialisation
264         Notes:
265             Calculates and stores default values used later on for
266             message generation.
267         """
268         self.total_prefix_amount = args.amount
269         # Number of update messages left to be sent.
270         self.remaining_prefixes = self.total_prefix_amount
271
272         # New parameters initialisation
273         self.iteration = 0
274         self.prefix_base_default = args.firstprefix
275         self.prefix_length_default = args.prefixlen
276         self.wr_prefixes_default = []
277         self.nlri_prefixes_default = []
278         self.version_default = 4
279         self.my_autonomous_system_default = args.asnumber
280         self.hold_time_default = args.holdtime  # Local hold time.
281         self.bgp_identifier_default = int(args.myip)
282         self.next_hop_default = args.nexthop
283         self.originator_id_default = args.originator
284         self.cluster_list_item_default = args.cluster
285         self.single_update_default = args.updates == "single"
286         self.randomize_updates_default = args.updates == "random"
287         self.prefix_count_to_add_default = args.insert
288         self.prefix_count_to_del_default = args.withdraw
289         if self.prefix_count_to_del_default < 0:
290             self.prefix_count_to_del_default = 0
291         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
292             # total number of prefixes must grow to avoid infinite test loop
293             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
294         self.slot_size_default = self.prefix_count_to_add_default
295         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
296         self.results_file_name_default = args.results
297         self.performance_threshold_default = args.threshold
298         self.rfc4760 = args.rfc4760 == "yes"
299         # Default values used for randomized part
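        # The idiom "(x - 1) / y + 1" is integer ceil(x / y): s1_slots is the
        # number of pre-fill iterations (announcements only), s2_slots the number
        # of combined iterations (announcements plus withdrawals) that follow.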
300         s1_slots = ((self.total_prefix_amount -
301                      self.remaining_prefixes_threshold - 1) /
302                     self.prefix_count_to_add_default + 1)
303         s2_slots = ((self.remaining_prefixes_threshold - 1) /
304                     (self.prefix_count_to_add_default -
305                     self.prefix_count_to_del_default) + 1)
306         # S1_First_Index = 0
307         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
308         s2_first_index = s1_slots * self.prefix_count_to_add_default
309         s2_last_index = (s2_first_index +
310                          s2_slots * (self.prefix_count_to_add_default -
311                                      self.prefix_count_to_del_default) - 1)
312         self.slot_gap_default = ((self.total_prefix_amount -
313                                   self.remaining_prefixes_threshold - 1) /
314                                  self.prefix_count_to_add_default + 1)
315         self.randomize_lowest_default = s2_first_index
316         self.randomize_highest_default = s2_last_index
317
318         # Initialising counters
319         self.phase1_start_time = 0
320         self.phase1_stop_time = 0
321         self.phase2_start_time = 0
322         self.phase2_stop_time = 0
323         self.phase1_updates_sent = 0
324         self.phase2_updates_sent = 0
325         self.updates_sent = 0
326
327         self.log_info = args.loglevel <= logging.INFO
328         self.log_debug = args.loglevel <= logging.DEBUG
329         """
330         Flags needed for the MessageGenerator performance optimization.
331         Calling logger methods on each iteration, even with the proper log level
332         set, significantly slows down the MessageGenerator.
333         Measured total generation time (1M updates, dry run, error log level):
334         - logging based on basic logger features: 36.2 s
335         - logging based on advanced logger features (lazy logging): 21.2 s
336         - conditional calls of logger methods enclosed inside a condition: 8.6 s
337         """
338
339         logger.info("Generator initialisation")
340         logger.info("  Target total number of prefixes to be introduced: " +
341                     str(self.total_prefix_amount))
342         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
343                     str(self.prefix_length_default))
344         logger.info("  My Autonomous System number: " +
345                     str(self.my_autonomous_system_default))
346         logger.info("  My Hold Time: " + str(self.hold_time_default))
347         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
348         logger.info("  Next Hop: " + str(self.next_hop_default))
349         logger.info("  Originator ID: " + str(self.originator_id_default))
350         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
351         logger.info("  Prefix count to be inserted at once: " +
352                     str(self.prefix_count_to_add_default))
353         logger.info("  Prefix count to be withdrawn at once: " +
354                     str(self.prefix_count_to_del_default))
355         logger.info("  Fast pre-fill up to " +
356                     str(self.total_prefix_amount -
357                         self.remaining_prefixes_threshold) + " prefixes")
358         logger.info("  Remaining number of prefixes to be processed " +
359                     "in parallel with withdrawals: " +
360                     str(self.remaining_prefixes_threshold))
361         logger.debug("  Prefix index range used after pre-fill procedure [" +
362                      str(self.randomize_lowest_default) + ", " +
363                      str(self.randomize_highest_default) + "]")
364         if self.single_update_default:
365             logger.info("  Common single UPDATE will be generated " +
366                         "for both NLRI & WITHDRAWN lists")
367         else:
368             logger.info("  Two separate UPDATEs will be generated " +
369                         "for each NLRI & WITHDRAWN lists")
370         if self.randomize_updates_default:
371             logger.info("  Generation of UPDATE messages will be randomized")
372         logger.info("  Let\'s go ...\n")
373
374         # TODO: Notification for hold timer expiration can be handy.
375
376     def store_results(self, file_name=None, threshold=None):
377         """ Stores specified results into files based on file_name value.
378
379         Arguments:
380             :param file_name: Trailing (common) part of result file names
381             :param threshold: Minimum number of sent updates needed for each
382                               result to be included into result csv file
383                               (mainly needed because of the result accuracy)
384         Returns:
385             :return: n/a
386         """
387         # default values handling
388         # TODO: optimize default values handling (use e.g. dictionary.update() approach)
389         if file_name is None:
390             file_name = self.results_file_name_default
391         if threshold is None:
392             threshold = self.performance_threshold_default
393         # performance calculation
394         if self.phase1_updates_sent >= threshold:
395             totals1 = self.phase1_updates_sent
396             performance1 = int(self.phase1_updates_sent /
397                                (self.phase1_stop_time - self.phase1_start_time))
398         else:
399             totals1 = None
400             performance1 = None
401         if self.phase2_updates_sent >= threshold:
402             totals2 = self.phase2_updates_sent
403             performance2 = int(self.phase2_updates_sent /
404                                (self.phase2_stop_time - self.phase2_start_time))
405         else:
406             totals2 = None
407             performance2 = None
408
409         logger.info("#" * 10 + " Final results " + "#" * 10)
410         logger.info("Number of iterations: " + str(self.iteration))
411         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
412                     str(self.phase1_updates_sent))
413         logger.info("The pre-fill phase duration: " +
414                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
415         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
416                     str(self.phase2_updates_sent))
417         logger.info("The 2nd test phase duration: " +
418                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
419         logger.info("Threshold for performance reporting: " + str(threshold))
420
421         # making labels
422         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
423                         " route(s) per UPDATE")
424         if self.single_update_default:
425             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
426                                   "/-" + str(self.prefix_count_to_del_default) +
427                                   " routes per UPDATE")
428         else:
429             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
430                                   "/-" + str(self.prefix_count_to_del_default) +
431                                   " routes in two UPDATEs")
432         # collecting capacity and performance results
433         totals = {}
434         performance = {}
435         if totals1 is not None:
436             totals[phase1_label] = totals1
437             performance[phase1_label] = performance1
438         if totals2 is not None:
439             totals[phase2_label] = totals2
440             performance[phase2_label] = performance2
441         self.write_results_to_file(totals, "totals-" + file_name)
442         self.write_results_to_file(performance, "performance-" + file_name)
443
444     def write_results_to_file(self, results, file_name):
445         """Writes results to the csv plot file consumable by Jenkins.
446
447         Arguments:
448             :param file_name: Name of the (csv) file to be created
449         Returns:
450             :return: none
451         """
452         first_line = ""
453         second_line = ""
454         f = open(file_name, "wt")
455         try:
456             for key in sorted(results):
457                 first_line += key + ", "
458                 second_line += str(results[key]) + ", "
459             first_line = first_line[:-2]
460             second_line = second_line[:-2]
461             f.write(first_line + "\n")
462             f.write(second_line + "\n")
463             logger.info("Message generator performance results stored in " +
464                         file_name + ":")
465             logger.info("  " + first_line)
466             logger.info("  " + second_line)
467         finally:
468             f.close()
469
470     # Return pseudo-randomized (reproducible) index for selected range
471     def randomize_index(self, index, lowest=None, highest=None):
472         """Calculates pseudo-randomized index from selected range.
473
474         Arguments:
475             :param index: input index
476             :param lowest: the lowest index from the randomized area
477             :param highest: the highest index from the randomized area
478         Returns:
479             :return: the (pseudo)randomized index
480         Notes:
481             Created just as a placeholder for future generator enhancements.
482         """
483         # default values handling
484         # TODO: optimize default values handling (use e.g. dictionary.update() approach)
485         if lowest is None:
486             lowest = self.randomize_lowest_default
487         if highest is None:
488             highest = self.randomize_highest_default
489         # randomize
490         if (index >= lowest) and (index <= highest):
491             # we are in the randomized range -> shuffle it inside
492             # the range (now just reverse the order)
493             new_index = highest - (index - lowest)
494         else:
495             # we are out of the randomized range -> nothing to do
496             new_index = index
497         return new_index
498
499     # Get list of prefixes
500     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
501                         prefix_len=None, prefix_count=None, randomize=None):
502         """Generates list of IP address prefixes.
503
504         Arguments:
505             :param slot_index: index of group of prefix addresses
506             :param slot_size: size of group of prefix addresses
507                 in [number of included prefixes]
508             :param prefix_base: IP address of the first prefix
509                 (slot_index = 0, prefix_index = 0)
510             :param prefix_len: length of the prefix in bits
511                 (the same as size of netmask)
512             :param prefix_count: number of prefixes to be returned
513                 from the specified slot
514         Returns:
515             :return: list of generated IP address prefixes
516         """
517         # default values handling
518         # TODO: optimize default values handling (use e.g. dictionary.update() approach)
519         if slot_size is None:
520             slot_size = self.slot_size_default
521         if prefix_base is None:
522             prefix_base = self.prefix_base_default
523         if prefix_len is None:
524             prefix_len = self.prefix_length_default
525         if prefix_count is None:
526             prefix_count = slot_size
527         if randomize is None:
528             randomize = self.randomize_updates_default
529         # generating list of prefixes
530         indexes = []
531         prefixes = []
532         prefix_gap = 2 ** (32 - prefix_len)
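        # Address step between consecutive prefixes of this length, e.g. 16 addresses for a /28.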
533         for i in range(prefix_count):
534             prefix_index = slot_index * slot_size + i
535             if randomize:
536                 prefix_index = self.randomize_index(prefix_index)
537             indexes.append(prefix_index)
538             prefixes.append(prefix_base + prefix_index * prefix_gap)
539         if self.log_debug:
540             logger.debug("  Prefix slot index: " + str(slot_index))
541             logger.debug("  Prefix slot size: " + str(slot_size))
542             logger.debug("  Prefix count: " + str(prefix_count))
543             logger.debug("  Prefix indexes: " + str(indexes))
544             logger.debug("  Prefix list: " + str(prefixes))
545         return prefixes
546
547     def compose_update_message(self, prefix_count_to_add=None,
548                                prefix_count_to_del=None):
549         """Composes an UPDATE message
550
551         Arguments:
552             :param prefix_count_to_add: # of prefixes to put into NLRI list
553             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
554         Returns:
555             :return: encoded UPDATE message in HEX
556         Notes:
557             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
558             lists, or a common message which includes both prefix lists.
559             Updates global counters.
560         """
561         # default values handling
562         # TODO: optimize default values handling (use e.g. dictionary.update() approach)
563         if prefix_count_to_add is None:
564             prefix_count_to_add = self.prefix_count_to_add_default
565         if prefix_count_to_del is None:
566             prefix_count_to_del = self.prefix_count_to_del_default
567         # logging
568         if self.log_info and not (self.iteration % 1000):
569             logger.info("Iteration: " + str(self.iteration) +
570                         " - total remaining prefixes: " +
571                         str(self.remaining_prefixes))
572         if self.log_debug:
573             logger.debug("#" * 10 + " Iteration: " +
574                          str(self.iteration) + " " + "#" * 10)
575             logger.debug("Remaining prefixes: " +
576                          str(self.remaining_prefixes))
577         # scenario type & one-shot counter
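        # The pre-fill (straightforward) phase only announces prefixes; once the
        # number of remaining prefixes drops to the threshold, the combined phase
        # starts mixing announcements with withdrawals.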
578         straightforward_scenario = (self.remaining_prefixes >
579                                     self.remaining_prefixes_threshold)
580         if straightforward_scenario:
581             prefix_count_to_del = 0
582             if self.log_debug:
583                 logger.debug("--- STRAIGHTFORWARD SCENARIO ---")
584             if not self.phase1_start_time:
585                 self.phase1_start_time = time.time()
586         else:
587             if self.log_debug:
588                 logger.debug("--- COMBINED SCENARIO ---")
589             if not self.phase2_start_time:
590                 self.phase2_start_time = time.time()
591         # tailor the number of prefixes if needed
592         prefix_count_to_add = (prefix_count_to_del +
593                                min(prefix_count_to_add - prefix_count_to_del,
594                                    self.remaining_prefixes))
595         # prefix slots selection for insertion and withdrawal
596         slot_index_to_add = self.iteration
597         slot_index_to_del = slot_index_to_add - self.slot_gap_default
598         # getting lists of prefixes for insertion in this iteration
599         if self.log_debug:
600             logger.debug("Prefixes to be inserted in this iteration:")
601         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
602                                                   prefix_count=prefix_count_to_add)
603         # getting lists of prefixes for withdrawal in this iteration
604         if self.log_debug:
605             logger.debug("Prefixes to be withdrawn in this iteration:")
606         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
607                                                   prefix_count=prefix_count_to_del)
608         # generating the message
609         if self.single_update_default:
610             # Send prefixes to be introduced and withdrawn
611             # in one UPDATE message
612             msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
613                                           nlri_prefixes=prefix_list_to_add)
614         else:
615             # Send prefixes to be introduced and withdrawn
616             # in separate UPDATE messages (if needed)
617             msg_out = self.update_message(wr_prefixes=[],
618                                           nlri_prefixes=prefix_list_to_add)
619             if prefix_count_to_del:
620                 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
621                                                nlri_prefixes=[])
622         # updating counters - who knows ... maybe this is the last time here ;)
623         if straightforward_scenario:
624             self.phase1_stop_time = time.time()
625             self.phase1_updates_sent = self.updates_sent
626         else:
627             self.phase2_stop_time = time.time()
628             self.phase2_updates_sent = (self.updates_sent -
629                                         self.phase1_updates_sent)
630         # updating totals for the next iteration
631         self.iteration += 1
632         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
633         # returning the encoded message
634         return msg_out
635
636     # Section of message encoders
637
638     def open_message(self, version=None, my_autonomous_system=None,
639                      hold_time=None, bgp_identifier=None):
640         """Generates an OPEN Message (rfc4271#section-4.2)
641
642         Arguments:
643             :param version: see the rfc4271#section-4.2
644             :param my_autonomous_system: see the rfc4271#section-4.2
645             :param hold_time: see the rfc4271#section-4.2
646             :param bgp_identifier: see the rfc4271#section-4.2
647         Returns:
648             :return: encoded OPEN message in HEX
649         """
650
651         # default values handling
652         # TODO: optimize default values handling (use e.g. dictionary.update() approach)
653         if version is None:
654             version = self.version_default
655         if my_autonomous_system is None:
656             my_autonomous_system = self.my_autonomous_system_default
657         if hold_time is None:
658             hold_time = self.hold_time_default
659         if bgp_identifier is None:
660             bgp_identifier = self.bgp_identifier_default
661
662         # Marker
663         marker_hex = "\xFF" * 16
664
665         # Type
666         type = 1
667         type_hex = struct.pack("B", type)
668
669         # version
670         version_hex = struct.pack("B", version)
671
672         # my_autonomous_system
673         # AS_TRANS value, 23456 decimal.
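        # Per RFC 6793, when the local AS number does not fit into 2 bytes, AS_TRANS
        # goes into the OPEN "My Autonomous System" field and the real value is
        # carried in the 4-octet AS capability added below.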
674         my_autonomous_system_2_bytes = 23456
675         # AS number is mappable to 2 bytes
676         if my_autonomous_system < 65536:
677             my_autonomous_system_2_bytes = my_autonomous_system
678         my_autonomous_system_hex_2_bytes = struct.pack(">H",
679                                                        my_autonomous_system_2_bytes)
680
681         # Hold Time
682         hold_time_hex = struct.pack(">H", hold_time)
683
684         # BGP Identifier
685         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
686
687         # Optional Parameters
688         optional_parameters_hex = ""
689         if self.rfc4760:
690             optional_parameter_hex = (
691                 "\x02"  # Param type ("Capability Ad")
692                 "\x06"  # Length (6 bytes)
693                 "\x01"  # Capability type (Multiprotocol Extensions),
694                         # see RFC 4760, section 8
695                 "\x04"  # Capability value length
696                 "\x00\x01"  # AFI (IPv4)
697                 "\x00"  # (reserved)
698                 "\x01"  # SAFI (Unicast)
699             )
700             optional_parameters_hex += optional_parameter_hex
701
702         optional_parameter_hex = (
703             "\x02"  # Param type ("Capability Ad")
704             "\x06"  # Length (6 bytes)
705             "\x41"  # "32 bit AS Numbers Support"
706                     # (see RFC 6793, section 3)
707             "\x04"  # Capability value length
708         )
709         optional_parameter_hex += (
710             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
711         )
712         optional_parameters_hex += optional_parameter_hex
713
714         # Optional Parameters Length
715         optional_parameters_length = len(optional_parameters_hex)
716         optional_parameters_length_hex = struct.pack("B",
717                                                      optional_parameters_length)
718
719         # Length (big-endian)
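        # The literal 2 below accounts for the 2-byte Length field itself, which is
        # not included in any of the summed components.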
720         length = (
721             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
722             len(my_autonomous_system_hex_2_bytes) +
723             len(hold_time_hex) + len(bgp_identifier_hex) +
724             len(optional_parameters_length_hex) +
725             len(optional_parameters_hex)
726         )
727         length_hex = struct.pack(">H", length)
728
729         # OPEN Message
730         message_hex = (
731             marker_hex +
732             length_hex +
733             type_hex +
734             version_hex +
735             my_autonomous_system_hex_2_bytes +
736             hold_time_hex +
737             bgp_identifier_hex +
738             optional_parameters_length_hex +
739             optional_parameters_hex
740         )
741
742         if self.log_debug:
743             logger.debug("OPEN message encoding")
744             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
745             logger.debug("  Length=" + str(length) + " (0x" +
746                          binascii.hexlify(length_hex) + ")")
747             logger.debug("  Type=" + str(type) + " (0x" +
748                          binascii.hexlify(type_hex) + ")")
749             logger.debug("  Version=" + str(version) + " (0x" +
750                          binascii.hexlify(version_hex) + ")")
751             logger.debug("  My Autonomous System=" +
752                          str(my_autonomous_system_2_bytes) + " (0x" +
753                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
754                          ")")
755             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
756                          binascii.hexlify(hold_time_hex) + ")")
757             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
758                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
759             logger.debug("  Optional Parameters Length=" +
760                          str(optional_parameters_length) + " (0x" +
761                          binascii.hexlify(optional_parameters_length_hex) +
762                          ")")
763             logger.debug("  Optional Parameters=0x" +
764                          binascii.hexlify(optional_parameters_hex))
765             logger.debug("OPEN message encoded: 0x%s",
766                          binascii.b2a_hex(message_hex))
767
768         return message_hex
769
770     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
771                        wr_prefix_length=None, nlri_prefix_length=None,
772                        my_autonomous_system=None, next_hop=None,
773                        originator_id=None, cluster_list_item=None):
774         """Generates an UPDATE Message (rfc4271#section-4.3)
775
776         Arguments:
777             :param wr_prefixes: see the rfc4271#section-4.3
778             :param nlri_prefixes: see the rfc4271#section-4.3
779             :param wr_prefix_length: see the rfc4271#section-4.3
780             :param nlri_prefix_length: see the rfc4271#section-4.3
781             :param my_autonomous_system: see the rfc4271#section-4.3
782             :param next_hop: see the rfc4271#section-4.3
783         Returns:
784             :return: encoded UPDATE message in HEX
785         """
786
787         # default values handling
788         # TODO: optimize default values handling (use e.g. dictionary.update() approach)
789         if wr_prefixes is None:
790             wr_prefixes = self.wr_prefixes_default
791         if nlri_prefixes is None:
792             nlri_prefixes = self.nlri_prefixes_default
793         if wr_prefix_length is None:
794             wr_prefix_length = self.prefix_length_default
795         if nlri_prefix_length is None:
796             nlri_prefix_length = self.prefix_length_default
797         if my_autonomous_system is None:
798             my_autonomous_system = self.my_autonomous_system_default
799         if next_hop is None:
800             next_hop = self.next_hop_default
801         if originator_id is None:
802             originator_id = self.originator_id_default
803         if cluster_list_item is None:
804             cluster_list_item = self.cluster_list_item_default
805
806         # Marker
807         marker_hex = "\xFF" * 16
808
809         # Type
810         type = 2
811         type_hex = struct.pack("B", type)
812
813         # Withdrawn Routes
814         bytes = ((wr_prefix_length - 1) / 8) + 1
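        # Only the ceil(prefix_length / 8) most significant address bytes are encoded
        # for each prefix (rfc4271#section-4.3).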
815         withdrawn_routes_hex = ""
816         for prefix in wr_prefixes:
817             withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
818                                    struct.pack(">I", int(prefix))[:bytes])
819             withdrawn_routes_hex += withdrawn_route_hex
820
821         # Withdrawn Routes Length
822         withdrawn_routes_length = len(withdrawn_routes_hex)
823         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
824
825         # TODO: replace the hardcoded strings with proper encoding?
826         # Path Attributes
827         path_attributes_hex = ""
828         if nlri_prefixes != []:
829             path_attributes_hex += (
830                 "\x40"  # Flags ("Well-Known")
831                 "\x01"  # Type (ORIGIN)
832                 "\x01"  # Length (1)
833                 "\x00"  # Origin: IGP
834             )
835             path_attributes_hex += (
836                 "\x40"  # Flags ("Well-Known")
837                 "\x02"  # Type (AS_PATH)
838                 "\x06"  # Length (6)
839                 "\x02"  # AS segment type (AS_SEQUENCE)
840                 "\x01"  # AS segment length (1)
841             )
842             my_as_hex = struct.pack(">I", my_autonomous_system)
843             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
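            # AS_PATH length 6 = 2-byte segment header + one 4-byte AS number
            # (4-octet AS encoding, assuming the RFC 6793 capability was negotiated)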
844             path_attributes_hex += (
845                 "\x40"  # Flags ("Well-Known")
846                 "\x03"  # Type (NEXT_HOP)
847                 "\x04"  # Length (4)
848             )
849             next_hop_hex = struct.pack(">I", int(next_hop))
850             path_attributes_hex += (
851                 next_hop_hex  # IP address of the next hop (4 bytes)
852             )
853             if originator_id is not None:
854                 path_attributes_hex += (
855                     "\x80"  # Flags ("Optional, non-transitive")
856                     "\x09"  # Type (ORIGINATOR_ID)
857                     "\x04"  # Length (4)
858                             # ORIGINATOR_ID (4 bytes)
859                     + struct.pack(">I", int(originator_id))
860                 )
861             if cluster_list_item is not None:
862                 path_attributes_hex += (
863                     "\x80"  # Flags ("Optional, non-transitive")
864                     "\x0a"  # Type (CLUSTER_LIST)
865                     "\x04"  # Length (4)
866                             # one CLUSTER_LIST item (4 bytes)
867                     + struct.pack(">I", int(cluster_list_item))
868                 )
869
870         # Total Path Attributes Length
871         total_path_attributes_length = len(path_attributes_hex)
872         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
873
874         # Network Layer Reachability Information
875         bytes = ((nlri_prefix_length - 1) / 8) + 1
876         nlri_hex = ""
877         for prefix in nlri_prefixes:
878             nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
879                                struct.pack(">I", int(prefix))[:bytes])
880             nlri_hex += nlri_prefix_hex
881
882         # Length (big-endian)
883         length = (
884             len(marker_hex) + 2 + len(type_hex) +
885             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
886             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
887             len(nlri_hex))
888         length_hex = struct.pack(">H", length)
889
890         # UPDATE Message
891         message_hex = (
892             marker_hex +
893             length_hex +
894             type_hex +
895             withdrawn_routes_length_hex +
896             withdrawn_routes_hex +
897             total_path_attributes_length_hex +
898             path_attributes_hex +
899             nlri_hex
900         )
901
902         if self.log_debug:
903             logger.debug("UPDATE message encoding")
904             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
905             logger.debug("  Length=" + str(length) + " (0x" +
906                          binascii.hexlify(length_hex) + ")")
907             logger.debug("  Type=" + str(type) + " (0x" +
908                          binascii.hexlify(type_hex) + ")")
909             logger.debug("  withdrawn_routes_length=" +
910                          str(withdrawn_routes_length) + " (0x" +
911                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
912             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
913                          str(wr_prefix_length) + " (0x" +
914                          binascii.hexlify(withdrawn_routes_hex) + ")")
915             if total_path_attributes_length:
916                 logger.debug("  Total Path Attributes Length=" +
917                              str(total_path_attributes_length) + " (0x" +
918                              binascii.hexlify(total_path_attributes_length_hex) + ")")
919                 logger.debug("  Path Attributes=" + "(0x" +
920                              binascii.hexlify(path_attributes_hex) + ")")
921                 logger.debug("    Origin=IGP")
922                 logger.debug("    AS path=" + str(my_autonomous_system))
923                 logger.debug("    Next hop=" + str(next_hop))
924                 if originator_id is not None:
925                     logger.debug("    Originator id=" + str(originator_id))
926                 if cluster_list_item is not None:
927                     logger.debug("    Cluster list=" + str(cluster_list_item))
928             logger.debug("  Network Layer Reachability Information=" +
929                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
930                          " (0x" + binascii.hexlify(nlri_hex) + ")")
931             logger.debug("UPDATE message encoded: 0x" +
932                          binascii.b2a_hex(message_hex))
933
934         # updating counter
935         self.updates_sent += 1
936         # returning encoded message
937         return message_hex
938
939     def notification_message(self, error_code, error_subcode, data_hex=""):
940         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
941
942         Arguments:
943             :param error_code: see the rfc4271#section-4.5
944             :param error_subcode: see the rfc4271#section-4.5
945             :param data_hex: see the rfc4271#section-4.5
946         Returns:
947             :return: encoded NOTIFICATION message in HEX
948         """
949
950         # Marker
951         marker_hex = "\xFF" * 16
952
953         # Type
954         type = 3
955         type_hex = struct.pack("B", type)
956
957         # Error Code
958         error_code_hex = struct.pack("B", error_code)
959
960         # Error Subcode
961         error_subcode_hex = struct.pack("B", error_subcode)
962
963         # Length (big-endian)
964         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
965                   len(error_subcode_hex) + len(data_hex))
966         length_hex = struct.pack(">H", length)
967
968         # NOTIFICATION Message
969         message_hex = (
970             marker_hex +
971             length_hex +
972             type_hex +
973             error_code_hex +
974             error_subcode_hex +
975             data_hex
976         )
977
978         if self.log_debug:
979             logger.debug("NOTIFICATION message encoding")
980             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
981             logger.debug("  Length=" + str(length) + " (0x" +
982                          binascii.hexlify(length_hex) + ")")
983             logger.debug("  Type=" + str(type) + " (0x" +
984                          binascii.hexlify(type_hex) + ")")
985             logger.debug("  Error Code=" + str(error_code) + " (0x" +
986                          binascii.hexlify(error_code_hex) + ")")
987             logger.debug("  Error Subcode=" + str(error_subcode) + " (0x" +
988                          binascii.hexlify(error_subcode_hex) + ")")
989             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
990             logger.debug("NOTIFICATION message encoded: 0x%s",
991                          binascii.b2a_hex(message_hex))
992
993         return message_hex
994
995     def keepalive_message(self):
996         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
997
998         Returns:
999             :return: encoded KEEP ALIVE message in HEX
1000         """
1001
1002         # Marker
1003         marker_hex = "\xFF" * 16
1004
1005         # Type
1006         type = 4
1007         type_hex = struct.pack("B", type)
1008
1009         # Length (big-endian)
1010         length = len(marker_hex) + 2 + len(type_hex)
1011         length_hex = struct.pack(">H", length)
1012
1013         # KEEP ALIVE Message
1014         message_hex = (
1015             marker_hex +
1016             length_hex +
1017             type_hex
1018         )
1019
1020         if self.log_debug:
1021             logger.debug("KEEP ALIVE message encoding")
1022             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1023             logger.debug("  Length=" + str(length) + " (0x" +
1024                          binascii.hexlify(length_hex) + ")")
1025             logger.debug("  Type=" + str(type) + " (0x" +
1026                          binascii.hexlify(type_hex) + ")")
1027             logger.debug("KEEP ALIVE message encoded: 0x%s",
1028                          binascii.b2a_hex(message_hex))
1029
1030         return message_hex
1031
1032
1033 class TimeTracker(object):
1034     """Class for tracking timers, both for my keepalives and
1035     peer's hold time.
1036     """
1037
1038     def __init__(self, msg_in):
1039         """Initialisation based on defaults and the OPEN message from the peer.
1040
1041         Arguments:
1042             msg_in: the OPEN message received from peer.
1043         """
1044         # Note: Relative time is always named timedelta, to stress that
1045         # the (non-delta) time is absolute.
1046         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1047         # Upper bound for being stuck in the same state, we should
1048         # Upper bound for being stuck in the same state; we should
1049         # Negotiate the hold timer by taking the smaller
1050         # of the 2 values (mine and the peer's).
1051         hold_timedelta = 180  # Not an attribute of self yet.
1052         # TODO: Make the default value configurable,
1053         # default value could mirror what peer said.
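        # Hold Time sits at offset 22 in the OPEN message: 16-byte marker +
        # 2-byte length + 1-byte type + 1-byte version + 2-byte My AS field.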
1054         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1055         if hold_timedelta > peer_hold_timedelta:
1056             hold_timedelta = peer_hold_timedelta
1057         if hold_timedelta != 0 and hold_timedelta < 3:
1058             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1059             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1060         self.hold_timedelta = hold_timedelta
1061         # If we do not hear from peer this long, we assume it has died.
1062         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1063         # Upper limit for duration between messages, to avoid being
1064         # declared to be dead.
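        # One third of the hold time is the keepalive interval suggested by rfc4271#section-4.4.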
1065         # The same as calling snapshot(), but also declares a field.
1066         self.snapshot_time = time.time()
1067         # Sometimes we need to store time. This is where to get
1068         # the value from afterwards. Time_keepalive may be too strict.
1069         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1070         # At this time point, peer will be declared dead.
1071         self.my_keepalive_time = None  # to be set later
1072         # At this point, we should be sending keepalive message.
1073
1074     def snapshot(self):
1075         """Store current time in instance data to use later."""
1076         # Read as time before something interesting was called.
1077         self.snapshot_time = time.time()
1078
1079     def reset_peer_hold_time(self):
1080         """Move hold time to future as peer has just proven it still lives."""
1081         self.peer_hold_time = time.time() + self.hold_timedelta
1082
1083     # Some methods could rely on self.snapshot_time, but it is better
1084     # to require user to provide it explicitly.
1085     def reset_my_keepalive_time(self, keepalive_time):
1086         """Calculate and set the time of my next KEEP ALIVE.
1087
1088         Arguments:
1089             :keepalive_time: the initial value of the KEEP ALIVE timer
1090         """
1091         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1092
1093     def is_time_for_my_keepalive(self):
1094         """Check whether my KEEP ALIVE timeout has occurred."""
1095         if self.hold_timedelta == 0:
1096             return False
1097         return self.snapshot_time >= self.my_keepalive_time
1098
1099     def get_next_event_time(self):
1100         """Return the time of the next expected event (KEEP ALIVE to send or peer hold expiry)."""
1101         if self.hold_timedelta == 0:
1102             return self.snapshot_time + 86400
1103         return min(self.my_keepalive_time, self.peer_hold_time)
1104
1105     def check_peer_hold_time(self, snapshot_time):
1106         """Raise error if nothing was read from peer until specified time."""
1107         # Hold time = 0 means keepalive checking off.
1108         if self.hold_timedelta != 0:
1109             # time.time() may be too strict
1110             if snapshot_time > self.peer_hold_time:
1111                 logger.error("Peer has overstepped the hold timer.")
1112                 raise RuntimeError("Peer has overstepped the hold timer.")
1113                 # TODO: Include hold_timedelta?
1114                 # TODO: Add notification sending (attempt). That means
1115                 # move to write tracker.
1116
1117
1118 class ReadTracker(object):
1119     """Class for tracking reads of messages chunk by chunk and
1120     for idle waiting.
1121     """
1122
1123     def __init__(self, bgp_socket, timer):
1124         """The reader initialisation.
1125
1126         Arguments:
1127             bgp_socket: socket to be read from
1128             timer: timer to be used for scheduling
1129         """
1130         # References to outside objects.
1131         self.socket = bgp_socket
1132         self.timer = timer
1133         # BGP marker length plus length field length.
1134         self.header_length = 18
1135         # TODO: make it class (constant) attribute
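        # The message type octet (offset 18) is read as the first byte of the message body.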
1136         # Computation of where next chunk ends depends on whether
1137         # we are beyond length field.
1138         self.reading_header = True
1139         # Countdown towards next size computation.
1140         self.bytes_to_read = self.header_length
1141         # Incremental buffer for message under read.
1142         self.msg_in = ""
1143         # Initialising counters
1144         self.updates_received = 0
1145         self.prefixes_introduced = 0
1146         self.prefixes_withdrawn = 0
1147         self.rx_idle_time = 0
1148         self.rx_activity_detected = True
1149
1150     def read_message_chunk(self):
1151         """Read up to one message
1152
1153         Note:
1154             Currently it does not return anything.
1155         """
1156         # TODO: We could return the whole message, currently not needed.
1157         # We assume the socket is readable.
1158         chunk_message = self.socket.recv(self.bytes_to_read)
1159         self.msg_in += chunk_message
1160         self.bytes_to_read -= len(chunk_message)
1161         # TODO: bytes_to_read < 0 is not possible, right?
1162         if not self.bytes_to_read:
1163             # Finished reading a logical block.
1164             if self.reading_header:
1165                 # The logical block was a BGP header.
1166                 # Now we know the size of the message.
1167                 self.reading_header = False
1168                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1169                                       self.header_length)
1170             else:  # We have finished reading the body of the message.
1171                 # Peer has just proven it is still alive.
1172                 self.timer.reset_peer_hold_time()
1173                 # TODO: Do we want to count received messages?
1174                 # This version ignores the received message.
1175                 # TODO: Should we do validation and exit on anything
1176                 # besides update or keepalive?
1177                 # Prepare state for reading another message.
1178                 message_type_hex = self.msg_in[self.header_length]
1179                 if message_type_hex == "\x01":
1180                     logger.info("OPEN message received: 0x%s",
1181                                 binascii.b2a_hex(self.msg_in))
1182                 elif message_type_hex == "\x02":
1183                     logger.debug("UPDATE message received: 0x%s",
1184                                  binascii.b2a_hex(self.msg_in))
1185                     self.decode_update_message(self.msg_in)
1186                 elif message_type_hex == "\x03":
1187                     logger.info("NOTIFICATION message received: 0x%s",
1188                                 binascii.b2a_hex(self.msg_in))
1189                 elif message_type_hex == "\x04":
1190                     logger.info("KEEP ALIVE message received: 0x%s",
1191                                 binascii.b2a_hex(self.msg_in))
1192                 else:
1193                     logger.warning("Unexpected message received: 0x%s",
1194                                    binascii.b2a_hex(self.msg_in))
1195                 self.msg_in = ""
1196                 self.reading_header = True
1197                 self.bytes_to_read = self.header_length
1198         # We should not act upon peer_hold_time if we are reading
1199         # something right now.
1200         return
1201
1202     def decode_path_attributes(self, path_attributes_hex):
1203         """Decode the Path Attributes field (rfc4271#section-4.3)
1204
1205         Arguments:
1206             :path_attributes_hex: path attributes field to be decoded in hex
1207         Returns:
1208             :return: None
1209         """
1210         hex_to_decode = path_attributes_hex
1211
1212         while len(hex_to_decode):
1213             attr_flags_hex = hex_to_decode[0]
1214             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1215 #            attr_optional_bit = attr_flags & 128
1216 #            attr_transitive_bit = attr_flags & 64
1217 #            attr_partial_bit = attr_flags & 32
1218             attr_extended_length_bit = attr_flags & 16
1219
1220             attr_type_code_hex = hex_to_decode[1]
1221             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1222
1223             if attr_extended_length_bit:
1224                 attr_length_hex = hex_to_decode[2:4]
1225                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1226                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1227                 hex_to_decode = hex_to_decode[4 + attr_length:]
1228             else:
1229                 attr_length_hex = hex_to_decode[2]
1230                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1231                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1232                 hex_to_decode = hex_to_decode[3 + attr_length:]
1233
1234             if attr_type_code == 1:
1235                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1236                              binascii.b2a_hex(attr_flags_hex))
1237                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1238             elif attr_type_code == 2:
1239                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1240                              binascii.b2a_hex(attr_flags_hex))
1241                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1242             elif attr_type_code == 3:
1243                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1244                              binascii.b2a_hex(attr_flags_hex))
1245                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1246             elif attr_type_code == 4:
1247                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1248                              binascii.b2a_hex(attr_flags_hex))
1249                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1250             elif attr_type_code == 5:
1251                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1252                              binascii.b2a_hex(attr_flags_hex))
1253                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1254             elif attr_type_code == 6:
1255                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1256                              binascii.b2a_hex(attr_flags_hex))
1257                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1258             elif attr_type_code == 7:
1259                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1260                              binascii.b2a_hex(attr_flags_hex))
1261                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1262             elif attr_type_code == 9:  # rfc4456#section-8
1263                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1264                              binascii.b2a_hex(attr_flags_hex))
1265                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1266             elif attr_type_code == 10:  # rfc4456#section-8
1267                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1268                              binascii.b2a_hex(attr_flags_hex))
1269                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1270             elif attr_type_code == 14:  # rfc4760#section-3
1271                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1272                              binascii.b2a_hex(attr_flags_hex))
1273                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1274                 address_family_identifier_hex = attr_value_hex[0:2]
1275                 logger.debug("  Address Family Identifier=0x%s",
1276                              binascii.b2a_hex(address_family_identifier_hex))
1277                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1278                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1279                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1280                 next_hop_netaddr_len_hex = attr_value_hex[3]
1281                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1282                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1283                              next_hop_netaddr_len,
1284                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1285                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1286                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1287                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1288                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1289                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1290                 logger.debug("  Reserved=0x%s",
1291                              binascii.b2a_hex(reserved_hex))
1292                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1293                 logger.debug("  Network Layer Reachability Information=0x%s",
1294                              binascii.b2a_hex(nlri_hex))
1295                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1296                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1297                 for prefix in nlri_prefix_list:
1298                     logger.debug("  nlri_prefix_received: %s", prefix)
1299                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1300             elif attr_type_code == 15:  # rfc4760#section-4
1301                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1302                              binascii.b2a_hex(attr_flags_hex))
1303                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1304                 address_family_identifier_hex = attr_value_hex[0:2]
1305                 logger.debug("  Address Family Identifier=0x%s",
1306                              binascii.b2a_hex(address_family_identifier_hex))
1307                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1308                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1309                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1310                 wd_hex = attr_value_hex[3:]
1311                 logger.debug("  Withdrawn Routes=0x%s",
1312                              binascii.b2a_hex(wd_hex))
1313                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1314                 logger.debug("  Withdrawn routes prefix list: %s",
1315                              wdr_prefix_list)
1316                 for prefix in wdr_prefix_list:
1317                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1318                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1319             else:
1320                 logger.debug("Unknown attribute type=%s (flags:0x%s)", attr_type_code,
1321                              binascii.b2a_hex(attr_flags_hex))
1322                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1323         return None
1324
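    # A hand-decoded path attribute, to illustrate the flag and length
    # handling above (illustration only): the byte string "\x40\x01\x01\x00"
    # breaks down as
    #
    #     0x40  attr_flags      - well-known transitive, extended-length bit (0x10) clear
    #     0x01  attr_type_code  - ORIGIN
    #     0x01  attr_length     - one octet of value follows
    #     0x00  attr_value      - IGP
    #
    # With the extended-length bit set the length occupies two octets instead,
    # which is why the code slices hex_to_decode[2:4] rather than hex_to_decode[2].
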
1325     def decode_update_message(self, msg):
1326         """Decode an UPDATE message (rfc4271#section-4.3)
1327
1328         Arguments:
1329             :msg: message to be decoded in hex
1330         Returns:
1331             :return: None
1332         """
1333         logger.debug("Decoding update message:")
1334         # message header - marker
1335         marker_hex = msg[:16]
1336         logger.debug("Message header marker: 0x%s",
1337                      binascii.b2a_hex(marker_hex))
1338         # message header - message length
1339         msg_length_hex = msg[16:18]
1340         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1341         logger.debug("Message length: 0x%s (%s)",
1342                      binascii.b2a_hex(msg_length_hex), msg_length)
1343         # message header - message type
1344         msg_type_hex = msg[18:19]
1345         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1346         if msg_type == 2:
1347             logger.debug("Message type: 0x%s (update)",
1348                          binascii.b2a_hex(msg_type_hex))
1349             # withdrawn routes length
1350             wdr_length_hex = msg[19:21]
1351             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1352             logger.debug("Withdrawn routes length: 0x%s (%s)",
1353                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1354             # withdrawn routes
1355             wdr_hex = msg[21:21 + wdr_length]
1356             logger.debug("Withdrawn routes: 0x%s",
1357                          binascii.b2a_hex(wdr_hex))
1358             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1359             logger.debug("Withdrawn routes prefix list: %s",
1360                          wdr_prefix_list)
1361             for prefix in wdr_prefix_list:
1362                 logger.debug("withdrawn_prefix_received: %s", prefix)
1363             # total path attribute length
1364             total_pa_length_offset = 21 + wdr_length
1365             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
1366             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1367             logger.debug("Total path attribute length: 0x%s (%s)",
1368                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1369             # path attributes
1370             pa_offset = total_pa_length_offset + 2
1371             pa_hex = msg[pa_offset:pa_offset+total_pa_length]
1372             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1373             self.decode_path_attributes(pa_hex)
1374             # network layer reachability information length
1375             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1376             logger.debug("Calculated NLRI length: %s", nlri_length)
1377             # network layer reachability information
1378             nlri_offset = pa_offset + total_pa_length
1379             nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
1380             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1381             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1382             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1383             for prefix in nlri_prefix_list:
1384                 logger.debug("nlri_prefix_received: %s", prefix)
1385             # Updating counters
1386             self.updates_received += 1
1387             self.prefixes_introduced += len(nlri_prefix_list)
1388             self.prefixes_withdrawn += len(wdr_prefix_list)
1389         else:
1390             logger.error("Unexpected message type 0x%s in 0x%s",
1391                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1392
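    # The NLRI length arithmetic above in numbers (illustration only): the
    # constant 23 is the 19-byte common header plus the two 2-byte length
    # fields (Withdrawn Routes Length and Total Path Attribute Length), so
    #
    #     nlri_length = msg_length - 19 - 2 - 2 - wdr_length - total_pa_length
    #
    # e.g. a 50-byte UPDATE with no withdrawals and 23 bytes of path
    # attributes carries 50 - 23 - 0 - 23 = 4 bytes of NLRI, i.e. one /24
    # prefix encoded as a length octet plus three address octets.
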
1393     def wait_for_read(self):
1394         """Wait for socket readability until timeout (next expected event).
1395
1396         Note:
1397             Used when no more updates have to be sent, to avoid busy-waiting.
1398             Currently it does not return anything.
1399         """
1400         # Compute time to the first predictable state change
1401         event_time = self.timer.get_next_event_time()
1402         # snapshot_time would be imprecise
1403         wait_timedelta = min(event_time - time.time(), 10)
1404         if wait_timedelta < 0:
1405             # The program got around to waiting to an event in "very near
1406             # future" so late that it became a "past" event, thus tell
1407             # "select" to not wait at all. Passing negative timedelta to
1408             # select() would lead to either waiting forever (for -1) or
1409             # select.error("Invalid parameter") (for everything else).
1410             wait_timedelta = 0
1411         # And wait for event or something to read.
1412
1413         if not self.rx_activity_detected or not (self.updates_received % 100):
1414             # right time to write statistics to the log (not for every update and
1415             # not too frequently to avoid having large log files)
1416             logger.info("total_received_update_message_counter: %s",
1417                         self.updates_received)
1418             logger.info("total_received_nlri_prefix_counter: %s",
1419                         self.prefixes_introduced)
1420             logger.info("total_received_withdrawn_prefix_counter: %s",
1421                         self.prefixes_withdrawn)
1422
1423         start_time = time.time()
1424         select.select([self.socket], [], [self.socket], wait_timedelta)
1425         timedelta = time.time() - start_time
1426         self.rx_idle_time += timedelta
1427         self.rx_activity_detected = timedelta < 1
1428
1429         if not self.rx_activity_detected or not (self.updates_received % 100):
1430             # right time to write statistics to the log (not for every update and
1431             # not too frequently to avoid having large log files)
1432             logger.info("... idle for %.3fs", timedelta)
1433             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1434         return
1435
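    # The timeout maths in wait_for_read() in isolation (illustration only):
    #
    #     wait_timedelta = min(event_time - time.time(), 10)   # cap at 10 s
    #     if wait_timedelta < 0:
    #         wait_timedelta = 0   # negative timeouts are not valid for select()
    #
    # so the call never sleeps past the next scheduled event, never sleeps
    # longer than 10 seconds, and never passes a negative timeout to select().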
1436
1437 class WriteTracker(object):
1438     """Class tracking enqueueing messages and sending chunks of them."""
1439
1440     def __init__(self, bgp_socket, generator, timer):
1441         """The writer initialisation.
1442
1443         Arguments:
1444             bgp_socket: socket to be used for sending
1445             generator: generator to be used for message generation
1446             timer: timer to be used for scheduling
1447         """
1448         # References to outside objects.
1449         self.socket = bgp_socket
1450         self.generator = generator
1451         self.timer = timer
1452         # Really new fields.
1453         # TODO: Would attribute docstrings add anything substantial?
1454         self.sending_message = False
1455         self.bytes_to_send = 0
1456         self.msg_out = ""
1457
1458     def enqueue_message_for_sending(self, message):
1459         """Enqueue message and change state.
1460
1461         Arguments:
1462             message: message to be enqueued into the msg_out buffer
1463         """
1464         self.msg_out += message
1465         self.bytes_to_send += len(message)
1466         self.sending_message = True
1467
1468     def send_message_chunk_is_whole(self):
1469         """Send enqueued data from msg_out buffer
1470
1471         Returns:
1472             :return: true if no remaining data to send
1473         """
1474         # We assume there is a msg_out to send and socket is writable.
1475         # print "going to send", repr(self.msg_out)
1476         self.timer.snapshot()
1477         bytes_sent = self.socket.send(self.msg_out)
1478         # Forget the part of message that was sent.
1479         self.msg_out = self.msg_out[bytes_sent:]
1480         self.bytes_to_send -= bytes_sent
1481         if not self.bytes_to_send:
1482             # TODO: Is it possible to hit negative bytes_to_send?
1483             self.sending_message = False
1484             # We should have reset hold timer on peer side.
1485             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1486             # The possible reason for not prioritizing reads is gone.
1487             return True
1488         return False
1489
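    # socket.send() may accept only part of the buffer, which is what the
    # bookkeeping above handles. With hypothetical numbers: enqueue a
    # 4096-byte UPDATE and suppose the kernel takes 1500 bytes on the first
    # writable event:
    #
    #     bytes_sent = 1500
    #     msg_out = msg_out[1500:]        # 2596 bytes remain queued
    #     bytes_to_send = 4096 - 1500     # = 2596, so sending_message stays True
    #
    # send_message_chunk_is_whole() then returns False until the queue drains.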
1490
1491 class StateTracker(object):
1492     """Main loop has state so complex it warrants this separate class."""
1493
1494     def __init__(self, bgp_socket, generator, timer):
1495         """The state tracker initialisation.
1496
1497         Arguments:
1498             bgp_socket: socket to be used for sending / receiving
1499             generator: generator to be used for message generation
1500             timer: timer to be used for scheduling
1501         """
1502         # References to outside objects.
1503         self.socket = bgp_socket
1504         self.generator = generator
1505         self.timer = timer
1506         # Sub-trackers.
1507         self.reader = ReadTracker(bgp_socket, timer)
1508         self.writer = WriteTracker(bgp_socket, generator, timer)
1509         # Prioritization state.
1510         self.prioritize_writing = False
1511         # In general, we prioritize reading over writing. But in order
1512         # not to get blocked by never-ending reads, we should
1513         # check whether we are not risking running out of hold time.
1514         # So in some situations, this field is set to True to attempt
1515         # finishing sending a message, after which this field resets
1516         # back to False.
1517         # TODO: Alternative is to switch fairly between reading and
1518         # writing (called round robin from now on).
1519         # Message counting is done in generator.
1520
1521     def perform_one_loop_iteration(self):
1522         """The main loop iteration.
1523
1524         Notes:
1525             Calculates priority, resolves all conditions, calls
1526             appropriate method and returns to caller to repeat.
1527         """
1528         self.timer.snapshot()
1529         if not self.prioritize_writing:
1530             if self.timer.is_time_for_my_keepalive():
1531                 if not self.writer.sending_message:
1532                     # We need to schedule a keepalive ASAP.
1533                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1534                     logger.info("KEEP ALIVE is sent.")
1535                 # We are sending a message now, so let's prioritize it.
1536                 self.prioritize_writing = True
1537         # Now we know what our priorities are, we have to check
1538         # which actions are available.
1539         # select.select() returns three lists,
1540         # we store them in a list of lists.
1541         list_list = select.select([self.socket], [self.socket], [self.socket],
1542                                   self.timer.report_timedelta)
1543         read_list, write_list, except_list = list_list
1544         # Lists are unpacked, each is either [] or [self.socket],
1545         # so we will test them as boolean.
1546         if except_list:
1547             logger.error("Exceptional state on the socket.")
1548             raise RuntimeError("Exceptional state on socket", self.socket)
1549         # We will do either read or write.
1550         if not (self.prioritize_writing and write_list):
1551             # Either we have no reason to rush writes,
1552             # or the socket is not writable.
1553             # We are focusing on reading here.
1554             if read_list:  # there is something to read indeed
1555                 # In this case we want to read a chunk of the message
1556                 # and then repeat the select.
1557                 self.reader.read_message_chunk()
1558                 return
1559             # We were focusing on reading, but nothing to read was there.
1560             # Good time to check peer for hold timer.
1561             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1562             # Quiet on the read front, so we can attempt to write.
1563         if write_list:
1564             # Either we really want to reset peer's view of our hold
1565             # timer, or there was nothing to read.
1566             # Were we in the middle of sending a message?
1567             if self.writer.sending_message:
1568                 # Was it the end of a message?
1569                 whole = self.writer.send_message_chunk_is_whole()
1570                 # We were pressed to send something and we did it.
1571                 if self.prioritize_writing and whole:
1572                     # We prioritize reading again.
1573                     self.prioritize_writing = False
1574                 return
1575             # Finally, check whether there are still update messages to be generated.
1576             if self.generator.remaining_prefixes:
1577                 msg_out = self.generator.compose_update_message()
1578                 if not self.generator.remaining_prefixes:
1579                     # We have just finished update generation,
1580                     # end-of-rib is due.
1581                     logger.info("All update messages generated.")
1582                     logger.info("Storing performance results.")
1583                     self.generator.store_results()
1584                     logger.info("Finally an END-OF-RIB is sent.")
1585                     msg_out += self.generator.update_message(wr_prefixes=[],
1586                                                              nlri_prefixes=[])
1587                 self.writer.enqueue_message_for_sending(msg_out)
1588                 # The actual sending will be attempted in the next iteration.
1589                 return
1590             # Nothing to write anymore.
1591             # To avoid busy loop, we do idle waiting here.
1592             self.reader.wait_for_read()
1593             return
1594         # We can neither read nor write.
1595         logger.warning("Input and output both blocked for " +
1596                        str(self.timer.report_timedelta) + " seconds.")
1597         # FIXME: Are we sure select has been really waiting
1598         # the whole period?
1599         return
1600
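    # Decision order of one iteration, as implemented above (summary only):
    #   1. If our keepalive is due and nothing is queued, enqueue a KEEPALIVE
    #      and prioritize writing.
    #   2. select() on the socket for read/write/exception readiness.
    #   3. Prefer reading a chunk, unless a write is prioritized and possible.
    #   4. With nothing to read, check the peer hold timer, then either finish
    #      a queued message, generate the next UPDATE, or idle in wait_for_read().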
1601
1602 def create_logger(loglevel, logfile):
1603     """Create logger object
1604
1605     Arguments:
1606         :loglevel: log level
1607         :logfile: log file name
1608     Returns:
1609         :return: logger object
1610     """
1611     logger = logging.getLogger("logger")
1612     log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
1613     console_handler = logging.StreamHandler()
1614     file_handler = logging.FileHandler(logfile, mode="w")
1615     console_handler.setFormatter(log_formatter)
1616     file_handler.setFormatter(log_formatter)
1617     logger.addHandler(console_handler)
1618     logger.addHandler(file_handler)
1619     logger.setLevel(loglevel)
1620     return logger
1621
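# Example use of create_logger() (a sketch; the real level and file name come
# from the parsed command-line arguments):
#
#     logger = create_logger(logging.INFO, "play.log")
#     logger.info("this record goes to both the console and play.log")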
1622
1623 def job(arguments):
1624     """One-time initialisation followed by the iteration loop.
1625     Notes:
1626         Establish BGP connection and run iterations.
1627
1628     Arguments:
1629         :arguments: Command line arguments
1630     Returns:
1631         :return: None
1632     """
1633     bgp_socket = establish_connection(arguments)
1634     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1635     # Receive open message before sending anything.
1636     # FIXME: Add parameter to send default open message first,
1637     # to work with "you first" peers.
1638     msg_in = read_open_message(bgp_socket)
1639     timer = TimeTracker(msg_in)
1640     generator = MessageGenerator(arguments)
1641     msg_out = generator.open_message()
1642     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1643     # Send our open message to the peer.
1644     bgp_socket.send(msg_out)
1645     # Wait for confirming keepalive.
1646     # TODO: Surely in just one packet?
1647     # Using the exact keepalive length so as not to read possible updates.
1648     msg_in = bgp_socket.recv(19)
1649     if msg_in != generator.keepalive_message():
1650         logger.error("Open not confirmed by keepalive, instead got " +
1651                      binascii.hexlify(msg_in))
1652         raise MessageError("Open not confirmed by keepalive, instead got",
1653                            msg_in)
1654     timer.reset_peer_hold_time()
1655     # Send the keepalive to indicate the connection is accepted.
1656     timer.snapshot()  # Remember this time.
1657     msg_out = generator.keepalive_message()
1658     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1659     bgp_socket.send(msg_out)
1660     # Use the remembered time.
1661     timer.reset_my_keepalive_time(timer.snapshot_time)
1662     # End of initial handshake phase.
1663     state = StateTracker(bgp_socket, generator, timer)
1664     while True:  # main reactor loop
1665         state.perform_one_loop_iteration()
1666
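# The handshake performed by job() in message terms (a recap of the code above):
#
#     peer -> us : OPEN                 (read_open_message)
#     us -> peer : OPEN                 (generator.open_message())
#     peer -> us : KEEPALIVE, 19 bytes  (confirms our OPEN)
#     us -> peer : KEEPALIVE            (confirms the peer's OPEN)
#
# after which StateTracker takes over the established session.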
1667
1668 def threaded_job(arguments):
1669     """Run the job in multiple threads.
1670
1671     Arguments:
1672         :arguments: Command line arguments
1673     Returns:
1674         :return: None
1675     """
1676     amount_left = arguments.amount
1677     utils_left = arguments.multiplicity
1678     prefix_current = arguments.firstprefix
1679     myip_current = arguments.myip
1680     thread_args = []
1681
1682     while 1:
1683         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
1684         amount_left -= amount_per_util
1685         utils_left -= 1
1686
1687         args = deepcopy(arguments)
1688         args.amount = amount_per_util
1689         args.firstprefix = prefix_current
1690         args.myip = myip_current
1691         thread_args.append(args)
1692
1693         if not utils_left:
1694             break
1695         prefix_current += amount_per_util * 16
1696         myip_current += 1
1697
1698     try:
1699         # Create threads
1700         for t in thread_args:
1701             thread.start_new_thread(job, (t,))
1702     except Exception:
1703         print "Error: unable to start thread."
1704         raise SystemExit(2)
1705
1706     # Keep the main thread alive so the worker threads keep running.
1707     while 1:
1708         time.sleep(5)
1709
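# How threaded_job() splits the work, with hypothetical numbers: --amount 10
# and --multiplicity 3 give per-thread amounts of ceil(10/3)=4, ceil(6/2)=3
# and 3. Each thread's --firstprefix is advanced by amount_per_util * 16
# addresses (16 = 2**(32 - 28), matching the default /28 prefix length; other
# --prefixlen values would need a different stride) and its --myip by one, so
# the generated prefixes and source addresses do not overlap between threads.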
1710
1711 if __name__ == "__main__":
1712     arguments = parse_arguments()
1713     logger = create_logger(arguments.loglevel, arguments.logfile)
1714     if arguments.multiplicity > 1:
1715         threaded_job(arguments)
1716     else:
1717         job(arguments)