integration/test.git: tools/fastbgp/play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with a sudo-able user when you want to use ports below
4 1024 as --myport. This utility is used to avoid the excessive waiting times
5 which EXABGP exhibits when used with huge routing tables, and also to avoid
6 the memory cost of EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
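# Example invocation (illustrative only; the addresses, ports and counts below
# are assumptions, adjust them to your environment):
#
#   python play.py --amount 10000 --firstprefix 8.0.1.0 --prefixlen 28 \
#       --myip 127.0.0.3 --myport 17900 --peerip 127.0.0.1 --peerport 1790
#
# Use --listen (and a sudo-able user for ports below 1024) to wait for the
# peer to initiate the TCP connection instead.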
13
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 import argparse
17 import binascii
18 import ipaddr
19 import logging
20 import Queue
21 import select
22 import socket
23 import struct
24 import thread
25 import threading
26 import time
27
28
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
33
34
35 class SafeDict(dict):
36     '''Thread safe dictionary
37
38     The object will serve as thread safe data storage.
39     It should be used with "with" statement.
40     '''
41
42     def __init__(self, * p_arg, ** n_arg):
43         super(SafeDict, self).__init__()
44         self._lock = threading.Lock()
45
46     def __enter__(self):
47         self._lock.acquire()
48         return self
49
50     def __exit__(self, type, value, traceback):
51         self._lock.release()
52
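# Intended usage of SafeDict (illustrative sketch):
#
#   storage = SafeDict()
#   with storage as s:
#       s["counter"] = s.get("counter", 0) + 1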
53
54 def parse_arguments():
55     """Use argparse to get arguments,
56
57     Returns:
58         :return: args object.
59     """
60     parser = argparse.ArgumentParser()
61     # TODO: Should we use --argument-names-with-spaces?
62     str_help = "Autonomous System number use in the stream."
63     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64     # FIXME: We are acting as iBGP peer,
65     # we should mirror AS number from peer's open message.
66     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67     parser.add_argument("--amount", default="1", type=int, help=str_help)
68     str_help = "Maximum number of IP prefixes to be announced in one iteration"
69     parser.add_argument("--insert", default="1", type=int, help=str_help)
70     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
71     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
72     str_help = "The number of prefixes to process without withdrawals"
73     parser.add_argument("--prefill", default="0", type=int, help=str_help)
74     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
75     parser.add_argument("--updates", choices=["single", "separate"],
76                         default=["separate"], help=str_help)
77     str_help = "Base prefix IP address for prefix generation"
78     parser.add_argument("--firstprefix", default="8.0.1.0",
79                         type=ipaddr.IPv4Address, help=str_help)
80     str_help = "The prefix length."
81     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
82     str_help = "Listen for connection, instead of initiating it."
83     parser.add_argument("--listen", action="store_true", help=str_help)
84     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
85                 "Default value only suitable for listening.")
86     parser.add_argument("--myip", default="0.0.0.0",
87                         type=ipaddr.IPv4Address, help=str_help)
88     str_help = ("TCP port to bind to when listening or initiating connection." +
89                 "Default only suitable for initiating.")
90     parser.add_argument("--myport", default="0", type=int, help=str_help)
91     str_help = "The IP of the next hop to be placed into the update messages."
92     parser.add_argument("--nexthop", default="192.0.2.1",
93                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
94     str_help = "Identifier of the route originator."
95     parser.add_argument("--originator", default=None,
96                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
97     str_help = "Cluster list item identifier."
98     parser.add_argument("--cluster", default=None,
99                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
100     str_help = ("Numeric IP Address to try to connect to." +
101                 "Currently no effect in listening mode.")
102     parser.add_argument("--peerip", default="127.0.0.2",
103                         type=ipaddr.IPv4Address, help=str_help)
104     str_help = "TCP port to try to connect to. No effect in listening mode."
105     parser.add_argument("--peerport", default="179", type=int, help=str_help)
106     str_help = "Local hold time."
107     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
108     str_help = "Log level (--error, --warning, --info, --debug)"
109     parser.add_argument("--error", dest="loglevel", action="store_const",
110                         const=logging.ERROR, default=logging.INFO,
111                         help=str_help)
112     parser.add_argument("--warning", dest="loglevel", action="store_const",
113                         const=logging.WARNING, default=logging.INFO,
114                         help=str_help)
115     parser.add_argument("--info", dest="loglevel", action="store_const",
116                         const=logging.INFO, default=logging.INFO,
117                         help=str_help)
118     parser.add_argument("--debug", dest="loglevel", action="store_const",
119                         const=logging.DEBUG, default=logging.INFO,
120                         help=str_help)
121     str_help = "Log file name"
122     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
123     str_help = "Trailing part of the csv result files for plotting purposes"
124     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
125     str_help = "Minimum number of updates to reach to include result into csv."
126     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
127     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
128     parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
129     str_help = "Link-State NLRI supported"
130     parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
131     str_help = "Link-State NLRI: Identifier"
132     parser.add_argument("-lsid", default="1", type=int, help=str_help)
133     str_help = "Link-State NLRI: Tunnel ID"
134     parser.add_argument("-lstid", default="1", type=int, help=str_help)
135     str_help = "Link-State NLRI: LSP ID"
136     parser.add_argument("-lspid", default="1", type=int, help=str_help)
137     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
138     parser.add_argument("--lstsaddr", default="1.2.3.4",
139                         type=ipaddr.IPv4Address, help=str_help)
140     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
141     parser.add_argument("--lsteaddr", default="5.6.7.8",
142                         type=ipaddr.IPv4Address, help=str_help)
143     str_help = "Link-State NLRI: Identifier Step"
144     parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
145     str_help = "Link-State NLRI: Tunnel ID Step"
146     parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
147     str_help = "Link-State NLRI: LSP ID Step"
148     parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
149     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
150     parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
151     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
152     parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
153     str_help = "How many play utilities are to be started."
154     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
155     str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
156 Enabling this flag makes the script not decoding the update mesage, because of not\
157 supported decoding for these elements."
158     parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
159     parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
160     arguments = parser.parse_args()
161     if arguments.multiplicity < 1:
162         print "Multiplicity", arguments.multiplicity, "is not positive."
163         raise SystemExit(1)
164     # TODO: Are sanity checks (such as asnumber>=0) required?
165     return arguments
166
167
168 def establish_connection(arguments):
169     """Establish connection to BGP peer.
170
171     Arguments:
172         :arguments: following command-line arguments are used
173             - arguments.myip: local IP address
174             - arguments.myport: local port
175             - arguments.peerip: remote IP address
176             - arguments.peerport: remote port
177     Returns:
178         :return: socket.
179     """
180     if arguments.listen:
181         logger.info("Connecting in the listening mode.")
182         logger.debug("Local IP address: " + str(arguments.myip))
183         logger.debug("Local port: " + str(arguments.myport))
184         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
185         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
186         # bind needs a single tuple as an argument
187         listening_socket.bind((str(arguments.myip), arguments.myport))
188         listening_socket.listen(1)
189         bgp_socket, _ = listening_socket.accept()
190         # TODO: Verify client IP is controller IP.
191         listening_socket.close()
192     else:
193         logger.info("Connecting in the talking mode.")
194         logger.debug("Local IP address: " + str(arguments.myip))
195         logger.debug("Local port: " + str(arguments.myport))
196         logger.debug("Remote IP address: " + str(arguments.peerip))
197         logger.debug("Remote port: " + str(arguments.peerport))
198         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
199         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
200         # bind to force specified address and port
201         talking_socket.bind((str(arguments.myip), arguments.myport))
202         # socket does not accept ipaddr objects, hence str()
203         talking_socket.connect((str(arguments.peerip), arguments.peerport))
204         bgp_socket = talking_socket
205     logger.info("Connected to ODL.")
206     return bgp_socket
207
208
209 def get_short_int_from_message(message, offset=16):
210     """Extract 2-bytes number from provided message.
211
212     Arguments:
213         :message: given message
214         :offset: offset of the short_int inside the message
215     Returns:
216         :return: required short_int value.
217     Notes:
218         default offset value is the BGP message size offset.
219     """
220     high_byte_int = ord(message[offset])
221     low_byte_int = ord(message[offset + 1])
222     short_int = high_byte_int * 256 + low_byte_int
223     return short_int
224
225
226 def get_prefix_list_from_hex(prefixes_hex):
227     """Get decoded list of prefixes (rfc4271#section-4.3)
228
229     Arguments:
230         :prefixes_hex: list of prefixes to be decoded in hex
231     Returns:
232         :return: list of prefixes in the form of ip address (X.X.X.X/X)
233     """
234     prefix_list = []
235     offset = 0
236     while offset < len(prefixes_hex):
237         prefix_bit_len_hex = prefixes_hex[offset]
238         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
239         prefix_len = ((prefix_bit_len - 1) / 8) + 1
240         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
241         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
242         offset += 1 + prefix_len
243         prefix_list.append(prefix + "/" + str(prefix_bit_len))
244     return prefix_list
245
246
247 class MessageError(ValueError):
248     """Value error with logging optimized for hexlified messages."""
249
250     def __init__(self, text, message, *args):
251         """Initialisation.
252
253         Store and call super init for textual comment,
254         store raw message which caused it.
255         """
256         self.text = text
257         self.msg = message
258         super(MessageError, self).__init__(text, message, *args)
259
260     def __str__(self):
261         """Generate human readable error message.
262
263         Returns:
264             :return: human readable message as string
265         Notes:
266             Use a placeholder string if the message is to be empty.
267         """
268         message = binascii.hexlify(self.msg)
269         if message == "":
270             message = "(empty message)"
271         return self.text + ": " + message
272
273
274 def read_open_message(bgp_socket):
275     """Receive peer's OPEN message
276
277     Arguments:
278         :bgp_socket: the socket to be read
279     Returns:
280         :return: received OPEN message.
281     Notes:
282         Performs just basic incoming message checks.
283     """
284     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
285     # TODO: Can the incoming open message be split in more than one packet?
286     # Some validation.
287     if len(msg_in) < 37:
288         # 37 is minimal length of open message with 4-byte AS number.
289         error_msg = (
290             "Message length (" + str(len(msg_in)) + ") is smaller than "
291             "minimal length of OPEN message with 4-byte AS number (37)"
292         )
293         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
294         raise MessageError(error_msg, msg_in)
295     # TODO: We could check BGP marker, but it is defined only later;
296     # decide what to do.
297     reported_length = get_short_int_from_message(msg_in)
298     if len(msg_in) != reported_length:
299         error_msg = (
300             "Expected message length (" + reported_length +
301             ") does not match actual length (" + str(len(msg_in)) + ")"
302         )
303         logger.error(error_msg + binascii.hexlify(msg_in))
304         raise MessageError(error_msg, msg_in)
305     logger.info("Open message received.")
306     return msg_in
307
308
309 class MessageGenerator(object):
310     """Class which generates messages, holds states and configuration values."""
311
312     # TODO: Define bgp marker as a class (constant) variable.
313     def __init__(self, args):
314         """Initialisation according to command-line args.
315
316         Arguments:
317             :args: argparse Namespace object which contains command-line
318                 options for MessageGenerator initialisation
319         Notes:
320             Calculates and stores default values used later on for
321             message generation.
322         """
323         self.total_prefix_amount = args.amount
324         # Number of update messages left to be sent.
325         self.remaining_prefixes = self.total_prefix_amount
326
327         # New parameters initialisation
328         self.iteration = 0
329         self.prefix_base_default = args.firstprefix
330         self.prefix_length_default = args.prefixlen
331         self.wr_prefixes_default = []
332         self.nlri_prefixes_default = []
333         self.version_default = 4
334         self.my_autonomous_system_default = args.asnumber
335         self.hold_time_default = args.holdtime  # Local hold time.
336         self.bgp_identifier_default = int(args.myip)
337         self.next_hop_default = args.nexthop
338         self.originator_id_default = args.originator
339         self.cluster_list_item_default = args.cluster
340         self.single_update_default = args.updates == "single"
341         self.randomize_updates_default = args.updates == "random"
342         self.prefix_count_to_add_default = args.insert
343         self.prefix_count_to_del_default = args.withdraw
344         if self.prefix_count_to_del_default < 0:
345             self.prefix_count_to_del_default = 0
346         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
347             # total number of prefixes must grow to avoid infinite test loop
348             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
349         self.slot_size_default = self.prefix_count_to_add_default
350         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
351         self.results_file_name_default = args.results
352         self.performance_threshold_default = args.threshold
353         self.rfc4760 = args.rfc4760
354         self.bgpls = args.bgpls
355         self.evpn = args.evpn
356         # Default values when BGP-LS Attributes are used
357         if self.bgpls:
358             self.prefix_count_to_add_default = 1
359             self.prefix_count_to_del_default = 0
360         self.ls_nlri_default = {"Identifier": args.lsid,
361                                 "TunnelID": args.lstid,
362                                 "LSPID": args.lspid,
363                                 "IPv4TunnelSenderAddress": args.lstsaddr,
364                                 "IPv4TunnelEndPointAddress": args.lsteaddr}
365         self.lsid_step = args.lsidstep
366         self.lstid_step = args.lstidstep
367         self.lspid_step = args.lspidstep
368         self.lstsaddr_step = args.lstsaddrstep
369         self.lsteaddr_step = args.lsteaddrstep
370         # Default values used for randomized part
371         s1_slots = ((self.total_prefix_amount -
372                      self.remaining_prefixes_threshold - 1) /
373                     self.prefix_count_to_add_default + 1)
374         s2_slots = ((self.remaining_prefixes_threshold - 1) /
375                     (self.prefix_count_to_add_default -
376                     self.prefix_count_to_del_default) + 1)
377         # S1_First_Index = 0
378         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
379         s2_first_index = s1_slots * self.prefix_count_to_add_default
380         s2_last_index = (s2_first_index +
381                          s2_slots * (self.prefix_count_to_add_default -
382                                      self.prefix_count_to_del_default) - 1)
383         self.slot_gap_default = ((self.total_prefix_amount -
384                                   self.remaining_prefixes_threshold - 1) /
385                                  self.prefix_count_to_add_default + 1)
386         self.randomize_lowest_default = s2_first_index
387         self.randomize_highest_default = s2_last_index
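        # Worked example (illustrative): with --amount 1000, --prefill 900,
        # --insert 10 and --withdraw 2, the threshold is 100, so
        # s1_slots = (899 / 10) + 1 = 90 and s2_slots = (99 / 8) + 1 = 13;
        # the randomized index range is then [900, 1003].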
388         # Initialising counters
389         self.phase1_start_time = 0
390         self.phase1_stop_time = 0
391         self.phase2_start_time = 0
392         self.phase2_stop_time = 0
393         self.phase1_updates_sent = 0
394         self.phase2_updates_sent = 0
395         self.updates_sent = 0
396
397         self.log_info = args.loglevel <= logging.INFO
398         self.log_debug = args.loglevel <= logging.DEBUG
399         """
400         Flags needed for the MessageGenerator performance optimization.
401         Calling logger methods in each iteration, even with the proper log
402         level set, slows down the MessageGenerator performance significantly.
403         Measured total generation time (1M updates, dry run, error log level):
404         - logging based on basic logger features: 36.2 s
405         - logging based on advanced logger features (lazy logging): 21.2 s
406         - conditional calling of logger methods enclosed inside a condition: 8.6 s
407         """
408
409         logger.info("Generator initialisation")
410         logger.info("  Target total number of prefixes to be introduced: " +
411                     str(self.total_prefix_amount))
412         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
413                     str(self.prefix_length_default))
414         logger.info("  My Autonomous System number: " +
415                     str(self.my_autonomous_system_default))
416         logger.info("  My Hold Time: " + str(self.hold_time_default))
417         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
418         logger.info("  Next Hop: " + str(self.next_hop_default))
419         logger.info("  Originator ID: " + str(self.originator_id_default))
420         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
421         logger.info("  Prefix count to be inserted at once: " +
422                     str(self.prefix_count_to_add_default))
423         logger.info("  Prefix count to be withdrawn at once: " +
424                     str(self.prefix_count_to_del_default))
425         logger.info("  Fast pre-fill up to " +
426                     str(self.total_prefix_amount -
427                         self.remaining_prefixes_threshold) + " prefixes")
428         logger.info("  Remaining number of prefixes to be processed " +
429                     "in parallel with withdrawals: " +
430                     str(self.remaining_prefixes_threshold))
431         logger.debug("  Prefix index range used after pre-fill procedure [" +
432                      str(self.randomize_lowest_default) + ", " +
433                      str(self.randomize_highest_default) + "]")
434         if self.single_update_default:
435             logger.info("  Common single UPDATE will be generated " +
436                         "for both NLRI & WITHDRAWN lists")
437         else:
438             logger.info("  Two separate UPDATEs will be generated " +
439                         "for each NLRI & WITHDRAWN lists")
440         if self.randomize_updates_default:
441             logger.info("  Generation of UPDATE messages will be randomized")
442         logger.info("  Let\'s go ...\n")
443
444         # TODO: Notification for hold timer expiration can be handy.
445
446     def store_results(self, file_name=None, threshold=None):
447         """ Stores specified results into files based on file_name value.
448
449         Arguments:
450             :param file_name: Trailing (common) part of result file names
451             :param threshold: Minimum number of sent updates needed for each
452                               result to be included into result csv file
453                               (mainly needed because of the result accuracy)
454         Returns:
455             :return: n/a
456         """
457         # default values handling
458         # TODO optimize default values handling (use e.g. dictionary.update() approach)
459         if file_name is None:
460             file_name = self.results_file_name_default
461         if threshold is None:
462             threshold = self.performance_threshold_default
463         # performance calculation
464         if self.phase1_updates_sent >= threshold:
465             totals1 = self.phase1_updates_sent
466             performance1 = int(self.phase1_updates_sent /
467                                (self.phase1_stop_time - self.phase1_start_time))
468         else:
469             totals1 = None
470             performance1 = None
471         if self.phase2_updates_sent >= threshold:
472             totals2 = self.phase2_updates_sent
473             performance2 = int(self.phase2_updates_sent /
474                                (self.phase2_stop_time - self.phase2_start_time))
475         else:
476             totals2 = None
477             performance2 = None
478
479         logger.info("#" * 10 + " Final results " + "#" * 10)
480         logger.info("Number of iterations: " + str(self.iteration))
481         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
482                     str(self.phase1_updates_sent))
483         logger.info("The pre-fill phase duration: " +
484                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
485         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
486                     str(self.phase2_updates_sent))
487         logger.info("The 2nd test phase duration: " +
488                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
489         logger.info("Threshold for performance reporting: " + str(threshold))
490
491         # making labels
492         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
493                         " route(s) per UPDATE")
494         if self.single_update_default:
495             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
496                                   "/-" + str(self.prefix_count_to_del_default) +
497                                   " routes per UPDATE")
498         else:
499             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
500                                   "/-" + str(self.prefix_count_to_del_default) +
501                                   " routes in two UPDATEs")
502         # collecting capacity and performance results
503         totals = {}
504         performance = {}
505         if totals1 is not None:
506             totals[phase1_label] = totals1
507             performance[phase1_label] = performance1
508         if totals2 is not None:
509             totals[phase2_label] = totals2
510             performance[phase2_label] = performance2
511         self.write_results_to_file(totals, "totals-" + file_name)
512         self.write_results_to_file(performance, "performance-" + file_name)
513
514     def write_results_to_file(self, results, file_name):
515         """Writes results to the csv plot file consumable by Jenkins.
516
517         Arguments:
518             :param file_name: Name of the (csv) file to be created
519         Returns:
520             :return: none
521         """
522         first_line = ""
523         second_line = ""
524         f = open(file_name, "wt")
525         try:
526             for key in sorted(results):
527                 first_line += key + ", "
528                 second_line += str(results[key]) + ", "
529             first_line = first_line[:-2]
530             second_line = second_line[:-2]
531             f.write(first_line + "\n")
532             f.write(second_line + "\n")
533             logger.info("Message generator performance results stored in " +
534                         file_name + ":")
535             logger.info("  " + first_line)
536             logger.info("  " + second_line)
537         finally:
538             f.close()
539
540     # Return pseudo-randomized (reproducible) index for selected range
541     def randomize_index(self, index, lowest=None, highest=None):
542         """Calculates pseudo-randomized index from selected range.
543
544         Arguments:
545             :param index: input index
546             :param lowest: the lowest index of the randomized area
547             :param highest: the highest index from the randomized area
548         Returns:
549             :return: the (pseudo)randomized index
550         Notes:
551             Created just as a frame for future generator enhancement.
552         """
553         # default values handling
554         # TODO optimize default values handling (use e.g. dictionary.update() approach)
555         if lowest is None:
556             lowest = self.randomize_lowest_default
557         if highest is None:
558             highest = self.randomize_highest_default
559         # randomize
560         if (index >= lowest) and (index <= highest):
561             # we are in the randomized range -> shuffle it inside
562             # the range (now just reverse the order)
563             new_index = highest - (index - lowest)
564         else:
565             # we are out of the randomized range -> nothing to do
566             new_index = index
567         return new_index
568
569     def get_ls_nlri_values(self, index):
570         """Generates LS-NLRI parameters.
571         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
572
573         Arguments:
574             :param index: index (iteration)
575         Returns:
576             :return: dictionary of LS NLRI parameters and values
577         """
578         # generating list of LS NLRI parameters
579         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
580         ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
581         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
582         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
583         ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
584         ls_nlri_values = {"Identifier": identifier,
585                           "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
586                           "TunnelID": tunnel_id, "LSPID": lsp_id,
587                           "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
588         return ls_nlri_values
589
590     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
591                         prefix_len=None, prefix_count=None, randomize=None):
592         """Generates list of IP address prefixes.
593
594         Arguments:
595             :param slot_index: index of group of prefix addresses
596             :param slot_size: size of group of prefix addresses
597                 in [number of included prefixes]
598             :param prefix_base: IP address of the first prefix
599                 (slot_index = 0, prefix_index = 0)
600             :param prefix_len: length of the prefix in bits
601                 (the same as size of netmask)
602             :param prefix_count: number of prefixes to be returned
603                 from the specified slot
604         Returns:
605             :return: list of generated IP address prefixes
606         """
607         # default values handling
608         # TODO optimize default values handling (use e.g. dictionary.update() approach)
609         if slot_size is None:
610             slot_size = self.slot_size_default
611         if prefix_base is None:
612             prefix_base = self.prefix_base_default
613         if prefix_len is None:
614             prefix_len = self.prefix_length_default
615         if prefix_count is None:
616             prefix_count = slot_size
617         if randomize is None:
618             randomize = self.randomize_updates_default
619         # generating list of prefixes
620         indexes = []
621         prefixes = []
622         prefix_gap = 2 ** (32 - prefix_len)
623         for i in range(prefix_count):
624             prefix_index = slot_index * slot_size + i
625             if randomize:
626                 prefix_index = self.randomize_index(prefix_index)
627             indexes.append(prefix_index)
628             prefixes.append(prefix_base + prefix_index * prefix_gap)
629         if self.log_debug:
630             logger.debug("  Prefix slot index: " + str(slot_index))
631             logger.debug("  Prefix slot size: " + str(slot_size))
632             logger.debug("  Prefix count: " + str(prefix_count))
633             logger.debug("  Prefix indexes: " + str(indexes))
634             logger.debug("  Prefix list: " + str(prefixes))
635         return prefixes
636
637     def compose_update_message(self, prefix_count_to_add=None,
638                                prefix_count_to_del=None):
639         """Composes an UPDATE message
640
641         Arguments:
642             :param prefix_count_to_add: # of prefixes to put into NLRI list
643             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
644         Returns:
645             :return: encoded UPDATE message in HEX
646         Notes:
647             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
648             lists or a common message which includes both prefix lists.
649             Updates global counters.
650         """
651         # default values handling
652         # TODO optimize default values handling (use e.g. dictionary.update() approach)
653         if prefix_count_to_add is None:
654             prefix_count_to_add = self.prefix_count_to_add_default
655         if prefix_count_to_del is None:
656             prefix_count_to_del = self.prefix_count_to_del_default
657         # logging
658         if self.log_info and not (self.iteration % 1000):
659             logger.info("Iteration: " + str(self.iteration) +
660                         " - total remaining prefixes: " +
661                         str(self.remaining_prefixes))
662         if self.log_debug:
663             logger.debug("#" * 10 + " Iteration: " +
664                          str(self.iteration) + " " + "#" * 10)
665             logger.debug("Remaining prefixes: " +
666                          str(self.remaining_prefixes))
667         # scenario type & one-shot counter
668         straightforward_scenario = (self.remaining_prefixes >
669                                     self.remaining_prefixes_threshold)
670         if straightforward_scenario:
671             prefix_count_to_del = 0
672             if self.log_debug:
673                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
674             if not self.phase1_start_time:
675                 self.phase1_start_time = time.time()
676         else:
677             if self.log_debug:
678                 logger.debug("--- COMBINED SCENARIO ---")
679             if not self.phase2_start_time:
680                 self.phase2_start_time = time.time()
681         # tailor the number of prefixes if needed
682         prefix_count_to_add = (prefix_count_to_del +
683                                min(prefix_count_to_add - prefix_count_to_del,
684                                    self.remaining_prefixes))
685         # prefix slots selection for insertion and withdrawal
686         slot_index_to_add = self.iteration
687         slot_index_to_del = slot_index_to_add - self.slot_gap_default
688         # getting lists of prefixes for insertion in this iteration
689         if self.log_debug:
690             logger.debug("Prefixes to be inserted in this iteration:")
691         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
692                                                   prefix_count=prefix_count_to_add)
693         # getting lists of prefixes for withdrawal in this iteration
694         if self.log_debug:
695             logger.debug("Prefixes to be withdrawn in this iteration:")
696         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
697                                                   prefix_count=prefix_count_to_del)
698         # generating the UPDATE message with LS-NLRI only
699         if self.bgpls:
700             ls_nlri = self.get_ls_nlri_values(self.iteration)
701             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
702                                           **ls_nlri)
703         else:
704             # generating the UPDATE message with prefix lists
705             if self.single_update_default:
706                 # Send prefixes to be introduced and withdrawn
707                 # in one UPDATE message
708                 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
709                                               nlri_prefixes=prefix_list_to_add)
710             else:
711                 # Send prefixes to be introduced and withdrawn
712                 # in separate UPDATE messages (if needed)
713                 msg_out = self.update_message(wr_prefixes=[],
714                                               nlri_prefixes=prefix_list_to_add)
715                 if prefix_count_to_del:
716                     msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
717                                                    nlri_prefixes=[])
718         # updating counters - who knows ... maybe this is the last time here ;)
719         if straightforward_scenario:
720             self.phase1_stop_time = time.time()
721             self.phase1_updates_sent = self.updates_sent
722         else:
723             self.phase2_stop_time = time.time()
724             self.phase2_updates_sent = (self.updates_sent -
725                                         self.phase1_updates_sent)
726         # updating totals for the next iteration
727         self.iteration += 1
728         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
729         # returning the encoded message
730         return msg_out
731
732     # Section of message encoders
733
734     def open_message(self, version=None, my_autonomous_system=None,
735                      hold_time=None, bgp_identifier=None):
736         """Generates an OPEN Message (rfc4271#section-4.2)
737
738         Arguments:
739             :param version: see the rfc4271#section-4.2
740             :param my_autonomous_system: see the rfc4271#section-4.2
741             :param hold_time: see the rfc4271#section-4.2
742             :param bgp_identifier: see the rfc4271#section-4.2
743         Returns:
744             :return: encoded OPEN message in HEX
745         """
746
747         # default values handling
748         # TODO optimize default values handling (use e.g. dictionary.update() approach)
749         if version is None:
750             version = self.version_default
751         if my_autonomous_system is None:
752             my_autonomous_system = self.my_autonomous_system_default
753         if hold_time is None:
754             hold_time = self.hold_time_default
755         if bgp_identifier is None:
756             bgp_identifier = self.bgp_identifier_default
757
758         # Marker
759         marker_hex = "\xFF" * 16
760
761         # Type
762         type = 1
763         type_hex = struct.pack("B", type)
764
765         # version
766         version_hex = struct.pack("B", version)
767
768         # my_autonomous_system
769         # AS_TRANS value, 23456 decimal.
770         my_autonomous_system_2_bytes = 23456
771         # AS number is mappable to 2 bytes
772         if my_autonomous_system < 65536:
773             my_autonomous_system_2_bytes = my_autonomous_system
774         my_autonomous_system_hex_2_bytes = struct.pack(">H",
775                                                        my_autonomous_system_2_bytes)
776
777         # Hold Time
778         hold_time_hex = struct.pack(">H", hold_time)
779
780         # BGP Identifier
781         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
782
783         # Optional Parameters
784         optional_parameters_hex = ""
785         if self.rfc4760:
786             optional_parameter_hex = (
787                 "\x02"  # Param type ("Capability Ad")
788                 "\x06"  # Length (6 bytes)
789                 "\x01"  # Capability type (NLRI Unicast),
790                         # see RFC 4760, section 8
791                 "\x04"  # Capability value length
792                 "\x00\x01"  # AFI (Ipv4)
793                 "\x00"  # (reserved)
794                 "\x01"  # SAFI (Unicast)
795             )
796             optional_parameters_hex += optional_parameter_hex
797
798         if self.bgpls:
799             optional_parameter_hex = (
800                 "\x02"  # Param type ("Capability Ad")
801                 "\x06"  # Length (6 bytes)
802                 "\x01"  # Capability type (NLRI Unicast),
803                         # see RFC 4760, section 8
804                 "\x04"  # Capability value length
805                 "\x40\x04"  # AFI (BGP-LS)
806                 "\x00"  # (reserved)
807                 "\x47"  # SAFI (BGP-LS)
808             )
809             optional_parameters_hex += optional_parameter_hex
810
811         if self.evpn:
812             optional_parameter_hex = (
813                 "\x02"  # Param type ("Capability Ad")
814                 "\x06"  # Length (6 bytes)
815                 "\x01"  # Multiprotocol extetension capability,
816                 "\x04"  # Capability value length
817                 "\x00\x19"  # AFI (L2-VPN)
818                 "\x00"  # (reserved)
819                 "\x46"  # SAFI (EVPN)
820             )
821             optional_parameters_hex += optional_parameter_hex
822
823         optional_parameter_hex = (
824             "\x02"  # Param type ("Capability Ad")
825             "\x06"  # Length (6 bytes)
826             "\x41"  # "32 bit AS Numbers Support"
827                     # (see RFC 6793, section 3)
828             "\x04"  # Capability value length
829         )
830         optional_parameter_hex += (
831             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
832         )
833         optional_parameters_hex += optional_parameter_hex
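        # Illustrative value: with the default AS number 64496 the capability
        # above encodes as "\x02\x06\x41\x04" + "\x00\x00\xfb\xf0".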
834
835         # Optional Parameters Length
836         optional_parameters_length = len(optional_parameters_hex)
837         optional_parameters_length_hex = struct.pack("B",
838                                                      optional_parameters_length)
839
840         # Length (big-endian)
841         length = (
842             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
843             len(my_autonomous_system_hex_2_bytes) +
844             len(hold_time_hex) + len(bgp_identifier_hex) +
845             len(optional_parameters_length_hex) +
846             len(optional_parameters_hex)
847         )
848         length_hex = struct.pack(">H", length)
849
850         # OPEN Message
851         message_hex = (
852             marker_hex +
853             length_hex +
854             type_hex +
855             version_hex +
856             my_autonomous_system_hex_2_bytes +
857             hold_time_hex +
858             bgp_identifier_hex +
859             optional_parameters_length_hex +
860             optional_parameters_hex
861         )
862
863         if self.log_debug:
864             logger.debug("OPEN message encoding")
865             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
866             logger.debug("  Length=" + str(length) + " (0x" +
867                          binascii.hexlify(length_hex) + ")")
868             logger.debug("  Type=" + str(type) + " (0x" +
869                          binascii.hexlify(type_hex) + ")")
870             logger.debug("  Version=" + str(version) + " (0x" +
871                          binascii.hexlify(version_hex) + ")")
872             logger.debug("  My Autonomous System=" +
873                          str(my_autonomous_system_2_bytes) + " (0x" +
874                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
875                          ")")
876             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
877                          binascii.hexlify(hold_time_hex) + ")")
878             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
879                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
880             logger.debug("  Optional Parameters Length=" +
881                          str(optional_parameters_length) + " (0x" +
882                          binascii.hexlify(optional_parameters_length_hex) +
883                          ")")
884             logger.debug("  Optional Parameters=0x" +
885                          binascii.hexlify(optional_parameters_hex))
886             logger.debug("OPEN message encoded: 0x%s",
887                          binascii.b2a_hex(message_hex))
888
889         return message_hex
890
891     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
892                        wr_prefix_length=None, nlri_prefix_length=None,
893                        my_autonomous_system=None, next_hop=None,
894                        originator_id=None, cluster_list_item=None,
895                        end_of_rib=False, **ls_nlri_params):
896         """Generates an UPDATE Message (rfc4271#section-4.3)
897
898         Arguments:
899             :param wr_prefixes: see the rfc4271#section-4.3
900             :param nlri_prefixes: see the rfc4271#section-4.3
901             :param wr_prefix_length: see the rfc4271#section-4.3
902             :param nlri_prefix_length: see the rfc4271#section-4.3
903             :param my_autonomous_system: see the rfc4271#section-4.3
904             :param next_hop: see the rfc4271#section-4.3
905         Returns:
906             :return: encoded UPDATE message in HEX
907         """
908
909         # default values handling
910         # TODO optimize default values handling (use e.g. dictionary.update() approach)
911         if wr_prefixes is None:
912             wr_prefixes = self.wr_prefixes_default
913         if nlri_prefixes is None:
914             nlri_prefixes = self.nlri_prefixes_default
915         if wr_prefix_length is None:
916             wr_prefix_length = self.prefix_length_default
917         if nlri_prefix_length is None:
918             nlri_prefix_length = self.prefix_length_default
919         if my_autonomous_system is None:
920             my_autonomous_system = self.my_autonomous_system_default
921         if next_hop is None:
922             next_hop = self.next_hop_default
923         if originator_id is None:
924             originator_id = self.originator_id_default
925         if cluster_list_item is None:
926             cluster_list_item = self.cluster_list_item_default
927         ls_nlri = self.ls_nlri_default.copy()
928         ls_nlri.update(ls_nlri_params)
929
930         # Marker
931         marker_hex = "\xFF" * 16
932
933         # Type
934         type = 2
935         type_hex = struct.pack("B", type)
936
937         # Withdrawn Routes
938         withdrawn_routes_hex = ""
939         if not self.bgpls:
940             bytes = ((wr_prefix_length - 1) / 8) + 1
941             for prefix in wr_prefixes:
942                 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
943                                        struct.pack(">I", int(prefix))[:bytes])
944                 withdrawn_routes_hex += withdrawn_route_hex
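        # Illustrative encoding of one withdrawn route: prefix 8.0.1.0/28
        # becomes "\x1c" + "\x08\x00\x01\x00" (length byte + 4 address bytes).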
945
946         # Withdrawn Routes Length
947         withdrawn_routes_length = len(withdrawn_routes_hex)
948         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
949
950         # TODO: replace the hardcoded strings with proper encoding?
951         # Path Attributes
952         path_attributes_hex = ""
953         if nlri_prefixes != []:
954             path_attributes_hex += (
955                 "\x40"  # Flags ("Well-Known")
956                 "\x01"  # Type (ORIGIN)
957                 "\x01"  # Length (1)
958                 "\x00"  # Origin: IGP
959             )
960             path_attributes_hex += (
961                 "\x40"  # Flags ("Well-Known")
962                 "\x02"  # Type (AS_PATH)
963                 "\x06"  # Length (6)
964                 "\x02"  # AS segment type (AS_SEQUENCE)
965                 "\x01"  # AS segment length (1)
966             )
967             my_as_hex = struct.pack(">I", my_autonomous_system)
968             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
969             path_attributes_hex += (
970                 "\x40"  # Flags ("Well-Known")
971                 "\x03"  # Type (NEXT_HOP)
972                 "\x04"  # Length (4)
973             )
974             next_hop_hex = struct.pack(">I", int(next_hop))
975             path_attributes_hex += (
976                 next_hop_hex  # IP address of the next hop (4 bytes)
977             )
978             path_attributes_hex += (
979                 "\x40"  # Flags ("Well-Known")
980                 "\x05"  # Type (LOCAL_PREF)
981                 "\x04"  # Length (4)
982                 "\x00\x00\x00\x64"  # (100)
983             )
984             if originator_id is not None:
985                 path_attributes_hex += (
986                     "\x80"  # Flags ("Optional, non-transitive")
987                     "\x09"  # Type (ORIGINATOR_ID)
988                     "\x04"  # Length (4)
989                 )           # ORIGINATOR_ID (4 bytes)
990                 path_attributes_hex += struct.pack(">I", int(originator_id))
991             if cluster_list_item is not None:
992                 path_attributes_hex += (
993                     "\x80"  # Flags ("Optional, non-transitive")
994                     "\x09"  # Type (CLUSTER_LIST)
995                     "\x04"  # Length (4)
996                 )           # one CLUSTER_LIST item (4 bytes)
997                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
998
999         if self.bgpls and not end_of_rib:
1000             path_attributes_hex += (
1001                 "\x80"  # Flags ("Optional, non-transitive")
1002                 "\x0e"  # Type (MP_REACH_NLRI)
1003                 "\x22"  # Length (34)
1004                 "\x40\x04"  # AFI (BGP-LS)
1005                 "\x47"  # SAFI (BGP-LS)
1006                 "\x04"  # Next Hop Length (4)
1007             )
1008             path_attributes_hex += struct.pack(">I", int(next_hop))
1009             path_attributes_hex += "\x00"           # Reserved
1010             path_attributes_hex += (
1011                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1012                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
1013                 "\x07"      # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1014             )
1015             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1016             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1017             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1018             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1019             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1020
1021         # Total Path Attributes Length
1022         total_path_attributes_length = len(path_attributes_hex)
1023         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1024
1025         # Network Layer Reachability Information
1026         nlri_hex = ""
1027         if not self.bgpls:
1028             bytes = ((nlri_prefix_length - 1) / 8) + 1
1029             for prefix in nlri_prefixes:
1030                 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1031                                    struct.pack(">I", int(prefix))[:bytes])
1032                 nlri_hex += nlri_prefix_hex
1033
1034         # Length (big-endian)
1035         length = (
1036             len(marker_hex) + 2 + len(type_hex) +
1037             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1038             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1039             len(nlri_hex))
1040         length_hex = struct.pack(">H", length)
1041
1042         # UPDATE Message
1043         message_hex = (
1044             marker_hex +
1045             length_hex +
1046             type_hex +
1047             withdrawn_routes_length_hex +
1048             withdrawn_routes_hex +
1049             total_path_attributes_length_hex +
1050             path_attributes_hex +
1051             nlri_hex
1052         )
1053
1054         if self.log_debug:
1055             logger.debug("UPDATE message encoding")
1056             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1057             logger.debug("  Length=" + str(length) + " (0x" +
1058                          binascii.hexlify(length_hex) + ")")
1059             logger.debug("  Type=" + str(type) + " (0x" +
1060                          binascii.hexlify(type_hex) + ")")
1061             logger.debug("  withdrawn_routes_length=" +
1062                          str(withdrawn_routes_length) + " (0x" +
1063                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
1064             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1065                          str(wr_prefix_length) + " (0x" +
1066                          binascii.hexlify(withdrawn_routes_hex) + ")")
1067             if total_path_attributes_length:
1068                 logger.debug("  Total Path Attributes Length=" +
1069                              str(total_path_attributes_length) + " (0x" +
1070                              binascii.hexlify(total_path_attributes_length_hex) + ")")
1071                 logger.debug("  Path Attributes=" + "(0x" +
1072                              binascii.hexlify(path_attributes_hex) + ")")
1073                 logger.debug("    Origin=IGP")
1074                 logger.debug("    AS path=" + str(my_autonomous_system))
1075                 logger.debug("    Next hop=" + str(next_hop))
1076                 if originator_id is not None:
1077                     logger.debug("    Originator id=" + str(originator_id))
1078                 if cluster_list_item is not None:
1079                     logger.debug("    Cluster list=" + str(cluster_list_item))
1080                 if self.bgpls:
1081                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1082             logger.debug("  Network Layer Reachability Information=" +
1083                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1084                          " (0x" + binascii.hexlify(nlri_hex) + ")")
1085             logger.debug("UPDATE message encoded: 0x" +
1086                          binascii.b2a_hex(message_hex))
1087
1088         # updating counter
1089         self.updates_sent += 1
1090         # returning encoded message
1091         return message_hex
1092
1093     def notification_message(self, error_code, error_subcode, data_hex=""):
1094         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1095
1096         Arguments:
1097             :param error_code: see the rfc4271#section-4.5
1098             :param error_subcode: see the rfc4271#section-4.5
1099             :param data_hex: see the rfc4271#section-4.5
1100         Returns:
1101             :return: encoded NOTIFICATION message in HEX
1102         """
1103
1104         # Marker
1105         marker_hex = "\xFF" * 16
1106
1107         # Type
1108         type = 3
1109         type_hex = struct.pack("B", type)
1110
1111         # Error Code
1112         error_code_hex = struct.pack("B", error_code)
1113
1114         # Error Subcode
1115         error_subcode_hex = struct.pack("B", error_subcode)
1116
1117         # Length (big-endian)
1118         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1119                   len(error_subcode_hex) + len(data_hex))
1120         length_hex = struct.pack(">H", length)
1121
1122         # NOTIFICATION Message
1123         message_hex = (
1124             marker_hex +
1125             length_hex +
1126             type_hex +
1127             error_code_hex +
1128             error_subcode_hex +
1129             data_hex
1130         )
1131
1132         if self.log_debug:
1133             logger.debug("NOTIFICATION message encoding")
1134             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1135             logger.debug("  Length=" + str(length) + " (0x" +
1136                          binascii.hexlify(length_hex) + ")")
1137             logger.debug("  Type=" + str(type) + " (0x" +
1138                          binascii.hexlify(type_hex) + ")")
1139             logger.debug("  Error Code=" + str(error_code) + " (0x" +
1140                          binascii.hexlify(error_code_hex) + ")")
1141             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
1142                          binascii.hexlify(error_subcode_hex) + ")")
1143             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1144             logger.debug("NOTIFICATION message encoded: 0x%s",
1145                          binascii.b2a_hex(message_hex))
1146
1147         return message_hex
1148
1149     def keepalive_message(self):
1150         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1151
1152         Returns:
1153             :return: encoded KEEP ALIVE message in HEX
1154         """
1155
1156         # Marker
1157         marker_hex = "\xFF" * 16
1158
1159         # Type
1160         type = 4
1161         type_hex = struct.pack("B", type)
1162
1163         # Length (big-endian)
1164         length = len(marker_hex) + 2 + len(type_hex)
1165         length_hex = struct.pack(">H", length)
1166
1167         # KEEP ALIVE Message
1168         message_hex = (
1169             marker_hex +
1170             length_hex +
1171             type_hex
1172         )
1173
1174         if self.log_debug:
1175             logger.debug("KEEP ALIVE message encoding")
1176             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1177             logger.debug("  Length=" + str(length) + " (0x" +
1178                          binascii.hexlify(length_hex) + ")")
1179             logger.debug("  Type=" + str(type) + " (0x" +
1180                          binascii.hexlify(type_hex) + ")")
1181             logger.debug("KEEP ALIVE message encoded: 0x%s",
1182                          binascii.b2a_hex(message_hex))
1183
1184         return message_hex
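
    # Worked example for keepalive_message() (informational): a KEEPALIVE is
    # always exactly 19 octets (length 0x0013): sixteen 0xFF marker octets, the
    # two-octet length field and the one-octet type 0x04.  job() below relies
    # on this when it reads the confirming KEEPALIVE with bgp_socket.recv(19).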
1185
1186
1187 class TimeTracker(object):
1188     """Class for tracking timers, both for my keepalives and
1189     peer's hold time.
1190     """
1191
1192     def __init__(self, msg_in):
1193         """Initialisation. based on defaults and OPEN message from peer.
1194
1195         Arguments:
1196             msg_in: the OPEN message received from peer.
1197         """
1198         # Note: Relative time is always named timedelta, to stress that
1199         # the (non-delta) time is absolute.
1200         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1201         # Upper bound for being stuck in the same state, we should
1202         # at least report something before continuing.
1203         # Negotiate the hold timer by taking the smaller
1204         # of the 2 values (mine and the peer's).
1205         hold_timedelta = 180  # Not an attribute of self yet.
1206         # TODO: Make the default value configurable,
1207         # default value could mirror what peer said.
1208         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1209         if hold_timedelta > peer_hold_timedelta:
1210             hold_timedelta = peer_hold_timedelta
1211         if hold_timedelta != 0 and hold_timedelta < 3:
1212             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1213             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1214         self.hold_timedelta = hold_timedelta
1215         # If we do not hear from peer this long, we assume it has died.
1216         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1217         # Upper limit for duration between messages, to avoid being
1218         # declared to be dead.
1219         # The same as calling snapshot(), but also declares a field.
1220         self.snapshot_time = time.time()
1221         # Sometimes we need to store time. This is where to get
1222         # the value from afterwards. Time_keepalive may be too strict.
1223         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1224         # At this time point, peer will be declared dead.
1225         self.my_keepalive_time = None  # to be set later
1226         # At this point, we should be sending keepalive message.
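        # Worked example (informational): if the peer's OPEN advertises a hold
        # time of 90 seconds, hold_timedelta becomes min(180, 90) = 90 and
        # keepalive_timedelta becomes int(90 / 3.0) = 30 seconds.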
1227
1228     def snapshot(self):
1229         """Store current time in instance data to use later."""
1230         # Read as time before something interesting was called.
1231         self.snapshot_time = time.time()
1232
1233     def reset_peer_hold_time(self):
1234         """Move hold time to future as peer has just proven it still lives."""
1235         self.peer_hold_time = time.time() + self.hold_timedelta
1236
1237     # Some methods could rely on self.snapshot_time, but it is better
1238     # to require user to provide it explicitly.
1239     def reset_my_keepalive_time(self, keepalive_time):
1240         """Calculate and set the next my KEEP ALIVE timeout time
1241
1242         Arguments:
1243             :keepalive_time: the initial value of the KEEP ALIVE timer
1244         """
1245         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1246
1247     def is_time_for_my_keepalive(self):
1248         """Check for my KEEP ALIVE timeout occurence"""
1249         if self.hold_timedelta == 0:
1250             return False
1251         return self.snapshot_time >= self.my_keepalive_time
1252
1253     def get_next_event_time(self):
1254         """Set the time of the next expected or to be sent KEEP ALIVE"""
1255         if self.hold_timedelta == 0:
1256             return self.snapshot_time + 86400
1257         return min(self.my_keepalive_time, self.peer_hold_time)
1258
1259     def check_peer_hold_time(self, snapshot_time):
1260         """Raise error if nothing was read from peer until specified time."""
1261         # Hold time = 0 means keepalive checking off.
1262         if self.hold_timedelta != 0:
1263             # time.time() may be too strict
1264             if snapshot_time > self.peer_hold_time:
1265                 logger.error("Peer has overstepped the hold timer.")
1266                 raise RuntimeError("Peer has overstepped the hold timer.")
1267                 # TODO: Include hold_timedelta?
1268                 # TODO: Add notification sending (attempt). That means
1269                 # move to write tracker.
1270
1271
1272 class ReadTracker(object):
1273     """Class for tracking read of mesages chunk by chunk and
1274     for idle waiting.
1275     """
1276
1277     def __init__(self, bgp_socket, timer, storage, evpn=False, wait_for_read=10):
1278         """The reader initialisation.
1279
1280         Arguments:
1281             bgp_socket: socket to be used for sending
1282             timer: timer to be used for scheduling
1283             storage: thread safe dict
1284             evpn: flag that evpn functionality is tested
1285         """
1286         # References to outside objects.
1287         self.socket = bgp_socket
1288         self.timer = timer
1289         # BGP marker length plus length field length.
1290         self.header_length = 18
1291         # TODO: make it class (constant) attribute
1292         # Computation of where next chunk ends depends on whether
1293         # we are beyond length field.
1294         self.reading_header = True
1295         # Countdown towards next size computation.
1296         self.bytes_to_read = self.header_length
1297         # Incremental buffer for message under read.
1298         self.msg_in = ""
1299         # Initialising counters
1300         self.updates_received = 0
1301         self.prefixes_introduced = 0
1302         self.prefixes_withdrawn = 0
1303         self.rx_idle_time = 0
1304         self.rx_activity_detected = True
1305         self.storage = storage
1306         self.evpn = evpn
1307         self.wfr = wait_for_read
1308
1309     def read_message_chunk(self):
1310         """Read up to one message
1311
1312         Note:
1313             Currently it does not return anything.
1314         """
1315         # TODO: We could return the whole message, currently not needed.
1316         # We assume the socket is readable.
1317         chunk_message = self.socket.recv(self.bytes_to_read)
1318         self.msg_in += chunk_message
1319         self.bytes_to_read -= len(chunk_message)
1320         # TODO: bytes_to_read < 0 is not possible, right?
1321         if not self.bytes_to_read:
1322             # Finished reading a logical block.
1323             if self.reading_header:
1324                 # The logical block was a BGP header.
1325                 # Now we know the size of the message.
1326                 self.reading_header = False
1327                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1328                                       self.header_length)
1329             else:  # We have finished reading the body of the message.
1330                 # Peer has just proven it is still alive.
1331                 self.timer.reset_peer_hold_time()
1332                 # TODO: Do we want to count received messages?
1333                 # This version ignores the received message.
1334                 # TODO: Should we do validation and exit on anything
1335                 # besides update or keepalive?
1336                 # Prepare state for reading another message.
1337                 message_type_hex = self.msg_in[self.header_length]
1338                 if message_type_hex == "\x01":
1339                     logger.info("OPEN message received: 0x%s",
1340                                 binascii.b2a_hex(self.msg_in))
1341                 elif message_type_hex == "\x02":
1342                     logger.debug("UPDATE message received: 0x%s",
1343                                  binascii.b2a_hex(self.msg_in))
1344                     self.decode_update_message(self.msg_in)
1345                 elif message_type_hex == "\x03":
1346                     logger.info("NOTIFICATION message received: 0x%s",
1347                                 binascii.b2a_hex(self.msg_in))
1348                 elif message_type_hex == "\x04":
1349                     logger.info("KEEP ALIVE message received: 0x%s",
1350                                 binascii.b2a_hex(self.msg_in))
1351                 else:
1352                     logger.warning("Unexpected message received: 0x%s",
1353                                    binascii.b2a_hex(self.msg_in))
1354                 self.msg_in = ""
1355                 self.reading_header = True
1356                 self.bytes_to_read = self.header_length
1357         # We should not act upon peer_hold_time if we are reading
1358         # something right now.
1359         return
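
    # Worked example for read_message_chunk() (informational): for an incoming
    # KEEPALIVE (19 octets in total), once the 18-octet header (marker plus
    # length field) has been read, bytes_to_read becomes 19 - 18 = 1, so the
    # next pass reads the remaining type octet before the state is reset.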
1360
1361     def decode_path_attributes(self, path_attributes_hex):
1362         """Decode the Path Attributes field (rfc4271#section-4.3)
1363
1364         Arguments:
1365             :path_attributes: path_attributes field to be decoded in hex
1366         Returns:
1367             :return: None
1368         """
1369         hex_to_decode = path_attributes_hex
1370
1371         while len(hex_to_decode):
1372             attr_flags_hex = hex_to_decode[0]
1373             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1374 #            attr_optional_bit = attr_flags & 128
1375 #            attr_transitive_bit = attr_flags & 64
1376 #            attr_partial_bit = attr_flags & 32
1377             attr_extended_length_bit = attr_flags & 16
1378
1379             attr_type_code_hex = hex_to_decode[1]
1380             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1381
1382             if attr_extended_length_bit:
1383                 attr_length_hex = hex_to_decode[2:4]
1384                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1385                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1386                 hex_to_decode = hex_to_decode[4 + attr_length:]
1387             else:
1388                 attr_length_hex = hex_to_decode[2]
1389                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1390                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1391                 hex_to_decode = hex_to_decode[3 + attr_length:]
1392
1393             if attr_type_code == 1:
1394                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1395                              binascii.b2a_hex(attr_flags_hex))
1396                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1397             elif attr_type_code == 2:
1398                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1399                              binascii.b2a_hex(attr_flags_hex))
1400                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1401             elif attr_type_code == 3:
1402                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1403                              binascii.b2a_hex(attr_flags_hex))
1404                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1405             elif attr_type_code == 4:
1406                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1407                              binascii.b2a_hex(attr_flags_hex))
1408                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1409             elif attr_type_code == 5:
1410                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1411                              binascii.b2a_hex(attr_flags_hex))
1412                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1413             elif attr_type_code == 6:
1414                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1415                              binascii.b2a_hex(attr_flags_hex))
1416                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1417             elif attr_type_code == 7:
1418                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1419                              binascii.b2a_hex(attr_flags_hex))
1420                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1421             elif attr_type_code == 9:  # rfc4456#section-8
1422                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1423                              binascii.b2a_hex(attr_flags_hex))
1424                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1425             elif attr_type_code == 10:  # rfc4456#section-8
1426                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1427                              binascii.b2a_hex(attr_flags_hex))
1428                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1429             elif attr_type_code == 14:  # rfc4760#section-3
1430                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1431                              binascii.b2a_hex(attr_flags_hex))
1432                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1433                 address_family_identifier_hex = attr_value_hex[0:2]
1434                 logger.debug("  Address Family Identifier=0x%s",
1435                              binascii.b2a_hex(address_family_identifier_hex))
1436                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1437                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1438                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1439                 next_hop_netaddr_len_hex = attr_value_hex[3]
1440                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1441                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1442                              next_hop_netaddr_len,
1443                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1444                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1445                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1446                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1447                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1448                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1449                 logger.debug("  Reserved=0x%s",
1450                              binascii.b2a_hex(reserved_hex))
1451                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1452                 logger.debug("  Network Layer Reachability Information=0x%s",
1453                              binascii.b2a_hex(nlri_hex))
1454                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1455                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1456                 for prefix in nlri_prefix_list:
1457                     logger.debug("  nlri_prefix_received: %s", prefix)
1458                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1459             elif attr_type_code == 15:  # rfc4760#section-4
1460                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1461                              binascii.b2a_hex(attr_flags_hex))
1462                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1463                 address_family_identifier_hex = attr_value_hex[0:2]
1464                 logger.debug("  Address Family Identifier=0x%s",
1465                              binascii.b2a_hex(address_family_identifier_hex))
1466                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1467                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1468                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1469                 wd_hex = attr_value_hex[3:]
1470                 logger.debug("  Withdrawn Routes=0x%s",
1471                              binascii.b2a_hex(wd_hex))
1472                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1473                 logger.debug("  Withdrawn routes prefix list: %s",
1474                              wdr_prefix_list)
1475                 for prefix in wdr_prefix_list:
1476                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1477                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1478             else:
1479                 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1480                              binascii.b2a_hex(attr_flags_hex))
1481                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1482         return None
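
    # Worked example for decode_path_attributes() (informational): the hex
    # string 0x40010100 is a single ORIGIN attribute: flags 0x40 (transitive,
    # extended-length bit clear), type code 0x01, length 0x01, value 0x00 (IGP).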
1483
1484     def decode_update_message(self, msg):
1485         """Decode an UPDATE message (rfc4271#section-4.3)
1486
1487         Arguments:
1488             :msg: message to be decoded in hex
1489         Returns:
1490             :return: None
1491         """
1492         logger.debug("Decoding update message:")
1493         # message header - marker
1494         marker_hex = msg[:16]
1495         logger.debug("Message header marker: 0x%s",
1496                      binascii.b2a_hex(marker_hex))
1497         # message header - message length
1498         msg_length_hex = msg[16:18]
1499         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1500         logger.debug("Message lenght: 0x%s (%s)",
1501                      binascii.b2a_hex(msg_length_hex), msg_length)
1502         # message header - message type
1503         msg_type_hex = msg[18:19]
1504         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1505
1506         with self.storage as stor:
1507             # this will replace the previously stored message
1508             stor['update'] = binascii.hexlify(msg)
1509
1510         logger.debug("Evpn {}".format(self.evpn))
1511         if self.evpn:
1512             logger.debug("Skipping update decoding due to evpn data expected")
1513             return
1514
1515         if msg_type == 2:
1516             logger.debug("Message type: 0x%s (update)",
1517                          binascii.b2a_hex(msg_type_hex))
1518             # withdrawn routes length
1519             wdr_length_hex = msg[19:21]
1520             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1521             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1522                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1523             # withdrawn routes
1524             wdr_hex = msg[21:21 + wdr_length]
1525             logger.debug("Withdrawn routes: 0x%s",
1526                          binascii.b2a_hex(wdr_hex))
1527             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1528             logger.debug("Withdrawn routes prefix list: %s",
1529                          wdr_prefix_list)
1530             for prefix in wdr_prefix_list:
1531                 logger.debug("withdrawn_prefix_received: %s", prefix)
1532             # total path attribute length
1533             total_pa_length_offset = 21 + wdr_length
1534             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1535             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1536             logger.debug("Total path attribute lenght: 0x%s (%s)",
1537                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1538             # path attributes
1539             pa_offset = total_pa_length_offset + 2
1540             pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1541             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1542             self.decode_path_attributes(pa_hex)
1543             # network layer reachability information length
1544             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1545             logger.debug("Calculated NLRI length: %s", nlri_length)
1546             # network layer reachability information
1547             nlri_offset = pa_offset + total_pa_length
1548             nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1549             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1550             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1551             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1552             for prefix in nlri_prefix_list:
1553                 logger.debug("nlri_prefix_received: %s", prefix)
1554             # Updating counters
1555             self.updates_received += 1
1556             self.prefixes_introduced += len(nlri_prefix_list)
1557             self.prefixes_withdrawn += len(wdr_prefix_list)
1558         else:
1559             logger.error("Unexpeced message type 0x%s in 0x%s",
1560                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
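
    # Offsets used by decode_update_message() (informational): an UPDATE is the
    # 19-octet header + 2 octets of withdrawn routes length + withdrawn routes +
    # 2 octets of total path attribute length + path attributes + NLRI, hence
    # nlri_length = msg_length - 23 - wdr_length - total_pa_length.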
1561
1562     def wait_for_read(self):
1563         """Read message until timeout (next expected event).
1564
1565         Note:
1566             Used when no more updates have to be sent, to avoid busy-wait.
1567             Currently it does not return anything.
1568         """
1569         # Compute time to the first predictable state change
1570         event_time = self.timer.get_next_event_time()
1571         # snapshot_time would be imprecise
1572         wait_timedelta = min(event_time - time.time(), self.wfr)
1573         if wait_timedelta < 0:
1574             # The program got around to waiting to an event in "very near
1575             # future" so late that it became a "past" event, thus tell
1576             # "select" to not wait at all. Passing negative timedelta to
1577             # select() would lead to either waiting forever (for -1) or
1578             # select.error("Invalid parameter") (for everything else).
1579             wait_timedelta = 0
1580         # And wait for event or something to read.
1581
1582         if not self.rx_activity_detected or not (self.updates_received % 100):
1583             # right time to write statistics to the log (not for every update and
1584             # not too frequently to avoid having large log files)
1585             logger.info("total_received_update_message_counter: %s",
1586                         self.updates_received)
1587             logger.info("total_received_nlri_prefix_counter: %s",
1588                         self.prefixes_introduced)
1589             logger.info("total_received_withdrawn_prefix_counter: %s",
1590                         self.prefixes_withdrawn)
1591
1592         start_time = time.time()
1593         select.select([self.socket], [], [self.socket], wait_timedelta)
1594         timedelta = time.time() - start_time
1595         self.rx_idle_time += timedelta
1596         self.rx_activity_detected = timedelta < 1
1597
1598         if not self.rx_activity_detected or not (self.updates_received % 100):
1599             # right time to write statistics to the log (not for every update and
1600             # not too frequently to avoid having large log files)
1601             logger.info("... idle for %.3fs", timedelta)
1602             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1603         return
1604
1605
1606 class WriteTracker(object):
1607     """Class tracking enqueueing messages and sending chunks of them."""
1608
1609     def __init__(self, bgp_socket, generator, timer):
1610         """The writter initialisation.
1611
1612         Arguments:
1613             bgp_socket: socket to be used for sending
1614             generator: generator to be used for message generation
1615             timer: timer to be used for scheduling
1616         """
1617         # References to outside objects,
1618         self.socket = bgp_socket
1619         self.generator = generator
1620         self.timer = timer
1621         # Really new fields.
1622         # TODO: Would attribute docstrings add anything substantial?
1623         self.sending_message = False
1624         self.bytes_to_send = 0
1625         self.msg_out = ""
1626
1627     def enqueue_message_for_sending(self, message):
1628         """Enqueue message and change state.
1629
1630         Arguments:
1631             message: message to be enqueued into the msg_out buffer
1632         """
1633         self.msg_out += message
1634         self.bytes_to_send += len(message)
1635         self.sending_message = True
1636
1637     def send_message_chunk_is_whole(self):
1638         """Send enqueued data from msg_out buffer
1639
1640         Returns:
1641             :return: true if no remaining data to send
1642         """
1643         # We assume there is a msg_out to send and socket is writable.
1644         # print "going to send", repr(self.msg_out)
1645         self.timer.snapshot()
1646         bytes_sent = self.socket.send(self.msg_out)
1647         # Forget the part of message that was sent.
1648         self.msg_out = self.msg_out[bytes_sent:]
1649         self.bytes_to_send -= bytes_sent
1650         if not self.bytes_to_send:
1651             # TODO: Is it possible to hit negative bytes_to_send?
1652             self.sending_message = False
1653             # We should have reset hold timer on peer side.
1654             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1655             # The possible reason for not prioritizing reads is gone.
1656             return True
1657         return False
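
# Informational sketch of how WriteTracker is typically driven (the real calls
# happen in StateTracker.perform_one_loop_iteration below):
#     writer.enqueue_message_for_sending(generator.keepalive_message())
#     # ... once select() reports the socket writable ...
#     finished = writer.send_message_chunk_is_whole()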
1658
1659
1660 class StateTracker(object):
1661     """Main loop has state so complex it warrants this separate class."""
1662
1663     def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1664         """The state tracker initialisation.
1665
1666         Arguments:
1667             bgp_socket: socket to be used for sending / receiving
1668             generator: generator to be used for message generation
1669             timer: timer to be used for scheduling
1670             inqueue: user initiated messages queue
1671             storage: thread safe dict to store data for the rpc server
1672             cliargs: cli args from the user
1673         """
1674         # References to outside objects.
1675         self.socket = bgp_socket
1676         self.generator = generator
1677         self.timer = timer
1678         # Sub-trackers.
1679         self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, wait_for_read=cliargs.wfr)
1680         self.writer = WriteTracker(bgp_socket, generator, timer)
1681         # Prioritization state.
1682         self.prioritize_writing = False
1683         # In general, we prioritize reading over writing. But in order
1684         # not to get blocked by neverending reads, we should
1685         # check whether we are not risking running out of holdtime.
1686         # So in some situations, this field is set to True to attempt
1687         # finishing sending a message, after which this field resets
1688         # back to False.
1689         # TODO: Alternative is to switch fairly between reading and
1690         # writing (called round robin from now on).
1691         # Message counting is done in generator.
1692         self.inqueue = inqueue
1693
1694     def perform_one_loop_iteration(self):
1695         """ The main loop iteration
1696
1697         Notes:
1698             Calculates priority, resolves all conditions, calls
1699             appropriate method and returns to caller to repeat.
1700         """
1701         self.timer.snapshot()
1702         if not self.prioritize_writing:
1703             if self.timer.is_time_for_my_keepalive():
1704                 if not self.writer.sending_message:
1705                     # We need to schedule a keepalive ASAP.
1706                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1707                     logger.info("KEEP ALIVE is sent.")
1708                 # We are sending a message now, so let's prioritize it.
1709                 self.prioritize_writing = True
1710
1711         try:
1712             msg = self.inqueue.get_nowait()
1713             logger.info("Received message: {}".format(msg))
1714             msgbin = binascii.unhexlify(msg)
1715             self.writer.enqueue_message_for_sending(msgbin)
1716         except Queue.Empty:
1717             pass
1718         # Now we know what our priorities are, we have to check
1719         # which actions are available.
1720         # select.select() returns three lists,
1721         # we store them in a list of lists.
1722         list_list = select.select([self.socket], [self.socket], [self.socket],
1723                                   self.timer.report_timedelta)
1724         read_list, write_list, except_list = list_list
1725         # Lists are unpacked, each is either [] or [self.socket],
1726         # so we will test them as boolean.
1727         if except_list:
1728             logger.error("Exceptional state on the socket.")
1729             raise RuntimeError("Exceptional state on socket", self.socket)
1730         # We will do either read or write.
1731         if not (self.prioritize_writing and write_list):
1732             # Either we have no reason to rush writes,
1733             # or the socket is not writable.
1734             # We are focusing on reading here.
1735             if read_list:  # there is something to read indeed
1736                 # In this case we want to read chunk of message
1737                 # and repeat the select,
1738                 self.reader.read_message_chunk()
1739                 return
1740             # We were focusing on reading, but there was nothing to read.
1741             # Good time to check peer for hold timer.
1742             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1743             # Quiet on the read front, we can attempt to write.
1744         if write_list:
1745             # Either we really want to reset peer's view of our hold
1746             # timer, or there was nothing to read.
1747             # Were we in the middle of sending a message?
1748             if self.writer.sending_message:
1749                 # Was it the end of a message?
1750                 whole = self.writer.send_message_chunk_is_whole()
1751                 # We were pressed to send something and we did it.
1752                 if self.prioritize_writing and whole:
1753                     # We prioritize reading again.
1754                     self.prioritize_writing = False
1755                 return
1756             # Finally, check whether there are still update messages to be generated.
1757             if self.generator.remaining_prefixes:
1758                 msg_out = self.generator.compose_update_message()
1759                 if not self.generator.remaining_prefixes:
1760                     # We have just finished update generation,
1761                     # end-of-rib is due.
1762                     logger.info("All update messages generated.")
1763                     logger.info("Storing performance results.")
1764                     self.generator.store_results()
1765                     logger.info("Finally an END-OF-RIB is sent.")
1766                     msg_out += self.generator.update_message(wr_prefixes=[],
1767                                                              nlri_prefixes=[],
1768                                                              end_of_rib=True)
1769                 self.writer.enqueue_message_for_sending(msg_out)
1770                 # The actual sending will be attempted in the next iteration.
1771                 return
1772             # Nothing to write anymore.
1773             # To avoid busy loop, we do idle waiting here.
1774             self.reader.wait_for_read()
1775             return
1776         # We can neither read nor write.
1777         logger.warning("Input and output both blocked for " +
1778                        str(self.timer.report_timedelta) + " seconds.")
1779         # FIXME: Are we sure select has been really waiting
1780         # the whole period?
1781         return
1782
1783
1784 def create_logger(loglevel, logfile):
1785     """Create logger object
1786
1787     Arguments:
1788         :loglevel: log level
1789         :logfile: log file name
1790     Returns:
1791         :return: logger object
1792     """
1793     logger = logging.getLogger("logger")
1794     log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1795     console_handler = logging.StreamHandler()
1796     file_handler = logging.FileHandler(logfile, mode="w")
1797     console_handler.setFormatter(log_formatter)
1798     file_handler.setFormatter(log_formatter)
1799     logger.addHandler(console_handler)
1800     logger.addHandler(file_handler)
1801     logger.setLevel(loglevel)
1802     return logger
1803
1804
1805 def job(arguments, inqueue, storage):
1806     """One time initialisation and iterations looping.
1807     Notes:
1808         Establish BGP connection and run iterations.
1809
1810     Arguments:
1811         :arguments: Command line arguments
1812         :inqueue: Data to be sent from play.py
1813         :storage: Shared dict for rpc server
1814     Returns:
1815         :return: None
1816     """
1817     bgp_socket = establish_connection(arguments)
1818     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1819     # Receive open message before sending anything.
1820     # FIXME: Add parameter to send default open message first,
1821     # to work with "you first" peers.
1822     msg_in = read_open_message(bgp_socket)
1823     timer = TimeTracker(msg_in)
1824     generator = MessageGenerator(arguments)
1825     msg_out = generator.open_message()
1826     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1827     # Send our open message to the peer.
1828     bgp_socket.send(msg_out)
1829     # Wait for confirming keepalive.
1830     # TODO: Surely in just one packet?
1831     # Using exact keepalive length so as not to read possible updates.
1832     msg_in = bgp_socket.recv(19)
1833     if msg_in != generator.keepalive_message():
1834         error_msg = "Open not confirmed by keepalive, instead got"
1835         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1836         raise MessageError(error_msg, msg_in)
1837     timer.reset_peer_hold_time()
1838     # Send the keepalive to indicate the connection is accepted.
1839     timer.snapshot()  # Remember this time.
1840     msg_out = generator.keepalive_message()
1841     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1842     bgp_socket.send(msg_out)
1843     # Use the remembered time.
1844     timer.reset_my_keepalive_time(timer.snapshot_time)
1845     # End of initial handshake phase.
1846     state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
1847     while True:  # main reactor loop
1848         state.perform_one_loop_iteration()
1849
1850
1851 class Rpcs:
1852     '''Handler for SimpleXMLRPCServer'''
1853     def __init__(self, sendqueue, storage):
1854         '''Init method
1855
1856         Arguments:
1857             :sendqueue: queue for data to be sent towards odl
1858             :storage: thread safe dict
1859         '''
1860         self.queue = sendqueue
1861         self.storage = storage
1862
1863     def send(self, text):
1864         '''Enqueue data to be sent
1865
1866         Arguments:
1867             :text: hex string of the data to be sent
1868         '''
1869         self.queue.put(text)
1870
1871     def get(self, text=''):
1872         '''Reads data from the storage
1873
1874         - returns stored data or an empty string, at the moment only
1875           'update' is stored
1876
1877         Arguments:
1878             :text: a key to the storage to get the data
1879         Returns:
1880             :data: stored data
1881         '''
1882         with self.storage as stor:
1883             return stor.get(text, '')
1884
1885     def clean(self, text=''):
1886         '''Cleans data from the storage
1887
1888         Arguments:
1889             :text: a key to the storage to clean the data
1890         '''
1891         with self.storage as stor:
1892             if text in stor:
1893                 del stor[text]
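
# Informational example of driving the Rpcs handler from another process,
# assuming the XML-RPC server started in threaded_job() below (host value is
# illustrative; the server binds to the --myip address on port 8000):
#     import xmlrpclib
#     proxy = xmlrpclib.ServerProxy("http://127.0.0.1:8000")
#     proxy.send("<hex string of a raw BGP message to play>")
#     last_update = proxy.get("update")
#     proxy.clean("update")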
1894
1895
1896 def threaded_job(arguments):
1897     """Run the job threaded
1898
1899     Arguments:
1900         :arguments: Command line arguments
1901     Returns:
1902         :return: None
1903     """
1904     amount_left = arguments.amount
1905     utils_left = arguments.multiplicity
1906     prefix_current = arguments.firstprefix
1907     myip_current = arguments.myip
1908     thread_args = []
1909     rpcqueue = Queue.Queue()
1910     storage = SafeDict()
1911
1912     while True:
1913         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
1914         amount_left -= amount_per_util
1915         utils_left -= 1
1916
1917         args = deepcopy(arguments)
1918         args.amount = amount_per_util
1919         args.firstprefix = prefix_current
1920         args.myip = myip_current
1921         thread_args.append(args)
1922
1923         if not utils_left:
1924             break
1925         prefix_current += amount_per_util * 16
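        # Note (informational): the stride of 16 addresses per prefix matches
        # the default --prefixlen of 28 (2 ** (32 - 28) = 16); other prefix
        # lengths are not reflected here.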
1926         myip_current += 1
1927
1928     try:
1929         # Create threads
1930         for t in thread_args:
1931             thread.start_new_thread(job, (t, rpcqueue, storage))
1932     except Exception:
1933         print "Error: unable to start thread."
1934         raise SystemExit(2)
1935
1936     rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
1937     rpcserver.register_instance(Rpcs(rpcqueue, storage))
1938     rpcserver.serve_forever()
1939
1940
1941 if __name__ == "__main__":
1942     arguments = parse_arguments()
1943     logger = create_logger(arguments.loglevel, arguments.logfile)
1944     threaded_job(arguments)
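
# Illustrative invocation (values are examples only; run with --help or see
# parse_arguments for the full option list):
#     python play.py --amount 10 --listen --myip 127.0.0.2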