Bgpcep l3vpn multicast test coverage
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 import argparse
17 import binascii
18 import ipaddr
19 import logging
20 import Queue
21 import select
22 import socket
23 import struct
24 import thread
25 import threading
26 import time
27
28
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
33
34
35 class SafeDict(dict):
36     '''Thread safe dictionary
37
38     The object will serve as thread safe data storage.
39     It should be used with "with" statement.
40     '''
41
42     def __init__(self, * p_arg, ** n_arg):
43         super(SafeDict, self).__init__()
44         self._lock = threading.Lock()
45
46     def __enter__(self):
47         self._lock.acquire()
48         return self
49
50     def __exit__(self, type, value, traceback):
51         self._lock.release()
52
53
54 def parse_arguments():
55     """Use argparse to get arguments,
56
57     Returns:
58         :return: args object.
59     """
60     parser = argparse.ArgumentParser()
61     # TODO: Should we use --argument-names-with-spaces?
62     str_help = "Autonomous System number use in the stream."
63     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64     # FIXME: We are acting as iBGP peer,
65     # we should mirror AS number from peer's open message.
66     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67     parser.add_argument("--amount", default="1", type=int, help=str_help)
68     str_help = "Maximum number of IP prefixes to be announced in one iteration"
69     parser.add_argument("--insert", default="1", type=int, help=str_help)
70     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
71     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
72     str_help = "The number of prefixes to process without withdrawals"
73     parser.add_argument("--prefill", default="0", type=int, help=str_help)
74     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
75     parser.add_argument("--updates", choices=["single", "separate"],
76                         default=["separate"], help=str_help)
77     str_help = "Base prefix IP address for prefix generation"
78     parser.add_argument("--firstprefix", default="8.0.1.0",
79                         type=ipaddr.IPv4Address, help=str_help)
80     str_help = "The prefix length."
81     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
82     str_help = "Listen for connection, instead of initiating it."
83     parser.add_argument("--listen", action="store_true", help=str_help)
84     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
85                 "Default value only suitable for listening.")
86     parser.add_argument("--myip", default="0.0.0.0",
87                         type=ipaddr.IPv4Address, help=str_help)
88     str_help = ("TCP port to bind to when listening or initiating connection." +
89                 "Default only suitable for initiating.")
90     parser.add_argument("--myport", default="0", type=int, help=str_help)
91     str_help = "The IP of the next hop to be placed into the update messages."
92     parser.add_argument("--nexthop", default="192.0.2.1",
93                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
94     str_help = "Identifier of the route originator."
95     parser.add_argument("--originator", default=None,
96                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
97     str_help = "Cluster list item identifier."
98     parser.add_argument("--cluster", default=None,
99                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
100     str_help = ("Numeric IP Address to try to connect to." +
101                 "Currently no effect in listening mode.")
102     parser.add_argument("--peerip", default="127.0.0.2",
103                         type=ipaddr.IPv4Address, help=str_help)
104     str_help = "TCP port to try to connect to. No effect in listening mode."
105     parser.add_argument("--peerport", default="179", type=int, help=str_help)
106     str_help = "Local hold time."
107     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
108     str_help = "Log level (--error, --warning, --info, --debug)"
109     parser.add_argument("--error", dest="loglevel", action="store_const",
110                         const=logging.ERROR, default=logging.INFO,
111                         help=str_help)
112     parser.add_argument("--warning", dest="loglevel", action="store_const",
113                         const=logging.WARNING, default=logging.INFO,
114                         help=str_help)
115     parser.add_argument("--info", dest="loglevel", action="store_const",
116                         const=logging.INFO, default=logging.INFO,
117                         help=str_help)
118     parser.add_argument("--debug", dest="loglevel", action="store_const",
119                         const=logging.DEBUG, default=logging.INFO,
120                         help=str_help)
121     str_help = "Log file name"
122     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
123     str_help = "Trailing part of the csv result files for plotting purposes"
124     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
125     str_help = "Minimum number of updates to reach to include result into csv."
126     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
127     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
128     parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
129     str_help = "Link-State NLRI supported"
130     parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
131     str_help = "Link-State NLRI: Identifier"
132     parser.add_argument("-lsid", default="1", type=int, help=str_help)
133     str_help = "Link-State NLRI: Tunnel ID"
134     parser.add_argument("-lstid", default="1", type=int, help=str_help)
135     str_help = "Link-State NLRI: LSP ID"
136     parser.add_argument("-lspid", default="1", type=int, help=str_help)
137     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
138     parser.add_argument("--lstsaddr", default="1.2.3.4",
139                         type=ipaddr.IPv4Address, help=str_help)
140     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
141     parser.add_argument("--lsteaddr", default="5.6.7.8",
142                         type=ipaddr.IPv4Address, help=str_help)
143     str_help = "Link-State NLRI: Identifier Step"
144     parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
145     str_help = "Link-State NLRI: Tunnel ID Step"
146     parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
147     str_help = "Link-State NLRI: LSP ID Step"
148     parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
149     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
150     parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
151     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
152     parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
153     str_help = "How many play utilities are to be started."
154     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
155     str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
156 Enabling this flag makes the script not decoding the update mesage, because of not\
157 supported decoding for these elements."
158     parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
159     str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
160     Enabling this flag makes the script not decoding the update mesage, because of not\
161     supported decoding for these elements."
162     parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
163     str_help = "Open message includes L3VPN-MULTICAST arguments.\
164     Enabling this flag makes the script not decoding the update mesage, because of not\
165     supported decoding for these elements."
166     parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
167     parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
168     str_help = "Skipping well known attributes for update message"
169     parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
170     arguments = parser.parse_args()
171     if arguments.multiplicity < 1:
172         print "Multiplicity", arguments.multiplicity, "is not positive."
173         raise SystemExit(1)
174     # TODO: Are sanity checks (such as asnumber>=0) required?
175     return arguments
176
177
178 def establish_connection(arguments):
179     """Establish connection to BGP peer.
180
181     Arguments:
182         :arguments: following command-line argumets are used
183             - arguments.myip: local IP address
184             - arguments.myport: local port
185             - arguments.peerip: remote IP address
186             - arguments.peerport: remote port
187     Returns:
188         :return: socket.
189     """
190     if arguments.listen:
191         logger.info("Connecting in the listening mode.")
192         logger.debug("Local IP address: " + str(arguments.myip))
193         logger.debug("Local port: " + str(arguments.myport))
194         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
195         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
196         # bind need single tuple as argument
197         listening_socket.bind((str(arguments.myip), arguments.myport))
198         listening_socket.listen(1)
199         bgp_socket, _ = listening_socket.accept()
200         # TODO: Verify client IP is cotroller IP.
201         listening_socket.close()
202     else:
203         logger.info("Connecting in the talking mode.")
204         logger.debug("Local IP address: " + str(arguments.myip))
205         logger.debug("Local port: " + str(arguments.myport))
206         logger.debug("Remote IP address: " + str(arguments.peerip))
207         logger.debug("Remote port: " + str(arguments.peerport))
208         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
209         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
210         # bind to force specified address and port
211         talking_socket.bind((str(arguments.myip), arguments.myport))
212         # socket does not spead ipaddr, hence str()
213         talking_socket.connect((str(arguments.peerip), arguments.peerport))
214         bgp_socket = talking_socket
215     logger.info("Connected to ODL.")
216     return bgp_socket
217
218
219 def get_short_int_from_message(message, offset=16):
220     """Extract 2-bytes number from provided message.
221
222     Arguments:
223         :message: given message
224         :offset: offset of the short_int inside the message
225     Returns:
226         :return: required short_inf value.
227     Notes:
228         default offset value is the BGP message size offset.
229     """
230     high_byte_int = ord(message[offset])
231     low_byte_int = ord(message[offset + 1])
232     short_int = high_byte_int * 256 + low_byte_int
233     return short_int
234
235
236 def get_prefix_list_from_hex(prefixes_hex):
237     """Get decoded list of prefixes (rfc4271#section-4.3)
238
239     Arguments:
240         :prefixes_hex: list of prefixes to be decoded in hex
241     Returns:
242         :return: list of prefixes in the form of ip address (X.X.X.X/X)
243     """
244     prefix_list = []
245     offset = 0
246     while offset < len(prefixes_hex):
247         prefix_bit_len_hex = prefixes_hex[offset]
248         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
249         prefix_len = ((prefix_bit_len - 1) / 8) + 1
250         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
251         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
252         offset += 1 + prefix_len
253         prefix_list.append(prefix + "/" + str(prefix_bit_len))
254     return prefix_list
255
256
257 class MessageError(ValueError):
258     """Value error with logging optimized for hexlified messages."""
259
260     def __init__(self, text, message, *args):
261         """Initialisation.
262
263         Store and call super init for textual comment,
264         store raw message which caused it.
265         """
266         self.text = text
267         self.msg = message
268         super(MessageError, self).__init__(text, message, *args)
269
270     def __str__(self):
271         """Generate human readable error message.
272
273         Returns:
274             :return: human readable message as string
275         Notes:
276             Use a placeholder string if the message is to be empty.
277         """
278         message = binascii.hexlify(self.msg)
279         if message == "":
280             message = "(empty message)"
281         return self.text + ": " + message
282
283
284 def read_open_message(bgp_socket):
285     """Receive peer's OPEN message
286
287     Arguments:
288         :bgp_socket: the socket to be read
289     Returns:
290         :return: received OPEN message.
291     Notes:
292         Performs just basic incomming message checks
293     """
294     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
295     # TODO: Can the incoming open message be split in more than one packet?
296     # Some validation.
297     if len(msg_in) < 37:
298         # 37 is minimal length of open message with 4-byte AS number.
299         error_msg = (
300             "Message length (" + str(len(msg_in)) + ") is smaller than "
301             "minimal length of OPEN message with 4-byte AS number (37)"
302         )
303         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
304         raise MessageError(error_msg, msg_in)
305     # TODO: We could check BGP marker, but it is defined only later;
306     # decide what to do.
307     reported_length = get_short_int_from_message(msg_in)
308     if len(msg_in) != reported_length:
309         error_msg = (
310             "Expected message length (" + reported_length +
311             ") does not match actual length (" + str(len(msg_in)) + ")"
312         )
313         logger.error(error_msg + binascii.hexlify(msg_in))
314         raise MessageError(error_msg, msg_in)
315     logger.info("Open message received.")
316     return msg_in
317
318
319 class MessageGenerator(object):
320     """Class which generates messages, holds states and configuration values."""
321
322     # TODO: Define bgp marker as a class (constant) variable.
323     def __init__(self, args):
324         """Initialisation according to command-line args.
325
326         Arguments:
327             :args: argsparser's Namespace object which contains command-line
328                 options for MesageGenerator initialisation
329         Notes:
330             Calculates and stores default values used later on for
331             message geeration.
332         """
333         self.total_prefix_amount = args.amount
334         # Number of update messages left to be sent.
335         self.remaining_prefixes = self.total_prefix_amount
336
337         # New parameters initialisation
338         self.iteration = 0
339         self.prefix_base_default = args.firstprefix
340         self.prefix_length_default = args.prefixlen
341         self.wr_prefixes_default = []
342         self.nlri_prefixes_default = []
343         self.version_default = 4
344         self.my_autonomous_system_default = args.asnumber
345         self.hold_time_default = args.holdtime  # Local hold time.
346         self.bgp_identifier_default = int(args.myip)
347         self.next_hop_default = args.nexthop
348         self.originator_id_default = args.originator
349         self.cluster_list_item_default = args.cluster
350         self.single_update_default = args.updates == "single"
351         self.randomize_updates_default = args.updates == "random"
352         self.prefix_count_to_add_default = args.insert
353         self.prefix_count_to_del_default = args.withdraw
354         if self.prefix_count_to_del_default < 0:
355             self.prefix_count_to_del_default = 0
356         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
357             # total number of prefixes must grow to avoid infinite test loop
358             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
359         self.slot_size_default = self.prefix_count_to_add_default
360         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
361         self.results_file_name_default = args.results
362         self.performance_threshold_default = args.threshold
363         self.rfc4760 = args.rfc4760
364         self.bgpls = args.bgpls
365         self.evpn = args.evpn
366         self.mvpn = args.mvpn
367         self.l3vpn_mcast = args.l3vpn_mcast
368         self.skipattr = args.skipattr
369         # Default values when BGP-LS Attributes are used
370         if self.bgpls:
371             self.prefix_count_to_add_default = 1
372             self.prefix_count_to_del_default = 0
373         self.ls_nlri_default = {"Identifier": args.lsid,
374                                 "TunnelID": args.lstid,
375                                 "LSPID": args.lspid,
376                                 "IPv4TunnelSenderAddress": args.lstsaddr,
377                                 "IPv4TunnelEndPointAddress": args.lsteaddr}
378         self.lsid_step = args.lsidstep
379         self.lstid_step = args.lstidstep
380         self.lspid_step = args.lspidstep
381         self.lstsaddr_step = args.lstsaddrstep
382         self.lsteaddr_step = args.lsteaddrstep
383         # Default values used for randomized part
384         s1_slots = ((self.total_prefix_amount -
385                      self.remaining_prefixes_threshold - 1) /
386                     self.prefix_count_to_add_default + 1)
387         s2_slots = ((self.remaining_prefixes_threshold - 1) /
388                     (self.prefix_count_to_add_default -
389                      self.prefix_count_to_del_default) + 1)
390         # S1_First_Index = 0
391         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
392         s2_first_index = s1_slots * self.prefix_count_to_add_default
393         s2_last_index = (s2_first_index +
394                          s2_slots * (self.prefix_count_to_add_default -
395                                      self.prefix_count_to_del_default) - 1)
396         self.slot_gap_default = ((self.total_prefix_amount -
397                                   self.remaining_prefixes_threshold - 1) /
398                                  self.prefix_count_to_add_default + 1)
399         self.randomize_lowest_default = s2_first_index
400         self.randomize_highest_default = s2_last_index
401         # Initialising counters
402         self.phase1_start_time = 0
403         self.phase1_stop_time = 0
404         self.phase2_start_time = 0
405         self.phase2_stop_time = 0
406         self.phase1_updates_sent = 0
407         self.phase2_updates_sent = 0
408         self.updates_sent = 0
409
410         self.log_info = args.loglevel <= logging.INFO
411         self.log_debug = args.loglevel <= logging.DEBUG
412         """
413         Flags needed for the MessageGenerator performance optimization.
414         Calling logger methods each iteration even with proper log level set
415         slows down significantly the MessageGenerator performance.
416         Measured total generation time (1M updates, dry run, error log level):
417         - logging based on basic logger features: 36,2s
418         - logging based on advanced logger features (lazy logging): 21,2s
419         - conditional calling of logger methods enclosed inside condition: 8,6s
420         """
421
422         logger.info("Generator initialisation")
423         logger.info("  Target total number of prefixes to be introduced: " +
424                     str(self.total_prefix_amount))
425         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
426                     str(self.prefix_length_default))
427         logger.info("  My Autonomous System number: " +
428                     str(self.my_autonomous_system_default))
429         logger.info("  My Hold Time: " + str(self.hold_time_default))
430         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
431         logger.info("  Next Hop: " + str(self.next_hop_default))
432         logger.info("  Originator ID: " + str(self.originator_id_default))
433         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
434         logger.info("  Prefix count to be inserted at once: " +
435                     str(self.prefix_count_to_add_default))
436         logger.info("  Prefix count to be withdrawn at once: " +
437                     str(self.prefix_count_to_del_default))
438         logger.info("  Fast pre-fill up to " +
439                     str(self.total_prefix_amount -
440                         self.remaining_prefixes_threshold) + " prefixes")
441         logger.info("  Remaining number of prefixes to be processed " +
442                     "in parallel with withdrawals: " +
443                     str(self.remaining_prefixes_threshold))
444         logger.debug("  Prefix index range used after pre-fill procedure [" +
445                      str(self.randomize_lowest_default) + ", " +
446                      str(self.randomize_highest_default) + "]")
447         if self.single_update_default:
448             logger.info("  Common single UPDATE will be generated " +
449                         "for both NLRI & WITHDRAWN lists")
450         else:
451             logger.info("  Two separate UPDATEs will be generated " +
452                         "for each NLRI & WITHDRAWN lists")
453         if self.randomize_updates_default:
454             logger.info("  Generation of UPDATE messages will be randomized")
455         logger.info("  Let\'s go ...\n")
456
457         # TODO: Notification for hold timer expiration can be handy.
458
459     def store_results(self, file_name=None, threshold=None):
460         """ Stores specified results into files based on file_name value.
461
462         Arguments:
463             :param file_name: Trailing (common) part of result file names
464             :param threshold: Minimum number of sent updates needed for each
465                               result to be included into result csv file
466                               (mainly needed because of the result accuracy)
467         Returns:
468             :return: n/a
469         """
470         # default values handling
471         # TODO optimize default values handling (use e.g. dicionary.update() approach)
472         if file_name is None:
473             file_name = self.results_file_name_default
474         if threshold is None:
475             threshold = self.performance_threshold_default
476         # performance calculation
477         if self.phase1_updates_sent >= threshold:
478             totals1 = self.phase1_updates_sent
479             performance1 = int(self.phase1_updates_sent /
480                                (self.phase1_stop_time - self.phase1_start_time))
481         else:
482             totals1 = None
483             performance1 = None
484         if self.phase2_updates_sent >= threshold:
485             totals2 = self.phase2_updates_sent
486             performance2 = int(self.phase2_updates_sent /
487                                (self.phase2_stop_time - self.phase2_start_time))
488         else:
489             totals2 = None
490             performance2 = None
491
492         logger.info("#" * 10 + " Final results " + "#" * 10)
493         logger.info("Number of iterations: " + str(self.iteration))
494         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
495                     str(self.phase1_updates_sent))
496         logger.info("The pre-fill phase duration: " +
497                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
498         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
499                     str(self.phase2_updates_sent))
500         logger.info("The 2nd test phase duration: " +
501                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
502         logger.info("Threshold for performance reporting: " + str(threshold))
503
504         # making labels
505         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
506                         " route(s) per UPDATE")
507         if self.single_update_default:
508             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
509                                   "/-" + str(self.prefix_count_to_del_default) +
510                                   " routes per UPDATE")
511         else:
512             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
513                                   "/-" + str(self.prefix_count_to_del_default) +
514                                   " routes in two UPDATEs")
515         # collecting capacity and performance results
516         totals = {}
517         performance = {}
518         if totals1 is not None:
519             totals[phase1_label] = totals1
520             performance[phase1_label] = performance1
521         if totals2 is not None:
522             totals[phase2_label] = totals2
523             performance[phase2_label] = performance2
524         self.write_results_to_file(totals, "totals-" + file_name)
525         self.write_results_to_file(performance, "performance-" + file_name)
526
527     def write_results_to_file(self, results, file_name):
528         """Writes results to the csv plot file consumable by Jenkins.
529
530         Arguments:
531             :param file_name: Name of the (csv) file to be created
532         Returns:
533             :return: none
534         """
535         first_line = ""
536         second_line = ""
537         f = open(file_name, "wt")
538         try:
539             for key in sorted(results):
540                 first_line += key + ", "
541                 second_line += str(results[key]) + ", "
542             first_line = first_line[:-2]
543             second_line = second_line[:-2]
544             f.write(first_line + "\n")
545             f.write(second_line + "\n")
546             logger.info("Message generator performance results stored in " +
547                         file_name + ":")
548             logger.info("  " + first_line)
549             logger.info("  " + second_line)
550         finally:
551             f.close()
552
553     # Return pseudo-randomized (reproducible) index for selected range
554     def randomize_index(self, index, lowest=None, highest=None):
555         """Calculates pseudo-randomized index from selected range.
556
557         Arguments:
558             :param index: input index
559             :param lowest: the lowes index from the randomized area
560             :param highest: the highest index from the randomized area
561         Returns:
562             :return: the (pseudo)randomized index
563         Notes:
564             Created just as a fame for future generator enhancement.
565         """
566         # default values handling
567         # TODO optimize default values handling (use e.g. dicionary.update() approach)
568         if lowest is None:
569             lowest = self.randomize_lowest_default
570         if highest is None:
571             highest = self.randomize_highest_default
572         # randomize
573         if (index >= lowest) and (index <= highest):
574             # we are in the randomized range -> shuffle it inside
575             # the range (now just reverse the order)
576             new_index = highest - (index - lowest)
577         else:
578             # we are out of the randomized range -> nothing to do
579             new_index = index
580         return new_index
581
582     def get_ls_nlri_values(self, index):
583         """Generates LS-NLRI parameters.
584         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
585
586         Arguments:
587             :param index: index (iteration)
588         Returns:
589             :return: dictionary of LS NLRI parameters and values
590         """
591         # generating list of LS NLRI parameters
592         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
593         ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
594         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
595         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
596         ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
597         ls_nlri_values = {"Identifier": identifier,
598                           "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
599                           "TunnelID": tunnel_id, "LSPID": lsp_id,
600                           "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
601         return ls_nlri_values
602
603     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
604                         prefix_len=None, prefix_count=None, randomize=None):
605         """Generates list of IP address prefixes.
606
607         Arguments:
608             :param slot_index: index of group of prefix addresses
609             :param slot_size: size of group of prefix addresses
610                 in [number of included prefixes]
611             :param prefix_base: IP address of the first prefix
612                 (slot_index = 0, prefix_index = 0)
613             :param prefix_len: length of the prefix in bites
614                 (the same as size of netmask)
615             :param prefix_count: number of prefixes to be returned
616                 from the specified slot
617         Returns:
618             :return: list of generated IP address prefixes
619         """
620         # default values handling
621         # TODO optimize default values handling (use e.g. dicionary.update() approach)
622         if slot_size is None:
623             slot_size = self.slot_size_default
624         if prefix_base is None:
625             prefix_base = self.prefix_base_default
626         if prefix_len is None:
627             prefix_len = self.prefix_length_default
628         if prefix_count is None:
629             prefix_count = slot_size
630         if randomize is None:
631             randomize = self.randomize_updates_default
632         # generating list of prefixes
633         indexes = []
634         prefixes = []
635         prefix_gap = 2 ** (32 - prefix_len)
636         for i in range(prefix_count):
637             prefix_index = slot_index * slot_size + i
638             if randomize:
639                 prefix_index = self.randomize_index(prefix_index)
640             indexes.append(prefix_index)
641             prefixes.append(prefix_base + prefix_index * prefix_gap)
642         if self.log_debug:
643             logger.debug("  Prefix slot index: " + str(slot_index))
644             logger.debug("  Prefix slot size: " + str(slot_size))
645             logger.debug("  Prefix count: " + str(prefix_count))
646             logger.debug("  Prefix indexes: " + str(indexes))
647             logger.debug("  Prefix list: " + str(prefixes))
648         return prefixes
649
650     def compose_update_message(self, prefix_count_to_add=None,
651                                prefix_count_to_del=None):
652         """Composes an UPDATE message
653
654         Arguments:
655             :param prefix_count_to_add: # of prefixes to put into NLRI list
656             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
657         Returns:
658             :return: encoded UPDATE message in HEX
659         Notes:
660             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
661             lists or common message wich includes both prefix lists.
662             Updates global counters.
663         """
664         # default values handling
665         # TODO optimize default values handling (use e.g. dicionary.update() approach)
666         if prefix_count_to_add is None:
667             prefix_count_to_add = self.prefix_count_to_add_default
668         if prefix_count_to_del is None:
669             prefix_count_to_del = self.prefix_count_to_del_default
670         # logging
671         if self.log_info and not (self.iteration % 1000):
672             logger.info("Iteration: " + str(self.iteration) +
673                         " - total remaining prefixes: " +
674                         str(self.remaining_prefixes))
675         if self.log_debug:
676             logger.debug("#" * 10 + " Iteration: " +
677                          str(self.iteration) + " " + "#" * 10)
678             logger.debug("Remaining prefixes: " +
679                          str(self.remaining_prefixes))
680         # scenario type & one-shot counter
681         straightforward_scenario = (self.remaining_prefixes >
682                                     self.remaining_prefixes_threshold)
683         if straightforward_scenario:
684             prefix_count_to_del = 0
685             if self.log_debug:
686                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
687             if not self.phase1_start_time:
688                 self.phase1_start_time = time.time()
689         else:
690             if self.log_debug:
691                 logger.debug("--- COMBINED SCENARIO ---")
692             if not self.phase2_start_time:
693                 self.phase2_start_time = time.time()
694         # tailor the number of prefixes if needed
695         prefix_count_to_add = (prefix_count_to_del +
696                                min(prefix_count_to_add - prefix_count_to_del,
697                                    self.remaining_prefixes))
698         # prefix slots selection for insertion and withdrawal
699         slot_index_to_add = self.iteration
700         slot_index_to_del = slot_index_to_add - self.slot_gap_default
701         # getting lists of prefixes for insertion in this iteration
702         if self.log_debug:
703             logger.debug("Prefixes to be inserted in this iteration:")
704         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
705                                                   prefix_count=prefix_count_to_add)
706         # getting lists of prefixes for withdrawal in this iteration
707         if self.log_debug:
708             logger.debug("Prefixes to be withdrawn in this iteration:")
709         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
710                                                   prefix_count=prefix_count_to_del)
711         # generating the UPDATE mesage with LS-NLRI only
712         if self.bgpls:
713             ls_nlri = self.get_ls_nlri_values(self.iteration)
714             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
715                                           **ls_nlri)
716         else:
717             # generating the UPDATE message with prefix lists
718             if self.single_update_default:
719                 # Send prefixes to be introduced and withdrawn
720                 # in one UPDATE message
721                 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
722                                               nlri_prefixes=prefix_list_to_add)
723             else:
724                 # Send prefixes to be introduced and withdrawn
725                 # in separate UPDATE messages (if needed)
726                 msg_out = self.update_message(wr_prefixes=[],
727                                               nlri_prefixes=prefix_list_to_add)
728                 if prefix_count_to_del:
729                     msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
730                                                    nlri_prefixes=[])
731         # updating counters - who knows ... maybe I am last time here ;)
732         if straightforward_scenario:
733             self.phase1_stop_time = time.time()
734             self.phase1_updates_sent = self.updates_sent
735         else:
736             self.phase2_stop_time = time.time()
737             self.phase2_updates_sent = (self.updates_sent -
738                                         self.phase1_updates_sent)
739         # updating totals for the next iteration
740         self.iteration += 1
741         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
742         # returning the encoded message
743         return msg_out
744
745     # Section of message encoders
746
747     def open_message(self, version=None, my_autonomous_system=None,
748                      hold_time=None, bgp_identifier=None):
749         """Generates an OPEN Message (rfc4271#section-4.2)
750
751         Arguments:
752             :param version: see the rfc4271#section-4.2
753             :param my_autonomous_system: see the rfc4271#section-4.2
754             :param hold_time: see the rfc4271#section-4.2
755             :param bgp_identifier: see the rfc4271#section-4.2
756         Returns:
757             :return: encoded OPEN message in HEX
758         """
759
760         # default values handling
761         # TODO optimize default values handling (use e.g. dicionary.update() approach)
762         if version is None:
763             version = self.version_default
764         if my_autonomous_system is None:
765             my_autonomous_system = self.my_autonomous_system_default
766         if hold_time is None:
767             hold_time = self.hold_time_default
768         if bgp_identifier is None:
769             bgp_identifier = self.bgp_identifier_default
770
771         # Marker
772         marker_hex = "\xFF" * 16
773
774         # Type
775         type = 1
776         type_hex = struct.pack("B", type)
777
778         # version
779         version_hex = struct.pack("B", version)
780
781         # my_autonomous_system
782         # AS_TRANS value, 23456 decadic.
783         my_autonomous_system_2_bytes = 23456
784         # AS number is mappable to 2 bytes
785         if my_autonomous_system < 65536:
786             my_autonomous_system_2_bytes = my_autonomous_system
787         my_autonomous_system_hex_2_bytes = struct.pack(">H",
788                                                        my_autonomous_system)
789
790         # Hold Time
791         hold_time_hex = struct.pack(">H", hold_time)
792
793         # BGP Identifier
794         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
795
796         # Optional Parameters
797         optional_parameters_hex = ""
798         if self.rfc4760:
799             optional_parameter_hex = (
800                 "\x02"  # Param type ("Capability Ad")
801                 "\x06"  # Length (6 bytes)
802                 "\x01"  # Capability type (NLRI Unicast),
803                         # see RFC 4760, secton 8
804                 "\x04"  # Capability value length
805                 "\x00\x01"  # AFI (Ipv4)
806                 "\x00"  # (reserved)
807                 "\x01"  # SAFI (Unicast)
808             )
809             optional_parameters_hex += optional_parameter_hex
810
811         if self.bgpls:
812             optional_parameter_hex = (
813                 "\x02"  # Param type ("Capability Ad")
814                 "\x06"  # Length (6 bytes)
815                 "\x01"  # Capability type (NLRI Unicast),
816                         # see RFC 4760, secton 8
817                 "\x04"  # Capability value length
818                 "\x40\x04"  # AFI (BGP-LS)
819                 "\x00"  # (reserved)
820                 "\x47"  # SAFI (BGP-LS)
821             )
822             optional_parameters_hex += optional_parameter_hex
823
824         if self.evpn:
825             optional_parameter_hex = (
826                 "\x02"  # Param type ("Capability Ad")
827                 "\x06"  # Length (6 bytes)
828                 "\x01"  # Multiprotocol extetension capability,
829                 "\x04"  # Capability value length
830                 "\x00\x19"  # AFI (L2-VPN)
831                 "\x00"  # (reserved)
832                 "\x46"  # SAFI (EVPN)
833             )
834             optional_parameters_hex += optional_parameter_hex
835
836         if self.mvpn:
837             optional_parameter_hex = (
838                 "\x02"  # Param type ("Capability Ad")
839                 "\x06"  # Length (6 bytes)
840                 "\x01"  # Multiprotocol extetension capability,
841                 "\x04"  # Capability value length
842                 "\x00\x01"  # AFI (IPV4)
843                 "\x00"  # (reserved)
844                 "\x05"  # SAFI (MCAST-VPN)
845             )
846             optional_parameters_hex += optional_parameter_hex
847             optional_parameter_hex = (
848                 "\x02"  # Param type ("Capability Ad")
849                 "\x06"  # Length (6 bytes)
850                 "\x01"  # Multiprotocol extetension capability,
851                 "\x04"  # Capability value length
852                 "\x00\x02"  # AFI (IPV6)
853                 "\x00"  # (reserved)
854                 "\x05"  # SAFI (MCAST-VPN)
855             )
856             optional_parameters_hex += optional_parameter_hex
857
858         if self.l3vpn_mcast:
859             optional_parameter_hex = (
860                 "\x02"  # Param type ("Capability Ad")
861                 "\x06"  # Length (6 bytes)
862                 "\x01"  # Multiprotocol extetension capability,
863                 "\x04"  # Capability value length
864                 "\x00\x01"  # AFI (IPV4)
865                 "\x00"  # (reserved)
866                 "\x81"  # SAFI (L3VPN-MCAST)
867             )
868             optional_parameters_hex += optional_parameter_hex
869             optional_parameter_hex = (
870                 "\x02"  # Param type ("Capability Ad")
871                 "\x06"  # Length (6 bytes)
872                 "\x01"  # Multiprotocol extetension capability,
873                 "\x04"  # Capability value length
874                 "\x00\x02"  # AFI (IPV6)
875                 "\x00"  # (reserved)
876                 "\x81"  # SAFI (L3VPN-MCAST)
877             )
878             optional_parameters_hex += optional_parameter_hex
879
880         optional_parameter_hex = (
881             "\x02"  # Param type ("Capability Ad")
882             "\x06"  # Length (6 bytes)
883             "\x41"  # "32 bit AS Numbers Support"
884                     # (see RFC 6793, section 3)
885             "\x04"  # Capability value length
886         )
887         optional_parameter_hex += (
888             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
889         )
890         optional_parameters_hex += optional_parameter_hex
891
892         # Optional Parameters Length
893         optional_parameters_length = len(optional_parameters_hex)
894         optional_parameters_length_hex = struct.pack("B",
895                                                      optional_parameters_length)
896
897         # Length (big-endian)
898         length = (
899             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
900             len(my_autonomous_system_hex_2_bytes) +
901             len(hold_time_hex) + len(bgp_identifier_hex) +
902             len(optional_parameters_length_hex) +
903             len(optional_parameters_hex)
904         )
905         length_hex = struct.pack(">H", length)
906
907         # OPEN Message
908         message_hex = (
909             marker_hex +
910             length_hex +
911             type_hex +
912             version_hex +
913             my_autonomous_system_hex_2_bytes +
914             hold_time_hex +
915             bgp_identifier_hex +
916             optional_parameters_length_hex +
917             optional_parameters_hex
918         )
919
920         if self.log_debug:
921             logger.debug("OPEN message encoding")
922             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
923             logger.debug("  Length=" + str(length) + " (0x" +
924                          binascii.hexlify(length_hex) + ")")
925             logger.debug("  Type=" + str(type) + " (0x" +
926                          binascii.hexlify(type_hex) + ")")
927             logger.debug("  Version=" + str(version) + " (0x" +
928                          binascii.hexlify(version_hex) + ")")
929             logger.debug("  My Autonomous System=" +
930                          str(my_autonomous_system_2_bytes) + " (0x" +
931                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
932                          ")")
933             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
934                          binascii.hexlify(hold_time_hex) + ")")
935             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
936                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
937             logger.debug("  Optional Parameters Length=" +
938                          str(optional_parameters_length) + " (0x" +
939                          binascii.hexlify(optional_parameters_length_hex) +
940                          ")")
941             logger.debug("  Optional Parameters=0x" +
942                          binascii.hexlify(optional_parameters_hex))
943             logger.debug("OPEN message encoded: 0x%s",
944                          binascii.b2a_hex(message_hex))
945
946         return message_hex
947
948     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
949                        wr_prefix_length=None, nlri_prefix_length=None,
950                        my_autonomous_system=None, next_hop=None,
951                        originator_id=None, cluster_list_item=None,
952                        end_of_rib=False, **ls_nlri_params):
953         """Generates an UPDATE Message (rfc4271#section-4.3)
954
955         Arguments:
956             :param wr_prefixes: see the rfc4271#section-4.3
957             :param nlri_prefixes: see the rfc4271#section-4.3
958             :param wr_prefix_length: see the rfc4271#section-4.3
959             :param nlri_prefix_length: see the rfc4271#section-4.3
960             :param my_autonomous_system: see the rfc4271#section-4.3
961             :param next_hop: see the rfc4271#section-4.3
962         Returns:
963             :return: encoded UPDATE message in HEX
964         """
965
966         # default values handling
967         # TODO optimize default values handling (use e.g. dicionary.update() approach)
968         if wr_prefixes is None:
969             wr_prefixes = self.wr_prefixes_default
970         if nlri_prefixes is None:
971             nlri_prefixes = self.nlri_prefixes_default
972         if wr_prefix_length is None:
973             wr_prefix_length = self.prefix_length_default
974         if nlri_prefix_length is None:
975             nlri_prefix_length = self.prefix_length_default
976         if my_autonomous_system is None:
977             my_autonomous_system = self.my_autonomous_system_default
978         if next_hop is None:
979             next_hop = self.next_hop_default
980         if originator_id is None:
981             originator_id = self.originator_id_default
982         if cluster_list_item is None:
983             cluster_list_item = self.cluster_list_item_default
984         ls_nlri = self.ls_nlri_default.copy()
985         ls_nlri.update(ls_nlri_params)
986
987         # Marker
988         marker_hex = "\xFF" * 16
989
990         # Type
991         type = 2
992         type_hex = struct.pack("B", type)
993
994         # Withdrawn Routes
995         withdrawn_routes_hex = ""
996         if not self.bgpls:
997             bytes = ((wr_prefix_length - 1) / 8) + 1
998             for prefix in wr_prefixes:
999                 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
1000                                        struct.pack(">I", int(prefix))[:bytes])
1001                 withdrawn_routes_hex += withdrawn_route_hex
1002
1003         # Withdrawn Routes Length
1004         withdrawn_routes_length = len(withdrawn_routes_hex)
1005         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1006
1007         # TODO: to replace hardcoded string by encoding?
1008         # Path Attributes
1009         path_attributes_hex = ""
1010         if not self.skipattr:
1011             path_attributes_hex += (
1012                 "\x40"  # Flags ("Well-Known")
1013                 "\x01"  # Type (ORIGIN)
1014                 "\x01"  # Length (1)
1015                 "\x00"  # Origin: IGP
1016             )
1017             path_attributes_hex += (
1018                 "\x40"  # Flags ("Well-Known")
1019                 "\x02"  # Type (AS_PATH)
1020                 "\x06"  # Length (6)
1021                 "\x02"  # AS segment type (AS_SEQUENCE)
1022                 "\x01"  # AS segment length (1)
1023             )
1024             my_as_hex = struct.pack(">I", my_autonomous_system)
1025             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
1026             path_attributes_hex += (
1027                 "\x40"  # Flags ("Well-Known")
1028                 "\x05"  # Type (LOCAL_PREF)
1029                 "\x04"  # Length (4)
1030                 "\x00\x00\x00\x64"  # (100)
1031             )
1032         if nlri_prefixes != []:
1033             path_attributes_hex += (
1034                 "\x40"  # Flags ("Well-Known")
1035                 "\x03"  # Type (NEXT_HOP)
1036                 "\x04"  # Length (4)
1037             )
1038             next_hop_hex = struct.pack(">I", int(next_hop))
1039             path_attributes_hex += (
1040                 next_hop_hex  # IP address of the next hop (4 bytes)
1041             )
1042             if originator_id is not None:
1043                 path_attributes_hex += (
1044                     "\x80"  # Flags ("Optional, non-transitive")
1045                     "\x09"  # Type (ORIGINATOR_ID)
1046                     "\x04"  # Length (4)
1047                 )           # ORIGINATOR_ID (4 bytes)
1048                 path_attributes_hex += struct.pack(">I", int(originator_id))
1049             if cluster_list_item is not None:
1050                 path_attributes_hex += (
1051                     "\x80"  # Flags ("Optional, non-transitive")
1052                     "\x0a"  # Type (CLUSTER_LIST)
1053                     "\x04"  # Length (4)
1054                 )           # one CLUSTER_LIST item (4 bytes)
1055                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1056
1057         if self.bgpls and not end_of_rib:
1058             path_attributes_hex += (
1059                 "\x80"  # Flags ("Optional, non-transitive")
1060                 "\x0e"  # Type (MP_REACH_NLRI)
1061                 "\x22"  # Length (34)
1062                 "\x40\x04"  # AFI (BGP-LS)
1063                 "\x47"  # SAFI (BGP-LS)
1064                 "\x04"  # Next Hop Length (4)
1065             )
1066             path_attributes_hex += struct.pack(">I", int(next_hop))
1067             path_attributes_hex += "\x00"           # Reserved
1068             path_attributes_hex += (
1069                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1070                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
1071                 "\x07"      # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1072             )
1073             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1074             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1075             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1076             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1077             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1078
1079         # Total Path Attributes Length
1080         total_path_attributes_length = len(path_attributes_hex)
1081         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1082
1083         # Network Layer Reachability Information
1084         nlri_hex = ""
1085         if not self.bgpls:
1086             bytes = ((nlri_prefix_length - 1) / 8) + 1
1087             for prefix in nlri_prefixes:
1088                 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1089                                    struct.pack(">I", int(prefix))[:bytes])
1090                 nlri_hex += nlri_prefix_hex
1091
1092         # Length (big-endian)
1093         length = (
1094             len(marker_hex) + 2 + len(type_hex) +
1095             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1096             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1097             len(nlri_hex))
1098         length_hex = struct.pack(">H", length)
1099
1100         # UPDATE Message
1101         message_hex = (
1102             marker_hex +
1103             length_hex +
1104             type_hex +
1105             withdrawn_routes_length_hex +
1106             withdrawn_routes_hex +
1107             total_path_attributes_length_hex +
1108             path_attributes_hex +
1109             nlri_hex
1110         )
1111
1112         if self.log_debug:
1113             logger.debug("UPDATE message encoding")
1114             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1115             logger.debug("  Length=" + str(length) + " (0x" +
1116                          binascii.hexlify(length_hex) + ")")
1117             logger.debug("  Type=" + str(type) + " (0x" +
1118                          binascii.hexlify(type_hex) + ")")
1119             logger.debug("  withdrawn_routes_length=" +
1120                          str(withdrawn_routes_length) + " (0x" +
1121                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
1122             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1123                          str(wr_prefix_length) + " (0x" +
1124                          binascii.hexlify(withdrawn_routes_hex) + ")")
1125             if total_path_attributes_length:
1126                 logger.debug("  Total Path Attributes Length=" +
1127                              str(total_path_attributes_length) + " (0x" +
1128                              binascii.hexlify(total_path_attributes_length_hex) + ")")
1129                 logger.debug("  Path Attributes=" + "(0x" +
1130                              binascii.hexlify(path_attributes_hex) + ")")
1131                 logger.debug("    Origin=IGP")
1132                 logger.debug("    AS path=" + str(my_autonomous_system))
1133                 logger.debug("    Next hop=" + str(next_hop))
1134                 if originator_id is not None:
1135                     logger.debug("    Originator id=" + str(originator_id))
1136                 if cluster_list_item is not None:
1137                     logger.debug("    Cluster list=" + str(cluster_list_item))
1138                 if self.bgpls:
1139                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1140             logger.debug("  Network Layer Reachability Information=" +
1141                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1142                          " (0x" + binascii.hexlify(nlri_hex) + ")")
1143             logger.debug("UPDATE message encoded: 0x" +
1144                          binascii.b2a_hex(message_hex))
1145
1146         # updating counter
1147         self.updates_sent += 1
1148         # returning encoded message
1149         return message_hex
1150
1151     def notification_message(self, error_code, error_subcode, data_hex=""):
1152         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1153
1154         Arguments:
1155             :param error_code: see the rfc4271#section-4.5
1156             :param error_subcode: see the rfc4271#section-4.5
1157             :param data_hex: see the rfc4271#section-4.5
1158         Returns:
1159             :return: encoded NOTIFICATION message in HEX
1160         """
1161
1162         # Marker
1163         marker_hex = "\xFF" * 16
1164
1165         # Type
1166         type = 3
1167         type_hex = struct.pack("B", type)
1168
1169         # Error Code
1170         error_code_hex = struct.pack("B", error_code)
1171
1172         # Error Subode
1173         error_subcode_hex = struct.pack("B", error_subcode)
1174
1175         # Length (big-endian)
1176         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1177                   len(error_subcode_hex) + len(data_hex))
1178         length_hex = struct.pack(">H", length)
1179
1180         # NOTIFICATION Message
1181         message_hex = (
1182             marker_hex +
1183             length_hex +
1184             type_hex +
1185             error_code_hex +
1186             error_subcode_hex +
1187             data_hex
1188         )
1189
1190         if self.log_debug:
1191             logger.debug("NOTIFICATION message encoding")
1192             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1193             logger.debug("  Length=" + str(length) + " (0x" +
1194                          binascii.hexlify(length_hex) + ")")
1195             logger.debug("  Type=" + str(type) + " (0x" +
1196                          binascii.hexlify(type_hex) + ")")
1197             logger.debug("  Error Code=" + str(error_code) + " (0x" +
1198                          binascii.hexlify(error_code_hex) + ")")
1199             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
1200                          binascii.hexlify(error_subcode_hex) + ")")
1201             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1202             logger.debug("NOTIFICATION message encoded: 0x%s",
1203                          binascii.b2a_hex(message_hex))
1204
1205         return message_hex
1206
1207     def keepalive_message(self):
1208         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1209
1210         Returns:
1211             :return: encoded KEEP ALIVE message in HEX
1212         """
1213
1214         # Marker
1215         marker_hex = "\xFF" * 16
1216
1217         # Type
1218         type = 4
1219         type_hex = struct.pack("B", type)
1220
1221         # Length (big-endian)
1222         length = len(marker_hex) + 2 + len(type_hex)
1223         length_hex = struct.pack(">H", length)
1224
1225         # KEEP ALIVE Message
1226         message_hex = (
1227             marker_hex +
1228             length_hex +
1229             type_hex
1230         )
1231
1232         if self.log_debug:
1233             logger.debug("KEEP ALIVE message encoding")
1234             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1235             logger.debug("  Length=" + str(length) + " (0x" +
1236                          binascii.hexlify(length_hex) + ")")
1237             logger.debug("  Type=" + str(type) + " (0x" +
1238                          binascii.hexlify(type_hex) + ")")
1239             logger.debug("KEEP ALIVE message encoded: 0x%s",
1240                          binascii.b2a_hex(message_hex))
1241
1242         return message_hex
1243
1244
1245 class TimeTracker(object):
1246     """Class for tracking timers, both for my keepalives and
1247     peer's hold time.
1248     """
1249
1250     def __init__(self, msg_in):
1251         """Initialisation. based on defaults and OPEN message from peer.
1252
1253         Arguments:
1254             msg_in: the OPEN message received from peer.
1255         """
1256         # Note: Relative time is always named timedelta, to stress that
1257         # the (non-delta) time is absolute.
1258         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1259         # Upper bound for being stuck in the same state, we should
1260         # at least report something before continuing.
1261         # Negotiate the hold timer by taking the smaller
1262         # of the 2 values (mine and the peer's).
1263         hold_timedelta = 180  # Not an attribute of self yet.
1264         # TODO: Make the default value configurable,
1265         # default value could mirror what peer said.
1266         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1267         if hold_timedelta > peer_hold_timedelta:
1268             hold_timedelta = peer_hold_timedelta
1269         if hold_timedelta != 0 and hold_timedelta < 3:
1270             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1271             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1272         self.hold_timedelta = hold_timedelta
1273         # If we do not hear from peer this long, we assume it has died.
1274         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1275         # Upper limit for duration between messages, to avoid being
1276         # declared to be dead.
1277         # The same as calling snapshot(), but also declares a field.
1278         self.snapshot_time = time.time()
1279         # Sometimes we need to store time. This is where to get
1280         # the value from afterwards. Time_keepalive may be too strict.
1281         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1282         # At this time point, peer will be declared dead.
1283         self.my_keepalive_time = None  # to be set later
1284         # At this point, we should be sending keepalive message.
1285
1286     def snapshot(self):
1287         """Store current time in instance data to use later."""
1288         # Read as time before something interesting was called.
1289         self.snapshot_time = time.time()
1290
1291     def reset_peer_hold_time(self):
1292         """Move hold time to future as peer has just proven it still lives."""
1293         self.peer_hold_time = time.time() + self.hold_timedelta
1294
1295     # Some methods could rely on self.snapshot_time, but it is better
1296     # to require user to provide it explicitly.
1297     def reset_my_keepalive_time(self, keepalive_time):
1298         """Calculate and set the next my KEEP ALIVE timeout time
1299
1300         Arguments:
1301             :keepalive_time: the initial value of the KEEP ALIVE timer
1302         """
1303         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1304
1305     def is_time_for_my_keepalive(self):
1306         """Check for my KEEP ALIVE timeout occurence"""
1307         if self.hold_timedelta == 0:
1308             return False
1309         return self.snapshot_time >= self.my_keepalive_time
1310
1311     def get_next_event_time(self):
1312         """Set the time of the next expected or to be sent KEEP ALIVE"""
1313         if self.hold_timedelta == 0:
1314             return self.snapshot_time + 86400
1315         return min(self.my_keepalive_time, self.peer_hold_time)
1316
1317     def check_peer_hold_time(self, snapshot_time):
1318         """Raise error if nothing was read from peer until specified time."""
1319         # Hold time = 0 means keepalive checking off.
1320         if self.hold_timedelta != 0:
1321             # time.time() may be too strict
1322             if snapshot_time > self.peer_hold_time:
1323                 logger.error("Peer has overstepped the hold timer.")
1324                 raise RuntimeError("Peer has overstepped the hold timer.")
1325                 # TODO: Include hold_timedelta?
1326                 # TODO: Add notification sending (attempt). That means
1327                 # move to write tracker.
1328
1329
1330 class ReadTracker(object):
1331     """Class for tracking read of mesages chunk by chunk and
1332     for idle waiting.
1333     """
1334
1335     def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
1336                  l3vpn_mcast=False, wait_for_read=10):
1337         """The reader initialisation.
1338
1339         Arguments:
1340             bgp_socket: socket to be used for sending
1341             timer: timer to be used for scheduling
1342             storage: thread safe dict
1343             evpn: flag that evpn functionality is tested
1344             mvpn: flag that mvpn functionality is tested
1345             l3vpn_mcast: flag that l3vpn_mcast functionality is tested
1346         """
1347         # References to outside objects.
1348         self.socket = bgp_socket
1349         self.timer = timer
1350         # BGP marker length plus length field length.
1351         self.header_length = 18
1352         # TODO: make it class (constant) attribute
1353         # Computation of where next chunk ends depends on whether
1354         # we are beyond length field.
1355         self.reading_header = True
1356         # Countdown towards next size computation.
1357         self.bytes_to_read = self.header_length
1358         # Incremental buffer for message under read.
1359         self.msg_in = ""
1360         # Initialising counters
1361         self.updates_received = 0
1362         self.prefixes_introduced = 0
1363         self.prefixes_withdrawn = 0
1364         self.rx_idle_time = 0
1365         self.rx_activity_detected = True
1366         self.storage = storage
1367         self.evpn = evpn
1368         self.mvpn = mvpn
1369         self.l3vpn_mcast = l3vpn_mcast
1370         self.wfr = wait_for_read
1371
1372     def read_message_chunk(self):
1373         """Read up to one message
1374
1375         Note:
1376             Currently it does not return anything.
1377         """
1378         # TODO: We could return the whole message, currently not needed.
1379         # We assume the socket is readable.
1380         chunk_message = self.socket.recv(self.bytes_to_read)
1381         self.msg_in += chunk_message
1382         self.bytes_to_read -= len(chunk_message)
1383         # TODO: bytes_to_read < 0 is not possible, right?
1384         if not self.bytes_to_read:
1385             # Finished reading a logical block.
1386             if self.reading_header:
1387                 # The logical block was a BGP header.
1388                 # Now we know the size of the message.
1389                 self.reading_header = False
1390                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1391                                       self.header_length)
1392             else:  # We have finished reading the body of the message.
1393                 # Peer has just proven it is still alive.
1394                 self.timer.reset_peer_hold_time()
1395                 # TODO: Do we want to count received messages?
1396                 # This version ignores the received message.
1397                 # TODO: Should we do validation and exit on anything
1398                 # besides update or keepalive?
1399                 # Prepare state for reading another message.
1400                 message_type_hex = self.msg_in[self.header_length]
1401                 if message_type_hex == "\x01":
1402                     logger.info("OPEN message received: 0x%s",
1403                                 binascii.b2a_hex(self.msg_in))
1404                 elif message_type_hex == "\x02":
1405                     logger.debug("UPDATE message received: 0x%s",
1406                                  binascii.b2a_hex(self.msg_in))
1407                     self.decode_update_message(self.msg_in)
1408                 elif message_type_hex == "\x03":
1409                     logger.info("NOTIFICATION message received: 0x%s",
1410                                 binascii.b2a_hex(self.msg_in))
1411                 elif message_type_hex == "\x04":
1412                     logger.info("KEEP ALIVE message received: 0x%s",
1413                                 binascii.b2a_hex(self.msg_in))
1414                 else:
1415                     logger.warning("Unexpected message received: 0x%s",
1416                                    binascii.b2a_hex(self.msg_in))
1417                 self.msg_in = ""
1418                 self.reading_header = True
1419                 self.bytes_to_read = self.header_length
1420         # We should not act upon peer_hold_time if we are reading
1421         # something right now.
1422         return
1423
1424     def decode_path_attributes(self, path_attributes_hex):
1425         """Decode the Path Attributes field (rfc4271#section-4.3)
1426
1427         Arguments:
1428             :path_attributes: path_attributes field to be decoded in hex
1429         Returns:
1430             :return: None
1431         """
1432         hex_to_decode = path_attributes_hex
1433
1434         while len(hex_to_decode):
1435             attr_flags_hex = hex_to_decode[0]
1436             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1437 #            attr_optional_bit = attr_flags & 128
1438 #            attr_transitive_bit = attr_flags & 64
1439 #            attr_partial_bit = attr_flags & 32
1440             attr_extended_length_bit = attr_flags & 16
1441
1442             attr_type_code_hex = hex_to_decode[1]
1443             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1444
1445             if attr_extended_length_bit:
1446                 attr_length_hex = hex_to_decode[2:4]
1447                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1448                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1449                 hex_to_decode = hex_to_decode[4 + attr_length:]
1450             else:
1451                 attr_length_hex = hex_to_decode[2]
1452                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1453                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1454                 hex_to_decode = hex_to_decode[3 + attr_length:]
1455
1456             if attr_type_code == 1:
1457                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1458                              binascii.b2a_hex(attr_flags_hex))
1459                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1460             elif attr_type_code == 2:
1461                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1462                              binascii.b2a_hex(attr_flags_hex))
1463                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1464             elif attr_type_code == 3:
1465                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1466                              binascii.b2a_hex(attr_flags_hex))
1467                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1468             elif attr_type_code == 4:
1469                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1470                              binascii.b2a_hex(attr_flags_hex))
1471                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1472             elif attr_type_code == 5:
1473                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1474                              binascii.b2a_hex(attr_flags_hex))
1475                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1476             elif attr_type_code == 6:
1477                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1478                              binascii.b2a_hex(attr_flags_hex))
1479                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1480             elif attr_type_code == 7:
1481                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1482                              binascii.b2a_hex(attr_flags_hex))
1483                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1484             elif attr_type_code == 9:  # rfc4456#section-8
1485                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1486                              binascii.b2a_hex(attr_flags_hex))
1487                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1488             elif attr_type_code == 10:  # rfc4456#section-8
1489                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1490                              binascii.b2a_hex(attr_flags_hex))
1491                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1492             elif attr_type_code == 14:  # rfc4760#section-3
1493                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1494                              binascii.b2a_hex(attr_flags_hex))
1495                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1496                 address_family_identifier_hex = attr_value_hex[0:2]
1497                 logger.debug("  Address Family Identifier=0x%s",
1498                              binascii.b2a_hex(address_family_identifier_hex))
1499                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1500                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1501                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1502                 next_hop_netaddr_len_hex = attr_value_hex[3]
1503                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1504                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1505                              next_hop_netaddr_len,
1506                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1507                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1508                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1509                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1510                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1511                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1512                 logger.debug("  Reserved=0x%s",
1513                              binascii.b2a_hex(reserved_hex))
1514                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1515                 logger.debug("  Network Layer Reachability Information=0x%s",
1516                              binascii.b2a_hex(nlri_hex))
1517                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1518                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1519                 for prefix in nlri_prefix_list:
1520                     logger.debug("  nlri_prefix_received: %s", prefix)
1521                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1522             elif attr_type_code == 15:  # rfc4760#section-4
1523                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1524                              binascii.b2a_hex(attr_flags_hex))
1525                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1526                 address_family_identifier_hex = attr_value_hex[0:2]
1527                 logger.debug("  Address Family Identifier=0x%s",
1528                              binascii.b2a_hex(address_family_identifier_hex))
1529                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1530                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1531                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1532                 wd_hex = attr_value_hex[3:]
1533                 logger.debug("  Withdrawn Routes=0x%s",
1534                              binascii.b2a_hex(wd_hex))
1535                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1536                 logger.debug("  Withdrawn routes prefix list: %s",
1537                              wdr_prefix_list)
1538                 for prefix in wdr_prefix_list:
1539                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1540                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1541             else:
1542                 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1543                              binascii.b2a_hex(attr_flags_hex))
1544                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1545         return None
1546
1547     def decode_update_message(self, msg):
1548         """Decode an UPDATE message (rfc4271#section-4.3)
1549
1550         Arguments:
1551             :msg: message to be decoded in hex
1552         Returns:
1553             :return: None
1554         """
1555         logger.debug("Decoding update message:")
1556         # message header - marker
1557         marker_hex = msg[:16]
1558         logger.debug("Message header marker: 0x%s",
1559                      binascii.b2a_hex(marker_hex))
1560         # message header - message length
1561         msg_length_hex = msg[16:18]
1562         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1563         logger.debug("Message lenght: 0x%s (%s)",
1564                      binascii.b2a_hex(msg_length_hex), msg_length)
1565         # message header - message type
1566         msg_type_hex = msg[18:19]
1567         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1568
1569         with self.storage as stor:
1570             # this will replace the previously stored message
1571             stor['update'] = binascii.hexlify(msg)
1572
1573         logger.debug("Evpn {}".format(self.evpn))
1574         if self.evpn:
1575             logger.debug("Skipping update decoding due to evpn data expected")
1576             return
1577
1578         logger.debug("Mvpn {}".format(self.mvpn))
1579         if self.mvpn:
1580             logger.debug("Skipping update decoding due to mvpn data expected")
1581             return
1582
1583         logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
1584         if self.l3vpn_mcast:
1585             logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
1586             return
1587
1588         if msg_type == 2:
1589             logger.debug("Message type: 0x%s (update)",
1590                          binascii.b2a_hex(msg_type_hex))
1591             # withdrawn routes length
1592             wdr_length_hex = msg[19:21]
1593             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1594             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1595                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1596             # withdrawn routes
1597             wdr_hex = msg[21:21 + wdr_length]
1598             logger.debug("Withdrawn routes: 0x%s",
1599                          binascii.b2a_hex(wdr_hex))
1600             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1601             logger.debug("Withdrawn routes prefix list: %s",
1602                          wdr_prefix_list)
1603             for prefix in wdr_prefix_list:
1604                 logger.debug("withdrawn_prefix_received: %s", prefix)
1605             # total path attribute length
1606             total_pa_length_offset = 21 + wdr_length
1607             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1608             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1609             logger.debug("Total path attribute lenght: 0x%s (%s)",
1610                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1611             # path attributes
1612             pa_offset = total_pa_length_offset + 2
1613             pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1614             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1615             self.decode_path_attributes(pa_hex)
1616             # network layer reachability information length
1617             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1618             logger.debug("Calculated NLRI length: %s", nlri_length)
1619             # network layer reachability information
1620             nlri_offset = pa_offset + total_pa_length
1621             nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1622             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1623             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1624             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1625             for prefix in nlri_prefix_list:
1626                 logger.debug("nlri_prefix_received: %s", prefix)
1627             # Updating counters
1628             self.updates_received += 1
1629             self.prefixes_introduced += len(nlri_prefix_list)
1630             self.prefixes_withdrawn += len(wdr_prefix_list)
1631         else:
1632             logger.error("Unexpeced message type 0x%s in 0x%s",
1633                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1634
1635     def wait_for_read(self):
1636         """Read message until timeout (next expected event).
1637
1638         Note:
1639             Used when no more updates has to be sent to avoid busy-wait.
1640             Currently it does not return anything.
1641         """
1642         # Compute time to the first predictable state change
1643         event_time = self.timer.get_next_event_time()
1644         # snapshot_time would be imprecise
1645         wait_timedelta = min(event_time - time.time(), self.wfr)
1646         if wait_timedelta < 0:
1647             # The program got around to waiting to an event in "very near
1648             # future" so late that it became a "past" event, thus tell
1649             # "select" to not wait at all. Passing negative timedelta to
1650             # select() would lead to either waiting forever (for -1) or
1651             # select.error("Invalid parameter") (for everything else).
1652             wait_timedelta = 0
1653         # And wait for event or something to read.
1654
1655         if not self.rx_activity_detected or not (self.updates_received % 100):
1656             # right time to write statistics to the log (not for every update and
1657             # not too frequently to avoid having large log files)
1658             logger.info("total_received_update_message_counter: %s",
1659                         self.updates_received)
1660             logger.info("total_received_nlri_prefix_counter: %s",
1661                         self.prefixes_introduced)
1662             logger.info("total_received_withdrawn_prefix_counter: %s",
1663                         self.prefixes_withdrawn)
1664
1665         start_time = time.time()
1666         select.select([self.socket], [], [self.socket], wait_timedelta)
1667         timedelta = time.time() - start_time
1668         self.rx_idle_time += timedelta
1669         self.rx_activity_detected = timedelta < 1
1670
1671         if not self.rx_activity_detected or not (self.updates_received % 100):
1672             # right time to write statistics to the log (not for every update and
1673             # not too frequently to avoid having large log files)
1674             logger.info("... idle for %.3fs", timedelta)
1675             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1676         return
1677
1678
1679 class WriteTracker(object):
1680     """Class tracking enqueueing messages and sending chunks of them."""
1681
1682     def __init__(self, bgp_socket, generator, timer):
1683         """The writter initialisation.
1684
1685         Arguments:
1686             bgp_socket: socket to be used for sending
1687             generator: generator to be used for message generation
1688             timer: timer to be used for scheduling
1689         """
1690         # References to outside objects,
1691         self.socket = bgp_socket
1692         self.generator = generator
1693         self.timer = timer
1694         # Really new fields.
1695         # TODO: Would attribute docstrings add anything substantial?
1696         self.sending_message = False
1697         self.bytes_to_send = 0
1698         self.msg_out = ""
1699
1700     def enqueue_message_for_sending(self, message):
1701         """Enqueue message and change state.
1702
1703         Arguments:
1704             message: message to be enqueued into the msg_out buffer
1705         """
1706         self.msg_out += message
1707         self.bytes_to_send += len(message)
1708         self.sending_message = True
1709
1710     def send_message_chunk_is_whole(self):
1711         """Send enqueued data from msg_out buffer
1712
1713         Returns:
1714             :return: true if no remaining data to send
1715         """
1716         # We assume there is a msg_out to send and socket is writable.
1717         # print "going to send", repr(self.msg_out)
1718         self.timer.snapshot()
1719         bytes_sent = self.socket.send(self.msg_out)
1720         # Forget the part of message that was sent.
1721         self.msg_out = self.msg_out[bytes_sent:]
1722         self.bytes_to_send -= bytes_sent
1723         if not self.bytes_to_send:
1724             # TODO: Is it possible to hit negative bytes_to_send?
1725             self.sending_message = False
1726             # We should have reset hold timer on peer side.
1727             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1728             # The possible reason for not prioritizing reads is gone.
1729             return True
1730         return False
1731
1732
1733 class StateTracker(object):
1734     """Main loop has state so complex it warrants this separate class."""
1735
1736     def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1737         """The state tracker initialisation.
1738
1739         Arguments:
1740             bgp_socket: socket to be used for sending / receiving
1741             generator: generator to be used for message generation
1742             timer: timer to be used for scheduling
1743             inqueue: user initiated messages queue
1744             storage: thread safe dict to store data for the rpc server
1745             cliargs: cli args from the user
1746         """
1747         # References to outside objects.
1748         self.socket = bgp_socket
1749         self.generator = generator
1750         self.timer = timer
1751         # Sub-trackers.
1752         self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn,
1753                                   mvpn=cliargs.mvpn, l3vpn_mcast=cliargs.l3vpn_mcast,
1754                                   wait_for_read=cliargs.wfr)
1755         self.writer = WriteTracker(bgp_socket, generator, timer)
1756         # Prioritization state.
1757         self.prioritize_writing = False
1758         # In general, we prioritize reading over writing. But in order
1759         # not to get blocked by neverending reads, we should
1760         # check whether we are not risking running out of holdtime.
1761         # So in some situations, this field is set to True to attempt
1762         # finishing sending a message, after which this field resets
1763         # back to False.
1764         # TODO: Alternative is to switch fairly between reading and
1765         # writing (called round robin from now on).
1766         # Message counting is done in generator.
1767         self.inqueue = inqueue
1768
1769     def perform_one_loop_iteration(self):
1770         """ The main loop iteration
1771
1772         Notes:
1773             Calculates priority, resolves all conditions, calls
1774             appropriate method and returns to caller to repeat.
1775         """
1776         self.timer.snapshot()
1777         if not self.prioritize_writing:
1778             if self.timer.is_time_for_my_keepalive():
1779                 if not self.writer.sending_message:
1780                     # We need to schedule a keepalive ASAP.
1781                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1782                     logger.info("KEEP ALIVE is sent.")
1783                 # We are sending a message now, so let's prioritize it.
1784                 self.prioritize_writing = True
1785
1786         try:
1787             msg = self.inqueue.get_nowait()
1788             logger.info("Received message: {}".format(msg))
1789             msgbin = binascii.unhexlify(msg)
1790             self.writer.enqueue_message_for_sending(msgbin)
1791         except Queue.Empty:
1792             pass
1793         # Now we know what our priorities are, we have to check
1794         # which actions are available.
1795         # socket.socket() returns three lists,
1796         # we store them to list of lists.
1797         list_list = select.select([self.socket], [self.socket], [self.socket],
1798                                   self.timer.report_timedelta)
1799         read_list, write_list, except_list = list_list
1800         # Lists are unpacked, each is either [] or [self.socket],
1801         # so we will test them as boolean.
1802         if except_list:
1803             logger.error("Exceptional state on the socket.")
1804             raise RuntimeError("Exceptional state on socket", self.socket)
1805         # We will do either read or write.
1806         if not (self.prioritize_writing and write_list):
1807             # Either we have no reason to rush writes,
1808             # or the socket is not writable.
1809             # We are focusing on reading here.
1810             if read_list:  # there is something to read indeed
1811                 # In this case we want to read chunk of message
1812                 # and repeat the select,
1813                 self.reader.read_message_chunk()
1814                 return
1815             # We were focusing on reading, but nothing to read was there.
1816             # Good time to check peer for hold timer.
1817             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1818             # Quiet on the read front, we can have attempt to write.
1819         if write_list:
1820             # Either we really want to reset peer's view of our hold
1821             # timer, or there was nothing to read.
1822             # Were we in the middle of sending a message?
1823             if self.writer.sending_message:
1824                 # Was it the end of a message?
1825                 whole = self.writer.send_message_chunk_is_whole()
1826                 # We were pressed to send something and we did it.
1827                 if self.prioritize_writing and whole:
1828                     # We prioritize reading again.
1829                     self.prioritize_writing = False
1830                 return
1831             # Finally to check if still update messages to be generated.
1832             if self.generator.remaining_prefixes:
1833                 msg_out = self.generator.compose_update_message()
1834                 if not self.generator.remaining_prefixes:
1835                     # We have just finished update generation,
1836                     # end-of-rib is due.
1837                     logger.info("All update messages generated.")
1838                     logger.info("Storing performance results.")
1839                     self.generator.store_results()
1840                     logger.info("Finally an END-OF-RIB is sent.")
1841                     msg_out += self.generator.update_message(wr_prefixes=[],
1842                                                              nlri_prefixes=[],
1843                                                              end_of_rib=True)
1844                 self.writer.enqueue_message_for_sending(msg_out)
1845                 # Attempt for real sending to be done in next iteration.
1846                 return
1847             # Nothing to write anymore.
1848             # To avoid busy loop, we do idle waiting here.
1849             self.reader.wait_for_read()
1850             return
1851         # We can neither read nor write.
1852         logger.warning("Input and output both blocked for " +
1853                        str(self.timer.report_timedelta) + " seconds.")
1854         # FIXME: Are we sure select has been really waiting
1855         # the whole period?
1856         return
1857
1858
1859 def create_logger(loglevel, logfile):
1860     """Create logger object
1861
1862     Arguments:
1863         :loglevel: log level
1864         :logfile: log file name
1865     Returns:
1866         :return: logger object
1867     """
1868     logger = logging.getLogger("logger")
1869     log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1870     console_handler = logging.StreamHandler()
1871     file_handler = logging.FileHandler(logfile, mode="w")
1872     console_handler.setFormatter(log_formatter)
1873     file_handler.setFormatter(log_formatter)
1874     logger.addHandler(console_handler)
1875     logger.addHandler(file_handler)
1876     logger.setLevel(loglevel)
1877     return logger
1878
1879
1880 def job(arguments, inqueue, storage):
1881     """One time initialisation and iterations looping.
1882     Notes:
1883         Establish BGP connection and run iterations.
1884
1885     Arguments:
1886         :arguments: Command line arguments
1887         :inqueue: Data to be sent from play.py
1888         :storage: Shared dict for rpc server
1889     Returns:
1890         :return: None
1891     """
1892     bgp_socket = establish_connection(arguments)
1893     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1894     # Receive open message before sending anything.
1895     # FIXME: Add parameter to send default open message first,
1896     # to work with "you first" peers.
1897     msg_in = read_open_message(bgp_socket)
1898     timer = TimeTracker(msg_in)
1899     generator = MessageGenerator(arguments)
1900     msg_out = generator.open_message()
1901     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1902     # Send our open message to the peer.
1903     bgp_socket.send(msg_out)
1904     # Wait for confirming keepalive.
1905     # TODO: Surely in just one packet?
1906     # Using exact keepalive length to not to see possible updates.
1907     msg_in = bgp_socket.recv(19)
1908     if msg_in != generator.keepalive_message():
1909         error_msg = "Open not confirmed by keepalive, instead got"
1910         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1911         raise MessageError(error_msg, msg_in)
1912     timer.reset_peer_hold_time()
1913     # Send the keepalive to indicate the connection is accepted.
1914     timer.snapshot()  # Remember this time.
1915     msg_out = generator.keepalive_message()
1916     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1917     bgp_socket.send(msg_out)
1918     # Use the remembered time.
1919     timer.reset_my_keepalive_time(timer.snapshot_time)
1920     # End of initial handshake phase.
1921     state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
1922     while True:  # main reactor loop
1923         state.perform_one_loop_iteration()
1924
1925
1926 class Rpcs:
1927     '''Handler for SimpleXMLRPCServer'''
1928
1929     def __init__(self, sendqueue, storage):
1930         '''Init method
1931
1932         Arguments:
1933             :sendqueue: queue for data to be sent towards odl
1934             :storage: thread safe dict
1935         '''
1936         self.queue = sendqueue
1937         self.storage = storage
1938
1939     def send(self, text):
1940         '''Data to be sent
1941
1942         Arguments:
1943             :text: hes string of the data to be sent
1944         '''
1945         self.queue.put(text)
1946
1947     def get(self, text=''):
1948         '''Reads data form the storage
1949
1950         - returns stored data or an empty string, at the moment only
1951           'update' is stored
1952
1953         Arguments:
1954             :text: a key to the storage to get the data
1955         Returns:
1956             :data: stored data
1957         '''
1958         with self.storage as stor:
1959             return stor.get(text, '')
1960
1961     def clean(self, text=''):
1962         '''Cleans data form the storage
1963
1964         Arguments:
1965             :text: a key to the storage to clean the data
1966         '''
1967         with self.storage as stor:
1968             if text in stor:
1969                 del stor[text]
1970
1971
1972 def threaded_job(arguments):
1973     """Run the job threaded
1974
1975     Arguments:
1976         :arguments: Command line arguments
1977     Returns:
1978         :return: None
1979     """
1980     amount_left = arguments.amount
1981     utils_left = arguments.multiplicity
1982     prefix_current = arguments.firstprefix
1983     myip_current = arguments.myip
1984     thread_args = []
1985     rpcqueue = Queue.Queue()
1986     storage = SafeDict()
1987
1988     while 1:
1989         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
1990         amount_left -= amount_per_util
1991         utils_left -= 1
1992
1993         args = deepcopy(arguments)
1994         args.amount = amount_per_util
1995         args.firstprefix = prefix_current
1996         args.myip = myip_current
1997         thread_args.append(args)
1998
1999         if not utils_left:
2000             break
2001         prefix_current += amount_per_util * 16
2002         myip_current += 1
2003
2004     try:
2005         # Create threads
2006         for t in thread_args:
2007             thread.start_new_thread(job, (t, rpcqueue, storage))
2008     except Exception:
2009         print "Error: unable to start thread."
2010         raise SystemExit(2)
2011
2012     rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
2013     rpcserver.register_instance(Rpcs(rpcqueue, storage))
2014     rpcserver.serve_forever()
2015
2016
2017 if __name__ == "__main__":
2018     arguments = parse_arguments()
2019     logger = create_logger(arguments.loglevel, arguments.logfile)
2020     threaded_job(arguments)