1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run by a sudo-capable user when you want to use ports below
4 1024 as --myport. This utility is used to avoid the excessive waiting times
5 which EXABGP exhibits when used with huge routing tables and also to avoid
6 the memory cost of EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 import argparse
17 import binascii
18 import ipaddr
19 import logging
20 import Queue
21 import select
22 import socket
23 import struct
24 import thread
25 import threading
26 import time
27
28
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
33
34
35 class SafeDict(dict):
36     '''Thread safe dictionary
37
38     The object will serve as thread safe data storage.
39     It should be used with "with" statement.
40     '''
41
42     def __init__(self, * p_arg, ** n_arg):
43         super(SafeDict, self).__init__()
44         self._lock = threading.Lock()
45
46     def __enter__(self):
47         self._lock.acquire()
48         return self
49
50     def __exit__(self, type, value, traceback):
51         self._lock.release()
52
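# Illustrative usage sketch (comment only, not part of the script's flow;
# "storage" is a hypothetical module-level SafeDict instance):
#
#     storage = SafeDict()
#     with storage as stor:
#         stor["counter"] = stor.get("counter", 0) + 1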
53
54 def parse_arguments():
55     """Use argparse to get arguments,
56
57     Returns:
58         :return: args object.
59     """
60     parser = argparse.ArgumentParser()
61     # TODO: Should we use --argument-names-with-spaces?
62     str_help = "Autonomous System number use in the stream."
63     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64     # FIXME: We are acting as iBGP peer,
65     # we should mirror AS number from peer's open message.
66     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67     parser.add_argument("--amount", default="1", type=int, help=str_help)
68     str_help = "Maximum number of IP prefixes to be announced in one iteration"
69     parser.add_argument("--insert", default="1", type=int, help=str_help)
70     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
71     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
72     str_help = "The number of prefixes to process without withdrawals"
73     parser.add_argument("--prefill", default="0", type=int, help=str_help)
74     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
75     parser.add_argument("--updates", choices=["single", "separate"],
76                         default="separate", help=str_help)
77     str_help = "Base prefix IP address for prefix generation"
78     parser.add_argument("--firstprefix", default="8.0.1.0",
79                         type=ipaddr.IPv4Address, help=str_help)
80     str_help = "The prefix length."
81     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
82     str_help = "Listen for connection, instead of initiating it."
83     parser.add_argument("--listen", action="store_true", help=str_help)
84     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
85                 "Default value only suitable for listening.")
86     parser.add_argument("--myip", default="0.0.0.0",
87                         type=ipaddr.IPv4Address, help=str_help)
88     str_help = ("TCP port to bind to when listening or initiating connection." +
89                 "Default only suitable for initiating.")
90     parser.add_argument("--myport", default="0", type=int, help=str_help)
91     str_help = "The IP of the next hop to be placed into the update messages."
92     parser.add_argument("--nexthop", default="192.0.2.1",
93                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
94     str_help = "Identifier of the route originator."
95     parser.add_argument("--originator", default=None,
96                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
97     str_help = "Cluster list item identifier."
98     parser.add_argument("--cluster", default=None,
99                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
100     str_help = ("Numeric IP Address to try to connect to." +
101                 "Currently no effect in listening mode.")
102     parser.add_argument("--peerip", default="127.0.0.2",
103                         type=ipaddr.IPv4Address, help=str_help)
104     str_help = "TCP port to try to connect to. No effect in listening mode."
105     parser.add_argument("--peerport", default="179", type=int, help=str_help)
106     str_help = "Local hold time."
107     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
108     str_help = "Log level (--error, --warning, --info, --debug)"
109     parser.add_argument("--error", dest="loglevel", action="store_const",
110                         const=logging.ERROR, default=logging.INFO,
111                         help=str_help)
112     parser.add_argument("--warning", dest="loglevel", action="store_const",
113                         const=logging.WARNING, default=logging.INFO,
114                         help=str_help)
115     parser.add_argument("--info", dest="loglevel", action="store_const",
116                         const=logging.INFO, default=logging.INFO,
117                         help=str_help)
118     parser.add_argument("--debug", dest="loglevel", action="store_const",
119                         const=logging.DEBUG, default=logging.INFO,
120                         help=str_help)
121     str_help = "Log file name"
122     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
123     str_help = "Trailing part of the csv result files for plotting purposes"
124     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
125     str_help = "Minimum number of updates to reach to include result into csv."
126     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
127     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
128     parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
129     str_help = "Link-State NLRI supported"
130     parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
131     str_help = "Link-State NLRI: Identifier"
132     parser.add_argument("-lsid", default="1", type=int, help=str_help)
133     str_help = "Link-State NLRI: Tunnel ID"
134     parser.add_argument("-lstid", default="1", type=int, help=str_help)
135     str_help = "Link-State NLRI: LSP ID"
136     parser.add_argument("-lspid", default="1", type=int, help=str_help)
137     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
138     parser.add_argument("--lstsaddr", default="1.2.3.4",
139                         type=ipaddr.IPv4Address, help=str_help)
140     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
141     parser.add_argument("--lsteaddr", default="5.6.7.8",
142                         type=ipaddr.IPv4Address, help=str_help)
143     str_help = "Link-State NLRI: Identifier Step"
144     parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
145     str_help = "Link-State NLRI: Tunnel ID Step"
146     parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
147     str_help = "Link-State NLRI: LSP ID Step"
148     parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
149     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
150     parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
151     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
152     parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
153     str_help = "How many play utilities are to be started."
154     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
155     str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
156 Enabling this flag makes the script not decoding the update mesage, because of not\
157 supported decoding for these elements."
158     parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
159     parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
160     str_help = "Skipping well known attributes for update message"
161     parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
162     arguments = parser.parse_args()
163     if arguments.multiplicity < 1:
164         print "Multiplicity", arguments.multiplicity, "is not positive."
165         raise SystemExit(1)
166     # TODO: Are sanity checks (such as asnumber>=0) required?
167     return arguments
168
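# Illustrative invocation (comment only; the values are hypothetical, not
# taken from any particular job):
#
#     python play.py --amount 10000 --myip 127.0.0.1 --myport 17900 \
#         --peerip 127.0.0.2 --peerport 179 --insert 10 --withdraw 5 \
#         --prefill 1000 --info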
169
170 def establish_connection(arguments):
171     """Establish connection to BGP peer.
172
173     Arguments:
174         :arguments: the following command-line arguments are used
175             - arguments.myip: local IP address
176             - arguments.myport: local port
177             - arguments.peerip: remote IP address
178             - arguments.peerport: remote port
179     Returns:
180         :return: socket.
181     """
182     if arguments.listen:
183         logger.info("Connecting in the listening mode.")
184         logger.debug("Local IP address: " + str(arguments.myip))
185         logger.debug("Local port: " + str(arguments.myport))
186         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
187         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
188         # bind needs a single tuple as argument
189         listening_socket.bind((str(arguments.myip), arguments.myport))
190         listening_socket.listen(1)
191         bgp_socket, _ = listening_socket.accept()
192         # TODO: Verify client IP is controller IP.
193         listening_socket.close()
194     else:
195         logger.info("Connecting in the talking mode.")
196         logger.debug("Local IP address: " + str(arguments.myip))
197         logger.debug("Local port: " + str(arguments.myport))
198         logger.debug("Remote IP address: " + str(arguments.peerip))
199         logger.debug("Remote port: " + str(arguments.peerport))
200         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
201         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
202         # bind to force specified address and port
203         talking_socket.bind((str(arguments.myip), arguments.myport))
204         # socket does not accept ipaddr objects, hence str()
205         talking_socket.connect((str(arguments.peerip), arguments.peerport))
206         bgp_socket = talking_socket
207     logger.info("Connected to ODL.")
208     return bgp_socket
209
210
211 def get_short_int_from_message(message, offset=16):
212     """Extract 2-bytes number from provided message.
213
214     Arguments:
215         :message: given message
216         :offset: offset of the short_int inside the message
217     Returns:
218         :return: required short_int value.
219     Notes:
220         default offset value is the BGP message size offset.
221     """
222     high_byte_int = ord(message[offset])
223     low_byte_int = ord(message[offset + 1])
224     short_int = high_byte_int * 256 + low_byte_int
225     return short_int
226
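# Illustrative example (comment only, so it is not executed on import):
# for a KEEPALIVE message the length field at offset 16 holds 19
# (16-byte marker + 2-byte length + 1-byte type):
#
#     msg = "\xFF" * 16 + struct.pack(">H", 19) + struct.pack("B", 4)
#     get_short_int_from_message(msg)  # -> 19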
227
228 def get_prefix_list_from_hex(prefixes_hex):
229     """Get decoded list of prefixes (rfc4271#section-4.3)
230
231     Arguments:
232         :prefixes_hex: list of prefixes to be decoded in hex
233     Returns:
234         :return: list of prefixes in the form of ip address (X.X.X.X/X)
235     """
236     prefix_list = []
237     offset = 0
238     while offset < len(prefixes_hex):
239         prefix_bit_len_hex = prefixes_hex[offset]
240         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
241         prefix_len = ((prefix_bit_len - 1) / 8) + 1
242         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
243         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
244         offset += 1 + prefix_len
245         prefix_list.append(prefix + "/" + str(prefix_bit_len))
246     return prefix_list
247
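# Illustrative example (comment only): one /28 prefix encoded per
# rfc4271#section-4.3 decodes to a single dotted prefix string:
#
#     get_prefix_list_from_hex("\x1c\x08\x00\x01\x00")  # -> ["8.0.1.0/28"]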
248
249 class MessageError(ValueError):
250     """Value error with logging optimized for hexlified messages."""
251
252     def __init__(self, text, message, *args):
253         """Initialisation.
254
255         Store and call super init for textual comment,
256         store raw message which caused it.
257         """
258         self.text = text
259         self.msg = message
260         super(MessageError, self).__init__(text, message, *args)
261
262     def __str__(self):
263         """Generate human readable error message.
264
265         Returns:
266             :return: human readable message as string
267         Notes:
268             Use a placeholder string if the message is to be empty.
269         """
270         message = binascii.hexlify(self.msg)
271         if message == "":
272             message = "(empty message)"
273         return self.text + ": " + message
274
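# Illustrative example (comment only): the raw message is hexlified when the
# error is rendered:
#
#     str(MessageError("Unexpected message", "\x01\x02"))
#     # -> "Unexpected message: 0102"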
275
276 def read_open_message(bgp_socket):
277     """Receive peer's OPEN message
278
279     Arguments:
280         :bgp_socket: the socket to be read
281     Returns:
282         :return: received OPEN message.
283     Notes:
284         Performs just basic incoming message checks
285     """
286     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
287     # TODO: Can the incoming open message be split in more than one packet?
288     # Some validation.
289     if len(msg_in) < 37:
290         # 37 is minimal length of open message with 4-byte AS number.
291         error_msg = (
292             "Message length (" + str(len(msg_in)) + ") is smaller than "
293             "minimal length of OPEN message with 4-byte AS number (37)"
294         )
295         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
296         raise MessageError(error_msg, msg_in)
297     # TODO: We could check BGP marker, but it is defined only later;
298     # decide what to do.
299     reported_length = get_short_int_from_message(msg_in)
300     if len(msg_in) != reported_length:
301         error_msg = (
302             "Expected message length (" + reported_length +
303             ") does not match actual length (" + str(len(msg_in)) + ")"
304         )
305         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
306         raise MessageError(error_msg, msg_in)
307     logger.info("Open message received.")
308     return msg_in
309
310
311 class MessageGenerator(object):
312     """Class which generates messages, holds states and configuration values."""
313
314     # TODO: Define bgp marker as a class (constant) variable.
315     def __init__(self, args):
316         """Initialisation according to command-line args.
317
318         Arguments:
319             :args: argsparser's Namespace object which contains command-line
320                 options for MessageGenerator initialisation
321         Notes:
322             Calculates and stores default values used later on for
323             message generation.
324         """
325         self.total_prefix_amount = args.amount
326         # Number of update messages left to be sent.
327         self.remaining_prefixes = self.total_prefix_amount
328
329         # New parameters initialisation
330         self.iteration = 0
331         self.prefix_base_default = args.firstprefix
332         self.prefix_length_default = args.prefixlen
333         self.wr_prefixes_default = []
334         self.nlri_prefixes_default = []
335         self.version_default = 4
336         self.my_autonomous_system_default = args.asnumber
337         self.hold_time_default = args.holdtime  # Local hold time.
338         self.bgp_identifier_default = int(args.myip)
339         self.next_hop_default = args.nexthop
340         self.originator_id_default = args.originator
341         self.cluster_list_item_default = args.cluster
342         self.single_update_default = args.updates == "single"
343         self.randomize_updates_default = args.updates == "random"
344         self.prefix_count_to_add_default = args.insert
345         self.prefix_count_to_del_default = args.withdraw
346         if self.prefix_count_to_del_default < 0:
347             self.prefix_count_to_del_default = 0
348         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
349             # total number of prefixes must grow to avoid infinite test loop
350             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
351         self.slot_size_default = self.prefix_count_to_add_default
352         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
353         self.results_file_name_default = args.results
354         self.performance_threshold_default = args.threshold
355         self.rfc4760 = args.rfc4760
356         self.bgpls = args.bgpls
357         self.evpn = args.evpn
358         self.skipattr = args.skipattr
359         # Default values when BGP-LS Attributes are used
360         if self.bgpls:
361             self.prefix_count_to_add_default = 1
362             self.prefix_count_to_del_default = 0
363         self.ls_nlri_default = {"Identifier": args.lsid,
364                                 "TunnelID": args.lstid,
365                                 "LSPID": args.lspid,
366                                 "IPv4TunnelSenderAddress": args.lstsaddr,
367                                 "IPv4TunnelEndPointAddress": args.lsteaddr}
368         self.lsid_step = args.lsidstep
369         self.lstid_step = args.lstidstep
370         self.lspid_step = args.lspidstep
371         self.lstsaddr_step = args.lstsaddrstep
372         self.lsteaddr_step = args.lsteaddrstep
373         # Default values used for randomized part
374         s1_slots = ((self.total_prefix_amount -
375                      self.remaining_prefixes_threshold - 1) /
376                     self.prefix_count_to_add_default + 1)
377         s2_slots = ((self.remaining_prefixes_threshold - 1) /
378                     (self.prefix_count_to_add_default -
379                     self.prefix_count_to_del_default) + 1)
380         # S1_First_Index = 0
381         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
382         s2_first_index = s1_slots * self.prefix_count_to_add_default
383         s2_last_index = (s2_first_index +
384                          s2_slots * (self.prefix_count_to_add_default -
385                                      self.prefix_count_to_del_default) - 1)
386         self.slot_gap_default = ((self.total_prefix_amount -
387                                   self.remaining_prefixes_threshold - 1) /
388                                  self.prefix_count_to_add_default + 1)
389         self.randomize_lowest_default = s2_first_index
390         self.randomize_highest_default = s2_last_index
391         # Initialising counters
392         self.phase1_start_time = 0
393         self.phase1_stop_time = 0
394         self.phase2_start_time = 0
395         self.phase2_stop_time = 0
396         self.phase1_updates_sent = 0
397         self.phase2_updates_sent = 0
398         self.updates_sent = 0
399
400         self.log_info = args.loglevel <= logging.INFO
401         self.log_debug = args.loglevel <= logging.DEBUG
402         """
403         Flags needed for the MessageGenerator performance optimization.
404         Calling logger methods each iteration even with proper log level set
405         slows down significantly the MessageGenerator performance.
406         Measured total generation time (1M updates, dry run, error log level):
407         - logging based on basic logger features: 36,2s
408         - logging based on advanced logger features (lazy logging): 21,2s
409         - conditional calling of logger methods enclosed inside condition: 8,6s
410         """
411
412         logger.info("Generator initialisation")
413         logger.info("  Target total number of prefixes to be introduced: " +
414                     str(self.total_prefix_amount))
415         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
416                     str(self.prefix_length_default))
417         logger.info("  My Autonomous System number: " +
418                     str(self.my_autonomous_system_default))
419         logger.info("  My Hold Time: " + str(self.hold_time_default))
420         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
421         logger.info("  Next Hop: " + str(self.next_hop_default))
422         logger.info("  Originator ID: " + str(self.originator_id_default))
423         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
424         logger.info("  Prefix count to be inserted at once: " +
425                     str(self.prefix_count_to_add_default))
426         logger.info("  Prefix count to be withdrawn at once: " +
427                     str(self.prefix_count_to_del_default))
428         logger.info("  Fast pre-fill up to " +
429                     str(self.total_prefix_amount -
430                         self.remaining_prefixes_threshold) + " prefixes")
431         logger.info("  Remaining number of prefixes to be processed " +
432                     "in parallel with withdrawals: " +
433                     str(self.remaining_prefixes_threshold))
434         logger.debug("  Prefix index range used after pre-fill procedure [" +
435                      str(self.randomize_lowest_default) + ", " +
436                      str(self.randomize_highest_default) + "]")
437         if self.single_update_default:
438             logger.info("  Common single UPDATE will be generated " +
439                         "for both NLRI & WITHDRAWN lists")
440         else:
441             logger.info("  Two separate UPDATEs will be generated " +
442                         "for each NLRI & WITHDRAWN lists")
443         if self.randomize_updates_default:
444             logger.info("  Generation of UPDATE messages will be randomized")
445         logger.info("  Let\'s go ...\n")
446
447         # TODO: Notification for hold timer expiration can be handy.
448
449     def store_results(self, file_name=None, threshold=None):
450         """ Stores specified results into files based on file_name value.
451
452         Arguments:
453             :param file_name: Trailing (common) part of result file names
454             :param threshold: Minimum number of sent updates needed for each
455                               result to be included into result csv file
456                               (mainly needed because of the result accuracy)
457         Returns:
458             :return: n/a
459         """
460         # default values handling
461         # TODO optimize default values handling (use e.g. dictionary.update() approach)
462         if file_name is None:
463             file_name = self.results_file_name_default
464         if threshold is None:
465             threshold = self.performance_threshold_default
466         # performance calculation
467         if self.phase1_updates_sent >= threshold:
468             totals1 = self.phase1_updates_sent
469             performance1 = int(self.phase1_updates_sent /
470                                (self.phase1_stop_time - self.phase1_start_time))
471         else:
472             totals1 = None
473             performance1 = None
474         if self.phase2_updates_sent >= threshold:
475             totals2 = self.phase2_updates_sent
476             performance2 = int(self.phase2_updates_sent /
477                                (self.phase2_stop_time - self.phase2_start_time))
478         else:
479             totals2 = None
480             performance2 = None
481
482         logger.info("#" * 10 + " Final results " + "#" * 10)
483         logger.info("Number of iterations: " + str(self.iteration))
484         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
485                     str(self.phase1_updates_sent))
486         logger.info("The pre-fill phase duration: " +
487                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
488         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
489                     str(self.phase2_updates_sent))
490         logger.info("The 2nd test phase duration: " +
491                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
492         logger.info("Threshold for performance reporting: " + str(threshold))
493
494         # making labels
495         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
496                         " route(s) per UPDATE")
497         if self.single_update_default:
498             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
499                                   "/-" + str(self.prefix_count_to_del_default) +
500                                   " routes per UPDATE")
501         else:
502             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
503                                   "/-" + str(self.prefix_count_to_del_default) +
504                                   " routes in two UPDATEs")
505         # collecting capacity and performance results
506         totals = {}
507         performance = {}
508         if totals1 is not None:
509             totals[phase1_label] = totals1
510             performance[phase1_label] = performance1
511         if totals2 is not None:
512             totals[phase2_label] = totals2
513             performance[phase2_label] = performance2
514         self.write_results_to_file(totals, "totals-" + file_name)
515         self.write_results_to_file(performance, "performance-" + file_name)
516
517     def write_results_to_file(self, results, file_name):
518         """Writes results to the csv plot file consumable by Jenkins.
519
520         Arguments:
521             :param file_name: Name of the (csv) file to be created
522         Returns:
523             :return: none
524         """
525         first_line = ""
526         second_line = ""
527         f = open(file_name, "wt")
528         try:
529             for key in sorted(results):
530                 first_line += key + ", "
531                 second_line += str(results[key]) + ", "
532             first_line = first_line[:-2]
533             second_line = second_line[:-2]
534             f.write(first_line + "\n")
535             f.write(second_line + "\n")
536             logger.info("Message generator performance results stored in " +
537                         file_name + ":")
538             logger.info("  " + first_line)
539             logger.info("  " + second_line)
540         finally:
541             f.close()
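        # Illustrative output sketch (comment only): for results such as
        # {"pre-fill 1 route(s) per UPDATE": 100000} the csv file would hold
        # a header line and a value line:
        #     pre-fill 1 route(s) per UPDATE
        #     100000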
542
543     # Return pseudo-randomized (reproducible) index for selected range
544     def randomize_index(self, index, lowest=None, highest=None):
545         """Calculates pseudo-randomized index from selected range.
546
547         Arguments:
548             :param index: input index
549             :param lowest: the lowest index from the randomized area
550             :param highest: the highest index from the randomized area
551         Returns:
552             :return: the (pseudo)randomized index
553         Notes:
554             Created just as a frame for future generator enhancement.
555         """
556         # default values handling
557         # TODO optimize default values handling (use e.g. dictionary.update() approach)
558         if lowest is None:
559             lowest = self.randomize_lowest_default
560         if highest is None:
561             highest = self.randomize_highest_default
562         # randomize
563         if (index >= lowest) and (index <= highest):
564             # we are in the randomized range -> shuffle it inside
565             # the range (now just reverse the order)
566             new_index = highest - (index - lowest)
567         else:
568             # we are out of the randomized range -> nothing to do
569             new_index = index
570         return new_index
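        # Illustrative examples (comment only), assuming lowest=10, highest=20:
        #     randomize_index(12, 10, 20)  # -> 18 (order reversed inside range)
        #     randomize_index(5, 10, 20)   # -> 5  (outside range, unchanged)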
571
572     def get_ls_nlri_values(self, index):
573         """Generates LS-NLRI parameters.
574         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
575
576         Arguments:
577             :param index: index (iteration)
578         Returns:
579             :return: dictionary of LS NLRI parameters and values
580         """
581         # generating list of LS NLRI parameters
582         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
583         ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
584         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
585         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
586         ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
587         ls_nlri_values = {"Identifier": identifier,
588                           "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
589                           "TunnelID": tunnel_id, "LSPID": lsp_id,
590                           "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
591         return ls_nlri_values
592
593     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
594                         prefix_len=None, prefix_count=None, randomize=None):
595         """Generates list of IP address prefixes.
596
597         Arguments:
598             :param slot_index: index of group of prefix addresses
599             :param slot_size: size of group of prefix addresses
600                 in [number of included prefixes]
601             :param prefix_base: IP address of the first prefix
602                 (slot_index = 0, prefix_index = 0)
603             :param prefix_len: length of the prefix in bits
604                 (the same as size of netmask)
605             :param prefix_count: number of prefixes to be returned
606                 from the specified slot
607         Returns:
608             :return: list of generated IP address prefixes
609         """
610         # default values handling
611         # TODO optimize default values handling (use e.g. dictionary.update() approach)
612         if slot_size is None:
613             slot_size = self.slot_size_default
614         if prefix_base is None:
615             prefix_base = self.prefix_base_default
616         if prefix_len is None:
617             prefix_len = self.prefix_length_default
618         if prefix_count is None:
619             prefix_count = slot_size
620         if randomize is None:
621             randomize = self.randomize_updates_default
622         # generating list of prefixes
623         indexes = []
624         prefixes = []
625         prefix_gap = 2 ** (32 - prefix_len)
626         for i in range(prefix_count):
627             prefix_index = slot_index * slot_size + i
628             if randomize:
629                 prefix_index = self.randomize_index(prefix_index)
630             indexes.append(prefix_index)
631             prefixes.append(prefix_base + prefix_index * prefix_gap)
632         if self.log_debug:
633             logger.debug("  Prefix slot index: " + str(slot_index))
634             logger.debug("  Prefix slot size: " + str(slot_size))
635             logger.debug("  Prefix count: " + str(prefix_count))
636             logger.debug("  Prefix indexes: " + str(indexes))
637             logger.debug("  Prefix list: " + str(prefixes))
638         return prefixes
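        # Illustrative example (comment only), assuming the defaults
        # --firstprefix 8.0.1.0 and --prefixlen 28, i.e. a prefix gap of 16:
        #     get_prefix_list(1, slot_size=2, randomize=False)
        #     # -> [IPv4Address('8.0.1.32'), IPv4Address('8.0.1.48')]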
639
640     def compose_update_message(self, prefix_count_to_add=None,
641                                prefix_count_to_del=None):
642         """Composes an UPDATE message
643
644         Arguments:
645             :param prefix_count_to_add: # of prefixes to put into NLRI list
646             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
647         Returns:
648             :return: encoded UPDATE message in HEX
649         Notes:
650             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
651             lists or a common message which includes both prefix lists.
652             Updates global counters.
653         """
654         # default values handling
655         # TODO optimize default values handling (use e.g. dictionary.update() approach)
656         if prefix_count_to_add is None:
657             prefix_count_to_add = self.prefix_count_to_add_default
658         if prefix_count_to_del is None:
659             prefix_count_to_del = self.prefix_count_to_del_default
660         # logging
661         if self.log_info and not (self.iteration % 1000):
662             logger.info("Iteration: " + str(self.iteration) +
663                         " - total remaining prefixes: " +
664                         str(self.remaining_prefixes))
665         if self.log_debug:
666             logger.debug("#" * 10 + " Iteration: " +
667                          str(self.iteration) + " " + "#" * 10)
668             logger.debug("Remaining prefixes: " +
669                          str(self.remaining_prefixes))
670         # scenario type & one-shot counter
671         straightforward_scenario = (self.remaining_prefixes >
672                                     self.remaining_prefixes_threshold)
673         if straightforward_scenario:
674             prefix_count_to_del = 0
675             if self.log_debug:
676                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
677             if not self.phase1_start_time:
678                 self.phase1_start_time = time.time()
679         else:
680             if self.log_debug:
681                 logger.debug("--- COMBINED SCENARIO ---")
682             if not self.phase2_start_time:
683                 self.phase2_start_time = time.time()
684         # tailor the number of prefixes if needed
685         prefix_count_to_add = (prefix_count_to_del +
686                                min(prefix_count_to_add - prefix_count_to_del,
687                                    self.remaining_prefixes))
688         # prefix slots selection for insertion and withdrawal
689         slot_index_to_add = self.iteration
690         slot_index_to_del = slot_index_to_add - self.slot_gap_default
691         # getting lists of prefixes for insertion in this iteration
692         if self.log_debug:
693             logger.debug("Prefixes to be inserted in this iteration:")
694         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
695                                                   prefix_count=prefix_count_to_add)
696         # getting lists of prefixes for withdrawal in this iteration
697         if self.log_debug:
698             logger.debug("Prefixes to be withdrawn in this iteration:")
699         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
700                                                   prefix_count=prefix_count_to_del)
701         # generating the UPDATE message with LS-NLRI only
702         if self.bgpls:
703             ls_nlri = self.get_ls_nlri_values(self.iteration)
704             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
705                                           **ls_nlri)
706         else:
707             # generating the UPDATE message with prefix lists
708             if self.single_update_default:
709                 # Send prefixes to be introduced and withdrawn
710                 # in one UPDATE message
711                 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
712                                               nlri_prefixes=prefix_list_to_add)
713             else:
714                 # Send prefixes to be introduced and withdrawn
715                 # in separate UPDATE messages (if needed)
716                 msg_out = self.update_message(wr_prefixes=[],
717                                               nlri_prefixes=prefix_list_to_add)
718                 if prefix_count_to_del:
719                     msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
720                                                    nlri_prefixes=[])
721         # updating counters - who knows ... maybe this is the last time here ;)
722         if straightforward_scenario:
723             self.phase1_stop_time = time.time()
724             self.phase1_updates_sent = self.updates_sent
725         else:
726             self.phase2_stop_time = time.time()
727             self.phase2_updates_sent = (self.updates_sent -
728                                         self.phase1_updates_sent)
729         # updating totals for the next iteration
730         self.iteration += 1
731         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
732         # returning the encoded message
733         return msg_out
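        # Illustrative walk-through (comment only), assuming --amount 10,
        # --insert 2, --withdraw 1 and --prefill 4 (threshold = 10 - 4 = 6):
        # the first two iterations run the straightforward scenario
        # (+2 prefixes each, no withdrawals); once only 6 prefixes remain,
        # every following iteration announces 2 and withdraws 1 (net +1).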
734
735     # Section of message encoders
736
737     def open_message(self, version=None, my_autonomous_system=None,
738                      hold_time=None, bgp_identifier=None):
739         """Generates an OPEN Message (rfc4271#section-4.2)
740
741         Arguments:
742             :param version: see the rfc4271#section-4.2
743             :param my_autonomous_system: see the rfc4271#section-4.2
744             :param hold_time: see the rfc4271#section-4.2
745             :param bgp_identifier: see the rfc4271#section-4.2
746         Returns:
747             :return: encoded OPEN message in HEX
748         """
749
750         # default values handling
751         # TODO optimize default values handling (use e.g. dictionary.update() approach)
752         if version is None:
753             version = self.version_default
754         if my_autonomous_system is None:
755             my_autonomous_system = self.my_autonomous_system_default
756         if hold_time is None:
757             hold_time = self.hold_time_default
758         if bgp_identifier is None:
759             bgp_identifier = self.bgp_identifier_default
760
761         # Marker
762         marker_hex = "\xFF" * 16
763
764         # Type
765         type = 1
766         type_hex = struct.pack("B", type)
767
768         # version
769         version_hex = struct.pack("B", version)
770
771         # my_autonomous_system
772         # AS_TRANS value, 23456 decimal.
773         my_autonomous_system_2_bytes = 23456
774         # AS number is mappable to 2 bytes
775         if my_autonomous_system < 65536:
776             my_autonomous_system_2_bytes = my_autonomous_system
777         my_autonomous_system_hex_2_bytes = struct.pack(
778             ">H", my_autonomous_system_2_bytes)
779
780         # Hold Time
781         hold_time_hex = struct.pack(">H", hold_time)
782
783         # BGP Identifier
784         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
785
786         # Optional Parameters
787         optional_parameters_hex = ""
788         if self.rfc4760:
789             optional_parameter_hex = (
790                 "\x02"  # Param type ("Capability Ad")
791                 "\x06"  # Length (6 bytes)
792                 "\x01"  # Capability type (NLRI Unicast),
793                         # see RFC 4760, section 8
794                 "\x04"  # Capability value length
795                 "\x00\x01"  # AFI (Ipv4)
796                 "\x00"  # (reserved)
797                 "\x01"  # SAFI (Unicast)
798             )
799             optional_parameters_hex += optional_parameter_hex
800
801         if self.bgpls:
802             optional_parameter_hex = (
803                 "\x02"  # Param type ("Capability Ad")
804                 "\x06"  # Length (6 bytes)
805                 "\x01"  # Capability type (NLRI Unicast),
806                         # see RFC 4760, section 8
807                 "\x04"  # Capability value length
808                 "\x40\x04"  # AFI (BGP-LS)
809                 "\x00"  # (reserved)
810                 "\x47"  # SAFI (BGP-LS)
811             )
812             optional_parameters_hex += optional_parameter_hex
813
814         if self.evpn:
815             optional_parameter_hex = (
816                 "\x02"  # Param type ("Capability Ad")
817                 "\x06"  # Length (6 bytes)
818                 "\x01"  # Multiprotocol extetension capability,
819                 "\x04"  # Capability value length
820                 "\x00\x19"  # AFI (L2-VPN)
821                 "\x00"  # (reserved)
822                 "\x46"  # SAFI (EVPN)
823             )
824             optional_parameters_hex += optional_parameter_hex
825
826         optional_parameter_hex = (
827             "\x02"  # Param type ("Capability Ad")
828             "\x06"  # Length (6 bytes)
829             "\x41"  # "32 bit AS Numbers Support"
830                     # (see RFC 6793, section 3)
831             "\x04"  # Capability value length
832         )
833         optional_parameter_hex += (
834             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
835         )
836         optional_parameters_hex += optional_parameter_hex
837
838         # Optional Parameters Length
839         optional_parameters_length = len(optional_parameters_hex)
840         optional_parameters_length_hex = struct.pack("B",
841                                                      optional_parameters_length)
842
843         # Length (big-endian)
844         length = (
845             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
846             len(my_autonomous_system_hex_2_bytes) +
847             len(hold_time_hex) + len(bgp_identifier_hex) +
848             len(optional_parameters_length_hex) +
849             len(optional_parameters_hex)
850         )
851         length_hex = struct.pack(">H", length)
852
853         # OPEN Message
854         message_hex = (
855             marker_hex +
856             length_hex +
857             type_hex +
858             version_hex +
859             my_autonomous_system_hex_2_bytes +
860             hold_time_hex +
861             bgp_identifier_hex +
862             optional_parameters_length_hex +
863             optional_parameters_hex
864         )
865
866         if self.log_debug:
867             logger.debug("OPEN message encoding")
868             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
869             logger.debug("  Length=" + str(length) + " (0x" +
870                          binascii.hexlify(length_hex) + ")")
871             logger.debug("  Type=" + str(type) + " (0x" +
872                          binascii.hexlify(type_hex) + ")")
873             logger.debug("  Version=" + str(version) + " (0x" +
874                          binascii.hexlify(version_hex) + ")")
875             logger.debug("  My Autonomous System=" +
876                          str(my_autonomous_system_2_bytes) + " (0x" +
877                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
878                          ")")
879             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
880                          binascii.hexlify(hold_time_hex) + ")")
881             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
882                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
883             logger.debug("  Optional Parameters Length=" +
884                          str(optional_parameters_length) + " (0x" +
885                          binascii.hexlify(optional_parameters_length_hex) +
886                          ")")
887             logger.debug("  Optional Parameters=0x" +
888                          binascii.hexlify(optional_parameters_hex))
889             logger.debug("OPEN message encoded: 0x%s",
890                          binascii.b2a_hex(message_hex))
891
892         return message_hex
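        # Field layout of the OPEN message built above (rfc4271#section-4.2):
        #     marker(16) | length(2) | type(1) | version(1) | my AS(2) |
        #     hold time(2) | BGP identifier(4) | opt. params length(1) |
        #     opt. params(variable)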
893
894     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
895                        wr_prefix_length=None, nlri_prefix_length=None,
896                        my_autonomous_system=None, next_hop=None,
897                        originator_id=None, cluster_list_item=None,
898                        end_of_rib=False, **ls_nlri_params):
899         """Generates an UPDATE Message (rfc4271#section-4.3)
900
901         Arguments:
902             :param wr_prefixes: see the rfc4271#section-4.3
903             :param nlri_prefixes: see the rfc4271#section-4.3
904             :param wr_prefix_length: see the rfc4271#section-4.3
905             :param nlri_prefix_length: see the rfc4271#section-4.3
906             :param my_autonomous_system: see the rfc4271#section-4.3
907             :param next_hop: see the rfc4271#section-4.3
908         Returns:
909             :return: encoded UPDATE message in HEX
910         """
911
912         # default values handling
913         # TODO optimize default values handling (use e.g. dictionary.update() approach)
914         if wr_prefixes is None:
915             wr_prefixes = self.wr_prefixes_default
916         if nlri_prefixes is None:
917             nlri_prefixes = self.nlri_prefixes_default
918         if wr_prefix_length is None:
919             wr_prefix_length = self.prefix_length_default
920         if nlri_prefix_length is None:
921             nlri_prefix_length = self.prefix_length_default
922         if my_autonomous_system is None:
923             my_autonomous_system = self.my_autonomous_system_default
924         if next_hop is None:
925             next_hop = self.next_hop_default
926         if originator_id is None:
927             originator_id = self.originator_id_default
928         if cluster_list_item is None:
929             cluster_list_item = self.cluster_list_item_default
930         ls_nlri = self.ls_nlri_default.copy()
931         ls_nlri.update(ls_nlri_params)
932
933         # Marker
934         marker_hex = "\xFF" * 16
935
936         # Type
937         type = 2
938         type_hex = struct.pack("B", type)
939
940         # Withdrawn Routes
941         withdrawn_routes_hex = ""
942         if not self.bgpls:
943             bytes = ((wr_prefix_length - 1) / 8) + 1
944             for prefix in wr_prefixes:
945                 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
946                                        struct.pack(">I", int(prefix))[:bytes])
947                 withdrawn_routes_hex += withdrawn_route_hex
948
949         # Withdrawn Routes Length
950         withdrawn_routes_length = len(withdrawn_routes_hex)
951         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
952
953         # TODO: replace hardcoded strings with proper encoding?
954         # Path Attributes
955         path_attributes_hex = ""
956         if not self.skipattr:
957             path_attributes_hex += (
958                 "\x40"  # Flags ("Well-Known")
959                 "\x01"  # Type (ORIGIN)
960                 "\x01"  # Length (1)
961                 "\x00"  # Origin: IGP
962             )
963             path_attributes_hex += (
964                 "\x40"  # Flags ("Well-Known")
965                 "\x02"  # Type (AS_PATH)
966                 "\x06"  # Length (6)
967                 "\x02"  # AS segment type (AS_SEQUENCE)
968                 "\x01"  # AS segment length (1)
969             )
970             my_as_hex = struct.pack(">I", my_autonomous_system)
971             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
972             path_attributes_hex += (
973                 "\x40"  # Flags ("Well-Known")
974                 "\x05"  # Type (LOCAL_PREF)
975                 "\x04"  # Length (4)
976                 "\x00\x00\x00\x64"  # (100)
977             )
978         if nlri_prefixes != []:
979             path_attributes_hex += (
980                 "\x40"  # Flags ("Well-Known")
981                 "\x03"  # Type (NEXT_HOP)
982                 "\x04"  # Length (4)
983             )
984             next_hop_hex = struct.pack(">I", int(next_hop))
985             path_attributes_hex += (
986                 next_hop_hex  # IP address of the next hop (4 bytes)
987             )
988             if originator_id is not None:
989                 path_attributes_hex += (
990                     "\x80"  # Flags ("Optional, non-transitive")
991                     "\x09"  # Type (ORIGINATOR_ID)
992                     "\x04"  # Length (4)
993                 )           # ORIGINATOR_ID (4 bytes)
994                 path_attributes_hex += struct.pack(">I", int(originator_id))
995             if cluster_list_item is not None:
996                 path_attributes_hex += (
997                     "\x80"  # Flags ("Optional, non-transitive")
998                     "\x09"  # Type (CLUSTER_LIST)
999                     "\x04"  # Length (4)
1000                 )           # one CLUSTER_LIST item (4 bytes)
1001                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1002
1003         if self.bgpls and not end_of_rib:
1004             path_attributes_hex += (
1005                 "\x80"  # Flags ("Optional, non-transitive")
1006                 "\x0e"  # Type (MP_REACH_NLRI)
1007                 "\x22"  # Length (34)
1008                 "\x40\x04"  # AFI (BGP-LS)
1009                 "\x47"  # SAFI (BGP-LS)
1010                 "\x04"  # Next Hop Length (4)
1011             )
1012             path_attributes_hex += struct.pack(">I", int(next_hop))
1013             path_attributes_hex += "\x00"           # Reserved
1014             path_attributes_hex += (
1015                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1016                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
1017                 "\x07"      # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1018             )
1019             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1020             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1021             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1022             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1023             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1024
1025         # Total Path Attributes Length
1026         total_path_attributes_length = len(path_attributes_hex)
1027         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1028
1029         # Network Layer Reachability Information
1030         nlri_hex = ""
1031         if not self.bgpls:
1032             bytes = ((nlri_prefix_length - 1) / 8) + 1
1033             for prefix in nlri_prefixes:
1034                 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1035                                    struct.pack(">I", int(prefix))[:bytes])
1036                 nlri_hex += nlri_prefix_hex
1037
1038         # Length (big-endian)
1039         length = (
1040             len(marker_hex) + 2 + len(type_hex) +
1041             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1042             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1043             len(nlri_hex))
1044         length_hex = struct.pack(">H", length)
1045
1046         # UPDATE Message
1047         message_hex = (
1048             marker_hex +
1049             length_hex +
1050             type_hex +
1051             withdrawn_routes_length_hex +
1052             withdrawn_routes_hex +
1053             total_path_attributes_length_hex +
1054             path_attributes_hex +
1055             nlri_hex
1056         )
1057
1058         if self.log_debug:
1059             logger.debug("UPDATE message encoding")
1060             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1061             logger.debug("  Length=" + str(length) + " (0x" +
1062                          binascii.hexlify(length_hex) + ")")
1063             logger.debug("  Type=" + str(type) + " (0x" +
1064                          binascii.hexlify(type_hex) + ")")
1065             logger.debug("  withdrawn_routes_length=" +
1066                          str(withdrawn_routes_length) + " (0x" +
1067                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
1068             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1069                          str(wr_prefix_length) + " (0x" +
1070                          binascii.hexlify(withdrawn_routes_hex) + ")")
1071             if total_path_attributes_length:
1072                 logger.debug("  Total Path Attributes Length=" +
1073                              str(total_path_attributes_length) + " (0x" +
1074                              binascii.hexlify(total_path_attributes_length_hex) + ")")
1075                 logger.debug("  Path Attributes=" + "(0x" +
1076                              binascii.hexlify(path_attributes_hex) + ")")
1077                 logger.debug("    Origin=IGP")
1078                 logger.debug("    AS path=" + str(my_autonomous_system))
1079                 logger.debug("    Next hop=" + str(next_hop))
1080                 if originator_id is not None:
1081                     logger.debug("    Originator id=" + str(originator_id))
1082                 if cluster_list_item is not None:
1083                     logger.debug("    Cluster list=" + str(cluster_list_item))
1084                 if self.bgpls:
1085                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1086             logger.debug("  Network Layer Reachability Information=" +
1087                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1088                          " (0x" + binascii.hexlify(nlri_hex) + ")")
1089             logger.debug("UPDATE message encoded: 0x" +
1090                          binascii.b2a_hex(message_hex))
1091
1092         # updating counter
1093         self.updates_sent += 1
1094         # returning encoded message
1095         return message_hex
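        # Field layout of the UPDATE message built above (rfc4271#section-4.3):
        #     marker(16) | length(2) | type(1) | withdrawn routes length(2) |
        #     withdrawn routes(variable) | total path attributes length(2) |
        #     path attributes(variable) | NLRI(variable)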
1096
1097     def notification_message(self, error_code, error_subcode, data_hex=""):
1098         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1099
1100         Arguments:
1101             :param error_code: see the rfc4271#section-4.5
1102             :param error_subcode: see the rfc4271#section-4.5
1103             :param data_hex: see the rfc4271#section-4.5
1104         Returns:
1105             :return: encoded NOTIFICATION message in HEX
1106         """
1107
1108         # Marker
1109         marker_hex = "\xFF" * 16
1110
1111         # Type
1112         type = 3
1113         type_hex = struct.pack("B", type)
1114
1115         # Error Code
1116         error_code_hex = struct.pack("B", error_code)
1117
1118         # Error Subcode
1119         error_subcode_hex = struct.pack("B", error_subcode)
1120
1121         # Length (big-endian)
1122         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1123                   len(error_subcode_hex) + len(data_hex))
1124         length_hex = struct.pack(">H", length)
1125
1126         # NOTIFICATION Message
1127         message_hex = (
1128             marker_hex +
1129             length_hex +
1130             type_hex +
1131             error_code_hex +
1132             error_subcode_hex +
1133             data_hex
1134         )
1135
1136         if self.log_debug:
1137             logger.debug("NOTIFICATION message encoding")
1138             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1139             logger.debug("  Length=" + str(length) + " (0x" +
1140                          binascii.hexlify(length_hex) + ")")
1141             logger.debug("  Type=" + str(type) + " (0x" +
1142                          binascii.hexlify(type_hex) + ")")
1143             logger.debug("  Error Code=" + str(error_code) + " (0x" +
1144                          binascii.hexlify(error_code_hex) + ")")
1145             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
1146                          binascii.hexlify(error_subcode_hex) + ")")
1147             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1148             logger.debug("NOTIFICATION message encoded: 0x%s",
1149                          binascii.b2a_hex(message_hex))
1150
1151         return message_hex
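        # Illustrative example (comment only, "generator" being a hypothetical
        # MessageGenerator instance): a "Hold Timer Expired" NOTIFICATION
        # (error code 4, subcode 0) is 21 bytes long:
        #     binascii.hexlify(generator.notification_message(4, 0))
        #     # -> "ff" * 16 + "0015" + "03" + "04" + "00"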
1152
1153     def keepalive_message(self):
1154         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1155
1156         Returns:
1157             :return: encoded KEEP ALIVE message in HEX
1158         """
1159
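             # A KEEP ALIVE message is always 19 octets: the 16-octet marker,
             # a 2-octet length (0x0013) and a 1-octet type (0x04); job() below
             # relies on this when it does recv(19) for the confirming keepalive.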
1160         # Marker
1161         marker_hex = "\xFF" * 16
1162
1163         # Type
1164         type = 4
1165         type_hex = struct.pack("B", type)
1166
1167         # Length (big-endian)
1168         length = len(marker_hex) + 2 + len(type_hex)
1169         length_hex = struct.pack(">H", length)
1170
1171         # KEEP ALIVE Message
1172         message_hex = (
1173             marker_hex +
1174             length_hex +
1175             type_hex
1176         )
1177
1178         if self.log_debug:
1179             logger.debug("KEEP ALIVE message encoding")
1180             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1181             logger.debug("  Length=" + str(length) + " (0x" +
1182                          binascii.hexlify(length_hex) + ")")
1183             logger.debug("  Type=" + str(type) + " (0x" +
1184                          binascii.hexlify(type_hex) + ")")
1185             logger.debug("KEEP ALIVE message encoded: 0x%s",
1186                          binascii.b2a_hex(message_hex))
1187
1188         return message_hex
1189
1190
1191 class TimeTracker(object):
1192     """Class for tracking timers, both for my keepalives and
1193     peer's hold time.
1194     """
1195
1196     def __init__(self, msg_in):
1197         """Initialisation based on defaults and the OPEN message from the peer.
1198
1199         Arguments:
1200             msg_in: the OPEN message received from peer.
1201         """
1202         # Note: Relative time is always named timedelta, to stress that
1203         # the (non-delta) time is absolute.
1204         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1205         # Upper bound for being stuck in the same state, we should
1206         # at least report something before continuing.
1207         # Negotiate the hold timer by taking the smaller
1208         # of the 2 values (mine and the peer's).
1209         hold_timedelta = 180  # Not an attribute of self yet.
1210         # TODO: Make the default value configurable,
1211         # default value could mirror what peer said.
1212         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1213         if hold_timedelta > peer_hold_timedelta:
1214             hold_timedelta = peer_hold_timedelta
1215         if hold_timedelta != 0 and hold_timedelta < 3:
1216             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1217             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1218         self.hold_timedelta = hold_timedelta
1219         # If we do not hear from peer this long, we assume it has died.
1220         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1221         # Upper limit for duration between messages, to avoid being
1222         # declared to be dead.
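             # Example: a peer advertising a 90 s hold time gives
             # hold_timedelta=90 and keepalive_timedelta=30; a hold time of 0
             # disables both keepalive sending and hold-time checking.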
1223         # The same as calling snapshot(), but also declares a field.
1224         self.snapshot_time = time.time()
1225         # Sometimes we need to store time. This is where to get
1226         # the value from afterwards. Time_keepalive may be too strict.
1227         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1228         # At this time point, peer will be declared dead.
1229         self.my_keepalive_time = None  # to be set later
1230         # At this point, we should be sending keepalive message.
1231
1232     def snapshot(self):
1233         """Store current time in instance data to use later."""
1234         # Read as time before something interesting was called.
1235         self.snapshot_time = time.time()
1236
1237     def reset_peer_hold_time(self):
1238         """Move the hold time into the future, as the peer has just proven it is still alive."""
1239         self.peer_hold_time = time.time() + self.hold_timedelta
1240
1241     # Some methods could rely on self.snapshot_time, but it is better
1242     # to require user to provide it explicitly.
1243     def reset_my_keepalive_time(self, keepalive_time):
1244         """Calculate and set the time of my next KEEP ALIVE timeout.
1245
1246         Arguments:
1247             :keepalive_time: the initial value of the KEEP ALIVE timer
1248         """
1249         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1250
1251     def is_time_for_my_keepalive(self):
1252         """Check whether my KEEP ALIVE timeout has occurred."""
1253         if self.hold_timedelta == 0:
1254             return False
1255         return self.snapshot_time >= self.my_keepalive_time
1256
1257     def get_next_event_time(self):
1258         """Return the time of the next expected or to-be-sent KEEP ALIVE."""
1259         if self.hold_timedelta == 0:
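                 # Hold time 0 means keepalives are disabled, so report the next
                 # event a full day (86400 s) away instead of a real deadline.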
1260             return self.snapshot_time + 86400
1261         return min(self.my_keepalive_time, self.peer_hold_time)
1262
1263     def check_peer_hold_time(self, snapshot_time):
1264         """Raise an error if nothing was read from the peer by the specified time."""
1265         # Hold time = 0 means keepalive checking off.
1266         if self.hold_timedelta != 0:
1267             # time.time() may be too strict
1268             if snapshot_time > self.peer_hold_time:
1269                 logger.error("Peer has overstepped the hold timer.")
1270                 raise RuntimeError("Peer has overstepped the hold timer.")
1271                 # TODO: Include hold_timedelta?
1272                 # TODO: Add notification sending (attempt). That means
1273                 # move to write tracker.
1274
1275
1276 class ReadTracker(object):
1277     """Class for tracking reads of messages chunk by chunk and
1278     for idle waiting.
1279     """
1280
1281     def __init__(self, bgp_socket, timer, storage, evpn=False, wait_for_read=10):
1282         """The reader initialisation.
1283
1284         Arguments:
1285             bgp_socket: socket to be used for sending
1286             timer: timer to be used for scheduling
1287             storage: thread safe dict
1288             evpn: flag that evpn functionality is tested
1289         """
1290         # References to outside objects.
1291         self.socket = bgp_socket
1292         self.timer = timer
1293         # BGP marker length plus length field length.
1294         self.header_length = 18
1295         # TODO: make it class (constant) attribute
1296         # Computation of where next chunk ends depends on whether
1297         # we are beyond length field.
1298         self.reading_header = True
1299         # Countdown towards next size computation.
1300         self.bytes_to_read = self.header_length
1301         # Incremental buffer for message under read.
1302         self.msg_in = ""
1303         # Initialising counters
1304         self.updates_received = 0
1305         self.prefixes_introduced = 0
1306         self.prefixes_withdrawn = 0
1307         self.rx_idle_time = 0
1308         self.rx_activity_detected = True
1309         self.storage = storage
1310         self.evpn = evpn
1311         self.wfr = wait_for_read
1312
1313     def read_message_chunk(self):
1314         """Read up to one message
1315
1316         Note:
1317             Currently it does not return anything.
1318         """
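             # Reading proceeds in two phases: first the 18-octet header
             # (16-octet marker + 2-octet length), then the remaining
             # (declared length - 18) octets of the body, possibly across
             # several recv() calls.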
1319         # TODO: We could return the whole message, currently not needed.
1320         # We assume the socket is readable.
1321         chunk_message = self.socket.recv(self.bytes_to_read)
1322         self.msg_in += chunk_message
1323         self.bytes_to_read -= len(chunk_message)
1324         # TODO: bytes_to_read < 0 is not possible, right?
1325         if not self.bytes_to_read:
1326             # Finished reading a logical block.
1327             if self.reading_header:
1328                 # The logical block was a BGP header.
1329                 # Now we know the size of the message.
1330                 self.reading_header = False
1331                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1332                                       self.header_length)
1333             else:  # We have finished reading the body of the message.
1334                 # Peer has just proven it is still alive.
1335                 self.timer.reset_peer_hold_time()
1336                 # TODO: Do we want to count received messages?
1337                 # This version ignores the received message.
1338                 # TODO: Should we do validation and exit on anything
1339                 # besides update or keepalive?
1340                 # Prepare state for reading another message.
1341                 message_type_hex = self.msg_in[self.header_length]
1342                 if message_type_hex == "\x01":
1343                     logger.info("OPEN message received: 0x%s",
1344                                 binascii.b2a_hex(self.msg_in))
1345                 elif message_type_hex == "\x02":
1346                     logger.debug("UPDATE message received: 0x%s",
1347                                  binascii.b2a_hex(self.msg_in))
1348                     self.decode_update_message(self.msg_in)
1349                 elif message_type_hex == "\x03":
1350                     logger.info("NOTIFICATION message received: 0x%s",
1351                                 binascii.b2a_hex(self.msg_in))
1352                 elif message_type_hex == "\x04":
1353                     logger.info("KEEP ALIVE message received: 0x%s",
1354                                 binascii.b2a_hex(self.msg_in))
1355                 else:
1356                     logger.warning("Unexpected message received: 0x%s",
1357                                    binascii.b2a_hex(self.msg_in))
1358                 self.msg_in = ""
1359                 self.reading_header = True
1360                 self.bytes_to_read = self.header_length
1361         # We should not act upon peer_hold_time if we are reading
1362         # something right now.
1363         return
1364
1365     def decode_path_attributes(self, path_attributes_hex):
1366         """Decode the Path Attributes field (rfc4271#section-4.3)
1367
1368         Arguments:
1369             :path_attributes: path_attributes field to be decoded in hex
1370         Returns:
1371             :return: None
1372         """
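             # Each path attribute is encoded as flags (1 octet), type code
             # (1 octet), a length field (1 octet, or 2 octets when the
             # Extended Length flag 0x10 is set) and the attribute value.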
1373         hex_to_decode = path_attributes_hex
1374
1375         while len(hex_to_decode):
1376             attr_flags_hex = hex_to_decode[0]
1377             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1378 #            attr_optional_bit = attr_flags & 128
1379 #            attr_transitive_bit = attr_flags & 64
1380 #            attr_partial_bit = attr_flags & 32
1381             attr_extended_length_bit = attr_flags & 16
1382
1383             attr_type_code_hex = hex_to_decode[1]
1384             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1385
1386             if attr_extended_length_bit:
1387                 attr_length_hex = hex_to_decode[2:4]
1388                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1389                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1390                 hex_to_decode = hex_to_decode[4 + attr_length:]
1391             else:
1392                 attr_length_hex = hex_to_decode[2]
1393                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1394                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1395                 hex_to_decode = hex_to_decode[3 + attr_length:]
1396
1397             if attr_type_code == 1:
1398                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1399                              binascii.b2a_hex(attr_flags_hex))
1400                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1401             elif attr_type_code == 2:
1402                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1403                              binascii.b2a_hex(attr_flags_hex))
1404                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1405             elif attr_type_code == 3:
1406                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1407                              binascii.b2a_hex(attr_flags_hex))
1408                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1409             elif attr_type_code == 4:
1410                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1411                              binascii.b2a_hex(attr_flags_hex))
1412                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1413             elif attr_type_code == 5:
1414                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1415                              binascii.b2a_hex(attr_flags_hex))
1416                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1417             elif attr_type_code == 6:
1418                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1419                              binascii.b2a_hex(attr_flags_hex))
1420                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1421             elif attr_type_code == 7:
1422                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1423                              binascii.b2a_hex(attr_flags_hex))
1424                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1425             elif attr_type_code == 9:  # rfc4456#section-8
1426                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1427                              binascii.b2a_hex(attr_flags_hex))
1428                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1429             elif attr_type_code == 10:  # rfc4456#section-8
1430                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1431                              binascii.b2a_hex(attr_flags_hex))
1432                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1433             elif attr_type_code == 14:  # rfc4760#section-3
1434                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1435                              binascii.b2a_hex(attr_flags_hex))
1436                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1437                 address_family_identifier_hex = attr_value_hex[0:2]
1438                 logger.debug("  Address Family Identifier=0x%s",
1439                              binascii.b2a_hex(address_family_identifier_hex))
1440                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1441                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1442                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1443                 next_hop_netaddr_len_hex = attr_value_hex[3]
1444                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1445                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1446                              next_hop_netaddr_len,
1447                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1448                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1449                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1450                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1451                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1452                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1453                 logger.debug("  Reserved=0x%s",
1454                              binascii.b2a_hex(reserved_hex))
1455                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1456                 logger.debug("  Network Layer Reachability Information=0x%s",
1457                              binascii.b2a_hex(nlri_hex))
1458                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1459                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1460                 for prefix in nlri_prefix_list:
1461                     logger.debug("  nlri_prefix_received: %s", prefix)
1462                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1463             elif attr_type_code == 15:  # rfc4760#section-4
1464                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1465                              binascii.b2a_hex(attr_flags_hex))
1466                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1467                 address_family_identifier_hex = attr_value_hex[0:2]
1468                 logger.debug("  Address Family Identifier=0x%s",
1469                              binascii.b2a_hex(address_family_identifier_hex))
1470                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1471                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1472                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1473                 wd_hex = attr_value_hex[3:]
1474                 logger.debug("  Withdrawn Routes=0x%s",
1475                              binascii.b2a_hex(wd_hex))
1476                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1477                 logger.debug("  Withdrawn routes prefix list: %s",
1478                              wdr_prefix_list)
1479                 for prefix in wdr_prefix_list:
1480                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1481                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1482             else:
1483                 logger.debug("Unknown attribute type=%s (flags:0x%s)", attr_type_code,
1484                              binascii.b2a_hex(attr_flags_hex))
1485                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1486         return None
1487
1488     def decode_update_message(self, msg):
1489         """Decode an UPDATE message (rfc4271#section-4.3)
1490
1491         Arguments:
1492             :msg: message to be decoded in hex
1493         Returns:
1494             :return: None
1495         """
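             # UPDATE layout: 19-octet header, 2-octet withdrawn routes length,
             # withdrawn routes, 2-octet total path attribute length, path
             # attributes, NLRI.  The NLRI length is not carried explicitly and
             # is computed below as message length - 23 - the two
             # variable-length fields (23 = 19-octet header + two 2-octet length fields).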
1496         logger.debug("Decoding update message:")
1497         # message header - marker
1498         marker_hex = msg[:16]
1499         logger.debug("Message header marker: 0x%s",
1500                      binascii.b2a_hex(marker_hex))
1501         # message header - message length
1502         msg_length_hex = msg[16:18]
1503         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1504         logger.debug("Message length: 0x%s (%s)",
1505                      binascii.b2a_hex(msg_length_hex), msg_length)
1506         # message header - message type
1507         msg_type_hex = msg[18:19]
1508         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1509
1510         with self.storage as stor:
1511             # this will replace the previously stored message
1512             stor['update'] = binascii.hexlify(msg)
1513
1514         logger.debug("Evpn {}".format(self.evpn))
1515         if self.evpn:
1516             logger.debug("Skipping update decoding because EVPN data is expected")
1517             return
1518
1519         if msg_type == 2:
1520             logger.debug("Message type: 0x%s (update)",
1521                          binascii.b2a_hex(msg_type_hex))
1522             # withdrawn routes length
1523             wdr_length_hex = msg[19:21]
1524             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1525             logger.debug("Withdrawn routes length: 0x%s (%s)",
1526                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1527             # withdrawn routes
1528             wdr_hex = msg[21:21 + wdr_length]
1529             logger.debug("Withdrawn routes: 0x%s",
1530                          binascii.b2a_hex(wdr_hex))
1531             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1532             logger.debug("Withdrawn routes prefix list: %s",
1533                          wdr_prefix_list)
1534             for prefix in wdr_prefix_list:
1535                 logger.debug("withdrawn_prefix_received: %s", prefix)
1536             # total path attribute length
1537             total_pa_length_offset = 21 + wdr_length
1538             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1539             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1540             logger.debug("Total path attribute length: 0x%s (%s)",
1541                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1542             # path attributes
1543             pa_offset = total_pa_length_offset + 2
1544             pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1545             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1546             self.decode_path_attributes(pa_hex)
1547             # network layer reachability information length
1548             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1549             logger.debug("Calculated NLRI length: %s", nlri_length)
1550             # network layer reachability information
1551             nlri_offset = pa_offset + total_pa_length
1552             nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1553             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1554             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1555             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1556             for prefix in nlri_prefix_list:
1557                 logger.debug("nlri_prefix_received: %s", prefix)
1558             # Updating counters
1559             self.updates_received += 1
1560             self.prefixes_introduced += len(nlri_prefix_list)
1561             self.prefixes_withdrawn += len(wdr_prefix_list)
1562         else:
1563             logger.error("Unexpected message type 0x%s in 0x%s",
1564                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1565
1566     def wait_for_read(self):
1567         """Read message until timeout (next expected event).
1568
1569         Note:
1570             Used when no more updates have to be sent, to avoid busy-waiting.
1571             Currently it does not return anything.
1572         """
1573         # Compute time to the first predictable state change
1574         event_time = self.timer.get_next_event_time()
1575         # snapshot_time would be imprecise
1576         wait_timedelta = min(event_time - time.time(), self.wfr)
1577         if wait_timedelta < 0:
1578             # The program got around to waiting for an event in the "very
1579             # near future" so late that it became a "past" event, so tell
1580             # "select" not to wait at all. Passing a negative timedelta to
1581             # select() would lead to either waiting forever (for -1) or
1582             # select.error("Invalid parameter") (for everything else).
1583             wait_timedelta = 0
1584         # And wait for event or something to read.
1585
1586         if not self.rx_activity_detected or not (self.updates_received % 100):
1587             # right time to write statistics to the log (not for every update and
1588             # not too frequently to avoid having large log files)
1589             logger.info("total_received_update_message_counter: %s",
1590                         self.updates_received)
1591             logger.info("total_received_nlri_prefix_counter: %s",
1592                         self.prefixes_introduced)
1593             logger.info("total_received_withdrawn_prefix_counter: %s",
1594                         self.prefixes_withdrawn)
1595
1596         start_time = time.time()
1597         select.select([self.socket], [], [self.socket], wait_timedelta)
1598         timedelta = time.time() - start_time
1599         self.rx_idle_time += timedelta
1600         self.rx_activity_detected = timedelta < 1
1601
1602         if not self.rx_activity_detected or not (self.updates_received % 100):
1603             # right time to write statistics to the log (not for every update and
1604             # not too frequently to avoid having large log files)
1605             logger.info("... idle for %.3fs", timedelta)
1606             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1607         return
1608
1609
1610 class WriteTracker(object):
1611     """Class for tracking enqueued messages and sending them in chunks."""
1612
1613     def __init__(self, bgp_socket, generator, timer):
1614         """The writer initialisation.
1615
1616         Arguments:
1617             bgp_socket: socket to be used for sending
1618             generator: generator to be used for message generation
1619             timer: timer to be used for scheduling
1620         """
1621         # References to outside objects.
1622         self.socket = bgp_socket
1623         self.generator = generator
1624         self.timer = timer
1625         # Really new fields.
1626         # TODO: Would attribute docstrings add anything substantial?
1627         self.sending_message = False
1628         self.bytes_to_send = 0
1629         self.msg_out = ""
1630
1631     def enqueue_message_for_sending(self, message):
1632         """Enqueue message and change state.
1633
1634         Arguments:
1635             message: message to be enqueued into the msg_out buffer
1636         """
1637         self.msg_out += message
1638         self.bytes_to_send += len(message)
1639         self.sending_message = True
1640
1641     def send_message_chunk_is_whole(self):
1642         """Send enqueued data from msg_out buffer
1643
1644         Returns:
1645             :return: true if no remaining data to send
1646         """
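             # socket.send() may transmit only part of the buffer, so any
             # remainder stays in msg_out and is retried on the next writable
             # select() hit.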
1647         # We assume there is a msg_out to send and socket is writable.
1648         # print "going to send", repr(self.msg_out)
1649         self.timer.snapshot()
1650         bytes_sent = self.socket.send(self.msg_out)
1651         # Forget the part of message that was sent.
1652         self.msg_out = self.msg_out[bytes_sent:]
1653         self.bytes_to_send -= bytes_sent
1654         if not self.bytes_to_send:
1655             # TODO: Is it possible to hit negative bytes_to_send?
1656             self.sending_message = False
1657             # We should have reset hold timer on peer side.
1658             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1659             # The possible reason for not prioritizing reads is gone.
1660             return True
1661         return False
1662
1663
1664 class StateTracker(object):
1665     """Main loop has state so complex it warrants this separate class."""
1666
1667     def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1668         """The state tracker initialisation.
1669
1670         Arguments:
1671             bgp_socket: socket to be used for sending / receiving
1672             generator: generator to be used for message generation
1673             timer: timer to be used for scheduling
1674             inqueue: user initiated messages queue
1675             storage: thread safe dict to store data for the rpc server
1676             cliargs: cli args from the user
1677         """
1678         # References to outside objects.
1679         self.socket = bgp_socket
1680         self.generator = generator
1681         self.timer = timer
1682         # Sub-trackers.
1683         self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, wait_for_read=cliargs.wfr)
1684         self.writer = WriteTracker(bgp_socket, generator, timer)
1685         # Prioritization state.
1686         self.prioritize_writing = False
1687         # In general, we prioritize reading over writing. But in order
1688         # not to get blocked by never-ending reads, we should
1689         # check whether we are not risking running out of holdtime.
1690         # So in some situations, this field is set to True to attempt
1691         # finishing sending a message, after which this field resets
1692         # back to False.
1693         # TODO: Alternative is to switch fairly between reading and
1694         # writing (called round robin from now on).
1695         # Message counting is done in generator.
1696         self.inqueue = inqueue
1697
1698     def perform_one_loop_iteration(self):
1699         """The main loop iteration.
1700
1701         Notes:
1702             Calculates priority, resolves all conditions, calls
1703             appropriate method and returns to caller to repeat.
1704         """
1705         self.timer.snapshot()
1706         if not self.prioritize_writing:
1707             if self.timer.is_time_for_my_keepalive():
1708                 if not self.writer.sending_message:
1709                     # We need to schedule a keepalive ASAP.
1710                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1711                     logger.info("KEEP ALIVE is sent.")
1712                 # We are sending a message now, so let's prioritize it.
1713                 self.prioritize_writing = True
1714
1715         try:
1716             msg = self.inqueue.get_nowait()
1717             logger.info("Received message: {}".format(msg))
1718             msgbin = binascii.unhexlify(msg)
1719             self.writer.enqueue_message_for_sending(msgbin)
1720         except Queue.Empty:
1721             pass
1722         # Now we know what our priorities are, we have to check
1723         # which actions are available.
1724         # select.select() returns three lists;
1725         # we store them in a list of lists.
1726         list_list = select.select([self.socket], [self.socket], [self.socket],
1727                                   self.timer.report_timedelta)
1728         read_list, write_list, except_list = list_list
1729         # Lists are unpacked, each is either [] or [self.socket],
1730         # so we will test them as boolean.
1731         if except_list:
1732             logger.error("Exceptional state on the socket.")
1733             raise RuntimeError("Exceptional state on socket", self.socket)
1734         # We will do either read or write.
1735         if not (self.prioritize_writing and write_list):
1736             # Either we have no reason to rush writes,
1737             # or the socket is not writable.
1738             # We are focusing on reading here.
1739             if read_list:  # there is something to read indeed
1740                 # In this case we want to read chunk of message
1741                 # and repeat the select.
1742                 self.reader.read_message_chunk()
1743                 return
1744             # We were focusing on reading, but nothing to read was there.
1745             # Good time to check peer for hold timer.
1746             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1747             # Quiet on the read front, so we can attempt to write.
1748         if write_list:
1749             # Either we really want to reset peer's view of our hold
1750             # timer, or there was nothing to read.
1751             # Were we in the middle of sending a message?
1752             if self.writer.sending_message:
1753                 # Was it the end of a message?
1754                 whole = self.writer.send_message_chunk_is_whole()
1755                 # We were pressed to send something and we did it.
1756                 if self.prioritize_writing and whole:
1757                     # We prioritize reading again.
1758                     self.prioritize_writing = False
1759                 return
1760             # Finally, check whether there are still update messages to be generated.
1761             if self.generator.remaining_prefixes:
1762                 msg_out = self.generator.compose_update_message()
1763                 if not self.generator.remaining_prefixes:
1764                     # We have just finished update generation,
1765                     # end-of-rib is due.
1766                     logger.info("All update messages generated.")
1767                     logger.info("Storing performance results.")
1768                     self.generator.store_results()
1769                     logger.info("Finally an END-OF-RIB is sent.")
1770                     msg_out += self.generator.update_message(wr_prefixes=[],
1771                                                              nlri_prefixes=[],
1772                                                              end_of_rib=True)
1773                 self.writer.enqueue_message_for_sending(msg_out)
1774                 # Attempt for real sending to be done in next iteration.
1775                 return
1776             # Nothing to write anymore.
1777             # To avoid busy loop, we do idle waiting here.
1778             self.reader.wait_for_read()
1779             return
1780         # We can neither read nor write.
1781         logger.warning("Input and output both blocked for " +
1782                        str(self.timer.report_timedelta) + " seconds.")
1783         # FIXME: Are we sure select has been really waiting
1784         # the whole period?
1785         return
1786
1787
1788 def create_logger(loglevel, logfile):
1789     """Create logger object
1790
1791     Arguments:
1792         :loglevel: log level
1793         :logfile: log file name
1794     Returns:
1795         :return: logger object
1796     """
1797     logger = logging.getLogger("logger")
1798     log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1799     console_handler = logging.StreamHandler()
1800     file_handler = logging.FileHandler(logfile, mode="w")
1801     console_handler.setFormatter(log_formatter)
1802     file_handler.setFormatter(log_formatter)
1803     logger.addHandler(console_handler)
1804     logger.addHandler(file_handler)
1805     logger.setLevel(loglevel)
1806     return logger
1807
1808
1809 def job(arguments, inqueue, storage):
1810     """One-time initialisation, then the main iteration loop.
1811     Notes:
1812         Establish BGP connection and run iterations.
1813
1814     Arguments:
1815         :arguments: Command line arguments
1816         :inqueue: Data to be sent from play.py
1817         :storage: Shared dict for rpc server
1818     Returns:
1819         :return: None
1820     """
1821     bgp_socket = establish_connection(arguments)
1822     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1823     # Receive open message before sending anything.
1824     # FIXME: Add parameter to send default open message first,
1825     # to work with "you first" peers.
1826     msg_in = read_open_message(bgp_socket)
1827     timer = TimeTracker(msg_in)
1828     generator = MessageGenerator(arguments)
1829     msg_out = generator.open_message()
1830     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1831     # Send our open message to the peer.
1832     bgp_socket.send(msg_out)
1833     # Wait for confirming keepalive.
1834     # TODO: Surely in just one packet?
1835     # Use the exact keepalive length so we do not read possible updates.
1836     msg_in = bgp_socket.recv(19)
1837     if msg_in != generator.keepalive_message():
1838         error_msg = "Open not confirmed by keepalive, instead got"
1839         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1840         raise MessageError(error_msg, msg_in)
1841     timer.reset_peer_hold_time()
1842     # Send the keepalive to indicate the connection is accepted.
1843     timer.snapshot()  # Remember this time.
1844     msg_out = generator.keepalive_message()
1845     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1846     bgp_socket.send(msg_out)
1847     # Use the remembered time.
1848     timer.reset_my_keepalive_time(timer.snapshot_time)
1849     # End of initial handshake phase.
1850     state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
1851     while True:  # main reactor loop
1852         state.perform_one_loop_iteration()
1853
1854
1855 class Rpcs:
1856     '''Handler for SimpleXMLRPCServer'''
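         # Illustrative client usage (an assumption, not taken from this repo;
         # it relies on Python 2's standard xmlrpclib and the server started in
         # threaded_job(), which binds to --myip on port 8000):
         #   proxy = xmlrpclib.ServerProxy("http://127.0.0.1:8000")
         #   proxy.send("deadbeef")            # hex string to be sent to the peer
         #   last_update = proxy.get("update")
         #   proxy.clean("update")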
1857     def __init__(self, sendqueue, storage):
1858         '''Init method
1859
1860         Arguments:
1861             :sendqueue: queue for data to be sent towards odl
1862             :storage: thread safe dict
1863         '''
1864         self.queue = sendqueue
1865         self.storage = storage
1866
1867     def send(self, text):
1868         '''Data to be sent
1869
1870         Arguments:
1871             :text: hex string of the data to be sent
1872         '''
1873         self.queue.put(text)
1874
1875     def get(self, text=''):
1876         '''Reads data from the storage.
1877
1878         - returns stored data or an empty string, at the moment only
1879           'update' is stored
1880
1881         Arguments:
1882             :text: a key to the storage to get the data
1883         Returns:
1884             :data: stored data
1885         '''
1886         with self.storage as stor:
1887             return stor.get(text, '')
1888
1889     def clean(self, text=''):
1890         '''Cleans data from the storage.
1891
1892         Arguments:
1893             :text: a key to the storage to clean the data
1894         '''
1895         with self.storage as stor:
1896             if text in stor:
1897                 del stor[text]
1898
1899
1900 def threaded_job(arguments):
1901     """Run the job threaded
1902
1903     Arguments:
1904         :arguments: Command line arguments
1905     Returns:
1906         :return: None
1907     """
1908     amount_left = arguments.amount
1909     utils_left = arguments.multiplicity
1910     prefix_current = arguments.firstprefix
1911     myip_current = arguments.myip
1912     thread_args = []
1913     rpcqueue = Queue.Queue()
1914     storage = SafeDict()
1915
1916     while 1:
1917         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
1918         amount_left -= amount_per_util
1919         utils_left -= 1
1920
1921         args = deepcopy(arguments)
1922         args.amount = amount_per_util
1923         args.firstprefix = prefix_current
1924         args.myip = myip_current
1925         thread_args.append(args)
1926
1927         if not utils_left:
1928             break
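             # Advance the base prefix past this worker's share.  The stride of
             # 16 addresses per prefix matches the default /28 prefix length
             # (2 ** (32 - 28) = 16); it is hard-coded here rather than derived
             # from --prefixlen.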
1929         prefix_current += amount_per_util * 16
1930         myip_current += 1
1931
1932     try:
1933         # Create threads
1934         for t in thread_args:
1935             thread.start_new_thread(job, (t, rpcqueue, storage))
1936     except Exception:
1937         print "Error: unable to start thread."
1938         raise SystemExit(2)
1939
1940     rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
1941     rpcserver.register_instance(Rpcs(rpcqueue, storage))
1942     rpcserver.serve_forever()
1943
1944
1945 if __name__ == "__main__":
1946     arguments = parse_arguments()
1947     logger = create_logger(arguments.loglevel, arguments.logfile)
1948     threaded_job(arguments)