Carrying LSP State Information in BGP
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
It needs to be run by a sudo-capable user when you want to use a port below
1024 as --myport. This utility is used to avoid the excessive waiting times
which EXABGP exhibits when used with huge router tables, and also to avoid
the memory cost of EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 import argparse
15 import binascii
16 import ipaddr
17 import select
18 import socket
19 import time
20 import logging
21 import struct
22
23 import thread
24 from copy import deepcopy
25
26
27 __author__ = "Vratko Polak"
28 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
29 __license__ = "Eclipse Public License v1.0"
30 __email__ = "vrpolak@cisco.com"
31
32
def parse_arguments():
    """Use argparse to get the command-line arguments of the utility.

    Returns:
        :return: args object (argparse Namespace).
    Raises:
        SystemExit: when --multiplicity is not positive (exit status 1),
            or when argparse itself rejects the command line.
    Notes:
        Boolean options (--rfc4760, --bgpls) are parsed from their textual
        value; plain ``type=bool`` would turn every non-empty string,
        including "False", into True.
    """
    def str_to_bool(text):
        """Convert a textual command-line value into a real boolean."""
        lowered = text.strip().lower()
        if lowered in ("1", "true", "yes", "y", "on"):
            return True
        # The empty string stays False, as it was with bool("").
        if lowered in ("", "0", "false", "no", "n", "off"):
            return False
        raise argparse.ArgumentTypeError(
            "expected a boolean value, got %r" % text)

    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = ("Amount of IP prefixes to generate. "
                "(negative means \"infinite\").")
    parser.add_argument("--amount", default=1, type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default=1, type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default=0, type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default=0, type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # The default has to be one of the choices (a string, not a list).
    parser.add_argument("--updates", choices=["single", "separate"],
                        default="separate", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from. "
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection. "
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default=0, type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = "Identifier of the route originator."
    parser.add_argument("--originator", default=None,
                        type=ipaddr.IPv4Address, dest="originator",
                        help=str_help)
    str_help = "Cluster list item identifier."
    parser.add_argument("--cluster", default=None,
                        type=ipaddr.IPv4Address, dest="cluster", help=str_help)
    str_help = ("Numeric IP Address to try to connect to. "
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default=179, type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default=180, type=int, help=str_help)
    # All four log level switches share one destination; the last one wins.
    str_help = "Log level (--error, --warning, --info, --debug)"
    for flag, level in (("--error", logging.ERROR),
                        ("--warning", logging.WARNING),
                        ("--info", logging.INFO),
                        ("--debug", logging.DEBUG)):
        parser.add_argument(flag, dest="loglevel", action="store_const",
                            const=level, default=logging.INFO, help=str_help)
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default=1000, type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default=True, type=str_to_bool,
                        help=str_help)
    str_help = "Link-State NLRI supported"
    parser.add_argument("--bgpls", default=False, type=str_to_bool,
                        help=str_help)
    str_help = "Link-State NLRI: Identifier"
    parser.add_argument("-lsid", default=1, type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID"
    parser.add_argument("-lstid", default=1, type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID"
    parser.add_argument("-lspid", default=1, type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
    parser.add_argument("--lstsaddr", default="1.2.3.4",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
    parser.add_argument("--lsteaddr", default="5.6.7.8",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "Link-State NLRI: Identifier Step"
    parser.add_argument("-lsidstep", default=1, type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID Step"
    parser.add_argument("-lstidstep", default=2, type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID Step"
    parser.add_argument("-lspidstep", default=4, type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
    parser.add_argument("-lstsaddrstep", default=16, type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
    parser.add_argument("-lsteaddrstep", default=1, type=int, help=str_help)
    str_help = "How many play utilities are to be started."
    parser.add_argument("--multiplicity", default=1, type=int, help=str_help)
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        # Single pre-formatted string: prints the same text whether print
        # is a statement or a function.
        print("Multiplicity %s is not positive." % arguments.multiplicity)
        raise SystemExit(1)
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
140
141
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.listen: wait for the peer instead of connecting to it
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port
    Returns:
        :return: connected socket.
    Notes:
        No socket is left open when binding, accepting or connecting fails;
        the original exception is propagated to the caller.
    """
    # socket does not understand ipaddr objects, hence str();
    # bind() needs a single (host, port) tuple as its argument.
    local_address = (str(arguments.myip), arguments.myport)
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: %s", arguments.myip)
        logger.debug("Local port: %s", arguments.myport)
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listening_socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            listening_socket.bind(local_address)
            listening_socket.listen(1)
            bgp_socket, _ = listening_socket.accept()
            # TODO: Verify client IP is controller IP.
        finally:
            # Only one peer is served, so the listener is not needed anymore
            # (and must not leak when any of the calls above fails).
            listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: %s", arguments.myip)
        logger.debug("Local port: %s", arguments.myport)
        logger.debug("Remote IP address: %s", arguments.peerip)
        logger.debug("Remote port: %s", arguments.peerport)
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            talking_socket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_REUSEADDR, 1)
            # bind to force specified address and port
            talking_socket.bind(local_address)
            talking_socket.connect((str(arguments.peerip),
                                    arguments.peerport))
        except BaseException:
            # Do not leak the descriptor when the connection attempt fails.
            talking_socket.close()
            raise
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
181
182
def get_short_int_from_message(message, offset=16):
    """Read a big-endian 2-byte number stored inside the message.

    Arguments:
        :message: message to read from (sequence of single characters)
        :offset: position of the most significant byte of the number
    Returns:
        :return: the decoded integer value.
    Notes:
        The default offset is where the BGP header keeps the message length.
    """
    most_significant = ord(message[offset])
    least_significant = ord(message[offset + 1])
    return (most_significant << 8) + least_significant
198
199
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Each encoded prefix is one octet holding the prefix length in bits,
    followed by only as many octets as are needed to hold that many bits.
    The omitted trailing octets are filled with zeroes here.

    Arguments:
        :prefixes_hex: raw (binary) string with the encoded prefixes
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Raises:
        struct.error: when a prefix is longer than 32 bits or the data
            ends in the middle of a prefix.
    """
    octets = bytearray(prefixes_hex)
    prefix_list = []
    offset = 0
    while offset < len(octets):
        prefix_bit_len = octets[offset]
        if prefix_bit_len > 32:
            raise struct.error("IPv4 prefix length %d exceeds 32 bits"
                               % prefix_bit_len)
        # Number of octets actually present on the wire (0 for a /0 prefix).
        prefix_len = (prefix_bit_len + 7) // 8
        prefix_octets = octets[offset + 1: offset + 1 + prefix_len]
        if len(prefix_octets) < prefix_len:
            raise struct.error("truncated prefix: expected %d octet(s), "
                               "got %d" % (prefix_len, len(prefix_octets)))
        padded = list(prefix_octets) + [0] * (4 - prefix_len)
        prefix = ".".join(str(octet) for octet in padded)
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
        offset += 1 + prefix_len
    return prefix_list
219
220
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Arguments:
            :text: textual description of the problem
            :message: raw (binary) message which caused the error
            :args: any further arguments, passed on to ValueError

        Store both values and call super init with all the arguments.
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: the description followed by the hexlified message
        Notes:
            Use a placeholder string if the message is to be empty.
        """
        message = binascii.hexlify(self.msg)
        if not isinstance(message, str):
            # hexlify may hand back bytes; they cannot be concatenated
            # with the textual description without decoding first.
            message = message.decode("ascii")
        if message == "":
            message = "(empty message)"
        return self.text + ": " + message
246
247
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Raises:
        MessageError: when the received data is shorter than 37 bytes or
            its size differs from the length stated in the message header.
    Notes:
        Performs just basic incoming message checks
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # Some validation.
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        error_msg = (
            "Message length (" + str(len(msg_in)) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)"
        )
        logger.error("%s: %s", error_msg, binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # reported_length is an integer, it has to be converted explicitly;
        # otherwise a TypeError would hide the real problem.
        error_msg = (
            "Expected message length (" + str(reported_length) +
            ") does not match actual length (" + str(len(msg_in)) + ")"
        )
        logger.error("%s: %s", error_msg, binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    logger.info("Open message received.")
    return msg_in
281
282
283 class MessageGenerator(object):
284     """Class which generates messages, holds states and configuration values."""
285
286     # TODO: Define bgp marker as a class (constant) variable.
287     def __init__(self, args):
288         """Initialisation according to command-line args.
289
290         Arguments:
291             :args: argsparser's Namespace object which contains command-line
292                 options for MesageGenerator initialisation
293         Notes:
294             Calculates and stores default values used later on for
295             message geeration.
296         """
297         self.total_prefix_amount = args.amount
298         # Number of update messages left to be sent.
299         self.remaining_prefixes = self.total_prefix_amount
300
301         # New parameters initialisation
302         self.iteration = 0
303         self.prefix_base_default = args.firstprefix
304         self.prefix_length_default = args.prefixlen
305         self.wr_prefixes_default = []
306         self.nlri_prefixes_default = []
307         self.version_default = 4
308         self.my_autonomous_system_default = args.asnumber
309         self.hold_time_default = args.holdtime  # Local hold time.
310         self.bgp_identifier_default = int(args.myip)
311         self.next_hop_default = args.nexthop
312         self.originator_id_default = args.originator
313         self.cluster_list_item_default = args.cluster
314         self.single_update_default = args.updates == "single"
315         self.randomize_updates_default = args.updates == "random"
316         self.prefix_count_to_add_default = args.insert
317         self.prefix_count_to_del_default = args.withdraw
318         if self.prefix_count_to_del_default < 0:
319             self.prefix_count_to_del_default = 0
320         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
321             # total number of prefixes must grow to avoid infinite test loop
322             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
323         self.slot_size_default = self.prefix_count_to_add_default
324         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
325         self.results_file_name_default = args.results
326         self.performance_threshold_default = args.threshold
327         self.rfc4760 = args.rfc4760
328         self.bgpls = args.bgpls
329         # Default values when BGP-LS Attributes are used
330         if self.bgpls:
331             self.prefix_count_to_add_default = 1
332             self.prefix_count_to_del_default = 0
333         self.ls_nlri_default = {"Identifier": args.lsid,
334                                 "TunnelID": args.lstid,
335                                 "LSPID": args.lspid,
336                                 "IPv4TunnelSenderAddress": args.lstsaddr,
337                                 "IPv4TunnelEndPointAddress": args.lsteaddr}
338         self.lsid_step = args.lsidstep
339         self.lstid_step = args.lstidstep
340         self.lspid_step = args.lspidstep
341         self.lstsaddr_step = args.lstsaddrstep
342         self.lsteaddr_step = args.lsteaddrstep
343         # Default values used for randomized part
344         s1_slots = ((self.total_prefix_amount -
345                      self.remaining_prefixes_threshold - 1) /
346                     self.prefix_count_to_add_default + 1)
347         s2_slots = ((self.remaining_prefixes_threshold - 1) /
348                     (self.prefix_count_to_add_default -
349                     self.prefix_count_to_del_default) + 1)
350         # S1_First_Index = 0
351         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
352         s2_first_index = s1_slots * self.prefix_count_to_add_default
353         s2_last_index = (s2_first_index +
354                          s2_slots * (self.prefix_count_to_add_default -
355                                      self.prefix_count_to_del_default) - 1)
356         self.slot_gap_default = ((self.total_prefix_amount -
357                                   self.remaining_prefixes_threshold - 1) /
358                                  self.prefix_count_to_add_default + 1)
359         self.randomize_lowest_default = s2_first_index
360         self.randomize_highest_default = s2_last_index
361         # Initialising counters
362         self.phase1_start_time = 0
363         self.phase1_stop_time = 0
364         self.phase2_start_time = 0
365         self.phase2_stop_time = 0
366         self.phase1_updates_sent = 0
367         self.phase2_updates_sent = 0
368         self.updates_sent = 0
369
370         self.log_info = args.loglevel <= logging.INFO
371         self.log_debug = args.loglevel <= logging.DEBUG
372         """
373         Flags needed for the MessageGenerator performance optimization.
374         Calling logger methods each iteration even with proper log level set
375         slows down significantly the MessageGenerator performance.
376         Measured total generation time (1M updates, dry run, error log level):
377         - logging based on basic logger features: 36,2s
378         - logging based on advanced logger features (lazy logging): 21,2s
379         - conditional calling of logger methods enclosed inside condition: 8,6s
380         """
381
382         logger.info("Generator initialisation")
383         logger.info("  Target total number of prefixes to be introduced: " +
384                     str(self.total_prefix_amount))
385         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
386                     str(self.prefix_length_default))
387         logger.info("  My Autonomous System number: " +
388                     str(self.my_autonomous_system_default))
389         logger.info("  My Hold Time: " + str(self.hold_time_default))
390         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
391         logger.info("  Next Hop: " + str(self.next_hop_default))
392         logger.info("  Originator ID: " + str(self.originator_id_default))
393         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
394         logger.info("  Prefix count to be inserted at once: " +
395                     str(self.prefix_count_to_add_default))
396         logger.info("  Prefix count to be withdrawn at once: " +
397                     str(self.prefix_count_to_del_default))
398         logger.info("  Fast pre-fill up to " +
399                     str(self.total_prefix_amount -
400                         self.remaining_prefixes_threshold) + " prefixes")
401         logger.info("  Remaining number of prefixes to be processed " +
402                     "in parallel with withdrawals: " +
403                     str(self.remaining_prefixes_threshold))
404         logger.debug("  Prefix index range used after pre-fill procedure [" +
405                      str(self.randomize_lowest_default) + ", " +
406                      str(self.randomize_highest_default) + "]")
407         if self.single_update_default:
408             logger.info("  Common single UPDATE will be generated " +
409                         "for both NLRI & WITHDRAWN lists")
410         else:
411             logger.info("  Two separate UPDATEs will be generated " +
412                         "for each NLRI & WITHDRAWN lists")
413         if self.randomize_updates_default:
414             logger.info("  Generation of UPDATE messages will be randomized")
415         logger.info("  Let\'s go ...\n")
416
417         # TODO: Notification for hold timer expiration can be handy.
418
419     def store_results(self, file_name=None, threshold=None):
420         """ Stores specified results into files based on file_name value.
421
422         Arguments:
423             :param file_name: Trailing (common) part of result file names
424             :param threshold: Minimum number of sent updates needed for each
425                               result to be included into result csv file
426                               (mainly needed because of the result accuracy)
427         Returns:
428             :return: n/a
429         """
430         # default values handling
431         # TODO optimize default values handling (use e.g. dicionary.update() approach)
432         if file_name is None:
433             file_name = self.results_file_name_default
434         if threshold is None:
435             threshold = self.performance_threshold_default
436         # performance calculation
437         if self.phase1_updates_sent >= threshold:
438             totals1 = self.phase1_updates_sent
439             performance1 = int(self.phase1_updates_sent /
440                                (self.phase1_stop_time - self.phase1_start_time))
441         else:
442             totals1 = None
443             performance1 = None
444         if self.phase2_updates_sent >= threshold:
445             totals2 = self.phase2_updates_sent
446             performance2 = int(self.phase2_updates_sent /
447                                (self.phase2_stop_time - self.phase2_start_time))
448         else:
449             totals2 = None
450             performance2 = None
451
452         logger.info("#" * 10 + " Final results " + "#" * 10)
453         logger.info("Number of iterations: " + str(self.iteration))
454         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
455                     str(self.phase1_updates_sent))
456         logger.info("The pre-fill phase duration: " +
457                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
458         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
459                     str(self.phase2_updates_sent))
460         logger.info("The 2nd test phase duration: " +
461                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
462         logger.info("Threshold for performance reporting: " + str(threshold))
463
464         # making labels
465         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
466                         " route(s) per UPDATE")
467         if self.single_update_default:
468             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
469                                   "/-" + str(self.prefix_count_to_del_default) +
470                                   " routes per UPDATE")
471         else:
472             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
473                                   "/-" + str(self.prefix_count_to_del_default) +
474                                   " routes in two UPDATEs")
475         # collecting capacity and performance results
476         totals = {}
477         performance = {}
478         if totals1 is not None:
479             totals[phase1_label] = totals1
480             performance[phase1_label] = performance1
481         if totals2 is not None:
482             totals[phase2_label] = totals2
483             performance[phase2_label] = performance2
484         self.write_results_to_file(totals, "totals-" + file_name)
485         self.write_results_to_file(performance, "performance-" + file_name)
486
487     def write_results_to_file(self, results, file_name):
488         """Writes results to the csv plot file consumable by Jenkins.
489
490         Arguments:
491             :param file_name: Name of the (csv) file to be created
492         Returns:
493             :return: none
494         """
495         first_line = ""
496         second_line = ""
497         f = open(file_name, "wt")
498         try:
499             for key in sorted(results):
500                 first_line += key + ", "
501                 second_line += str(results[key]) + ", "
502             first_line = first_line[:-2]
503             second_line = second_line[:-2]
504             f.write(first_line + "\n")
505             f.write(second_line + "\n")
506             logger.info("Message generator performance results stored in " +
507                         file_name + ":")
508             logger.info("  " + first_line)
509             logger.info("  " + second_line)
510         finally:
511             f.close()
512
513     # Return pseudo-randomized (reproducible) index for selected range
514     def randomize_index(self, index, lowest=None, highest=None):
515         """Calculates pseudo-randomized index from selected range.
516
517         Arguments:
518             :param index: input index
519             :param lowest: the lowes index from the randomized area
520             :param highest: the highest index from the randomized area
521         Returns:
522             :return: the (pseudo)randomized index
523         Notes:
524             Created just as a fame for future generator enhancement.
525         """
526         # default values handling
527         # TODO optimize default values handling (use e.g. dicionary.update() approach)
528         if lowest is None:
529             lowest = self.randomize_lowest_default
530         if highest is None:
531             highest = self.randomize_highest_default
532         # randomize
533         if (index >= lowest) and (index <= highest):
534             # we are in the randomized range -> shuffle it inside
535             # the range (now just reverse the order)
536             new_index = highest - (index - lowest)
537         else:
538             # we are out of the randomized range -> nothing to do
539             new_index = index
540         return new_index
541
542     def get_ls_nlri_values(self, index):
543         """Generates LS-NLRI parameters.
544         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
545
546         Arguments:
547             :param index: index (iteration)
548         Returns:
549             :return: dictionary of LS NLRI parameters and values
550         """
551         # generating list of LS NLRI parameters
552         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
553         ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
554         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
555         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
556         ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
557         ls_nlri_values = {"Identifier": identifier,
558                           "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
559                           "TunnelID": tunnel_id, "LSPID": lsp_id,
560                           "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
561         return ls_nlri_values
562
563     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
564                         prefix_len=None, prefix_count=None, randomize=None):
565         """Generates list of IP address prefixes.
566
567         Arguments:
568             :param slot_index: index of group of prefix addresses
569             :param slot_size: size of group of prefix addresses
570                 in [number of included prefixes]
571             :param prefix_base: IP address of the first prefix
572                 (slot_index = 0, prefix_index = 0)
573             :param prefix_len: length of the prefix in bites
574                 (the same as size of netmask)
575             :param prefix_count: number of prefixes to be returned
576                 from the specified slot
577         Returns:
578             :return: list of generated IP address prefixes
579         """
580         # default values handling
581         # TODO optimize default values handling (use e.g. dicionary.update() approach)
582         if slot_size is None:
583             slot_size = self.slot_size_default
584         if prefix_base is None:
585             prefix_base = self.prefix_base_default
586         if prefix_len is None:
587             prefix_len = self.prefix_length_default
588         if prefix_count is None:
589             prefix_count = slot_size
590         if randomize is None:
591             randomize = self.randomize_updates_default
592         # generating list of prefixes
593         indexes = []
594         prefixes = []
595         prefix_gap = 2 ** (32 - prefix_len)
596         for i in range(prefix_count):
597             prefix_index = slot_index * slot_size + i
598             if randomize:
599                 prefix_index = self.randomize_index(prefix_index)
600             indexes.append(prefix_index)
601             prefixes.append(prefix_base + prefix_index * prefix_gap)
602         if self.log_debug:
603             logger.debug("  Prefix slot index: " + str(slot_index))
604             logger.debug("  Prefix slot size: " + str(slot_size))
605             logger.debug("  Prefix count: " + str(prefix_count))
606             logger.debug("  Prefix indexes: " + str(indexes))
607             logger.debug("  Prefix list: " + str(prefixes))
608         return prefixes
609
610     def compose_update_message(self, prefix_count_to_add=None,
611                                prefix_count_to_del=None):
612         """Composes an UPDATE message
613
614         Arguments:
615             :param prefix_count_to_add: # of prefixes to put into NLRI list
616             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
617         Returns:
618             :return: encoded UPDATE message in HEX
619         Notes:
620             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
621             lists or common message wich includes both prefix lists.
622             Updates global counters.
623         """
624         # default values handling
625         # TODO optimize default values handling (use e.g. dicionary.update() approach)
626         if prefix_count_to_add is None:
627             prefix_count_to_add = self.prefix_count_to_add_default
628         if prefix_count_to_del is None:
629             prefix_count_to_del = self.prefix_count_to_del_default
630         # logging
631         if self.log_info and not (self.iteration % 1000):
632             logger.info("Iteration: " + str(self.iteration) +
633                         " - total remaining prefixes: " +
634                         str(self.remaining_prefixes))
635         if self.log_debug:
636             logger.debug("#" * 10 + " Iteration: " +
637                          str(self.iteration) + " " + "#" * 10)
638             logger.debug("Remaining prefixes: " +
639                          str(self.remaining_prefixes))
640         # scenario type & one-shot counter
641         straightforward_scenario = (self.remaining_prefixes >
642                                     self.remaining_prefixes_threshold)
643         if straightforward_scenario:
644             prefix_count_to_del = 0
645             if self.log_debug:
646                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
647             if not self.phase1_start_time:
648                 self.phase1_start_time = time.time()
649         else:
650             if self.log_debug:
651                 logger.debug("--- COMBINED SCENARIO ---")
652             if not self.phase2_start_time:
653                 self.phase2_start_time = time.time()
654         # tailor the number of prefixes if needed
655         prefix_count_to_add = (prefix_count_to_del +
656                                min(prefix_count_to_add - prefix_count_to_del,
657                                    self.remaining_prefixes))
658         # prefix slots selection for insertion and withdrawal
659         slot_index_to_add = self.iteration
660         slot_index_to_del = slot_index_to_add - self.slot_gap_default
661         # getting lists of prefixes for insertion in this iteration
662         if self.log_debug:
663             logger.debug("Prefixes to be inserted in this iteration:")
664         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
665                                                   prefix_count=prefix_count_to_add)
666         # getting lists of prefixes for withdrawal in this iteration
667         if self.log_debug:
668             logger.debug("Prefixes to be withdrawn in this iteration:")
669         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
670                                                   prefix_count=prefix_count_to_del)
671         # generating the UPDATE mesage with LS-NLRI only
672         if self.bgpls:
673             ls_nlri = self.get_ls_nlri_values(self.iteration)
674             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
675                                           **ls_nlri)
676         else:
677             # generating the UPDATE message with prefix lists
678             if self.single_update_default:
679                 # Send prefixes to be introduced and withdrawn
680                 # in one UPDATE message
681                 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
682                                               nlri_prefixes=prefix_list_to_add)
683             else:
684                 # Send prefixes to be introduced and withdrawn
685                 # in separate UPDATE messages (if needed)
686                 msg_out = self.update_message(wr_prefixes=[],
687                                               nlri_prefixes=prefix_list_to_add)
688                 if prefix_count_to_del:
689                     msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
690                                                    nlri_prefixes=[])
691         # updating counters - who knows ... maybe I am last time here ;)
692         if straightforward_scenario:
693             self.phase1_stop_time = time.time()
694             self.phase1_updates_sent = self.updates_sent
695         else:
696             self.phase2_stop_time = time.time()
697             self.phase2_updates_sent = (self.updates_sent -
698                                         self.phase1_updates_sent)
699         # updating totals for the next iteration
700         self.iteration += 1
701         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
702         # returning the encoded message
703         return msg_out
704
705     # Section of message encoders
706
707     def open_message(self, version=None, my_autonomous_system=None,
708                      hold_time=None, bgp_identifier=None):
709         """Generates an OPEN Message (rfc4271#section-4.2)
710
711         Arguments:
712             :param version: see the rfc4271#section-4.2
713             :param my_autonomous_system: see the rfc4271#section-4.2
714             :param hold_time: see the rfc4271#section-4.2
715             :param bgp_identifier: see the rfc4271#section-4.2
716         Returns:
717             :return: encoded OPEN message in HEX
718         """
719
720         # default values handling
721         # TODO optimize default values handling (use e.g. dicionary.update() approach)
722         if version is None:
723             version = self.version_default
724         if my_autonomous_system is None:
725             my_autonomous_system = self.my_autonomous_system_default
726         if hold_time is None:
727             hold_time = self.hold_time_default
728         if bgp_identifier is None:
729             bgp_identifier = self.bgp_identifier_default
730
731         # Marker
732         marker_hex = "\xFF" * 16
733
734         # Type
735         type = 1
736         type_hex = struct.pack("B", type)
737
738         # version
739         version_hex = struct.pack("B", version)
740
741         # my_autonomous_system
742         # AS_TRANS value, 23456 decadic.
743         my_autonomous_system_2_bytes = 23456
744         # AS number is mappable to 2 bytes
745         if my_autonomous_system < 65536:
746             my_autonomous_system_2_bytes = my_autonomous_system
747         my_autonomous_system_hex_2_bytes = struct.pack(">H",
748                                                        my_autonomous_system)
749
750         # Hold Time
751         hold_time_hex = struct.pack(">H", hold_time)
752
753         # BGP Identifier
754         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
755
756         # Optional Parameters
757         optional_parameters_hex = ""
758         if self.rfc4760:
759             optional_parameter_hex = (
760                 "\x02"  # Param type ("Capability Ad")
761                 "\x06"  # Length (6 bytes)
762                 "\x01"  # Capability type (NLRI Unicast),
763                         # see RFC 4760, secton 8
764                 "\x04"  # Capability value length
765                 "\x00\x01"  # AFI (Ipv4)
766                 "\x00"  # (reserved)
767                 "\x01"  # SAFI (Unicast)
768             )
769             optional_parameters_hex += optional_parameter_hex
770
771         if self.bgpls:
772             optional_parameter_hex = (
773                 "\x02"  # Param type ("Capability Ad")
774                 "\x06"  # Length (6 bytes)
775                 "\x01"  # Capability type (NLRI Unicast),
776                         # see RFC 4760, secton 8
777                 "\x04"  # Capability value length
778                 "\x40\x04"  # AFI (BGP-LS)
779                 "\x00"  # (reserved)
780                 "\x47"  # SAFI (BGP-LS)
781             )
782             optional_parameters_hex += optional_parameter_hex
783
784         optional_parameter_hex = (
785             "\x02"  # Param type ("Capability Ad")
786             "\x06"  # Length (6 bytes)
787             "\x41"  # "32 bit AS Numbers Support"
788                     # (see RFC 6793, section 3)
789             "\x04"  # Capability value length
790         )
791         optional_parameter_hex += (
792             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
793         )
794         optional_parameters_hex += optional_parameter_hex
795
796         # Optional Parameters Length
797         optional_parameters_length = len(optional_parameters_hex)
798         optional_parameters_length_hex = struct.pack("B",
799                                                      optional_parameters_length)
800
801         # Length (big-endian)
802         length = (
803             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
804             len(my_autonomous_system_hex_2_bytes) +
805             len(hold_time_hex) + len(bgp_identifier_hex) +
806             len(optional_parameters_length_hex) +
807             len(optional_parameters_hex)
808         )
809         length_hex = struct.pack(">H", length)
810
811         # OPEN Message
812         message_hex = (
813             marker_hex +
814             length_hex +
815             type_hex +
816             version_hex +
817             my_autonomous_system_hex_2_bytes +
818             hold_time_hex +
819             bgp_identifier_hex +
820             optional_parameters_length_hex +
821             optional_parameters_hex
822         )
823
824         if self.log_debug:
825             logger.debug("OPEN message encoding")
826             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
827             logger.debug("  Length=" + str(length) + " (0x" +
828                          binascii.hexlify(length_hex) + ")")
829             logger.debug("  Type=" + str(type) + " (0x" +
830                          binascii.hexlify(type_hex) + ")")
831             logger.debug("  Version=" + str(version) + " (0x" +
832                          binascii.hexlify(version_hex) + ")")
833             logger.debug("  My Autonomous System=" +
834                          str(my_autonomous_system_2_bytes) + " (0x" +
835                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
836                          ")")
837             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
838                          binascii.hexlify(hold_time_hex) + ")")
839             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
840                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
841             logger.debug("  Optional Parameters Length=" +
842                          str(optional_parameters_length) + " (0x" +
843                          binascii.hexlify(optional_parameters_length_hex) +
844                          ")")
845             logger.debug("  Optional Parameters=0x" +
846                          binascii.hexlify(optional_parameters_hex))
847             logger.debug("OPEN message encoded: 0x%s",
848                          binascii.b2a_hex(message_hex))
849
850         return message_hex
851
852     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
853                        wr_prefix_length=None, nlri_prefix_length=None,
854                        my_autonomous_system=None, next_hop=None,
855                        originator_id=None, cluster_list_item=None,
856                        end_of_rib=False, **ls_nlri_params):
857         """Generates an UPDATE Message (rfc4271#section-4.3)
858
859         Arguments:
860             :param wr_prefixes: see the rfc4271#section-4.3
861             :param nlri_prefixes: see the rfc4271#section-4.3
862             :param wr_prefix_length: see the rfc4271#section-4.3
863             :param nlri_prefix_length: see the rfc4271#section-4.3
864             :param my_autonomous_system: see the rfc4271#section-4.3
865             :param next_hop: see the rfc4271#section-4.3
866         Returns:
867             :return: encoded UPDATE message in HEX
868         """
869
870         # default values handling
871         # TODO optimize default values handling (use e.g. dicionary.update() approach)
872         if wr_prefixes is None:
873             wr_prefixes = self.wr_prefixes_default
874         if nlri_prefixes is None:
875             nlri_prefixes = self.nlri_prefixes_default
876         if wr_prefix_length is None:
877             wr_prefix_length = self.prefix_length_default
878         if nlri_prefix_length is None:
879             nlri_prefix_length = self.prefix_length_default
880         if my_autonomous_system is None:
881             my_autonomous_system = self.my_autonomous_system_default
882         if next_hop is None:
883             next_hop = self.next_hop_default
884         if originator_id is None:
885             originator_id = self.originator_id_default
886         if cluster_list_item is None:
887             cluster_list_item = self.cluster_list_item_default
888         ls_nlri = self.ls_nlri_default.copy()
889         ls_nlri.update(ls_nlri_params)
890
891         # Marker
892         marker_hex = "\xFF" * 16
893
894         # Type
895         type = 2
896         type_hex = struct.pack("B", type)
897
898         # Withdrawn Routes
899         withdrawn_routes_hex = ""
900         if not self.bgpls:
901             bytes = ((wr_prefix_length - 1) / 8) + 1
902             for prefix in wr_prefixes:
903                 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
904                                        struct.pack(">I", int(prefix))[:bytes])
905                 withdrawn_routes_hex += withdrawn_route_hex
906
907         # Withdrawn Routes Length
908         withdrawn_routes_length = len(withdrawn_routes_hex)
909         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
910
911         # TODO: to replace hardcoded string by encoding?
912         # Path Attributes
913         path_attributes_hex = ""
914         if nlri_prefixes != []:
915             path_attributes_hex += (
916                 "\x40"  # Flags ("Well-Known")
917                 "\x01"  # Type (ORIGIN)
918                 "\x01"  # Length (1)
919                 "\x00"  # Origin: IGP
920             )
921             path_attributes_hex += (
922                 "\x40"  # Flags ("Well-Known")
923                 "\x02"  # Type (AS_PATH)
924                 "\x06"  # Length (6)
925                 "\x02"  # AS segment type (AS_SEQUENCE)
926                 "\x01"  # AS segment length (1)
927             )
928             my_as_hex = struct.pack(">I", my_autonomous_system)
929             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
930             path_attributes_hex += (
931                 "\x40"  # Flags ("Well-Known")
932                 "\x03"  # Type (NEXT_HOP)
933                 "\x04"  # Length (4)
934             )
935             next_hop_hex = struct.pack(">I", int(next_hop))
936             path_attributes_hex += (
937                 next_hop_hex  # IP address of the next hop (4 bytes)
938             )
939             if originator_id is not None:
940                 path_attributes_hex += (
941                     "\x80"  # Flags ("Optional, non-transitive")
942                     "\x09"  # Type (ORIGINATOR_ID)
943                     "\x04"  # Length (4)
944                 )           # ORIGINATOR_ID (4 bytes)
945                 path_attributes_hex += struct.pack(">I", int(originator_id))
946             if cluster_list_item is not None:
947                 path_attributes_hex += (
948                     "\x80"  # Flags ("Optional, non-transitive")
949                     "\x09"  # Type (CLUSTER_LIST)
950                     "\x04"  # Length (4)
951                 )           # one CLUSTER_LIST item (4 bytes)
952                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
953
954         if self.bgpls and not end_of_rib:
955             path_attributes_hex += (
956                 "\x80"  # Flags ("Optional, non-transitive")
957                 "\x0e"  # Type (MP_REACH_NLRI)
958                 "\x22"  # Length (34)
959                 "\x40\x04"  # AFI (BGP-LS)
960                 "\x47"  # SAFI (BGP-LS)
961                 "\x04"  # Next Hop Length (4)
962             )
963             path_attributes_hex += struct.pack(">I", int(next_hop))
964             path_attributes_hex += "\x00"           # Reserved
965             path_attributes_hex += (
966                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
967                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
968                 "\x07"      # LS-NLRI.Variable.ProtocolID (RSVP-TE)
969             )
970             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
971             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
972             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
973             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
974             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
975
976         # Total Path Attributes Length
977         total_path_attributes_length = len(path_attributes_hex)
978         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
979
980         # Network Layer Reachability Information
981         nlri_hex = ""
982         if not self.bgpls:
983             bytes = ((nlri_prefix_length - 1) / 8) + 1
984             for prefix in nlri_prefixes:
985                 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
986                                    struct.pack(">I", int(prefix))[:bytes])
987                 nlri_hex += nlri_prefix_hex
988
989         # Length (big-endian)
990         length = (
991             len(marker_hex) + 2 + len(type_hex) +
992             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
993             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
994             len(nlri_hex))
995         length_hex = struct.pack(">H", length)
996
997         # UPDATE Message
998         message_hex = (
999             marker_hex +
1000             length_hex +
1001             type_hex +
1002             withdrawn_routes_length_hex +
1003             withdrawn_routes_hex +
1004             total_path_attributes_length_hex +
1005             path_attributes_hex +
1006             nlri_hex
1007         )
1008
1009         if self.log_debug:
1010             logger.debug("UPDATE message encoding")
1011             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1012             logger.debug("  Length=" + str(length) + " (0x" +
1013                          binascii.hexlify(length_hex) + ")")
1014             logger.debug("  Type=" + str(type) + " (0x" +
1015                          binascii.hexlify(type_hex) + ")")
1016             logger.debug("  withdrawn_routes_length=" +
1017                          str(withdrawn_routes_length) + " (0x" +
1018                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
1019             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1020                          str(wr_prefix_length) + " (0x" +
1021                          binascii.hexlify(withdrawn_routes_hex) + ")")
1022             if total_path_attributes_length:
1023                 logger.debug("  Total Path Attributes Length=" +
1024                              str(total_path_attributes_length) + " (0x" +
1025                              binascii.hexlify(total_path_attributes_length_hex) + ")")
1026                 logger.debug("  Path Attributes=" + "(0x" +
1027                              binascii.hexlify(path_attributes_hex) + ")")
1028                 logger.debug("    Origin=IGP")
1029                 logger.debug("    AS path=" + str(my_autonomous_system))
1030                 logger.debug("    Next hop=" + str(next_hop))
1031                 if originator_id is not None:
1032                     logger.debug("    Originator id=" + str(originator_id))
1033                 if cluster_list_item is not None:
1034                     logger.debug("    Cluster list=" + str(cluster_list_item))
1035                 if self.bgpls:
1036                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1037             logger.debug("  Network Layer Reachability Information=" +
1038                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1039                          " (0x" + binascii.hexlify(nlri_hex) + ")")
1040             logger.debug("UPDATE message encoded: 0x" +
1041                          binascii.b2a_hex(message_hex))
1042
1043         # updating counter
1044         self.updates_sent += 1
1045         # returning encoded message
1046         return message_hex
1047
1048     def notification_message(self, error_code, error_subcode, data_hex=""):
1049         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1050
1051         Arguments:
1052             :param error_code: see the rfc4271#section-4.5
1053             :param error_subcode: see the rfc4271#section-4.5
1054             :param data_hex: see the rfc4271#section-4.5
1055         Returns:
1056             :return: encoded NOTIFICATION message in HEX
1057         """
1058
1059         # Marker
1060         marker_hex = "\xFF" * 16
1061
1062         # Type
1063         type = 3
1064         type_hex = struct.pack("B", type)
1065
1066         # Error Code
1067         error_code_hex = struct.pack("B", error_code)
1068
1069         # Error Subode
1070         error_subcode_hex = struct.pack("B", error_subcode)
1071
1072         # Length (big-endian)
1073         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1074                   len(error_subcode_hex) + len(data_hex))
1075         length_hex = struct.pack(">H", length)
1076
1077         # NOTIFICATION Message
1078         message_hex = (
1079             marker_hex +
1080             length_hex +
1081             type_hex +
1082             error_code_hex +
1083             error_subcode_hex +
1084             data_hex
1085         )
1086
1087         if self.log_debug:
1088             logger.debug("NOTIFICATION message encoding")
1089             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1090             logger.debug("  Length=" + str(length) + " (0x" +
1091                          binascii.hexlify(length_hex) + ")")
1092             logger.debug("  Type=" + str(type) + " (0x" +
1093                          binascii.hexlify(type_hex) + ")")
1094             logger.debug("  Error Code=" + str(error_code) + " (0x" +
1095                          binascii.hexlify(error_code_hex) + ")")
1096             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
1097                          binascii.hexlify(error_subcode_hex) + ")")
1098             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1099             logger.debug("NOTIFICATION message encoded: 0x%s",
1100                          binascii.b2a_hex(message_hex))
1101
1102         return message_hex
1103
1104     def keepalive_message(self):
1105         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1106
1107         Returns:
1108             :return: encoded KEEP ALIVE message in HEX
1109         """
1110
1111         # Marker
1112         marker_hex = "\xFF" * 16
1113
1114         # Type
1115         type = 4
1116         type_hex = struct.pack("B", type)
1117
1118         # Length (big-endian)
1119         length = len(marker_hex) + 2 + len(type_hex)
1120         length_hex = struct.pack(">H", length)
1121
1122         # KEEP ALIVE Message
1123         message_hex = (
1124             marker_hex +
1125             length_hex +
1126             type_hex
1127         )
1128
1129         if self.log_debug:
1130             logger.debug("KEEP ALIVE message encoding")
1131             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1132             logger.debug("  Length=" + str(length) + " (0x" +
1133                          binascii.hexlify(length_hex) + ")")
1134             logger.debug("  Type=" + str(type) + " (0x" +
1135                          binascii.hexlify(type_hex) + ")")
1136             logger.debug("KEEP ALIVE message encoded: 0x%s",
1137                          binascii.b2a_hex(message_hex))
1138
1139         return message_hex
1140
1141
class TimeTracker(object):
    """Bookkeeping of the session timers: when my next KEEP ALIVE is due
    and when the peer's hold time runs out.

    Naming note: relative durations end with "timedelta", absolute
    time points end with "time".
    """

    def __init__(self, msg_in):
        """Set up the timers from built-in defaults and the peer's OPEN.

        Arguments:
            msg_in: the OPEN message received from peer.
        Raises:
            ValueError: when the negotiated hold time is non-zero
                but smaller than 3.
        """
        # Upper bound (in seconds) for being stuck in the same state; we
        # should at least report something before continuing.
        # TODO: Configurable?
        self.report_timedelta = 1.0
        # The hold timer is negotiated as the smaller of the two values,
        # mine (180) and the one read from the peer's OPEN message.
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        my_hold_timedelta = 180
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        negotiated = min(my_hold_timedelta, peer_hold_timedelta)
        if negotiated != 0 and negotiated < 3:
            logger.error("Invalid hold timedelta value: " + str(negotiated))
            raise ValueError("Invalid hold timedelta value: ", negotiated)
        # If we do not hear from peer this long, we assume it has died.
        self.hold_timedelta = negotiated
        # Upper limit for duration between my messages, to avoid being
        # declared dead by the peer.
        self.keepalive_timedelta = int(negotiated / 3.0)
        # Same effect as snapshot(), but it also declares the field.
        # Stored time to be used afterwards; time.time() may be too strict.
        self.snapshot_time = time.time()
        # At this time point, peer will be declared dead.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this time point, we should be sending a keepalive message.
        # To be set later via reset_my_keepalive_time().
        self.my_keepalive_time = None

    def snapshot(self):
        """Remember the current time in self.snapshot_time."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Push the peer's deadline one hold time ahead of now,
        as the peer has just proven it is alive."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Schedule my next KEEP ALIVE.

        Arguments:
            :keepalive_time: time point the keepalive interval is counted from
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Tell whether my KEEP ALIVE is due at the snapshot time.

        Always False when the hold time is 0.
        """
        return (self.hold_timedelta != 0 and
                self.snapshot_time >= self.my_keepalive_time)

    def get_next_event_time(self):
        """Return the nearest time point at which a timer fires.

        With hold time 0 this is 86400 seconds after the snapshot time,
        otherwise the earlier of my keepalive time and the peer hold time.
        """
        if self.hold_timedelta != 0:
            return min(self.my_keepalive_time, self.peer_hold_time)
        return self.snapshot_time + 86400

    def check_peer_hold_time(self, snapshot_time):
        """Raise RuntimeError if the peer's hold time ended before
        snapshot_time; hold time 0 switches the check off."""
        # The caller supplies the time, as time.time() may be too strict.
        if self.hold_timedelta != 0 and snapshot_time > self.peer_hold_time:
            logger.error("Peer has overstepped the hold timer.")
            raise RuntimeError("Peer has overstepped the hold timer.")
            # TODO: Include hold_timedelta?
            # TODO: Add notification sending (attempt). That means
            # move to write tracker.
1225
1226
1227 class ReadTracker(object):
1228     """Class for tracking read of mesages chunk by chunk and
1229     for idle waiting.
1230     """
1231
1232     def __init__(self, bgp_socket, timer):
1233         """The reader initialisation.
1234
1235         Arguments:
1236             bgp_socket: socket to be used for sending
1237             timer: timer to be used for scheduling
1238         """
1239         # References to outside objects.
1240         self.socket = bgp_socket
1241         self.timer = timer
1242         # BGP marker length plus length field length.
1243         self.header_length = 18
1244         # TODO: make it class (constant) attribute
1245         # Computation of where next chunk ends depends on whether
1246         # we are beyond length field.
1247         self.reading_header = True
1248         # Countdown towards next size computation.
1249         self.bytes_to_read = self.header_length
1250         # Incremental buffer for message under read.
1251         self.msg_in = ""
1252         # Initialising counters
1253         self.updates_received = 0
1254         self.prefixes_introduced = 0
1255         self.prefixes_withdrawn = 0
1256         self.rx_idle_time = 0
1257         self.rx_activity_detected = True
1258
1259     def read_message_chunk(self):
1260         """Read up to one message
1261
1262         Note:
1263             Currently it does not return anything.
1264         """
1265         # TODO: We could return the whole message, currently not needed.
1266         # We assume the socket is readable.
1267         chunk_message = self.socket.recv(self.bytes_to_read)
1268         self.msg_in += chunk_message
1269         self.bytes_to_read -= len(chunk_message)
1270         # TODO: bytes_to_read < 0 is not possible, right?
1271         if not self.bytes_to_read:
1272             # Finished reading a logical block.
1273             if self.reading_header:
1274                 # The logical block was a BGP header.
1275                 # Now we know the size of the message.
1276                 self.reading_header = False
1277                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1278                                       self.header_length)
1279             else:  # We have finished reading the body of the message.
1280                 # Peer has just proven it is still alive.
1281                 self.timer.reset_peer_hold_time()
1282                 # TODO: Do we want to count received messages?
1283                 # This version ignores the received message.
1284                 # TODO: Should we do validation and exit on anything
1285                 # besides update or keepalive?
1286                 # Prepare state for reading another message.
1287                 message_type_hex = self.msg_in[self.header_length]
1288                 if message_type_hex == "\x01":
1289                     logger.info("OPEN message received: 0x%s",
1290                                 binascii.b2a_hex(self.msg_in))
1291                 elif message_type_hex == "\x02":
1292                     logger.debug("UPDATE message received: 0x%s",
1293                                  binascii.b2a_hex(self.msg_in))
1294                     self.decode_update_message(self.msg_in)
1295                 elif message_type_hex == "\x03":
1296                     logger.info("NOTIFICATION message received: 0x%s",
1297                                 binascii.b2a_hex(self.msg_in))
1298                 elif message_type_hex == "\x04":
1299                     logger.info("KEEP ALIVE message received: 0x%s",
1300                                 binascii.b2a_hex(self.msg_in))
1301                 else:
1302                     logger.warning("Unexpected message received: 0x%s",
1303                                    binascii.b2a_hex(self.msg_in))
1304                 self.msg_in = ""
1305                 self.reading_header = True
1306                 self.bytes_to_read = self.header_length
1307         # We should not act upon peer_hold_time if we are reading
1308         # something right now.
1309         return
1310
1311     def decode_path_attributes(self, path_attributes_hex):
1312         """Decode the Path Attributes field (rfc4271#section-4.3)
1313
1314         Arguments:
1315             :path_attributes: path_attributes field to be decoded in hex
1316         Returns:
1317             :return: None
1318         """
1319         hex_to_decode = path_attributes_hex
1320
1321         while len(hex_to_decode):
1322             attr_flags_hex = hex_to_decode[0]
1323             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1324 #            attr_optional_bit = attr_flags & 128
1325 #            attr_transitive_bit = attr_flags & 64
1326 #            attr_partial_bit = attr_flags & 32
1327             attr_extended_length_bit = attr_flags & 16
1328
1329             attr_type_code_hex = hex_to_decode[1]
1330             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1331
1332             if attr_extended_length_bit:
1333                 attr_length_hex = hex_to_decode[2:4]
1334                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1335                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1336                 hex_to_decode = hex_to_decode[4 + attr_length:]
1337             else:
1338                 attr_length_hex = hex_to_decode[2]
1339                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1340                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1341                 hex_to_decode = hex_to_decode[3 + attr_length:]
1342
1343             if attr_type_code == 1:
1344                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1345                              binascii.b2a_hex(attr_flags_hex))
1346                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1347             elif attr_type_code == 2:
1348                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1349                              binascii.b2a_hex(attr_flags_hex))
1350                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1351             elif attr_type_code == 3:
1352                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1353                              binascii.b2a_hex(attr_flags_hex))
1354                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1355             elif attr_type_code == 4:
1356                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1357                              binascii.b2a_hex(attr_flags_hex))
1358                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1359             elif attr_type_code == 5:
1360                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1361                              binascii.b2a_hex(attr_flags_hex))
1362                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1363             elif attr_type_code == 6:
1364                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1365                              binascii.b2a_hex(attr_flags_hex))
1366                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1367             elif attr_type_code == 7:
1368                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1369                              binascii.b2a_hex(attr_flags_hex))
1370                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1371             elif attr_type_code == 9:  # rfc4456#section-8
1372                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1373                              binascii.b2a_hex(attr_flags_hex))
1374                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1375             elif attr_type_code == 10:  # rfc4456#section-8
1376                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1377                              binascii.b2a_hex(attr_flags_hex))
1378                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1379             elif attr_type_code == 14:  # rfc4760#section-3
1380                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1381                              binascii.b2a_hex(attr_flags_hex))
1382                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1383                 address_family_identifier_hex = attr_value_hex[0:2]
1384                 logger.debug("  Address Family Identifier=0x%s",
1385                              binascii.b2a_hex(address_family_identifier_hex))
1386                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1387                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1388                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1389                 next_hop_netaddr_len_hex = attr_value_hex[3]
1390                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1391                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1392                              next_hop_netaddr_len,
1393                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1394                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1395                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1396                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1397                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1398                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1399                 logger.debug("  Reserved=0x%s",
1400                              binascii.b2a_hex(reserved_hex))
1401                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1402                 logger.debug("  Network Layer Reachability Information=0x%s",
1403                              binascii.b2a_hex(nlri_hex))
1404                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1405                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1406                 for prefix in nlri_prefix_list:
1407                     logger.debug("  nlri_prefix_received: %s", prefix)
1408                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1409             elif attr_type_code == 15:  # rfc4760#section-4
1410                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1411                              binascii.b2a_hex(attr_flags_hex))
1412                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1413                 address_family_identifier_hex = attr_value_hex[0:2]
1414                 logger.debug("  Address Family Identifier=0x%s",
1415                              binascii.b2a_hex(address_family_identifier_hex))
1416                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1417                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1418                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1419                 wd_hex = attr_value_hex[3:]
1420                 logger.debug("  Withdrawn Routes=0x%s",
1421                              binascii.b2a_hex(wd_hex))
1422                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1423                 logger.debug("  Withdrawn routes prefix list: %s",
1424                              wdr_prefix_list)
1425                 for prefix in wdr_prefix_list:
1426                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1427                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1428             else:
1429                 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1430                              binascii.b2a_hex(attr_flags_hex))
1431                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1432         return None
1433
1434     def decode_update_message(self, msg):
1435         """Decode an UPDATE message (rfc4271#section-4.3)
1436
1437         Arguments:
1438             :msg: message to be decoded in hex
1439         Returns:
1440             :return: None
1441         """
1442         logger.debug("Decoding update message:")
1443         # message header - marker
1444         marker_hex = msg[:16]
1445         logger.debug("Message header marker: 0x%s",
1446                      binascii.b2a_hex(marker_hex))
1447         # message header - message length
1448         msg_length_hex = msg[16:18]
1449         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1450         logger.debug("Message lenght: 0x%s (%s)",
1451                      binascii.b2a_hex(msg_length_hex), msg_length)
1452         # message header - message type
1453         msg_type_hex = msg[18:19]
1454         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1455         if msg_type == 2:
1456             logger.debug("Message type: 0x%s (update)",
1457                          binascii.b2a_hex(msg_type_hex))
1458             # withdrawn routes length
1459             wdr_length_hex = msg[19:21]
1460             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1461             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1462                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1463             # withdrawn routes
1464             wdr_hex = msg[21:21 + wdr_length]
1465             logger.debug("Withdrawn routes: 0x%s",
1466                          binascii.b2a_hex(wdr_hex))
1467             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1468             logger.debug("Withdrawn routes prefix list: %s",
1469                          wdr_prefix_list)
1470             for prefix in wdr_prefix_list:
1471                 logger.debug("withdrawn_prefix_received: %s", prefix)
1472             # total path attribute length
1473             total_pa_length_offset = 21 + wdr_length
1474             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1475             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1476             logger.debug("Total path attribute lenght: 0x%s (%s)",
1477                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1478             # path attributes
1479             pa_offset = total_pa_length_offset + 2
1480             pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1481             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1482             self.decode_path_attributes(pa_hex)
1483             # network layer reachability information length
1484             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1485             logger.debug("Calculated NLRI length: %s", nlri_length)
1486             # network layer reachability information
1487             nlri_offset = pa_offset + total_pa_length
1488             nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1489             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1490             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1491             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1492             for prefix in nlri_prefix_list:
1493                 logger.debug("nlri_prefix_received: %s", prefix)
1494             # Updating counters
1495             self.updates_received += 1
1496             self.prefixes_introduced += len(nlri_prefix_list)
1497             self.prefixes_withdrawn += len(wdr_prefix_list)
1498         else:
1499             logger.error("Unexpeced message type 0x%s in 0x%s",
1500                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1501
1502     def wait_for_read(self):
1503         """Read message until timeout (next expected event).
1504
1505         Note:
1506             Used when no more updates has to be sent to avoid busy-wait.
1507             Currently it does not return anything.
1508         """
1509         # Compute time to the first predictable state change
1510         event_time = self.timer.get_next_event_time()
1511         # snapshot_time would be imprecise
1512         wait_timedelta = min(event_time - time.time(), 10)
1513         if wait_timedelta < 0:
1514             # The program got around to waiting to an event in "very near
1515             # future" so late that it became a "past" event, thus tell
1516             # "select" to not wait at all. Passing negative timedelta to
1517             # select() would lead to either waiting forever (for -1) or
1518             # select.error("Invalid parameter") (for everything else).
1519             wait_timedelta = 0
1520         # And wait for event or something to read.
1521
1522         if not self.rx_activity_detected or not (self.updates_received % 100):
1523             # right time to write statistics to the log (not for every update and
1524             # not too frequently to avoid having large log files)
1525             logger.info("total_received_update_message_counter: %s",
1526                         self.updates_received)
1527             logger.info("total_received_nlri_prefix_counter: %s",
1528                         self.prefixes_introduced)
1529             logger.info("total_received_withdrawn_prefix_counter: %s",
1530                         self.prefixes_withdrawn)
1531
1532         start_time = time.time()
1533         select.select([self.socket], [], [self.socket], wait_timedelta)
1534         timedelta = time.time() - start_time
1535         self.rx_idle_time += timedelta
1536         self.rx_activity_detected = timedelta < 1
1537
1538         if not self.rx_activity_detected or not (self.updates_received % 100):
1539             # right time to write statistics to the log (not for every update and
1540             # not too frequently to avoid having large log files)
1541             logger.info("... idle for %.3fs", timedelta)
1542             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1543         return
1544
1545
class WriteTracker(object):
    """Buffer of outgoing messages, sent to the socket chunk by chunk."""

    def __init__(self, bgp_socket, generator, timer):
        """Remember the collaborators and start with an empty buffer.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # Outgoing buffer state: the data not sent yet, its size
        # and a flag telling whether there is anything to send.
        self.msg_out = ""
        self.bytes_to_send = 0
        self.sending_message = False
        # Objects owned by the caller.
        self.timer = timer
        self.generator = generator
        self.socket = bgp_socket

    def enqueue_message_for_sending(self, message):
        """Append the message to the buffer and mark sending as pending.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.sending_message = True
        self.bytes_to_send += len(message)
        self.msg_out += message

    def send_message_chunk_is_whole(self):
        """Send as much of the msg_out buffer as the socket accepts.

        The caller is expected to call this only when there is something
        in msg_out and the socket is writable.

        Returns:
            :return: True if the buffer got emptied, False otherwise
        """
        self.timer.snapshot()
        sent_count = self.socket.send(self.msg_out)
        # Only the unsent tail of the buffer is kept.
        self.msg_out = self.msg_out[sent_count:]
        self.bytes_to_send -= sent_count
        if self.bytes_to_send:
            # Part of the data is still waiting for another call.
            return False
        # Everything is out. The peer should have reset its hold timer,
        # so our keepalive timer is restarted from the snapshot taken
        # before sending.
        self.sending_message = False
        self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
        return True
1598
1599
class StateTracker(object):
    """State of the main loop, which decides between reading and writing."""

    def __init__(self, bgp_socket, generator, timer):
        """Store the collaborators and create the read and write trackers.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # Objects owned by the caller.
        self.timer = timer
        self.generator = generator
        self.socket = bgp_socket
        # Trackers doing the actual reading and writing.
        self.reader = ReadTracker(bgp_socket, timer)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Reading is normally preferred to writing. To avoid being blocked
        # by neverending reads (and running out of holdtime), this flag
        # is raised when a keepalive is due. It stays raised until
        # the message being sent is finished, then it drops again.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.
        self.prioritize_writing = False

    def perform_one_loop_iteration(self):
        """ The main loop iteration

        Notes:
            Exactly one action is done in each call: reading a chunk,
            sending a chunk, enqueueing next update, idle waiting or
            reporting that nothing could be done. Then the method
            returns, the caller is expected to call it again.
        """
        self.timer.snapshot()
        if not self.prioritize_writing and self.timer.is_time_for_my_keepalive():
            if not self.writer.sending_message:
                # Nothing is being sent, so the keepalive can go right now.
                keepalive = self.generator.keepalive_message()
                self.writer.enqueue_message_for_sending(keepalive)
                logger.info("KEEP ALIVE is sent.")
            # Either the keepalive or the message in progress has to get
            # out soon, so writing is preferred from now on.
            self.prioritize_writing = True
        # Priorities are known, now find out which actions are possible.
        # Each of the three results is either [] or [self.socket],
        # so they are tested as booleans.
        watched = [self.socket]
        readable, writable, exceptional = select.select(
            watched, watched, watched, self.timer.report_timedelta)
        if exceptional:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        if not self.prioritize_writing or not writable:
            # Writes are not urgent or not possible, focus on reading.
            if readable:
                # Read one chunk and let the caller repeat the select.
                self.reader.read_message_chunk()
                return
            # Nothing to read, good time to check the peer hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
        if not writable:
            # We can neither read nor write.
            logger.warning("Input and output both blocked for " +
                           str(self.timer.report_timedelta) + " seconds.")
            # FIXME: Are we sure select has been really waiting
            # the whole period?
            return
        # Here the socket is writable, and either the write is urgent
        # or there was nothing to read.
        if self.writer.sending_message:
            # Continue with the message in progress.
            finished = self.writer.send_message_chunk_is_whole()
            if finished and self.prioritize_writing:
                # The urgent message is out, reading is preferred again.
                self.prioritize_writing = False
            return
        if not self.generator.remaining_prefixes:
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # There are still update messages to be generated.
        update = self.generator.compose_update_message()
        if not self.generator.remaining_prefixes:
            # That was the last update, end-of-rib is due.
            logger.info("All update messages generated.")
            logger.info("Storing performance results.")
            self.generator.store_results()
            logger.info("Finally an END-OF-RIB is sent.")
            update += self.generator.update_message(wr_prefixes=[],
                                                    nlri_prefixes=[],
                                                    end_of_rib=True)
        # The real sending is attempted in the next iteration.
        self.writer.enqueue_message_for_sending(update)
1710
1711
def create_logger(loglevel, logfile):
    """Set up the logger named "logger" to log to console and to a file

    The log file is opened in "w" mode, so its previous content is lost.

    Arguments:
        :loglevel: log level
        :logfile: log file name
    Returns:
        :return: logger object
    """
    line_format = "%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s"
    formatter = logging.Formatter(line_format)
    # Console first, file second; both use the same line format.
    handlers = (logging.StreamHandler(),
                logging.FileHandler(logfile, mode="w"))
    configured = logging.getLogger("logger")
    for handler in handlers:
        handler.setFormatter(formatter)
    for handler in handlers:
        configured.addHandler(handler)
    configured.setLevel(loglevel)
    return configured
1731
1732
def job(arguments):
    """One time initialisation and iterations looping.
    Notes:
        Establish BGP connection, do the initial handshake (receive OPEN,
        send OPEN, receive KEEPALIVE, send KEEPALIVE) and then run
        the main loop iterations forever.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None (in fact the function never returns normally)
    Raises:
        :MessageError: when the peer does not confirm our OPEN message
            by a KEEPALIVE message
    """
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: %s", binascii.hexlify(msg_out))
    # Send our open message to the peer. Plain send() is allowed to
    # transmit only a part of the data, sendall() keeps sending until
    # everything is out.
    # NOTE(review): this assumes the socket is in blocking mode during
    # the handshake (the recv below relies on that too) - confirm
    # against establish_connection.
    bgp_socket.sendall(msg_out)
    # Wait for confirming keepalive.
    # Using exact keepalive length to not to see possible updates.
    # A single recv() may return less than requested, so keep reading
    # until the whole keepalive is here or the peer closes the connection.
    keepalive_length = 19
    msg_in = b""
    while len(msg_in) < keepalive_length:
        chunk = bgp_socket.recv(keepalive_length - len(msg_in))
        if not chunk:
            # Connection closed by the peer, the check below reports it.
            break
        msg_in += chunk
    if msg_in != generator.keepalive_message():
        error_msg = "Open not confirmed by keepalive, instead got"
        logger.error("%s: %s", error_msg, binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: %s", binascii.hexlify(msg_out))
    bgp_socket.sendall(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
1775
1776
def threaded_job(arguments):
    """Split the work between threads, start them and sleep forever

    The amount of prefixes is divided between arguments.multiplicity
    threads. Each thread gets its own copy of the arguments with
    the amount, the first prefix and the local IP address adjusted.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None (in fact the function never returns normally)
    """
    remaining_amount = arguments.amount
    remaining_utils = arguments.multiplicity
    next_prefix = arguments.firstprefix
    next_myip = arguments.myip
    per_thread_args = []

    while True:
        # Even share of what is left, rounded up (integer division).
        share = (remaining_amount - 1) // remaining_utils + 1
        remaining_amount -= share
        remaining_utils -= 1

        worker_args = deepcopy(arguments)
        worker_args.amount = share
        worker_args.firstprefix = next_prefix
        worker_args.myip = next_myip
        per_thread_args.append(worker_args)

        if not remaining_utils:
            break
        # The next thread continues 16 addresses per prefix further,
        # and it uses the next local IP address.
        next_prefix += share * 16
        next_myip += 1

    try:
        # Create threads
        for worker_args in per_thread_args:
            thread.start_new_thread(job, (worker_args,))
    except Exception:
        print("Error: unable to start thread.")
        raise SystemExit(2)

    # Work remains forever
    while True:
        time.sleep(5)
1818
1819
if __name__ == "__main__":
    arguments = parse_arguments()
    # The logger is deliberately a module level name,
    # the functions and classes above use it without having it passed in.
    logger = create_logger(loglevel=arguments.loglevel,
                           logfile=arguments.logfile)
    # Blocks forever, the work is done in threads.
    threaded_job(arguments)