d82cbab0073211264b61fc82eb93cddfe0fc25da
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run by a sudo-able user when you want to bind to ports below
4 1024 via --myport. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 import argparse
17 import binascii
18 import ipaddr
19 import logging
20 import Queue
21 import select
22 import socket
23 import struct
24 import thread
25 import threading
26 import time
27
28
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
33
34
class SafeDict(dict):
    '''Thread safe dictionary

    The object will serve as thread safe data storage.
    It should be used with "with" statement: the internal lock is held
    only while the "with" block is active, plain item access outside of
    such a block is not synchronized.
    '''

    def __init__(self, *p_arg, **n_arg):
        """Create the dictionary and its lock.

        Arguments:
            :p_arg: positional arguments accepted by dict()
            :n_arg: keyword arguments accepted by dict()
        """
        # Forward the arguments, otherwise SafeDict(mapping, key=value)
        # would silently end up empty.
        super(SafeDict, self).__init__(*p_arg, **n_arg)
        self._lock = threading.Lock()

    def __enter__(self):
        """Acquire the lock (blocking) and return the dictionary itself."""
        self._lock.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the lock; exceptions raised in the block propagate."""
        self._lock.release()
52
53
def parse_arguments():
    """Use argparse to get arguments,

    Returns:
        :return: args object.
    Raises:
        :SystemExit: with code 1 when --multiplicity is not positive
            (argparse itself exits on malformed command lines).
    Notes:
        Option names, destinations and default values are kept as they
        were; only defects in defaults, type conversion and help texts
        are corrected.
    """

    def str_to_bool(value):
        """Convert a command-line word to bool.

        argparse with type=bool turns every non-empty string (including
        "False") into True. Here the usual "false" spellings give False,
        anything else keeps the former "non-empty means True" meaning.
        """
        return str(value).strip().lower() not in ("", "0", "false", "no", "off")

    decoding_note = (
        " Enabling this flag makes the script not decoding the update message,"
        " because of not supported decoding for these elements."
    )
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    parser.add_argument("--asnumber", default=64496, type=int,
                        help="Autonomous System number use in the stream.")
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    parser.add_argument("--amount", default=1, type=int,
                        help="Amount of IP prefixes to generate. "
                             "(negative means \"infinite\").")
    parser.add_argument("--port", default=8000, type=int,
                        help="Rpc server port.")
    parser.add_argument("--insert", default=1, type=int,
                        help="Maximum number of IP prefixes to be announced "
                             "in one iteration")
    parser.add_argument("--withdraw", default=0, type=int,
                        help="Maximum number of IP prefixes to be withdrawn "
                             "in one iteration")
    parser.add_argument("--prefill", default=0, type=int,
                        help="The number of prefixes to process without "
                             "withdrawals")
    # The default has to be one of the choices (a string), not a list.
    parser.add_argument("--updates", choices=["single", "separate"],
                        default="separate",
                        help="Single or two separate UPDATEs for NLRI and "
                             "WITHDRAWN lists sent")
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address,
                        help="Base prefix IP address for prefix generation")
    parser.add_argument("--prefixlen", default=28, type=int,
                        help="The prefix length.")
    parser.add_argument("--listen", action="store_true",
                        help="Listen for connection, instead of initiating it.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address,
                        help="Numeric IP Address to bind to and derive BGP ID "
                             "from. Default value only suitable for listening.")
    parser.add_argument("--myport", default=0, type=int,
                        help="TCP port to bind to when listening or initiating "
                             "connection. Default only suitable for initiating.")
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop",
                        help="The IP of the next hop to be placed into the "
                             "update messages.")
    parser.add_argument("--originator", default=None,
                        type=ipaddr.IPv4Address, dest="originator",
                        help="Identifier of the route originator.")
    parser.add_argument("--cluster", default=None,
                        type=ipaddr.IPv4Address, dest="cluster",
                        help="Cluster list item identifier.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address,
                        help="Numeric IP Address to try to connect to. "
                             "Currently no effect in listening mode.")
    parser.add_argument("--peerport", default=179, type=int,
                        help="TCP port to try to connect to. "
                             "No effect in listening mode.")
    parser.add_argument("--holdtime", default=180, type=int,
                        help="Local hold time.")
    # All four flags share one destination; the last one given wins.
    loglevel_help = "Log level (--error, --warning, --info, --debug)"
    for flag, level in (("--error", logging.ERROR),
                        ("--warning", logging.WARNING),
                        ("--info", logging.INFO),
                        ("--debug", logging.DEBUG)):
        parser.add_argument(flag, dest="loglevel", action="store_const",
                            const=level, default=logging.INFO,
                            help=loglevel_help)
    parser.add_argument("--logfile", default="bgp_peer.log",
                        help="Log file name")
    parser.add_argument("--results", default="bgp.csv", type=str,
                        help="Trailing part of the csv result files for "
                             "plotting purposes")
    parser.add_argument("--threshold", default=1000, type=int,
                        help="Minimum number of updates to reach to include "
                             "result into csv.")
    parser.add_argument("--rfc4760", default=True, type=str_to_bool,
                        help="RFC 4760 Multiprotocol Extensions for BGP-4 "
                             "supported")
    parser.add_argument("--usepeerip", default=False, action="store_true",
                        help="Using peerip instead of myip for xmlrpc server")
    parser.add_argument("--bgpls", default=False, type=str_to_bool,
                        help="Link-State NLRI supported")
    parser.add_argument("-lsid", default=1, type=int,
                        help="Link-State NLRI: Identifier")
    parser.add_argument("-lstid", default=1, type=int,
                        help="Link-State NLRI: Tunnel ID")
    parser.add_argument("-lspid", default=1, type=int,
                        help="Link-State NLRI: LSP ID")
    parser.add_argument("--lstsaddr", default="1.2.3.4",
                        type=ipaddr.IPv4Address,
                        help="Link-State NLRI: IPv4 Tunnel Sender Address")
    parser.add_argument("--lsteaddr", default="5.6.7.8",
                        type=ipaddr.IPv4Address,
                        help="Link-State NLRI: IPv4 Tunnel End Point Address")
    parser.add_argument("-lsidstep", default=1, type=int,
                        help="Link-State NLRI: Identifier Step")
    parser.add_argument("-lstidstep", default=2, type=int,
                        help="Link-State NLRI: Tunnel ID Step")
    parser.add_argument("-lspidstep", default=4, type=int,
                        help="Link-State NLRI: LSP ID Step")
    parser.add_argument("-lstsaddrstep", default=16, type=int,
                        help="Link-State NLRI: IPv4 Tunnel Sender Address Step")
    parser.add_argument("-lsteaddrstep", default=1, type=int,
                        help="Link-State NLRI: IPv4 Tunnel End Point Address "
                             "Step")
    parser.add_argument("--multiplicity", default=1, type=int,
                        help="How many play utilities are to be started.")
    parser.add_argument("--evpn", default=False, action="store_true",
                        help="Open message includes multiprotocol extension "
                             "capability l2vpn-evpn." + decoding_note)
    # The help texts of --grace and --mvpn used to be swapped.
    parser.add_argument("--grace", default=8, type=int,
                        help="Open message includes Graceful-restart "
                             "capability, containing AFI/SAFIS: IPV4-Unicast, "
                             "IPV6-Unicast, BGP-LS." + decoding_note)
    parser.add_argument("--mvpn", default=False, action="store_true",
                        help="Open message includes Multicast in MPLS/BGP "
                             "IP VPNs arguments." + decoding_note)
    parser.add_argument("--l3vpn_mcast", default=False, action="store_true",
                        help="Open message includes L3VPN-MULTICAST arguments."
                             + decoding_note)
    parser.add_argument("--l3vpn", default=False, action="store_true",
                        help="Open message includes L3VPN-UNICAST arguments, "
                             "without message decoding.")
    parser.add_argument("--rt_constrain", default=False, action="store_true",
                        help="Open message includes ROUTE-TARGET-CONSTRAIN "
                             "arguments, without message decoding.")
    parser.add_argument("--ipv6", default=False, action="store_true",
                        help="Open message includes ipv6-unicast family, "
                             "without message decoding.")
    parser.add_argument("--allf", default=False, action="store_true",
                        help="Add all supported families without message "
                             "decoding.")
    parser.add_argument("--wfr", default=10, type=int,
                        help="Wait for read timeout")
    parser.add_argument("--skipattr", default=False, action="store_true",
                        help="Skipping well known attributes for update message")
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        # Single-argument form prints the same text under Python 2 and 3.
        print("Multiplicity %s is not positive." % arguments.multiplicity)
        raise SystemExit(1)
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
193
194
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.listen: accept a connection instead of initiating it
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address (talking mode only)
            - arguments.peerport: remote port (talking mode only)
    Returns:
        :return: connected socket.
    Notes:
        Socket errors are propagated to the caller; sockets created here
        are closed before the error leaves this function.
    """
    # bind() needs a single tuple; socket does not speak ipaddr, hence str()
    local_address = (str(arguments.myip), arguments.myport)
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listening_socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            listening_socket.bind(local_address)
            listening_socket.listen(1)
            bgp_socket, _ = listening_socket.accept()
            # TODO: Verify client IP is controller IP.
        finally:
            # Only one peer is served, the listener is not needed any more
            # (and must not leak when bind/listen/accept fails).
            listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        connected = False
        try:
            talking_socket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_REUSEADDR, 1)
            # bind to force specified address and port
            talking_socket.bind(local_address)
            talking_socket.connect((str(arguments.peerip), arguments.peerport))
            connected = True
        finally:
            if not connected:
                # Do not leak the descriptor when bind/connect fails.
                talking_socket.close()
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
234
235
def get_short_int_from_message(message, offset=16):
    """Read a big-endian 2-byte unsigned number out of a message.

    Arguments:
        :message: message to read from (indexable, items accepted by ord())
        :offset: position of the more significant byte inside the message
    Returns:
        :return: the decoded number.
    Notes:
        The default offset is where the BGP message length field starts.
    """
    upper, lower = ord(message[offset]), ord(message[offset + 1])
    return (upper << 8) | lower
