fixing play.py and data change counter
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 import argparse
15 import binascii
16 import ipaddr
17 import select
18 import socket
19 import time
20 import logging
21 import struct
22
23 import thread
24 from copy import deepcopy
25
26
27 __author__ = "Vratko Polak"
28 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
29 __license__ = "Eclipse Public License v1.0"
30 __email__ = "vrpolak@cisco.com"
31
32
33 def parse_arguments():
34     """Use argparse to get arguments,
35
36     Returns:
37         :return: args object.
38     """
39     parser = argparse.ArgumentParser()
40     # TODO: Should we use --argument-names-with-spaces?
41     str_help = "Autonomous System number use in the stream."
42     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
43     # FIXME: We are acting as iBGP peer,
44     # we should mirror AS number from peer's open message.
45     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
46     parser.add_argument("--amount", default="1", type=int, help=str_help)
47     str_help = "Maximum number of IP prefixes to be announced in one iteration"
48     parser.add_argument("--insert", default="1", type=int, help=str_help)
49     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
50     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
51     str_help = "The number of prefixes to process without withdrawals"
52     parser.add_argument("--prefill", default="0", type=int, help=str_help)
53     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
54     parser.add_argument("--updates", choices=["single", "separate"],
55                         default=["separate"], help=str_help)
56     str_help = "Base prefix IP address for prefix generation"
57     parser.add_argument("--firstprefix", default="8.0.1.0",
58                         type=ipaddr.IPv4Address, help=str_help)
59     str_help = "The prefix length."
60     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
61     str_help = "Listen for connection, instead of initiating it."
62     parser.add_argument("--listen", action="store_true", help=str_help)
63     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
64                 "Default value only suitable for listening.")
65     parser.add_argument("--myip", default="0.0.0.0",
66                         type=ipaddr.IPv4Address, help=str_help)
67     str_help = ("TCP port to bind to when listening or initiating connection." +
68                 "Default only suitable for initiating.")
69     parser.add_argument("--myport", default="0", type=int, help=str_help)
70     str_help = "The IP of the next hop to be placed into the update messages."
71     parser.add_argument("--nexthop", default="192.0.2.1",
72                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
73     str_help = "Identifier of the route originator."
74     parser.add_argument("--originator", default=None,
75                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
76     str_help = "Cluster list item identifier."
77     parser.add_argument("--cluster", default=None,
78                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
79     str_help = ("Numeric IP Address to try to connect to." +
80                 "Currently no effect in listening mode.")
81     parser.add_argument("--peerip", default="127.0.0.2",
82                         type=ipaddr.IPv4Address, help=str_help)
83     str_help = "TCP port to try to connect to. No effect in listening mode."
84     parser.add_argument("--peerport", default="179", type=int, help=str_help)
85     str_help = "Local hold time."
86     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
87     str_help = "Log level (--error, --warning, --info, --debug)"
88     parser.add_argument("--error", dest="loglevel", action="store_const",
89                         const=logging.ERROR, default=logging.INFO,
90                         help=str_help)
91     parser.add_argument("--warning", dest="loglevel", action="store_const",
92                         const=logging.WARNING, default=logging.INFO,
93                         help=str_help)
94     parser.add_argument("--info", dest="loglevel", action="store_const",
95                         const=logging.INFO, default=logging.INFO,
96                         help=str_help)
97     parser.add_argument("--debug", dest="loglevel", action="store_const",
98                         const=logging.DEBUG, default=logging.INFO,
99                         help=str_help)
100     str_help = "Log file name"
101     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
102     str_help = "Trailing part of the csv result files for plotting purposes"
103     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
104     str_help = "Minimum number of updates to reach to include result into csv."
105     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
106     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
107     parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
108     str_help = "Link-State NLRI supported"
109     parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
110     str_help = "Link-State NLRI: Identifier"
111     parser.add_argument("-lsid", default="1", type=int, help=str_help)
112     str_help = "Link-State NLRI: Tunnel ID"
113     parser.add_argument("-lstid", default="1", type=int, help=str_help)
114     str_help = "Link-State NLRI: LSP ID"
115     parser.add_argument("-lspid", default="1", type=int, help=str_help)
116     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
117     parser.add_argument("--lstsaddr", default="1.2.3.4",
118                         type=ipaddr.IPv4Address, help=str_help)
119     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
120     parser.add_argument("--lsteaddr", default="5.6.7.8",
121                         type=ipaddr.IPv4Address, help=str_help)
122     str_help = "Link-State NLRI: Identifier Step"
123     parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
124     str_help = "Link-State NLRI: Tunnel ID Step"
125     parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
126     str_help = "Link-State NLRI: LSP ID Step"
127     parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
128     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
129     parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
130     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
131     parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
132     str_help = "How many play utilities are to be started."
133     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
134     arguments = parser.parse_args()
135     if arguments.multiplicity < 1:
136         print "Multiplicity", arguments.multiplicity, "is not positive."
137         raise SystemExit(1)
138     # TODO: Are sanity checks (such as asnumber>=0) required?
139     return arguments
140
141
142 def establish_connection(arguments):
143     """Establish connection to BGP peer.
144
145     Arguments:
146         :arguments: following command-line argumets are used
147             - arguments.myip: local IP address
148             - arguments.myport: local port
149             - arguments.peerip: remote IP address
150             - arguments.peerport: remote port
151     Returns:
152         :return: socket.
153     """
154     if arguments.listen:
155         logger.info("Connecting in the listening mode.")
156         logger.debug("Local IP address: " + str(arguments.myip))
157         logger.debug("Local port: " + str(arguments.myport))
158         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
159         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
160         # bind need single tuple as argument
161         listening_socket.bind((str(arguments.myip), arguments.myport))
162         listening_socket.listen(1)
163         bgp_socket, _ = listening_socket.accept()
164         # TODO: Verify client IP is cotroller IP.
165         listening_socket.close()
166     else:
167         logger.info("Connecting in the talking mode.")
168         logger.debug("Local IP address: " + str(arguments.myip))
169         logger.debug("Local port: " + str(arguments.myport))
170         logger.debug("Remote IP address: " + str(arguments.peerip))
171         logger.debug("Remote port: " + str(arguments.peerport))
172         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
173         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
174         # bind to force specified address and port
175         talking_socket.bind((str(arguments.myip), arguments.myport))
176         # socket does not spead ipaddr, hence str()
177         talking_socket.connect((str(arguments.peerip), arguments.peerport))
178         bgp_socket = talking_socket
179     logger.info("Connected to ODL.")
180     return bgp_socket
181
182
183 def get_short_int_from_message(message, offset=16):
184     """Extract 2-bytes number from provided message.
185
186     Arguments:
187         :message: given message
188         :offset: offset of the short_int inside the message
189     Returns:
190         :return: required short_inf value.
191     Notes:
192         default offset value is the BGP message size offset.
193     """
194     high_byte_int = ord(message[offset])
195     low_byte_int = ord(message[offset + 1])
196     short_int = high_byte_int * 256 + low_byte_int
197     return short_int
198
199
200 def get_prefix_list_from_hex(prefixes_hex):
201     """Get decoded list of prefixes (rfc4271#section-4.3)
202
203     Arguments:
204         :prefixes_hex: list of prefixes to be decoded in hex
205     Returns:
206         :return: list of prefixes in the form of ip address (X.X.X.X/X)
207     """
208     prefix_list = []
209     offset = 0
210     while offset < len(prefixes_hex):
211         prefix_bit_len_hex = prefixes_hex[offset]
212         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
213         prefix_len = ((prefix_bit_len - 1) / 8) + 1
214         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
215         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
216         offset += 1 + prefix_len
217         prefix_list.append(prefix + "/" + str(prefix_bit_len))
218     return prefix_list
219
220
221 class MessageError(ValueError):
222     """Value error with logging optimized for hexlified messages."""
223
224     def __init__(self, text, message, *args):
225         """Initialisation.
226
227         Store and call super init for textual comment,
228         store raw message which caused it.
229         """
230         self.text = text
231         self.msg = message
232         super(MessageError, self).__init__(text, message, *args)
233
234     def __str__(self):
235         """Generate human readable error message.
236
237         Returns:
238             :return: human readable message as string
239         Notes:
240             Use a placeholder string if the message is to be empty.
241         """
242         message = binascii.hexlify(self.msg)
243         if message == "":
244             message = "(empty message)"
245         return self.text + ": " + message
246
247
248 def read_open_message(bgp_socket):
249     """Receive peer's OPEN message
250
251     Arguments:
252         :bgp_socket: the socket to be read
253     Returns:
254         :return: received OPEN message.
255     Notes:
256         Performs just basic incomming message checks
257     """
258     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
259     # TODO: Can the incoming open message be split in more than one packet?
260     # Some validation.
261     if len(msg_in) < 37:
262         # 37 is minimal length of open message with 4-byte AS number.
263         error_msg = (
264             "Message length (" + str(len(msg_in)) + ") is smaller than "
265             "minimal length of OPEN message with 4-byte AS number (37)"
266         )
267         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
268         raise MessageError(error_msg, msg_in)
269     # TODO: We could check BGP marker, but it is defined only later;
270     # decide what to do.
271     reported_length = get_short_int_from_message(msg_in)
272     if len(msg_in) != reported_length:
273         error_msg = (
274             "Expected message length (" + reported_length +
275             ") does not match actual length (" + str(len(msg_in)) + ")"
276         )
277         logger.error(error_msg + binascii.hexlify(msg_in))
278         raise MessageError(error_msg, msg_in)
279     logger.info("Open message received.")
280     return msg_in
281
282
283 class MessageGenerator(object):
284     """Class which generates messages, holds states and configuration values."""
285
286     # TODO: Define bgp marker as a class (constant) variable.
287     def __init__(self, args):
288         """Initialisation according to command-line args.
289
290         Arguments:
291             :args: argsparser's Namespace object which contains command-line
292                 options for MesageGenerator initialisation
293         Notes:
294             Calculates and stores default values used later on for
295             message geeration.
296         """
297         self.total_prefix_amount = args.amount
298         # Number of update messages left to be sent.
299         self.remaining_prefixes = self.total_prefix_amount
300
301         # New parameters initialisation
302         self.iteration = 0
303         self.prefix_base_default = args.firstprefix
304         self.prefix_length_default = args.prefixlen
305         self.wr_prefixes_default = []
306         self.nlri_prefixes_default = []
307         self.version_default = 4
308         self.my_autonomous_system_default = args.asnumber
309         self.hold_time_default = args.holdtime  # Local hold time.
310         self.bgp_identifier_default = int(args.myip)
311         self.next_hop_default = args.nexthop
312         self.originator_id_default = args.originator
313         self.cluster_list_item_default = args.cluster
314         self.single_update_default = args.updates == "single"
315         self.randomize_updates_default = args.updates == "random"
316         self.prefix_count_to_add_default = args.insert
317         self.prefix_count_to_del_default = args.withdraw
318         if self.prefix_count_to_del_default < 0:
319             self.prefix_count_to_del_default = 0
320         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
321             # total number of prefixes must grow to avoid infinite test loop
322             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
323         self.slot_size_default = self.prefix_count_to_add_default
324         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
325         self.results_file_name_default = args.results
326         self.performance_threshold_default = args.threshold
327         self.rfc4760 = args.rfc4760
328         self.bgpls = args.bgpls
329         # Default values when BGP-LS Attributes are used
330         if self.bgpls:
331             self.prefix_count_to_add_default = 1
332             self.prefix_count_to_del_default = 0
333         self.ls_nlri_default = {"Identifier": args.lsid,
334                                 "TunnelID": args.lstid,
335                                 "LSPID": args.lspid,
336                                 "IPv4TunnelSenderAddress": args.lstsaddr,
337                                 "IPv4TunnelEndPointAddress": args.lsteaddr}
338         self.lsid_step = args.lsidstep
339         self.lstid_step = args.lstidstep
340         self.lspid_step = args.lspidstep
341         self.lstsaddr_step = args.lstsaddrstep
342         self.lsteaddr_step = args.lsteaddrstep
343         # Default values used for randomized part
344         s1_slots = ((self.total_prefix_amount -
345                      self.remaining_prefixes_threshold - 1) /
346                     self.prefix_count_to_add_default + 1)
347         s2_slots = ((self.remaining_prefixes_threshold - 1) /
348                     (self.prefix_count_to_add_default -
349                     self.prefix_count_to_del_default) + 1)
350         # S1_First_Index = 0
351         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
352         s2_first_index = s1_slots * self.prefix_count_to_add_default
353         s2_last_index = (s2_first_index +
354                          s2_slots * (self.prefix_count_to_add_default -
355                                      self.prefix_count_to_del_default) - 1)
356         self.slot_gap_default = ((self.total_prefix_amount -
357                                   self.remaining_prefixes_threshold - 1) /
358                                  self.prefix_count_to_add_default + 1)
359         self.randomize_lowest_default = s2_first_index
360         self.randomize_highest_default = s2_last_index
361         # Initialising counters
362         self.phase1_start_time = 0
363         self.phase1_stop_time = 0
364         self.phase2_start_time = 0
365         self.phase2_stop_time = 0
366         self.phase1_updates_sent = 0
367         self.phase2_updates_sent = 0
368         self.updates_sent = 0
369
370         self.log_info = args.loglevel <= logging.INFO
371         self.log_debug = args.loglevel <= logging.DEBUG
372         """
373         Flags needed for the MessageGenerator performance optimization.
374         Calling logger methods each iteration even with proper log level set
375         slows down significantly the MessageGenerator performance.
376         Measured total generation time (1M updates, dry run, error log level):
377         - logging based on basic logger features: 36,2s
378         - logging based on advanced logger features (lazy logging): 21,2s
379         - conditional calling of logger methods enclosed inside condition: 8,6s
380         """
381
382         logger.info("Generator initialisation")
383         logger.info("  Target total number of prefixes to be introduced: " +
384                     str(self.total_prefix_amount))
385         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
386                     str(self.prefix_length_default))
387         logger.info("  My Autonomous System number: " +
388                     str(self.my_autonomous_system_default))
389         logger.info("  My Hold Time: " + str(self.hold_time_default))
390         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
391         logger.info("  Next Hop: " + str(self.next_hop_default))
392         logger.info("  Originator ID: " + str(self.originator_id_default))
393         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
394         logger.info("  Prefix count to be inserted at once: " +
395                     str(self.prefix_count_to_add_default))
396         logger.info("  Prefix count to be withdrawn at once: " +
397                     str(self.prefix_count_to_del_default))
398         logger.info("  Fast pre-fill up to " +
399                     str(self.total_prefix_amount -
400                         self.remaining_prefixes_threshold) + " prefixes")
401         logger.info("  Remaining number of prefixes to be processed " +
402                     "in parallel with withdrawals: " +
403                     str(self.remaining_prefixes_threshold))
404         logger.debug("  Prefix index range used after pre-fill procedure [" +
405                      str(self.randomize_lowest_default) + ", " +
406                      str(self.randomize_highest_default) + "]")
407         if self.single_update_default:
408             logger.info("  Common single UPDATE will be generated " +
409                         "for both NLRI & WITHDRAWN lists")
410         else:
411             logger.info("  Two separate UPDATEs will be generated " +
412                         "for each NLRI & WITHDRAWN lists")
413         if self.randomize_updates_default:
414             logger.info("  Generation of UPDATE messages will be randomized")
415         logger.info("  Let\'s go ...\n")
416
417         # TODO: Notification for hold timer expiration can be handy.
418
419     def store_results(self, file_name=None, threshold=None):
420         """ Stores specified results into files based on file_name value.
421
422         Arguments:
423             :param file_name: Trailing (common) part of result file names
424             :param threshold: Minimum number of sent updates needed for each
425                               result to be included into result csv file
426                               (mainly needed because of the result accuracy)
427         Returns:
428             :return: n/a
429         """
430         # default values handling
431         # TODO optimize default values handling (use e.g. dicionary.update() approach)
432         if file_name is None:
433             file_name = self.results_file_name_default
434         if threshold is None:
435             threshold = self.performance_threshold_default
436         # performance calculation
437         if self.phase1_updates_sent >= threshold:
438             totals1 = self.phase1_updates_sent
439             performance1 = int(self.phase1_updates_sent /
440                                (self.phase1_stop_time - self.phase1_start_time))
441         else:
442             totals1 = None
443             performance1 = None
444         if self.phase2_updates_sent >= threshold:
445             totals2 = self.phase2_updates_sent
446             performance2 = int(self.phase2_updates_sent /
447                                (self.phase2_stop_time - self.phase2_start_time))
448         else:
449             totals2 = None
450             performance2 = None
451
452         logger.info("#" * 10 + " Final results " + "#" * 10)
453         logger.info("Number of iterations: " + str(self.iteration))
454         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
455                     str(self.phase1_updates_sent))
456         logger.info("The pre-fill phase duration: " +
457                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
458         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
459                     str(self.phase2_updates_sent))
460         logger.info("The 2nd test phase duration: " +
461                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
462         logger.info("Threshold for performance reporting: " + str(threshold))
463
464         # making labels
465         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
466                         " route(s) per UPDATE")
467         if self.single_update_default:
468             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
469                                   "/-" + str(self.prefix_count_to_del_default) +
470                                   " routes per UPDATE")
471         else:
472             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
473                                   "/-" + str(self.prefix_count_to_del_default) +
474                                   " routes in two UPDATEs")
475         # collecting capacity and performance results
476         totals = {}
477         performance = {}
478         if totals1 is not None:
479             totals[phase1_label] = totals1
480             performance[phase1_label] = performance1
481         if totals2 is not None:
482             totals[phase2_label] = totals2
483             performance[phase2_label] = performance2
484         self.write_results_to_file(totals, "totals-" + file_name)
485         self.write_results_to_file(performance, "performance-" + file_name)
486
487     def write_results_to_file(self, results, file_name):
488         """Writes results to the csv plot file consumable by Jenkins.
489
490         Arguments:
491             :param file_name: Name of the (csv) file to be created
492         Returns:
493             :return: none
494         """
495         first_line = ""
496         second_line = ""
497         f = open(file_name, "wt")
498         try:
499             for key in sorted(results):
500                 first_line += key + ", "
501                 second_line += str(results[key]) + ", "
502             first_line = first_line[:-2]
503             second_line = second_line[:-2]
504             f.write(first_line + "\n")
505             f.write(second_line + "\n")
506             logger.info("Message generator performance results stored in " +
507                         file_name + ":")
508             logger.info("  " + first_line)
509             logger.info("  " + second_line)
510         finally:
511             f.close()
512
513     # Return pseudo-randomized (reproducible) index for selected range
514     def randomize_index(self, index, lowest=None, highest=None):
515         """Calculates pseudo-randomized index from selected range.
516
517         Arguments:
518             :param index: input index
519             :param lowest: the lowes index from the randomized area
520             :param highest: the highest index from the randomized area
521         Returns:
522             :return: the (pseudo)randomized index
523         Notes:
524             Created just as a fame for future generator enhancement.
525         """
526         # default values handling
527         # TODO optimize default values handling (use e.g. dicionary.update() approach)
528         if lowest is None:
529             lowest = self.randomize_lowest_default
530         if highest is None:
531             highest = self.randomize_highest_default
532         # randomize
533         if (index >= lowest) and (index <= highest):
534             # we are in the randomized range -> shuffle it inside
535             # the range (now just reverse the order)
536             new_index = highest - (index - lowest)
537         else:
538             # we are out of the randomized range -> nothing to do
539             new_index = index
540         return new_index
541
542     def get_ls_nlri_values(self, index):
543         """Generates LS-NLRI parameters.
544         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
545
546         Arguments:
547             :param index: index (iteration)
548         Returns:
549             :return: dictionary of LS NLRI parameters and values
550         """
551         # generating list of LS NLRI parameters
552         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
553         ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
554         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
555         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
556         ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
557         ls_nlri_values = {"Identifier": identifier,
558                           "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
559                           "TunnelID": tunnel_id, "LSPID": lsp_id,
560                           "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
561         return ls_nlri_values
562
563     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
564                         prefix_len=None, prefix_count=None, randomize=None):
565         """Generates list of IP address prefixes.
566
567         Arguments:
568             :param slot_index: index of group of prefix addresses
569             :param slot_size: size of group of prefix addresses
570                 in [number of included prefixes]
571             :param prefix_base: IP address of the first prefix
572                 (slot_index = 0, prefix_index = 0)
573             :param prefix_len: length of the prefix in bites
574                 (the same as size of netmask)
575             :param prefix_count: number of prefixes to be returned
576                 from the specified slot
577         Returns:
578             :return: list of generated IP address prefixes
579         """
580         # default values handling
581         # TODO optimize default values handling (use e.g. dicionary.update() approach)
582         if slot_size is None:
583             slot_size = self.slot_size_default
584         if prefix_base is None:
585             prefix_base = self.prefix_base_default
586         if prefix_len is None:
587             prefix_len = self.prefix_length_default
588         if prefix_count is None:
589             prefix_count = slot_size
590         if randomize is None:
591             randomize = self.randomize_updates_default
592         # generating list of prefixes
593         indexes = []
594         prefixes = []
595         prefix_gap = 2 ** (32 - prefix_len)
596         for i in range(prefix_count):
597             prefix_index = slot_index * slot_size + i
598             if randomize:
599                 prefix_index = self.randomize_index(prefix_index)
600             indexes.append(prefix_index)
601             prefixes.append(prefix_base + prefix_index * prefix_gap)
602         if self.log_debug:
603             logger.debug("  Prefix slot index: " + str(slot_index))
604             logger.debug("  Prefix slot size: " + str(slot_size))
605             logger.debug("  Prefix count: " + str(prefix_count))
606             logger.debug("  Prefix indexes: " + str(indexes))
607             logger.debug("  Prefix list: " + str(prefixes))
608         return prefixes
609
610     def compose_update_message(self, prefix_count_to_add=None,
611                                prefix_count_to_del=None):
612         """Composes an UPDATE message
613
614         Arguments:
615             :param prefix_count_to_add: # of prefixes to put into NLRI list
616             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
617         Returns:
618             :return: encoded UPDATE message in HEX
619         Notes:
620             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
621             lists or common message wich includes both prefix lists.
622             Updates global counters.
623         """
624         # default values handling
625         # TODO optimize default values handling (use e.g. dicionary.update() approach)
626         if prefix_count_to_add is None:
627             prefix_count_to_add = self.prefix_count_to_add_default
628         if prefix_count_to_del is None:
629             prefix_count_to_del = self.prefix_count_to_del_default
630         # logging
631         if self.log_info and not (self.iteration % 1000):
632             logger.info("Iteration: " + str(self.iteration) +
633                         " - total remaining prefixes: " +
634                         str(self.remaining_prefixes))
635         if self.log_debug:
636             logger.debug("#" * 10 + " Iteration: " +
637                          str(self.iteration) + " " + "#" * 10)
638             logger.debug("Remaining prefixes: " +
639                          str(self.remaining_prefixes))
640         # scenario type & one-shot counter
641         straightforward_scenario = (self.remaining_prefixes >
642                                     self.remaining_prefixes_threshold)
643         if straightforward_scenario:
644             prefix_count_to_del = 0
645             if self.log_debug:
646                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
647             if not self.phase1_start_time:
648                 self.phase1_start_time = time.time()
649         else:
650             if self.log_debug:
651                 logger.debug("--- COMBINED SCENARIO ---")
652             if not self.phase2_start_time:
653                 self.phase2_start_time = time.time()
654         # tailor the number of prefixes if needed
655         prefix_count_to_add = (prefix_count_to_del +
656                                min(prefix_count_to_add - prefix_count_to_del,
657                                    self.remaining_prefixes))
658         # prefix slots selection for insertion and withdrawal
659         slot_index_to_add = self.iteration
660         slot_index_to_del = slot_index_to_add - self.slot_gap_default
661         # getting lists of prefixes for insertion in this iteration
662         if self.log_debug:
663             logger.debug("Prefixes to be inserted in this iteration:")
664         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
665                                                   prefix_count=prefix_count_to_add)
666         # getting lists of prefixes for withdrawal in this iteration
667         if self.log_debug:
668             logger.debug("Prefixes to be withdrawn in this iteration:")
669         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
670                                                   prefix_count=prefix_count_to_del)
671         # generating the UPDATE mesage with LS-NLRI only
672         if self.bgpls:
673             ls_nlri = self.get_ls_nlri_values(self.iteration)
674             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
675                                           **ls_nlri)
676         else:
677             # generating the UPDATE message with prefix lists
678             if self.single_update_default:
679                 # Send prefixes to be introduced and withdrawn
680                 # in one UPDATE message
681                 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
682                                               nlri_prefixes=prefix_list_to_add)
683             else:
684                 # Send prefixes to be introduced and withdrawn
685                 # in separate UPDATE messages (if needed)
686                 msg_out = self.update_message(wr_prefixes=[],
687                                               nlri_prefixes=prefix_list_to_add)
688                 if prefix_count_to_del:
689                     msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
690                                                    nlri_prefixes=[])
691         # updating counters - who knows ... maybe I am last time here ;)
692         if straightforward_scenario:
693             self.phase1_stop_time = time.time()
694             self.phase1_updates_sent = self.updates_sent
695         else:
696             self.phase2_stop_time = time.time()
697             self.phase2_updates_sent = (self.updates_sent -
698                                         self.phase1_updates_sent)
699         # updating totals for the next iteration
700         self.iteration += 1
701         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
702         # returning the encoded message
703         return msg_out
704
705     # Section of message encoders
706
707     def open_message(self, version=None, my_autonomous_system=None,
708                      hold_time=None, bgp_identifier=None):
709         """Generates an OPEN Message (rfc4271#section-4.2)
710
711         Arguments:
712             :param version: see the rfc4271#section-4.2
713             :param my_autonomous_system: see the rfc4271#section-4.2
714             :param hold_time: see the rfc4271#section-4.2
715             :param bgp_identifier: see the rfc4271#section-4.2
716         Returns:
717             :return: encoded OPEN message in HEX
718         """
719
720         # default values handling
721         # TODO optimize default values handling (use e.g. dicionary.update() approach)
722         if version is None:
723             version = self.version_default
724         if my_autonomous_system is None:
725             my_autonomous_system = self.my_autonomous_system_default
726         if hold_time is None:
727             hold_time = self.hold_time_default
728         if bgp_identifier is None:
729             bgp_identifier = self.bgp_identifier_default
730
731         # Marker
732         marker_hex = "\xFF" * 16
733
734         # Type
735         type = 1
736         type_hex = struct.pack("B", type)
737
738         # version
739         version_hex = struct.pack("B", version)
740
741         # my_autonomous_system
742         # AS_TRANS value, 23456 decadic.
743         my_autonomous_system_2_bytes = 23456
744         # AS number is mappable to 2 bytes
745         if my_autonomous_system < 65536:
746             my_autonomous_system_2_bytes = my_autonomous_system
747         my_autonomous_system_hex_2_bytes = struct.pack(">H",
748                                                        my_autonomous_system)
749
750         # Hold Time
751         hold_time_hex = struct.pack(">H", hold_time)
752
753         # BGP Identifier
754         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
755
756         # Optional Parameters
757         optional_parameters_hex = ""
758         if self.rfc4760:
759             optional_parameter_hex = (
760                 "\x02"  # Param type ("Capability Ad")
761                 "\x06"  # Length (6 bytes)
762                 "\x01"  # Capability type (NLRI Unicast),
763                         # see RFC 4760, secton 8
764                 "\x04"  # Capability value length
765                 "\x00\x01"  # AFI (Ipv4)
766                 "\x00"  # (reserved)
767                 "\x01"  # SAFI (Unicast)
768             )
769             optional_parameters_hex += optional_parameter_hex
770
771         if self.bgpls:
772             optional_parameter_hex = (
773                 "\x02"  # Param type ("Capability Ad")
774                 "\x06"  # Length (6 bytes)
775                 "\x01"  # Capability type (NLRI Unicast),
776                         # see RFC 4760, secton 8
777                 "\x04"  # Capability value length
778                 "\x40\x04"  # AFI (BGP-LS)
779                 "\x00"  # (reserved)
780                 "\x47"  # SAFI (BGP-LS)
781             )
782             optional_parameters_hex += optional_parameter_hex
783
784         optional_parameter_hex = (
785             "\x02"  # Param type ("Capability Ad")
786             "\x06"  # Length (6 bytes)
787             "\x41"  # "32 bit AS Numbers Support"
788                     # (see RFC 6793, section 3)
789             "\x04"  # Capability value length
790         )
791         optional_parameter_hex += (
792             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
793         )
794         optional_parameters_hex += optional_parameter_hex
795
796         # Optional Parameters Length
797         optional_parameters_length = len(optional_parameters_hex)
798         optional_parameters_length_hex = struct.pack("B",
799                                                      optional_parameters_length)
800
801         # Length (big-endian)
802         length = (
803             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
804             len(my_autonomous_system_hex_2_bytes) +
805             len(hold_time_hex) + len(bgp_identifier_hex) +
806             len(optional_parameters_length_hex) +
807             len(optional_parameters_hex)
808         )
809         length_hex = struct.pack(">H", length)
810
811         # OPEN Message
812         message_hex = (
813             marker_hex +
814             length_hex +
815             type_hex +
816             version_hex +
817             my_autonomous_system_hex_2_bytes +
818             hold_time_hex +
819             bgp_identifier_hex +
820             optional_parameters_length_hex +
821             optional_parameters_hex
822         )
823
824         if self.log_debug:
825             logger.debug("OPEN message encoding")
826             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
827             logger.debug("  Length=" + str(length) + " (0x" +
828                          binascii.hexlify(length_hex) + ")")
829             logger.debug("  Type=" + str(type) + " (0x" +
830                          binascii.hexlify(type_hex) + ")")
831             logger.debug("  Version=" + str(version) + " (0x" +
832                          binascii.hexlify(version_hex) + ")")
833             logger.debug("  My Autonomous System=" +
834                          str(my_autonomous_system_2_bytes) + " (0x" +
835                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
836                          ")")
837             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
838                          binascii.hexlify(hold_time_hex) + ")")
839             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
840                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
841             logger.debug("  Optional Parameters Length=" +
842                          str(optional_parameters_length) + " (0x" +
843                          binascii.hexlify(optional_parameters_length_hex) +
844                          ")")
845             logger.debug("  Optional Parameters=0x" +
846                          binascii.hexlify(optional_parameters_hex))
847             logger.debug("OPEN message encoded: 0x%s",
848                          binascii.b2a_hex(message_hex))
849
850         return message_hex
851
852     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
853                        wr_prefix_length=None, nlri_prefix_length=None,
854                        my_autonomous_system=None, next_hop=None,
855                        originator_id=None, cluster_list_item=None,
856                        end_of_rib=False, **ls_nlri_params):
857         """Generates an UPDATE Message (rfc4271#section-4.3)
858
859         Arguments:
860             :param wr_prefixes: see the rfc4271#section-4.3
861             :param nlri_prefixes: see the rfc4271#section-4.3
862             :param wr_prefix_length: see the rfc4271#section-4.3
863             :param nlri_prefix_length: see the rfc4271#section-4.3
864             :param my_autonomous_system: see the rfc4271#section-4.3
865             :param next_hop: see the rfc4271#section-4.3
866         Returns:
867             :return: encoded UPDATE message in HEX
868         """
869
870         # default values handling
871         # TODO optimize default values handling (use e.g. dicionary.update() approach)
872         if wr_prefixes is None:
873             wr_prefixes = self.wr_prefixes_default
874         if nlri_prefixes is None:
875             nlri_prefixes = self.nlri_prefixes_default
876         if wr_prefix_length is None:
877             wr_prefix_length = self.prefix_length_default
878         if nlri_prefix_length is None:
879             nlri_prefix_length = self.prefix_length_default
880         if my_autonomous_system is None:
881             my_autonomous_system = self.my_autonomous_system_default
882         if next_hop is None:
883             next_hop = self.next_hop_default
884         if originator_id is None:
885             originator_id = self.originator_id_default
886         if cluster_list_item is None:
887             cluster_list_item = self.cluster_list_item_default
888         ls_nlri = self.ls_nlri_default.copy()
889         ls_nlri.update(ls_nlri_params)
890
891         # Marker
892         marker_hex = "\xFF" * 16
893
894         # Type
895         type = 2
896         type_hex = struct.pack("B", type)
897
898         # Withdrawn Routes
899         withdrawn_routes_hex = ""
900         if not self.bgpls:
901             bytes = ((wr_prefix_length - 1) / 8) + 1
902             for prefix in wr_prefixes:
903                 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
904                                        struct.pack(">I", int(prefix))[:bytes])
905                 withdrawn_routes_hex += withdrawn_route_hex
906
907         # Withdrawn Routes Length
908         withdrawn_routes_length = len(withdrawn_routes_hex)
909         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
910
911         # TODO: to replace hardcoded string by encoding?
912         # Path Attributes
913         path_attributes_hex = ""
914         if nlri_prefixes != []:
915             path_attributes_hex += (
916                 "\x40"  # Flags ("Well-Known")
917                 "\x01"  # Type (ORIGIN)
918                 "\x01"  # Length (1)
919                 "\x00"  # Origin: IGP
920             )
921             path_attributes_hex += (
922                 "\x40"  # Flags ("Well-Known")
923                 "\x02"  # Type (AS_PATH)
924                 "\x06"  # Length (6)
925                 "\x02"  # AS segment type (AS_SEQUENCE)
926                 "\x01"  # AS segment length (1)
927             )
928             my_as_hex = struct.pack(">I", my_autonomous_system)
929             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
930             path_attributes_hex += (
931                 "\x40"  # Flags ("Well-Known")
932                 "\x03"  # Type (NEXT_HOP)
933                 "\x04"  # Length (4)
934             )
935             next_hop_hex = struct.pack(">I", int(next_hop))
936             path_attributes_hex += (
937                 next_hop_hex  # IP address of the next hop (4 bytes)
938             )
939             path_attributes_hex += (
940                 "\x40"  # Flags ("Well-Known")
941                 "\x05"  # Type (LOCAL_PREF)
942                 "\x04"  # Length (4)
943                 "\x00\x00\x00\x64"  # (100)
944             )
945             if originator_id is not None:
946                 path_attributes_hex += (
947                     "\x80"  # Flags ("Optional, non-transitive")
948                     "\x09"  # Type (ORIGINATOR_ID)
949                     "\x04"  # Length (4)
950                 )           # ORIGINATOR_ID (4 bytes)
951                 path_attributes_hex += struct.pack(">I", int(originator_id))
952             if cluster_list_item is not None:
953                 path_attributes_hex += (
954                     "\x80"  # Flags ("Optional, non-transitive")
955                     "\x09"  # Type (CLUSTER_LIST)
956                     "\x04"  # Length (4)
957                 )           # one CLUSTER_LIST item (4 bytes)
958                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
959
960         if self.bgpls and not end_of_rib:
961             path_attributes_hex += (
962                 "\x80"  # Flags ("Optional, non-transitive")
963                 "\x0e"  # Type (MP_REACH_NLRI)
964                 "\x22"  # Length (34)
965                 "\x40\x04"  # AFI (BGP-LS)
966                 "\x47"  # SAFI (BGP-LS)
967                 "\x04"  # Next Hop Length (4)
968             )
969             path_attributes_hex += struct.pack(">I", int(next_hop))
970             path_attributes_hex += "\x00"           # Reserved
971             path_attributes_hex += (
972                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
973                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
974                 "\x07"      # LS-NLRI.Variable.ProtocolID (RSVP-TE)
975             )
976             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
977             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
978             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
979             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
980             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
981
982         # Total Path Attributes Length
983         total_path_attributes_length = len(path_attributes_hex)
984         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
985
986         # Network Layer Reachability Information
987         nlri_hex = ""
988         if not self.bgpls:
989             bytes = ((nlri_prefix_length - 1) / 8) + 1
990             for prefix in nlri_prefixes:
991                 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
992                                    struct.pack(">I", int(prefix))[:bytes])
993                 nlri_hex += nlri_prefix_hex
994
995         # Length (big-endian)
996         length = (
997             len(marker_hex) + 2 + len(type_hex) +
998             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
999             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1000             len(nlri_hex))
1001         length_hex = struct.pack(">H", length)
1002
1003         # UPDATE Message
1004         message_hex = (
1005             marker_hex +
1006             length_hex +
1007             type_hex +
1008             withdrawn_routes_length_hex +
1009             withdrawn_routes_hex +
1010             total_path_attributes_length_hex +
1011             path_attributes_hex +
1012             nlri_hex
1013         )
1014
1015         if self.log_debug:
1016             logger.debug("UPDATE message encoding")
1017             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1018             logger.debug("  Length=" + str(length) + " (0x" +
1019                          binascii.hexlify(length_hex) + ")")
1020             logger.debug("  Type=" + str(type) + " (0x" +
1021                          binascii.hexlify(type_hex) + ")")
1022             logger.debug("  withdrawn_routes_length=" +
1023                          str(withdrawn_routes_length) + " (0x" +
1024                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
1025             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1026                          str(wr_prefix_length) + " (0x" +
1027                          binascii.hexlify(withdrawn_routes_hex) + ")")
1028             if total_path_attributes_length:
1029                 logger.debug("  Total Path Attributes Length=" +
1030                              str(total_path_attributes_length) + " (0x" +
1031                              binascii.hexlify(total_path_attributes_length_hex) + ")")
1032                 logger.debug("  Path Attributes=" + "(0x" +
1033                              binascii.hexlify(path_attributes_hex) + ")")
1034                 logger.debug("    Origin=IGP")
1035                 logger.debug("    AS path=" + str(my_autonomous_system))
1036                 logger.debug("    Next hop=" + str(next_hop))
1037                 if originator_id is not None:
1038                     logger.debug("    Originator id=" + str(originator_id))
1039                 if cluster_list_item is not None:
1040                     logger.debug("    Cluster list=" + str(cluster_list_item))
1041                 if self.bgpls:
1042                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1043             logger.debug("  Network Layer Reachability Information=" +
1044                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1045                          " (0x" + binascii.hexlify(nlri_hex) + ")")
1046             logger.debug("UPDATE message encoded: 0x" +
1047                          binascii.b2a_hex(message_hex))
1048
1049         # updating counter
1050         self.updates_sent += 1
1051         # returning encoded message
1052         return message_hex
1053
1054     def notification_message(self, error_code, error_subcode, data_hex=""):
1055         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1056
1057         Arguments:
1058             :param error_code: see the rfc4271#section-4.5
1059             :param error_subcode: see the rfc4271#section-4.5
1060             :param data_hex: see the rfc4271#section-4.5
1061         Returns:
1062             :return: encoded NOTIFICATION message in HEX
1063         """
1064
1065         # Marker
1066         marker_hex = "\xFF" * 16
1067
1068         # Type
1069         type = 3
1070         type_hex = struct.pack("B", type)
1071
1072         # Error Code
1073         error_code_hex = struct.pack("B", error_code)
1074
1075         # Error Subode
1076         error_subcode_hex = struct.pack("B", error_subcode)
1077
1078         # Length (big-endian)
1079         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1080                   len(error_subcode_hex) + len(data_hex))
1081         length_hex = struct.pack(">H", length)
1082
1083         # NOTIFICATION Message
1084         message_hex = (
1085             marker_hex +
1086             length_hex +
1087             type_hex +
1088             error_code_hex +
1089             error_subcode_hex +
1090             data_hex
1091         )
1092
1093         if self.log_debug:
1094             logger.debug("NOTIFICATION message encoding")
1095             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1096             logger.debug("  Length=" + str(length) + " (0x" +
1097                          binascii.hexlify(length_hex) + ")")
1098             logger.debug("  Type=" + str(type) + " (0x" +
1099                          binascii.hexlify(type_hex) + ")")
1100             logger.debug("  Error Code=" + str(error_code) + " (0x" +
1101                          binascii.hexlify(error_code_hex) + ")")
1102             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
1103                          binascii.hexlify(error_subcode_hex) + ")")
1104             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1105             logger.debug("NOTIFICATION message encoded: 0x%s",
1106                          binascii.b2a_hex(message_hex))
1107
1108         return message_hex
1109
1110     def keepalive_message(self):
1111         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1112
1113         Returns:
1114             :return: encoded KEEP ALIVE message in HEX
1115         """
1116
1117         # Marker
1118         marker_hex = "\xFF" * 16
1119
1120         # Type
1121         type = 4
1122         type_hex = struct.pack("B", type)
1123
1124         # Length (big-endian)
1125         length = len(marker_hex) + 2 + len(type_hex)
1126         length_hex = struct.pack(">H", length)
1127
1128         # KEEP ALIVE Message
1129         message_hex = (
1130             marker_hex +
1131             length_hex +
1132             type_hex
1133         )
1134
1135         if self.log_debug:
1136             logger.debug("KEEP ALIVE message encoding")
1137             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1138             logger.debug("  Length=" + str(length) + " (0x" +
1139                          binascii.hexlify(length_hex) + ")")
1140             logger.debug("  Type=" + str(type) + " (0x" +
1141                          binascii.hexlify(type_hex) + ")")
1142             logger.debug("KEEP ALIVE message encoded: 0x%s",
1143                          binascii.b2a_hex(message_hex))
1144
1145         return message_hex
1146
1147
1148 class TimeTracker(object):
1149     """Class for tracking timers, both for my keepalives and
1150     peer's hold time.
1151     """
1152
1153     def __init__(self, msg_in):
1154         """Initialisation. based on defaults and OPEN message from peer.
1155
1156         Arguments:
1157             msg_in: the OPEN message received from peer.
1158         """
1159         # Note: Relative time is always named timedelta, to stress that
1160         # the (non-delta) time is absolute.
1161         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1162         # Upper bound for being stuck in the same state, we should
1163         # at least report something before continuing.
1164         # Negotiate the hold timer by taking the smaller
1165         # of the 2 values (mine and the peer's).
1166         hold_timedelta = 180  # Not an attribute of self yet.
1167         # TODO: Make the default value configurable,
1168         # default value could mirror what peer said.
1169         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1170         if hold_timedelta > peer_hold_timedelta:
1171             hold_timedelta = peer_hold_timedelta
1172         if hold_timedelta != 0 and hold_timedelta < 3:
1173             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1174             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1175         self.hold_timedelta = hold_timedelta
1176         # If we do not hear from peer this long, we assume it has died.
1177         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1178         # Upper limit for duration between messages, to avoid being
1179         # declared to be dead.
1180         # The same as calling snapshot(), but also declares a field.
1181         self.snapshot_time = time.time()
1182         # Sometimes we need to store time. This is where to get
1183         # the value from afterwards. Time_keepalive may be too strict.
1184         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1185         # At this time point, peer will be declared dead.
1186         self.my_keepalive_time = None  # to be set later
1187         # At this point, we should be sending keepalive message.
1188
1189     def snapshot(self):
1190         """Store current time in instance data to use later."""
1191         # Read as time before something interesting was called.
1192         self.snapshot_time = time.time()
1193
1194     def reset_peer_hold_time(self):
1195         """Move hold time to future as peer has just proven it still lives."""
1196         self.peer_hold_time = time.time() + self.hold_timedelta
1197
1198     # Some methods could rely on self.snapshot_time, but it is better
1199     # to require user to provide it explicitly.
1200     def reset_my_keepalive_time(self, keepalive_time):
1201         """Calculate and set the next my KEEP ALIVE timeout time
1202
1203         Arguments:
1204             :keepalive_time: the initial value of the KEEP ALIVE timer
1205         """
1206         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1207
1208     def is_time_for_my_keepalive(self):
1209         """Check for my KEEP ALIVE timeout occurence"""
1210         if self.hold_timedelta == 0:
1211             return False
1212         return self.snapshot_time >= self.my_keepalive_time
1213
1214     def get_next_event_time(self):
1215         """Set the time of the next expected or to be sent KEEP ALIVE"""
1216         if self.hold_timedelta == 0:
1217             return self.snapshot_time + 86400
1218         return min(self.my_keepalive_time, self.peer_hold_time)
1219
1220     def check_peer_hold_time(self, snapshot_time):
1221         """Raise error if nothing was read from peer until specified time."""
1222         # Hold time = 0 means keepalive checking off.
1223         if self.hold_timedelta != 0:
1224             # time.time() may be too strict
1225             if snapshot_time > self.peer_hold_time:
1226                 logger.error("Peer has overstepped the hold timer.")
1227                 raise RuntimeError("Peer has overstepped the hold timer.")
1228                 # TODO: Include hold_timedelta?
1229                 # TODO: Add notification sending (attempt). That means
1230                 # move to write tracker.
1231
1232
1233 class ReadTracker(object):
1234     """Class for tracking read of mesages chunk by chunk and
1235     for idle waiting.
1236     """
1237
1238     def __init__(self, bgp_socket, timer):
1239         """The reader initialisation.
1240
1241         Arguments:
1242             bgp_socket: socket to be used for sending
1243             timer: timer to be used for scheduling
1244         """
1245         # References to outside objects.
1246         self.socket = bgp_socket
1247         self.timer = timer
1248         # BGP marker length plus length field length.
1249         self.header_length = 18
1250         # TODO: make it class (constant) attribute
1251         # Computation of where next chunk ends depends on whether
1252         # we are beyond length field.
1253         self.reading_header = True
1254         # Countdown towards next size computation.
1255         self.bytes_to_read = self.header_length
1256         # Incremental buffer for message under read.
1257         self.msg_in = ""
1258         # Initialising counters
1259         self.updates_received = 0
1260         self.prefixes_introduced = 0
1261         self.prefixes_withdrawn = 0
1262         self.rx_idle_time = 0
1263         self.rx_activity_detected = True
1264
1265     def read_message_chunk(self):
1266         """Read up to one message
1267
1268         Note:
1269             Currently it does not return anything.
1270         """
1271         # TODO: We could return the whole message, currently not needed.
1272         # We assume the socket is readable.
1273         chunk_message = self.socket.recv(self.bytes_to_read)
1274         self.msg_in += chunk_message
1275         self.bytes_to_read -= len(chunk_message)
1276         # TODO: bytes_to_read < 0 is not possible, right?
1277         if not self.bytes_to_read:
1278             # Finished reading a logical block.
1279             if self.reading_header:
1280                 # The logical block was a BGP header.
1281                 # Now we know the size of the message.
1282                 self.reading_header = False
1283                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1284                                       self.header_length)
1285             else:  # We have finished reading the body of the message.
1286                 # Peer has just proven it is still alive.
1287                 self.timer.reset_peer_hold_time()
1288                 # TODO: Do we want to count received messages?
1289                 # This version ignores the received message.
1290                 # TODO: Should we do validation and exit on anything
1291                 # besides update or keepalive?
1292                 # Prepare state for reading another message.
1293                 message_type_hex = self.msg_in[self.header_length]
1294                 if message_type_hex == "\x01":
1295                     logger.info("OPEN message received: 0x%s",
1296                                 binascii.b2a_hex(self.msg_in))
1297                 elif message_type_hex == "\x02":
1298                     logger.debug("UPDATE message received: 0x%s",
1299                                  binascii.b2a_hex(self.msg_in))
1300                     self.decode_update_message(self.msg_in)
1301                 elif message_type_hex == "\x03":
1302                     logger.info("NOTIFICATION message received: 0x%s",
1303                                 binascii.b2a_hex(self.msg_in))
1304                 elif message_type_hex == "\x04":
1305                     logger.info("KEEP ALIVE message received: 0x%s",
1306                                 binascii.b2a_hex(self.msg_in))
1307                 else:
1308                     logger.warning("Unexpected message received: 0x%s",
1309                                    binascii.b2a_hex(self.msg_in))
1310                 self.msg_in = ""
1311                 self.reading_header = True
1312                 self.bytes_to_read = self.header_length
1313         # We should not act upon peer_hold_time if we are reading
1314         # something right now.
1315         return
1316
1317     def decode_path_attributes(self, path_attributes_hex):
1318         """Decode the Path Attributes field (rfc4271#section-4.3)
1319
1320         Arguments:
1321             :path_attributes: path_attributes field to be decoded in hex
1322         Returns:
1323             :return: None
1324         """
1325         hex_to_decode = path_attributes_hex
1326
1327         while len(hex_to_decode):
1328             attr_flags_hex = hex_to_decode[0]
1329             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1330 #            attr_optional_bit = attr_flags & 128
1331 #            attr_transitive_bit = attr_flags & 64
1332 #            attr_partial_bit = attr_flags & 32
1333             attr_extended_length_bit = attr_flags & 16
1334
1335             attr_type_code_hex = hex_to_decode[1]
1336             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1337
1338             if attr_extended_length_bit:
1339                 attr_length_hex = hex_to_decode[2:4]
1340                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1341                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1342                 hex_to_decode = hex_to_decode[4 + attr_length:]
1343             else:
1344                 attr_length_hex = hex_to_decode[2]
1345                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1346                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1347                 hex_to_decode = hex_to_decode[3 + attr_length:]
1348
1349             if attr_type_code == 1:
1350                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1351                              binascii.b2a_hex(attr_flags_hex))
1352                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1353             elif attr_type_code == 2:
1354                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1355                              binascii.b2a_hex(attr_flags_hex))
1356                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1357             elif attr_type_code == 3:
1358                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1359                              binascii.b2a_hex(attr_flags_hex))
1360                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1361             elif attr_type_code == 4:
1362                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1363                              binascii.b2a_hex(attr_flags_hex))
1364                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1365             elif attr_type_code == 5:
1366                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1367                              binascii.b2a_hex(attr_flags_hex))
1368                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1369             elif attr_type_code == 6:
1370                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1371                              binascii.b2a_hex(attr_flags_hex))
1372                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1373             elif attr_type_code == 7:
1374                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1375                              binascii.b2a_hex(attr_flags_hex))
1376                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1377             elif attr_type_code == 9:  # rfc4456#section-8
1378                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1379                              binascii.b2a_hex(attr_flags_hex))
1380                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1381             elif attr_type_code == 10:  # rfc4456#section-8
1382                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1383                              binascii.b2a_hex(attr_flags_hex))
1384                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1385             elif attr_type_code == 14:  # rfc4760#section-3
1386                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1387                              binascii.b2a_hex(attr_flags_hex))
1388                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1389                 address_family_identifier_hex = attr_value_hex[0:2]
1390                 logger.debug("  Address Family Identifier=0x%s",
1391                              binascii.b2a_hex(address_family_identifier_hex))
1392                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1393                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1394                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1395                 next_hop_netaddr_len_hex = attr_value_hex[3]
1396                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1397                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1398                              next_hop_netaddr_len,
1399                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1400                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1401                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1402                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1403                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1404                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1405                 logger.debug("  Reserved=0x%s",
1406                              binascii.b2a_hex(reserved_hex))
1407                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1408                 logger.debug("  Network Layer Reachability Information=0x%s",
1409                              binascii.b2a_hex(nlri_hex))
1410                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1411                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1412                 for prefix in nlri_prefix_list:
1413                     logger.debug("  nlri_prefix_received: %s", prefix)
1414                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1415             elif attr_type_code == 15:  # rfc4760#section-4
1416                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1417                              binascii.b2a_hex(attr_flags_hex))
1418                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1419                 address_family_identifier_hex = attr_value_hex[0:2]
1420                 logger.debug("  Address Family Identifier=0x%s",
1421                              binascii.b2a_hex(address_family_identifier_hex))
1422                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1423                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1424                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1425                 wd_hex = attr_value_hex[3:]
1426                 logger.debug("  Withdrawn Routes=0x%s",
1427                              binascii.b2a_hex(wd_hex))
1428                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1429                 logger.debug("  Withdrawn routes prefix list: %s",
1430                              wdr_prefix_list)
1431                 for prefix in wdr_prefix_list:
1432                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1433                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1434             else:
1435                 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1436                              binascii.b2a_hex(attr_flags_hex))
1437                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1438         return None
1439
1440     def decode_update_message(self, msg):
1441         """Decode an UPDATE message (rfc4271#section-4.3)
1442
1443         Arguments:
1444             :msg: message to be decoded in hex
1445         Returns:
1446             :return: None
1447         """
1448         logger.debug("Decoding update message:")
1449         # message header - marker
1450         marker_hex = msg[:16]
1451         logger.debug("Message header marker: 0x%s",
1452                      binascii.b2a_hex(marker_hex))
1453         # message header - message length
1454         msg_length_hex = msg[16:18]
1455         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1456         logger.debug("Message lenght: 0x%s (%s)",
1457                      binascii.b2a_hex(msg_length_hex), msg_length)
1458         # message header - message type
1459         msg_type_hex = msg[18:19]
1460         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1461         if msg_type == 2:
1462             logger.debug("Message type: 0x%s (update)",
1463                          binascii.b2a_hex(msg_type_hex))
1464             # withdrawn routes length
1465             wdr_length_hex = msg[19:21]
1466             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1467             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1468                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1469             # withdrawn routes
1470             wdr_hex = msg[21:21 + wdr_length]
1471             logger.debug("Withdrawn routes: 0x%s",
1472                          binascii.b2a_hex(wdr_hex))
1473             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1474             logger.debug("Withdrawn routes prefix list: %s",
1475                          wdr_prefix_list)
1476             for prefix in wdr_prefix_list:
1477                 logger.debug("withdrawn_prefix_received: %s", prefix)
1478             # total path attribute length
1479             total_pa_length_offset = 21 + wdr_length
1480             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1481             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1482             logger.debug("Total path attribute lenght: 0x%s (%s)",
1483                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1484             # path attributes
1485             pa_offset = total_pa_length_offset + 2
1486             pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1487             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1488             self.decode_path_attributes(pa_hex)
1489             # network layer reachability information length
1490             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1491             logger.debug("Calculated NLRI length: %s", nlri_length)
1492             # network layer reachability information
1493             nlri_offset = pa_offset + total_pa_length
1494             nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1495             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1496             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1497             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1498             for prefix in nlri_prefix_list:
1499                 logger.debug("nlri_prefix_received: %s", prefix)
1500             # Updating counters
1501             self.updates_received += 1
1502             self.prefixes_introduced += len(nlri_prefix_list)
1503             self.prefixes_withdrawn += len(wdr_prefix_list)
1504         else:
1505             logger.error("Unexpeced message type 0x%s in 0x%s",
1506                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1507
1508     def wait_for_read(self):
1509         """Read message until timeout (next expected event).
1510
1511         Note:
1512             Used when no more updates has to be sent to avoid busy-wait.
1513             Currently it does not return anything.
1514         """
1515         # Compute time to the first predictable state change
1516         event_time = self.timer.get_next_event_time()
1517         # snapshot_time would be imprecise
1518         wait_timedelta = min(event_time - time.time(), 10)
1519         if wait_timedelta < 0:
1520             # The program got around to waiting to an event in "very near
1521             # future" so late that it became a "past" event, thus tell
1522             # "select" to not wait at all. Passing negative timedelta to
1523             # select() would lead to either waiting forever (for -1) or
1524             # select.error("Invalid parameter") (for everything else).
1525             wait_timedelta = 0
1526         # And wait for event or something to read.
1527
1528         if not self.rx_activity_detected or not (self.updates_received % 100):
1529             # right time to write statistics to the log (not for every update and
1530             # not too frequently to avoid having large log files)
1531             logger.info("total_received_update_message_counter: %s",
1532                         self.updates_received)
1533             logger.info("total_received_nlri_prefix_counter: %s",
1534                         self.prefixes_introduced)
1535             logger.info("total_received_withdrawn_prefix_counter: %s",
1536                         self.prefixes_withdrawn)
1537
1538         start_time = time.time()
1539         select.select([self.socket], [], [self.socket], wait_timedelta)
1540         timedelta = time.time() - start_time
1541         self.rx_idle_time += timedelta
1542         self.rx_activity_detected = timedelta < 1
1543
1544         if not self.rx_activity_detected or not (self.updates_received % 100):
1545             # right time to write statistics to the log (not for every update and
1546             # not too frequently to avoid having large log files)
1547             logger.info("... idle for %.3fs", timedelta)
1548             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1549         return
1550
1551
1552 class WriteTracker(object):
1553     """Class tracking enqueueing messages and sending chunks of them."""
1554
1555     def __init__(self, bgp_socket, generator, timer):
1556         """The writter initialisation.
1557
1558         Arguments:
1559             bgp_socket: socket to be used for sending
1560             generator: generator to be used for message generation
1561             timer: timer to be used for scheduling
1562         """
1563         # References to outside objects,
1564         self.socket = bgp_socket
1565         self.generator = generator
1566         self.timer = timer
1567         # Really new fields.
1568         # TODO: Would attribute docstrings add anything substantial?
1569         self.sending_message = False
1570         self.bytes_to_send = 0
1571         self.msg_out = ""
1572
1573     def enqueue_message_for_sending(self, message):
1574         """Enqueue message and change state.
1575
1576         Arguments:
1577             message: message to be enqueued into the msg_out buffer
1578         """
1579         self.msg_out += message
1580         self.bytes_to_send += len(message)
1581         self.sending_message = True
1582
1583     def send_message_chunk_is_whole(self):
1584         """Send enqueued data from msg_out buffer
1585
1586         Returns:
1587             :return: true if no remaining data to send
1588         """
1589         # We assume there is a msg_out to send and socket is writable.
1590         # print "going to send", repr(self.msg_out)
1591         self.timer.snapshot()
1592         bytes_sent = self.socket.send(self.msg_out)
1593         # Forget the part of message that was sent.
1594         self.msg_out = self.msg_out[bytes_sent:]
1595         self.bytes_to_send -= bytes_sent
1596         if not self.bytes_to_send:
1597             # TODO: Is it possible to hit negative bytes_to_send?
1598             self.sending_message = False
1599             # We should have reset hold timer on peer side.
1600             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1601             # The possible reason for not prioritizing reads is gone.
1602             return True
1603         return False
1604
1605
1606 class StateTracker(object):
1607     """Main loop has state so complex it warrants this separate class."""
1608
1609     def __init__(self, bgp_socket, generator, timer):
1610         """The state tracker initialisation.
1611
1612         Arguments:
1613             bgp_socket: socket to be used for sending / receiving
1614             generator: generator to be used for message generation
1615             timer: timer to be used for scheduling
1616         """
1617         # References to outside objects.
1618         self.socket = bgp_socket
1619         self.generator = generator
1620         self.timer = timer
1621         # Sub-trackers.
1622         self.reader = ReadTracker(bgp_socket, timer)
1623         self.writer = WriteTracker(bgp_socket, generator, timer)
1624         # Prioritization state.
1625         self.prioritize_writing = False
1626         # In general, we prioritize reading over writing. But in order
1627         # not to get blocked by neverending reads, we should
1628         # check whether we are not risking running out of holdtime.
1629         # So in some situations, this field is set to True to attempt
1630         # finishing sending a message, after which this field resets
1631         # back to False.
1632         # TODO: Alternative is to switch fairly between reading and
1633         # writing (called round robin from now on).
1634         # Message counting is done in generator.
1635
1636     def perform_one_loop_iteration(self):
1637         """ The main loop iteration
1638
1639         Notes:
1640             Calculates priority, resolves all conditions, calls
1641             appropriate method and returns to caller to repeat.
1642         """
1643         self.timer.snapshot()
1644         if not self.prioritize_writing:
1645             if self.timer.is_time_for_my_keepalive():
1646                 if not self.writer.sending_message:
1647                     # We need to schedule a keepalive ASAP.
1648                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1649                     logger.info("KEEP ALIVE is sent.")
1650                 # We are sending a message now, so let's prioritize it.
1651                 self.prioritize_writing = True
1652         # Now we know what our priorities are, we have to check
1653         # which actions are available.
1654         # socket.socket() returns three lists,
1655         # we store them to list of lists.
1656         list_list = select.select([self.socket], [self.socket], [self.socket],
1657                                   self.timer.report_timedelta)
1658         read_list, write_list, except_list = list_list
1659         # Lists are unpacked, each is either [] or [self.socket],
1660         # so we will test them as boolean.
1661         if except_list:
1662             logger.error("Exceptional state on the socket.")
1663             raise RuntimeError("Exceptional state on socket", self.socket)
1664         # We will do either read or write.
1665         if not (self.prioritize_writing and write_list):
1666             # Either we have no reason to rush writes,
1667             # or the socket is not writable.
1668             # We are focusing on reading here.
1669             if read_list:  # there is something to read indeed
1670                 # In this case we want to read chunk of message
1671                 # and repeat the select,
1672                 self.reader.read_message_chunk()
1673                 return
1674             # We were focusing on reading, but nothing to read was there.
1675             # Good time to check peer for hold timer.
1676             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1677             # Quiet on the read front, we can have attempt to write.
1678         if write_list:
1679             # Either we really want to reset peer's view of our hold
1680             # timer, or there was nothing to read.
1681             # Were we in the middle of sending a message?
1682             if self.writer.sending_message:
1683                 # Was it the end of a message?
1684                 whole = self.writer.send_message_chunk_is_whole()
1685                 # We were pressed to send something and we did it.
1686                 if self.prioritize_writing and whole:
1687                     # We prioritize reading again.
1688                     self.prioritize_writing = False
1689                 return
1690             # Finally to check if still update messages to be generated.
1691             if self.generator.remaining_prefixes:
1692                 msg_out = self.generator.compose_update_message()
1693                 if not self.generator.remaining_prefixes:
1694                     # We have just finished update generation,
1695                     # end-of-rib is due.
1696                     logger.info("All update messages generated.")
1697                     logger.info("Storing performance results.")
1698                     self.generator.store_results()
1699                     logger.info("Finally an END-OF-RIB is sent.")
1700                     msg_out += self.generator.update_message(wr_prefixes=[],
1701                                                              nlri_prefixes=[],
1702                                                              end_of_rib=True)
1703                 self.writer.enqueue_message_for_sending(msg_out)
1704                 # Attempt for real sending to be done in next iteration.
1705                 return
1706             # Nothing to write anymore.
1707             # To avoid busy loop, we do idle waiting here.
1708             self.reader.wait_for_read()
1709             return
1710         # We can neither read nor write.
1711         logger.warning("Input and output both blocked for " +
1712                        str(self.timer.report_timedelta) + " seconds.")
1713         # FIXME: Are we sure select has been really waiting
1714         # the whole period?
1715         return
1716
1717
1718 def create_logger(loglevel, logfile):
1719     """Create logger object
1720
1721     Arguments:
1722         :loglevel: log level
1723         :logfile: log file name
1724     Returns:
1725         :return: logger object
1726     """
1727     logger = logging.getLogger("logger")
1728     log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1729     console_handler = logging.StreamHandler()
1730     file_handler = logging.FileHandler(logfile, mode="w")
1731     console_handler.setFormatter(log_formatter)
1732     file_handler.setFormatter(log_formatter)
1733     logger.addHandler(console_handler)
1734     logger.addHandler(file_handler)
1735     logger.setLevel(loglevel)
1736     return logger
1737
1738
1739 def job(arguments):
1740     """One time initialisation and iterations looping.
1741     Notes:
1742         Establish BGP connection and run iterations.
1743
1744     Arguments:
1745         :arguments: Command line arguments
1746     Returns:
1747         :return: None
1748     """
1749     bgp_socket = establish_connection(arguments)
1750     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1751     # Receive open message before sending anything.
1752     # FIXME: Add parameter to send default open message first,
1753     # to work with "you first" peers.
1754     msg_in = read_open_message(bgp_socket)
1755     timer = TimeTracker(msg_in)
1756     generator = MessageGenerator(arguments)
1757     msg_out = generator.open_message()
1758     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1759     # Send our open message to the peer.
1760     bgp_socket.send(msg_out)
1761     # Wait for confirming keepalive.
1762     # TODO: Surely in just one packet?
1763     # Using exact keepalive length to not to see possible updates.
1764     msg_in = bgp_socket.recv(19)
1765     if msg_in != generator.keepalive_message():
1766         error_msg = "Open not confirmed by keepalive, instead got"
1767         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1768         raise MessageError(error_msg, msg_in)
1769     timer.reset_peer_hold_time()
1770     # Send the keepalive to indicate the connection is accepted.
1771     timer.snapshot()  # Remember this time.
1772     msg_out = generator.keepalive_message()
1773     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1774     bgp_socket.send(msg_out)
1775     # Use the remembered time.
1776     timer.reset_my_keepalive_time(timer.snapshot_time)
1777     # End of initial handshake phase.
1778     state = StateTracker(bgp_socket, generator, timer)
1779     while True:  # main reactor loop
1780         state.perform_one_loop_iteration()
1781
1782
1783 def threaded_job(arguments):
1784     """Run the job threaded
1785
1786     Arguments:
1787         :arguments: Command line arguments
1788     Returns:
1789         :return: None
1790     """
1791     amount_left = arguments.amount
1792     utils_left = arguments.multiplicity
1793     prefix_current = arguments.firstprefix
1794     myip_current = arguments.myip
1795     thread_args = []
1796
1797     while 1:
1798         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
1799         amount_left -= amount_per_util
1800         utils_left -= 1
1801
1802         args = deepcopy(arguments)
1803         args.amount = amount_per_util
1804         args.firstprefix = prefix_current
1805         args.myip = myip_current
1806         thread_args.append(args)
1807
1808         if not utils_left:
1809             break
1810         prefix_current += amount_per_util * 16
1811         myip_current += 1
1812
1813     try:
1814         # Create threads
1815         for t in thread_args:
1816             thread.start_new_thread(job, (t,))
1817     except Exception:
1818         print "Error: unable to start thread."
1819         raise SystemExit(2)
1820
1821     # Work remains forever
1822     while 1:
1823         time.sleep(5)
1824
1825
1826 if __name__ == "__main__":
1827     arguments = parse_arguments()
1828     logger = create_logger(arguments.loglevel, arguments.logfile)
1829     threaded_job(arguments)