BGP - route-target-constrain family tests
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 import argparse
17 import binascii
18 import ipaddr
19 import logging
20 import Queue
21 import select
22 import socket
23 import struct
24 import thread
25 import threading
26 import time
27
28
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
33
34
35 class SafeDict(dict):
36     '''Thread safe dictionary
37
38     The object will serve as thread safe data storage.
39     It should be used with "with" statement.
40     '''
41
42     def __init__(self, * p_arg, ** n_arg):
43         super(SafeDict, self).__init__()
44         self._lock = threading.Lock()
45
46     def __enter__(self):
47         self._lock.acquire()
48         return self
49
50     def __exit__(self, type, value, traceback):
51         self._lock.release()
52
53
54 def parse_arguments():
55     """Use argparse to get arguments,
56
57     Returns:
58         :return: args object.
59     """
60     parser = argparse.ArgumentParser()
61     # TODO: Should we use --argument-names-with-spaces?
62     str_help = "Autonomous System number use in the stream."
63     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64     # FIXME: We are acting as iBGP peer,
65     # we should mirror AS number from peer's open message.
66     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67     parser.add_argument("--amount", default="1", type=int, help=str_help)
68     str_help = "Maximum number of IP prefixes to be announced in one iteration"
69     parser.add_argument("--insert", default="1", type=int, help=str_help)
70     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
71     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
72     str_help = "The number of prefixes to process without withdrawals"
73     parser.add_argument("--prefill", default="0", type=int, help=str_help)
74     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
75     parser.add_argument("--updates", choices=["single", "separate"],
76                         default=["separate"], help=str_help)
77     str_help = "Base prefix IP address for prefix generation"
78     parser.add_argument("--firstprefix", default="8.0.1.0",
79                         type=ipaddr.IPv4Address, help=str_help)
80     str_help = "The prefix length."
81     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
82     str_help = "Listen for connection, instead of initiating it."
83     parser.add_argument("--listen", action="store_true", help=str_help)
84     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
85                 "Default value only suitable for listening.")
86     parser.add_argument("--myip", default="0.0.0.0",
87                         type=ipaddr.IPv4Address, help=str_help)
88     str_help = ("TCP port to bind to when listening or initiating connection." +
89                 "Default only suitable for initiating.")
90     parser.add_argument("--myport", default="0", type=int, help=str_help)
91     str_help = "The IP of the next hop to be placed into the update messages."
92     parser.add_argument("--nexthop", default="192.0.2.1",
93                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
94     str_help = "Identifier of the route originator."
95     parser.add_argument("--originator", default=None,
96                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
97     str_help = "Cluster list item identifier."
98     parser.add_argument("--cluster", default=None,
99                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
100     str_help = ("Numeric IP Address to try to connect to." +
101                 "Currently no effect in listening mode.")
102     parser.add_argument("--peerip", default="127.0.0.2",
103                         type=ipaddr.IPv4Address, help=str_help)
104     str_help = "TCP port to try to connect to. No effect in listening mode."
105     parser.add_argument("--peerport", default="179", type=int, help=str_help)
106     str_help = "Local hold time."
107     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
108     str_help = "Log level (--error, --warning, --info, --debug)"
109     parser.add_argument("--error", dest="loglevel", action="store_const",
110                         const=logging.ERROR, default=logging.INFO,
111                         help=str_help)
112     parser.add_argument("--warning", dest="loglevel", action="store_const",
113                         const=logging.WARNING, default=logging.INFO,
114                         help=str_help)
115     parser.add_argument("--info", dest="loglevel", action="store_const",
116                         const=logging.INFO, default=logging.INFO,
117                         help=str_help)
118     parser.add_argument("--debug", dest="loglevel", action="store_const",
119                         const=logging.DEBUG, default=logging.INFO,
120                         help=str_help)
121     str_help = "Log file name"
122     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
123     str_help = "Trailing part of the csv result files for plotting purposes"
124     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
125     str_help = "Minimum number of updates to reach to include result into csv."
126     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
127     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
128     parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
129     str_help = "Link-State NLRI supported"
130     parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
131     str_help = "Link-State NLRI: Identifier"
132     parser.add_argument("-lsid", default="1", type=int, help=str_help)
133     str_help = "Link-State NLRI: Tunnel ID"
134     parser.add_argument("-lstid", default="1", type=int, help=str_help)
135     str_help = "Link-State NLRI: LSP ID"
136     parser.add_argument("-lspid", default="1", type=int, help=str_help)
137     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
138     parser.add_argument("--lstsaddr", default="1.2.3.4",
139                         type=ipaddr.IPv4Address, help=str_help)
140     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
141     parser.add_argument("--lsteaddr", default="5.6.7.8",
142                         type=ipaddr.IPv4Address, help=str_help)
143     str_help = "Link-State NLRI: Identifier Step"
144     parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
145     str_help = "Link-State NLRI: Tunnel ID Step"
146     parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
147     str_help = "Link-State NLRI: LSP ID Step"
148     parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
149     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
150     parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
151     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
152     parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
153     str_help = "How many play utilities are to be started."
154     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
155     str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
156     Enabling this flag makes the script not decoding the update mesage, because of not\
157     supported decoding for these elements."
158     parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
159     str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
160     Enabling this flag makes the script not decoding the update mesage, because of not\
161     supported decoding for these elements."
162     parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
163     str_help = "Open message includes L3VPN-MULTICAST arguments.\
164     Enabling this flag makes the script not decoding the update mesage, because of not\
165     supported decoding for these elements."
166     parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
167     str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding."
168     parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
169     str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
170     parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help)
171     str_help = "Add all supported families without message decoding."
172     parser.add_argument("--allf", default=False, action="store_true", help=str_help)
173     parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
174     str_help = "Skipping well known attributes for update message"
175     parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
176     arguments = parser.parse_args()
177     if arguments.multiplicity < 1:
178         print "Multiplicity", arguments.multiplicity, "is not positive."
179         raise SystemExit(1)
180     # TODO: Are sanity checks (such as asnumber>=0) required?
181     return arguments
182
183
184 def establish_connection(arguments):
185     """Establish connection to BGP peer.
186
187     Arguments:
188         :arguments: following command-line argumets are used
189             - arguments.myip: local IP address
190             - arguments.myport: local port
191             - arguments.peerip: remote IP address
192             - arguments.peerport: remote port
193     Returns:
194         :return: socket.
195     """
196     if arguments.listen:
197         logger.info("Connecting in the listening mode.")
198         logger.debug("Local IP address: " + str(arguments.myip))
199         logger.debug("Local port: " + str(arguments.myport))
200         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
201         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
202         # bind need single tuple as argument
203         listening_socket.bind((str(arguments.myip), arguments.myport))
204         listening_socket.listen(1)
205         bgp_socket, _ = listening_socket.accept()
206         # TODO: Verify client IP is cotroller IP.
207         listening_socket.close()
208     else:
209         logger.info("Connecting in the talking mode.")
210         logger.debug("Local IP address: " + str(arguments.myip))
211         logger.debug("Local port: " + str(arguments.myport))
212         logger.debug("Remote IP address: " + str(arguments.peerip))
213         logger.debug("Remote port: " + str(arguments.peerport))
214         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
215         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
216         # bind to force specified address and port
217         talking_socket.bind((str(arguments.myip), arguments.myport))
218         # socket does not spead ipaddr, hence str()
219         talking_socket.connect((str(arguments.peerip), arguments.peerport))
220         bgp_socket = talking_socket
221     logger.info("Connected to ODL.")
222     return bgp_socket
223
224
225 def get_short_int_from_message(message, offset=16):
226     """Extract 2-bytes number from provided message.
227
228     Arguments:
229         :message: given message
230         :offset: offset of the short_int inside the message
231     Returns:
232         :return: required short_inf value.
233     Notes:
234         default offset value is the BGP message size offset.
235     """
236     high_byte_int = ord(message[offset])
237     low_byte_int = ord(message[offset + 1])
238     short_int = high_byte_int * 256 + low_byte_int
239     return short_int
240
241
242 def get_prefix_list_from_hex(prefixes_hex):
243     """Get decoded list of prefixes (rfc4271#section-4.3)
244
245     Arguments:
246         :prefixes_hex: list of prefixes to be decoded in hex
247     Returns:
248         :return: list of prefixes in the form of ip address (X.X.X.X/X)
249     """
250     prefix_list = []
251     offset = 0
252     while offset < len(prefixes_hex):
253         prefix_bit_len_hex = prefixes_hex[offset]
254         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
255         prefix_len = ((prefix_bit_len - 1) / 8) + 1
256         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
257         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
258         offset += 1 + prefix_len
259         prefix_list.append(prefix + "/" + str(prefix_bit_len))
260     return prefix_list
261
262
263 class MessageError(ValueError):
264     """Value error with logging optimized for hexlified messages."""
265
266     def __init__(self, text, message, *args):
267         """Initialisation.
268
269         Store and call super init for textual comment,
270         store raw message which caused it.
271         """
272         self.text = text
273         self.msg = message
274         super(MessageError, self).__init__(text, message, *args)
275
276     def __str__(self):
277         """Generate human readable error message.
278
279         Returns:
280             :return: human readable message as string
281         Notes:
282             Use a placeholder string if the message is to be empty.
283         """
284         message = binascii.hexlify(self.msg)
285         if message == "":
286             message = "(empty message)"
287         return self.text + ": " + message
288
289
290 def read_open_message(bgp_socket):
291     """Receive peer's OPEN message
292
293     Arguments:
294         :bgp_socket: the socket to be read
295     Returns:
296         :return: received OPEN message.
297     Notes:
298         Performs just basic incomming message checks
299     """
300     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
301     # TODO: Can the incoming open message be split in more than one packet?
302     # Some validation.
303     if len(msg_in) < 37:
304         # 37 is minimal length of open message with 4-byte AS number.
305         error_msg = (
306             "Message length (" + str(len(msg_in)) + ") is smaller than "
307             "minimal length of OPEN message with 4-byte AS number (37)"
308         )
309         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
310         raise MessageError(error_msg, msg_in)
311     # TODO: We could check BGP marker, but it is defined only later;
312     # decide what to do.
313     reported_length = get_short_int_from_message(msg_in)
314     if len(msg_in) != reported_length:
315         error_msg = (
316             "Expected message length (" + reported_length +
317             ") does not match actual length (" + str(len(msg_in)) + ")"
318         )
319         logger.error(error_msg + binascii.hexlify(msg_in))
320         raise MessageError(error_msg, msg_in)
321     logger.info("Open message received.")
322     return msg_in
323
324
325 class MessageGenerator(object):
326     """Class which generates messages, holds states and configuration values."""
327
328     # TODO: Define bgp marker as a class (constant) variable.
329     def __init__(self, args):
330         """Initialisation according to command-line args.
331
332         Arguments:
333             :args: argsparser's Namespace object which contains command-line
334                 options for MesageGenerator initialisation
335         Notes:
336             Calculates and stores default values used later on for
337             message geeration.
338         """
339         self.total_prefix_amount = args.amount
340         # Number of update messages left to be sent.
341         self.remaining_prefixes = self.total_prefix_amount
342
343         # New parameters initialisation
344         self.iteration = 0
345         self.prefix_base_default = args.firstprefix
346         self.prefix_length_default = args.prefixlen
347         self.wr_prefixes_default = []
348         self.nlri_prefixes_default = []
349         self.version_default = 4
350         self.my_autonomous_system_default = args.asnumber
351         self.hold_time_default = args.holdtime  # Local hold time.
352         self.bgp_identifier_default = int(args.myip)
353         self.next_hop_default = args.nexthop
354         self.originator_id_default = args.originator
355         self.cluster_list_item_default = args.cluster
356         self.single_update_default = args.updates == "single"
357         self.randomize_updates_default = args.updates == "random"
358         self.prefix_count_to_add_default = args.insert
359         self.prefix_count_to_del_default = args.withdraw
360         if self.prefix_count_to_del_default < 0:
361             self.prefix_count_to_del_default = 0
362         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
363             # total number of prefixes must grow to avoid infinite test loop
364             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
365         self.slot_size_default = self.prefix_count_to_add_default
366         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
367         self.results_file_name_default = args.results
368         self.performance_threshold_default = args.threshold
369         self.rfc4760 = args.rfc4760
370         self.bgpls = args.bgpls
371         self.evpn = args.evpn
372         self.mvpn = args.mvpn
373         self.l3vpn_mcast = args.l3vpn_mcast
374         self.l3vpn = args.l3vpn
375         self.rt_constrain = args.rt_constrain
376         self.allf = args.allf
377         self.skipattr = args.skipattr
378         # Default values when BGP-LS Attributes are used
379         if self.bgpls:
380             self.prefix_count_to_add_default = 1
381             self.prefix_count_to_del_default = 0
382         self.ls_nlri_default = {"Identifier": args.lsid,
383                                 "TunnelID": args.lstid,
384                                 "LSPID": args.lspid,
385                                 "IPv4TunnelSenderAddress": args.lstsaddr,
386                                 "IPv4TunnelEndPointAddress": args.lsteaddr}
387         self.lsid_step = args.lsidstep
388         self.lstid_step = args.lstidstep
389         self.lspid_step = args.lspidstep
390         self.lstsaddr_step = args.lstsaddrstep
391         self.lsteaddr_step = args.lsteaddrstep
392         # Default values used for randomized part
393         s1_slots = ((self.total_prefix_amount -
394                      self.remaining_prefixes_threshold - 1) /
395                     self.prefix_count_to_add_default + 1)
396         s2_slots = ((self.remaining_prefixes_threshold - 1) /
397                     (self.prefix_count_to_add_default -
398                      self.prefix_count_to_del_default) + 1)
399         # S1_First_Index = 0
400         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
401         s2_first_index = s1_slots * self.prefix_count_to_add_default
402         s2_last_index = (s2_first_index +
403                          s2_slots * (self.prefix_count_to_add_default -
404                                      self.prefix_count_to_del_default) - 1)
405         self.slot_gap_default = ((self.total_prefix_amount -
406                                   self.remaining_prefixes_threshold - 1) /
407                                  self.prefix_count_to_add_default + 1)
408         self.randomize_lowest_default = s2_first_index
409         self.randomize_highest_default = s2_last_index
410         # Initialising counters
411         self.phase1_start_time = 0
412         self.phase1_stop_time = 0
413         self.phase2_start_time = 0
414         self.phase2_stop_time = 0
415         self.phase1_updates_sent = 0
416         self.phase2_updates_sent = 0
417         self.updates_sent = 0
418
419         self.log_info = args.loglevel <= logging.INFO
420         self.log_debug = args.loglevel <= logging.DEBUG
421         """
422         Flags needed for the MessageGenerator performance optimization.
423         Calling logger methods each iteration even with proper log level set
424         slows down significantly the MessageGenerator performance.
425         Measured total generation time (1M updates, dry run, error log level):
426         - logging based on basic logger features: 36,2s
427         - logging based on advanced logger features (lazy logging): 21,2s
428         - conditional calling of logger methods enclosed inside condition: 8,6s
429         """
430
431         logger.info("Generator initialisation")
432         logger.info("  Target total number of prefixes to be introduced: " +
433                     str(self.total_prefix_amount))
434         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
435                     str(self.prefix_length_default))
436         logger.info("  My Autonomous System number: " +
437                     str(self.my_autonomous_system_default))
438         logger.info("  My Hold Time: " + str(self.hold_time_default))
439         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
440         logger.info("  Next Hop: " + str(self.next_hop_default))
441         logger.info("  Originator ID: " + str(self.originator_id_default))
442         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
443         logger.info("  Prefix count to be inserted at once: " +
444                     str(self.prefix_count_to_add_default))
445         logger.info("  Prefix count to be withdrawn at once: " +
446                     str(self.prefix_count_to_del_default))
447         logger.info("  Fast pre-fill up to " +
448                     str(self.total_prefix_amount -
449                         self.remaining_prefixes_threshold) + " prefixes")
450         logger.info("  Remaining number of prefixes to be processed " +
451                     "in parallel with withdrawals: " +
452                     str(self.remaining_prefixes_threshold))
453         logger.debug("  Prefix index range used after pre-fill procedure [" +
454                      str(self.randomize_lowest_default) + ", " +
455                      str(self.randomize_highest_default) + "]")
456         if self.single_update_default:
457             logger.info("  Common single UPDATE will be generated " +
458                         "for both NLRI & WITHDRAWN lists")
459         else:
460             logger.info("  Two separate UPDATEs will be generated " +
461                         "for each NLRI & WITHDRAWN lists")
462         if self.randomize_updates_default:
463             logger.info("  Generation of UPDATE messages will be randomized")
464         logger.info("  Let\'s go ...\n")
465
466         # TODO: Notification for hold timer expiration can be handy.
467
468     def store_results(self, file_name=None, threshold=None):
469         """ Stores specified results into files based on file_name value.
470
471         Arguments:
472             :param file_name: Trailing (common) part of result file names
473             :param threshold: Minimum number of sent updates needed for each
474                               result to be included into result csv file
475                               (mainly needed because of the result accuracy)
476         Returns:
477             :return: n/a
478         """
479         # default values handling
480         # TODO optimize default values handling (use e.g. dicionary.update() approach)
481         if file_name is None:
482             file_name = self.results_file_name_default
483         if threshold is None:
484             threshold = self.performance_threshold_default
485         # performance calculation
486         if self.phase1_updates_sent >= threshold:
487             totals1 = self.phase1_updates_sent
488             performance1 = int(self.phase1_updates_sent /
489                                (self.phase1_stop_time - self.phase1_start_time))
490         else:
491             totals1 = None
492             performance1 = None
493         if self.phase2_updates_sent >= threshold:
494             totals2 = self.phase2_updates_sent
495             performance2 = int(self.phase2_updates_sent /
496                                (self.phase2_stop_time - self.phase2_start_time))
497         else:
498             totals2 = None
499             performance2 = None
500
501         logger.info("#" * 10 + " Final results " + "#" * 10)
502         logger.info("Number of iterations: " + str(self.iteration))
503         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
504                     str(self.phase1_updates_sent))
505         logger.info("The pre-fill phase duration: " +
506                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
507         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
508                     str(self.phase2_updates_sent))
509         logger.info("The 2nd test phase duration: " +
510                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
511         logger.info("Threshold for performance reporting: " + str(threshold))
512
513         # making labels
514         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
515                         " route(s) per UPDATE")
516         if self.single_update_default:
517             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
518                                   "/-" + str(self.prefix_count_to_del_default) +
519                                   " routes per UPDATE")
520         else:
521             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
522                                   "/-" + str(self.prefix_count_to_del_default) +
523                                   " routes in two UPDATEs")
524         # collecting capacity and performance results
525         totals = {}
526         performance = {}
527         if totals1 is not None:
528             totals[phase1_label] = totals1
529             performance[phase1_label] = performance1
530         if totals2 is not None:
531             totals[phase2_label] = totals2
532             performance[phase2_label] = performance2
533         self.write_results_to_file(totals, "totals-" + file_name)
534         self.write_results_to_file(performance, "performance-" + file_name)
535
536     def write_results_to_file(self, results, file_name):
537         """Writes results to the csv plot file consumable by Jenkins.
538
539         Arguments:
540             :param file_name: Name of the (csv) file to be created
541         Returns:
542             :return: none
543         """
544         first_line = ""
545         second_line = ""
546         f = open(file_name, "wt")
547         try:
548             for key in sorted(results):
549                 first_line += key + ", "
550                 second_line += str(results[key]) + ", "
551             first_line = first_line[:-2]
552             second_line = second_line[:-2]
553             f.write(first_line + "\n")
554             f.write(second_line + "\n")
555             logger.info("Message generator performance results stored in " +
556                         file_name + ":")
557             logger.info("  " + first_line)
558             logger.info("  " + second_line)
559         finally:
560             f.close()
561
562     # Return pseudo-randomized (reproducible) index for selected range
563     def randomize_index(self, index, lowest=None, highest=None):
564         """Calculates pseudo-randomized index from selected range.
565
566         Arguments:
567             :param index: input index
568             :param lowest: the lowes index from the randomized area
569             :param highest: the highest index from the randomized area
570         Returns:
571             :return: the (pseudo)randomized index
572         Notes:
573             Created just as a fame for future generator enhancement.
574         """
575         # default values handling
576         # TODO optimize default values handling (use e.g. dicionary.update() approach)
577         if lowest is None:
578             lowest = self.randomize_lowest_default
579         if highest is None:
580             highest = self.randomize_highest_default
581         # randomize
582         if (index >= lowest) and (index <= highest):
583             # we are in the randomized range -> shuffle it inside
584             # the range (now just reverse the order)
585             new_index = highest - (index - lowest)
586         else:
587             # we are out of the randomized range -> nothing to do
588             new_index = index
589         return new_index
590
591     def get_ls_nlri_values(self, index):
592         """Generates LS-NLRI parameters.
593         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
594
595         Arguments:
596             :param index: index (iteration)
597         Returns:
598             :return: dictionary of LS NLRI parameters and values
599         """
600         # generating list of LS NLRI parameters
601         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
602         ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
603         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
604         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
605         ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
606         ls_nlri_values = {"Identifier": identifier,
607                           "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
608                           "TunnelID": tunnel_id, "LSPID": lsp_id,
609                           "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
610         return ls_nlri_values
611
612     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
613                         prefix_len=None, prefix_count=None, randomize=None):
614         """Generates list of IP address prefixes.
615
616         Arguments:
617             :param slot_index: index of group of prefix addresses
618             :param slot_size: size of group of prefix addresses
619                 in [number of included prefixes]
620             :param prefix_base: IP address of the first prefix
621                 (slot_index = 0, prefix_index = 0)
622             :param prefix_len: length of the prefix in bites
623                 (the same as size of netmask)
624             :param prefix_count: number of prefixes to be returned
625                 from the specified slot
626         Returns:
627             :return: list of generated IP address prefixes
628         """
629         # default values handling
630         # TODO optimize default values handling (use e.g. dicionary.update() approach)
631         if slot_size is None:
632             slot_size = self.slot_size_default
633         if prefix_base is None:
634             prefix_base = self.prefix_base_default
635         if prefix_len is None:
636             prefix_len = self.prefix_length_default
637         if prefix_count is None:
638             prefix_count = slot_size
639         if randomize is None:
640             randomize = self.randomize_updates_default
641         # generating list of prefixes
642         indexes = []
643         prefixes = []
644         prefix_gap = 2 ** (32 - prefix_len)
645         for i in range(prefix_count):
646             prefix_index = slot_index * slot_size + i
647             if randomize:
648                 prefix_index = self.randomize_index(prefix_index)
649             indexes.append(prefix_index)
650             prefixes.append(prefix_base + prefix_index * prefix_gap)
651         if self.log_debug:
652             logger.debug("  Prefix slot index: " + str(slot_index))
653             logger.debug("  Prefix slot size: " + str(slot_size))
654             logger.debug("  Prefix count: " + str(prefix_count))
655             logger.debug("  Prefix indexes: " + str(indexes))
656             logger.debug("  Prefix list: " + str(prefixes))
657         return prefixes
658
659     def compose_update_message(self, prefix_count_to_add=None,
660                                prefix_count_to_del=None):
661         """Composes an UPDATE message
662
663         Arguments:
664             :param prefix_count_to_add: # of prefixes to put into NLRI list
665             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
666         Returns:
667             :return: encoded UPDATE message in HEX
668         Notes:
669             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
670             lists or common message wich includes both prefix lists.
671             Updates global counters.
672         """
673         # default values handling
674         # TODO optimize default values handling (use e.g. dicionary.update() approach)
675         if prefix_count_to_add is None:
676             prefix_count_to_add = self.prefix_count_to_add_default
677         if prefix_count_to_del is None:
678             prefix_count_to_del = self.prefix_count_to_del_default
679         # logging
680         if self.log_info and not (self.iteration % 1000):
681             logger.info("Iteration: " + str(self.iteration) +
682                         " - total remaining prefixes: " +
683                         str(self.remaining_prefixes))
684         if self.log_debug:
685             logger.debug("#" * 10 + " Iteration: " +
686                          str(self.iteration) + " " + "#" * 10)
687             logger.debug("Remaining prefixes: " +
688                          str(self.remaining_prefixes))
689         # scenario type & one-shot counter
690         straightforward_scenario = (self.remaining_prefixes >
691                                     self.remaining_prefixes_threshold)
692         if straightforward_scenario:
693             prefix_count_to_del = 0
694             if self.log_debug:
695                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
696             if not self.phase1_start_time:
697                 self.phase1_start_time = time.time()
698         else:
699             if self.log_debug:
700                 logger.debug("--- COMBINED SCENARIO ---")
701             if not self.phase2_start_time:
702                 self.phase2_start_time = time.time()
703         # tailor the number of prefixes if needed
704         prefix_count_to_add = (prefix_count_to_del +
705                                min(prefix_count_to_add - prefix_count_to_del,
706                                    self.remaining_prefixes))
707         # prefix slots selection for insertion and withdrawal
708         slot_index_to_add = self.iteration
709         slot_index_to_del = slot_index_to_add - self.slot_gap_default
710         # getting lists of prefixes for insertion in this iteration
711         if self.log_debug:
712             logger.debug("Prefixes to be inserted in this iteration:")
713         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
714                                                   prefix_count=prefix_count_to_add)
715         # getting lists of prefixes for withdrawal in this iteration
716         if self.log_debug:
717             logger.debug("Prefixes to be withdrawn in this iteration:")
718         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
719                                                   prefix_count=prefix_count_to_del)
720         # generating the UPDATE mesage with LS-NLRI only
721         if self.bgpls:
722             ls_nlri = self.get_ls_nlri_values(self.iteration)
723             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
724                                           **ls_nlri)
725         else:
726             # generating the UPDATE message with prefix lists
727             if self.single_update_default:
728                 # Send prefixes to be introduced and withdrawn
729                 # in one UPDATE message
730                 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
731                                               nlri_prefixes=prefix_list_to_add)
732             else:
733                 # Send prefixes to be introduced and withdrawn
734                 # in separate UPDATE messages (if needed)
735                 msg_out = self.update_message(wr_prefixes=[],
736                                               nlri_prefixes=prefix_list_to_add)
737                 if prefix_count_to_del:
738                     msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
739                                                    nlri_prefixes=[])
740         # updating counters - who knows ... maybe I am last time here ;)
741         if straightforward_scenario:
742             self.phase1_stop_time = time.time()
743             self.phase1_updates_sent = self.updates_sent
744         else:
745             self.phase2_stop_time = time.time()
746             self.phase2_updates_sent = (self.updates_sent -
747                                         self.phase1_updates_sent)
748         # updating totals for the next iteration
749         self.iteration += 1
750         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
751         # returning the encoded message
752         return msg_out
753
754     # Section of message encoders
755
756     def open_message(self, version=None, my_autonomous_system=None,
757                      hold_time=None, bgp_identifier=None):
758         """Generates an OPEN Message (rfc4271#section-4.2)
759
760         Arguments:
761             :param version: see the rfc4271#section-4.2
762             :param my_autonomous_system: see the rfc4271#section-4.2
763             :param hold_time: see the rfc4271#section-4.2
764             :param bgp_identifier: see the rfc4271#section-4.2
765         Returns:
766             :return: encoded OPEN message in HEX
767         """
768
769         # default values handling
770         # TODO optimize default values handling (use e.g. dicionary.update() approach)
771         if version is None:
772             version = self.version_default
773         if my_autonomous_system is None:
774             my_autonomous_system = self.my_autonomous_system_default
775         if hold_time is None:
776             hold_time = self.hold_time_default
777         if bgp_identifier is None:
778             bgp_identifier = self.bgp_identifier_default
779
780         # Marker
781         marker_hex = "\xFF" * 16
782
783         # Type
784         type = 1
785         type_hex = struct.pack("B", type)
786
787         # version
788         version_hex = struct.pack("B", version)
789
790         # my_autonomous_system
791         # AS_TRANS value, 23456 decadic.
792         my_autonomous_system_2_bytes = 23456
793         # AS number is mappable to 2 bytes
794         if my_autonomous_system < 65536:
795             my_autonomous_system_2_bytes = my_autonomous_system
796         my_autonomous_system_hex_2_bytes = struct.pack(">H",
797                                                        my_autonomous_system)
798
799         # Hold Time
800         hold_time_hex = struct.pack(">H", hold_time)
801
802         # BGP Identifier
803         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
804
805         # Optional Parameters
806         optional_parameters_hex = ""
807         if self.rfc4760 or self.allf:
808             optional_parameter_hex = (
809                 "\x02"  # Param type ("Capability Ad")
810                 "\x06"  # Length (6 bytes)
811                 "\x01"  # Capability type (NLRI Unicast),
812                         # see RFC 4760, secton 8
813                 "\x04"  # Capability value length
814                 "\x00\x01"  # AFI (Ipv4)
815                 "\x00"  # (reserved)
816                 "\x01"  # SAFI (Unicast)
817             )
818             optional_parameters_hex += optional_parameter_hex
819
820         if self.bgpls or self.allf:
821             optional_parameter_hex = (
822                 "\x02"  # Param type ("Capability Ad")
823                 "\x06"  # Length (6 bytes)
824                 "\x01"  # Capability type (NLRI Unicast),
825                         # see RFC 4760, secton 8
826                 "\x04"  # Capability value length
827                 "\x40\x04"  # AFI (BGP-LS)
828                 "\x00"  # (reserved)
829                 "\x47"  # SAFI (BGP-LS)
830             )
831             optional_parameters_hex += optional_parameter_hex
832
833         if self.evpn or self.allf:
834             optional_parameter_hex = (
835                 "\x02"  # Param type ("Capability Ad")
836                 "\x06"  # Length (6 bytes)
837                 "\x01"  # Multiprotocol extetension capability,
838                 "\x04"  # Capability value length
839                 "\x00\x19"  # AFI (L2-VPN)
840                 "\x00"  # (reserved)
841                 "\x46"  # SAFI (EVPN)
842             )
843             optional_parameters_hex += optional_parameter_hex
844
845         if self.mvpn or self.allf:
846             optional_parameter_hex = (
847                 "\x02"  # Param type ("Capability Ad")
848                 "\x06"  # Length (6 bytes)
849                 "\x01"  # Multiprotocol extetension capability,
850                 "\x04"  # Capability value length
851                 "\x00\x01"  # AFI (IPV4)
852                 "\x00"  # (reserved)
853                 "\x05"  # SAFI (MCAST-VPN)
854             )
855             optional_parameters_hex += optional_parameter_hex
856             optional_parameter_hex = (
857                 "\x02"  # Param type ("Capability Ad")
858                 "\x06"  # Length (6 bytes)
859                 "\x01"  # Multiprotocol extetension capability,
860                 "\x04"  # Capability value length
861                 "\x00\x02"  # AFI (IPV6)
862                 "\x00"  # (reserved)
863                 "\x05"  # SAFI (MCAST-VPN)
864             )
865             optional_parameters_hex += optional_parameter_hex
866
867         if self.l3vpn_mcast or self.allf:
868             optional_parameter_hex = (
869                 "\x02"  # Param type ("Capability Ad")
870                 "\x06"  # Length (6 bytes)
871                 "\x01"  # Multiprotocol extetension capability,
872                 "\x04"  # Capability value length
873                 "\x00\x01"  # AFI (IPV4)
874                 "\x00"  # (reserved)
875                 "\x81"  # SAFI (L3VPN-MCAST)
876             )
877             optional_parameters_hex += optional_parameter_hex
878             optional_parameter_hex = (
879                 "\x02"  # Param type ("Capability Ad")
880                 "\x06"  # Length (6 bytes)
881                 "\x01"  # Multiprotocol extetension capability,
882                 "\x04"  # Capability value length
883                 "\x00\x02"  # AFI (IPV6)
884                 "\x00"  # (reserved)
885                 "\x81"  # SAFI (L3VPN-MCAST)
886             )
887             optional_parameters_hex += optional_parameter_hex
888
889         if self.l3vpn or self.allf:
890             optional_parameter_hex = (
891                 "\x02"  # Param type ("Capability Ad")
892                 "\x06"  # Length (6 bytes)
893                 "\x01"  # Multiprotocol extetension capability,
894                 "\x04"  # Capability value length
895                 "\x00\x01"  # AFI (IPV4)
896                 "\x00"  # (reserved)
897                 "\x80"  # SAFI (L3VPN-UNICAST)
898             )
899             optional_parameters_hex += optional_parameter_hex
900             optional_parameter_hex = (
901                 "\x02"  # Param type ("Capability Ad")
902                 "\x06"  # Length (6 bytes)
903                 "\x01"  # Multiprotocol extetension capability,
904                 "\x04"  # Capability value length
905                 "\x00\x02"  # AFI (IPV6)
906                 "\x00"  # (reserved)
907                 "\x80"  # SAFI (L3VPN-UNICAST)
908             )
909             optional_parameters_hex += optional_parameter_hex
910
911         if self.rt_constrain or self.allf:
912             optional_parameter_hex = (
913                 "\x02"  # Param type ("Capability Ad")
914                 "\x06"  # Length (6 bytes)
915                 "\x01"  # Multiprotocol extetension capability,
916                 "\x04"  # Capability value length
917                 "\x00\x01"  # AFI (IPV4)
918                 "\x00"  # (reserved)
919                 "\x84"  # SAFI (ROUTE-TARGET-CONSTRAIN)
920             )
921             optional_parameters_hex += optional_parameter_hex
922
923         optional_parameter_hex = (
924             "\x02"  # Param type ("Capability Ad")
925             "\x06"  # Length (6 bytes)
926             "\x41"  # "32 bit AS Numbers Support"
927                     # (see RFC 6793, section 3)
928             "\x04"  # Capability value length
929         )
930         optional_parameter_hex += (
931             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
932         )
933         optional_parameters_hex += optional_parameter_hex
934
935         # Optional Parameters Length
936         optional_parameters_length = len(optional_parameters_hex)
937         optional_parameters_length_hex = struct.pack("B",
938                                                      optional_parameters_length)
939
940         # Length (big-endian)
941         length = (
942             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
943             len(my_autonomous_system_hex_2_bytes) +
944             len(hold_time_hex) + len(bgp_identifier_hex) +
945             len(optional_parameters_length_hex) +
946             len(optional_parameters_hex)
947         )
948         length_hex = struct.pack(">H", length)
949
950         # OPEN Message
951         message_hex = (
952             marker_hex +
953             length_hex +
954             type_hex +
955             version_hex +
956             my_autonomous_system_hex_2_bytes +
957             hold_time_hex +
958             bgp_identifier_hex +
959             optional_parameters_length_hex +
960             optional_parameters_hex
961         )
962
963         if self.log_debug:
964             logger.debug("OPEN message encoding")
965             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
966             logger.debug("  Length=" + str(length) + " (0x" +
967                          binascii.hexlify(length_hex) + ")")
968             logger.debug("  Type=" + str(type) + " (0x" +
969                          binascii.hexlify(type_hex) + ")")
970             logger.debug("  Version=" + str(version) + " (0x" +
971                          binascii.hexlify(version_hex) + ")")
972             logger.debug("  My Autonomous System=" +
973                          str(my_autonomous_system_2_bytes) + " (0x" +
974                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
975                          ")")
976             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
977                          binascii.hexlify(hold_time_hex) + ")")
978             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
979                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
980             logger.debug("  Optional Parameters Length=" +
981                          str(optional_parameters_length) + " (0x" +
982                          binascii.hexlify(optional_parameters_length_hex) +
983                          ")")
984             logger.debug("  Optional Parameters=0x" +
985                          binascii.hexlify(optional_parameters_hex))
986             logger.debug("OPEN message encoded: 0x%s",
987                          binascii.b2a_hex(message_hex))
988
989         return message_hex
990
991     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
992                        wr_prefix_length=None, nlri_prefix_length=None,
993                        my_autonomous_system=None, next_hop=None,
994                        originator_id=None, cluster_list_item=None,
995                        end_of_rib=False, **ls_nlri_params):
996         """Generates an UPDATE Message (rfc4271#section-4.3)
997
998         Arguments:
999             :param wr_prefixes: see the rfc4271#section-4.3
1000             :param nlri_prefixes: see the rfc4271#section-4.3
1001             :param wr_prefix_length: see the rfc4271#section-4.3
1002             :param nlri_prefix_length: see the rfc4271#section-4.3
1003             :param my_autonomous_system: see the rfc4271#section-4.3
1004             :param next_hop: see the rfc4271#section-4.3
1005         Returns:
1006             :return: encoded UPDATE message in HEX
1007         """
1008
1009         # default values handling
1010         # TODO optimize default values handling (use e.g. dicionary.update() approach)
1011         if wr_prefixes is None:
1012             wr_prefixes = self.wr_prefixes_default
1013         if nlri_prefixes is None:
1014             nlri_prefixes = self.nlri_prefixes_default
1015         if wr_prefix_length is None:
1016             wr_prefix_length = self.prefix_length_default
1017         if nlri_prefix_length is None:
1018             nlri_prefix_length = self.prefix_length_default
1019         if my_autonomous_system is None:
1020             my_autonomous_system = self.my_autonomous_system_default
1021         if next_hop is None:
1022             next_hop = self.next_hop_default
1023         if originator_id is None:
1024             originator_id = self.originator_id_default
1025         if cluster_list_item is None:
1026             cluster_list_item = self.cluster_list_item_default
1027         ls_nlri = self.ls_nlri_default.copy()
1028         ls_nlri.update(ls_nlri_params)
1029
1030         # Marker
1031         marker_hex = "\xFF" * 16
1032
1033         # Type
1034         type = 2
1035         type_hex = struct.pack("B", type)
1036
1037         # Withdrawn Routes
1038         withdrawn_routes_hex = ""
1039         if not self.bgpls:
1040             bytes = ((wr_prefix_length - 1) / 8) + 1
1041             for prefix in wr_prefixes:
1042                 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
1043                                        struct.pack(">I", int(prefix))[:bytes])
1044                 withdrawn_routes_hex += withdrawn_route_hex
1045
1046         # Withdrawn Routes Length
1047         withdrawn_routes_length = len(withdrawn_routes_hex)
1048         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1049
1050         # TODO: to replace hardcoded string by encoding?
1051         # Path Attributes
1052         path_attributes_hex = ""
1053         if not self.skipattr:
1054             path_attributes_hex += (
1055                 "\x40"  # Flags ("Well-Known")
1056                 "\x01"  # Type (ORIGIN)
1057                 "\x01"  # Length (1)
1058                 "\x00"  # Origin: IGP
1059             )
1060             path_attributes_hex += (
1061                 "\x40"  # Flags ("Well-Known")
1062                 "\x02"  # Type (AS_PATH)
1063                 "\x06"  # Length (6)
1064                 "\x02"  # AS segment type (AS_SEQUENCE)
1065                 "\x01"  # AS segment length (1)
1066             )
1067             my_as_hex = struct.pack(">I", my_autonomous_system)
1068             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
1069             path_attributes_hex += (
1070                 "\x40"  # Flags ("Well-Known")
1071                 "\x05"  # Type (LOCAL_PREF)
1072                 "\x04"  # Length (4)
1073                 "\x00\x00\x00\x64"  # (100)
1074             )
1075         if nlri_prefixes != []:
1076             path_attributes_hex += (
1077                 "\x40"  # Flags ("Well-Known")
1078                 "\x03"  # Type (NEXT_HOP)
1079                 "\x04"  # Length (4)
1080             )
1081             next_hop_hex = struct.pack(">I", int(next_hop))
1082             path_attributes_hex += (
1083                 next_hop_hex  # IP address of the next hop (4 bytes)
1084             )
1085             if originator_id is not None:
1086                 path_attributes_hex += (
1087                     "\x80"  # Flags ("Optional, non-transitive")
1088                     "\x09"  # Type (ORIGINATOR_ID)
1089                     "\x04"  # Length (4)
1090                 )           # ORIGINATOR_ID (4 bytes)
1091                 path_attributes_hex += struct.pack(">I", int(originator_id))
1092             if cluster_list_item is not None:
1093                 path_attributes_hex += (
1094                     "\x80"  # Flags ("Optional, non-transitive")
1095                     "\x0a"  # Type (CLUSTER_LIST)
1096                     "\x04"  # Length (4)
1097                 )           # one CLUSTER_LIST item (4 bytes)
1098                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1099
1100         if self.bgpls and not end_of_rib:
1101             path_attributes_hex += (
1102                 "\x80"  # Flags ("Optional, non-transitive")
1103                 "\x0e"  # Type (MP_REACH_NLRI)
1104                 "\x22"  # Length (34)
1105                 "\x40\x04"  # AFI (BGP-LS)
1106                 "\x47"  # SAFI (BGP-LS)
1107                 "\x04"  # Next Hop Length (4)
1108             )
1109             path_attributes_hex += struct.pack(">I", int(next_hop))
1110             path_attributes_hex += "\x00"           # Reserved
1111             path_attributes_hex += (
1112                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1113                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
1114                 "\x07"      # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1115             )
1116             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1117             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1118             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1119             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1120             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1121
1122         # Total Path Attributes Length
1123         total_path_attributes_length = len(path_attributes_hex)
1124         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1125
1126         # Network Layer Reachability Information
1127         nlri_hex = ""
1128         if not self.bgpls:
1129             bytes = ((nlri_prefix_length - 1) / 8) + 1
1130             for prefix in nlri_prefixes:
1131                 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1132                                    struct.pack(">I", int(prefix))[:bytes])
1133                 nlri_hex += nlri_prefix_hex
1134
1135         # Length (big-endian)
1136         length = (
1137             len(marker_hex) + 2 + len(type_hex) +
1138             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1139             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1140             len(nlri_hex))
1141         length_hex = struct.pack(">H", length)
1142
1143         # UPDATE Message
1144         message_hex = (
1145             marker_hex +
1146             length_hex +
1147             type_hex +
1148             withdrawn_routes_length_hex +
1149             withdrawn_routes_hex +
1150             total_path_attributes_length_hex +
1151             path_attributes_hex +
1152             nlri_hex
1153         )
1154
1155         if self.log_debug:
1156             logger.debug("UPDATE message encoding")
1157             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1158             logger.debug("  Length=" + str(length) + " (0x" +
1159                          binascii.hexlify(length_hex) + ")")
1160             logger.debug("  Type=" + str(type) + " (0x" +
1161                          binascii.hexlify(type_hex) + ")")
1162             logger.debug("  withdrawn_routes_length=" +
1163                          str(withdrawn_routes_length) + " (0x" +
1164                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
1165             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1166                          str(wr_prefix_length) + " (0x" +
1167                          binascii.hexlify(withdrawn_routes_hex) + ")")
1168             if total_path_attributes_length:
1169                 logger.debug("  Total Path Attributes Length=" +
1170                              str(total_path_attributes_length) + " (0x" +
1171                              binascii.hexlify(total_path_attributes_length_hex) + ")")
1172                 logger.debug("  Path Attributes=" + "(0x" +
1173                              binascii.hexlify(path_attributes_hex) + ")")
1174                 logger.debug("    Origin=IGP")
1175                 logger.debug("    AS path=" + str(my_autonomous_system))
1176                 logger.debug("    Next hop=" + str(next_hop))
1177                 if originator_id is not None:
1178                     logger.debug("    Originator id=" + str(originator_id))
1179                 if cluster_list_item is not None:
1180                     logger.debug("    Cluster list=" + str(cluster_list_item))
1181                 if self.bgpls:
1182                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1183             logger.debug("  Network Layer Reachability Information=" +
1184                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1185                          " (0x" + binascii.hexlify(nlri_hex) + ")")
1186             logger.debug("UPDATE message encoded: 0x" +
1187                          binascii.b2a_hex(message_hex))
1188
1189         # updating counter
1190         self.updates_sent += 1
1191         # returning encoded message
1192         return message_hex
1193
1194     def notification_message(self, error_code, error_subcode, data_hex=""):
1195         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1196
1197         Arguments:
1198             :param error_code: see the rfc4271#section-4.5
1199             :param error_subcode: see the rfc4271#section-4.5
1200             :param data_hex: see the rfc4271#section-4.5
1201         Returns:
1202             :return: encoded NOTIFICATION message in HEX
1203         """
1204
1205         # Marker
1206         marker_hex = "\xFF" * 16
1207
1208         # Type
1209         type = 3
1210         type_hex = struct.pack("B", type)
1211
1212         # Error Code
1213         error_code_hex = struct.pack("B", error_code)
1214
1215         # Error Subode
1216         error_subcode_hex = struct.pack("B", error_subcode)
1217
1218         # Length (big-endian)
1219         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1220                   len(error_subcode_hex) + len(data_hex))
1221         length_hex = struct.pack(">H", length)
1222
1223         # NOTIFICATION Message
1224         message_hex = (
1225             marker_hex +
1226             length_hex +
1227             type_hex +
1228             error_code_hex +
1229             error_subcode_hex +
1230             data_hex
1231         )
1232
1233         if self.log_debug:
1234             logger.debug("NOTIFICATION message encoding")
1235             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1236             logger.debug("  Length=" + str(length) + " (0x" +
1237                          binascii.hexlify(length_hex) + ")")
1238             logger.debug("  Type=" + str(type) + " (0x" +
1239                          binascii.hexlify(type_hex) + ")")
1240             logger.debug("  Error Code=" + str(error_code) + " (0x" +
1241                          binascii.hexlify(error_code_hex) + ")")
1242             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
1243                          binascii.hexlify(error_subcode_hex) + ")")
1244             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1245             logger.debug("NOTIFICATION message encoded: 0x%s",
1246                          binascii.b2a_hex(message_hex))
1247
1248         return message_hex
1249
1250     def keepalive_message(self):
1251         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1252
1253         Returns:
1254             :return: encoded KEEP ALIVE message in HEX
1255         """
1256
1257         # Marker
1258         marker_hex = "\xFF" * 16
1259
1260         # Type
1261         type = 4
1262         type_hex = struct.pack("B", type)
1263
1264         # Length (big-endian)
1265         length = len(marker_hex) + 2 + len(type_hex)
1266         length_hex = struct.pack(">H", length)
1267
1268         # KEEP ALIVE Message
1269         message_hex = (
1270             marker_hex +
1271             length_hex +
1272             type_hex
1273         )
1274
1275         if self.log_debug:
1276             logger.debug("KEEP ALIVE message encoding")
1277             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1278             logger.debug("  Length=" + str(length) + " (0x" +
1279                          binascii.hexlify(length_hex) + ")")
1280             logger.debug("  Type=" + str(type) + " (0x" +
1281                          binascii.hexlify(type_hex) + ")")
1282             logger.debug("KEEP ALIVE message encoded: 0x%s",
1283                          binascii.b2a_hex(message_hex))
1284
1285         return message_hex
1286
1287
1288 class TimeTracker(object):
1289     """Class for tracking timers, both for my keepalives and
1290     peer's hold time.
1291     """
1292
1293     def __init__(self, msg_in):
1294         """Initialisation. based on defaults and OPEN message from peer.
1295
1296         Arguments:
1297             msg_in: the OPEN message received from peer.
1298         """
1299         # Note: Relative time is always named timedelta, to stress that
1300         # the (non-delta) time is absolute.
1301         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1302         # Upper bound for being stuck in the same state, we should
1303         # at least report something before continuing.
1304         # Negotiate the hold timer by taking the smaller
1305         # of the 2 values (mine and the peer's).
1306         hold_timedelta = 180  # Not an attribute of self yet.
1307         # TODO: Make the default value configurable,
1308         # default value could mirror what peer said.
1309         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1310         if hold_timedelta > peer_hold_timedelta:
1311             hold_timedelta = peer_hold_timedelta
1312         if hold_timedelta != 0 and hold_timedelta < 3:
1313             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1314             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1315         self.hold_timedelta = hold_timedelta
1316         # If we do not hear from peer this long, we assume it has died.
1317         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1318         # Upper limit for duration between messages, to avoid being
1319         # declared to be dead.
1320         # The same as calling snapshot(), but also declares a field.
1321         self.snapshot_time = time.time()
1322         # Sometimes we need to store time. This is where to get
1323         # the value from afterwards. Time_keepalive may be too strict.
1324         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1325         # At this time point, peer will be declared dead.
1326         self.my_keepalive_time = None  # to be set later
1327         # At this point, we should be sending keepalive message.
1328
1329     def snapshot(self):
1330         """Store current time in instance data to use later."""
1331         # Read as time before something interesting was called.
1332         self.snapshot_time = time.time()
1333
1334     def reset_peer_hold_time(self):
1335         """Move hold time to future as peer has just proven it still lives."""
1336         self.peer_hold_time = time.time() + self.hold_timedelta
1337
1338     # Some methods could rely on self.snapshot_time, but it is better
1339     # to require user to provide it explicitly.
1340     def reset_my_keepalive_time(self, keepalive_time):
1341         """Calculate and set the next my KEEP ALIVE timeout time
1342
1343         Arguments:
1344             :keepalive_time: the initial value of the KEEP ALIVE timer
1345         """
1346         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1347
1348     def is_time_for_my_keepalive(self):
1349         """Check for my KEEP ALIVE timeout occurence"""
1350         if self.hold_timedelta == 0:
1351             return False
1352         return self.snapshot_time >= self.my_keepalive_time
1353
1354     def get_next_event_time(self):
1355         """Set the time of the next expected or to be sent KEEP ALIVE"""
1356         if self.hold_timedelta == 0:
1357             return self.snapshot_time + 86400
1358         return min(self.my_keepalive_time, self.peer_hold_time)
1359
1360     def check_peer_hold_time(self, snapshot_time):
1361         """Raise error if nothing was read from peer until specified time."""
1362         # Hold time = 0 means keepalive checking off.
1363         if self.hold_timedelta != 0:
1364             # time.time() may be too strict
1365             if snapshot_time > self.peer_hold_time:
1366                 logger.error("Peer has overstepped the hold timer.")
1367                 raise RuntimeError("Peer has overstepped the hold timer.")
1368                 # TODO: Include hold_timedelta?
1369                 # TODO: Add notification sending (attempt). That means
1370                 # move to write tracker.
1371
1372
1373 class ReadTracker(object):
1374     """Class for tracking read of mesages chunk by chunk and
1375     for idle waiting.
1376     """
1377
1378     def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
1379                  l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
1380                  wait_for_read=10):
1381         """The reader initialisation.
1382
1383         Arguments:
1384             bgp_socket: socket to be used for sending
1385             timer: timer to be used for scheduling
1386             storage: thread safe dict
1387             evpn: flag that evpn functionality is tested
1388             mvpn: flag that mvpn functionality is tested
1389             l3vpn_mcast: flag that l3vpn_mcast functionality is tested
1390             l3vpn: flag that l3vpn unicast functionality is tested
1391             rt_constrain: flag that rt-constrain functionality is tested
1392             allf: flag for all family testing.
1393         """
1394         # References to outside objects.
1395         self.socket = bgp_socket
1396         self.timer = timer
1397         # BGP marker length plus length field length.
1398         self.header_length = 18
1399         # TODO: make it class (constant) attribute
1400         # Computation of where next chunk ends depends on whether
1401         # we are beyond length field.
1402         self.reading_header = True
1403         # Countdown towards next size computation.
1404         self.bytes_to_read = self.header_length
1405         # Incremental buffer for message under read.
1406         self.msg_in = ""
1407         # Initialising counters
1408         self.updates_received = 0
1409         self.prefixes_introduced = 0
1410         self.prefixes_withdrawn = 0
1411         self.rx_idle_time = 0
1412         self.rx_activity_detected = True
1413         self.storage = storage
1414         self.evpn = evpn
1415         self.mvpn = mvpn
1416         self.l3vpn_mcast = l3vpn_mcast
1417         self.l3vpn = l3vpn
1418         self.rt_constrain = rt_constrain
1419         self.allf = allf
1420         self.wfr = wait_for_read
1421
1422     def read_message_chunk(self):
1423         """Read up to one message
1424
1425         Note:
1426             Currently it does not return anything.
1427         """
1428         # TODO: We could return the whole message, currently not needed.
1429         # We assume the socket is readable.
1430         chunk_message = self.socket.recv(self.bytes_to_read)
1431         self.msg_in += chunk_message
1432         self.bytes_to_read -= len(chunk_message)
1433         # TODO: bytes_to_read < 0 is not possible, right?
1434         if not self.bytes_to_read:
1435             # Finished reading a logical block.
1436             if self.reading_header:
1437                 # The logical block was a BGP header.
1438                 # Now we know the size of the message.
1439                 self.reading_header = False
1440                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1441                                       self.header_length)
1442             else:  # We have finished reading the body of the message.
1443                 # Peer has just proven it is still alive.
1444                 self.timer.reset_peer_hold_time()
1445                 # TODO: Do we want to count received messages?
1446                 # This version ignores the received message.
1447                 # TODO: Should we do validation and exit on anything
1448                 # besides update or keepalive?
1449                 # Prepare state for reading another message.
1450                 message_type_hex = self.msg_in[self.header_length]
1451                 if message_type_hex == "\x01":
1452                     logger.info("OPEN message received: 0x%s",
1453                                 binascii.b2a_hex(self.msg_in))
1454                 elif message_type_hex == "\x02":
1455                     logger.debug("UPDATE message received: 0x%s",
1456                                  binascii.b2a_hex(self.msg_in))
1457                     self.decode_update_message(self.msg_in)
1458                 elif message_type_hex == "\x03":
1459                     logger.info("NOTIFICATION message received: 0x%s",
1460                                 binascii.b2a_hex(self.msg_in))
1461                 elif message_type_hex == "\x04":
1462                     logger.info("KEEP ALIVE message received: 0x%s",
1463                                 binascii.b2a_hex(self.msg_in))
1464                 else:
1465                     logger.warning("Unexpected message received: 0x%s",
1466                                    binascii.b2a_hex(self.msg_in))
1467                 self.msg_in = ""
1468                 self.reading_header = True
1469                 self.bytes_to_read = self.header_length
1470         # We should not act upon peer_hold_time if we are reading
1471         # something right now.
1472         return
1473
1474     def decode_path_attributes(self, path_attributes_hex):
1475         """Decode the Path Attributes field (rfc4271#section-4.3)
1476
1477         Arguments:
1478             :path_attributes: path_attributes field to be decoded in hex
1479         Returns:
1480             :return: None
1481         """
1482         hex_to_decode = path_attributes_hex
1483
1484         while len(hex_to_decode):
1485             attr_flags_hex = hex_to_decode[0]
1486             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1487 #            attr_optional_bit = attr_flags & 128
1488 #            attr_transitive_bit = attr_flags & 64
1489 #            attr_partial_bit = attr_flags & 32
1490             attr_extended_length_bit = attr_flags & 16
1491
1492             attr_type_code_hex = hex_to_decode[1]
1493             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1494
1495             if attr_extended_length_bit:
1496                 attr_length_hex = hex_to_decode[2:4]
1497                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1498                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1499                 hex_to_decode = hex_to_decode[4 + attr_length:]
1500             else:
1501                 attr_length_hex = hex_to_decode[2]
1502                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1503                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1504                 hex_to_decode = hex_to_decode[3 + attr_length:]
1505
1506             if attr_type_code == 1:
1507                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1508                              binascii.b2a_hex(attr_flags_hex))
1509                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1510             elif attr_type_code == 2:
1511                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1512                              binascii.b2a_hex(attr_flags_hex))
1513                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1514             elif attr_type_code == 3:
1515                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1516                              binascii.b2a_hex(attr_flags_hex))
1517                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1518             elif attr_type_code == 4:
1519                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1520                              binascii.b2a_hex(attr_flags_hex))
1521                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1522             elif attr_type_code == 5:
1523                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1524                              binascii.b2a_hex(attr_flags_hex))
1525                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1526             elif attr_type_code == 6:
1527                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1528                              binascii.b2a_hex(attr_flags_hex))
1529                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1530             elif attr_type_code == 7:
1531                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1532                              binascii.b2a_hex(attr_flags_hex))
1533                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1534             elif attr_type_code == 9:  # rfc4456#section-8
1535                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1536                              binascii.b2a_hex(attr_flags_hex))
1537                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1538             elif attr_type_code == 10:  # rfc4456#section-8
1539                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1540                              binascii.b2a_hex(attr_flags_hex))
1541                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1542             elif attr_type_code == 14:  # rfc4760#section-3
1543                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1544                              binascii.b2a_hex(attr_flags_hex))
1545                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1546                 address_family_identifier_hex = attr_value_hex[0:2]
1547                 logger.debug("  Address Family Identifier=0x%s",
1548                              binascii.b2a_hex(address_family_identifier_hex))
1549                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1550                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1551                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1552                 next_hop_netaddr_len_hex = attr_value_hex[3]
1553                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1554                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1555                              next_hop_netaddr_len,
1556                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1557                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1558                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1559                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1560                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1561                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1562                 logger.debug("  Reserved=0x%s",
1563                              binascii.b2a_hex(reserved_hex))
1564                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1565                 logger.debug("  Network Layer Reachability Information=0x%s",
1566                              binascii.b2a_hex(nlri_hex))
1567                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1568                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1569                 for prefix in nlri_prefix_list:
1570                     logger.debug("  nlri_prefix_received: %s", prefix)
1571                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1572             elif attr_type_code == 15:  # rfc4760#section-4
1573                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1574                              binascii.b2a_hex(attr_flags_hex))
1575                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1576                 address_family_identifier_hex = attr_value_hex[0:2]
1577                 logger.debug("  Address Family Identifier=0x%s",
1578                              binascii.b2a_hex(address_family_identifier_hex))
1579                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1580                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1581                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1582                 wd_hex = attr_value_hex[3:]
1583                 logger.debug("  Withdrawn Routes=0x%s",
1584                              binascii.b2a_hex(wd_hex))
1585                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1586                 logger.debug("  Withdrawn routes prefix list: %s",
1587                              wdr_prefix_list)
1588                 for prefix in wdr_prefix_list:
1589                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1590                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1591             else:
1592                 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1593                              binascii.b2a_hex(attr_flags_hex))
1594                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1595         return None
1596
1597     def decode_update_message(self, msg):
1598         """Decode an UPDATE message (rfc4271#section-4.3)
1599
1600         Arguments:
1601             :msg: message to be decoded in hex
1602         Returns:
1603             :return: None
1604         """
1605         logger.debug("Decoding update message:")
1606         # message header - marker
1607         marker_hex = msg[:16]
1608         logger.debug("Message header marker: 0x%s",
1609                      binascii.b2a_hex(marker_hex))
1610         # message header - message length
1611         msg_length_hex = msg[16:18]
1612         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1613         logger.debug("Message lenght: 0x%s (%s)",
1614                      binascii.b2a_hex(msg_length_hex), msg_length)
1615         # message header - message type
1616         msg_type_hex = msg[18:19]
1617         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1618
1619         with self.storage as stor:
1620             # this will replace the previously stored message
1621             stor['update'] = binascii.hexlify(msg)
1622
1623         logger.debug("Evpn {}".format(self.evpn))
1624         if self.evpn:
1625             logger.debug("Skipping update decoding due to evpn data expected")
1626             return
1627
1628         logger.debug("Mvpn {}".format(self.mvpn))
1629         if self.mvpn:
1630             logger.debug("Skipping update decoding due to mvpn data expected")
1631             return
1632
1633         logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
1634         if self.l3vpn_mcast:
1635             logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
1636             return
1637
1638         logger.debug("L3vpn-unicast {}".format(self.l3vpn))
1639         if self.l3vpn_mcast:
1640             logger.debug("Skipping update decoding due to l3vpn-unicast data expected")
1641             return
1642
1643         logger.debug("Route-Target-Constrain {}".format(self.rt_constrain))
1644         if self.rt_constrain:
1645             logger.debug("Skipping update decoding due to Route-Target-Constrain data expected")
1646             return
1647
1648         logger.debug("Allf {}".format(self.allf))
1649         if self.allf:
1650             logger.debug("Skipping update decoding")
1651             return
1652
1653         if msg_type == 2:
1654             logger.debug("Message type: 0x%s (update)",
1655                          binascii.b2a_hex(msg_type_hex))
1656             # withdrawn routes length
1657             wdr_length_hex = msg[19:21]
1658             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1659             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1660                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1661             # withdrawn routes
1662             wdr_hex = msg[21:21 + wdr_length]
1663             logger.debug("Withdrawn routes: 0x%s",
1664                          binascii.b2a_hex(wdr_hex))
1665             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1666             logger.debug("Withdrawn routes prefix list: %s",
1667                          wdr_prefix_list)
1668             for prefix in wdr_prefix_list:
1669                 logger.debug("withdrawn_prefix_received: %s", prefix)
1670             # total path attribute length
1671             total_pa_length_offset = 21 + wdr_length
1672             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1673             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1674             logger.debug("Total path attribute lenght: 0x%s (%s)",
1675                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1676             # path attributes
1677             pa_offset = total_pa_length_offset + 2
1678             pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1679             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1680             self.decode_path_attributes(pa_hex)
1681             # network layer reachability information length
1682             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1683             logger.debug("Calculated NLRI length: %s", nlri_length)
1684             # network layer reachability information
1685             nlri_offset = pa_offset + total_pa_length
1686             nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1687             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1688             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1689             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1690             for prefix in nlri_prefix_list:
1691                 logger.debug("nlri_prefix_received: %s", prefix)
1692             # Updating counters
1693             self.updates_received += 1
1694             self.prefixes_introduced += len(nlri_prefix_list)
1695             self.prefixes_withdrawn += len(wdr_prefix_list)
1696         else:
1697             logger.error("Unexpeced message type 0x%s in 0x%s",
1698                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1699
1700     def wait_for_read(self):
1701         """Read message until timeout (next expected event).
1702
1703         Note:
1704             Used when no more updates has to be sent to avoid busy-wait.
1705             Currently it does not return anything.
1706         """
1707         # Compute time to the first predictable state change
1708         event_time = self.timer.get_next_event_time()
1709         # snapshot_time would be imprecise
1710         wait_timedelta = min(event_time - time.time(), self.wfr)
1711         if wait_timedelta < 0:
1712             # The program got around to waiting to an event in "very near
1713             # future" so late that it became a "past" event, thus tell
1714             # "select" to not wait at all. Passing negative timedelta to
1715             # select() would lead to either waiting forever (for -1) or
1716             # select.error("Invalid parameter") (for everything else).
1717             wait_timedelta = 0
1718         # And wait for event or something to read.
1719
1720         if not self.rx_activity_detected or not (self.updates_received % 100):
1721             # right time to write statistics to the log (not for every update and
1722             # not too frequently to avoid having large log files)
1723             logger.info("total_received_update_message_counter: %s",
1724                         self.updates_received)
1725             logger.info("total_received_nlri_prefix_counter: %s",
1726                         self.prefixes_introduced)
1727             logger.info("total_received_withdrawn_prefix_counter: %s",
1728                         self.prefixes_withdrawn)
1729
1730         start_time = time.time()
1731         select.select([self.socket], [], [self.socket], wait_timedelta)
1732         timedelta = time.time() - start_time
1733         self.rx_idle_time += timedelta
1734         self.rx_activity_detected = timedelta < 1
1735
1736         if not self.rx_activity_detected or not (self.updates_received % 100):
1737             # right time to write statistics to the log (not for every update and
1738             # not too frequently to avoid having large log files)
1739             logger.info("... idle for %.3fs", timedelta)
1740             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1741         return
1742
1743
1744 class WriteTracker(object):
1745     """Class tracking enqueueing messages and sending chunks of them."""
1746
1747     def __init__(self, bgp_socket, generator, timer):
1748         """The writter initialisation.
1749
1750         Arguments:
1751             bgp_socket: socket to be used for sending
1752             generator: generator to be used for message generation
1753             timer: timer to be used for scheduling
1754         """
1755         # References to outside objects,
1756         self.socket = bgp_socket
1757         self.generator = generator
1758         self.timer = timer
1759         # Really new fields.
1760         # TODO: Would attribute docstrings add anything substantial?
1761         self.sending_message = False
1762         self.bytes_to_send = 0
1763         self.msg_out = ""
1764
1765     def enqueue_message_for_sending(self, message):
1766         """Enqueue message and change state.
1767
1768         Arguments:
1769             message: message to be enqueued into the msg_out buffer
1770         """
1771         self.msg_out += message
1772         self.bytes_to_send += len(message)
1773         self.sending_message = True
1774
1775     def send_message_chunk_is_whole(self):
1776         """Send enqueued data from msg_out buffer
1777
1778         Returns:
1779             :return: true if no remaining data to send
1780         """
1781         # We assume there is a msg_out to send and socket is writable.
1782         # print "going to send", repr(self.msg_out)
1783         self.timer.snapshot()
1784         bytes_sent = self.socket.send(self.msg_out)
1785         # Forget the part of message that was sent.
1786         self.msg_out = self.msg_out[bytes_sent:]
1787         self.bytes_to_send -= bytes_sent
1788         if not self.bytes_to_send:
1789             # TODO: Is it possible to hit negative bytes_to_send?
1790             self.sending_message = False
1791             # We should have reset hold timer on peer side.
1792             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1793             # The possible reason for not prioritizing reads is gone.
1794             return True
1795         return False
1796
1797
1798 class StateTracker(object):
1799     """Main loop has state so complex it warrants this separate class."""
1800
1801     def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1802         """The state tracker initialisation.
1803
1804         Arguments:
1805             bgp_socket: socket to be used for sending / receiving
1806             generator: generator to be used for message generation
1807             timer: timer to be used for scheduling
1808             inqueue: user initiated messages queue
1809             storage: thread safe dict to store data for the rpc server
1810             cliargs: cli args from the user
1811         """
1812         # References to outside objects.
1813         self.socket = bgp_socket
1814         self.generator = generator
1815         self.timer = timer
1816         # Sub-trackers.
1817         self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
1818                                   l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
1819                                   rt_constrain=cliargs.rt_constrain, wait_for_read=cliargs.wfr)
1820         self.writer = WriteTracker(bgp_socket, generator, timer)
1821         # Prioritization state.
1822         self.prioritize_writing = False
1823         # In general, we prioritize reading over writing. But in order
1824         # not to get blocked by neverending reads, we should
1825         # check whether we are not risking running out of holdtime.
1826         # So in some situations, this field is set to True to attempt
1827         # finishing sending a message, after which this field resets
1828         # back to False.
1829         # TODO: Alternative is to switch fairly between reading and
1830         # writing (called round robin from now on).
1831         # Message counting is done in generator.
1832         self.inqueue = inqueue
1833
1834     def perform_one_loop_iteration(self):
1835         """ The main loop iteration
1836
1837         Notes:
1838             Calculates priority, resolves all conditions, calls
1839             appropriate method and returns to caller to repeat.
1840         """
1841         self.timer.snapshot()
1842         if not self.prioritize_writing:
1843             if self.timer.is_time_for_my_keepalive():
1844                 if not self.writer.sending_message:
1845                     # We need to schedule a keepalive ASAP.
1846                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1847                     logger.info("KEEP ALIVE is sent.")
1848                 # We are sending a message now, so let's prioritize it.
1849                 self.prioritize_writing = True
1850
1851         try:
1852             msg = self.inqueue.get_nowait()
1853             logger.info("Received message: {}".format(msg))
1854             msgbin = binascii.unhexlify(msg)
1855             self.writer.enqueue_message_for_sending(msgbin)
1856         except Queue.Empty:
1857             pass
1858         # Now we know what our priorities are, we have to check
1859         # which actions are available.
1860         # socket.socket() returns three lists,
1861         # we store them to list of lists.
1862         list_list = select.select([self.socket], [self.socket], [self.socket],
1863                                   self.timer.report_timedelta)
1864         read_list, write_list, except_list = list_list
1865         # Lists are unpacked, each is either [] or [self.socket],
1866         # so we will test them as boolean.
1867         if except_list:
1868             logger.error("Exceptional state on the socket.")
1869             raise RuntimeError("Exceptional state on socket", self.socket)
1870         # We will do either read or write.
1871         if not (self.prioritize_writing and write_list):
1872             # Either we have no reason to rush writes,
1873             # or the socket is not writable.
1874             # We are focusing on reading here.
1875             if read_list:  # there is something to read indeed
1876                 # In this case we want to read chunk of message
1877                 # and repeat the select,
1878                 self.reader.read_message_chunk()
1879                 return
1880             # We were focusing on reading, but nothing to read was there.
1881             # Good time to check peer for hold timer.
1882             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1883             # Quiet on the read front, we can have attempt to write.
1884         if write_list:
1885             # Either we really want to reset peer's view of our hold
1886             # timer, or there was nothing to read.
1887             # Were we in the middle of sending a message?
1888             if self.writer.sending_message:
1889                 # Was it the end of a message?
1890                 whole = self.writer.send_message_chunk_is_whole()
1891                 # We were pressed to send something and we did it.
1892                 if self.prioritize_writing and whole:
1893                     # We prioritize reading again.
1894                     self.prioritize_writing = False
1895                 return
1896             # Finally to check if still update messages to be generated.
1897             if self.generator.remaining_prefixes:
1898                 msg_out = self.generator.compose_update_message()
1899                 if not self.generator.remaining_prefixes:
1900                     # We have just finished update generation,
1901                     # end-of-rib is due.
1902                     logger.info("All update messages generated.")
1903                     logger.info("Storing performance results.")
1904                     self.generator.store_results()
1905                     logger.info("Finally an END-OF-RIB is sent.")
1906                     msg_out += self.generator.update_message(wr_prefixes=[],
1907                                                              nlri_prefixes=[],
1908                                                              end_of_rib=True)
1909                 self.writer.enqueue_message_for_sending(msg_out)
1910                 # Attempt for real sending to be done in next iteration.
1911                 return
1912             # Nothing to write anymore.
1913             # To avoid busy loop, we do idle waiting here.
1914             self.reader.wait_for_read()
1915             return
1916         # We can neither read nor write.
1917         logger.warning("Input and output both blocked for " +
1918                        str(self.timer.report_timedelta) + " seconds.")
1919         # FIXME: Are we sure select has been really waiting
1920         # the whole period?
1921         return
1922
1923
1924 def create_logger(loglevel, logfile):
1925     """Create logger object
1926
1927     Arguments:
1928         :loglevel: log level
1929         :logfile: log file name
1930     Returns:
1931         :return: logger object
1932     """
1933     logger = logging.getLogger("logger")
1934     log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1935     console_handler = logging.StreamHandler()
1936     file_handler = logging.FileHandler(logfile, mode="w")
1937     console_handler.setFormatter(log_formatter)
1938     file_handler.setFormatter(log_formatter)
1939     logger.addHandler(console_handler)
1940     logger.addHandler(file_handler)
1941     logger.setLevel(loglevel)
1942     return logger
1943
1944
1945 def job(arguments, inqueue, storage):
1946     """One time initialisation and iterations looping.
1947     Notes:
1948         Establish BGP connection and run iterations.
1949
1950     Arguments:
1951         :arguments: Command line arguments
1952         :inqueue: Data to be sent from play.py
1953         :storage: Shared dict for rpc server
1954     Returns:
1955         :return: None
1956     """
1957     bgp_socket = establish_connection(arguments)
1958     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1959     # Receive open message before sending anything.
1960     # FIXME: Add parameter to send default open message first,
1961     # to work with "you first" peers.
1962     msg_in = read_open_message(bgp_socket)
1963     timer = TimeTracker(msg_in)
1964     generator = MessageGenerator(arguments)
1965     msg_out = generator.open_message()
1966     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1967     # Send our open message to the peer.
1968     bgp_socket.send(msg_out)
1969     # Wait for confirming keepalive.
1970     # TODO: Surely in just one packet?
1971     # Using exact keepalive length to not to see possible updates.
1972     msg_in = bgp_socket.recv(19)
1973     if msg_in != generator.keepalive_message():
1974         error_msg = "Open not confirmed by keepalive, instead got"
1975         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1976         raise MessageError(error_msg, msg_in)
1977     timer.reset_peer_hold_time()
1978     # Send the keepalive to indicate the connection is accepted.
1979     timer.snapshot()  # Remember this time.
1980     msg_out = generator.keepalive_message()
1981     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1982     bgp_socket.send(msg_out)
1983     # Use the remembered time.
1984     timer.reset_my_keepalive_time(timer.snapshot_time)
1985     # End of initial handshake phase.
1986     state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
1987     while True:  # main reactor loop
1988         state.perform_one_loop_iteration()
1989
1990
1991 class Rpcs:
1992     '''Handler for SimpleXMLRPCServer'''
1993
1994     def __init__(self, sendqueue, storage):
1995         '''Init method
1996
1997         Arguments:
1998             :sendqueue: queue for data to be sent towards odl
1999             :storage: thread safe dict
2000         '''
2001         self.queue = sendqueue
2002         self.storage = storage
2003
2004     def send(self, text):
2005         '''Data to be sent
2006
2007         Arguments:
2008             :text: hes string of the data to be sent
2009         '''
2010         self.queue.put(text)
2011
2012     def get(self, text=''):
2013         '''Reads data form the storage
2014
2015         - returns stored data or an empty string, at the moment only
2016           'update' is stored
2017
2018         Arguments:
2019             :text: a key to the storage to get the data
2020         Returns:
2021             :data: stored data
2022         '''
2023         with self.storage as stor:
2024             return stor.get(text, '')
2025
2026     def clean(self, text=''):
2027         '''Cleans data form the storage
2028
2029         Arguments:
2030             :text: a key to the storage to clean the data
2031         '''
2032         with self.storage as stor:
2033             if text in stor:
2034                 del stor[text]
2035
2036
2037 def threaded_job(arguments):
2038     """Run the job threaded
2039
2040     Arguments:
2041         :arguments: Command line arguments
2042     Returns:
2043         :return: None
2044     """
2045     amount_left = arguments.amount
2046     utils_left = arguments.multiplicity
2047     prefix_current = arguments.firstprefix
2048     myip_current = arguments.myip
2049     thread_args = []
2050     rpcqueue = Queue.Queue()
2051     storage = SafeDict()
2052
2053     while 1:
2054         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
2055         amount_left -= amount_per_util
2056         utils_left -= 1
2057
2058         args = deepcopy(arguments)
2059         args.amount = amount_per_util
2060         args.firstprefix = prefix_current
2061         args.myip = myip_current
2062         thread_args.append(args)
2063
2064         if not utils_left:
2065             break
2066         prefix_current += amount_per_util * 16
2067         myip_current += 1
2068
2069     try:
2070         # Create threads
2071         for t in thread_args:
2072             thread.start_new_thread(job, (t, rpcqueue, storage))
2073     except Exception:
2074         print "Error: unable to start thread."
2075         raise SystemExit(2)
2076
2077     rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
2078     rpcserver.register_instance(Rpcs(rpcqueue, storage))
2079     rpcserver.serve_forever()
2080
2081
2082 if __name__ == "__main__":
2083     arguments = parse_arguments()
2084     logger = create_logger(arguments.loglevel, arguments.logfile)
2085     threaded_job(arguments)