251
252
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Each encoded prefix is one length octet (in bits) followed by only as
    many octets as are needed to hold that many bits, so the address part
    is zero-padded to four octets before it is rendered.

    Arguments:
        :prefixes_hex: raw (binary) string with the encoded prefixes
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Raises:
        :struct.error: when a prefix is longer than 32 bits or the data
            ends in the middle of a prefix (same exception type the
            struct based decoding used to raise for malformed data)
    """
    # bytearray yields integers on indexing in both Python 2 and 3
    raw = bytearray(prefixes_hex)
    prefix_list = []
    offset = 0
    while offset < len(raw):
        prefix_bit_len = raw[offset]
        if prefix_bit_len > 32:
            raise struct.error("IPv4 prefix length %d exceeds 32 bits"
                               % prefix_bit_len)
        # number of octets actually present on the wire
        prefix_len = (prefix_bit_len + 7) // 8
        octets = raw[offset + 1: offset + 1 + prefix_len]
        if len(octets) != prefix_len:
            raise struct.error("truncated prefix: expected %d octets, got %d"
                               % (prefix_len, len(octets)))
        # pad the omitted trailing octets with zeros
        octets.extend(bytearray(4 - prefix_len))
        prefix = ".".join(str(octet) for octet in octets)
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
        offset += 1 + prefix_len
    return prefix_list
272
273
class MessageError(ValueError):
    """ValueError which carries the raw message and prints it hexlified."""

    def __init__(self, text, message, *args):
        """Remember the comment and the offending raw message.

        Arguments:
            :text: textual description of the problem
            :message: raw message which caused the error
            :args: any further arguments, passed to ValueError unchanged
        """
        super(MessageError, self).__init__(text, message, *args)
        self.text = text
        self.msg = message

    def __str__(self):
        """Render the error as "<text>: <hexlified message>".

        Returns:
            :return: human readable message as string
        Notes:
            A placeholder is printed instead of an empty message.
        """
        hexlified = binascii.hexlify(self.msg)
        shown = hexlified if hexlified != "" else "(empty message)"
        return self.text + ": " + shown
299
300
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Raises:
        :MessageError: when the received data is shorter than 37 bytes or
            its length differs from the length field inside the message
    Notes:
        Performs just basic incoming message checks
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # Some validation.
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        error_msg = (
            "Message length (" + str(len(msg_in)) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)"
        )
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # reported_length is an int; without str() building this text
        # raised TypeError and masked the real problem.
        error_msg = (
            "Expected message length (" + str(reported_length) +
            ") does not match actual length (" + str(len(msg_in)) + ")"
        )
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    logger.info("Open message received.")
    return msg_in
334
335
336 class MessageGenerator(object):
337     """Class which generates messages, holds states and configuration values."""
338
339     # TODO: Define bgp marker as a class (constant) variable.
340     def __init__(self, args):
341         """Initialisation according to command-line args.
342
343         Arguments:
344             :args: argsparser's Namespace object which contains command-line
345                 options for MesageGenerator initialisation
346         Notes:
347             Calculates and stores default values used later on for
348             message generation.
349         """
350         self.total_prefix_amount = args.amount
351         # Number of update messages left to be sent.
352         self.remaining_prefixes = self.total_prefix_amount
353
354         # New parameters initialisation
355         self.port = args.port
356         self.iteration = 0
357         self.prefix_base_default = args.firstprefix
358         self.prefix_length_default = args.prefixlen
359         self.wr_prefixes_default = []
360         self.nlri_prefixes_default = []
361         self.version_default = 4
362         self.my_autonomous_system_default = args.asnumber
363         self.hold_time_default = args.holdtime  # Local hold time.
364         self.bgp_identifier_default = int(args.myip)
365         self.next_hop_default = args.nexthop
366         self.originator_id_default = args.originator
367         self.cluster_list_item_default = args.cluster
368         self.single_update_default = args.updates == "single"
369         self.randomize_updates_default = args.updates == "random"
370         self.prefix_count_to_add_default = args.insert
371         self.prefix_count_to_del_default = args.withdraw
372         if self.prefix_count_to_del_default < 0:
373             self.prefix_count_to_del_default = 0
374         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
375             # total number of prefixes must grow to avoid infinite test loop
376             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
377         self.slot_size_default = self.prefix_count_to_add_default
378         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
379         self.results_file_name_default = args.results
380         self.performance_threshold_default = args.threshold
381         self.rfc4760 = args.rfc4760
382         self.bgpls = args.bgpls
383         self.evpn = args.evpn
384         self.mvpn = args.mvpn
385         self.l3vpn_mcast = args.l3vpn_mcast
386         self.l3vpn = args.l3vpn
387         self.rt_constrain = args.rt_constrain
388         self.ipv6 = args.ipv6
389         self.allf = args.allf
390         self.skipattr = args.skipattr
391         self.grace = args.grace
392         # Default values when BGP-LS Attributes are used
393         if self.bgpls:
394             self.prefix_count_to_add_default = 1
395             self.prefix_count_to_del_default = 0
396         self.ls_nlri_default = {"Identifier": args.lsid,
397                                 "TunnelID": args.lstid,
398                                 "LSPID": args.lspid,
399                                 "IPv4TunnelSenderAddress": args.lstsaddr,
400                                 "IPv4TunnelEndPointAddress": args.lsteaddr}
401         self.lsid_step = args.lsidstep
402         self.lstid_step = args.lstidstep
403         self.lspid_step = args.lspidstep
404         self.lstsaddr_step = args.lstsaddrstep
405         self.lsteaddr_step = args.lsteaddrstep
406         # Default values used for randomized part
407         s1_slots = ((self.total_prefix_amount -
408                      self.remaining_prefixes_threshold - 1) /
409                     self.prefix_count_to_add_default + 1)
410         s2_slots = (
411             (self.remaining_prefixes_threshold - 1)
412             / (self.prefix_count_to_add_default - self.prefix_count_to_del_default)
413             + 1
414         )
415         # S1_First_Index = 0
416         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
417         s2_first_index = s1_slots * self.prefix_count_to_add_default
418         s2_last_index = (s2_first_index +
419                          s2_slots * (self.prefix_count_to_add_default -
420                                      self.prefix_count_to_del_default) - 1)
421         self.slot_gap_default = ((self.total_prefix_amount -
422                                   self.remaining_prefixes_threshold - 1) /
423                                  self.prefix_count_to_add_default + 1)
424         self.randomize_lowest_default = s2_first_index
425         self.randomize_highest_default = s2_last_index
426         # Initialising counters
427         self.phase1_start_time = 0
428         self.phase1_stop_time = 0
429         self.phase2_start_time = 0
430         self.phase2_stop_time = 0
431         self.phase1_updates_sent = 0
432         self.phase2_updates_sent = 0
433         self.updates_sent = 0
434
435         self.log_info = args.loglevel <= logging.INFO
436         self.log_debug = args.loglevel <= logging.DEBUG
437         """
438         Flags needed for the MessageGenerator performance optimization.
439         Calling logger methods each iteration even with proper log level set
440         slows down significantly the MessageGenerator performance.
441         Measured total generation time (1M updates, dry run, error log level):
442         - logging based on basic logger features: 36,2s
443         - logging based on advanced logger features (lazy logging): 21,2s
444         - conditional calling of logger methods enclosed inside condition: 8,6s
445         """
446
447         logger.info("Generator initialisation")
448         logger.info("  Target total number of prefixes to be introduced: " +
449                     str(self.total_prefix_amount))
450         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
451                     str(self.prefix_length_default))
452         logger.info("  My Autonomous System number: " +
453                     str(self.my_autonomous_system_default))
454         logger.info("  My Hold Time: " + str(self.hold_time_default))
455         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
456         logger.info("  Next Hop: " + str(self.next_hop_default))
457         logger.info("  Originator ID: " + str(self.originator_id_default))
458         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
459         logger.info("  Prefix count to be inserted at once: " +
460                     str(self.prefix_count_to_add_default))
461         logger.info("  Prefix count to be withdrawn at once: " +
462                     str(self.prefix_count_to_del_default))
463         logger.info("  Fast pre-fill up to " +
464                     str(self.total_prefix_amount -
465                         self.remaining_prefixes_threshold) + " prefixes")
466         logger.info("  Remaining number of prefixes to be processed " +
467                     "in parallel with withdrawals: " +
468                     str(self.remaining_prefixes_threshold))
469         logger.debug("  Prefix index range used after pre-fill procedure [" +
470                      str(self.randomize_lowest_default) + ", " +
471                      str(self.randomize_highest_default) + "]")
472         if self.single_update_default:
473             logger.info("  Common single UPDATE will be generated " +
474                         "for both NLRI & WITHDRAWN lists")
475         else:
476             logger.info("  Two separate UPDATEs will be generated " +
477                         "for each NLRI & WITHDRAWN lists")
478         if self.randomize_updates_default:
479             logger.info("  Generation of UPDATE messages will be randomized")
480         logger.info("  Let\'s go ...\n")
481
482         # TODO: Notification for hold timer expiration can be handy.
483
484     def store_results(self, file_name=None, threshold=None):
485         """ Stores specified results into files based on file_name value.
486
487         Arguments:
488             :param file_name: Trailing (common) part of result file names
489             :param threshold: Minimum number of sent updates needed for each
490                               result to be included into result csv file
491                               (mainly needed because of the result accuracy)
492         Returns:
493             :return: n/a
494         """
495         # default values handling
496         # TODO optimize default values handling (use e.g. dicionary.update() approach)
497         if file_name is None:
498             file_name = self.results_file_name_default
499         if threshold is None:
500             threshold = self.performance_threshold_default
501         # performance calculation
502         if self.phase1_updates_sent >= threshold:
503             totals1 = self.phase1_updates_sent
504             performance1 = int(self.phase1_updates_sent /
505                                (self.phase1_stop_time - self.phase1_start_time))
506         else:
507             totals1 = None
508             performance1 = None
509         if self.phase2_updates_sent >= threshold:
510             totals2 = self.phase2_updates_sent
511             performance2 = int(self.phase2_updates_sent /
512                                (self.phase2_stop_time - self.phase2_start_time))
513         else:
514             totals2 = None
515             performance2 = None
516
517         logger.info("#" * 10 + " Final results " + "#" * 10)
518         logger.info("Number of iterations: " + str(self.iteration))
519         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
520                     str(self.phase1_updates_sent))
521         logger.info("The pre-fill phase duration: " +
522                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
523         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
524                     str(self.phase2_updates_sent))
525         logger.info("The 2nd test phase duration: " +
526                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
527         logger.info("Threshold for performance reporting: " + str(threshold))
528
529         # making labels
530         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
531                         " route(s) per UPDATE")
532         if self.single_update_default:
533             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
534                                   "/-" + str(self.prefix_count_to_del_default) +
535                                   " routes per UPDATE")
536         else:
537             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
538                                   "/-" + str(self.prefix_count_to_del_default) +
539                                   " routes in two UPDATEs")
540         # collecting capacity and performance results
541         totals = {}
542         performance = {}
543         if totals1 is not None:
544             totals[phase1_label] = totals1
545             performance[phase1_label] = performance1
546         if totals2 is not None:
547             totals[phase2_label] = totals2
548             performance[phase2_label] = performance2
549         self.write_results_to_file(totals, "totals-" + file_name)
550         self.write_results_to_file(performance, "performance-" + file_name)
551
552     def write_results_to_file(self, results, file_name):
553         """Writes results to the csv plot file consumable by Jenkins.
554
555         Arguments:
556             :param file_name: Name of the (csv) file to be created
557         Returns:
558             :return: none
559         """
560         first_line = ""
561         second_line = ""
562         f = open(file_name, "wt")
563         try:
564             for key in sorted(results):
565                 first_line += key + ", "
566                 second_line += str(results[key]) + ", "
567             first_line = first_line[:-2]
568             second_line = second_line[:-2]
569             f.write(first_line + "\n")
570             f.write(second_line + "\n")
571             logger.info("Message generator performance results stored in " +
572                         file_name + ":")
573             logger.info("  " + first_line)
574             logger.info("  " + second_line)
575         finally:
576             f.close()
577
578     # Return pseudo-randomized (reproducible) index for selected range
579     def randomize_index(self, index, lowest=None, highest=None):
580         """Calculates pseudo-randomized index from selected range.
581
582         Arguments:
583             :param index: input index
584             :param lowest: the lowes index from the randomized area
585             :param highest: the highest index from the randomized area
586         Returns:
587             :return: the (pseudo)randomized index
588         Notes:
589             Created just as a fame for future generator enhancement.
590         """
591         # default values handling
592         # TODO optimize default values handling (use e.g. dicionary.update() approach)
593         if lowest is None:
594             lowest = self.randomize_lowest_default
595         if highest is None:
596             highest = self.randomize_highest_default
597         # randomize
598         if (index >= lowest) and (index <= highest):
599             # we are in the randomized range -> shuffle it inside
600             # the range (now just reverse the order)
601             new_index = highest - (index - lowest)
602         else:
603             # we are out of the randomized range -> nothing to do
604             new_index = index
605         return new_index
606
607     def get_ls_nlri_values(self, index):
608         """Generates LS-NLRI parameters.
609         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
610
611         Arguments:
612             :param index: index (iteration)
613         Returns:
614             :return: dictionary of LS NLRI parameters and values
615         """
616         # generating list of LS NLRI parameters
617         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
618         ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
619         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
620         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
621         ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
622         ls_nlri_values = {"Identifier": identifier,
623                           "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
624                           "TunnelID": tunnel_id, "LSPID": lsp_id,
625                           "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
626         return ls_nlri_values
627
628     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
629                         prefix_len=None, prefix_count=None, randomize=None):
630         """Generates list of IP address prefixes.
631
632         Arguments:
633             :param slot_index: index of group of prefix addresses
634             :param slot_size: size of group of prefix addresses
635                 in [number of included prefixes]
636             :param prefix_base: IP address of the first prefix
637                 (slot_index = 0, prefix_index = 0)
638             :param prefix_len: length of the prefix in bites
639                 (the same as size of netmask)
640             :param prefix_count: number of prefixes to be returned
641                 from the specified slot
642         Returns:
643             :return: list of generated IP address prefixes
644         """
645         # default values handling
646         # TODO optimize default values handling (use e.g. dicionary.update() approach)
647         if slot_size is None:
648             slot_size = self.slot_size_default
649         if prefix_base is None:
650             prefix_base = self.prefix_base_default
651         if prefix_len is None:
652             prefix_len = self.prefix_length_default
653         if prefix_count is None:
654             prefix_count = slot_size
655         if randomize is None:
656             randomize = self.randomize_updates_default
657         # generating list of prefixes
658         indexes = []
659         prefixes = []
660         prefix_gap = 2 ** (32 - prefix_len)
661         for i in range(prefix_count):
662             prefix_index = slot_index * slot_size + i
663             if randomize:
664                 prefix_index = self.randomize_index(prefix_index)
665             indexes.append(prefix_index)
666             prefixes.append(prefix_base + prefix_index * prefix_gap)
667         if self.log_debug:
668             logger.debug("  Prefix slot index: " + str(slot_index))
669             logger.debug("  Prefix slot size: " + str(slot_size))
670             logger.debug("  Prefix count: " + str(prefix_count))
671             logger.debug("  Prefix indexes: " + str(indexes))
672             logger.debug("  Prefix list: " + str(prefixes))
673         return prefixes
674
675     def compose_update_message(self, prefix_count_to_add=None,
676                                prefix_count_to_del=None):
677         """Composes an UPDATE message
678
679         Arguments:
680             :param prefix_count_to_add: # of prefixes to put into NLRI list
681             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
682         Returns:
683             :return: encoded UPDATE message in HEX
684         Notes:
685             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
686             lists or common message wich includes both prefix lists.
687             Updates global counters.
688         """
689         # default values handling
690         # TODO optimize default values handling (use e.g. dicionary.update() approach)
691         if prefix_count_to_add is None:
692             prefix_count_to_add = self.prefix_count_to_add_default
693         if prefix_count_to_del is None:
694             prefix_count_to_del = self.prefix_count_to_del_default
695         # logging
696         if self.log_info and not (self.iteration % 1000):
697             logger.info("Iteration: " + str(self.iteration) +
698                         " - total remaining prefixes: " +
699                         str(self.remaining_prefixes))
700         if self.log_debug:
701             logger.debug("#" * 10 + " Iteration: " +
702                          str(self.iteration) + " " + "#" * 10)
703             logger.debug("Remaining prefixes: " +
704                          str(self.remaining_prefixes))
705         # scenario type & one-shot counter
706         straightforward_scenario = (self.remaining_prefixes >
707                                     self.remaining_prefixes_threshold)
708         if straightforward_scenario:
709             prefix_count_to_del = 0
710             if self.log_debug:
711                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
712             if not self.phase1_start_time:
713                 self.phase1_start_time = time.time()
714         else:
715             if self.log_debug:
716                 logger.debug("--- COMBINED SCENARIO ---")
717             if not self.phase2_start_time:
718                 self.phase2_start_time = time.time()
719         # tailor the number of prefixes if needed
720         prefix_count_to_add = (
721             prefix_count_to_del
722             + min(prefix_count_to_add - prefix_count_to_del, self.remaining_prefixes)
723         )
724         # prefix slots selection for insertion and withdrawal
725         slot_index_to_add = self.iteration
726         slot_index_to_del = slot_index_to_add - self.slot_gap_default
727         # getting lists of prefixes for insertion in this iteration
728         if self.log_debug:
729             logger.debug("Prefixes to be inserted in this iteration:")
730         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
731                                                   prefix_count=prefix_count_to_add)
732         # getting lists of prefixes for withdrawal in this iteration
733         if self.log_debug:
734             logger.debug("Prefixes to be withdrawn in this iteration:")
735         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
736                                                   prefix_count=prefix_count_to_del)
737         # generating the UPDATE mesage with LS-NLRI only
738         if self.bgpls:
739             ls_nlri = self.get_ls_nlri_values(self.iteration)
740             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
741                                           **ls_nlri)
742         else:
743             # generating the UPDATE message with prefix lists
744             if self.single_update_default:
745                 # Send prefixes to be introduced and withdrawn
746                 # in one UPDATE message
747                 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
748                                               nlri_prefixes=prefix_list_to_add)
749             else:
750                 # Send prefixes to be introduced and withdrawn
751                 # in separate UPDATE messages (if needed)
752                 msg_out = self.update_message(wr_prefixes=[],
753                                               nlri_prefixes=prefix_list_to_add)
754                 if prefix_count_to_del:
755                     msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
756                                                    nlri_prefixes=[])
757         # updating counters - who knows ... maybe I am last time here ;)
758         if straightforward_scenario:
759             self.phase1_stop_time = time.time()
760             self.phase1_updates_sent = self.updates_sent
761         else:
762             self.phase2_stop_time = time.time()
763             self.phase2_updates_sent = (self.updates_sent -
764                                         self.phase1_updates_sent)
765         # updating totals for the next iteration
766         self.iteration += 1
767         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
768         # returning the encoded message
769         return msg_out
770
771     # Section of message encoders
772
773     def open_message(self, version=None, my_autonomous_system=None,
774                      hold_time=None, bgp_identifier=None):
775         """Generates an OPEN Message (rfc4271#section-4.2)
776
777         Arguments:
778             :param version: see the rfc4271#section-4.2
779             :param my_autonomous_system: see the rfc4271#section-4.2
780             :param hold_time: see the rfc4271#section-4.2
781             :param bgp_identifier: see the rfc4271#section-4.2
782         Returns:
783             :return: encoded OPEN message in HEX
784         """
785
786         # default values handling
787         # TODO optimize default values handling (use e.g. dicionary.update() approach)
788         if version is None:
789             version = self.version_default
790         if my_autonomous_system is None:
791             my_autonomous_system = self.my_autonomous_system_default
792         if hold_time is None:
793             hold_time = self.hold_time_default
794         if bgp_identifier is None:
795             bgp_identifier = self.bgp_identifier_default
796
797         # Marker
798         marker_hex = "\xFF" * 16
799
800         # Type
801         type = 1
802         type_hex = struct.pack("B", type)
803
804         # version
805         version_hex = struct.pack("B", version)
806
807         # my_autonomous_system
808         # AS_TRANS value, 23456 decadic.
809         my_autonomous_system_2_bytes = 23456
810         # AS number is mappable to 2 bytes
811         if my_autonomous_system < 65536:
812             my_autonomous_system_2_bytes = my_autonomous_system
813         my_autonomous_system_hex_2_bytes = struct.pack(">H",
814                                                        my_autonomous_system)
815
816         # Hold Time
817         hold_time_hex = struct.pack(">H", hold_time)
818
819         # BGP Identifier
820         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
821
822         # Optional Parameters
823         optional_parameters_hex = ""
824         if self.rfc4760 or self.allf:
825             optional_parameter_hex = (
826                 "\x02"  # Param type ("Capability Ad")
827                 "\x06"  # Length (6 bytes)
828                 "\x01"  # Capability type (NLRI Unicast),
829                         # see RFC 4760, secton 8
830                 "\x04"  # Capability value length
831                 "\x00\x01"  # AFI (Ipv4)
832                 "\x00"  # (reserved)
833                 "\x01"  # SAFI (Unicast)
834             )
835             optional_parameters_hex += optional_parameter_hex
836
837         if self.ipv6 or self.allf:
838             optional_parameter_hex = (
839                 "\x02"  # Param type ("Capability Ad")
840                 "\x06"  # Length (6 bytes)
841                 "\x01"  # Multiprotocol extetension capability,
842                 "\x04"  # Capability value length
843                 "\x00\x02"  # AFI (IPV6)
844                 "\x00"  # (reserved)
845                 "\x01"  # SAFI (UNICAST)
846             )
847             optional_parameters_hex += optional_parameter_hex
848
849         if self.bgpls or self.allf:
850             optional_parameter_hex = (
851                 "\x02"  # Param type ("Capability Ad")
852                 "\x06"  # Length (6 bytes)
853                 "\x01"  # Capability type (NLRI Unicast),
854                         # see RFC 4760, secton 8
855                 "\x04"  # Capability value length
856                 "\x40\x04"  # AFI (BGP-LS)
857                 "\x00"  # (reserved)
858                 "\x47"  # SAFI (BGP-LS)
859             )
860             optional_parameters_hex += optional_parameter_hex
861
862         if self.evpn or self.allf:
863             optional_parameter_hex = (
864                 "\x02"  # Param type ("Capability Ad")
865                 "\x06"  # Length (6 bytes)
866                 "\x01"  # Multiprotocol extetension capability,
867                 "\x04"  # Capability value length
868                 "\x00\x19"  # AFI (L2-VPN)
869                 "\x00"  # (reserved)
870                 "\x46"  # SAFI (EVPN)
871             )
872             optional_parameters_hex += optional_parameter_hex
873
874         if self.mvpn or self.allf:
875             optional_parameter_hex = (
876                 "\x02"  # Param type ("Capability Ad")
877                 "\x06"  # Length (6 bytes)
878                 "\x01"  # Multiprotocol extetension capability,
879                 "\x04"  # Capability value length
880                 "\x00\x01"  # AFI (IPV4)
881                 "\x00"  # (reserved)
882                 "\x05"  # SAFI (MCAST-VPN)
883             )
884             optional_parameters_hex += optional_parameter_hex
885             optional_parameter_hex = (
886                 "\x02"  # Param type ("Capability Ad")
887                 "\x06"  # Length (6 bytes)
888                 "\x01"  # Multiprotocol extetension capability,
889                 "\x04"  # Capability value length
890                 "\x00\x02"  # AFI (IPV6)
891                 "\x00"  # (reserved)
892                 "\x05"  # SAFI (MCAST-VPN)
893             )
894             optional_parameters_hex += optional_parameter_hex
895
896         if self.l3vpn_mcast or self.allf:
897             optional_parameter_hex = (
898                 "\x02"  # Param type ("Capability Ad")
899                 "\x06"  # Length (6 bytes)
900                 "\x01"  # Multiprotocol extetension capability,
901                 "\x04"  # Capability value length
902                 "\x00\x01"  # AFI (IPV4)
903                 "\x00"  # (reserved)
904                 "\x81"  # SAFI (L3VPN-MCAST)
905             )
906             optional_parameters_hex += optional_parameter_hex
907             optional_parameter_hex = (
908                 "\x02"  # Param type ("Capability Ad")
909                 "\x06"  # Length (6 bytes)
910                 "\x01"  # Multiprotocol extetension capability,
911                 "\x04"  # Capability value length
912                 "\x00\x02"  # AFI (IPV6)
913                 "\x00"  # (reserved)
914                 "\x81"  # SAFI (L3VPN-MCAST)
915             )
916             optional_parameters_hex += optional_parameter_hex
917
918         if self.l3vpn or self.allf:
919             optional_parameter_hex = (
920                 "\x02"  # Param type ("Capability Ad")
921                 "\x06"  # Length (6 bytes)
922                 "\x01"  # Multiprotocol extetension capability,
923                 "\x04"  # Capability value length
924                 "\x00\x01"  # AFI (IPV4)
925                 "\x00"  # (reserved)
926                 "\x80"  # SAFI (L3VPN-UNICAST)
927             )
928             optional_parameters_hex += optional_parameter_hex
929             optional_parameter_hex = (
930                 "\x02"  # Param type ("Capability Ad")
931                 "\x06"  # Length (6 bytes)
932                 "\x01"  # Multiprotocol extetension capability,
933                 "\x04"  # Capability value length
934                 "\x00\x02"  # AFI (IPV6)
935                 "\x00"  # (reserved)
936                 "\x80"  # SAFI (L3VPN-UNICAST)
937             )
938             optional_parameters_hex += optional_parameter_hex
939
940         if self.rt_constrain or self.allf:
941             optional_parameter_hex = (
942                 "\x02"  # Param type ("Capability Ad")
943                 "\x06"  # Length (6 bytes)
944                 "\x01"  # Multiprotocol extetension capability,
945                 "\x04"  # Capability value length
946                 "\x00\x01"  # AFI (IPV4)
947                 "\x00"  # (reserved)
948                 "\x84"  # SAFI (ROUTE-TARGET-CONSTRAIN)
949             )
950             optional_parameters_hex += optional_parameter_hex
951
952         optional_parameter_hex = (
953             "\x02"  # Param type ("Capability Ad")
954             "\x06"  # Length (6 bytes)
955             "\x41"  # "32 bit AS Numbers Support"
956                     # (see RFC 6793, section 3)
957             "\x04"  # Capability value length
958         )
959         optional_parameter_hex += (
960             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
961         )
962         optional_parameters_hex += optional_parameter_hex
963
964         if self.grace != 8:
965             b = list(bin(self.grace)[2:])
966             b = b + [0] * (3 - len(b))
967             length = "\x08"
968             if b[1] == '1':
969                 restart_flag = "\x80\x05"
970             else:
971                 restart_flag = "\x00\x05"
972             if b[2] == '1':
973                 ipv4_flag = "\x80"
974             else:
975                 ipv4_flag = "\x00"
976             if b[0] == '1':
977                 ll_gr = "\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
978                 length = "\x11"
979             else:
980                 ll_gr = ""
981             logger.debug("Grace parameters list: {}".format(b))
982             # "\x02" Param type ("Capability Ad")
983             # :param length: Length of whole message
984             # "\x40" Graceful-restart capability
985             # "\x06" Length (6 bytes)
986             # "\x00" Restart Flag (customizable - turned on when grace == 2,3,6,7)
987             # "\x05" Restart timer (5sec)
988             # "\x00\x01" AFI (IPV4)
989             # "\x01"  SAFI (Unicast)
990             # "\x00"  Ipv4 Flag (customizable - turned on when grace == 1,3,5,7)
991             # "\x47\x07\x00\x01\x01\x00\x00\x00\x1e" ipv4 ll-graceful-restart capability, timer 30sec
992             # ll-gr turned on when grace is between 4-7
993             optional_parameter_hex = "\x02{}\x40\x06{}\x00\x01\x01{}{}".format(
994                 length, restart_flag, ipv4_flag, ll_gr)
995             optional_parameters_hex += optional_parameter_hex
996
997         # Optional Parameters Length
998         optional_parameters_length = len(optional_parameters_hex)
999         optional_parameters_length_hex = struct.pack("B",
1000                                                      optional_parameters_length)
1001
1002         # Length (big-endian)
1003         length = (
1004             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
1005             len(my_autonomous_system_hex_2_bytes) +
1006             len(hold_time_hex) + len(bgp_identifier_hex) +
1007             len(optional_parameters_length_hex) +
1008             len(optional_parameters_hex)
1009         )
1010         length_hex = struct.pack(">H", length)
1011
1012         # OPEN Message
1013         message_hex = (
1014             marker_hex +
1015             length_hex +
1016             type_hex +
1017             version_hex +
1018             my_autonomous_system_hex_2_bytes +
1019             hold_time_hex +
1020             bgp_identifier_hex +
1021             optional_parameters_length_hex +
1022             optional_parameters_hex
1023         )
1024
1025         if self.log_debug:
1026             logger.debug("OPEN message encoding")
1027             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1028             logger.debug("  Length=" + str(length) + " (0x" +
1029                          binascii.hexlify(length_hex) + ")")
1030             logger.debug("  Type=" + str(type) + " (0x" +
1031                          binascii.hexlify(type_hex) + ")")
1032             logger.debug("  Version=" + str(version) + " (0x" +
1033                          binascii.hexlify(version_hex) + ")")
1034             logger.debug("  My Autonomous System=" +
1035                          str(my_autonomous_system_2_bytes) + " (0x" +
1036                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
1037                          ")")
1038             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
1039                          binascii.hexlify(hold_time_hex) + ")")
1040             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
1041                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
1042             logger.debug("  Optional Parameters Length=" +
1043                          str(optional_parameters_length) + " (0x" +
1044                          binascii.hexlify(optional_parameters_length_hex) +
1045                          ")")
1046             logger.debug("  Optional Parameters=0x" +
1047                          binascii.hexlify(optional_parameters_hex))
1048             logger.debug("OPEN message encoded: 0x%s",
1049                          binascii.b2a_hex(message_hex))
1050
1051         return message_hex
1052
1053     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
1054                        wr_prefix_length=None, nlri_prefix_length=None,
1055                        my_autonomous_system=None, next_hop=None,
1056                        originator_id=None, cluster_list_item=None,
1057                        end_of_rib=False, **ls_nlri_params):
1058         """Generates an UPDATE Message (rfc4271#section-4.3)
1059
1060         Arguments:
1061             :param wr_prefixes: see the rfc4271#section-4.3
1062             :param nlri_prefixes: see the rfc4271#section-4.3
1063             :param wr_prefix_length: see the rfc4271#section-4.3
1064             :param nlri_prefix_length: see the rfc4271#section-4.3
1065             :param my_autonomous_system: see the rfc4271#section-4.3
1066             :param next_hop: see the rfc4271#section-4.3
1067         Returns:
1068             :return: encoded UPDATE message in HEX
1069         """
1070
1071         # default values handling
1072         # TODO optimize default values handling (use e.g. dicionary.update() approach)
1073         if wr_prefixes is None:
1074             wr_prefixes = self.wr_prefixes_default
1075         if nlri_prefixes is None:
1076             nlri_prefixes = self.nlri_prefixes_default
1077         if wr_prefix_length is None:
1078             wr_prefix_length = self.prefix_length_default
1079         if nlri_prefix_length is None:
1080             nlri_prefix_length = self.prefix_length_default
1081         if my_autonomous_system is None:
1082             my_autonomous_system = self.my_autonomous_system_default
1083         if next_hop is None:
1084             next_hop = self.next_hop_default
1085         if originator_id is None:
1086             originator_id = self.originator_id_default
1087         if cluster_list_item is None:
1088             cluster_list_item = self.cluster_list_item_default
1089         ls_nlri = self.ls_nlri_default.copy()
1090         ls_nlri.update(ls_nlri_params)
1091
1092         # Marker
1093         marker_hex = "\xFF" * 16
1094
1095         # Type
1096         type = 2
1097         type_hex = struct.pack("B", type)
1098
1099         # Withdrawn Routes
1100         withdrawn_routes_hex = ""
1101         if not self.bgpls:
1102             bytes = ((wr_prefix_length - 1) / 8) + 1
1103             for prefix in wr_prefixes:
1104                 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
1105                                        struct.pack(">I", int(prefix))[:bytes])
1106                 withdrawn_routes_hex += withdrawn_route_hex
1107
1108         # Withdrawn Routes Length
1109         withdrawn_routes_length = len(withdrawn_routes_hex)
1110         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1111
1112         # TODO: to replace hardcoded string by encoding?
1113         # Path Attributes
1114         path_attributes_hex = ""
1115         if not self.skipattr:
1116             path_attributes_hex += (
1117                 "\x40"  # Flags ("Well-Known")
1118                 "\x01"  # Type (ORIGIN)
1119                 "\x01"  # Length (1)
1120                 "\x00"  # Origin: IGP
1121             )
1122             path_attributes_hex += (
1123                 "\x40"  # Flags ("Well-Known")
1124                 "\x02"  # Type (AS_PATH)
1125                 "\x06"  # Length (6)
1126                 "\x02"  # AS segment type (AS_SEQUENCE)
1127                 "\x01"  # AS segment length (1)
1128             )
1129             my_as_hex = struct.pack(">I", my_autonomous_system)
1130             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
1131             path_attributes_hex += (
1132                 "\x40"  # Flags ("Well-Known")
1133                 "\x05"  # Type (LOCAL_PREF)
1134                 "\x04"  # Length (4)
1135                 "\x00\x00\x00\x64"  # (100)
1136             )
1137         if nlri_prefixes != []:
1138             path_attributes_hex += (
1139                 "\x40"  # Flags ("Well-Known")
1140                 "\x03"  # Type (NEXT_HOP)
1141                 "\x04"  # Length (4)
1142             )
1143             next_hop_hex = struct.pack(">I", int(next_hop))
1144             path_attributes_hex += (
1145                 next_hop_hex  # IP address of the next hop (4 bytes)
1146             )
1147             if originator_id is not None:
1148                 path_attributes_hex += (
1149                     "\x80"  # Flags ("Optional, non-transitive")
1150                     "\x09"  # Type (ORIGINATOR_ID)
1151                     "\x04"  # Length (4)
1152                 )           # ORIGINATOR_ID (4 bytes)
1153                 path_attributes_hex += struct.pack(">I", int(originator_id))
1154             if cluster_list_item is not None:
1155                 path_attributes_hex += (
1156                     "\x80"  # Flags ("Optional, non-transitive")
1157                     "\x0a"  # Type (CLUSTER_LIST)
1158                     "\x04"  # Length (4)
1159                 )           # one CLUSTER_LIST item (4 bytes)
1160                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1161
1162         if self.bgpls and not end_of_rib:
1163             path_attributes_hex += (
1164                 "\x80"  # Flags ("Optional, non-transitive")
1165                 "\x0e"  # Type (MP_REACH_NLRI)
1166                 "\x22"  # Length (34)
1167                 "\x40\x04"  # AFI (BGP-LS)
1168                 "\x47"  # SAFI (BGP-LS)
1169                 "\x04"  # Next Hop Length (4)
1170             )
1171             path_attributes_hex += struct.pack(">I", int(next_hop))
1172             path_attributes_hex += "\x00"           # Reserved
1173             path_attributes_hex += (
1174                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1175                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
1176                 "\x07"      # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1177             )
1178             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1179             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1180             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1181             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1182             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1183
1184         # Total Path Attributes Length
1185         total_path_attributes_length = len(path_attributes_hex)
1186         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1187
1188         # Network Layer Reachability Information
1189         nlri_hex = ""
1190         if not self.bgpls:
1191             bytes = ((nlri_prefix_length - 1) / 8) + 1
1192             for prefix in nlri_prefixes:
1193                 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1194                                    struct.pack(">I", int(prefix))[:bytes])
1195                 nlri_hex += nlri_prefix_hex
1196
1197         # Length (big-endian)
1198         length = (
1199             len(marker_hex) + 2 + len(type_hex) +
1200             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1201             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1202             len(nlri_hex))
1203         length_hex = struct.pack(">H", length)
1204
1205         # UPDATE Message
1206         message_hex = (
1207             marker_hex +
1208             length_hex +
1209             type_hex +
1210             withdrawn_routes_length_hex +
1211             withdrawn_routes_hex +
1212             total_path_attributes_length_hex +
1213             path_attributes_hex +
1214             nlri_hex
1215         )
1216
1217         if self.grace != 8 and self.grace != 0 and end_of_rib:
1218             message_hex = (marker_hex + binascii.unhexlify("00170200000000"))
1219
1220         if self.log_debug:
1221             logger.debug("UPDATE message encoding")
1222             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1223             logger.debug("  Length=" + str(length) + " (0x" +
1224                          binascii.hexlify(length_hex) + ")")
1225             logger.debug("  Type=" + str(type) + " (0x" +
1226                          binascii.hexlify(type_hex) + ")")
1227             logger.debug("  withdrawn_routes_length=" +
1228                          str(withdrawn_routes_length) + " (0x" +
1229                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
1230             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1231                          str(wr_prefix_length) + " (0x" +
1232                          binascii.hexlify(withdrawn_routes_hex) + ")")
1233             if total_path_attributes_length:
1234                 logger.debug("  Total Path Attributes Length=" +
1235                              str(total_path_attributes_length) + " (0x" +
1236                              binascii.hexlify(total_path_attributes_length_hex) + ")")
1237                 logger.debug("  Path Attributes=" + "(0x" +
1238                              binascii.hexlify(path_attributes_hex) + ")")
1239                 logger.debug("    Origin=IGP")
1240                 logger.debug("    AS path=" + str(my_autonomous_system))
1241                 logger.debug("    Next hop=" + str(next_hop))
1242                 if originator_id is not None:
1243                     logger.debug("    Originator id=" + str(originator_id))
1244                 if cluster_list_item is not None:
1245                     logger.debug("    Cluster list=" + str(cluster_list_item))
1246                 if self.bgpls:
1247                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1248             logger.debug("  Network Layer Reachability Information=" +
1249                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1250                          " (0x" + binascii.hexlify(nlri_hex) + ")")
1251             logger.debug("UPDATE message encoded: 0x" +
1252                          binascii.b2a_hex(message_hex))
1253
1254         # updating counter
1255         self.updates_sent += 1
1256         # returning encoded message
1257         return message_hex
1258
1259     def notification_message(self, error_code, error_subcode, data_hex=""):
1260         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1261
1262         Arguments:
1263             :param error_code: see the rfc4271#section-4.5
1264             :param error_subcode: see the rfc4271#section-4.5
1265             :param data_hex: see the rfc4271#section-4.5
1266         Returns:
1267             :return: encoded NOTIFICATION message in HEX
1268         """
1269
1270         # Marker
1271         marker_hex = "\xFF" * 16
1272
1273         # Type
1274         type = 3
1275         type_hex = struct.pack("B", type)
1276
1277         # Error Code
1278         error_code_hex = struct.pack("B", error_code)
1279
1280         # Error Subode
1281         error_subcode_hex = struct.pack("B", error_subcode)
1282
1283         # Length (big-endian)
1284         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1285                   len(error_subcode_hex) + len(data_hex))
1286         length_hex = struct.pack(">H", length)
1287
1288         # NOTIFICATION Message
1289         message_hex = (
1290             marker_hex +
1291             length_hex +
1292             type_hex +
1293             error_code_hex +
1294             error_subcode_hex +
1295             data_hex
1296         )
1297
1298         if self.log_debug:
1299             logger.debug("NOTIFICATION message encoding")
1300             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1301             logger.debug("  Length=" + str(length) + " (0x" +
1302                          binascii.hexlify(length_hex) + ")")
1303             logger.debug("  Type=" + str(type) + " (0x" +
1304                          binascii.hexlify(type_hex) + ")")
1305             logger.debug("  Error Code=" + str(error_code) + " (0x" +
1306                          binascii.hexlify(error_code_hex) + ")")
1307             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
1308                          binascii.hexlify(error_subcode_hex) + ")")
1309             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1310             logger.debug("NOTIFICATION message encoded: 0x%s",
1311                          binascii.b2a_hex(message_hex))
1312
1313         return message_hex
1314
1315     def keepalive_message(self):
1316         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1317
1318         Returns:
1319             :return: encoded KEEP ALIVE message in HEX
1320         """
1321
1322         # Marker
1323         marker_hex = "\xFF" * 16
1324
1325         # Type
1326         type = 4
1327         type_hex = struct.pack("B", type)
1328
1329         # Length (big-endian)
1330         length = len(marker_hex) + 2 + len(type_hex)
1331         length_hex = struct.pack(">H", length)
1332
1333         # KEEP ALIVE Message
1334         message_hex = (
1335             marker_hex +
1336             length_hex +
1337             type_hex
1338         )
1339
1340         if self.log_debug:
1341             logger.debug("KEEP ALIVE message encoding")
1342             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1343             logger.debug("  Length=" + str(length) + " (0x" +
1344                          binascii.hexlify(length_hex) + ")")
1345             logger.debug("  Type=" + str(type) + " (0x" +
1346                          binascii.hexlify(type_hex) + ")")
1347             logger.debug("KEEP ALIVE message encoded: 0x%s",
1348                          binascii.b2a_hex(message_hex))
1349
1350         return message_hex
1351
1352
class TimeTracker(object):
    """Keeps the timing state of one BGP session: when my next
    KEEP ALIVE is due and when the peer's hold time expires.
    """

    def __init__(self, msg_in):
        """Set up the timers from defaults and the peer's OPEN message.

        Arguments:
            msg_in: the OPEN message received from peer.
        Raises:
            ValueError: the negotiated hold time is nonzero but below 3.
        """
        # Naming note: a relative time is always called timedelta,
        # to stress that a plain (non-delta) time is absolute.

        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?

        # Negotiate the hold timer by taking the smaller of the
        # 2 values (mine, which is 180, and the peer's).
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        hold_timedelta = min(180, peer_hold_timedelta)
        # Zero is accepted here (the other methods treat it as
        # "keepalive handling off"), any other value below 3 is rejected.
        if hold_timedelta < 3 and hold_timedelta != 0:
            logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
            raise ValueError("Invalid hold timedelta value: ", hold_timedelta)

        # If we do not hear from peer this long, we assume it has died.
        self.hold_timedelta = hold_timedelta
        # Upper limit for duration between my messages, to avoid being
        # declared to be dead.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # Sometimes we need to store time, this is where to get the value
        # from afterwards. Same effect as calling snapshot(), but doing it
        # here also declares the field.
        self.snapshot_time = time.time()
        # At this time point, peer will be declared dead.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this time point, we should be sending keepalive message.
        self.my_keepalive_time = None  # to be set later

    def snapshot(self):
        """Remember the current time in snapshot_time for later use."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Push peer's deadline one hold time past now, as peer
        has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Schedule my next KEEP ALIVE relative to the given time.

        Arguments:
            :keepalive_time: absolute time the keepalive interval
                is counted from
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Tell whether the stored snapshot time has reached the time
        my KEEP ALIVE is due; never true when hold time is zero."""
        return (self.hold_timedelta != 0 and
                self.snapshot_time >= self.my_keepalive_time)

    def get_next_event_time(self):
        """Return the time of the nearest timer event.

        That is the earlier of my keepalive time and the peer hold time.
        With hold time zero it is 86400 seconds after the snapshot.
        """
        if self.hold_timedelta != 0:
            return min(self.my_keepalive_time, self.peer_hold_time)
        return self.snapshot_time + 86400

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time.

        Arguments:
            snapshot_time: the time to compare with the peer's deadline;
                it is passed in as time.time() may be too strict.
        Raises:
            RuntimeError: snapshot_time is past peer_hold_time.
        """
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta != 0 and snapshot_time > self.peer_hold_time:
            logger.error("Peer has overstepped the hold timer.")
            raise RuntimeError("Peer has overstepped the hold timer.")
            # TODO: Include hold_timedelta?
            # TODO: Add notification sending (attempt). That means
            # move to write tracker.
