Add bgp funct rt constrain validation test suite
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 import argparse
17 import binascii
18 import ipaddr
19 import logging
20 import Queue
21 import select
22 import socket
23 import struct
24 import thread
25 import threading
26 import time
27
28
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
33
34
35 class SafeDict(dict):
36     '''Thread safe dictionary
37
38     The object will serve as thread safe data storage.
39     It should be used with "with" statement.
40     '''
41
42     def __init__(self, * p_arg, ** n_arg):
43         super(SafeDict, self).__init__()
44         self._lock = threading.Lock()
45
46     def __enter__(self):
47         self._lock.acquire()
48         return self
49
50     def __exit__(self, type, value, traceback):
51         self._lock.release()
52
53
54 def parse_arguments():
55     """Use argparse to get arguments,
56
57     Returns:
58         :return: args object.
59     """
60     parser = argparse.ArgumentParser()
61     # TODO: Should we use --argument-names-with-spaces?
62     str_help = "Autonomous System number use in the stream."
63     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64     # FIXME: We are acting as iBGP peer,
65     # we should mirror AS number from peer's open message.
66     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67     parser.add_argument("--amount", default="1", type=int, help=str_help)
68     str_help = "Rpc server port."
69     parser.add_argument("--port", default="8000", type=int, help=str_help)
70     str_help = "Maximum number of IP prefixes to be announced in one iteration"
71     parser.add_argument("--insert", default="1", type=int, help=str_help)
72     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
73     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
74     str_help = "The number of prefixes to process without withdrawals"
75     parser.add_argument("--prefill", default="0", type=int, help=str_help)
76     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
77     parser.add_argument("--updates", choices=["single", "separate"],
78                         default=["separate"], help=str_help)
79     str_help = "Base prefix IP address for prefix generation"
80     parser.add_argument("--firstprefix", default="8.0.1.0",
81                         type=ipaddr.IPv4Address, help=str_help)
82     str_help = "The prefix length."
83     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
84     str_help = "Listen for connection, instead of initiating it."
85     parser.add_argument("--listen", action="store_true", help=str_help)
86     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
87                 "Default value only suitable for listening.")
88     parser.add_argument("--myip", default="0.0.0.0",
89                         type=ipaddr.IPv4Address, help=str_help)
90     str_help = ("TCP port to bind to when listening or initiating connection." +
91                 "Default only suitable for initiating.")
92     parser.add_argument("--myport", default="0", type=int, help=str_help)
93     str_help = "The IP of the next hop to be placed into the update messages."
94     parser.add_argument("--nexthop", default="192.0.2.1",
95                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
96     str_help = "Identifier of the route originator."
97     parser.add_argument("--originator", default=None,
98                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
99     str_help = "Cluster list item identifier."
100     parser.add_argument("--cluster", default=None,
101                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
102     str_help = ("Numeric IP Address to try to connect to." +
103                 "Currently no effect in listening mode.")
104     parser.add_argument("--peerip", default="127.0.0.2",
105                         type=ipaddr.IPv4Address, help=str_help)
106     str_help = "TCP port to try to connect to. No effect in listening mode."
107     parser.add_argument("--peerport", default="179", type=int, help=str_help)
108     str_help = "Local hold time."
109     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
110     str_help = "Log level (--error, --warning, --info, --debug)"
111     parser.add_argument("--error", dest="loglevel", action="store_const",
112                         const=logging.ERROR, default=logging.INFO,
113                         help=str_help)
114     parser.add_argument("--warning", dest="loglevel", action="store_const",
115                         const=logging.WARNING, default=logging.INFO,
116                         help=str_help)
117     parser.add_argument("--info", dest="loglevel", action="store_const",
118                         const=logging.INFO, default=logging.INFO,
119                         help=str_help)
120     parser.add_argument("--debug", dest="loglevel", action="store_const",
121                         const=logging.DEBUG, default=logging.INFO,
122                         help=str_help)
123     str_help = "Log file name"
124     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
125     str_help = "Trailing part of the csv result files for plotting purposes"
126     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
127     str_help = "Minimum number of updates to reach to include result into csv."
128     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
129     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
130     parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
131     str_help = "Using peerip instead of myip for xmlrpc server"
132     parser.add_argument("--usepeerip", default=False, action="store_true", help=str_help)
133     str_help = "Link-State NLRI supported"
134     parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
135     str_help = "Link-State NLRI: Identifier"
136     parser.add_argument("-lsid", default="1", type=int, help=str_help)
137     str_help = "Link-State NLRI: Tunnel ID"
138     parser.add_argument("-lstid", default="1", type=int, help=str_help)
139     str_help = "Link-State NLRI: LSP ID"
140     parser.add_argument("-lspid", default="1", type=int, help=str_help)
141     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
142     parser.add_argument("--lstsaddr", default="1.2.3.4",
143                         type=ipaddr.IPv4Address, help=str_help)
144     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
145     parser.add_argument("--lsteaddr", default="5.6.7.8",
146                         type=ipaddr.IPv4Address, help=str_help)
147     str_help = "Link-State NLRI: Identifier Step"
148     parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
149     str_help = "Link-State NLRI: Tunnel ID Step"
150     parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
151     str_help = "Link-State NLRI: LSP ID Step"
152     parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
153     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
154     parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
155     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
156     parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
157     str_help = "How many play utilities are to be started."
158     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
159     str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
160     Enabling this flag makes the script not decoding the update mesage, because of not\
161     supported decoding for these elements."
162     parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
163     str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
164     Enabling this flag makes the script not decoding the update mesage, because of not\
165     supported decoding for these elements."
166     parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
167     str_help = "Open message includes L3VPN-MULTICAST arguments.\
168     Enabling this flag makes the script not decoding the update mesage, because of not\
169     supported decoding for these elements."
170     parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
171     str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding."
172     parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
173     str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
174     parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help)
175     str_help = "Add all supported families without message decoding."
176     parser.add_argument("--allf", default=False, action="store_true", help=str_help)
177     parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
178     str_help = "Skipping well known attributes for update message"
179     parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
180     arguments = parser.parse_args()
181     if arguments.multiplicity < 1:
182         print "Multiplicity", arguments.multiplicity, "is not positive."
183         raise SystemExit(1)
184     # TODO: Are sanity checks (such as asnumber>=0) required?
185     return arguments
186
187
188 def establish_connection(arguments):
189     """Establish connection to BGP peer.
190
191     Arguments:
192         :arguments: following command-line arguments are used
193             - arguments.myip: local IP address
194             - arguments.myport: local port
195             - arguments.peerip: remote IP address
196             - arguments.peerport: remote port
197     Returns:
198         :return: socket.
199     """
200     if arguments.listen:
201         logger.info("Connecting in the listening mode.")
202         logger.debug("Local IP address: " + str(arguments.myip))
203         logger.debug("Local port: " + str(arguments.myport))
204         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
205         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
206         # bind need single tuple as argument
207         listening_socket.bind((str(arguments.myip), arguments.myport))
208         listening_socket.listen(1)
209         bgp_socket, _ = listening_socket.accept()
210         # TODO: Verify client IP is cotroller IP.
211         listening_socket.close()
212     else:
213         logger.info("Connecting in the talking mode.")
214         logger.debug("Local IP address: " + str(arguments.myip))
215         logger.debug("Local port: " + str(arguments.myport))
216         logger.debug("Remote IP address: " + str(arguments.peerip))
217         logger.debug("Remote port: " + str(arguments.peerport))
218         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
219         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
220         # bind to force specified address and port
221         talking_socket.bind((str(arguments.myip), arguments.myport))
222         # socket does not spead ipaddr, hence str()
223         talking_socket.connect((str(arguments.peerip), arguments.peerport))
224         bgp_socket = talking_socket
225     logger.info("Connected to ODL.")
226     return bgp_socket
227
228
229 def get_short_int_from_message(message, offset=16):
230     """Extract 2-bytes number from provided message.
231
232     Arguments:
233         :message: given message
234         :offset: offset of the short_int inside the message
235     Returns:
236         :return: required short_inf value.
237     Notes:
238         default offset value is the BGP message size offset.
239     """
240     high_byte_int = ord(message[offset])
241     low_byte_int = ord(message[offset + 1])
242     short_int = high_byte_int * 256 + low_byte_int
243     return short_int
244
245
246 def get_prefix_list_from_hex(prefixes_hex):
247     """Get decoded list of prefixes (rfc4271#section-4.3)
248
249     Arguments:
250         :prefixes_hex: list of prefixes to be decoded in hex
251     Returns:
252         :return: list of prefixes in the form of ip address (X.X.X.X/X)
253     """
254     prefix_list = []
255     offset = 0
256     while offset < len(prefixes_hex):
257         prefix_bit_len_hex = prefixes_hex[offset]
258         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
259         prefix_len = ((prefix_bit_len - 1) / 8) + 1
260         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
261         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
262         offset += 1 + prefix_len
263         prefix_list.append(prefix + "/" + str(prefix_bit_len))
264     return prefix_list
265
266
267 class MessageError(ValueError):
268     """Value error with logging optimized for hexlified messages."""
269
270     def __init__(self, text, message, *args):
271         """Initialisation.
272
273         Store and call super init for textual comment,
274         store raw message which caused it.
275         """
276         self.text = text
277         self.msg = message
278         super(MessageError, self).__init__(text, message, *args)
279
280     def __str__(self):
281         """Generate human readable error message.
282
283         Returns:
284             :return: human readable message as string
285         Notes:
286             Use a placeholder string if the message is to be empty.
287         """
288         message = binascii.hexlify(self.msg)
289         if message == "":
290             message = "(empty message)"
291         return self.text + ": " + message
292
293
294 def read_open_message(bgp_socket):
295     """Receive peer's OPEN message
296
297     Arguments:
298         :bgp_socket: the socket to be read
299     Returns:
300         :return: received OPEN message.
301     Notes:
302         Performs just basic incomming message checks
303     """
304     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
305     # TODO: Can the incoming open message be split in more than one packet?
306     # Some validation.
307     if len(msg_in) < 37:
308         # 37 is minimal length of open message with 4-byte AS number.
309         error_msg = (
310             "Message length (" + str(len(msg_in)) + ") is smaller than "
311             "minimal length of OPEN message with 4-byte AS number (37)"
312         )
313         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
314         raise MessageError(error_msg, msg_in)
315     # TODO: We could check BGP marker, but it is defined only later;
316     # decide what to do.
317     reported_length = get_short_int_from_message(msg_in)
318     if len(msg_in) != reported_length:
319         error_msg = (
320             "Expected message length (" + reported_length +
321             ") does not match actual length (" + str(len(msg_in)) + ")"
322         )
323         logger.error(error_msg + binascii.hexlify(msg_in))
324         raise MessageError(error_msg, msg_in)
325     logger.info("Open message received.")
326     return msg_in
327
328
329 class MessageGenerator(object):
330     """Class which generates messages, holds states and configuration values."""
331
332     # TODO: Define bgp marker as a class (constant) variable.
333     def __init__(self, args):
334         """Initialisation according to command-line args.
335
336         Arguments:
337             :args: argsparser's Namespace object which contains command-line
338                 options for MesageGenerator initialisation
339         Notes:
340             Calculates and stores default values used later on for
341             message generation.
342         """
343         self.total_prefix_amount = args.amount
344         # Number of update messages left to be sent.
345         self.remaining_prefixes = self.total_prefix_amount
346
347         # New parameters initialisation
348         self.port = args.port
349         self.iteration = 0
350         self.prefix_base_default = args.firstprefix
351         self.prefix_length_default = args.prefixlen
352         self.wr_prefixes_default = []
353         self.nlri_prefixes_default = []
354         self.version_default = 4
355         self.my_autonomous_system_default = args.asnumber
356         self.hold_time_default = args.holdtime  # Local hold time.
357         self.bgp_identifier_default = int(args.myip)
358         self.next_hop_default = args.nexthop
359         self.originator_id_default = args.originator
360         self.cluster_list_item_default = args.cluster
361         self.single_update_default = args.updates == "single"
362         self.randomize_updates_default = args.updates == "random"
363         self.prefix_count_to_add_default = args.insert
364         self.prefix_count_to_del_default = args.withdraw
365         if self.prefix_count_to_del_default < 0:
366             self.prefix_count_to_del_default = 0
367         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
368             # total number of prefixes must grow to avoid infinite test loop
369             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
370         self.slot_size_default = self.prefix_count_to_add_default
371         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
372         self.results_file_name_default = args.results
373         self.performance_threshold_default = args.threshold
374         self.rfc4760 = args.rfc4760
375         self.bgpls = args.bgpls
376         self.evpn = args.evpn
377         self.mvpn = args.mvpn
378         self.l3vpn_mcast = args.l3vpn_mcast
379         self.l3vpn = args.l3vpn
380         self.rt_constrain = args.rt_constrain
381         self.allf = args.allf
382         self.skipattr = args.skipattr
383         # Default values when BGP-LS Attributes are used
384         if self.bgpls:
385             self.prefix_count_to_add_default = 1
386             self.prefix_count_to_del_default = 0
387         self.ls_nlri_default = {"Identifier": args.lsid,
388                                 "TunnelID": args.lstid,
389                                 "LSPID": args.lspid,
390                                 "IPv4TunnelSenderAddress": args.lstsaddr,
391                                 "IPv4TunnelEndPointAddress": args.lsteaddr}
392         self.lsid_step = args.lsidstep
393         self.lstid_step = args.lstidstep
394         self.lspid_step = args.lspidstep
395         self.lstsaddr_step = args.lstsaddrstep
396         self.lsteaddr_step = args.lsteaddrstep
397         # Default values used for randomized part
398         s1_slots = ((self.total_prefix_amount -
399                      self.remaining_prefixes_threshold - 1) /
400                     self.prefix_count_to_add_default + 1)
401         s2_slots = ((self.remaining_prefixes_threshold - 1) /
402                     (self.prefix_count_to_add_default -
403                      self.prefix_count_to_del_default) + 1)
404         # S1_First_Index = 0
405         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
406         s2_first_index = s1_slots * self.prefix_count_to_add_default
407         s2_last_index = (s2_first_index +
408                          s2_slots * (self.prefix_count_to_add_default -
409                                      self.prefix_count_to_del_default) - 1)
410         self.slot_gap_default = ((self.total_prefix_amount -
411                                   self.remaining_prefixes_threshold - 1) /
412                                  self.prefix_count_to_add_default + 1)
413         self.randomize_lowest_default = s2_first_index
414         self.randomize_highest_default = s2_last_index
415         # Initialising counters
416         self.phase1_start_time = 0
417         self.phase1_stop_time = 0
418         self.phase2_start_time = 0
419         self.phase2_stop_time = 0
420         self.phase1_updates_sent = 0
421         self.phase2_updates_sent = 0
422         self.updates_sent = 0
423
424         self.log_info = args.loglevel <= logging.INFO
425         self.log_debug = args.loglevel <= logging.DEBUG
426         """
427         Flags needed for the MessageGenerator performance optimization.
428         Calling logger methods each iteration even with proper log level set
429         slows down significantly the MessageGenerator performance.
430         Measured total generation time (1M updates, dry run, error log level):
431         - logging based on basic logger features: 36,2s
432         - logging based on advanced logger features (lazy logging): 21,2s
433         - conditional calling of logger methods enclosed inside condition: 8,6s
434         """
435
436         logger.info("Generator initialisation")
437         logger.info("  Target total number of prefixes to be introduced: " +
438                     str(self.total_prefix_amount))
439         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
440                     str(self.prefix_length_default))
441         logger.info("  My Autonomous System number: " +
442                     str(self.my_autonomous_system_default))
443         logger.info("  My Hold Time: " + str(self.hold_time_default))
444         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
445         logger.info("  Next Hop: " + str(self.next_hop_default))
446         logger.info("  Originator ID: " + str(self.originator_id_default))
447         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
448         logger.info("  Prefix count to be inserted at once: " +
449                     str(self.prefix_count_to_add_default))
450         logger.info("  Prefix count to be withdrawn at once: " +
451                     str(self.prefix_count_to_del_default))
452         logger.info("  Fast pre-fill up to " +
453                     str(self.total_prefix_amount -
454                         self.remaining_prefixes_threshold) + " prefixes")
455         logger.info("  Remaining number of prefixes to be processed " +
456                     "in parallel with withdrawals: " +
457                     str(self.remaining_prefixes_threshold))
458         logger.debug("  Prefix index range used after pre-fill procedure [" +
459                      str(self.randomize_lowest_default) + ", " +
460                      str(self.randomize_highest_default) + "]")
461         if self.single_update_default:
462             logger.info("  Common single UPDATE will be generated " +
463                         "for both NLRI & WITHDRAWN lists")
464         else:
465             logger.info("  Two separate UPDATEs will be generated " +
466                         "for each NLRI & WITHDRAWN lists")
467         if self.randomize_updates_default:
468             logger.info("  Generation of UPDATE messages will be randomized")
469         logger.info("  Let\'s go ...\n")
470
471         # TODO: Notification for hold timer expiration can be handy.
472
473     def store_results(self, file_name=None, threshold=None):
474         """ Stores specified results into files based on file_name value.
475
476         Arguments:
477             :param file_name: Trailing (common) part of result file names
478             :param threshold: Minimum number of sent updates needed for each
479                               result to be included into result csv file
480                               (mainly needed because of the result accuracy)
481         Returns:
482             :return: n/a
483         """
484         # default values handling
485         # TODO optimize default values handling (use e.g. dicionary.update() approach)
486         if file_name is None:
487             file_name = self.results_file_name_default
488         if threshold is None:
489             threshold = self.performance_threshold_default
490         # performance calculation
491         if self.phase1_updates_sent >= threshold:
492             totals1 = self.phase1_updates_sent
493             performance1 = int(self.phase1_updates_sent /
494                                (self.phase1_stop_time - self.phase1_start_time))
495         else:
496             totals1 = None
497             performance1 = None
498         if self.phase2_updates_sent >= threshold:
499             totals2 = self.phase2_updates_sent
500             performance2 = int(self.phase2_updates_sent /
501                                (self.phase2_stop_time - self.phase2_start_time))
502         else:
503             totals2 = None
504             performance2 = None
505
506         logger.info("#" * 10 + " Final results " + "#" * 10)
507         logger.info("Number of iterations: " + str(self.iteration))
508         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
509                     str(self.phase1_updates_sent))
510         logger.info("The pre-fill phase duration: " +
511                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
512         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
513                     str(self.phase2_updates_sent))
514         logger.info("The 2nd test phase duration: " +
515                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
516         logger.info("Threshold for performance reporting: " + str(threshold))
517
518         # making labels
519         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
520                         " route(s) per UPDATE")
521         if self.single_update_default:
522             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
523                                   "/-" + str(self.prefix_count_to_del_default) +
524                                   " routes per UPDATE")
525         else:
526             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
527                                   "/-" + str(self.prefix_count_to_del_default) +
528                                   " routes in two UPDATEs")
529         # collecting capacity and performance results
530         totals = {}
531         performance = {}
532         if totals1 is not None:
533             totals[phase1_label] = totals1
534             performance[phase1_label] = performance1
535         if totals2 is not None:
536             totals[phase2_label] = totals2
537             performance[phase2_label] = performance2
538         self.write_results_to_file(totals, "totals-" + file_name)
539         self.write_results_to_file(performance, "performance-" + file_name)
540
541     def write_results_to_file(self, results, file_name):
542         """Writes results to the csv plot file consumable by Jenkins.
543
544         Arguments:
545             :param file_name: Name of the (csv) file to be created
546         Returns:
547             :return: none
548         """
549         first_line = ""
550         second_line = ""
551         f = open(file_name, "wt")
552         try:
553             for key in sorted(results):
554                 first_line += key + ", "
555                 second_line += str(results[key]) + ", "
556             first_line = first_line[:-2]
557             second_line = second_line[:-2]
558             f.write(first_line + "\n")
559             f.write(second_line + "\n")
560             logger.info("Message generator performance results stored in " +
561                         file_name + ":")
562             logger.info("  " + first_line)
563             logger.info("  " + second_line)
564         finally:
565             f.close()
566
567     # Return pseudo-randomized (reproducible) index for selected range
568     def randomize_index(self, index, lowest=None, highest=None):
569         """Calculates pseudo-randomized index from selected range.
570
571         Arguments:
572             :param index: input index
573             :param lowest: the lowes index from the randomized area
574             :param highest: the highest index from the randomized area
575         Returns:
576             :return: the (pseudo)randomized index
577         Notes:
578             Created just as a fame for future generator enhancement.
579         """
580         # default values handling
581         # TODO optimize default values handling (use e.g. dicionary.update() approach)
582         if lowest is None:
583             lowest = self.randomize_lowest_default
584         if highest is None:
585             highest = self.randomize_highest_default
586         # randomize
587         if (index >= lowest) and (index <= highest):
588             # we are in the randomized range -> shuffle it inside
589             # the range (now just reverse the order)
590             new_index = highest - (index - lowest)
591         else:
592             # we are out of the randomized range -> nothing to do
593             new_index = index
594         return new_index
595
596     def get_ls_nlri_values(self, index):
597         """Generates LS-NLRI parameters.
598         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
599
600         Arguments:
601             :param index: index (iteration)
602         Returns:
603             :return: dictionary of LS NLRI parameters and values
604         """
605         # generating list of LS NLRI parameters
606         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
607         ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
608         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
609         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
610         ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
611         ls_nlri_values = {"Identifier": identifier,
612                           "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
613                           "TunnelID": tunnel_id, "LSPID": lsp_id,
614                           "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
615         return ls_nlri_values
616
617     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
618                         prefix_len=None, prefix_count=None, randomize=None):
619         """Generates list of IP address prefixes.
620
621         Arguments:
622             :param slot_index: index of group of prefix addresses
623             :param slot_size: size of group of prefix addresses
624                 in [number of included prefixes]
625             :param prefix_base: IP address of the first prefix
626                 (slot_index = 0, prefix_index = 0)
627             :param prefix_len: length of the prefix in bites
628                 (the same as size of netmask)
629             :param prefix_count: number of prefixes to be returned
630                 from the specified slot
631         Returns:
632             :return: list of generated IP address prefixes
633         """
634         # default values handling
635         # TODO optimize default values handling (use e.g. dicionary.update() approach)
636         if slot_size is None:
637             slot_size = self.slot_size_default
638         if prefix_base is None:
639             prefix_base = self.prefix_base_default
640         if prefix_len is None:
641             prefix_len = self.prefix_length_default
642         if prefix_count is None:
643             prefix_count = slot_size
644         if randomize is None:
645             randomize = self.randomize_updates_default
646         # generating list of prefixes
647         indexes = []
648         prefixes = []
649         prefix_gap = 2 ** (32 - prefix_len)
650         for i in range(prefix_count):
651             prefix_index = slot_index * slot_size + i
652             if randomize:
653                 prefix_index = self.randomize_index(prefix_index)
654             indexes.append(prefix_index)
655             prefixes.append(prefix_base + prefix_index * prefix_gap)
656         if self.log_debug:
657             logger.debug("  Prefix slot index: " + str(slot_index))
658             logger.debug("  Prefix slot size: " + str(slot_size))
659             logger.debug("  Prefix count: " + str(prefix_count))
660             logger.debug("  Prefix indexes: " + str(indexes))
661             logger.debug("  Prefix list: " + str(prefixes))
662         return prefixes
663
664     def compose_update_message(self, prefix_count_to_add=None,
665                                prefix_count_to_del=None):
666         """Composes an UPDATE message
667
668         Arguments:
669             :param prefix_count_to_add: # of prefixes to put into NLRI list
670             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
671         Returns:
672             :return: encoded UPDATE message in HEX
673         Notes:
674             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
675             lists or common message wich includes both prefix lists.
676             Updates global counters.
677         """
678         # default values handling
679         # TODO optimize default values handling (use e.g. dicionary.update() approach)
680         if prefix_count_to_add is None:
681             prefix_count_to_add = self.prefix_count_to_add_default
682         if prefix_count_to_del is None:
683             prefix_count_to_del = self.prefix_count_to_del_default
684         # logging
685         if self.log_info and not (self.iteration % 1000):
686             logger.info("Iteration: " + str(self.iteration) +
687                         " - total remaining prefixes: " +
688                         str(self.remaining_prefixes))
689         if self.log_debug:
690             logger.debug("#" * 10 + " Iteration: " +
691                          str(self.iteration) + " " + "#" * 10)
692             logger.debug("Remaining prefixes: " +
693                          str(self.remaining_prefixes))
694         # scenario type & one-shot counter
695         straightforward_scenario = (self.remaining_prefixes >
696                                     self.remaining_prefixes_threshold)
697         if straightforward_scenario:
698             prefix_count_to_del = 0
699             if self.log_debug:
700                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
701             if not self.phase1_start_time:
702                 self.phase1_start_time = time.time()
703         else:
704             if self.log_debug:
705                 logger.debug("--- COMBINED SCENARIO ---")
706             if not self.phase2_start_time:
707                 self.phase2_start_time = time.time()
708         # tailor the number of prefixes if needed
709         prefix_count_to_add = (prefix_count_to_del +
710                                min(prefix_count_to_add - prefix_count_to_del,
711                                    self.remaining_prefixes))
712         # prefix slots selection for insertion and withdrawal
713         slot_index_to_add = self.iteration
714         slot_index_to_del = slot_index_to_add - self.slot_gap_default
715         # getting lists of prefixes for insertion in this iteration
716         if self.log_debug:
717             logger.debug("Prefixes to be inserted in this iteration:")
718         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
719                                                   prefix_count=prefix_count_to_add)
720         # getting lists of prefixes for withdrawal in this iteration
721         if self.log_debug:
722             logger.debug("Prefixes to be withdrawn in this iteration:")
723         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
724                                                   prefix_count=prefix_count_to_del)
725         # generating the UPDATE mesage with LS-NLRI only
726         if self.bgpls:
727             ls_nlri = self.get_ls_nlri_values(self.iteration)
728             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
729                                           **ls_nlri)
730         else:
731             # generating the UPDATE message with prefix lists
732             if self.single_update_default:
733                 # Send prefixes to be introduced and withdrawn
734                 # in one UPDATE message
735                 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
736                                               nlri_prefixes=prefix_list_to_add)
737             else:
738                 # Send prefixes to be introduced and withdrawn
739                 # in separate UPDATE messages (if needed)
740                 msg_out = self.update_message(wr_prefixes=[],
741                                               nlri_prefixes=prefix_list_to_add)
742                 if prefix_count_to_del:
743                     msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
744                                                    nlri_prefixes=[])
745         # updating counters - who knows ... maybe I am last time here ;)
746         if straightforward_scenario:
747             self.phase1_stop_time = time.time()
748             self.phase1_updates_sent = self.updates_sent
749         else:
750             self.phase2_stop_time = time.time()
751             self.phase2_updates_sent = (self.updates_sent -
752                                         self.phase1_updates_sent)
753         # updating totals for the next iteration
754         self.iteration += 1
755         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
756         # returning the encoded message
757         return msg_out
758
759     # Section of message encoders
760
761     def open_message(self, version=None, my_autonomous_system=None,
762                      hold_time=None, bgp_identifier=None):
763         """Generates an OPEN Message (rfc4271#section-4.2)
764
765         Arguments:
766             :param version: see the rfc4271#section-4.2
767             :param my_autonomous_system: see the rfc4271#section-4.2
768             :param hold_time: see the rfc4271#section-4.2
769             :param bgp_identifier: see the rfc4271#section-4.2
770         Returns:
771             :return: encoded OPEN message in HEX
772         """
773
774         # default values handling
775         # TODO optimize default values handling (use e.g. dicionary.update() approach)
776         if version is None:
777             version = self.version_default
778         if my_autonomous_system is None:
779             my_autonomous_system = self.my_autonomous_system_default
780         if hold_time is None:
781             hold_time = self.hold_time_default
782         if bgp_identifier is None:
783             bgp_identifier = self.bgp_identifier_default
784
785         # Marker
786         marker_hex = "\xFF" * 16
787
788         # Type
789         type = 1
790         type_hex = struct.pack("B", type)
791
792         # version
793         version_hex = struct.pack("B", version)
794
795         # my_autonomous_system
796         # AS_TRANS value, 23456 decadic.
797         my_autonomous_system_2_bytes = 23456
798         # AS number is mappable to 2 bytes
799         if my_autonomous_system < 65536:
800             my_autonomous_system_2_bytes = my_autonomous_system
801         my_autonomous_system_hex_2_bytes = struct.pack(">H",
802                                                        my_autonomous_system)
803
804         # Hold Time
805         hold_time_hex = struct.pack(">H", hold_time)
806
807         # BGP Identifier
808         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
809
810         # Optional Parameters
811         optional_parameters_hex = ""
812         if self.rfc4760 or self.allf:
813             optional_parameter_hex = (
814                 "\x02"  # Param type ("Capability Ad")
815                 "\x06"  # Length (6 bytes)
816                 "\x01"  # Capability type (NLRI Unicast),
817                         # see RFC 4760, secton 8
818                 "\x04"  # Capability value length
819                 "\x00\x01"  # AFI (Ipv4)
820                 "\x00"  # (reserved)
821                 "\x01"  # SAFI (Unicast)
822             )
823             optional_parameters_hex += optional_parameter_hex
824
825         if self.bgpls or self.allf:
826             optional_parameter_hex = (
827                 "\x02"  # Param type ("Capability Ad")
828                 "\x06"  # Length (6 bytes)
829                 "\x01"  # Capability type (NLRI Unicast),
830                         # see RFC 4760, secton 8
831                 "\x04"  # Capability value length
832                 "\x40\x04"  # AFI (BGP-LS)
833                 "\x00"  # (reserved)
834                 "\x47"  # SAFI (BGP-LS)
835             )
836             optional_parameters_hex += optional_parameter_hex
837
838         if self.evpn or self.allf:
839             optional_parameter_hex = (
840                 "\x02"  # Param type ("Capability Ad")
841                 "\x06"  # Length (6 bytes)
842                 "\x01"  # Multiprotocol extetension capability,
843                 "\x04"  # Capability value length
844                 "\x00\x19"  # AFI (L2-VPN)
845                 "\x00"  # (reserved)
846                 "\x46"  # SAFI (EVPN)
847             )
848             optional_parameters_hex += optional_parameter_hex
849
850         if self.mvpn or self.allf:
851             optional_parameter_hex = (
852                 "\x02"  # Param type ("Capability Ad")
853                 "\x06"  # Length (6 bytes)
854                 "\x01"  # Multiprotocol extetension capability,
855                 "\x04"  # Capability value length
856                 "\x00\x01"  # AFI (IPV4)
857                 "\x00"  # (reserved)
858                 "\x05"  # SAFI (MCAST-VPN)
859             )
860             optional_parameters_hex += optional_parameter_hex
861             optional_parameter_hex = (
862                 "\x02"  # Param type ("Capability Ad")
863                 "\x06"  # Length (6 bytes)
864                 "\x01"  # Multiprotocol extetension capability,
865                 "\x04"  # Capability value length
866                 "\x00\x02"  # AFI (IPV6)
867                 "\x00"  # (reserved)
868                 "\x05"  # SAFI (MCAST-VPN)
869             )
870             optional_parameters_hex += optional_parameter_hex
871
872         if self.l3vpn_mcast or self.allf:
873             optional_parameter_hex = (
874                 "\x02"  # Param type ("Capability Ad")
875                 "\x06"  # Length (6 bytes)
876                 "\x01"  # Multiprotocol extetension capability,
877                 "\x04"  # Capability value length
878                 "\x00\x01"  # AFI (IPV4)
879                 "\x00"  # (reserved)
880                 "\x81"  # SAFI (L3VPN-MCAST)
881             )
882             optional_parameters_hex += optional_parameter_hex
883             optional_parameter_hex = (
884                 "\x02"  # Param type ("Capability Ad")
885                 "\x06"  # Length (6 bytes)
886                 "\x01"  # Multiprotocol extetension capability,
887                 "\x04"  # Capability value length
888                 "\x00\x02"  # AFI (IPV6)
889                 "\x00"  # (reserved)
890                 "\x81"  # SAFI (L3VPN-MCAST)
891             )
892             optional_parameters_hex += optional_parameter_hex
893
894         if self.l3vpn or self.allf:
895             optional_parameter_hex = (
896                 "\x02"  # Param type ("Capability Ad")
897                 "\x06"  # Length (6 bytes)
898                 "\x01"  # Multiprotocol extetension capability,
899                 "\x04"  # Capability value length
900                 "\x00\x01"  # AFI (IPV4)
901                 "\x00"  # (reserved)
902                 "\x80"  # SAFI (L3VPN-UNICAST)
903             )
904             optional_parameters_hex += optional_parameter_hex
905             optional_parameter_hex = (
906                 "\x02"  # Param type ("Capability Ad")
907                 "\x06"  # Length (6 bytes)
908                 "\x01"  # Multiprotocol extetension capability,
909                 "\x04"  # Capability value length
910                 "\x00\x02"  # AFI (IPV6)
911                 "\x00"  # (reserved)
912                 "\x80"  # SAFI (L3VPN-UNICAST)
913             )
914             optional_parameters_hex += optional_parameter_hex
915
916         if self.rt_constrain or self.allf:
917             optional_parameter_hex = (
918                 "\x02"  # Param type ("Capability Ad")
919                 "\x06"  # Length (6 bytes)
920                 "\x01"  # Multiprotocol extetension capability,
921                 "\x04"  # Capability value length
922                 "\x00\x01"  # AFI (IPV4)
923                 "\x00"  # (reserved)
924                 "\x84"  # SAFI (ROUTE-TARGET-CONSTRAIN)
925             )
926             optional_parameters_hex += optional_parameter_hex
927
928         optional_parameter_hex = (
929             "\x02"  # Param type ("Capability Ad")
930             "\x06"  # Length (6 bytes)
931             "\x41"  # "32 bit AS Numbers Support"
932                     # (see RFC 6793, section 3)
933             "\x04"  # Capability value length
934         )
935         optional_parameter_hex += (
936             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
937         )
938         optional_parameters_hex += optional_parameter_hex
939
940         # Optional Parameters Length
941         optional_parameters_length = len(optional_parameters_hex)
942         optional_parameters_length_hex = struct.pack("B",
943                                                      optional_parameters_length)
944
945         # Length (big-endian)
946         length = (
947             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
948             len(my_autonomous_system_hex_2_bytes) +
949             len(hold_time_hex) + len(bgp_identifier_hex) +
950             len(optional_parameters_length_hex) +
951             len(optional_parameters_hex)
952         )
953         length_hex = struct.pack(">H", length)
954
955         # OPEN Message
956         message_hex = (
957             marker_hex +
958             length_hex +
959             type_hex +
960             version_hex +
961             my_autonomous_system_hex_2_bytes +
962             hold_time_hex +
963             bgp_identifier_hex +
964             optional_parameters_length_hex +
965             optional_parameters_hex
966         )
967
968         if self.log_debug:
969             logger.debug("OPEN message encoding")
970             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
971             logger.debug("  Length=" + str(length) + " (0x" +
972                          binascii.hexlify(length_hex) + ")")
973             logger.debug("  Type=" + str(type) + " (0x" +
974                          binascii.hexlify(type_hex) + ")")
975             logger.debug("  Version=" + str(version) + " (0x" +
976                          binascii.hexlify(version_hex) + ")")
977             logger.debug("  My Autonomous System=" +
978                          str(my_autonomous_system_2_bytes) + " (0x" +
979                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
980                          ")")
981             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
982                          binascii.hexlify(hold_time_hex) + ")")
983             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
984                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
985             logger.debug("  Optional Parameters Length=" +
986                          str(optional_parameters_length) + " (0x" +
987                          binascii.hexlify(optional_parameters_length_hex) +
988                          ")")
989             logger.debug("  Optional Parameters=0x" +
990                          binascii.hexlify(optional_parameters_hex))
991             logger.debug("OPEN message encoded: 0x%s",
992                          binascii.b2a_hex(message_hex))
993
994         return message_hex
995
996     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
997                        wr_prefix_length=None, nlri_prefix_length=None,
998                        my_autonomous_system=None, next_hop=None,
999                        originator_id=None, cluster_list_item=None,
1000                        end_of_rib=False, **ls_nlri_params):
1001         """Generates an UPDATE Message (rfc4271#section-4.3)
1002
1003         Arguments:
1004             :param wr_prefixes: see the rfc4271#section-4.3
1005             :param nlri_prefixes: see the rfc4271#section-4.3
1006             :param wr_prefix_length: see the rfc4271#section-4.3
1007             :param nlri_prefix_length: see the rfc4271#section-4.3
1008             :param my_autonomous_system: see the rfc4271#section-4.3
1009             :param next_hop: see the rfc4271#section-4.3
1010         Returns:
1011             :return: encoded UPDATE message in HEX
1012         """
1013
1014         # default values handling
1015         # TODO optimize default values handling (use e.g. dicionary.update() approach)
1016         if wr_prefixes is None:
1017             wr_prefixes = self.wr_prefixes_default
1018         if nlri_prefixes is None:
1019             nlri_prefixes = self.nlri_prefixes_default
1020         if wr_prefix_length is None:
1021             wr_prefix_length = self.prefix_length_default
1022         if nlri_prefix_length is None:
1023             nlri_prefix_length = self.prefix_length_default
1024         if my_autonomous_system is None:
1025             my_autonomous_system = self.my_autonomous_system_default
1026         if next_hop is None:
1027             next_hop = self.next_hop_default
1028         if originator_id is None:
1029             originator_id = self.originator_id_default
1030         if cluster_list_item is None:
1031             cluster_list_item = self.cluster_list_item_default
1032         ls_nlri = self.ls_nlri_default.copy()
1033         ls_nlri.update(ls_nlri_params)
1034
1035         # Marker
1036         marker_hex = "\xFF" * 16
1037
1038         # Type
1039         type = 2
1040         type_hex = struct.pack("B", type)
1041
1042         # Withdrawn Routes
1043         withdrawn_routes_hex = ""
1044         if not self.bgpls:
1045             bytes = ((wr_prefix_length - 1) / 8) + 1
1046             for prefix in wr_prefixes:
1047                 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
1048                                        struct.pack(">I", int(prefix))[:bytes])
1049                 withdrawn_routes_hex += withdrawn_route_hex
1050
1051         # Withdrawn Routes Length
1052         withdrawn_routes_length = len(withdrawn_routes_hex)
1053         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1054
1055         # TODO: to replace hardcoded string by encoding?
1056         # Path Attributes
1057         path_attributes_hex = ""
1058         if not self.skipattr:
1059             path_attributes_hex += (
1060                 "\x40"  # Flags ("Well-Known")
1061                 "\x01"  # Type (ORIGIN)
1062                 "\x01"  # Length (1)
1063                 "\x00"  # Origin: IGP
1064             )
1065             path_attributes_hex += (
1066                 "\x40"  # Flags ("Well-Known")
1067                 "\x02"  # Type (AS_PATH)
1068                 "\x06"  # Length (6)
1069                 "\x02"  # AS segment type (AS_SEQUENCE)
1070                 "\x01"  # AS segment length (1)
1071             )
1072             my_as_hex = struct.pack(">I", my_autonomous_system)
1073             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
1074             path_attributes_hex += (
1075                 "\x40"  # Flags ("Well-Known")
1076                 "\x05"  # Type (LOCAL_PREF)
1077                 "\x04"  # Length (4)
1078                 "\x00\x00\x00\x64"  # (100)
1079             )
1080         if nlri_prefixes != []:
1081             path_attributes_hex += (
1082                 "\x40"  # Flags ("Well-Known")
1083                 "\x03"  # Type (NEXT_HOP)
1084                 "\x04"  # Length (4)
1085             )
1086             next_hop_hex = struct.pack(">I", int(next_hop))
1087             path_attributes_hex += (
1088                 next_hop_hex  # IP address of the next hop (4 bytes)
1089             )
1090             if originator_id is not None:
1091                 path_attributes_hex += (
1092                     "\x80"  # Flags ("Optional, non-transitive")
1093                     "\x09"  # Type (ORIGINATOR_ID)
1094                     "\x04"  # Length (4)
1095                 )           # ORIGINATOR_ID (4 bytes)
1096                 path_attributes_hex += struct.pack(">I", int(originator_id))
1097             if cluster_list_item is not None:
1098                 path_attributes_hex += (
1099                     "\x80"  # Flags ("Optional, non-transitive")
1100                     "\x0a"  # Type (CLUSTER_LIST)
1101                     "\x04"  # Length (4)
1102                 )           # one CLUSTER_LIST item (4 bytes)
1103                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1104
1105         if self.bgpls and not end_of_rib:
1106             path_attributes_hex += (
1107                 "\x80"  # Flags ("Optional, non-transitive")
1108                 "\x0e"  # Type (MP_REACH_NLRI)
1109                 "\x22"  # Length (34)
1110                 "\x40\x04"  # AFI (BGP-LS)
1111                 "\x47"  # SAFI (BGP-LS)
1112                 "\x04"  # Next Hop Length (4)
1113             )
1114             path_attributes_hex += struct.pack(">I", int(next_hop))
1115             path_attributes_hex += "\x00"           # Reserved
1116             path_attributes_hex += (
1117                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1118                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
1119                 "\x07"      # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1120             )
1121             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1122             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1123             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1124             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1125             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1126
1127         # Total Path Attributes Length
1128         total_path_attributes_length = len(path_attributes_hex)
1129         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1130
1131         # Network Layer Reachability Information
1132         nlri_hex = ""
1133         if not self.bgpls:
1134             bytes = ((nlri_prefix_length - 1) / 8) + 1
1135             for prefix in nlri_prefixes:
1136                 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1137                                    struct.pack(">I", int(prefix))[:bytes])
1138                 nlri_hex += nlri_prefix_hex
1139
1140         # Length (big-endian)
1141         length = (
1142             len(marker_hex) + 2 + len(type_hex) +
1143             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1144             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1145             len(nlri_hex))
1146         length_hex = struct.pack(">H", length)
1147
1148         # UPDATE Message
1149         message_hex = (
1150             marker_hex +
1151             length_hex +
1152             type_hex +
1153             withdrawn_routes_length_hex +
1154             withdrawn_routes_hex +
1155             total_path_attributes_length_hex +
1156             path_attributes_hex +
1157             nlri_hex
1158         )
1159
1160         if self.log_debug:
1161             logger.debug("UPDATE message encoding")
1162             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1163             logger.debug("  Length=" + str(length) + " (0x" +
1164                          binascii.hexlify(length_hex) + ")")
1165             logger.debug("  Type=" + str(type) + " (0x" +
1166                          binascii.hexlify(type_hex) + ")")
1167             logger.debug("  withdrawn_routes_length=" +
1168                          str(withdrawn_routes_length) + " (0x" +
1169                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
1170             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1171                          str(wr_prefix_length) + " (0x" +
1172                          binascii.hexlify(withdrawn_routes_hex) + ")")
1173             if total_path_attributes_length:
1174                 logger.debug("  Total Path Attributes Length=" +
1175                              str(total_path_attributes_length) + " (0x" +
1176                              binascii.hexlify(total_path_attributes_length_hex) + ")")
1177                 logger.debug("  Path Attributes=" + "(0x" +
1178                              binascii.hexlify(path_attributes_hex) + ")")
1179                 logger.debug("    Origin=IGP")
1180                 logger.debug("    AS path=" + str(my_autonomous_system))
1181                 logger.debug("    Next hop=" + str(next_hop))
1182                 if originator_id is not None:
1183                     logger.debug("    Originator id=" + str(originator_id))
1184                 if cluster_list_item is not None:
1185                     logger.debug("    Cluster list=" + str(cluster_list_item))
1186                 if self.bgpls:
1187                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1188             logger.debug("  Network Layer Reachability Information=" +
1189                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1190                          " (0x" + binascii.hexlify(nlri_hex) + ")")
1191             logger.debug("UPDATE message encoded: 0x" +
1192                          binascii.b2a_hex(message_hex))
1193
1194         # updating counter
1195         self.updates_sent += 1
1196         # returning encoded message
1197         return message_hex
1198
1199     def notification_message(self, error_code, error_subcode, data_hex=""):
1200         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1201
1202         Arguments:
1203             :param error_code: see the rfc4271#section-4.5
1204             :param error_subcode: see the rfc4271#section-4.5
1205             :param data_hex: see the rfc4271#section-4.5
1206         Returns:
1207             :return: encoded NOTIFICATION message in HEX
1208         """
1209
1210         # Marker
1211         marker_hex = "\xFF" * 16
1212
1213         # Type
1214         type = 3
1215         type_hex = struct.pack("B", type)
1216
1217         # Error Code
1218         error_code_hex = struct.pack("B", error_code)
1219
1220         # Error Subode
1221         error_subcode_hex = struct.pack("B", error_subcode)
1222
1223         # Length (big-endian)
1224         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1225                   len(error_subcode_hex) + len(data_hex))
1226         length_hex = struct.pack(">H", length)
1227
1228         # NOTIFICATION Message
1229         message_hex = (
1230             marker_hex +
1231             length_hex +
1232             type_hex +
1233             error_code_hex +
1234             error_subcode_hex +
1235             data_hex
1236         )
1237
1238         if self.log_debug:
1239             logger.debug("NOTIFICATION message encoding")
1240             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1241             logger.debug("  Length=" + str(length) + " (0x" +
1242                          binascii.hexlify(length_hex) + ")")
1243             logger.debug("  Type=" + str(type) + " (0x" +
1244                          binascii.hexlify(type_hex) + ")")
1245             logger.debug("  Error Code=" + str(error_code) + " (0x" +
1246                          binascii.hexlify(error_code_hex) + ")")
1247             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
1248                          binascii.hexlify(error_subcode_hex) + ")")
1249             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1250             logger.debug("NOTIFICATION message encoded: 0x%s",
1251                          binascii.b2a_hex(message_hex))
1252
1253         return message_hex
1254
1255     def keepalive_message(self):
1256         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1257
1258         Returns:
1259             :return: encoded KEEP ALIVE message in HEX
1260         """
1261
1262         # Marker
1263         marker_hex = "\xFF" * 16
1264
1265         # Type
1266         type = 4
1267         type_hex = struct.pack("B", type)
1268
1269         # Length (big-endian)
1270         length = len(marker_hex) + 2 + len(type_hex)
1271         length_hex = struct.pack(">H", length)
1272
1273         # KEEP ALIVE Message
1274         message_hex = (
1275             marker_hex +
1276             length_hex +
1277             type_hex
1278         )
1279
1280         if self.log_debug:
1281             logger.debug("KEEP ALIVE message encoding")
1282             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1283             logger.debug("  Length=" + str(length) + " (0x" +
1284                          binascii.hexlify(length_hex) + ")")
1285             logger.debug("  Type=" + str(type) + " (0x" +
1286                          binascii.hexlify(type_hex) + ")")
1287             logger.debug("KEEP ALIVE message encoded: 0x%s",
1288                          binascii.b2a_hex(message_hex))
1289
1290         return message_hex
1291
1292
1293 class TimeTracker(object):
1294     """Class for tracking timers, both for my keepalives and
1295     peer's hold time.
1296     """
1297
1298     def __init__(self, msg_in):
1299         """Initialisation. based on defaults and OPEN message from peer.
1300
1301         Arguments:
1302             msg_in: the OPEN message received from peer.
1303         """
1304         # Note: Relative time is always named timedelta, to stress that
1305         # the (non-delta) time is absolute.
1306         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1307         # Upper bound for being stuck in the same state, we should
1308         # at least report something before continuing.
1309         # Negotiate the hold timer by taking the smaller
1310         # of the 2 values (mine and the peer's).
1311         hold_timedelta = 180  # Not an attribute of self yet.
1312         # TODO: Make the default value configurable,
1313         # default value could mirror what peer said.
1314         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1315         if hold_timedelta > peer_hold_timedelta:
1316             hold_timedelta = peer_hold_timedelta
1317         if hold_timedelta != 0 and hold_timedelta < 3:
1318             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1319             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1320         self.hold_timedelta = hold_timedelta
1321         # If we do not hear from peer this long, we assume it has died.
1322         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1323         # Upper limit for duration between messages, to avoid being
1324         # declared to be dead.
1325         # The same as calling snapshot(), but also declares a field.
1326         self.snapshot_time = time.time()
1327         # Sometimes we need to store time. This is where to get
1328         # the value from afterwards. Time_keepalive may be too strict.
1329         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1330         # At this time point, peer will be declared dead.
1331         self.my_keepalive_time = None  # to be set later
1332         # At this point, we should be sending keepalive message.
1333
1334     def snapshot(self):
1335         """Store current time in instance data to use later."""
1336         # Read as time before something interesting was called.
1337         self.snapshot_time = time.time()
1338
1339     def reset_peer_hold_time(self):
1340         """Move hold time to future as peer has just proven it still lives."""
1341         self.peer_hold_time = time.time() + self.hold_timedelta
1342
1343     # Some methods could rely on self.snapshot_time, but it is better
1344     # to require user to provide it explicitly.
1345     def reset_my_keepalive_time(self, keepalive_time):
1346         """Calculate and set the next my KEEP ALIVE timeout time
1347
1348         Arguments:
1349             :keepalive_time: the initial value of the KEEP ALIVE timer
1350         """
1351         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1352
1353     def is_time_for_my_keepalive(self):
1354         """Check for my KEEP ALIVE timeout occurence"""
1355         if self.hold_timedelta == 0:
1356             return False
1357         return self.snapshot_time >= self.my_keepalive_time
1358
1359     def get_next_event_time(self):
1360         """Set the time of the next expected or to be sent KEEP ALIVE"""
1361         if self.hold_timedelta == 0:
1362             return self.snapshot_time + 86400
1363         return min(self.my_keepalive_time, self.peer_hold_time)
1364
1365     def check_peer_hold_time(self, snapshot_time):
1366         """Raise error if nothing was read from peer until specified time."""
1367         # Hold time = 0 means keepalive checking off.
1368         if self.hold_timedelta != 0:
1369             # time.time() may be too strict
1370             if snapshot_time > self.peer_hold_time:
1371                 logger.error("Peer has overstepped the hold timer.")
1372                 raise RuntimeError("Peer has overstepped the hold timer.")
1373                 # TODO: Include hold_timedelta?
1374                 # TODO: Add notification sending (attempt). That means
1375                 # move to write tracker.
1376
1377
1378 class ReadTracker(object):
1379     """Class for tracking read of mesages chunk by chunk and
1380     for idle waiting.
1381     """
1382
1383     def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
1384                  l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
1385                  wait_for_read=10):
1386         """The reader initialisation.
1387
1388         Arguments:
1389             bgp_socket: socket to be used for sending
1390             timer: timer to be used for scheduling
1391             storage: thread safe dict
1392             evpn: flag that evpn functionality is tested
1393             mvpn: flag that mvpn functionality is tested
1394             l3vpn_mcast: flag that l3vpn_mcast functionality is tested
1395             l3vpn: flag that l3vpn unicast functionality is tested
1396             rt_constrain: flag that rt-constrain functionality is tested
1397             allf: flag for all family testing.
1398         """
1399         # References to outside objects.
1400         self.socket = bgp_socket
1401         self.timer = timer
1402         # BGP marker length plus length field length.
1403         self.header_length = 18
1404         # TODO: make it class (constant) attribute
1405         # Computation of where next chunk ends depends on whether
1406         # we are beyond length field.
1407         self.reading_header = True
1408         # Countdown towards next size computation.
1409         self.bytes_to_read = self.header_length
1410         # Incremental buffer for message under read.
1411         self.msg_in = ""
1412         # Initialising counters
1413         self.updates_received = 0
1414         self.prefixes_introduced = 0
1415         self.prefixes_withdrawn = 0
1416         self.rx_idle_time = 0
1417         self.rx_activity_detected = True
1418         self.storage = storage
1419         self.evpn = evpn
1420         self.mvpn = mvpn
1421         self.l3vpn_mcast = l3vpn_mcast
1422         self.l3vpn = l3vpn
1423         self.rt_constrain = rt_constrain
1424         self.allf = allf
1425         self.wfr = wait_for_read
1426
1427     def read_message_chunk(self):
1428         """Read up to one message
1429
1430         Note:
1431             Currently it does not return anything.
1432         """
1433         # TODO: We could return the whole message, currently not needed.
1434         # We assume the socket is readable.
1435         chunk_message = self.socket.recv(self.bytes_to_read)
1436         self.msg_in += chunk_message
1437         self.bytes_to_read -= len(chunk_message)
1438         # TODO: bytes_to_read < 0 is not possible, right?
1439         if not self.bytes_to_read:
1440             # Finished reading a logical block.
1441             if self.reading_header:
1442                 # The logical block was a BGP header.
1443                 # Now we know the size of the message.
1444                 self.reading_header = False
1445                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1446                                       self.header_length)
1447             else:  # We have finished reading the body of the message.
1448                 # Peer has just proven it is still alive.
1449                 self.timer.reset_peer_hold_time()
1450                 # TODO: Do we want to count received messages?
1451                 # This version ignores the received message.
1452                 # TODO: Should we do validation and exit on anything
1453                 # besides update or keepalive?
1454                 # Prepare state for reading another message.
1455                 message_type_hex = self.msg_in[self.header_length]
1456                 if message_type_hex == "\x01":
1457                     logger.info("OPEN message received: 0x%s",
1458                                 binascii.b2a_hex(self.msg_in))
1459                 elif message_type_hex == "\x02":
1460                     logger.debug("UPDATE message received: 0x%s",
1461                                  binascii.b2a_hex(self.msg_in))
1462                     self.decode_update_message(self.msg_in)
1463                 elif message_type_hex == "\x03":
1464                     logger.info("NOTIFICATION message received: 0x%s",
1465                                 binascii.b2a_hex(self.msg_in))
1466                 elif message_type_hex == "\x04":
1467                     logger.info("KEEP ALIVE message received: 0x%s",
1468                                 binascii.b2a_hex(self.msg_in))
1469                 else:
1470                     logger.warning("Unexpected message received: 0x%s",
1471                                    binascii.b2a_hex(self.msg_in))
1472                 self.msg_in = ""
1473                 self.reading_header = True
1474                 self.bytes_to_read = self.header_length
1475         # We should not act upon peer_hold_time if we are reading
1476         # something right now.
1477         return
1478
1479     def decode_path_attributes(self, path_attributes_hex):
1480         """Decode the Path Attributes field (rfc4271#section-4.3)
1481
1482         Arguments:
1483             :path_attributes: path_attributes field to be decoded in hex
1484         Returns:
1485             :return: None
1486         """
1487         hex_to_decode = path_attributes_hex
1488
1489         while len(hex_to_decode):
1490             attr_flags_hex = hex_to_decode[0]
1491             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1492 #            attr_optional_bit = attr_flags & 128
1493 #            attr_transitive_bit = attr_flags & 64
1494 #            attr_partial_bit = attr_flags & 32
1495             attr_extended_length_bit = attr_flags & 16
1496
1497             attr_type_code_hex = hex_to_decode[1]
1498             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1499
1500             if attr_extended_length_bit:
1501                 attr_length_hex = hex_to_decode[2:4]
1502                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1503                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1504                 hex_to_decode = hex_to_decode[4 + attr_length:]
1505             else:
1506                 attr_length_hex = hex_to_decode[2]
1507                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1508                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1509                 hex_to_decode = hex_to_decode[3 + attr_length:]
1510
1511             if attr_type_code == 1:
1512                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1513                              binascii.b2a_hex(attr_flags_hex))
1514                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1515             elif attr_type_code == 2:
1516                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1517                              binascii.b2a_hex(attr_flags_hex))
1518                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1519             elif attr_type_code == 3:
1520                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1521                              binascii.b2a_hex(attr_flags_hex))
1522                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1523             elif attr_type_code == 4:
1524                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1525                              binascii.b2a_hex(attr_flags_hex))
1526                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1527             elif attr_type_code == 5:
1528                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1529                              binascii.b2a_hex(attr_flags_hex))
1530                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1531             elif attr_type_code == 6:
1532                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1533                              binascii.b2a_hex(attr_flags_hex))
1534                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1535             elif attr_type_code == 7:
1536                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1537                              binascii.b2a_hex(attr_flags_hex))
1538                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1539             elif attr_type_code == 9:  # rfc4456#section-8
1540                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1541                              binascii.b2a_hex(attr_flags_hex))
1542                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1543             elif attr_type_code == 10:  # rfc4456#section-8
1544                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1545                              binascii.b2a_hex(attr_flags_hex))
1546                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1547             elif attr_type_code == 14:  # rfc4760#section-3
1548                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1549                              binascii.b2a_hex(attr_flags_hex))
1550                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1551                 address_family_identifier_hex = attr_value_hex[0:2]
1552                 logger.debug("  Address Family Identifier=0x%s",
1553                              binascii.b2a_hex(address_family_identifier_hex))
1554                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1555                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1556                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1557                 next_hop_netaddr_len_hex = attr_value_hex[3]
1558                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1559                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1560                              next_hop_netaddr_len,
1561                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1562                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1563                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1564                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1565                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1566                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1567                 logger.debug("  Reserved=0x%s",
1568                              binascii.b2a_hex(reserved_hex))
1569                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1570                 logger.debug("  Network Layer Reachability Information=0x%s",
1571                              binascii.b2a_hex(nlri_hex))
1572                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1573                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1574                 for prefix in nlri_prefix_list:
1575                     logger.debug("  nlri_prefix_received: %s", prefix)
1576                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1577             elif attr_type_code == 15:  # rfc4760#section-4
1578                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1579                              binascii.b2a_hex(attr_flags_hex))
1580                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1581                 address_family_identifier_hex = attr_value_hex[0:2]
1582                 logger.debug("  Address Family Identifier=0x%s",
1583                              binascii.b2a_hex(address_family_identifier_hex))
1584                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1585                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1586                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1587                 wd_hex = attr_value_hex[3:]
1588                 logger.debug("  Withdrawn Routes=0x%s",
1589                              binascii.b2a_hex(wd_hex))
1590                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1591                 logger.debug("  Withdrawn routes prefix list: %s",
1592                              wdr_prefix_list)
1593                 for prefix in wdr_prefix_list:
1594                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1595                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1596             else:
1597                 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1598                              binascii.b2a_hex(attr_flags_hex))
1599                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1600         return None
1601
1602     def decode_update_message(self, msg):
1603         """Decode an UPDATE message (rfc4271#section-4.3)
1604
1605         Arguments:
1606             :msg: message to be decoded in hex
1607         Returns:
1608             :return: None
1609         """
1610         logger.debug("Decoding update message:")
1611         # message header - marker
1612         marker_hex = msg[:16]
1613         logger.debug("Message header marker: 0x%s",
1614                      binascii.b2a_hex(marker_hex))
1615         # message header - message length
1616         msg_length_hex = msg[16:18]
1617         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1618         logger.debug("Message lenght: 0x%s (%s)",
1619                      binascii.b2a_hex(msg_length_hex), msg_length)
1620         # message header - message type
1621         msg_type_hex = msg[18:19]
1622         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1623
1624         with self.storage as stor:
1625             # this will replace the previously stored message
1626             stor['update'] = binascii.hexlify(msg)
1627
1628         logger.debug("Evpn {}".format(self.evpn))
1629         if self.evpn:
1630             logger.debug("Skipping update decoding due to evpn data expected")
1631             return
1632
1633         logger.debug("Mvpn {}".format(self.mvpn))
1634         if self.mvpn:
1635             logger.debug("Skipping update decoding due to mvpn data expected")
1636             return
1637
1638         logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
1639         if self.l3vpn_mcast:
1640             logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
1641             return
1642
1643         logger.debug("L3vpn-unicast {}".format(self.l3vpn))
1644         if self.l3vpn_mcast:
1645             logger.debug("Skipping update decoding due to l3vpn-unicast data expected")
1646             return
1647
1648         logger.debug("Route-Target-Constrain {}".format(self.rt_constrain))
1649         if self.rt_constrain:
1650             logger.debug("Skipping update decoding due to Route-Target-Constrain data expected")
1651             return
1652
1653         logger.debug("Allf {}".format(self.allf))
1654         if self.allf:
1655             logger.debug("Skipping update decoding")
1656             return
1657
1658         if msg_type == 2:
1659             logger.debug("Message type: 0x%s (update)",
1660                          binascii.b2a_hex(msg_type_hex))
1661             # withdrawn routes length
1662             wdr_length_hex = msg[19:21]
1663             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1664             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1665                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1666             # withdrawn routes
1667             wdr_hex = msg[21:21 + wdr_length]
1668             logger.debug("Withdrawn routes: 0x%s",
1669                          binascii.b2a_hex(wdr_hex))
1670             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1671             logger.debug("Withdrawn routes prefix list: %s",
1672                          wdr_prefix_list)
1673             for prefix in wdr_prefix_list:
1674                 logger.debug("withdrawn_prefix_received: %s", prefix)
1675             # total path attribute length
1676             total_pa_length_offset = 21 + wdr_length
1677             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1678             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1679             logger.debug("Total path attribute lenght: 0x%s (%s)",
1680                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1681             # path attributes
1682             pa_offset = total_pa_length_offset + 2
1683             pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1684             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1685             self.decode_path_attributes(pa_hex)
1686             # network layer reachability information length
1687             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1688             logger.debug("Calculated NLRI length: %s", nlri_length)
1689             # network layer reachability information
1690             nlri_offset = pa_offset + total_pa_length
1691             nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1692             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1693             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1694             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1695             for prefix in nlri_prefix_list:
1696                 logger.debug("nlri_prefix_received: %s", prefix)
1697             # Updating counters
1698             self.updates_received += 1
1699             self.prefixes_introduced += len(nlri_prefix_list)
1700             self.prefixes_withdrawn += len(wdr_prefix_list)
1701         else:
1702             logger.error("Unexpeced message type 0x%s in 0x%s",
1703                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1704
1705     def wait_for_read(self):
1706         """Read message until timeout (next expected event).
1707
1708         Note:
1709             Used when no more updates has to be sent to avoid busy-wait.
1710             Currently it does not return anything.
1711         """
1712         # Compute time to the first predictable state change
1713         event_time = self.timer.get_next_event_time()
1714         # snapshot_time would be imprecise
1715         wait_timedelta = min(event_time - time.time(), self.wfr)
1716         if wait_timedelta < 0:
1717             # The program got around to waiting to an event in "very near
1718             # future" so late that it became a "past" event, thus tell
1719             # "select" to not wait at all. Passing negative timedelta to
1720             # select() would lead to either waiting forever (for -1) or
1721             # select.error("Invalid parameter") (for everything else).
1722             wait_timedelta = 0
1723         # And wait for event or something to read.
1724
1725         if not self.rx_activity_detected or not (self.updates_received % 100):
1726             # right time to write statistics to the log (not for every update and
1727             # not too frequently to avoid having large log files)
1728             logger.info("total_received_update_message_counter: %s",
1729                         self.updates_received)
1730             logger.info("total_received_nlri_prefix_counter: %s",
1731                         self.prefixes_introduced)
1732             logger.info("total_received_withdrawn_prefix_counter: %s",
1733                         self.prefixes_withdrawn)
1734
1735         start_time = time.time()
1736         select.select([self.socket], [], [self.socket], wait_timedelta)
1737         timedelta = time.time() - start_time
1738         self.rx_idle_time += timedelta
1739         self.rx_activity_detected = timedelta < 1
1740
1741         if not self.rx_activity_detected or not (self.updates_received % 100):
1742             # right time to write statistics to the log (not for every update and
1743             # not too frequently to avoid having large log files)
1744             logger.info("... idle for %.3fs", timedelta)
1745             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1746         return
1747
1748
1749 class WriteTracker(object):
1750     """Class tracking enqueueing messages and sending chunks of them."""
1751
1752     def __init__(self, bgp_socket, generator, timer):
1753         """The writter initialisation.
1754
1755         Arguments:
1756             bgp_socket: socket to be used for sending
1757             generator: generator to be used for message generation
1758             timer: timer to be used for scheduling
1759         """
1760         # References to outside objects,
1761         self.socket = bgp_socket
1762         self.generator = generator
1763         self.timer = timer
1764         # Really new fields.
1765         # TODO: Would attribute docstrings add anything substantial?
1766         self.sending_message = False
1767         self.bytes_to_send = 0
1768         self.msg_out = ""
1769
1770     def enqueue_message_for_sending(self, message):
1771         """Enqueue message and change state.
1772
1773         Arguments:
1774             message: message to be enqueued into the msg_out buffer
1775         """
1776         self.msg_out += message
1777         self.bytes_to_send += len(message)
1778         self.sending_message = True
1779
1780     def send_message_chunk_is_whole(self):
1781         """Send enqueued data from msg_out buffer
1782
1783         Returns:
1784             :return: true if no remaining data to send
1785         """
1786         # We assume there is a msg_out to send and socket is writable.
1787         # print "going to send", repr(self.msg_out)
1788         self.timer.snapshot()
1789         bytes_sent = self.socket.send(self.msg_out)
1790         # Forget the part of message that was sent.
1791         self.msg_out = self.msg_out[bytes_sent:]
1792         self.bytes_to_send -= bytes_sent
1793         if not self.bytes_to_send:
1794             # TODO: Is it possible to hit negative bytes_to_send?
1795             self.sending_message = False
1796             # We should have reset hold timer on peer side.
1797             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1798             # The possible reason for not prioritizing reads is gone.
1799             return True
1800         return False
1801
1802
1803 class StateTracker(object):
1804     """Main loop has state so complex it warrants this separate class."""
1805
1806     def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1807         """The state tracker initialisation.
1808
1809         Arguments:
1810             bgp_socket: socket to be used for sending / receiving
1811             generator: generator to be used for message generation
1812             timer: timer to be used for scheduling
1813             inqueue: user initiated messages queue
1814             storage: thread safe dict to store data for the rpc server
1815             cliargs: cli args from the user
1816         """
1817         # References to outside objects.
1818         self.socket = bgp_socket
1819         self.generator = generator
1820         self.timer = timer
1821         # Sub-trackers.
1822         self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
1823                                   l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
1824                                   rt_constrain=cliargs.rt_constrain, wait_for_read=cliargs.wfr)
1825         self.writer = WriteTracker(bgp_socket, generator, timer)
1826         # Prioritization state.
1827         self.prioritize_writing = False
1828         # In general, we prioritize reading over writing. But in order
1829         # not to get blocked by neverending reads, we should
1830         # check whether we are not risking running out of holdtime.
1831         # So in some situations, this field is set to True to attempt
1832         # finishing sending a message, after which this field resets
1833         # back to False.
1834         # TODO: Alternative is to switch fairly between reading and
1835         # writing (called round robin from now on).
1836         # Message counting is done in generator.
1837         self.inqueue = inqueue
1838
1839     def perform_one_loop_iteration(self):
1840         """ The main loop iteration
1841
1842         Notes:
1843             Calculates priority, resolves all conditions, calls
1844             appropriate method and returns to caller to repeat.
1845         """
1846         self.timer.snapshot()
1847         if not self.prioritize_writing:
1848             if self.timer.is_time_for_my_keepalive():
1849                 if not self.writer.sending_message:
1850                     # We need to schedule a keepalive ASAP.
1851                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1852                     logger.info("KEEP ALIVE is sent.")
1853                 # We are sending a message now, so let's prioritize it.
1854                 self.prioritize_writing = True
1855
1856         try:
1857             msg = self.inqueue.get_nowait()
1858             logger.info("Received message: {}".format(msg))
1859             msgbin = binascii.unhexlify(msg)
1860             self.writer.enqueue_message_for_sending(msgbin)
1861         except Queue.Empty:
1862             pass
1863         # Now we know what our priorities are, we have to check
1864         # which actions are available.
1865         # socket.socket() returns three lists,
1866         # we store them to list of lists.
1867         list_list = select.select([self.socket], [self.socket], [self.socket],
1868                                   self.timer.report_timedelta)
1869         read_list, write_list, except_list = list_list
1870         # Lists are unpacked, each is either [] or [self.socket],
1871         # so we will test them as boolean.
1872         if except_list:
1873             logger.error("Exceptional state on the socket.")
1874             raise RuntimeError("Exceptional state on socket", self.socket)
1875         # We will do either read or write.
1876         if not (self.prioritize_writing and write_list):
1877             # Either we have no reason to rush writes,
1878             # or the socket is not writable.
1879             # We are focusing on reading here.
1880             if read_list:  # there is something to read indeed
1881                 # In this case we want to read chunk of message
1882                 # and repeat the select,
1883                 self.reader.read_message_chunk()
1884                 return
1885             # We were focusing on reading, but nothing to read was there.
1886             # Good time to check peer for hold timer.
1887             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1888             # Quiet on the read front, we can have attempt to write.
1889         if write_list:
1890             # Either we really want to reset peer's view of our hold
1891             # timer, or there was nothing to read.
1892             # Were we in the middle of sending a message?
1893             if self.writer.sending_message:
1894                 # Was it the end of a message?
1895                 whole = self.writer.send_message_chunk_is_whole()
1896                 # We were pressed to send something and we did it.
1897                 if self.prioritize_writing and whole:
1898                     # We prioritize reading again.
1899                     self.prioritize_writing = False
1900                 return
1901             # Finally to check if still update messages to be generated.
1902             if self.generator.remaining_prefixes:
1903                 msg_out = self.generator.compose_update_message()
1904                 if not self.generator.remaining_prefixes:
1905                     # We have just finished update generation,
1906                     # end-of-rib is due.
1907                     logger.info("All update messages generated.")
1908                     logger.info("Storing performance results.")
1909                     self.generator.store_results()
1910                     logger.info("Finally an END-OF-RIB is sent.")
1911                     msg_out += self.generator.update_message(wr_prefixes=[],
1912                                                              nlri_prefixes=[],
1913                                                              end_of_rib=True)
1914                 self.writer.enqueue_message_for_sending(msg_out)
1915                 # Attempt for real sending to be done in next iteration.
1916                 return
1917             # Nothing to write anymore.
1918             # To avoid busy loop, we do idle waiting here.
1919             self.reader.wait_for_read()
1920             return
1921         # We can neither read nor write.
1922         logger.warning("Input and output both blocked for " +
1923                        str(self.timer.report_timedelta) + " seconds.")
1924         # FIXME: Are we sure select has been really waiting
1925         # the whole period?
1926         return
1927
1928
1929 def create_logger(loglevel, logfile):
1930     """Create logger object
1931
1932     Arguments:
1933         :loglevel: log level
1934         :logfile: log file name
1935     Returns:
1936         :return: logger object
1937     """
1938     logger = logging.getLogger("logger")
1939     log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1940     console_handler = logging.StreamHandler()
1941     file_handler = logging.FileHandler(logfile, mode="w")
1942     console_handler.setFormatter(log_formatter)
1943     file_handler.setFormatter(log_formatter)
1944     logger.addHandler(console_handler)
1945     logger.addHandler(file_handler)
1946     logger.setLevel(loglevel)
1947     return logger
1948
1949
1950 def job(arguments, inqueue, storage):
1951     """One time initialisation and iterations looping.
1952     Notes:
1953         Establish BGP connection and run iterations.
1954
1955     Arguments:
1956         :arguments: Command line arguments
1957         :inqueue: Data to be sent from play.py
1958         :storage: Shared dict for rpc server
1959     Returns:
1960         :return: None
1961     """
1962     bgp_socket = establish_connection(arguments)
1963     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1964     # Receive open message before sending anything.
1965     # FIXME: Add parameter to send default open message first,
1966     # to work with "you first" peers.
1967     msg_in = read_open_message(bgp_socket)
1968     timer = TimeTracker(msg_in)
1969     generator = MessageGenerator(arguments)
1970     msg_out = generator.open_message()
1971     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1972     # Send our open message to the peer.
1973     bgp_socket.send(msg_out)
1974     # Wait for confirming keepalive.
1975     # TODO: Surely in just one packet?
1976     # Using exact keepalive length to not to see possible updates.
1977     msg_in = bgp_socket.recv(19)
1978     if msg_in != generator.keepalive_message():
1979         error_msg = "Open not confirmed by keepalive, instead got"
1980         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1981         raise MessageError(error_msg, msg_in)
1982     timer.reset_peer_hold_time()
1983     # Send the keepalive to indicate the connection is accepted.
1984     timer.snapshot()  # Remember this time.
1985     msg_out = generator.keepalive_message()
1986     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1987     bgp_socket.send(msg_out)
1988     # Use the remembered time.
1989     timer.reset_my_keepalive_time(timer.snapshot_time)
1990     # End of initial handshake phase.
1991     state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
1992     while True:  # main reactor loop
1993         state.perform_one_loop_iteration()
1994
1995
1996 class Rpcs:
1997     '''Handler for SimpleXMLRPCServer'''
1998
1999     def __init__(self, sendqueue, storage):
2000         '''Init method
2001
2002         Arguments:
2003             :sendqueue: queue for data to be sent towards odl
2004             :storage: thread safe dict
2005         '''
2006         self.queue = sendqueue
2007         self.storage = storage
2008
2009     def send(self, text):
2010         '''Data to be sent
2011
2012         Arguments:
2013             :text: hes string of the data to be sent
2014         '''
2015         self.queue.put(text)
2016
2017     def get(self, text=''):
2018         '''Reads data form the storage
2019
2020         - returns stored data or an empty string, at the moment only
2021           'update' is stored
2022
2023         Arguments:
2024             :text: a key to the storage to get the data
2025         Returns:
2026             :data: stored data
2027         '''
2028         with self.storage as stor:
2029             return stor.get(text, '')
2030
2031     def clean(self, text=''):
2032         '''Cleans data form the storage
2033
2034         Arguments:
2035             :text: a key to the storage to clean the data
2036         '''
2037         with self.storage as stor:
2038             if text in stor:
2039                 del stor[text]
2040
2041
2042 def threaded_job(arguments):
2043     """Run the job threaded
2044
2045     Arguments:
2046         :arguments: Command line arguments
2047     Returns:
2048         :return: None
2049     """
2050     amount_left = arguments.amount
2051     utils_left = arguments.multiplicity
2052     prefix_current = arguments.firstprefix
2053     myip_current = arguments.myip
2054     port = arguments.port
2055     thread_args = []
2056     rpcqueue = Queue.Queue()
2057     storage = SafeDict()
2058
2059     while 1:
2060         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
2061         amount_left -= amount_per_util
2062         utils_left -= 1
2063
2064         args = deepcopy(arguments)
2065         args.amount = amount_per_util
2066         args.firstprefix = prefix_current
2067         args.myip = myip_current
2068         thread_args.append(args)
2069
2070         if not utils_left:
2071             break
2072         prefix_current += amount_per_util * 16
2073         myip_current += 1
2074
2075     try:
2076         # Create threads
2077         for t in thread_args:
2078             thread.start_new_thread(job, (t, rpcqueue, storage))
2079     except Exception:
2080         print "Error: unable to start thread."
2081         raise SystemExit(2)
2082
2083     if arguments.usepeerip:
2084         ip = arguments.peerip
2085     else:
2086         ip = arguments.myip
2087     rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
2088     rpcserver.register_instance(Rpcs(rpcqueue, storage))
2089     rpcserver.serve_forever()
2090
2091
2092 if __name__ == "__main__":
2093     arguments = parse_arguments()
2094     logger = create_logger(arguments.loglevel, arguments.logfile)
2095     threaded_job(arguments)