Add graceful-restart test suite
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 import argparse
17 import binascii
18 import ipaddr
19 import logging
20 import Queue
21 import select
22 import socket
23 import struct
24 import thread
25 import threading
26 import time
27
28
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
33
34
35 class SafeDict(dict):
36     '''Thread safe dictionary
37
38     The object will serve as thread safe data storage.
39     It should be used with "with" statement.
40     '''
41
42     def __init__(self, * p_arg, ** n_arg):
43         super(SafeDict, self).__init__()
44         self._lock = threading.Lock()
45
46     def __enter__(self):
47         self._lock.acquire()
48         return self
49
50     def __exit__(self, type, value, traceback):
51         self._lock.release()
52
53
54 def parse_arguments():
55     """Use argparse to get arguments,
56
57     Returns:
58         :return: args object.
59     """
60     parser = argparse.ArgumentParser()
61     # TODO: Should we use --argument-names-with-spaces?
62     str_help = "Autonomous System number use in the stream."
63     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64     # FIXME: We are acting as iBGP peer,
65     # we should mirror AS number from peer's open message.
66     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67     parser.add_argument("--amount", default="1", type=int, help=str_help)
68     str_help = "Rpc server port."
69     parser.add_argument("--port", default="8000", type=int, help=str_help)
70     str_help = "Maximum number of IP prefixes to be announced in one iteration"
71     parser.add_argument("--insert", default="1", type=int, help=str_help)
72     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
73     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
74     str_help = "The number of prefixes to process without withdrawals"
75     parser.add_argument("--prefill", default="0", type=int, help=str_help)
76     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
77     parser.add_argument("--updates", choices=["single", "separate"],
78                         default=["separate"], help=str_help)
79     str_help = "Base prefix IP address for prefix generation"
80     parser.add_argument("--firstprefix", default="8.0.1.0",
81                         type=ipaddr.IPv4Address, help=str_help)
82     str_help = "The prefix length."
83     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
84     str_help = "Listen for connection, instead of initiating it."
85     parser.add_argument("--listen", action="store_true", help=str_help)
86     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
87                 "Default value only suitable for listening.")
88     parser.add_argument("--myip", default="0.0.0.0",
89                         type=ipaddr.IPv4Address, help=str_help)
90     str_help = ("TCP port to bind to when listening or initiating connection." +
91                 "Default only suitable for initiating.")
92     parser.add_argument("--myport", default="0", type=int, help=str_help)
93     str_help = "The IP of the next hop to be placed into the update messages."
94     parser.add_argument("--nexthop", default="192.0.2.1",
95                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
96     str_help = "Identifier of the route originator."
97     parser.add_argument("--originator", default=None,
98                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
99     str_help = "Cluster list item identifier."
100     parser.add_argument("--cluster", default=None,
101                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
102     str_help = ("Numeric IP Address to try to connect to." +
103                 "Currently no effect in listening mode.")
104     parser.add_argument("--peerip", default="127.0.0.2",
105                         type=ipaddr.IPv4Address, help=str_help)
106     str_help = "TCP port to try to connect to. No effect in listening mode."
107     parser.add_argument("--peerport", default="179", type=int, help=str_help)
108     str_help = "Local hold time."
109     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
110     str_help = "Log level (--error, --warning, --info, --debug)"
111     parser.add_argument("--error", dest="loglevel", action="store_const",
112                         const=logging.ERROR, default=logging.INFO,
113                         help=str_help)
114     parser.add_argument("--warning", dest="loglevel", action="store_const",
115                         const=logging.WARNING, default=logging.INFO,
116                         help=str_help)
117     parser.add_argument("--info", dest="loglevel", action="store_const",
118                         const=logging.INFO, default=logging.INFO,
119                         help=str_help)
120     parser.add_argument("--debug", dest="loglevel", action="store_const",
121                         const=logging.DEBUG, default=logging.INFO,
122                         help=str_help)
123     str_help = "Log file name"
124     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
125     str_help = "Trailing part of the csv result files for plotting purposes"
126     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
127     str_help = "Minimum number of updates to reach to include result into csv."
128     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
129     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
130     parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
131     str_help = "Using peerip instead of myip for xmlrpc server"
132     parser.add_argument("--usepeerip", default=False, action="store_true", help=str_help)
133     str_help = "Link-State NLRI supported"
134     parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
135     str_help = "Link-State NLRI: Identifier"
136     parser.add_argument("-lsid", default="1", type=int, help=str_help)
137     str_help = "Link-State NLRI: Tunnel ID"
138     parser.add_argument("-lstid", default="1", type=int, help=str_help)
139     str_help = "Link-State NLRI: LSP ID"
140     parser.add_argument("-lspid", default="1", type=int, help=str_help)
141     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
142     parser.add_argument("--lstsaddr", default="1.2.3.4",
143                         type=ipaddr.IPv4Address, help=str_help)
144     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
145     parser.add_argument("--lsteaddr", default="5.6.7.8",
146                         type=ipaddr.IPv4Address, help=str_help)
147     str_help = "Link-State NLRI: Identifier Step"
148     parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
149     str_help = "Link-State NLRI: Tunnel ID Step"
150     parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
151     str_help = "Link-State NLRI: LSP ID Step"
152     parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
153     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
154     parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
155     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
156     parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
157     str_help = "How many play utilities are to be started."
158     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
159     str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
160     Enabling this flag makes the script not decoding the update mesage, because of not\
161     supported decoding for these elements."
162     parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
163     str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
164     Enabling this flag makes the script not decoding the update mesage, because of not\
165     supported decoding for these elements."
166     parser.add_argument("--grace", default="8", type=int, help=str_help)
167     str_help = "Open message includes Graceful-restart capability, containing AFI/SAFIS:\
168     IPV4-Unicast, IPV6-Unicast, BGP-LS\
169     Enabling this flag makes the script not decoding the update mesage, because of not\
170     supported decoding for these elements."
171     parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
172     str_help = "Open message includes L3VPN-MULTICAST arguments.\
173     Enabling this flag makes the script not decoding the update mesage, because of not\
174     supported decoding for these elements."
175     parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
176     str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding."
177     parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
178     str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
179     parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help)
180     str_help = "Open message includes ipv6-unicast family, without message decoding."
181     parser.add_argument("--ipv6", default=False, action="store_true", help=str_help)
182     str_help = "Add all supported families without message decoding."
183     parser.add_argument("--allf", default=False, action="store_true", help=str_help)
184     parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
185     str_help = "Skipping well known attributes for update message"
186     parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
187     arguments = parser.parse_args()
188     if arguments.multiplicity < 1:
189         print "Multiplicity", arguments.multiplicity, "is not positive."
190         raise SystemExit(1)
191     # TODO: Are sanity checks (such as asnumber>=0) required?
192     return arguments
193
194
195 def establish_connection(arguments):
196     """Establish connection to BGP peer.
197
198     Arguments:
199         :arguments: following command-line arguments are used
200             - arguments.myip: local IP address
201             - arguments.myport: local port
202             - arguments.peerip: remote IP address
203             - arguments.peerport: remote port
204     Returns:
205         :return: socket.
206     """
207     if arguments.listen:
208         logger.info("Connecting in the listening mode.")
209         logger.debug("Local IP address: " + str(arguments.myip))
210         logger.debug("Local port: " + str(arguments.myport))
211         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
212         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
213         # bind need single tuple as argument
214         listening_socket.bind((str(arguments.myip), arguments.myport))
215         listening_socket.listen(1)
216         bgp_socket, _ = listening_socket.accept()
217         # TODO: Verify client IP is cotroller IP.
218         listening_socket.close()
219     else:
220         logger.info("Connecting in the talking mode.")
221         logger.debug("Local IP address: " + str(arguments.myip))
222         logger.debug("Local port: " + str(arguments.myport))
223         logger.debug("Remote IP address: " + str(arguments.peerip))
224         logger.debug("Remote port: " + str(arguments.peerport))
225         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
226         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
227         # bind to force specified address and port
228         talking_socket.bind((str(arguments.myip), arguments.myport))
229         # socket does not spead ipaddr, hence str()
230         talking_socket.connect((str(arguments.peerip), arguments.peerport))
231         bgp_socket = talking_socket
232     logger.info("Connected to ODL.")
233     return bgp_socket
234
235
236 def get_short_int_from_message(message, offset=16):
237     """Extract 2-bytes number from provided message.
238
239     Arguments:
240         :message: given message
241         :offset: offset of the short_int inside the message
242     Returns:
243         :return: required short_inf value.
244     Notes:
245         default offset value is the BGP message size offset.
246     """
247     high_byte_int = ord(message[offset])
248     low_byte_int = ord(message[offset + 1])
249     short_int = high_byte_int * 256 + low_byte_int
250     return short_int
251
252
253 def get_prefix_list_from_hex(prefixes_hex):
254     """Get decoded list of prefixes (rfc4271#section-4.3)
255
256     Arguments:
257         :prefixes_hex: list of prefixes to be decoded in hex
258     Returns:
259         :return: list of prefixes in the form of ip address (X.X.X.X/X)
260     """
261     prefix_list = []
262     offset = 0
263     while offset < len(prefixes_hex):
264         prefix_bit_len_hex = prefixes_hex[offset]
265         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
266         prefix_len = ((prefix_bit_len - 1) / 8) + 1
267         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
268         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
269         offset += 1 + prefix_len
270         prefix_list.append(prefix + "/" + str(prefix_bit_len))
271     return prefix_list
272
273
274 class MessageError(ValueError):
275     """Value error with logging optimized for hexlified messages."""
276
277     def __init__(self, text, message, *args):
278         """Initialisation.
279
280         Store and call super init for textual comment,
281         store raw message which caused it.
282         """
283         self.text = text
284         self.msg = message
285         super(MessageError, self).__init__(text, message, *args)
286
287     def __str__(self):
288         """Generate human readable error message.
289
290         Returns:
291             :return: human readable message as string
292         Notes:
293             Use a placeholder string if the message is to be empty.
294         """
295         message = binascii.hexlify(self.msg)
296         if message == "":
297             message = "(empty message)"
298         return self.text + ": " + message
299
300
301 def read_open_message(bgp_socket):
302     """Receive peer's OPEN message
303
304     Arguments:
305         :bgp_socket: the socket to be read
306     Returns:
307         :return: received OPEN message.
308     Notes:
309         Performs just basic incomming message checks
310     """
311     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
312     # TODO: Can the incoming open message be split in more than one packet?
313     # Some validation.
314     if len(msg_in) < 37:
315         # 37 is minimal length of open message with 4-byte AS number.
316         error_msg = (
317             "Message length (" + str(len(msg_in)) + ") is smaller than "
318             "minimal length of OPEN message with 4-byte AS number (37)"
319         )
320         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
321         raise MessageError(error_msg, msg_in)
322     # TODO: We could check BGP marker, but it is defined only later;
323     # decide what to do.
324     reported_length = get_short_int_from_message(msg_in)
325     if len(msg_in) != reported_length:
326         error_msg = (
327             "Expected message length (" + reported_length +
328             ") does not match actual length (" + str(len(msg_in)) + ")"
329         )
330         logger.error(error_msg + binascii.hexlify(msg_in))
331         raise MessageError(error_msg, msg_in)
332     logger.info("Open message received.")
333     return msg_in
334
335
336 class MessageGenerator(object):
337     """Class which generates messages, holds states and configuration values."""
338
339     # TODO: Define bgp marker as a class (constant) variable.
340     def __init__(self, args):
341         """Initialisation according to command-line args.
342
343         Arguments:
344             :args: argsparser's Namespace object which contains command-line
345                 options for MesageGenerator initialisation
346         Notes:
347             Calculates and stores default values used later on for
348             message generation.
349         """
350         self.total_prefix_amount = args.amount
351         # Number of update messages left to be sent.
352         self.remaining_prefixes = self.total_prefix_amount
353
354         # New parameters initialisation
355         self.port = args.port
356         self.iteration = 0
357         self.prefix_base_default = args.firstprefix
358         self.prefix_length_default = args.prefixlen
359         self.wr_prefixes_default = []
360         self.nlri_prefixes_default = []
361         self.version_default = 4
362         self.my_autonomous_system_default = args.asnumber
363         self.hold_time_default = args.holdtime  # Local hold time.
364         self.bgp_identifier_default = int(args.myip)
365         self.next_hop_default = args.nexthop
366         self.originator_id_default = args.originator
367         self.cluster_list_item_default = args.cluster
368         self.single_update_default = args.updates == "single"
369         self.randomize_updates_default = args.updates == "random"
370         self.prefix_count_to_add_default = args.insert
371         self.prefix_count_to_del_default = args.withdraw
372         if self.prefix_count_to_del_default < 0:
373             self.prefix_count_to_del_default = 0
374         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
375             # total number of prefixes must grow to avoid infinite test loop
376             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
377         self.slot_size_default = self.prefix_count_to_add_default
378         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
379         self.results_file_name_default = args.results
380         self.performance_threshold_default = args.threshold
381         self.rfc4760 = args.rfc4760
382         self.bgpls = args.bgpls
383         self.evpn = args.evpn
384         self.mvpn = args.mvpn
385         self.l3vpn_mcast = args.l3vpn_mcast
386         self.l3vpn = args.l3vpn
387         self.rt_constrain = args.rt_constrain
388         self.ipv6 = args.ipv6
389         self.allf = args.allf
390         self.skipattr = args.skipattr
391         self.grace = args.grace
392         # Default values when BGP-LS Attributes are used
393         if self.bgpls:
394             self.prefix_count_to_add_default = 1
395             self.prefix_count_to_del_default = 0
396         self.ls_nlri_default = {"Identifier": args.lsid,
397                                 "TunnelID": args.lstid,
398                                 "LSPID": args.lspid,
399                                 "IPv4TunnelSenderAddress": args.lstsaddr,
400                                 "IPv4TunnelEndPointAddress": args.lsteaddr}
401         self.lsid_step = args.lsidstep
402         self.lstid_step = args.lstidstep
403         self.lspid_step = args.lspidstep
404         self.lstsaddr_step = args.lstsaddrstep
405         self.lsteaddr_step = args.lsteaddrstep
406         # Default values used for randomized part
407         s1_slots = ((self.total_prefix_amount -
408                      self.remaining_prefixes_threshold - 1) /
409                     self.prefix_count_to_add_default + 1)
410         s2_slots = ((self.remaining_prefixes_threshold - 1) /
411                     (self.prefix_count_to_add_default -
412                      self.prefix_count_to_del_default) + 1)
413         # S1_First_Index = 0
414         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
415         s2_first_index = s1_slots * self.prefix_count_to_add_default
416         s2_last_index = (s2_first_index +
417                          s2_slots * (self.prefix_count_to_add_default -
418                                      self.prefix_count_to_del_default) - 1)
419         self.slot_gap_default = ((self.total_prefix_amount -
420                                   self.remaining_prefixes_threshold - 1) /
421                                  self.prefix_count_to_add_default + 1)
422         self.randomize_lowest_default = s2_first_index
423         self.randomize_highest_default = s2_last_index
424         # Initialising counters
425         self.phase1_start_time = 0
426         self.phase1_stop_time = 0
427         self.phase2_start_time = 0
428         self.phase2_stop_time = 0
429         self.phase1_updates_sent = 0
430         self.phase2_updates_sent = 0
431         self.updates_sent = 0
432
433         self.log_info = args.loglevel <= logging.INFO
434         self.log_debug = args.loglevel <= logging.DEBUG
435         """
436         Flags needed for the MessageGenerator performance optimization.
437         Calling logger methods each iteration even with proper log level set
438         slows down significantly the MessageGenerator performance.
439         Measured total generation time (1M updates, dry run, error log level):
440         - logging based on basic logger features: 36,2s
441         - logging based on advanced logger features (lazy logging): 21,2s
442         - conditional calling of logger methods enclosed inside condition: 8,6s
443         """
444
445         logger.info("Generator initialisation")
446         logger.info("  Target total number of prefixes to be introduced: " +
447                     str(self.total_prefix_amount))
448         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
449                     str(self.prefix_length_default))
450         logger.info("  My Autonomous System number: " +
451                     str(self.my_autonomous_system_default))
452         logger.info("  My Hold Time: " + str(self.hold_time_default))
453         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
454         logger.info("  Next Hop: " + str(self.next_hop_default))
455         logger.info("  Originator ID: " + str(self.originator_id_default))
456         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
457         logger.info("  Prefix count to be inserted at once: " +
458                     str(self.prefix_count_to_add_default))
459         logger.info("  Prefix count to be withdrawn at once: " +
460                     str(self.prefix_count_to_del_default))
461         logger.info("  Fast pre-fill up to " +
462                     str(self.total_prefix_amount -
463                         self.remaining_prefixes_threshold) + " prefixes")
464         logger.info("  Remaining number of prefixes to be processed " +
465                     "in parallel with withdrawals: " +
466                     str(self.remaining_prefixes_threshold))
467         logger.debug("  Prefix index range used after pre-fill procedure [" +
468                      str(self.randomize_lowest_default) + ", " +
469                      str(self.randomize_highest_default) + "]")
470         if self.single_update_default:
471             logger.info("  Common single UPDATE will be generated " +
472                         "for both NLRI & WITHDRAWN lists")
473         else:
474             logger.info("  Two separate UPDATEs will be generated " +
475                         "for each NLRI & WITHDRAWN lists")
476         if self.randomize_updates_default:
477             logger.info("  Generation of UPDATE messages will be randomized")
478         logger.info("  Let\'s go ...\n")
479
480         # TODO: Notification for hold timer expiration can be handy.
481
482     def store_results(self, file_name=None, threshold=None):
483         """ Stores specified results into files based on file_name value.
484
485         Arguments:
486             :param file_name: Trailing (common) part of result file names
487             :param threshold: Minimum number of sent updates needed for each
488                               result to be included into result csv file
489                               (mainly needed because of the result accuracy)
490         Returns:
491             :return: n/a
492         """
493         # default values handling
494         # TODO optimize default values handling (use e.g. dicionary.update() approach)
495         if file_name is None:
496             file_name = self.results_file_name_default
497         if threshold is None:
498             threshold = self.performance_threshold_default
499         # performance calculation
500         if self.phase1_updates_sent >= threshold:
501             totals1 = self.phase1_updates_sent
502             performance1 = int(self.phase1_updates_sent /
503                                (self.phase1_stop_time - self.phase1_start_time))
504         else:
505             totals1 = None
506             performance1 = None
507         if self.phase2_updates_sent >= threshold:
508             totals2 = self.phase2_updates_sent
509             performance2 = int(self.phase2_updates_sent /
510                                (self.phase2_stop_time - self.phase2_start_time))
511         else:
512             totals2 = None
513             performance2 = None
514
515         logger.info("#" * 10 + " Final results " + "#" * 10)
516         logger.info("Number of iterations: " + str(self.iteration))
517         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
518                     str(self.phase1_updates_sent))
519         logger.info("The pre-fill phase duration: " +
520                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
521         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
522                     str(self.phase2_updates_sent))
523         logger.info("The 2nd test phase duration: " +
524                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
525         logger.info("Threshold for performance reporting: " + str(threshold))
526
527         # making labels
528         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
529                         " route(s) per UPDATE")
530         if self.single_update_default:
531             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
532                                   "/-" + str(self.prefix_count_to_del_default) +
533                                   " routes per UPDATE")
534         else:
535             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
536                                   "/-" + str(self.prefix_count_to_del_default) +
537                                   " routes in two UPDATEs")
538         # collecting capacity and performance results
539         totals = {}
540         performance = {}
541         if totals1 is not None:
542             totals[phase1_label] = totals1
543             performance[phase1_label] = performance1
544         if totals2 is not None:
545             totals[phase2_label] = totals2
546             performance[phase2_label] = performance2
547         self.write_results_to_file(totals, "totals-" + file_name)
548         self.write_results_to_file(performance, "performance-" + file_name)
549
550     def write_results_to_file(self, results, file_name):
551         """Writes results to the csv plot file consumable by Jenkins.
552
553         Arguments:
554             :param file_name: Name of the (csv) file to be created
555         Returns:
556             :return: none
557         """
558         first_line = ""
559         second_line = ""
560         f = open(file_name, "wt")
561         try:
562             for key in sorted(results):
563                 first_line += key + ", "
564                 second_line += str(results[key]) + ", "
565             first_line = first_line[:-2]
566             second_line = second_line[:-2]
567             f.write(first_line + "\n")
568             f.write(second_line + "\n")
569             logger.info("Message generator performance results stored in " +
570                         file_name + ":")
571             logger.info("  " + first_line)
572             logger.info("  " + second_line)
573         finally:
574             f.close()
575
576     # Return pseudo-randomized (reproducible) index for selected range
577     def randomize_index(self, index, lowest=None, highest=None):
578         """Calculates pseudo-randomized index from selected range.
579
580         Arguments:
581             :param index: input index
582             :param lowest: the lowes index from the randomized area
583             :param highest: the highest index from the randomized area
584         Returns:
585             :return: the (pseudo)randomized index
586         Notes:
587             Created just as a fame for future generator enhancement.
588         """
589         # default values handling
590         # TODO optimize default values handling (use e.g. dicionary.update() approach)
591         if lowest is None:
592             lowest = self.randomize_lowest_default
593         if highest is None:
594             highest = self.randomize_highest_default
595         # randomize
596         if (index >= lowest) and (index <= highest):
597             # we are in the randomized range -> shuffle it inside
598             # the range (now just reverse the order)
599             new_index = highest - (index - lowest)
600         else:
601             # we are out of the randomized range -> nothing to do
602             new_index = index
603         return new_index
604
605     def get_ls_nlri_values(self, index):
606         """Generates LS-NLRI parameters.
607         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
608
609         Arguments:
610             :param index: index (iteration)
611         Returns:
612             :return: dictionary of LS NLRI parameters and values
613         """
614         # generating list of LS NLRI parameters
615         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
616         ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
617         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
618         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
619         ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
620         ls_nlri_values = {"Identifier": identifier,
621                           "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
622                           "TunnelID": tunnel_id, "LSPID": lsp_id,
623                           "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
624         return ls_nlri_values
625
626     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
627                         prefix_len=None, prefix_count=None, randomize=None):
628         """Generates list of IP address prefixes.
629
630         Arguments:
631             :param slot_index: index of group of prefix addresses
632             :param slot_size: size of group of prefix addresses
633                 in [number of included prefixes]
634             :param prefix_base: IP address of the first prefix
635                 (slot_index = 0, prefix_index = 0)
636             :param prefix_len: length of the prefix in bites
637                 (the same as size of netmask)
638             :param prefix_count: number of prefixes to be returned
639                 from the specified slot
640         Returns:
641             :return: list of generated IP address prefixes
642         """
643         # default values handling
644         # TODO optimize default values handling (use e.g. dicionary.update() approach)
645         if slot_size is None:
646             slot_size = self.slot_size_default
647         if prefix_base is None:
648             prefix_base = self.prefix_base_default
649         if prefix_len is None:
650             prefix_len = self.prefix_length_default
651         if prefix_count is None:
652             prefix_count = slot_size
653         if randomize is None:
654             randomize = self.randomize_updates_default
655         # generating list of prefixes
656         indexes = []
657         prefixes = []
658         prefix_gap = 2 ** (32 - prefix_len)
659         for i in range(prefix_count):
660             prefix_index = slot_index * slot_size + i
661             if randomize:
662                 prefix_index = self.randomize_index(prefix_index)
663             indexes.append(prefix_index)
664             prefixes.append(prefix_base + prefix_index * prefix_gap)
665         if self.log_debug:
666             logger.debug("  Prefix slot index: " + str(slot_index))
667             logger.debug("  Prefix slot size: " + str(slot_size))
668             logger.debug("  Prefix count: " + str(prefix_count))
669             logger.debug("  Prefix indexes: " + str(indexes))
670             logger.debug("  Prefix list: " + str(prefixes))
671         return prefixes
672
673     def compose_update_message(self, prefix_count_to_add=None,
674                                prefix_count_to_del=None):
675         """Composes an UPDATE message
676
677         Arguments:
678             :param prefix_count_to_add: # of prefixes to put into NLRI list
679             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
680         Returns:
681             :return: encoded UPDATE message in HEX
682         Notes:
683             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
684             lists or common message wich includes both prefix lists.
685             Updates global counters.
686         """
687         # default values handling
688         # TODO optimize default values handling (use e.g. dicionary.update() approach)
689         if prefix_count_to_add is None:
690             prefix_count_to_add = self.prefix_count_to_add_default
691         if prefix_count_to_del is None:
692             prefix_count_to_del = self.prefix_count_to_del_default
693         # logging
694         if self.log_info and not (self.iteration % 1000):
695             logger.info("Iteration: " + str(self.iteration) +
696                         " - total remaining prefixes: " +
697                         str(self.remaining_prefixes))
698         if self.log_debug:
699             logger.debug("#" * 10 + " Iteration: " +
700                          str(self.iteration) + " " + "#" * 10)
701             logger.debug("Remaining prefixes: " +
702                          str(self.remaining_prefixes))
703         # scenario type & one-shot counter
704         straightforward_scenario = (self.remaining_prefixes >
705                                     self.remaining_prefixes_threshold)
706         if straightforward_scenario:
707             prefix_count_to_del = 0
708             if self.log_debug:
709                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
710             if not self.phase1_start_time:
711                 self.phase1_start_time = time.time()
712         else:
713             if self.log_debug:
714                 logger.debug("--- COMBINED SCENARIO ---")
715             if not self.phase2_start_time:
716                 self.phase2_start_time = time.time()
717         # tailor the number of prefixes if needed
718         prefix_count_to_add = (prefix_count_to_del +
719                                min(prefix_count_to_add - prefix_count_to_del,
720                                    self.remaining_prefixes))
721         # prefix slots selection for insertion and withdrawal
722         slot_index_to_add = self.iteration
723         slot_index_to_del = slot_index_to_add - self.slot_gap_default
724         # getting lists of prefixes for insertion in this iteration
725         if self.log_debug:
726             logger.debug("Prefixes to be inserted in this iteration:")
727         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
728                                                   prefix_count=prefix_count_to_add)
729         # getting lists of prefixes for withdrawal in this iteration
730         if self.log_debug:
731             logger.debug("Prefixes to be withdrawn in this iteration:")
732         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
733                                                   prefix_count=prefix_count_to_del)
734         # generating the UPDATE mesage with LS-NLRI only
735         if self.bgpls:
736             ls_nlri = self.get_ls_nlri_values(self.iteration)
737             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
738                                           **ls_nlri)
739         else:
740             # generating the UPDATE message with prefix lists
741             if self.single_update_default:
742                 # Send prefixes to be introduced and withdrawn
743                 # in one UPDATE message
744                 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
745                                               nlri_prefixes=prefix_list_to_add)
746             else:
747                 # Send prefixes to be introduced and withdrawn
748                 # in separate UPDATE messages (if needed)
749                 msg_out = self.update_message(wr_prefixes=[],
750                                               nlri_prefixes=prefix_list_to_add)
751                 if prefix_count_to_del:
752                     msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
753                                                    nlri_prefixes=[])
754         # updating counters - who knows ... maybe I am last time here ;)
755         if straightforward_scenario:
756             self.phase1_stop_time = time.time()
757             self.phase1_updates_sent = self.updates_sent
758         else:
759             self.phase2_stop_time = time.time()
760             self.phase2_updates_sent = (self.updates_sent -
761                                         self.phase1_updates_sent)
762         # updating totals for the next iteration
763         self.iteration += 1
764         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
765         # returning the encoded message
766         return msg_out
767
768     # Section of message encoders
769
770     def open_message(self, version=None, my_autonomous_system=None,
771                      hold_time=None, bgp_identifier=None):
772         """Generates an OPEN Message (rfc4271#section-4.2)
773
774         Arguments:
775             :param version: see the rfc4271#section-4.2
776             :param my_autonomous_system: see the rfc4271#section-4.2
777             :param hold_time: see the rfc4271#section-4.2
778             :param bgp_identifier: see the rfc4271#section-4.2
779         Returns:
780             :return: encoded OPEN message in HEX
781         """
782
783         # default values handling
784         # TODO optimize default values handling (use e.g. dicionary.update() approach)
785         if version is None:
786             version = self.version_default
787         if my_autonomous_system is None:
788             my_autonomous_system = self.my_autonomous_system_default
789         if hold_time is None:
790             hold_time = self.hold_time_default
791         if bgp_identifier is None:
792             bgp_identifier = self.bgp_identifier_default
793
794         # Marker
795         marker_hex = "\xFF" * 16
796
797         # Type
798         type = 1
799         type_hex = struct.pack("B", type)
800
801         # version
802         version_hex = struct.pack("B", version)
803
804         # my_autonomous_system
805         # AS_TRANS value, 23456 decadic.
806         my_autonomous_system_2_bytes = 23456
807         # AS number is mappable to 2 bytes
808         if my_autonomous_system < 65536:
809             my_autonomous_system_2_bytes = my_autonomous_system
810         my_autonomous_system_hex_2_bytes = struct.pack(">H",
811                                                        my_autonomous_system)
812
813         # Hold Time
814         hold_time_hex = struct.pack(">H", hold_time)
815
816         # BGP Identifier
817         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
818
819         # Optional Parameters
820         optional_parameters_hex = ""
821         if self.rfc4760 or self.allf:
822             optional_parameter_hex = (
823                 "\x02"  # Param type ("Capability Ad")
824                 "\x06"  # Length (6 bytes)
825                 "\x01"  # Capability type (NLRI Unicast),
826                         # see RFC 4760, secton 8
827                 "\x04"  # Capability value length
828                 "\x00\x01"  # AFI (Ipv4)
829                 "\x00"  # (reserved)
830                 "\x01"  # SAFI (Unicast)
831             )
832             optional_parameters_hex += optional_parameter_hex
833
834         if self.ipv6 or self.allf:
835             optional_parameter_hex = (
836                 "\x02"  # Param type ("Capability Ad")
837                 "\x06"  # Length (6 bytes)
838                 "\x01"  # Multiprotocol extetension capability,
839                 "\x04"  # Capability value length
840                 "\x00\x02"  # AFI (IPV6)
841                 "\x00"  # (reserved)
842                 "\x01"  # SAFI (UNICAST)
843             )
844             optional_parameters_hex += optional_parameter_hex
845
846         if self.bgpls or self.allf:
847             optional_parameter_hex = (
848                 "\x02"  # Param type ("Capability Ad")
849                 "\x06"  # Length (6 bytes)
850                 "\x01"  # Capability type (NLRI Unicast),
851                         # see RFC 4760, secton 8
852                 "\x04"  # Capability value length
853                 "\x40\x04"  # AFI (BGP-LS)
854                 "\x00"  # (reserved)
855                 "\x47"  # SAFI (BGP-LS)
856             )
857             optional_parameters_hex += optional_parameter_hex
858
859         if self.evpn or self.allf:
860             optional_parameter_hex = (
861                 "\x02"  # Param type ("Capability Ad")
862                 "\x06"  # Length (6 bytes)
863                 "\x01"  # Multiprotocol extetension capability,
864                 "\x04"  # Capability value length
865                 "\x00\x19"  # AFI (L2-VPN)
866                 "\x00"  # (reserved)
867                 "\x46"  # SAFI (EVPN)
868             )
869             optional_parameters_hex += optional_parameter_hex
870
871         if self.mvpn or self.allf:
872             optional_parameter_hex = (
873                 "\x02"  # Param type ("Capability Ad")
874                 "\x06"  # Length (6 bytes)
875                 "\x01"  # Multiprotocol extetension capability,
876                 "\x04"  # Capability value length
877                 "\x00\x01"  # AFI (IPV4)
878                 "\x00"  # (reserved)
879                 "\x05"  # SAFI (MCAST-VPN)
880             )
881             optional_parameters_hex += optional_parameter_hex
882             optional_parameter_hex = (
883                 "\x02"  # Param type ("Capability Ad")
884                 "\x06"  # Length (6 bytes)
885                 "\x01"  # Multiprotocol extetension capability,
886                 "\x04"  # Capability value length
887                 "\x00\x02"  # AFI (IPV6)
888                 "\x00"  # (reserved)
889                 "\x05"  # SAFI (MCAST-VPN)
890             )
891             optional_parameters_hex += optional_parameter_hex
892
893         if self.l3vpn_mcast or self.allf:
894             optional_parameter_hex = (
895                 "\x02"  # Param type ("Capability Ad")
896                 "\x06"  # Length (6 bytes)
897                 "\x01"  # Multiprotocol extetension capability,
898                 "\x04"  # Capability value length
899                 "\x00\x01"  # AFI (IPV4)
900                 "\x00"  # (reserved)
901                 "\x81"  # SAFI (L3VPN-MCAST)
902             )
903             optional_parameters_hex += optional_parameter_hex
904             optional_parameter_hex = (
905                 "\x02"  # Param type ("Capability Ad")
906                 "\x06"  # Length (6 bytes)
907                 "\x01"  # Multiprotocol extetension capability,
908                 "\x04"  # Capability value length
909                 "\x00\x02"  # AFI (IPV6)
910                 "\x00"  # (reserved)
911                 "\x81"  # SAFI (L3VPN-MCAST)
912             )
913             optional_parameters_hex += optional_parameter_hex
914
915         if self.l3vpn or self.allf:
916             optional_parameter_hex = (
917                 "\x02"  # Param type ("Capability Ad")
918                 "\x06"  # Length (6 bytes)
919                 "\x01"  # Multiprotocol extetension capability,
920                 "\x04"  # Capability value length
921                 "\x00\x01"  # AFI (IPV4)
922                 "\x00"  # (reserved)
923                 "\x80"  # SAFI (L3VPN-UNICAST)
924             )
925             optional_parameters_hex += optional_parameter_hex
926             optional_parameter_hex = (
927                 "\x02"  # Param type ("Capability Ad")
928                 "\x06"  # Length (6 bytes)
929                 "\x01"  # Multiprotocol extetension capability,
930                 "\x04"  # Capability value length
931                 "\x00\x02"  # AFI (IPV6)
932                 "\x00"  # (reserved)
933                 "\x80"  # SAFI (L3VPN-UNICAST)
934             )
935             optional_parameters_hex += optional_parameter_hex
936
937         if self.rt_constrain or self.allf:
938             optional_parameter_hex = (
939                 "\x02"  # Param type ("Capability Ad")
940                 "\x06"  # Length (6 bytes)
941                 "\x01"  # Multiprotocol extetension capability,
942                 "\x04"  # Capability value length
943                 "\x00\x01"  # AFI (IPV4)
944                 "\x00"  # (reserved)
945                 "\x84"  # SAFI (ROUTE-TARGET-CONSTRAIN)
946             )
947             optional_parameters_hex += optional_parameter_hex
948
949         optional_parameter_hex = (
950             "\x02"  # Param type ("Capability Ad")
951             "\x06"  # Length (6 bytes)
952             "\x41"  # "32 bit AS Numbers Support"
953                     # (see RFC 6793, section 3)
954             "\x04"  # Capability value length
955         )
956         optional_parameter_hex += (
957             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
958         )
959         optional_parameters_hex += optional_parameter_hex
960
961         if self.grace != 8:
962             b = list(bin(self.grace)[2:])
963             b = b + [0] * (3 - len(b))
964             length = "\x08"
965             if b[1] == '1':
966                 restart_flag = "\x80\x05"
967             else:
968                 restart_flag = "\x00\x05"
969             if b[2] == '1':
970                 ipv4_flag = "\x80"
971             else:
972                 ipv4_flag = "\x00"
973             if b[0] == '1':
974                 ll_gr = "\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
975                 length = "\x11"
976             else:
977                 ll_gr = ""
978             logger.debug("Grace parameters list: {}".format(b))
979             # "\x02" Param type ("Capability Ad")
980             # :param length: Length of whole message
981             # "\x40" Graceful-restart capability
982             # "\x06" Length (6 bytes)
983             # "\x00" Restart Flag (customizable - turned on when grace == 2,3,6,7)
984             # "\x05" Restart timer (5sec)
985             # "\x00\x01" AFI (IPV4)
986             # "\x01"  SAFI (Unicast)
987             # "\x00"  Ipv4 Flag (customizable - turned on when grace == 1,3,5,7)
988             # "\x47\x07\x00\x01\x01\x00\x00\x00\x1e" ipv4 ll-graceful-restart capability, timer 30sec
989             # ll-gr turned on when grace is between 4-7
990             optional_parameter_hex = "\x02{}\x40\x06{}\x00\x01\x01{}{}".format(
991                 length, restart_flag, ipv4_flag, ll_gr)
992             optional_parameters_hex += optional_parameter_hex
993
994         # Optional Parameters Length
995         optional_parameters_length = len(optional_parameters_hex)
996         optional_parameters_length_hex = struct.pack("B",
997                                                      optional_parameters_length)
998
999         # Length (big-endian)
1000         length = (
1001             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
1002             len(my_autonomous_system_hex_2_bytes) +
1003             len(hold_time_hex) + len(bgp_identifier_hex) +
1004             len(optional_parameters_length_hex) +
1005             len(optional_parameters_hex)
1006         )
1007         length_hex = struct.pack(">H", length)
1008
1009         # OPEN Message
1010         message_hex = (
1011             marker_hex +
1012             length_hex +
1013             type_hex +
1014             version_hex +
1015             my_autonomous_system_hex_2_bytes +
1016             hold_time_hex +
1017             bgp_identifier_hex +
1018             optional_parameters_length_hex +
1019             optional_parameters_hex
1020         )
1021
1022         if self.log_debug:
1023             logger.debug("OPEN message encoding")
1024             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1025             logger.debug("  Length=" + str(length) + " (0x" +
1026                          binascii.hexlify(length_hex) + ")")
1027             logger.debug("  Type=" + str(type) + " (0x" +
1028                          binascii.hexlify(type_hex) + ")")
1029             logger.debug("  Version=" + str(version) + " (0x" +
1030                          binascii.hexlify(version_hex) + ")")
1031             logger.debug("  My Autonomous System=" +
1032                          str(my_autonomous_system_2_bytes) + " (0x" +
1033                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
1034                          ")")
1035             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
1036                          binascii.hexlify(hold_time_hex) + ")")
1037             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
1038                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
1039             logger.debug("  Optional Parameters Length=" +
1040                          str(optional_parameters_length) + " (0x" +
1041                          binascii.hexlify(optional_parameters_length_hex) +
1042                          ")")
1043             logger.debug("  Optional Parameters=0x" +
1044                          binascii.hexlify(optional_parameters_hex))
1045             logger.debug("OPEN message encoded: 0x%s",
1046                          binascii.b2a_hex(message_hex))
1047
1048         return message_hex
1049
1050     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
1051                        wr_prefix_length=None, nlri_prefix_length=None,
1052                        my_autonomous_system=None, next_hop=None,
1053                        originator_id=None, cluster_list_item=None,
1054                        end_of_rib=False, **ls_nlri_params):
1055         """Generates an UPDATE Message (rfc4271#section-4.3)
1056
1057         Arguments:
1058             :param wr_prefixes: see the rfc4271#section-4.3
1059             :param nlri_prefixes: see the rfc4271#section-4.3
1060             :param wr_prefix_length: see the rfc4271#section-4.3
1061             :param nlri_prefix_length: see the rfc4271#section-4.3
1062             :param my_autonomous_system: see the rfc4271#section-4.3
1063             :param next_hop: see the rfc4271#section-4.3
1064         Returns:
1065             :return: encoded UPDATE message in HEX
1066         """
1067
1068         # default values handling
1069         # TODO optimize default values handling (use e.g. dicionary.update() approach)
1070         if wr_prefixes is None:
1071             wr_prefixes = self.wr_prefixes_default
1072         if nlri_prefixes is None:
1073             nlri_prefixes = self.nlri_prefixes_default
1074         if wr_prefix_length is None:
1075             wr_prefix_length = self.prefix_length_default
1076         if nlri_prefix_length is None:
1077             nlri_prefix_length = self.prefix_length_default
1078         if my_autonomous_system is None:
1079             my_autonomous_system = self.my_autonomous_system_default
1080         if next_hop is None:
1081             next_hop = self.next_hop_default
1082         if originator_id is None:
1083             originator_id = self.originator_id_default
1084         if cluster_list_item is None:
1085             cluster_list_item = self.cluster_list_item_default
1086         ls_nlri = self.ls_nlri_default.copy()
1087         ls_nlri.update(ls_nlri_params)
1088
1089         # Marker
1090         marker_hex = "\xFF" * 16
1091
1092         # Type
1093         type = 2
1094         type_hex = struct.pack("B", type)
1095
1096         # Withdrawn Routes
1097         withdrawn_routes_hex = ""
1098         if not self.bgpls:
1099             bytes = ((wr_prefix_length - 1) / 8) + 1
1100             for prefix in wr_prefixes:
1101                 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
1102                                        struct.pack(">I", int(prefix))[:bytes])
1103                 withdrawn_routes_hex += withdrawn_route_hex
1104
1105         # Withdrawn Routes Length
1106         withdrawn_routes_length = len(withdrawn_routes_hex)
1107         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1108
1109         # TODO: to replace hardcoded string by encoding?
1110         # Path Attributes
1111         path_attributes_hex = ""
1112         if not self.skipattr:
1113             path_attributes_hex += (
1114                 "\x40"  # Flags ("Well-Known")
1115                 "\x01"  # Type (ORIGIN)
1116                 "\x01"  # Length (1)
1117                 "\x00"  # Origin: IGP
1118             )
1119             path_attributes_hex += (
1120                 "\x40"  # Flags ("Well-Known")
1121                 "\x02"  # Type (AS_PATH)
1122                 "\x06"  # Length (6)
1123                 "\x02"  # AS segment type (AS_SEQUENCE)
1124                 "\x01"  # AS segment length (1)
1125             )
1126             my_as_hex = struct.pack(">I", my_autonomous_system)
1127             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
1128             path_attributes_hex += (
1129                 "\x40"  # Flags ("Well-Known")
1130                 "\x05"  # Type (LOCAL_PREF)
1131                 "\x04"  # Length (4)
1132                 "\x00\x00\x00\x64"  # (100)
1133             )
1134         if nlri_prefixes != []:
1135             path_attributes_hex += (
1136                 "\x40"  # Flags ("Well-Known")
1137                 "\x03"  # Type (NEXT_HOP)
1138                 "\x04"  # Length (4)
1139             )
1140             next_hop_hex = struct.pack(">I", int(next_hop))
1141             path_attributes_hex += (
1142                 next_hop_hex  # IP address of the next hop (4 bytes)
1143             )
1144             if originator_id is not None:
1145                 path_attributes_hex += (
1146                     "\x80"  # Flags ("Optional, non-transitive")
1147                     "\x09"  # Type (ORIGINATOR_ID)
1148                     "\x04"  # Length (4)
1149                 )           # ORIGINATOR_ID (4 bytes)
1150                 path_attributes_hex += struct.pack(">I", int(originator_id))
1151             if cluster_list_item is not None:
1152                 path_attributes_hex += (
1153                     "\x80"  # Flags ("Optional, non-transitive")
1154                     "\x0a"  # Type (CLUSTER_LIST)
1155                     "\x04"  # Length (4)
1156                 )           # one CLUSTER_LIST item (4 bytes)
1157                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1158
1159         if self.bgpls and not end_of_rib:
1160             path_attributes_hex += (
1161                 "\x80"  # Flags ("Optional, non-transitive")
1162                 "\x0e"  # Type (MP_REACH_NLRI)
1163                 "\x22"  # Length (34)
1164                 "\x40\x04"  # AFI (BGP-LS)
1165                 "\x47"  # SAFI (BGP-LS)
1166                 "\x04"  # Next Hop Length (4)
1167             )
1168             path_attributes_hex += struct.pack(">I", int(next_hop))
1169             path_attributes_hex += "\x00"           # Reserved
1170             path_attributes_hex += (
1171                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1172                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
1173                 "\x07"      # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1174             )
1175             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1176             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1177             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1178             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1179             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1180
1181         # Total Path Attributes Length
1182         total_path_attributes_length = len(path_attributes_hex)
1183         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1184
1185         # Network Layer Reachability Information
1186         nlri_hex = ""
1187         if not self.bgpls:
1188             bytes = ((nlri_prefix_length - 1) / 8) + 1
1189             for prefix in nlri_prefixes:
1190                 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1191                                    struct.pack(">I", int(prefix))[:bytes])
1192                 nlri_hex += nlri_prefix_hex
1193
1194         # Length (big-endian)
1195         length = (
1196             len(marker_hex) + 2 + len(type_hex) +
1197             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1198             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1199             len(nlri_hex))
1200         length_hex = struct.pack(">H", length)
1201
1202         # UPDATE Message
1203         message_hex = (
1204             marker_hex +
1205             length_hex +
1206             type_hex +
1207             withdrawn_routes_length_hex +
1208             withdrawn_routes_hex +
1209             total_path_attributes_length_hex +
1210             path_attributes_hex +
1211             nlri_hex
1212         )
1213
1214         if self.grace != 8 and self.grace != 0 and end_of_rib:
1215             message_hex = (marker_hex + binascii.unhexlify("00170200000000"))
1216
1217         if self.log_debug:
1218             logger.debug("UPDATE message encoding")
1219             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1220             logger.debug("  Length=" + str(length) + " (0x" +
1221                          binascii.hexlify(length_hex) + ")")
1222             logger.debug("  Type=" + str(type) + " (0x" +
1223                          binascii.hexlify(type_hex) + ")")
1224             logger.debug("  withdrawn_routes_length=" +
1225                          str(withdrawn_routes_length) + " (0x" +
1226                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
1227             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1228                          str(wr_prefix_length) + " (0x" +
1229                          binascii.hexlify(withdrawn_routes_hex) + ")")
1230             if total_path_attributes_length:
1231                 logger.debug("  Total Path Attributes Length=" +
1232                              str(total_path_attributes_length) + " (0x" +
1233                              binascii.hexlify(total_path_attributes_length_hex) + ")")
1234                 logger.debug("  Path Attributes=" + "(0x" +
1235                              binascii.hexlify(path_attributes_hex) + ")")
1236                 logger.debug("    Origin=IGP")
1237                 logger.debug("    AS path=" + str(my_autonomous_system))
1238                 logger.debug("    Next hop=" + str(next_hop))
1239                 if originator_id is not None:
1240                     logger.debug("    Originator id=" + str(originator_id))
1241                 if cluster_list_item is not None:
1242                     logger.debug("    Cluster list=" + str(cluster_list_item))
1243                 if self.bgpls:
1244                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1245             logger.debug("  Network Layer Reachability Information=" +
1246                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1247                          " (0x" + binascii.hexlify(nlri_hex) + ")")
1248             logger.debug("UPDATE message encoded: 0x" +
1249                          binascii.b2a_hex(message_hex))
1250
1251         # updating counter
1252         self.updates_sent += 1
1253         # returning encoded message
1254         return message_hex
1255
1256     def notification_message(self, error_code, error_subcode, data_hex=""):
1257         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1258
1259         Arguments:
1260             :param error_code: see the rfc4271#section-4.5
1261             :param error_subcode: see the rfc4271#section-4.5
1262             :param data_hex: see the rfc4271#section-4.5
1263         Returns:
1264             :return: encoded NOTIFICATION message in HEX
1265         """
1266
1267         # Marker
1268         marker_hex = "\xFF" * 16
1269
1270         # Type
1271         type = 3
1272         type_hex = struct.pack("B", type)
1273
1274         # Error Code
1275         error_code_hex = struct.pack("B", error_code)
1276
1277         # Error Subode
1278         error_subcode_hex = struct.pack("B", error_subcode)
1279
1280         # Length (big-endian)
1281         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1282                   len(error_subcode_hex) + len(data_hex))
1283         length_hex = struct.pack(">H", length)
1284
1285         # NOTIFICATION Message
1286         message_hex = (
1287             marker_hex +
1288             length_hex +
1289             type_hex +
1290             error_code_hex +
1291             error_subcode_hex +
1292             data_hex
1293         )
1294
1295         if self.log_debug:
1296             logger.debug("NOTIFICATION message encoding")
1297             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1298             logger.debug("  Length=" + str(length) + " (0x" +
1299                          binascii.hexlify(length_hex) + ")")
1300             logger.debug("  Type=" + str(type) + " (0x" +
1301                          binascii.hexlify(type_hex) + ")")
1302             logger.debug("  Error Code=" + str(error_code) + " (0x" +
1303                          binascii.hexlify(error_code_hex) + ")")
1304             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
1305                          binascii.hexlify(error_subcode_hex) + ")")
1306             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1307             logger.debug("NOTIFICATION message encoded: 0x%s",
1308                          binascii.b2a_hex(message_hex))
1309
1310         return message_hex
1311
1312     def keepalive_message(self):
1313         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1314
1315         Returns:
1316             :return: encoded KEEP ALIVE message in HEX
1317         """
1318
1319         # Marker
1320         marker_hex = "\xFF" * 16
1321
1322         # Type
1323         type = 4
1324         type_hex = struct.pack("B", type)
1325
1326         # Length (big-endian)
1327         length = len(marker_hex) + 2 + len(type_hex)
1328         length_hex = struct.pack(">H", length)
1329
1330         # KEEP ALIVE Message
1331         message_hex = (
1332             marker_hex +
1333             length_hex +
1334             type_hex
1335         )
1336
1337         if self.log_debug:
1338             logger.debug("KEEP ALIVE message encoding")
1339             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1340             logger.debug("  Length=" + str(length) + " (0x" +
1341                          binascii.hexlify(length_hex) + ")")
1342             logger.debug("  Type=" + str(type) + " (0x" +
1343                          binascii.hexlify(type_hex) + ")")
1344             logger.debug("KEEP ALIVE message encoded: 0x%s",
1345                          binascii.b2a_hex(message_hex))
1346
1347         return message_hex
1348
1349
1350 class TimeTracker(object):
1351     """Class for tracking timers, both for my keepalives and
1352     peer's hold time.
1353     """
1354
1355     def __init__(self, msg_in):
1356         """Initialisation. based on defaults and OPEN message from peer.
1357
1358         Arguments:
1359             msg_in: the OPEN message received from peer.
1360         """
1361         # Note: Relative time is always named timedelta, to stress that
1362         # the (non-delta) time is absolute.
1363         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1364         # Upper bound for being stuck in the same state, we should
1365         # at least report something before continuing.
1366         # Negotiate the hold timer by taking the smaller
1367         # of the 2 values (mine and the peer's).
1368         hold_timedelta = 180  # Not an attribute of self yet.
1369         # TODO: Make the default value configurable,
1370         # default value could mirror what peer said.
1371         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1372         if hold_timedelta > peer_hold_timedelta:
1373             hold_timedelta = peer_hold_timedelta
1374         if hold_timedelta != 0 and hold_timedelta < 3:
1375             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1376             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1377         self.hold_timedelta = hold_timedelta
1378         # If we do not hear from peer this long, we assume it has died.
1379         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1380         # Upper limit for duration between messages, to avoid being
1381         # declared to be dead.
1382         # The same as calling snapshot(), but also declares a field.
1383         self.snapshot_time = time.time()
1384         # Sometimes we need to store time. This is where to get
1385         # the value from afterwards. Time_keepalive may be too strict.
1386         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1387         # At this time point, peer will be declared dead.
1388         self.my_keepalive_time = None  # to be set later
1389         # At this point, we should be sending keepalive message.
1390
1391     def snapshot(self):
1392         """Store current time in instance data to use later."""
1393         # Read as time before something interesting was called.
1394         self.snapshot_time = time.time()
1395
1396     def reset_peer_hold_time(self):
1397         """Move hold time to future as peer has just proven it still lives."""
1398         self.peer_hold_time = time.time() + self.hold_timedelta
1399
1400     # Some methods could rely on self.snapshot_time, but it is better
1401     # to require user to provide it explicitly.
1402     def reset_my_keepalive_time(self, keepalive_time):
1403         """Calculate and set the next my KEEP ALIVE timeout time
1404
1405         Arguments:
1406             :keepalive_time: the initial value of the KEEP ALIVE timer
1407         """
1408         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1409
1410     def is_time_for_my_keepalive(self):
1411         """Check for my KEEP ALIVE timeout occurence"""
1412         if self.hold_timedelta == 0:
1413             return False
1414         return self.snapshot_time >= self.my_keepalive_time
1415
1416     def get_next_event_time(self):
1417         """Set the time of the next expected or to be sent KEEP ALIVE"""
1418         if self.hold_timedelta == 0:
1419             return self.snapshot_time + 86400
1420         return min(self.my_keepalive_time, self.peer_hold_time)
1421
1422     def check_peer_hold_time(self, snapshot_time):
1423         """Raise error if nothing was read from peer until specified time."""
1424         # Hold time = 0 means keepalive checking off.
1425         if self.hold_timedelta != 0:
1426             # time.time() may be too strict
1427             if snapshot_time > self.peer_hold_time:
1428                 logger.error("Peer has overstepped the hold timer.")
1429                 raise RuntimeError("Peer has overstepped the hold timer.")
1430                 # TODO: Include hold_timedelta?
1431                 # TODO: Add notification sending (attempt). That means
1432                 # move to write tracker.
1433
1434
1435 class ReadTracker(object):
1436     """Class for tracking read of mesages chunk by chunk and
1437     for idle waiting.
1438     """
1439
1440     def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
1441                  l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
1442                  ipv6=False, grace=8, wait_for_read=10):
1443         """The reader initialisation.
1444
1445         Arguments:
1446             bgp_socket: socket to be used for sending
1447             timer: timer to be used for scheduling
1448             storage: thread safe dict
1449             evpn: flag that evpn functionality is tested
1450             mvpn: flag that mvpn functionality is tested
1451             grace: flag that grace-restart functionality is tested
1452             l3vpn_mcast: flag that l3vpn_mcast functionality is tested
1453             l3vpn: flag that l3vpn unicast functionality is tested
1454             rt_constrain: flag that rt-constrain functionality is tested
1455             allf: flag for all family testing.
1456         """
1457         # References to outside objects.
1458         self.socket = bgp_socket
1459         self.timer = timer
1460         # BGP marker length plus length field length.
1461         self.header_length = 18
1462         # TODO: make it class (constant) attribute
1463         # Computation of where next chunk ends depends on whether
1464         # we are beyond length field.
1465         self.reading_header = True
1466         # Countdown towards next size computation.
1467         self.bytes_to_read = self.header_length
1468         # Incremental buffer for message under read.
1469         self.msg_in = ""
1470         # Initialising counters
1471         self.updates_received = 0
1472         self.prefixes_introduced = 0
1473         self.prefixes_withdrawn = 0
1474         self.rx_idle_time = 0
1475         self.rx_activity_detected = True
1476         self.storage = storage
1477         self.evpn = evpn
1478         self.mvpn = mvpn
1479         self.l3vpn_mcast = l3vpn_mcast
1480         self.l3vpn = l3vpn
1481         self.rt_constrain = rt_constrain
1482         self.ipv6 = ipv6
1483         self.allf = allf
1484         self.wfr = wait_for_read
1485         self.grace = grace
1486
1487     def read_message_chunk(self):
1488         """Read up to one message
1489
1490         Note:
1491             Currently it does not return anything.
1492         """
1493         # TODO: We could return the whole message, currently not needed.
1494         # We assume the socket is readable.
1495         chunk_message = self.socket.recv(self.bytes_to_read)
1496         self.msg_in += chunk_message
1497         self.bytes_to_read -= len(chunk_message)
1498         # TODO: bytes_to_read < 0 is not possible, right?
1499         if not self.bytes_to_read:
1500             # Finished reading a logical block.
1501             if self.reading_header:
1502                 # The logical block was a BGP header.
1503                 # Now we know the size of the message.
1504                 self.reading_header = False
1505                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1506                                       self.header_length)
1507             else:  # We have finished reading the body of the message.
1508                 # Peer has just proven it is still alive.
1509                 self.timer.reset_peer_hold_time()
1510                 # TODO: Do we want to count received messages?
1511                 # This version ignores the received message.
1512                 # TODO: Should we do validation and exit on anything
1513                 # besides update or keepalive?
1514                 # Prepare state for reading another message.
1515                 message_type_hex = self.msg_in[self.header_length]
1516                 if message_type_hex == "\x01":
1517                     logger.info("OPEN message received: 0x%s",
1518                                 binascii.b2a_hex(self.msg_in))
1519                 elif message_type_hex == "\x02":
1520                     logger.debug("UPDATE message received: 0x%s",
1521                                  binascii.b2a_hex(self.msg_in))
1522                     self.decode_update_message(self.msg_in)
1523                 elif message_type_hex == "\x03":
1524                     logger.info("NOTIFICATION message received: 0x%s",
1525                                 binascii.b2a_hex(self.msg_in))
1526                 elif message_type_hex == "\x04":
1527                     logger.info("KEEP ALIVE message received: 0x%s",
1528                                 binascii.b2a_hex(self.msg_in))
1529                 else:
1530                     logger.warning("Unexpected message received: 0x%s",
1531                                    binascii.b2a_hex(self.msg_in))
1532                 self.msg_in = ""
1533                 self.reading_header = True
1534                 self.bytes_to_read = self.header_length
1535         # We should not act upon peer_hold_time if we are reading
1536         # something right now.
1537         return
1538
1539     def decode_path_attributes(self, path_attributes_hex):
1540         """Decode the Path Attributes field (rfc4271#section-4.3)
1541
1542         Arguments:
1543             :path_attributes: path_attributes field to be decoded in hex
1544         Returns:
1545             :return: None
1546         """
1547         hex_to_decode = path_attributes_hex
1548
1549         while len(hex_to_decode):
1550             attr_flags_hex = hex_to_decode[0]
1551             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1552 #            attr_optional_bit = attr_flags & 128
1553 #            attr_transitive_bit = attr_flags & 64
1554 #            attr_partial_bit = attr_flags & 32
1555             attr_extended_length_bit = attr_flags & 16
1556
1557             attr_type_code_hex = hex_to_decode[1]
1558             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1559
1560             if attr_extended_length_bit:
1561                 attr_length_hex = hex_to_decode[2:4]
1562                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1563                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1564                 hex_to_decode = hex_to_decode[4 + attr_length:]
1565             else:
1566                 attr_length_hex = hex_to_decode[2]
1567                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1568                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1569                 hex_to_decode = hex_to_decode[3 + attr_length:]
1570
1571             if attr_type_code == 1:
1572                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1573                              binascii.b2a_hex(attr_flags_hex))
1574                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1575             elif attr_type_code == 2:
1576                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1577                              binascii.b2a_hex(attr_flags_hex))
1578                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1579             elif attr_type_code == 3:
1580                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1581                              binascii.b2a_hex(attr_flags_hex))
1582                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1583             elif attr_type_code == 4:
1584                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1585                              binascii.b2a_hex(attr_flags_hex))
1586                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1587             elif attr_type_code == 5:
1588                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1589                              binascii.b2a_hex(attr_flags_hex))
1590                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1591             elif attr_type_code == 6:
1592                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1593                              binascii.b2a_hex(attr_flags_hex))
1594                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1595             elif attr_type_code == 7:
1596                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1597                              binascii.b2a_hex(attr_flags_hex))
1598                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1599             elif attr_type_code == 9:  # rfc4456#section-8
1600                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1601                              binascii.b2a_hex(attr_flags_hex))
1602                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1603             elif attr_type_code == 10:  # rfc4456#section-8
1604                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1605                              binascii.b2a_hex(attr_flags_hex))
1606                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1607             elif attr_type_code == 14:  # rfc4760#section-3
1608                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1609                              binascii.b2a_hex(attr_flags_hex))
1610                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1611                 address_family_identifier_hex = attr_value_hex[0:2]
1612                 logger.debug("  Address Family Identifier=0x%s",
1613                              binascii.b2a_hex(address_family_identifier_hex))
1614                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1615                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1616                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1617                 next_hop_netaddr_len_hex = attr_value_hex[3]
1618                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1619                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1620                              next_hop_netaddr_len,
1621                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1622                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1623                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1624                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1625                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1626                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1627                 logger.debug("  Reserved=0x%s",
1628                              binascii.b2a_hex(reserved_hex))
1629                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1630                 logger.debug("  Network Layer Reachability Information=0x%s",
1631                              binascii.b2a_hex(nlri_hex))
1632                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1633                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1634                 for prefix in nlri_prefix_list:
1635                     logger.debug("  nlri_prefix_received: %s", prefix)
1636                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1637             elif attr_type_code == 15:  # rfc4760#section-4
1638                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1639                              binascii.b2a_hex(attr_flags_hex))
1640                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1641                 address_family_identifier_hex = attr_value_hex[0:2]
1642                 logger.debug("  Address Family Identifier=0x%s",
1643                              binascii.b2a_hex(address_family_identifier_hex))
1644                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1645                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1646                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1647                 wd_hex = attr_value_hex[3:]
1648                 logger.debug("  Withdrawn Routes=0x%s",
1649                              binascii.b2a_hex(wd_hex))
1650                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1651                 logger.debug("  Withdrawn routes prefix list: %s",
1652                              wdr_prefix_list)
1653                 for prefix in wdr_prefix_list:
1654                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1655                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1656             else:
1657                 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1658                              binascii.b2a_hex(attr_flags_hex))
1659                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1660         return None
1661
1662     def decode_update_message(self, msg):
1663         """Decode an UPDATE message (rfc4271#section-4.3)
1664
1665         Arguments:
1666             :msg: message to be decoded in hex
1667         Returns:
1668             :return: None
1669         """
1670         logger.debug("Decoding update message:")
1671         # message header - marker
1672         marker_hex = msg[:16]
1673         logger.debug("Message header marker: 0x%s",
1674                      binascii.b2a_hex(marker_hex))
1675         # message header - message length
1676         msg_length_hex = msg[16:18]
1677         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1678         logger.debug("Message lenght: 0x%s (%s)",
1679                      binascii.b2a_hex(msg_length_hex), msg_length)
1680         # message header - message type
1681         msg_type_hex = msg[18:19]
1682         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1683
1684         with self.storage as stor:
1685             # this will replace the previously stored message
1686             stor['update'] = binascii.hexlify(msg)
1687
1688         logger.debug("Evpn {}".format(self.evpn))
1689         if self.evpn:
1690             logger.debug("Skipping update decoding due to evpn data expected")
1691             return
1692
1693         logger.debug("Graceful-restart {}".format(self.grace))
1694         if self.grace != 8:
1695             logger.debug("Skipping update decoding due to graceful-restart data expected")
1696             return
1697
1698         logger.debug("Mvpn {}".format(self.mvpn))
1699         if self.mvpn:
1700             logger.debug("Skipping update decoding due to mvpn data expected")
1701             return
1702
1703         logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
1704         if self.l3vpn_mcast:
1705             logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
1706             return
1707
1708         logger.debug("L3vpn-unicast {}".format(self.l3vpn))
1709         if self.l3vpn_mcast:
1710             logger.debug("Skipping update decoding due to l3vpn-unicast data expected")
1711             return
1712
1713         logger.debug("Route-Target-Constrain {}".format(self.rt_constrain))
1714         if self.rt_constrain:
1715             logger.debug("Skipping update decoding due to Route-Target-Constrain data expected")
1716             return
1717
1718         logger.debug("Ipv6-Unicast {}".format(self.ipv6))
1719         if self.ipv6:
1720             logger.debug("Skipping update decoding due to Ipv6 data expected")
1721             return
1722
1723         logger.debug("Allf {}".format(self.allf))
1724         if self.allf:
1725             logger.debug("Skipping update decoding")
1726             return
1727
1728         if msg_type == 2:
1729             logger.debug("Message type: 0x%s (update)",
1730                          binascii.b2a_hex(msg_type_hex))
1731             # withdrawn routes length
1732             wdr_length_hex = msg[19:21]
1733             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1734             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1735                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1736             # withdrawn routes
1737             wdr_hex = msg[21:21 + wdr_length]
1738             logger.debug("Withdrawn routes: 0x%s",
1739                          binascii.b2a_hex(wdr_hex))
1740             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1741             logger.debug("Withdrawn routes prefix list: %s",
1742                          wdr_prefix_list)
1743             for prefix in wdr_prefix_list:
1744                 logger.debug("withdrawn_prefix_received: %s", prefix)
1745             # total path attribute length
1746             total_pa_length_offset = 21 + wdr_length
1747             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1748             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1749             logger.debug("Total path attribute lenght: 0x%s (%s)",
1750                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1751             # path attributes
1752             pa_offset = total_pa_length_offset + 2
1753             pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1754             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1755             self.decode_path_attributes(pa_hex)
1756             # network layer reachability information length
1757             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1758             logger.debug("Calculated NLRI length: %s", nlri_length)
1759             # network layer reachability information
1760             nlri_offset = pa_offset + total_pa_length
1761             nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1762             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1763             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1764             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1765             for prefix in nlri_prefix_list:
1766                 logger.debug("nlri_prefix_received: %s", prefix)
1767             # Updating counters
1768             self.updates_received += 1
1769             self.prefixes_introduced += len(nlri_prefix_list)
1770             self.prefixes_withdrawn += len(wdr_prefix_list)
1771         else:
1772             logger.error("Unexpeced message type 0x%s in 0x%s",
1773                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1774
1775     def wait_for_read(self):
1776         """Read message until timeout (next expected event).
1777
1778         Note:
1779             Used when no more updates has to be sent to avoid busy-wait.
1780             Currently it does not return anything.
1781         """
1782         # Compute time to the first predictable state change
1783         event_time = self.timer.get_next_event_time()
1784         # snapshot_time would be imprecise
1785         wait_timedelta = min(event_time - time.time(), self.wfr)
1786         if wait_timedelta < 0:
1787             # The program got around to waiting to an event in "very near
1788             # future" so late that it became a "past" event, thus tell
1789             # "select" to not wait at all. Passing negative timedelta to
1790             # select() would lead to either waiting forever (for -1) or
1791             # select.error("Invalid parameter") (for everything else).
1792             wait_timedelta = 0
1793         # And wait for event or something to read.
1794
1795         if not self.rx_activity_detected or not (self.updates_received % 100):
1796             # right time to write statistics to the log (not for every update and
1797             # not too frequently to avoid having large log files)
1798             logger.info("total_received_update_message_counter: %s",
1799                         self.updates_received)
1800             logger.info("total_received_nlri_prefix_counter: %s",
1801                         self.prefixes_introduced)
1802             logger.info("total_received_withdrawn_prefix_counter: %s",
1803                         self.prefixes_withdrawn)
1804
1805         start_time = time.time()
1806         select.select([self.socket], [], [self.socket], wait_timedelta)
1807         timedelta = time.time() - start_time
1808         self.rx_idle_time += timedelta
1809         self.rx_activity_detected = timedelta < 1
1810
1811         if not self.rx_activity_detected or not (self.updates_received % 100):
1812             # right time to write statistics to the log (not for every update and
1813             # not too frequently to avoid having large log files)
1814             logger.info("... idle for %.3fs", timedelta)
1815             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1816         return
1817
1818
1819 class WriteTracker(object):
1820     """Class tracking enqueueing messages and sending chunks of them."""
1821
1822     def __init__(self, bgp_socket, generator, timer):
1823         """The writter initialisation.
1824
1825         Arguments:
1826             bgp_socket: socket to be used for sending
1827             generator: generator to be used for message generation
1828             timer: timer to be used for scheduling
1829         """
1830         # References to outside objects,
1831         self.socket = bgp_socket
1832         self.generator = generator
1833         self.timer = timer
1834         # Really new fields.
1835         # TODO: Would attribute docstrings add anything substantial?
1836         self.sending_message = False
1837         self.bytes_to_send = 0
1838         self.msg_out = ""
1839
1840     def enqueue_message_for_sending(self, message):
1841         """Enqueue message and change state.
1842
1843         Arguments:
1844             message: message to be enqueued into the msg_out buffer
1845         """
1846         self.msg_out += message
1847         self.bytes_to_send += len(message)
1848         self.sending_message = True
1849
1850     def send_message_chunk_is_whole(self):
1851         """Send enqueued data from msg_out buffer
1852
1853         Returns:
1854             :return: true if no remaining data to send
1855         """
1856         # We assume there is a msg_out to send and socket is writable.
1857         # print "going to send", repr(self.msg_out)
1858         self.timer.snapshot()
1859         bytes_sent = self.socket.send(self.msg_out)
1860         # Forget the part of message that was sent.
1861         self.msg_out = self.msg_out[bytes_sent:]
1862         self.bytes_to_send -= bytes_sent
1863         if not self.bytes_to_send:
1864             # TODO: Is it possible to hit negative bytes_to_send?
1865             self.sending_message = False
1866             # We should have reset hold timer on peer side.
1867             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1868             # The possible reason for not prioritizing reads is gone.
1869             return True
1870         return False
1871
1872
1873 class StateTracker(object):
1874     """Main loop has state so complex it warrants this separate class."""
1875
1876     def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1877         """The state tracker initialisation.
1878
1879         Arguments:
1880             bgp_socket: socket to be used for sending / receiving
1881             generator: generator to be used for message generation
1882             timer: timer to be used for scheduling
1883             inqueue: user initiated messages queue
1884             storage: thread safe dict to store data for the rpc server
1885             cliargs: cli args from the user
1886         """
1887         # References to outside objects.
1888         self.socket = bgp_socket
1889         self.generator = generator
1890         self.timer = timer
1891         # Sub-trackers.
1892         self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
1893                                   l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
1894                                   rt_constrain=cliargs.rt_constrain, ipv6=cliargs.ipv6, grace=cliargs.grace,
1895                                   wait_for_read=cliargs.wfr)
1896         self.writer = WriteTracker(bgp_socket, generator, timer)
1897         # Prioritization state.
1898         self.prioritize_writing = False
1899         # In general, we prioritize reading over writing. But in order
1900         # not to get blocked by neverending reads, we should
1901         # check whether we are not risking running out of holdtime.
1902         # So in some situations, this field is set to True to attempt
1903         # finishing sending a message, after which this field resets
1904         # back to False.
1905         # TODO: Alternative is to switch fairly between reading and
1906         # writing (called round robin from now on).
1907         # Message counting is done in generator.
1908         self.inqueue = inqueue
1909
1910     def perform_one_loop_iteration(self):
1911         """ The main loop iteration
1912
1913         Notes:
1914             Calculates priority, resolves all conditions, calls
1915             appropriate method and returns to caller to repeat.
1916         """
1917         self.timer.snapshot()
1918         if not self.prioritize_writing:
1919             if self.timer.is_time_for_my_keepalive():
1920                 if not self.writer.sending_message:
1921                     # We need to schedule a keepalive ASAP.
1922                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1923                     logger.info("KEEP ALIVE is sent.")
1924                 # We are sending a message now, so let's prioritize it.
1925                 self.prioritize_writing = True
1926
1927         try:
1928             msg = self.inqueue.get_nowait()
1929             logger.info("Received message: {}".format(msg))
1930             msgbin = binascii.unhexlify(msg)
1931             self.writer.enqueue_message_for_sending(msgbin)
1932         except Queue.Empty:
1933             pass
1934         # Now we know what our priorities are, we have to check
1935         # which actions are available.
1936         # socket.socket() returns three lists,
1937         # we store them to list of lists.
1938         list_list = select.select([self.socket], [self.socket], [self.socket],
1939                                   self.timer.report_timedelta)
1940         read_list, write_list, except_list = list_list
1941         # Lists are unpacked, each is either [] or [self.socket],
1942         # so we will test them as boolean.
1943         if except_list:
1944             logger.error("Exceptional state on the socket.")
1945             raise RuntimeError("Exceptional state on socket", self.socket)
1946         # We will do either read or write.
1947         if not (self.prioritize_writing and write_list):
1948             # Either we have no reason to rush writes,
1949             # or the socket is not writable.
1950             # We are focusing on reading here.
1951             if read_list:  # there is something to read indeed
1952                 # In this case we want to read chunk of message
1953                 # and repeat the select,
1954                 self.reader.read_message_chunk()
1955                 return
1956             # We were focusing on reading, but nothing to read was there.
1957             # Good time to check peer for hold timer.
1958             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1959             # Quiet on the read front, we can have attempt to write.
1960         if write_list:
1961             # Either we really want to reset peer's view of our hold
1962             # timer, or there was nothing to read.
1963             # Were we in the middle of sending a message?
1964             if self.writer.sending_message:
1965                 # Was it the end of a message?
1966                 whole = self.writer.send_message_chunk_is_whole()
1967                 # We were pressed to send something and we did it.
1968                 if self.prioritize_writing and whole:
1969                     # We prioritize reading again.
1970                     self.prioritize_writing = False
1971                 return
1972             # Finally to check if still update messages to be generated.
1973             if self.generator.remaining_prefixes:
1974                 msg_out = self.generator.compose_update_message()
1975                 if not self.generator.remaining_prefixes:
1976                     # We have just finished update generation,
1977                     # end-of-rib is due.
1978                     logger.info("All update messages generated.")
1979                     logger.info("Storing performance results.")
1980                     self.generator.store_results()
1981                     logger.info("Finally an END-OF-RIB is sent.")
1982                     msg_out += self.generator.update_message(wr_prefixes=[],
1983                                                              nlri_prefixes=[],
1984                                                              end_of_rib=True)
1985                 self.writer.enqueue_message_for_sending(msg_out)
1986                 # Attempt for real sending to be done in next iteration.
1987                 return
1988             # Nothing to write anymore.
1989             # To avoid busy loop, we do idle waiting here.
1990             self.reader.wait_for_read()
1991             return
1992         # We can neither read nor write.
1993         logger.warning("Input and output both blocked for " +
1994                        str(self.timer.report_timedelta) + " seconds.")
1995         # FIXME: Are we sure select has been really waiting
1996         # the whole period?
1997         return
1998
1999
2000 def create_logger(loglevel, logfile):
2001     """Create logger object
2002
2003     Arguments:
2004         :loglevel: log level
2005         :logfile: log file name
2006     Returns:
2007         :return: logger object
2008     """
2009     logger = logging.getLogger("logger")
2010     log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
2011     console_handler = logging.StreamHandler()
2012     file_handler = logging.FileHandler(logfile, mode="w")
2013     console_handler.setFormatter(log_formatter)
2014     file_handler.setFormatter(log_formatter)
2015     logger.addHandler(console_handler)
2016     logger.addHandler(file_handler)
2017     logger.setLevel(loglevel)
2018     return logger
2019
2020
2021 def job(arguments, inqueue, storage):
2022     """One time initialisation and iterations looping.
2023     Notes:
2024         Establish BGP connection and run iterations.
2025
2026     Arguments:
2027         :arguments: Command line arguments
2028         :inqueue: Data to be sent from play.py
2029         :storage: Shared dict for rpc server
2030     Returns:
2031         :return: None
2032     """
2033     bgp_socket = establish_connection(arguments)
2034     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
2035     # Receive open message before sending anything.
2036     # FIXME: Add parameter to send default open message first,
2037     # to work with "you first" peers.
2038     msg_in = read_open_message(bgp_socket)
2039     logger.info(binascii.hexlify(msg_in))
2040     storage['open'] = binascii.hexlify(msg_in)
2041     timer = TimeTracker(msg_in)
2042     generator = MessageGenerator(arguments)
2043     msg_out = generator.open_message()
2044     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
2045     # Send our open message to the peer.
2046     bgp_socket.send(msg_out)
2047     # Wait for confirming keepalive.
2048     # TODO: Surely in just one packet?
2049     # Using exact keepalive length to not to see possible updates.
2050     msg_in = bgp_socket.recv(19)
2051     if msg_in != generator.keepalive_message():
2052         error_msg = "Open not confirmed by keepalive, instead got"
2053         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
2054         raise MessageError(error_msg, msg_in)
2055     timer.reset_peer_hold_time()
2056     # Send the keepalive to indicate the connection is accepted.
2057     timer.snapshot()  # Remember this time.
2058     msg_out = generator.keepalive_message()
2059     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
2060     bgp_socket.send(msg_out)
2061     # Use the remembered time.
2062     timer.reset_my_keepalive_time(timer.snapshot_time)
2063     # End of initial handshake phase.
2064     state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
2065     while True:  # main reactor loop
2066         state.perform_one_loop_iteration()
2067
2068
2069 class Rpcs:
2070     '''Handler for SimpleXMLRPCServer'''
2071
2072     def __init__(self, sendqueue, storage):
2073         '''Init method
2074
2075         Arguments:
2076             :sendqueue: queue for data to be sent towards odl
2077             :storage: thread safe dict
2078         '''
2079         self.queue = sendqueue
2080         self.storage = storage
2081
2082     def send(self, text):
2083         '''Data to be sent
2084
2085         Arguments:
2086             :text: hes string of the data to be sent
2087         '''
2088         self.queue.put(text)
2089
2090     def get(self, text=''):
2091         '''Reads data form the storage
2092
2093         - returns stored data or an empty string, at the moment only
2094           'update' is stored
2095
2096         Arguments:
2097             :text: a key to the storage to get the data
2098         Returns:
2099             :data: stored data
2100         '''
2101         with self.storage as stor:
2102             return stor.get(text, '')
2103
2104     def clean(self, text=''):
2105         '''Cleans data form the storage
2106
2107         Arguments:
2108             :text: a key to the storage to clean the data
2109         '''
2110         with self.storage as stor:
2111             if text in stor:
2112                 del stor[text]
2113
2114
2115 def threaded_job(arguments):
2116     """Run the job threaded
2117
2118     Arguments:
2119         :arguments: Command line arguments
2120     Returns:
2121         :return: None
2122     """
2123     amount_left = arguments.amount
2124     utils_left = arguments.multiplicity
2125     prefix_current = arguments.firstprefix
2126     myip_current = arguments.myip
2127     port = arguments.port
2128     thread_args = []
2129     rpcqueue = Queue.Queue()
2130     storage = SafeDict()
2131
2132     while 1:
2133         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
2134         amount_left -= amount_per_util
2135         utils_left -= 1
2136
2137         args = deepcopy(arguments)
2138         args.amount = amount_per_util
2139         args.firstprefix = prefix_current
2140         args.myip = myip_current
2141         thread_args.append(args)
2142
2143         if not utils_left:
2144             break
2145         prefix_current += amount_per_util * 16
2146         myip_current += 1
2147
2148     try:
2149         # Create threads
2150         for t in thread_args:
2151             thread.start_new_thread(job, (t, rpcqueue, storage))
2152     except Exception:
2153         print "Error: unable to start thread."
2154         raise SystemExit(2)
2155
2156     if arguments.usepeerip:
2157         ip = arguments.peerip
2158     else:
2159         ip = arguments.myip
2160     rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
2161     rpcserver.register_instance(Rpcs(rpcqueue, storage))
2162     rpcserver.serve_forever()
2163
2164
2165 if __name__ == "__main__":
2166     arguments = parse_arguments()
2167     logger = create_logger(arguments.loglevel, arguments.logfile)
2168     threaded_job(arguments)