1436
1437
class ReadTracker(object):
    """Class for tracking read of messages chunk by chunk and
    for idle waiting.
    """

    # Names used in the debug log for the path attribute type codes
    # this tracker recognises (9 and 10: rfc4456#section-8,
    # 14: rfc4760#section-3, 15: rfc4760#section-4).
    PATH_ATTRIBUTE_NAMES = {
        1: "ORIGIN",
        2: "AS_PATH",
        3: "NEXT_HOP",
        4: "MULTI_EXIT_DISC",
        5: "LOCAL_PREF",
        6: "ATOMIC_AGGREGATE",
        7: "AGGREGATOR",
        9: "ORIGINATOR_ID",
        10: "CLUSTER_LIST",
        14: "MP_REACH_NLRI",
        15: "MP_UNREACH_NLRI",
    }

    def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
                 l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
                 ipv6=False, grace=8, wait_for_read=10):
        """The reader initialisation.

        Arguments:
            bgp_socket: socket the messages are read from
            timer: timer to be used for scheduling
            storage: thread safe dict; the last received UPDATE message
                is stored there under the 'update' key
            evpn: flag that evpn functionality is tested
            mvpn: flag that mvpn functionality is tested
            grace: graceful-restart setting; any value other than 8
                makes decode_update_message() skip the decoding
            l3vpn_mcast: flag that l3vpn_mcast functionality is tested
            l3vpn: flag that l3vpn unicast functionality is tested
            rt_constrain: flag that rt-constrain functionality is tested
            ipv6: flag that ipv6 unicast functionality is tested
            allf: flag for all family testing.
            wait_for_read: upper bound (in seconds) for one idle wait
                in wait_for_read()
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.timer = timer
        # BGP marker length plus length field length.
        self.header_length = 18
        # TODO: make it class (constant) attribute
        # Computation of where next chunk ends depends on whether
        # we are beyond length field.
        self.reading_header = True
        # Countdown towards next size computation.
        self.bytes_to_read = self.header_length
        # Incremental buffer for message under read.
        self.msg_in = ""
        # Initialising counters
        self.updates_received = 0
        self.prefixes_introduced = 0
        self.prefixes_withdrawn = 0
        self.rx_idle_time = 0
        self.rx_activity_detected = True
        self.storage = storage
        self.evpn = evpn
        self.mvpn = mvpn
        self.l3vpn_mcast = l3vpn_mcast
        self.l3vpn = l3vpn
        self.rt_constrain = rt_constrain
        self.ipv6 = ipv6
        self.allf = allf
        self.wfr = wait_for_read
        self.grace = grace

    def read_message_chunk(self):
        """Read up to one message

        Note:
            Currently it does not return anything.
        """
        # TODO: We could return the whole message, currently not needed.
        # We assume the socket is readable.
        # NOTE(review): an empty chunk (recv() on a connection closed by
        # the peer) is not treated specially here - confirm the caller
        # copes with that.
        chunk_message = self.socket.recv(self.bytes_to_read)
        self.msg_in += chunk_message
        self.bytes_to_read -= len(chunk_message)
        # TODO: bytes_to_read < 0 is not possible, right?
        if not self.bytes_to_read:
            # Finished reading a logical block.
            if self.reading_header:
                # The logical block was a BGP header.
                # Now we know the size of the message.
                self.reading_header = False
                self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
                                      self.header_length)
            else:  # We have finished reading the body of the message.
                # Peer has just proven it is still alive.
                self.timer.reset_peer_hold_time()
                # TODO: Do we want to count received messages?
                # This version ignores the received message.
                # TODO: Should we do validation and exit on anything
                # besides update or keepalive?
                # The type is the first byte after marker and length.
                message_type_hex = self.msg_in[self.header_length]
                if message_type_hex == "\x01":
                    logger.info("OPEN message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x02":
                    logger.debug("UPDATE message received: 0x%s",
                                 binascii.b2a_hex(self.msg_in))
                    self.decode_update_message(self.msg_in)
                elif message_type_hex == "\x03":
                    logger.info("NOTIFICATION message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x04":
                    logger.info("KEEP ALIVE message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                else:
                    logger.warning("Unexpected message received: 0x%s",
                                   binascii.b2a_hex(self.msg_in))
                # Prepare state for reading another message.
                self.msg_in = ""
                self.reading_header = True
                self.bytes_to_read = self.header_length
        # We should not act upon peer_hold_time if we are reading
        # something right now.
        return

    def decode_path_attributes(self, path_attributes_hex):
        """Decode the Path Attributes field (rfc4271#section-4.3)

        Every attribute is written to the debug log. MP_REACH_NLRI and
        MP_UNREACH_NLRI are decoded further and their prefixes are added
        to the prefixes_introduced / prefixes_withdrawn counters.

        Arguments:
            :path_attributes: path_attributes field to be decoded in hex
        Returns:
            :return: None
        """
        hex_to_decode = path_attributes_hex

        while len(hex_to_decode):
            attr_flags_hex = hex_to_decode[0]
            attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
            attr_type_code_hex = hex_to_decode[1]
            attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)

            # Bit 16 of the flags is the Extended Length bit: the length
            # field then takes two bytes instead of one, so the value
            # starts one byte later.
            if attr_flags & 16:
                attr_length_hex = hex_to_decode[2:4]
                value_offset = 4
            else:
                attr_length_hex = hex_to_decode[2]
                value_offset = 3
            attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
            value_end = value_offset + attr_length
            attr_value_hex = hex_to_decode[value_offset:value_end]
            # Whatever follows belongs to the next attribute(s).
            hex_to_decode = hex_to_decode[value_end:]

            attr_name = self.PATH_ATTRIBUTE_NAMES.get(attr_type_code)
            if attr_name is None:
                logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
                continue

            logger.debug("Attribute type=%s (%s, flags:0x%s)", attr_type_code,
                         attr_name, binascii.b2a_hex(attr_flags_hex))
            logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            if attr_type_code == 14:
                self._decode_mp_reach_nlri(attr_value_hex)
            elif attr_type_code == 15:
                self._decode_mp_unreach_nlri(attr_value_hex)
        return None

    def _decode_mp_reach_nlri(self, attr_value_hex):
        """Log an MP_REACH_NLRI value (rfc4760#section-3), count its prefixes.

        Arguments:
            :attr_value_hex: value of the attribute (without its header)
        """
        address_family_identifier_hex = attr_value_hex[0:2]
        logger.debug("  Address Family Identifier=0x%s",
                     binascii.b2a_hex(address_family_identifier_hex))
        subsequent_address_family_identifier_hex = attr_value_hex[2]
        logger.debug("  Subsequent Address Family Identifier=0x%s",
                     binascii.b2a_hex(subsequent_address_family_identifier_hex))
        next_hop_netaddr_len_hex = attr_value_hex[3]
        next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
        logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
                     next_hop_netaddr_len,
                     binascii.b2a_hex(next_hop_netaddr_len_hex))
        next_hop_end = 4 + next_hop_netaddr_len
        next_hop_netaddr_hex = attr_value_hex[4:next_hop_end]
        # NOTE(review): the "BBBB" format accepts exactly 4 bytes, so
        # a next hop of any other length raises struct.error here.
        next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
        logger.debug("  Network Address of Next Hop=%s (0x%s)",
                     next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
        reserved_hex = attr_value_hex[next_hop_end]
        logger.debug("  Reserved=0x%s",
                     binascii.b2a_hex(reserved_hex))
        nlri_hex = attr_value_hex[next_hop_end + 1:]
        logger.debug("  Network Layer Reachability Information=0x%s",
                     binascii.b2a_hex(nlri_hex))
        nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
        logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
        for prefix in nlri_prefix_list:
            logger.debug("  nlri_prefix_received: %s", prefix)
        self.prefixes_introduced += len(nlri_prefix_list)  # update counter

    def _decode_mp_unreach_nlri(self, attr_value_hex):
        """Log an MP_UNREACH_NLRI value (rfc4760#section-4), count its prefixes.

        Arguments:
            :attr_value_hex: value of the attribute (without its header)
        """
        address_family_identifier_hex = attr_value_hex[0:2]
        logger.debug("  Address Family Identifier=0x%s",
                     binascii.b2a_hex(address_family_identifier_hex))
        subsequent_address_family_identifier_hex = attr_value_hex[2]
        logger.debug("  Subsequent Address Family Identifier=0x%s",
                     binascii.b2a_hex(subsequent_address_family_identifier_hex))
        wd_hex = attr_value_hex[3:]
        logger.debug("  Withdrawn Routes=0x%s",
                     binascii.b2a_hex(wd_hex))
        wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
        logger.debug("  Withdrawn routes prefix list: %s",
                     wdr_prefix_list)
        for prefix in wdr_prefix_list:
            logger.debug("  withdrawn_prefix_received: %s", prefix)
        self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter

    def decode_update_message(self, msg):
        """Decode an UPDATE message (rfc4271#section-4.3)

        The message is always stored (hexlified) in the storage under
        the 'update' key. The decoding itself and the counter updates
        are skipped when any of the evpn, mvpn, l3vpn_mcast, l3vpn,
        rt_constrain, ipv6 or allf flags is set, or when grace is not 8.

        Arguments:
            :msg: message to be decoded in hex
        Returns:
            :return: None
        """
        logger.debug("Decoding update message:")
        # message header - marker
        marker_hex = msg[:16]
        logger.debug("Message header marker: 0x%s",
                     binascii.b2a_hex(marker_hex))
        # message header - message length
        msg_length_hex = msg[16:18]
        msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
        logger.debug("Message lenght: 0x%s (%s)",
                     binascii.b2a_hex(msg_length_hex), msg_length)
        # message header - message type
        msg_type_hex = msg[18:19]
        msg_type = int(binascii.b2a_hex(msg_type_hex), 16)

        with self.storage as stor:
            # this will replace the previously stored message
            stor['update'] = binascii.hexlify(msg)

        # Each row: label and value to log, whether decoding is to be
        # skipped, and the reason logged when it is. Rows are evaluated
        # in order, the first one that asks for skipping ends the method.
        skip_checks = (
            ("Evpn", self.evpn, self.evpn,
             "Skipping update decoding due to evpn data expected"),
            ("Graceful-restart", self.grace, self.grace != 8,
             "Skipping update decoding due to graceful-restart data expected"),
            ("Mvpn", self.mvpn, self.mvpn,
             "Skipping update decoding due to mvpn data expected"),
            ("L3vpn-mcast", self.l3vpn_mcast, self.l3vpn_mcast,
             "Skipping update decoding due to l3vpn_mcast data expected"),
            # This row tests l3vpn itself; it used to test l3vpn_mcast
            # again, so the l3vpn flag alone never caused a skip.
            ("L3vpn-unicast", self.l3vpn, self.l3vpn,
             "Skipping update decoding due to l3vpn-unicast data expected"),
            ("Route-Target-Constrain", self.rt_constrain, self.rt_constrain,
             "Skipping update decoding due to Route-Target-Constrain data expected"),
            ("Ipv6-Unicast", self.ipv6, self.ipv6,
             "Skipping update decoding due to Ipv6 data expected"),
            ("Allf", self.allf, self.allf,
             "Skipping update decoding"),
        )
        for label, value, skip, reason in skip_checks:
            logger.debug("%s %s", label, value)
            if skip:
                logger.debug(reason)
                return

        if msg_type == 2:
            logger.debug("Message type: 0x%s (update)",
                         binascii.b2a_hex(msg_type_hex))
            # withdrawn routes length
            wdr_length_hex = msg[19:21]
            wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
            logger.debug("Withdrawn routes lenght: 0x%s (%s)",
                         binascii.b2a_hex(wdr_length_hex), wdr_length)
            # withdrawn routes
            wdr_hex = msg[21:21 + wdr_length]
            logger.debug("Withdrawn routes: 0x%s",
                         binascii.b2a_hex(wdr_hex))
            wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
            logger.debug("Withdrawn routes prefix list: %s",
                         wdr_prefix_list)
            for prefix in wdr_prefix_list:
                logger.debug("withdrawn_prefix_received: %s", prefix)
            # total path attribute length
            total_pa_length_offset = 21 + wdr_length
            total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
            total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
            logger.debug("Total path attribute lenght: 0x%s (%s)",
                         binascii.b2a_hex(total_pa_length_hex), total_pa_length)
            # path attributes
            pa_offset = total_pa_length_offset + 2
            pa_hex = msg[pa_offset:pa_offset + total_pa_length]
            logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
            self.decode_path_attributes(pa_hex)
            # network layer reachability information length; 23 is the
            # 19 byte header plus the two 2-byte length fields
            nlri_length = msg_length - 23 - total_pa_length - wdr_length
            logger.debug("Calculated NLRI length: %s", nlri_length)
            # network layer reachability information
            nlri_offset = pa_offset + total_pa_length
            nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
            logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
            nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
            logger.debug("NLRI prefix list: %s", nlri_prefix_list)
            for prefix in nlri_prefix_list:
                logger.debug("nlri_prefix_received: %s", prefix)
            # Updating counters
            self.updates_received += 1
            self.prefixes_introduced += len(nlri_prefix_list)
            self.prefixes_withdrawn += len(wdr_prefix_list)
        else:
            logger.error("Unexpeced message type 0x%s in 0x%s",
                         binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))

    def wait_for_read(self):
        """Read message until timeout (next expected event).

        Note:
            Used when no more updates has to be sent to avoid busy-wait.
            The wait is also limited by the wait_for_read value given
            to the constructor.
            Currently it does not return anything.
        """
        # Compute time to the first predictable state change
        event_time = self.timer.get_next_event_time()
        # snapshot_time would be imprecise
        wait_timedelta = min(event_time - time.time(), self.wfr)
        if wait_timedelta < 0:
            # The program got around to waiting to an event in "very near
            # future" so late that it became a "past" event, thus tell
            # "select" to not wait at all. Passing negative timedelta to
            # select() would lead to either waiting forever (for -1) or
            # select.error("Invalid parameter") (for everything else).
            wait_timedelta = 0
        # And wait for event or something to read.

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every update and
            # not too frequently to avoid having large log files)
            logger.info("total_received_update_message_counter: %s",
                        self.updates_received)
            logger.info("total_received_nlri_prefix_counter: %s",
                        self.prefixes_introduced)
            logger.info("total_received_withdrawn_prefix_counter: %s",
                        self.prefixes_withdrawn)

        start_time = time.time()
        select.select([self.socket], [], [self.socket], wait_timedelta)
        timedelta = time.time() - start_time
        self.rx_idle_time += timedelta
        # A wait shorter than one second counts as rx activity.
        self.rx_activity_detected = timedelta < 1

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every update and
            # not too frequently to avoid having large log files)
            logger.info("... idle for %.3fs", timedelta)
            logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
        return
1820
1821
class WriteTracker(object):
    """Buffers outgoing messages and pushes them to the socket in chunks."""

    def __init__(self, bgp_socket, generator, timer):
        """Set up an empty output buffer.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # Outside objects we only keep references to.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # State of the output buffer.
        # TODO: Would attribute docstrings add anything substantial?
        self.msg_out = ""
        self.bytes_to_send = 0
        self.sending_message = False

    def enqueue_message_for_sending(self, message):
        """Append message to the output buffer and mark sending as pending.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.msg_out = self.msg_out + message
        self.bytes_to_send = self.bytes_to_send + len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send as much of the msg_out buffer as the socket accepts.

        Returns:
            :return: true if no remaining data to send
        """
        # We assume there is a msg_out to send and socket is writable.
        # print "going to send", repr(self.msg_out)
        self.timer.snapshot()
        sent_count = self.socket.send(self.msg_out)
        # Keep only the part of message that still waits for sending.
        self.msg_out = self.msg_out[sent_count:]
        self.bytes_to_send -= sent_count
        if self.bytes_to_send:
            # TODO: Is it possible to hit negative bytes_to_send?
            return False
        self.sending_message = False
        # We should have reset hold timer on peer side, so my keepalive
        # interval is counted from the snapshot taken before the send.
        self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
        # The possible reason for not prioritizing reads is gone.
        return True
1874
1875
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class.

    One instance drives one BGP session: every call
    to perform_one_loop_iteration() does at most one read step or one
    write step on the socket, choosing between them by priority.
    """

    def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
        """The state tracker initialisation.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
            inqueue: user initiated messages queue
            storage: thread safe dict to store data for the rpc server
            cliargs: cli args from the user; the attributes evpn, mvpn,
                l3vpn_mcast, l3vpn, allf, rt_constrain, ipv6, grace and wfr
                are read here and handed over to the ReadTracker
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        self.inqueue = inqueue
        # Sub-trackers.
        self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
                                  l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
                                  rt_constrain=cliargs.rt_constrain, ipv6=cliargs.ipv6, grace=cliargs.grace,
                                  wait_for_read=cliargs.wfr)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # back to False.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.
        self.prioritize_writing = False

    def _enqueue_user_message(self):
        """Move at most one user initiated message from inqueue to the writer.

        Notes:
            The message is expected to be a hex string. A message which
            can not be decoded is logged and dropped, so that one bad
            request does not terminate the whole main loop.
        """
        try:
            msg = self.inqueue.get_nowait()
        except Queue.Empty:
            # Nothing was requested by the user.
            return
        logger.info("Received message: {}".format(msg))
        try:
            msgbin = binascii.unhexlify(msg)
        except (TypeError, ValueError, binascii.Error) as error:
            # TypeError is what Python 2 raises for odd length or non-hex
            # input, the other two cover newer interpreters.
            logger.error("Dropping message which is not a valid hex string: {!r} ({})".format(msg, error))
            return
        self.writer.enqueue_message_for_sending(msgbin)

    def perform_one_loop_iteration(self):
        """ The main loop iteration

        Notes:
            Calculates priority, resolves all conditions, calls
            appropriate method and returns to caller to repeat.
        Raises:
            RuntimeError: when select reports an exceptional state
                on the socket
        """
        self.timer.snapshot()
        if not self.prioritize_writing and self.timer.is_time_for_my_keepalive():
            if not self.writer.sending_message:
                # We need to schedule a keepalive ASAP.
                self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
                logger.info("KEEP ALIVE is sent.")
            # We are sending a message now, so let's prioritize it.
            self.prioritize_writing = True

        self._enqueue_user_message()
        # Now we know what our priorities are, we have to check
        # which actions are available.
        # select.select() returns three lists,
        # we store them to list of lists.
        list_list = select.select([self.socket], [self.socket], [self.socket],
                                  self.timer.report_timedelta)
        read_list, write_list, except_list = list_list
        # Lists are unpacked, each is either [] or [self.socket],
        # so we will test them as boolean.
        if except_list:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
                return
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        if write_list:
            # Either we really want to reset peer's view of our hold
            # timer, or there was nothing to read.
            # Were we in the middle of sending a message?
            if self.writer.sending_message:
                # Was it the end of a message?
                whole = self.writer.send_message_chunk_is_whole()
                # We were pressed to send something and we did it.
                if self.prioritize_writing and whole:
                    # We prioritize reading again.
                    self.prioritize_writing = False
                return
            # Finally to check if still update messages to be generated.
            if self.generator.remaining_prefixes:
                msg_out = self.generator.compose_update_message()
                if not self.generator.remaining_prefixes:
                    # We have just finished update generation,
                    # end-of-rib is due.
                    logger.info("All update messages generated.")
                    logger.info("Storing performance results.")
                    self.generator.store_results()
                    logger.info("Finally an END-OF-RIB is sent.")
                    msg_out += self.generator.update_message(wr_prefixes=[],
                                                             nlri_prefixes=[],
                                                             end_of_rib=True)
                self.writer.enqueue_message_for_sending(msg_out)
                # Attempt for real sending to be done in next iteration.
                return
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # We can neither read nor write.
        logger.warning("Input and output both blocked for " +
                       str(self.timer.report_timedelta) + " seconds.")
        # FIXME: Are we sure select has been really waiting
        # the whole period?
        return
2001
2002
def create_logger(loglevel, logfile):
    """Set up the logger named "logger" with console and file output.

    Notes:
        Both outputs share one format: time, level, "BGP-" followed
        by the thread name, and the message. The log file is opened
        in "w" mode.

    Arguments:
        :loglevel: log level
        :logfile: log file name
    Returns:
        :return: logger object
    """
    line_format = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
    # Both handlers are constructed before any of them is attached.
    handlers = (logging.StreamHandler(), logging.FileHandler(logfile, mode="w"))
    log = logging.getLogger("logger")
    for handler in handlers:
        handler.setFormatter(line_format)
        log.addHandler(handler)
    log.setLevel(loglevel)
    return log
2022
2023
def job(arguments, inqueue, storage):
    """One time initialisation and iterations looping.

    Notes:
        Establish BGP connection, do the OPEN / KEEPALIVE handshake
        and then run main loop iterations forever.

    Arguments:
        :arguments: Command line arguments
        :inqueue: Data to be sent from play.py
        :storage: Shared dict for rpc server, it has to support
            the "with" statement the way SafeDict does
    Returns:
        :return: None (the main loop is only left by an exception)
    Raises:
        :MessageError: when the peer does not confirm our OPEN message
            by a KEEPALIVE message
    """
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    open_hex = binascii.hexlify(msg_in)
    logger.info(open_hex)
    # The rpc server thread reads the storage under its lock,
    # so the write has to take the lock as well.
    with storage as stor:
        stor['open'] = open_hex
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
    # Send our open message to the peer.
    bgp_socket.send(msg_out)
    # Wait for confirming keepalive.
    # Using exact keepalive length to not to see possible updates.
    # TCP is a stream, one recv call may return only a part of the
    # message, so keep reading until we have it all or the peer closes.
    keepalive_length = 19
    msg_in = b""
    while len(msg_in) < keepalive_length:
        chunk = bgp_socket.recv(keepalive_length - len(msg_in))
        if not chunk:
            # Connection closed by the peer, the check below reports it.
            break
        msg_in += chunk
    if msg_in != generator.keepalive_message():
        error_msg = "Open not confirmed by keepalive, instead got"
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
    bgp_socket.send(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
2070
2071
class Rpcs:
    '''Methods exposed by the SimpleXMLRPCServer

    Requests to send data are passed on through a queue, stored data
    is read from and removed from a shared storage.
    '''

    def __init__(self, sendqueue, storage):
        '''Remember the queue and the storage

        Arguments:
            :sendqueue: queue for data to be sent towards odl
            :storage: thread safe dict
        '''
        self.queue, self.storage = sendqueue, storage

    def send(self, text):
        '''Put the data to the queue of data to be sent

        Arguments:
            :text: hex string of the data to be sent
        '''
        self.queue.put(text)

    def get(self, text=''):
        '''Reads data from the storage

        - returns stored data or an empty string when nothing
          is stored under the key

        Arguments:
            :text: a key to the storage to get the data
        Returns:
            :data: stored data
        '''
        with self.storage as stor:
            data = stor.get(text, '')
        return data

    def clean(self, text=''):
        '''Removes data from the storage

        - a key which is not stored is silently ignored

        Arguments:
            :text: a key to the storage to clean the data
        '''
        with self.storage as stor:
            stor.pop(text, None)
2116
2117
def threaded_job(arguments):
    """Run the job threaded

    Notes:
        The requested amount is split between "multiplicity" threads,
        each running job() with its own copy of the arguments (own
        amount, first prefix and my ip). After starting the threads
        the calling thread serves xmlrpc requests forever.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None
    Raises:
        :ValueError: when multiplicity is lower than 1
        :SystemExit: when starting of a thread fails
    """
    utils_left = arguments.multiplicity
    if utils_left < 1:
        # Zero would end in division by zero below and a negative
        # value would never reach the end of the splitting loop.
        raise ValueError("multiplicity must be at least 1, got {!r}".format(utils_left))
    amount_left = arguments.amount
    prefix_current = arguments.firstprefix
    myip_current = arguments.myip
    port = arguments.port
    thread_args = []
    rpcqueue = Queue.Queue()
    storage = SafeDict()

    while True:
        # Explicit floor division, the result has to stay an integer.
        amount_per_util = (amount_left - 1) // utils_left + 1  # round up
        amount_left -= amount_per_util
        utils_left -= 1

        args = deepcopy(arguments)
        args.amount = amount_per_util
        args.firstprefix = prefix_current
        args.myip = myip_current
        thread_args.append(args)

        if not utils_left:
            break
        # NOTE(review): 16 is presumably the address distance between
        # two consecutive prefixes - TODO confirm against prefix length.
        prefix_current += amount_per_util * 16
        myip_current += 1

    try:
        # Create threads
        for t in thread_args:
            thread.start_new_thread(job, (t, rpcqueue, storage))
    except Exception:
        # Parenthesized single string prints the same text on Python 2.
        print("Error: unable to start thread.")
        raise SystemExit(2)

    # The rpc server shares the queue and the storage with all threads.
    if arguments.usepeerip:
        ip = arguments.peerip
    else:
        ip = arguments.myip
    rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
    rpcserver.register_instance(Rpcs(rpcqueue, storage))
    rpcserver.serve_forever()
2166
2167
if __name__ == "__main__":
    # Script entry point: read the command line first.
    arguments = parse_arguments()
    # "logger" is bound at module level here; job() and StateTracker
    # use it as a global, so it has to exist before any thread starts.
    logger = create_logger(arguments.loglevel, arguments.logfile)
    # Starts the worker threads and then serves rpc requests forever.
    threaded_job(arguments)