21fd5a8175a9801df3c6fb51ac3ed31169174692
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 import argparse
17 import binascii
18 import ipaddr
19 import logging
20 import Queue
21 import select
22 import socket
23 import struct
24 import thread
25 import threading
26 import time
27
28
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
33
34
35 class SafeDict(dict):
36     '''Thread safe dictionary
37
38     The object will serve as thread safe data storage.
39     It should be used with "with" statement.
40     '''
41
42     def __init__(self, * p_arg, ** n_arg):
43         super(SafeDict, self).__init__()
44         self._lock = threading.Lock()
45
46     def __enter__(self):
47         self._lock.acquire()
48         return self
49
50     def __exit__(self, type, value, traceback):
51         self._lock.release()
52
53
54 def parse_arguments():
55     """Use argparse to get arguments,
56
57     Returns:
58         :return: args object.
59     """
60     parser = argparse.ArgumentParser()
61     # TODO: Should we use --argument-names-with-spaces?
62     str_help = "Autonomous System number use in the stream."
63     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64     # FIXME: We are acting as iBGP peer,
65     # we should mirror AS number from peer's open message.
66     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67     parser.add_argument("--amount", default="1", type=int, help=str_help)
68     str_help = "Rpc server port."
69     parser.add_argument("--port", default="8000", type=int, help=str_help)
70     str_help = "Maximum number of IP prefixes to be announced in one iteration"
71     parser.add_argument("--insert", default="1", type=int, help=str_help)
72     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
73     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
74     str_help = "The number of prefixes to process without withdrawals"
75     parser.add_argument("--prefill", default="0", type=int, help=str_help)
76     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
77     parser.add_argument("--updates", choices=["single", "separate"],
78                         default=["separate"], help=str_help)
79     str_help = "Base prefix IP address for prefix generation"
80     parser.add_argument("--firstprefix", default="8.0.1.0",
81                         type=ipaddr.IPv4Address, help=str_help)
82     str_help = "The prefix length."
83     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
84     str_help = "Listen for connection, instead of initiating it."
85     parser.add_argument("--listen", action="store_true", help=str_help)
86     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
87                 "Default value only suitable for listening.")
88     parser.add_argument("--myip", default="0.0.0.0",
89                         type=ipaddr.IPv4Address, help=str_help)
90     str_help = ("TCP port to bind to when listening or initiating connection." +
91                 "Default only suitable for initiating.")
92     parser.add_argument("--myport", default="0", type=int, help=str_help)
93     str_help = "The IP of the next hop to be placed into the update messages."
94     parser.add_argument("--nexthop", default="192.0.2.1",
95                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
96     str_help = "Identifier of the route originator."
97     parser.add_argument("--originator", default=None,
98                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
99     str_help = "Cluster list item identifier."
100     parser.add_argument("--cluster", default=None,
101                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
102     str_help = ("Numeric IP Address to try to connect to." +
103                 "Currently no effect in listening mode.")
104     parser.add_argument("--peerip", default="127.0.0.2",
105                         type=ipaddr.IPv4Address, help=str_help)
106     str_help = "TCP port to try to connect to. No effect in listening mode."
107     parser.add_argument("--peerport", default="179", type=int, help=str_help)
108     str_help = "Local hold time."
109     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
110     str_help = "Log level (--error, --warning, --info, --debug)"
111     parser.add_argument("--error", dest="loglevel", action="store_const",
112                         const=logging.ERROR, default=logging.INFO,
113                         help=str_help)
114     parser.add_argument("--warning", dest="loglevel", action="store_const",
115                         const=logging.WARNING, default=logging.INFO,
116                         help=str_help)
117     parser.add_argument("--info", dest="loglevel", action="store_const",
118                         const=logging.INFO, default=logging.INFO,
119                         help=str_help)
120     parser.add_argument("--debug", dest="loglevel", action="store_const",
121                         const=logging.DEBUG, default=logging.INFO,
122                         help=str_help)
123     str_help = "Log file name"
124     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
125     str_help = "Trailing part of the csv result files for plotting purposes"
126     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
127     str_help = "Minimum number of updates to reach to include result into csv."
128     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
129     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
130     parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
131     str_help = "Using peerip instead of myip for xmlrpc server"
132     parser.add_argument("--usepeerip", default=False, action="store_true", help=str_help)
133     str_help = "Link-State NLRI supported"
134     parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
135     str_help = "Link-State NLRI: Identifier"
136     parser.add_argument("-lsid", default="1", type=int, help=str_help)
137     str_help = "Link-State NLRI: Tunnel ID"
138     parser.add_argument("-lstid", default="1", type=int, help=str_help)
139     str_help = "Link-State NLRI: LSP ID"
140     parser.add_argument("-lspid", default="1", type=int, help=str_help)
141     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
142     parser.add_argument("--lstsaddr", default="1.2.3.4",
143                         type=ipaddr.IPv4Address, help=str_help)
144     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
145     parser.add_argument("--lsteaddr", default="5.6.7.8",
146                         type=ipaddr.IPv4Address, help=str_help)
147     str_help = "Link-State NLRI: Identifier Step"
148     parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
149     str_help = "Link-State NLRI: Tunnel ID Step"
150     parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
151     str_help = "Link-State NLRI: LSP ID Step"
152     parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
153     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
154     parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
155     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
156     parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
157     str_help = "How many play utilities are to be started."
158     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
159     str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
160     Enabling this flag makes the script not decoding the update mesage, because of not\
161     supported decoding for these elements."
162     parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
163     str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
164     Enabling this flag makes the script not decoding the update mesage, because of not\
165     supported decoding for these elements."
166     parser.add_argument("--grace", default="8", type=int, help=str_help)
167     str_help = "Open message includes Graceful-restart capability, containing AFI/SAFIS:\
168     IPV4-Unicast, IPV6-Unicast, BGP-LS\
169     Enabling this flag makes the script not decoding the update mesage, because of not\
170     supported decoding for these elements."
171     parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
172     str_help = "Open message includes L3VPN-MULTICAST arguments.\
173     Enabling this flag makes the script not decoding the update mesage, because of not\
174     supported decoding for these elements."
175     parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
176     str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding."
177     parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
178     str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
179     parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help)
180     str_help = "Open message includes ipv6-unicast family, without message decoding."
181     parser.add_argument("--ipv6", default=False, action="store_true", help=str_help)
182     str_help = "Add all supported families without message decoding."
183     parser.add_argument("--allf", default=False, action="store_true", help=str_help)
184     parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
185     str_help = "Skipping well known attributes for update message"
186     parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
187     arguments = parser.parse_args()
188     if arguments.multiplicity < 1:
189         print "Multiplicity", arguments.multiplicity, "is not positive."
190         raise SystemExit(1)
191     # TODO: Are sanity checks (such as asnumber>=0) required?
192     return arguments
193
194
195 def establish_connection(arguments):
196     """Establish connection to BGP peer.
197
198     Arguments:
199         :arguments: following command-line arguments are used
200             - arguments.myip: local IP address
201             - arguments.myport: local port
202             - arguments.peerip: remote IP address
203             - arguments.peerport: remote port
204     Returns:
205         :return: socket.
206     """
207     if arguments.listen:
208         logger.info("Connecting in the listening mode.")
209         logger.debug("Local IP address: " + str(arguments.myip))
210         logger.debug("Local port: " + str(arguments.myport))
211         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
212         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
213         # bind need single tuple as argument
214         listening_socket.bind((str(arguments.myip), arguments.myport))
215         listening_socket.listen(1)
216         bgp_socket, _ = listening_socket.accept()
217         # TODO: Verify client IP is cotroller IP.
218         listening_socket.close()
219     else:
220         logger.info("Connecting in the talking mode.")
221         logger.debug("Local IP address: " + str(arguments.myip))
222         logger.debug("Local port: " + str(arguments.myport))
223         logger.debug("Remote IP address: " + str(arguments.peerip))
224         logger.debug("Remote port: " + str(arguments.peerport))
225         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
226         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
227         # bind to force specified address and port
228         talking_socket.bind((str(arguments.myip), arguments.myport))
229         # socket does not spead ipaddr, hence str()
230         talking_socket.connect((str(arguments.peerip), arguments.peerport))
231         bgp_socket = talking_socket
232     logger.info("Connected to ODL.")
233     return bgp_socket
234
235
236 def get_short_int_from_message(message, offset=16):
237     """Extract 2-bytes number from provided message.
238
239     Arguments:
240         :message: given message
241         :offset: offset of the short_int inside the message
242     Returns:
243         :return: required short_inf value.
244     Notes:
245         default offset value is the BGP message size offset.
246     """
247     high_byte_int = ord(message[offset])
248     low_byte_int = ord(message[offset + 1])
249     short_int = high_byte_int * 256 + low_byte_int
250     return short_int
251
252
253 def get_prefix_list_from_hex(prefixes_hex):
254     """Get decoded list of prefixes (rfc4271#section-4.3)
255
256     Arguments:
257         :prefixes_hex: list of prefixes to be decoded in hex
258     Returns:
259         :return: list of prefixes in the form of ip address (X.X.X.X/X)
260     """
261     prefix_list = []
262     offset = 0
263     while offset < len(prefixes_hex):
264         prefix_bit_len_hex = prefixes_hex[offset]
265         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
266         prefix_len = ((prefix_bit_len - 1) / 8) + 1
267         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
268         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
269         offset += 1 + prefix_len
270         prefix_list.append(prefix + "/" + str(prefix_bit_len))
271     return prefix_list
272
273
274 class MessageError(ValueError):
275     """Value error with logging optimized for hexlified messages."""
276
277     def __init__(self, text, message, *args):
278         """Initialisation.
279
280         Store and call super init for textual comment,
281         store raw message which caused it.
282         """
283         self.text = text
284         self.msg = message
285         super(MessageError, self).__init__(text, message, *args)
286
287     def __str__(self):
288         """Generate human readable error message.
289
290         Returns:
291             :return: human readable message as string
292         Notes:
293             Use a placeholder string if the message is to be empty.
294         """
295         message = binascii.hexlify(self.msg)
296         if message == "":
297             message = "(empty message)"
298         return self.text + ": " + message
299
300
301 def read_open_message(bgp_socket):
302     """Receive peer's OPEN message
303
304     Arguments:
305         :bgp_socket: the socket to be read
306     Returns:
307         :return: received OPEN message.
308     Notes:
309         Performs just basic incomming message checks
310     """
311     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
312     # TODO: Can the incoming open message be split in more than one packet?
313     # Some validation.
314     if len(msg_in) < 37:
315         # 37 is minimal length of open message with 4-byte AS number.
316         error_msg = (
317             "Message length (" + str(len(msg_in)) + ") is smaller than "
318             "minimal length of OPEN message with 4-byte AS number (37)"
319         )
320         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
321         raise MessageError(error_msg, msg_in)
322     # TODO: We could check BGP marker, but it is defined only later;
323     # decide what to do.
324     reported_length = get_short_int_from_message(msg_in)
325     if len(msg_in) != reported_length:
326         error_msg = (
327             "Expected message length (" + reported_length +
328             ") does not match actual length (" + str(len(msg_in)) + ")"
329         )
330         logger.error(error_msg + binascii.hexlify(msg_in))
331         raise MessageError(error_msg, msg_in)
332     logger.info("Open message received.")
333     return msg_in
334
335
336 class MessageGenerator(object):
337     """Class which generates messages, holds states and configuration values."""
338
339     # TODO: Define bgp marker as a class (constant) variable.
340     def __init__(self, args):
341         """Initialisation according to command-line args.
342
343         Arguments:
344             :args: argsparser's Namespace object which contains command-line
345                 options for MesageGenerator initialisation
346         Notes:
347             Calculates and stores default values used later on for
348             message generation.
349         """
350         self.total_prefix_amount = args.amount
351         # Number of update messages left to be sent.
352         self.remaining_prefixes = self.total_prefix_amount
353
354         # New parameters initialisation
355         self.port = args.port
356         self.iteration = 0
357         self.prefix_base_default = args.firstprefix
358         self.prefix_length_default = args.prefixlen
359         self.wr_prefixes_default = []
360         self.nlri_prefixes_default = []
361         self.version_default = 4
362         self.my_autonomous_system_default = args.asnumber
363         self.hold_time_default = args.holdtime  # Local hold time.
364         self.bgp_identifier_default = int(args.myip)
365         self.next_hop_default = args.nexthop
366         self.originator_id_default = args.originator
367         self.cluster_list_item_default = args.cluster
368         self.single_update_default = args.updates == "single"
369         self.randomize_updates_default = args.updates == "random"
370         self.prefix_count_to_add_default = args.insert
371         self.prefix_count_to_del_default = args.withdraw
372         if self.prefix_count_to_del_default < 0:
373             self.prefix_count_to_del_default = 0
374         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
375             # total number of prefixes must grow to avoid infinite test loop
376             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
377         self.slot_size_default = self.prefix_count_to_add_default
378         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
379         self.results_file_name_default = args.results
380         self.performance_threshold_default = args.threshold
381         self.rfc4760 = args.rfc4760
382         self.bgpls = args.bgpls
383         self.evpn = args.evpn
384         self.mvpn = args.mvpn
385         self.l3vpn_mcast = args.l3vpn_mcast
386         self.l3vpn = args.l3vpn
387         self.rt_constrain = args.rt_constrain
388         self.ipv6 = args.ipv6
389         self.allf = args.allf
390         self.skipattr = args.skipattr
391         self.grace = args.grace
392         # Default values when BGP-LS Attributes are used
393         if self.bgpls:
394             self.prefix_count_to_add_default = 1
395             self.prefix_count_to_del_default = 0
396         self.ls_nlri_default = {"Identifier": args.lsid,
397                                 "TunnelID": args.lstid,
398                                 "LSPID": args.lspid,
399                                 "IPv4TunnelSenderAddress": args.lstsaddr,
400                                 "IPv4TunnelEndPointAddress": args.lsteaddr}
401         self.lsid_step = args.lsidstep
402         self.lstid_step = args.lstidstep
403         self.lspid_step = args.lspidstep
404         self.lstsaddr_step = args.lstsaddrstep
405         self.lsteaddr_step = args.lsteaddrstep
406         # Default values used for randomized part
407         s1_slots = ((self.total_prefix_amount -
408                      self.remaining_prefixes_threshold - 1) /
409                     self.prefix_count_to_add_default + 1)
410         s2_slots = ((self.remaining_prefixes_threshold - 1) /
411                     (self.prefix_count_to_add_default -
412                      self.prefix_count_to_del_default) + 1)
413         # S1_First_Index = 0
414         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
415         s2_first_index = s1_slots * self.prefix_count_to_add_default
416         s2_last_index = (s2_first_index +
417                          s2_slots * (self.prefix_count_to_add_default -
418                                      self.prefix_count_to_del_default) - 1)
419         self.slot_gap_default = ((self.total_prefix_amount -
420                                   self.remaining_prefixes_threshold - 1) /
421                                  self.prefix_count_to_add_default + 1)
422         self.randomize_lowest_default = s2_first_index
423         self.randomize_highest_default = s2_last_index
424         # Initialising counters
425         self.phase1_start_time = 0
426         self.phase1_stop_time = 0
427         self.phase2_start_time = 0
428         self.phase2_stop_time = 0
429         self.phase1_updates_sent = 0
430         self.phase2_updates_sent = 0
431         self.updates_sent = 0
432
433         self.log_info = args.loglevel <= logging.INFO
434         self.log_debug = args.loglevel <= logging.DEBUG
435         """
436         Flags needed for the MessageGenerator performance optimization.
437         Calling logger methods each iteration even with proper log level set
438         slows down significantly the MessageGenerator performance.
439         Measured total generation time (1M updates, dry run, error log level):
440         - logging based on basic logger features: 36,2s
441         - logging based on advanced logger features (lazy logging): 21,2s
442         - conditional calling of logger methods enclosed inside condition: 8,6s
443         """
444
445         logger.info("Generator initialisation")
446         logger.info("  Target total number of prefixes to be introduced: " +
447                     str(self.total_prefix_amount))
448         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
449                     str(self.prefix_length_default))
450         logger.info("  My Autonomous System number: " +
451                     str(self.my_autonomous_system_default))
452         logger.info("  My Hold Time: " + str(self.hold_time_default))
453         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
454         logger.info("  Next Hop: " + str(self.next_hop_default))
455         logger.info("  Originator ID: " + str(self.originator_id_default))
456         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
457         logger.info("  Prefix count to be inserted at once: " +
458                     str(self.prefix_count_to_add_default))
459         logger.info("  Prefix count to be withdrawn at once: " +
460                     str(self.prefix_count_to_del_default))
461         logger.info("  Fast pre-fill up to " +
462                     str(self.total_prefix_amount -
463                         self.remaining_prefixes_threshold) + " prefixes")
464         logger.info("  Remaining number of prefixes to be processed " +
465                     "in parallel with withdrawals: " +
466                     str(self.remaining_prefixes_threshold))
467         logger.debug("  Prefix index range used after pre-fill procedure [" +
468                      str(self.randomize_lowest_default) + ", " +
469                      str(self.randomize_highest_default) + "]")
470         if self.single_update_default:
471             logger.info("  Common single UPDATE will be generated " +
472                         "for both NLRI & WITHDRAWN lists")
473         else:
474             logger.info("  Two separate UPDATEs will be generated " +
475                         "for each NLRI & WITHDRAWN lists")
476         if self.randomize_updates_default:
477             logger.info("  Generation of UPDATE messages will be randomized")
478         logger.info("  Let\'s go ...\n")
479
480         # TODO: Notification for hold timer expiration can be handy.
481
482     def store_results(self, file_name=None, threshold=None):
483         """ Stores specified results into files based on file_name value.
484
485         Arguments:
486             :param file_name: Trailing (common) part of result file names
487             :param threshold: Minimum number of sent updates needed for each
488                               result to be included into result csv file
489                               (mainly needed because of the result accuracy)
490         Returns:
491             :return: n/a
492         """
493         # default values handling
494         # TODO optimize default values handling (use e.g. dicionary.update() approach)
495         if file_name is None:
496             file_name = self.results_file_name_default
497         if threshold is None:
498             threshold = self.performance_threshold_default
499         # performance calculation
500         if self.phase1_updates_sent >= threshold:
501             totals1 = self.phase1_updates_sent
502             performance1 = int(self.phase1_updates_sent /
503                                (self.phase1_stop_time - self.phase1_start_time))
504         else:
505             totals1 = None
506             performance1 = None
507         if self.phase2_updates_sent >= threshold:
508             totals2 = self.phase2_updates_sent
509             performance2 = int(self.phase2_updates_sent /
510                                (self.phase2_stop_time - self.phase2_start_time))
511         else:
512             totals2 = None
513             performance2 = None
514
515         logger.info("#" * 10 + " Final results " + "#" * 10)
516         logger.info("Number of iterations: " + str(self.iteration))
517         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
518                     str(self.phase1_updates_sent))
519         logger.info("The pre-fill phase duration: " +
520                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
521         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
522                     str(self.phase2_updates_sent))
523         logger.info("The 2nd test phase duration: " +
524                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
525         logger.info("Threshold for performance reporting: " + str(threshold))
526
527         # making labels
528         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
529                         " route(s) per UPDATE")
530         if self.single_update_default:
531             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
532                                   "/-" + str(self.prefix_count_to_del_default) +
533                                   " routes per UPDATE")
534         else:
535             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
536                                   "/-" + str(self.prefix_count_to_del_default) +
537                                   " routes in two UPDATEs")
538         # collecting capacity and performance results
539         totals = {}
540         performance = {}
541         if totals1 is not None:
542             totals[phase1_label] = totals1
543             performance[phase1_label] = performance1
544         if totals2 is not None:
545             totals[phase2_label] = totals2
546             performance[phase2_label] = performance2
547         self.write_results_to_file(totals, "totals-" + file_name)
548         self.write_results_to_file(performance, "performance-" + file_name)
549
550     def write_results_to_file(self, results, file_name):
551         """Writes results to the csv plot file consumable by Jenkins.
552
553         Arguments:
554             :param file_name: Name of the (csv) file to be created
555         Returns:
556             :return: none
557         """
558         first_line = ""
559         second_line = ""
560         f = open(file_name, "wt")
561         try:
562             for key in sorted(results):
563                 first_line += key + ", "
564                 second_line += str(results[key]) + ", "
565             first_line = first_line[:-2]
566             second_line = second_line[:-2]
567             f.write(first_line + "\n")
568             f.write(second_line + "\n")
569             logger.info("Message generator performance results stored in " +
570                         file_name + ":")
571             logger.info("  " + first_line)
572             logger.info("  " + second_line)
573         finally:
574             f.close()
575
576     # Return pseudo-randomized (reproducible) index for selected range
577     def randomize_index(self, index, lowest=None, highest=None):
578         """Calculates pseudo-randomized index from selected range.
579
580         Arguments:
581             :param index: input index
582             :param lowest: the lowes index from the randomized area
583             :param highest: the highest index from the randomized area
584         Returns:
585             :return: the (pseudo)randomized index
586         Notes:
587             Created just as a fame for future generator enhancement.
588         """
589         # default values handling
590         # TODO optimize default values handling (use e.g. dicionary.update() approach)
591         if lowest is None:
592             lowest = self.randomize_lowest_default
593         if highest is None:
594             highest = self.randomize_highest_default
595         # randomize
596         if (index >= lowest) and (index <= highest):
597             # we are in the randomized range -> shuffle it inside
598             # the range (now just reverse the order)
599             new_index = highest - (index - lowest)
600         else:
601             # we are out of the randomized range -> nothing to do
602             new_index = index
603         return new_index
604
605     def get_ls_nlri_values(self, index):
606         """Generates LS-NLRI parameters.
607         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
608
609         Arguments:
610             :param index: index (iteration)
611         Returns:
612             :return: dictionary of LS NLRI parameters and values
613         """
614         # generating list of LS NLRI parameters
615         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
616         ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
617         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
618         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
619         ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
620         ls_nlri_values = {"Identifier": identifier,
621                           "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
622                           "TunnelID": tunnel_id, "LSPID": lsp_id,
623                           "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
624         return ls_nlri_values
625
626     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
627                         prefix_len=None, prefix_count=None, randomize=None):
628         """Generates list of IP address prefixes.
629
630         Arguments:
631             :param slot_index: index of group of prefix addresses
632             :param slot_size: size of group of prefix addresses
633                 in [number of included prefixes]
634             :param prefix_base: IP address of the first prefix
635                 (slot_index = 0, prefix_index = 0)
636             :param prefix_len: length of the prefix in bites
637                 (the same as size of netmask)
638             :param prefix_count: number of prefixes to be returned
639                 from the specified slot
640         Returns:
641             :return: list of generated IP address prefixes
642         """
643         # default values handling
644         # TODO optimize default values handling (use e.g. dicionary.update() approach)
645         if slot_size is None:
646             slot_size = self.slot_size_default
647         if prefix_base is None:
648             prefix_base = self.prefix_base_default
649         if prefix_len is None:
650             prefix_len = self.prefix_length_default
651         if prefix_count is None:
652             prefix_count = slot_size
653         if randomize is None:
654             randomize = self.randomize_updates_default
655         # generating list of prefixes
656         indexes = []
657         prefixes = []
658         prefix_gap = 2 ** (32 - prefix_len)
659         for i in range(prefix_count):
660             prefix_index = slot_index * slot_size + i
661             if randomize:
662                 prefix_index = self.randomize_index(prefix_index)
663             indexes.append(prefix_index)
664             prefixes.append(prefix_base + prefix_index * prefix_gap)
665         if self.log_debug:
666             logger.debug("  Prefix slot index: " + str(slot_index))
667             logger.debug("  Prefix slot size: " + str(slot_size))
668             logger.debug("  Prefix count: " + str(prefix_count))
669             logger.debug("  Prefix indexes: " + str(indexes))
670             logger.debug("  Prefix list: " + str(prefixes))
671         return prefixes
672
673     def compose_update_message(self, prefix_count_to_add=None,
674                                prefix_count_to_del=None):
675         """Composes an UPDATE message
676
677         Arguments:
678             :param prefix_count_to_add: # of prefixes to put into NLRI list
679             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
680         Returns:
681             :return: encoded UPDATE message in HEX
682         Notes:
683             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
684             lists or common message wich includes both prefix lists.
685             Updates global counters.
686         """
687         # default values handling
688         # TODO optimize default values handling (use e.g. dicionary.update() approach)
689         if prefix_count_to_add is None:
690             prefix_count_to_add = self.prefix_count_to_add_default
691         if prefix_count_to_del is None:
692             prefix_count_to_del = self.prefix_count_to_del_default
693         # logging
694         if self.log_info and not (self.iteration % 1000):
695             logger.info("Iteration: " + str(self.iteration) +
696                         " - total remaining prefixes: " +
697                         str(self.remaining_prefixes))
698         if self.log_debug:
699             logger.debug("#" * 10 + " Iteration: " +
700                          str(self.iteration) + " " + "#" * 10)
701             logger.debug("Remaining prefixes: " +
702                          str(self.remaining_prefixes))
703         # scenario type & one-shot counter
704         straightforward_scenario = (self.remaining_prefixes >
705                                     self.remaining_prefixes_threshold)
706         if straightforward_scenario:
707             prefix_count_to_del = 0
708             if self.log_debug:
709                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
710             if not self.phase1_start_time:
711                 self.phase1_start_time = time.time()
712         else:
713             if self.log_debug:
714                 logger.debug("--- COMBINED SCENARIO ---")
715             if not self.phase2_start_time:
716                 self.phase2_start_time = time.time()
717         # tailor the number of prefixes if needed
718         prefix_count_to_add = (prefix_count_to_del +
719                                min(prefix_count_to_add - prefix_count_to_del,
720                                    self.remaining_prefixes))
721         # prefix slots selection for insertion and withdrawal
722         slot_index_to_add = self.iteration
723         slot_index_to_del = slot_index_to_add - self.slot_gap_default
724         # getting lists of prefixes for insertion in this iteration
725         if self.log_debug:
726             logger.debug("Prefixes to be inserted in this iteration:")
727         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
728                                                   prefix_count=prefix_count_to_add)
729         # getting lists of prefixes for withdrawal in this iteration
730         if self.log_debug:
731             logger.debug("Prefixes to be withdrawn in this iteration:")
732         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
733                                                   prefix_count=prefix_count_to_del)
734         # generating the UPDATE mesage with LS-NLRI only
735         if self.bgpls:
736             ls_nlri = self.get_ls_nlri_values(self.iteration)
737             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
738                                           **ls_nlri)
739         else:
740             # generating the UPDATE message with prefix lists
741             if self.single_update_default:
742                 # Send prefixes to be introduced and withdrawn
743                 # in one UPDATE message
744                 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
745                                               nlri_prefixes=prefix_list_to_add)
746             else:
747                 # Send prefixes to be introduced and withdrawn
748                 # in separate UPDATE messages (if needed)
749                 msg_out = self.update_message(wr_prefixes=[],
750                                               nlri_prefixes=prefix_list_to_add)
751                 if prefix_count_to_del:
752                     msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
753                                                    nlri_prefixes=[])
754         # updating counters - who knows ... maybe I am last time here ;)
755         if straightforward_scenario:
756             self.phase1_stop_time = time.time()
757             self.phase1_updates_sent = self.updates_sent
758         else:
759             self.phase2_stop_time = time.time()
760             self.phase2_updates_sent = (self.updates_sent -
761                                         self.phase1_updates_sent)
762         # updating totals for the next iteration
763         self.iteration += 1
764         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
765         # returning the encoded message
766         return msg_out
767
768     # Section of message encoders
769
770     def open_message(self, version=None, my_autonomous_system=None,
771                      hold_time=None, bgp_identifier=None):
772         """Generates an OPEN Message (rfc4271#section-4.2)
773
774         Arguments:
775             :param version: see the rfc4271#section-4.2
776             :param my_autonomous_system: see the rfc4271#section-4.2
777             :param hold_time: see the rfc4271#section-4.2
778             :param bgp_identifier: see the rfc4271#section-4.2
779         Returns:
780             :return: encoded OPEN message in HEX
781         """
782
783         # default values handling
784         # TODO optimize default values handling (use e.g. dicionary.update() approach)
785         if version is None:
786             version = self.version_default
787         if my_autonomous_system is None:
788             my_autonomous_system = self.my_autonomous_system_default
789         if hold_time is None:
790             hold_time = self.hold_time_default
791         if bgp_identifier is None:
792             bgp_identifier = self.bgp_identifier_default
793
794         # Marker
795         marker_hex = "\xFF" * 16
796
797         # Type
798         type = 1
799         type_hex = struct.pack("B", type)
800
801         # version
802         version_hex = struct.pack("B", version)
803
804         # my_autonomous_system
805         # AS_TRANS value, 23456 decadic.
806         my_autonomous_system_2_bytes = 23456
807         # AS number is mappable to 2 bytes
808         if my_autonomous_system < 65536:
809             my_autonomous_system_2_bytes = my_autonomous_system
810         my_autonomous_system_hex_2_bytes = struct.pack(">H",
811                                                        my_autonomous_system)
812
813         # Hold Time
814         hold_time_hex = struct.pack(">H", hold_time)
815
816         # BGP Identifier
817         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
818
819         # Optional Parameters
820         optional_parameters_hex = ""
821         if self.rfc4760 or self.allf:
822             optional_parameter_hex = (
823                 "\x02"  # Param type ("Capability Ad")
824                 "\x06"  # Length (6 bytes)
825                 "\x01"  # Capability type (NLRI Unicast),
826                         # see RFC 4760, secton 8
827                 "\x04"  # Capability value length
828                 "\x00\x01"  # AFI (Ipv4)
829                 "\x00"  # (reserved)
830                 "\x01"  # SAFI (Unicast)
831             )
832             optional_parameters_hex += optional_parameter_hex
833
834         if self.ipv6 or self.allf:
835             optional_parameter_hex = (
836                 "\x02"  # Param type ("Capability Ad")
837                 "\x06"  # Length (6 bytes)
838                 "\x01"  # Multiprotocol extetension capability,
839                 "\x04"  # Capability value length
840                 "\x00\x02"  # AFI (IPV6)
841                 "\x00"  # (reserved)
842                 "\x01"  # SAFI (UNICAST)
843             )
844             optional_parameters_hex += optional_parameter_hex
845
846         if self.bgpls or self.allf:
847             optional_parameter_hex = (
848                 "\x02"  # Param type ("Capability Ad")
849                 "\x06"  # Length (6 bytes)
850                 "\x01"  # Capability type (NLRI Unicast),
851                         # see RFC 4760, secton 8
852                 "\x04"  # Capability value length
853                 "\x40\x04"  # AFI (BGP-LS)
854                 "\x00"  # (reserved)
855                 "\x47"  # SAFI (BGP-LS)
856             )
857             optional_parameters_hex += optional_parameter_hex
858
859         if self.evpn or self.allf:
860             optional_parameter_hex = (
861                 "\x02"  # Param type ("Capability Ad")
862                 "\x06"  # Length (6 bytes)
863                 "\x01"  # Multiprotocol extetension capability,
864                 "\x04"  # Capability value length
865                 "\x00\x19"  # AFI (L2-VPN)
866                 "\x00"  # (reserved)
867                 "\x46"  # SAFI (EVPN)
868             )
869             optional_parameters_hex += optional_parameter_hex
870
871         if self.mvpn or self.allf:
872             optional_parameter_hex = (
873                 "\x02"  # Param type ("Capability Ad")
874                 "\x06"  # Length (6 bytes)
875                 "\x01"  # Multiprotocol extetension capability,
876                 "\x04"  # Capability value length
877                 "\x00\x01"  # AFI (IPV4)
878                 "\x00"  # (reserved)
879                 "\x05"  # SAFI (MCAST-VPN)
880             )
881             optional_parameters_hex += optional_parameter_hex
882             optional_parameter_hex = (
883                 "\x02"  # Param type ("Capability Ad")
884                 "\x06"  # Length (6 bytes)
885                 "\x01"  # Multiprotocol extetension capability,
886                 "\x04"  # Capability value length
887                 "\x00\x02"  # AFI (IPV6)
888                 "\x00"  # (reserved)
889                 "\x05"  # SAFI (MCAST-VPN)
890             )
891             optional_parameters_hex += optional_parameter_hex
892
893         if self.l3vpn_mcast or self.allf:
894             optional_parameter_hex = (
895                 "\x02"  # Param type ("Capability Ad")
896                 "\x06"  # Length (6 bytes)
897                 "\x01"  # Multiprotocol extetension capability,
898                 "\x04"  # Capability value length
899                 "\x00\x01"  # AFI (IPV4)
900                 "\x00"  # (reserved)
901                 "\x81"  # SAFI (L3VPN-MCAST)
902             )
903             optional_parameters_hex += optional_parameter_hex
904             optional_parameter_hex = (
905                 "\x02"  # Param type ("Capability Ad")
906                 "\x06"  # Length (6 bytes)
907                 "\x01"  # Multiprotocol extetension capability,
908                 "\x04"  # Capability value length
909                 "\x00\x02"  # AFI (IPV6)
910                 "\x00"  # (reserved)
911                 "\x81"  # SAFI (L3VPN-MCAST)
912             )
913             optional_parameters_hex += optional_parameter_hex
914
915         if self.l3vpn or self.allf:
916             optional_parameter_hex = (
917                 "\x02"  # Param type ("Capability Ad")
918                 "\x06"  # Length (6 bytes)
919                 "\x01"  # Multiprotocol extetension capability,
920                 "\x04"  # Capability value length
921                 "\x00\x01"  # AFI (IPV4)
922                 "\x00"  # (reserved)
923                 "\x80"  # SAFI (L3VPN-UNICAST)
924             )
925             optional_parameters_hex += optional_parameter_hex
926             optional_parameter_hex = (
927                 "\x02"  # Param type ("Capability Ad")
928                 "\x06"  # Length (6 bytes)
929                 "\x01"  # Multiprotocol extetension capability,
930                 "\x04"  # Capability value length
931                 "\x00\x02"  # AFI (IPV6)
932                 "\x00"  # (reserved)
933                 "\x80"  # SAFI (L3VPN-UNICAST)
934             )
935             optional_parameters_hex += optional_parameter_hex
936
937         if self.rt_constrain or self.allf:
938             optional_parameter_hex = (
939                 "\x02"  # Param type ("Capability Ad")
940                 "\x06"  # Length (6 bytes)
941                 "\x01"  # Multiprotocol extetension capability,
942                 "\x04"  # Capability value length
943                 "\x00\x01"  # AFI (IPV4)
944                 "\x00"  # (reserved)
945                 "\x84"  # SAFI (ROUTE-TARGET-CONSTRAIN)
946             )
947             optional_parameters_hex += optional_parameter_hex
948
949         optional_parameter_hex = (
950             "\x02"  # Param type ("Capability Ad")
951             "\x06"  # Length (6 bytes)
952             "\x41"  # "32 bit AS Numbers Support"
953                     # (see RFC 6793, section 3)
954             "\x04"  # Capability value length
955         )
956         optional_parameter_hex += (
957             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
958         )
959         optional_parameters_hex += optional_parameter_hex
960
961         if self.grace != 8:
962             b = list(bin(self.grace)[2:])
963             b = b + [0] * (3 - len(b))
964             length = "\x08"
965             if b[1] == '1':
966                 restart_flag = "\x80\x05"
967             else:
968                 restart_flag = "\x00\x05"
969             if b[2] == '1':
970                 ipv4_flag = "\x80"
971             else:
972                 ipv4_flag = "\x00"
973             if b[0] == '1':
974                 ll_gr = "\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
975                 length = "\x11"
976             else:
977                 ll_gr = ""
978             logger.debug("Grace parameters list: {}".format(b))
979             # "\x02" Param type ("Capability Ad")
980             # :param length: Length of whole message
981             # "\x40" Graceful-restart capability
982             # "\x06" Length (6 bytes)
983             # "\x00" Restart Flag (customizable - turned on when grace == 2,3,6,7)
984             # "\x05" Restart timer (5sec)
985             # "\x00\x01" AFI (IPV4)
986             # "\x01"  SAFI (Unicast)
987             # "\x00"  Ipv4 Flag (customizable - turned on when grace == 1,3,5,7)
988             # "\x47\x07\x00\x01\x01\x00\x00\x00\x1e" ipv4 ll-graceful-restart capability, timer 30sec
989             # ll-gr turned on when grace is between 4-7
990             optional_parameter_hex = "\x02{}\x40\x06{}\x00\x01\x01{}{}".format(
991                 length, restart_flag, ipv4_flag, ll_gr)
992             optional_parameters_hex += optional_parameter_hex
993
994         # Optional Parameters Length
995         optional_parameters_length = len(optional_parameters_hex)
996         optional_parameters_length_hex = struct.pack("B",
997                                                      optional_parameters_length)
998
999         # Length (big-endian)
1000         length = (
1001             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
1002             len(my_autonomous_system_hex_2_bytes) +
1003             len(hold_time_hex) + len(bgp_identifier_hex) +
1004             len(optional_parameters_length_hex) +
1005             len(optional_parameters_hex)
1006         )
1007         length_hex = struct.pack(">H", length)
1008
1009         # OPEN Message
1010         message_hex = (
1011             marker_hex +
1012             length_hex +
1013             type_hex +
1014             version_hex +
1015             my_autonomous_system_hex_2_bytes +
1016             hold_time_hex +
1017             bgp_identifier_hex +
1018             optional_parameters_length_hex +
1019             optional_parameters_hex
1020         )
1021
1022         if self.log_debug:
1023             logger.debug("OPEN message encoding")
1024             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1025             logger.debug("  Length=" + str(length) + " (0x" +
1026                          binascii.hexlify(length_hex) + ")")
1027             logger.debug("  Type=" + str(type) + " (0x" +
1028                          binascii.hexlify(type_hex) + ")")
1029             logger.debug("  Version=" + str(version) + " (0x" +
1030                          binascii.hexlify(version_hex) + ")")
1031             logger.debug("  My Autonomous System=" +
1032                          str(my_autonomous_system_2_bytes) + " (0x" +
1033                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
1034                          ")")
1035             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
1036                          binascii.hexlify(hold_time_hex) + ")")
1037             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
1038                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
1039             logger.debug("  Optional Parameters Length=" +
1040                          str(optional_parameters_length) + " (0x" +
1041                          binascii.hexlify(optional_parameters_length_hex) +
1042                          ")")
1043             logger.debug("  Optional Parameters=0x" +
1044                          binascii.hexlify(optional_parameters_hex))
1045             logger.debug("OPEN message encoded: 0x%s",
1046                          binascii.b2a_hex(message_hex))
1047
1048         return message_hex
1049
1050     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
1051                        wr_prefix_length=None, nlri_prefix_length=None,
1052                        my_autonomous_system=None, next_hop=None,
1053                        originator_id=None, cluster_list_item=None,
1054                        end_of_rib=False, **ls_nlri_params):
1055         """Generates an UPDATE Message (rfc4271#section-4.3)
1056
1057         Arguments:
1058             :param wr_prefixes: see the rfc4271#section-4.3
1059             :param nlri_prefixes: see the rfc4271#section-4.3
1060             :param wr_prefix_length: see the rfc4271#section-4.3
1061             :param nlri_prefix_length: see the rfc4271#section-4.3
1062             :param my_autonomous_system: see the rfc4271#section-4.3
1063             :param next_hop: see the rfc4271#section-4.3
1064         Returns:
1065             :return: encoded UPDATE message in HEX
1066         """
1067
1068         # default values handling
1069         # TODO optimize default values handling (use e.g. dicionary.update() approach)
1070         if wr_prefixes is None:
1071             wr_prefixes = self.wr_prefixes_default
1072         if nlri_prefixes is None:
1073             nlri_prefixes = self.nlri_prefixes_default
1074         if wr_prefix_length is None:
1075             wr_prefix_length = self.prefix_length_default
1076         if nlri_prefix_length is None:
1077             nlri_prefix_length = self.prefix_length_default
1078         if my_autonomous_system is None:
1079             my_autonomous_system = self.my_autonomous_system_default
1080         if next_hop is None:
1081             next_hop = self.next_hop_default
1082         if originator_id is None:
1083             originator_id = self.originator_id_default
1084         if cluster_list_item is None:
1085             cluster_list_item = self.cluster_list_item_default
1086         ls_nlri = self.ls_nlri_default.copy()
1087         ls_nlri.update(ls_nlri_params)
1088
1089         # Marker
1090         marker_hex = "\xFF" * 16
1091
1092         # Type
1093         type = 2
1094         type_hex = struct.pack("B", type)
1095
1096         # Withdrawn Routes
1097         withdrawn_routes_hex = ""
1098         if not self.bgpls:
1099             bytes = ((wr_prefix_length - 1) / 8) + 1
1100             for prefix in wr_prefixes:
1101                 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
1102                                        struct.pack(">I", int(prefix))[:bytes])
1103                 withdrawn_routes_hex += withdrawn_route_hex
1104
1105         # Withdrawn Routes Length
1106         withdrawn_routes_length = len(withdrawn_routes_hex)
1107         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1108
1109         # TODO: to replace hardcoded string by encoding?
1110         # Path Attributes
1111         path_attributes_hex = ""
1112         if not self.skipattr:
1113             path_attributes_hex += (
1114                 "\x40"  # Flags ("Well-Known")
1115                 "\x01"  # Type (ORIGIN)
1116                 "\x01"  # Length (1)
1117                 "\x00"  # Origin: IGP
1118             )
1119             path_attributes_hex += (
1120                 "\x40"  # Flags ("Well-Known")
1121                 "\x02"  # Type (AS_PATH)
1122                 "\x06"  # Length (6)
1123                 "\x02"  # AS segment type (AS_SEQUENCE)
1124                 "\x01"  # AS segment length (1)
1125             )
1126             my_as_hex = struct.pack(">I", my_autonomous_system)
1127             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
1128             path_attributes_hex += (
1129                 "\x40"  # Flags ("Well-Known")
1130                 "\x05"  # Type (LOCAL_PREF)
1131                 "\x04"  # Length (4)
1132                 "\x00\x00\x00\x64"  # (100)
1133             )
1134         if nlri_prefixes != []:
1135             path_attributes_hex += (
1136                 "\x40"  # Flags ("Well-Known")
1137                 "\x03"  # Type (NEXT_HOP)
1138                 "\x04"  # Length (4)
1139             )
1140             next_hop_hex = struct.pack(">I", int(next_hop))
1141             path_attributes_hex += (
1142                 next_hop_hex  # IP address of the next hop (4 bytes)
1143             )
1144             if originator_id is not None:
1145                 path_attributes_hex += (
1146                     "\x80"  # Flags ("Optional, non-transitive")
1147                     "\x09"  # Type (ORIGINATOR_ID)
1148                     "\x04"  # Length (4)
1149                 )           # ORIGINATOR_ID (4 bytes)
1150                 path_attributes_hex += struct.pack(">I", int(originator_id))
1151             if cluster_list_item is not None:
1152                 path_attributes_hex += (
1153                     "\x80"  # Flags ("Optional, non-transitive")
1154                     "\x0a"  # Type (CLUSTER_LIST)
1155                     "\x04"  # Length (4)
1156                 )           # one CLUSTER_LIST item (4 bytes)
1157                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1158
1159         if self.bgpls and not end_of_rib:
1160             path_attributes_hex += (
1161                 "\x80"  # Flags ("Optional, non-transitive")
1162                 "\x0e"  # Type (MP_REACH_NLRI)
1163                 "\x22"  # Length (34)
1164                 "\x40\x04"  # AFI (BGP-LS)
1165                 "\x47"  # SAFI (BGP-LS)
1166                 "\x04"  # Next Hop Length (4)
1167             )
1168             path_attributes_hex += struct.pack(">I", int(next_hop))
1169             path_attributes_hex += "\x00"           # Reserved
1170             path_attributes_hex += (
1171                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1172                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
1173                 "\x07"      # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1174             )
1175             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1176             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1177             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1178             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1179             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1180
1181         # Total Path Attributes Length
1182         total_path_attributes_length = len(path_attributes_hex)
1183         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1184
1185         # Network Layer Reachability Information
1186         nlri_hex = ""
1187         if not self.bgpls:
1188             bytes = ((nlri_prefix_length - 1) / 8) + 1
1189             for prefix in nlri_prefixes:
1190                 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1191                                    struct.pack(">I", int(prefix))[:bytes])
1192                 nlri_hex += nlri_prefix_hex
1193
1194         # Length (big-endian)
1195         length = (
1196             len(marker_hex) + 2 + len(type_hex) +
1197             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1198             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1199             len(nlri_hex))
1200         length_hex = struct.pack(">H", length)
1201
1202         # UPDATE Message
1203         message_hex = (
1204             marker_hex +
1205             length_hex +
1206             type_hex +
1207             withdrawn_routes_length_hex +
1208             withdrawn_routes_hex +
1209             total_path_attributes_length_hex +
1210             path_attributes_hex +
1211             nlri_hex
1212         )
1213
1214         if self.grace != 8 and self.grace != 0 and end_of_rib:
1215             message_hex = (marker_hex + binascii.unhexlify("00170200000000"))
1216
1217         if self.log_debug:
1218             logger.debug("UPDATE message encoding")
1219             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1220             logger.debug("  Length=" + str(length) + " (0x" +
1221                          binascii.hexlify(length_hex) + ")")
1222             logger.debug("  Type=" + str(type) + " (0x" +
1223                          binascii.hexlify(type_hex) + ")")
1224             logger.debug("  withdrawn_routes_length=" +
1225                          str(withdrawn_routes_length) + " (0x" +
1226                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
1227             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1228                          str(wr_prefix_length) + " (0x" +
1229                          binascii.hexlify(withdrawn_routes_hex) + ")")
1230             if total_path_attributes_length:
1231                 logger.debug("  Total Path Attributes Length=" +
1232                              str(total_path_attributes_length) + " (0x" +
1233                              binascii.hexlify(total_path_attributes_length_hex) + ")")
1234                 logger.debug("  Path Attributes=" + "(0x" +
1235                              binascii.hexlify(path_attributes_hex) + ")")
1236                 logger.debug("    Origin=IGP")
1237                 logger.debug("    AS path=" + str(my_autonomous_system))
1238                 logger.debug("    Next hop=" + str(next_hop))
1239                 if originator_id is not None:
1240                     logger.debug("    Originator id=" + str(originator_id))
1241                 if cluster_list_item is not None:
1242                     logger.debug("    Cluster list=" + str(cluster_list_item))
1243                 if self.bgpls:
1244                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1245             logger.debug("  Network Layer Reachability Information=" +
1246                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1247                          " (0x" + binascii.hexlify(nlri_hex) + ")")
1248             logger.debug("UPDATE message encoded: 0x" +
1249                          binascii.b2a_hex(message_hex))
1250
1251         # updating counter
1252         self.updates_sent += 1
1253         # returning encoded message
1254         return message_hex
1255
1256     def notification_message(self, error_code, error_subcode, data_hex=""):
1257         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1258
1259         Arguments:
1260             :param error_code: see the rfc4271#section-4.5
1261             :param error_subcode: see the rfc4271#section-4.5
1262             :param data_hex: see the rfc4271#section-4.5
1263         Returns:
1264             :return: encoded NOTIFICATION message in HEX
1265         """
1266
1267         # Marker
1268         marker_hex = "\xFF" * 16
1269
1270         # Type
1271         type = 3
1272         type_hex = struct.pack("B", type)
1273
1274         # Error Code
1275         error_code_hex = struct.pack("B", error_code)
1276
1277         # Error Subode
1278         error_subcode_hex = struct.pack("B", error_subcode)
1279
1280         # Length (big-endian)
1281         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1282                   len(error_subcode_hex) + len(data_hex))
1283         length_hex = struct.pack(">H", length)
1284
1285         # NOTIFICATION Message
1286         message_hex = (
1287             marker_hex +
1288             length_hex +
1289             type_hex +
1290             error_code_hex +
1291             error_subcode_hex +
1292             data_hex
1293         )
1294
1295         if self.log_debug:
1296             logger.debug("NOTIFICATION message encoding")
1297             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1298             logger.debug("  Length=" + str(length) + " (0x" +
1299                          binascii.hexlify(length_hex) + ")")
1300             logger.debug("  Type=" + str(type) + " (0x" +
1301                          binascii.hexlify(type_hex) + ")")
1302             logger.debug("  Error Code=" + str(error_code) + " (0x" +
1303                          binascii.hexlify(error_code_hex) + ")")
1304             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
1305                          binascii.hexlify(error_subcode_hex) + ")")
1306             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1307             logger.debug("NOTIFICATION message encoded: 0x%s",
1308                          binascii.b2a_hex(message_hex))
1309
1310         return message_hex
1311
1312     def keepalive_message(self):
1313         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1314
1315         Returns:
1316             :return: encoded KEEP ALIVE message in HEX
1317         """
1318
1319         # Marker
1320         marker_hex = "\xFF" * 16
1321
1322         # Type
1323         type = 4
1324         type_hex = struct.pack("B", type)
1325
1326         # Length (big-endian)
1327         length = len(marker_hex) + 2 + len(type_hex)
1328         length_hex = struct.pack(">H", length)
1329
1330         # KEEP ALIVE Message
1331         message_hex = (
1332             marker_hex +
1333             length_hex +
1334             type_hex
1335         )
1336
1337         if self.log_debug:
1338             logger.debug("KEEP ALIVE message encoding")
1339             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1340             logger.debug("  Length=" + str(length) + " (0x" +
1341                          binascii.hexlify(length_hex) + ")")
1342             logger.debug("  Type=" + str(type) + " (0x" +
1343                          binascii.hexlify(type_hex) + ")")
1344             logger.debug("KEEP ALIVE message encoded: 0x%s",
1345                          binascii.b2a_hex(message_hex))
1346
1347         return message_hex
1348
1349
1350 class TimeTracker(object):
1351     """Class for tracking timers, both for my keepalives and
1352     peer's hold time.
1353     """
1354
1355     def __init__(self, msg_in):
1356         """Initialisation. based on defaults and OPEN message from peer.
1357
1358         Arguments:
1359             msg_in: the OPEN message received from peer.
1360         """
1361         # Note: Relative time is always named timedelta, to stress that
1362         # the (non-delta) time is absolute.
1363         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1364         # Upper bound for being stuck in the same state, we should
1365         # at least report something before continuing.
1366         # Negotiate the hold timer by taking the smaller
1367         # of the 2 values (mine and the peer's).
1368         hold_timedelta = 180  # Not an attribute of self yet.
1369         # TODO: Make the default value configurable,
1370         # default value could mirror what peer said.
1371         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1372         if hold_timedelta > peer_hold_timedelta:
1373             hold_timedelta = peer_hold_timedelta
1374         if hold_timedelta != 0 and hold_timedelta < 3:
1375             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1376             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1377         self.hold_timedelta = hold_timedelta
1378         # If we do not hear from peer this long, we assume it has died.
1379         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1380         # Upper limit for duration between messages, to avoid being
1381         # declared to be dead.
1382         # The same as calling snapshot(), but also declares a field.
1383         self.snapshot_time = time.time()
1384         # Sometimes we need to store time. This is where to get
1385         # the value from afterwards. Time_keepalive may be too strict.
1386         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1387         # At this time point, peer will be declared dead.
1388         self.my_keepalive_time = None  # to be set later
1389         # At this point, we should be sending keepalive message.
1390
1391     def snapshot(self):
1392         """Store current time in instance data to use later."""
1393         # Read as time before something interesting was called.
1394         self.snapshot_time = time.time()
1395
1396     def reset_peer_hold_time(self):
1397         """Move hold time to future as peer has just proven it still lives."""
1398         self.peer_hold_time = time.time() + self.hold_timedelta
1399
1400     # Some methods could rely on self.snapshot_time, but it is better
1401     # to require user to provide it explicitly.
1402     def reset_my_keepalive_time(self, keepalive_time):
1403         """Calculate and set the next my KEEP ALIVE timeout time
1404
1405         Arguments:
1406             :keepalive_time: the initial value of the KEEP ALIVE timer
1407         """
1408         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1409
1410     def is_time_for_my_keepalive(self):
1411         """Check for my KEEP ALIVE timeout occurence"""
1412         if self.hold_timedelta == 0:
1413             return False
1414         return self.snapshot_time >= self.my_keepalive_time
1415
1416     def get_next_event_time(self):
1417         """Set the time of the next expected or to be sent KEEP ALIVE"""
1418         if self.hold_timedelta == 0:
1419             return self.snapshot_time + 86400
1420         return min(self.my_keepalive_time, self.peer_hold_time)
1421
1422     def check_peer_hold_time(self, snapshot_time):
1423         """Raise error if nothing was read from peer until specified time."""
1424         # Hold time = 0 means keepalive checking off.
1425         if self.hold_timedelta != 0:
1426             # time.time() may be too strict
1427             if snapshot_time > self.peer_hold_time:
1428                 logger.error("Peer has overstepped the hold timer.")
1429                 raise RuntimeError("Peer has overstepped the hold timer.")
1430                 # TODO: Include hold_timedelta?
1431                 # TODO: Add notification sending (attempt). That means
1432                 # move to write tracker.
1433
1434
1435 class ReadTracker(object):
1436     """Class for tracking read of mesages chunk by chunk and
1437     for idle waiting.
1438     """
1439
1440     def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
1441                  l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
1442                  ipv6=False, grace=8, wait_for_read=10):
1443         """The reader initialisation.
1444
1445         Arguments:
1446             bgp_socket: socket to be used for sending
1447             timer: timer to be used for scheduling
1448             storage: thread safe dict
1449             evpn: flag that evpn functionality is tested
1450             mvpn: flag that mvpn functionality is tested
1451             grace: flag that grace-restart functionality is tested
1452             l3vpn_mcast: flag that l3vpn_mcast functionality is tested
1453             l3vpn: flag that l3vpn unicast functionality is tested
1454             rt_constrain: flag that rt-constrain functionality is tested
1455             allf: flag for all family testing.
1456         """
1457         # References to outside objects.
1458         self.socket = bgp_socket
1459         self.timer = timer
1460         # BGP marker length plus length field length.
1461         self.header_length = 18
1462         # TODO: make it class (constant) attribute
1463         # Computation of where next chunk ends depends on whether
1464         # we are beyond length field.
1465         self.reading_header = True
1466         # Countdown towards next size computation.
1467         self.bytes_to_read = self.header_length
1468         # Incremental buffer for message under read.
1469         self.msg_in = ""
1470         # Initialising counters
1471         self.updates_received = 0
1472         self.prefixes_introduced = 0
1473         self.prefixes_withdrawn = 0
1474         self.rx_idle_time = 0
1475         self.rx_activity_detected = True
1476         self.storage = storage
1477         self.evpn = evpn
1478         self.mvpn = mvpn
1479         self.l3vpn_mcast = l3vpn_mcast
1480         self.l3vpn = l3vpn
1481         self.rt_constrain = rt_constrain
1482         self.ipv6 = ipv6
1483         self.allf = allf
1484         self.wfr = wait_for_read
1485         self.grace = grace
1486
1487     def read_message_chunk(self):
1488         """Read up to one message
1489
1490         Note:
1491             Currently it does not return anything.
1492         """
1493         # TODO: We could return the whole message, currently not needed.
1494         # We assume the socket is readable.
1495         logger.info("Receiving %d bytes", self.bytes_to_read)
1496         chunk_message = self.socket.recv(self.bytes_to_read)
1497         msglen = len(chunk_message)
1498         logger.info("Received %d bytes", msglen)
1499         self.msg_in += chunk_message
1500         logger.info("Current message is %d bytes", len(self.msg_in))
1501         self.bytes_to_read -= msglen
1502         logger.info("Checking %d bytes to read (header=%s)",
1503                     self.bytes_to_read, self.reading_header)
1504         # TODO: bytes_to_read < 0 is not possible, right?
1505         if not self.bytes_to_read:
1506             # Finished reading a logical block.
1507             if self.reading_header:
1508                 # The logical block was a BGP header.
1509                 # Now we know the size of the message.
1510                 self.reading_header = False
1511                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1512                                       self.header_length)
1513             else:  # We have finished reading the body of the message.
1514                 # Peer has just proven it is still alive.
1515                 self.timer.reset_peer_hold_time()
1516                 # TODO: Do we want to count received messages?
1517                 # This version ignores the received message.
1518                 # TODO: Should we do validation and exit on anything
1519                 # besides update or keepalive?
1520                 # Prepare state for reading another message.
1521                 message_type_hex = self.msg_in[self.header_length]
1522                 if message_type_hex == "\x01":
1523                     logger.info("OPEN message received: 0x%s",
1524                                 binascii.b2a_hex(self.msg_in))
1525                 elif message_type_hex == "\x02":
1526                     logger.debug("UPDATE message received: 0x%s",
1527                                  binascii.b2a_hex(self.msg_in))
1528                     self.decode_update_message(self.msg_in)
1529                 elif message_type_hex == "\x03":
1530                     logger.info("NOTIFICATION message received: 0x%s",
1531                                 binascii.b2a_hex(self.msg_in))
1532                 elif message_type_hex == "\x04":
1533                     logger.info("KEEP ALIVE message received: 0x%s",
1534                                 binascii.b2a_hex(self.msg_in))
1535                 else:
1536                     logger.warning("Unexpected message received: 0x%s",
1537                                    binascii.b2a_hex(self.msg_in))
1538                 self.msg_in = ""
1539                 self.reading_header = True
1540                 self.bytes_to_read = self.header_length
1541         # We should not act upon peer_hold_time if we are reading
1542         # something right now.
1543         logger.info("Require %d bytes to read (header=%s)",
1544                     self.bytes_to_read, self.reading_header)
1545         return
1546
1547     def decode_path_attributes(self, path_attributes_hex):
1548         """Decode the Path Attributes field (rfc4271#section-4.3)
1549
1550         Arguments:
1551             :path_attributes: path_attributes field to be decoded in hex
1552         Returns:
1553             :return: None
1554         """
1555         hex_to_decode = path_attributes_hex
1556
1557         while len(hex_to_decode):
1558             attr_flags_hex = hex_to_decode[0]
1559             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1560 #            attr_optional_bit = attr_flags & 128
1561 #            attr_transitive_bit = attr_flags & 64
1562 #            attr_partial_bit = attr_flags & 32
1563             attr_extended_length_bit = attr_flags & 16
1564
1565             attr_type_code_hex = hex_to_decode[1]
1566             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1567
1568             if attr_extended_length_bit:
1569                 attr_length_hex = hex_to_decode[2:4]
1570                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1571                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1572                 hex_to_decode = hex_to_decode[4 + attr_length:]
1573             else:
1574                 attr_length_hex = hex_to_decode[2]
1575                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1576                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1577                 hex_to_decode = hex_to_decode[3 + attr_length:]
1578
1579             if attr_type_code == 1:
1580                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1581                              binascii.b2a_hex(attr_flags_hex))
1582                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1583             elif attr_type_code == 2:
1584                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1585                              binascii.b2a_hex(attr_flags_hex))
1586                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1587             elif attr_type_code == 3:
1588                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1589                              binascii.b2a_hex(attr_flags_hex))
1590                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1591             elif attr_type_code == 4:
1592                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1593                              binascii.b2a_hex(attr_flags_hex))
1594                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1595             elif attr_type_code == 5:
1596                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1597                              binascii.b2a_hex(attr_flags_hex))
1598                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1599             elif attr_type_code == 6:
1600                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1601                              binascii.b2a_hex(attr_flags_hex))
1602                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1603             elif attr_type_code == 7:
1604                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1605                              binascii.b2a_hex(attr_flags_hex))
1606                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1607             elif attr_type_code == 9:  # rfc4456#section-8
1608                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1609                              binascii.b2a_hex(attr_flags_hex))
1610                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1611             elif attr_type_code == 10:  # rfc4456#section-8
1612                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1613                              binascii.b2a_hex(attr_flags_hex))
1614                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1615             elif attr_type_code == 14:  # rfc4760#section-3
1616                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1617                              binascii.b2a_hex(attr_flags_hex))
1618                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1619                 address_family_identifier_hex = attr_value_hex[0:2]
1620                 logger.debug("  Address Family Identifier=0x%s",
1621                              binascii.b2a_hex(address_family_identifier_hex))
1622                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1623                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1624                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1625                 next_hop_netaddr_len_hex = attr_value_hex[3]
1626                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1627                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1628                              next_hop_netaddr_len,
1629                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1630                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1631                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1632                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1633                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1634                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1635                 logger.debug("  Reserved=0x%s",
1636                              binascii.b2a_hex(reserved_hex))
1637                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1638                 logger.debug("  Network Layer Reachability Information=0x%s",
1639                              binascii.b2a_hex(nlri_hex))
1640                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1641                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1642                 for prefix in nlri_prefix_list:
1643                     logger.debug("  nlri_prefix_received: %s", prefix)
1644                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1645             elif attr_type_code == 15:  # rfc4760#section-4
1646                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1647                              binascii.b2a_hex(attr_flags_hex))
1648                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1649                 address_family_identifier_hex = attr_value_hex[0:2]
1650                 logger.debug("  Address Family Identifier=0x%s",
1651                              binascii.b2a_hex(address_family_identifier_hex))
1652                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1653                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1654                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1655                 wd_hex = attr_value_hex[3:]
1656                 logger.debug("  Withdrawn Routes=0x%s",
1657                              binascii.b2a_hex(wd_hex))
1658                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1659                 logger.debug("  Withdrawn routes prefix list: %s",
1660                              wdr_prefix_list)
1661                 for prefix in wdr_prefix_list:
1662                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1663                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1664             else:
1665                 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1666                              binascii.b2a_hex(attr_flags_hex))
1667                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1668         return None
1669
1670     def decode_update_message(self, msg):
1671         """Decode an UPDATE message (rfc4271#section-4.3)
1672
1673         Arguments:
1674             :msg: message to be decoded in hex
1675         Returns:
1676             :return: None
1677         """
1678         logger.debug("Decoding update message:")
1679         # message header - marker
1680         marker_hex = msg[:16]
1681         logger.debug("Message header marker: 0x%s",
1682                      binascii.b2a_hex(marker_hex))
1683         # message header - message length
1684         msg_length_hex = msg[16:18]
1685         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1686         logger.debug("Message lenght: 0x%s (%s)",
1687                      binascii.b2a_hex(msg_length_hex), msg_length)
1688         # message header - message type
1689         msg_type_hex = msg[18:19]
1690         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1691
1692         with self.storage as stor:
1693             # this will replace the previously stored message
1694             stor['update'] = binascii.hexlify(msg)
1695
1696         logger.debug("Evpn {}".format(self.evpn))
1697         if self.evpn:
1698             logger.debug("Skipping update decoding due to evpn data expected")
1699             return
1700
1701         logger.debug("Graceful-restart {}".format(self.grace))
1702         if self.grace != 8:
1703             logger.debug("Skipping update decoding due to graceful-restart data expected")
1704             return
1705
1706         logger.debug("Mvpn {}".format(self.mvpn))
1707         if self.mvpn:
1708             logger.debug("Skipping update decoding due to mvpn data expected")
1709             return
1710
1711         logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
1712         if self.l3vpn_mcast:
1713             logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
1714             return
1715
1716         logger.debug("L3vpn-unicast {}".format(self.l3vpn))
1717         if self.l3vpn_mcast:
1718             logger.debug("Skipping update decoding due to l3vpn-unicast data expected")
1719             return
1720
1721         logger.debug("Route-Target-Constrain {}".format(self.rt_constrain))
1722         if self.rt_constrain:
1723             logger.debug("Skipping update decoding due to Route-Target-Constrain data expected")
1724             return
1725
1726         logger.debug("Ipv6-Unicast {}".format(self.ipv6))
1727         if self.ipv6:
1728             logger.debug("Skipping update decoding due to Ipv6 data expected")
1729             return
1730
1731         logger.debug("Allf {}".format(self.allf))
1732         if self.allf:
1733             logger.debug("Skipping update decoding")
1734             return
1735
1736         if msg_type == 2:
1737             logger.debug("Message type: 0x%s (update)",
1738                          binascii.b2a_hex(msg_type_hex))
1739             # withdrawn routes length
1740             wdr_length_hex = msg[19:21]
1741             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1742             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1743                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1744             # withdrawn routes
1745             wdr_hex = msg[21:21 + wdr_length]
1746             logger.debug("Withdrawn routes: 0x%s",
1747                          binascii.b2a_hex(wdr_hex))
1748             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1749             logger.debug("Withdrawn routes prefix list: %s",
1750                          wdr_prefix_list)
1751             for prefix in wdr_prefix_list:
1752                 logger.debug("withdrawn_prefix_received: %s", prefix)
1753             # total path attribute length
1754             total_pa_length_offset = 21 + wdr_length
1755             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1756             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1757             logger.debug("Total path attribute lenght: 0x%s (%s)",
1758                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1759             # path attributes
1760             pa_offset = total_pa_length_offset + 2
1761             pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1762             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1763             self.decode_path_attributes(pa_hex)
1764             # network layer reachability information length
1765             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1766             logger.debug("Calculated NLRI length: %s", nlri_length)
1767             # network layer reachability information
1768             nlri_offset = pa_offset + total_pa_length
1769             nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1770             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1771             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1772             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1773             for prefix in nlri_prefix_list:
1774                 logger.debug("nlri_prefix_received: %s", prefix)
1775             # Updating counters
1776             self.updates_received += 1
1777             self.prefixes_introduced += len(nlri_prefix_list)
1778             self.prefixes_withdrawn += len(wdr_prefix_list)
1779         else:
1780             logger.error("Unexpeced message type 0x%s in 0x%s",
1781                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1782
1783     def wait_for_read(self):
1784         """Read message until timeout (next expected event).
1785
1786         Note:
1787             Used when no more updates has to be sent to avoid busy-wait.
1788             Currently it does not return anything.
1789         """
1790         # Compute time to the first predictable state change
1791         event_time = self.timer.get_next_event_time()
1792         # snapshot_time would be imprecise
1793         wait_timedelta = min(event_time - time.time(), self.wfr)
1794         if wait_timedelta < 0:
1795             # The program got around to waiting to an event in "very near
1796             # future" so late that it became a "past" event, thus tell
1797             # "select" to not wait at all. Passing negative timedelta to
1798             # select() would lead to either waiting forever (for -1) or
1799             # select.error("Invalid parameter") (for everything else).
1800             wait_timedelta = 0
1801         # And wait for event or something to read.
1802
1803         if not self.rx_activity_detected or not (self.updates_received % 100):
1804             # right time to write statistics to the log (not for every update and
1805             # not too frequently to avoid having large log files)
1806             logger.info("total_received_update_message_counter: %s",
1807                         self.updates_received)
1808             logger.info("total_received_nlri_prefix_counter: %s",
1809                         self.prefixes_introduced)
1810             logger.info("total_received_withdrawn_prefix_counter: %s",
1811                         self.prefixes_withdrawn)
1812
1813         start_time = time.time()
1814         select.select([self.socket], [], [self.socket], wait_timedelta)
1815         timedelta = time.time() - start_time
1816         self.rx_idle_time += timedelta
1817         self.rx_activity_detected = timedelta < 1
1818
1819         if not self.rx_activity_detected or not (self.updates_received % 100):
1820             # right time to write statistics to the log (not for every update and
1821             # not too frequently to avoid having large log files)
1822             logger.info("... idle for %.3fs", timedelta)
1823             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1824         return
1825
1826
1827 class WriteTracker(object):
1828     """Class tracking enqueueing messages and sending chunks of them."""
1829
1830     def __init__(self, bgp_socket, generator, timer):
1831         """The writter initialisation.
1832
1833         Arguments:
1834             bgp_socket: socket to be used for sending
1835             generator: generator to be used for message generation
1836             timer: timer to be used for scheduling
1837         """
1838         # References to outside objects,
1839         self.socket = bgp_socket
1840         self.generator = generator
1841         self.timer = timer
1842         # Really new fields.
1843         # TODO: Would attribute docstrings add anything substantial?
1844         self.sending_message = False
1845         self.bytes_to_send = 0
1846         self.msg_out = ""
1847
1848     def enqueue_message_for_sending(self, message):
1849         """Enqueue message and change state.
1850
1851         Arguments:
1852             message: message to be enqueued into the msg_out buffer
1853         """
1854         self.msg_out += message
1855         self.bytes_to_send += len(message)
1856         self.sending_message = True
1857
1858     def send_message_chunk_is_whole(self):
1859         """Send enqueued data from msg_out buffer
1860
1861         Returns:
1862             :return: true if no remaining data to send
1863         """
1864         # We assume there is a msg_out to send and socket is writable.
1865         # print "going to send", repr(self.msg_out)
1866         self.timer.snapshot()
1867         bytes_sent = self.socket.send(self.msg_out)
1868         # Forget the part of message that was sent.
1869         self.msg_out = self.msg_out[bytes_sent:]
1870         self.bytes_to_send -= bytes_sent
1871         if not self.bytes_to_send:
1872             # TODO: Is it possible to hit negative bytes_to_send?
1873             self.sending_message = False
1874             # We should have reset hold timer on peer side.
1875             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1876             # The possible reason for not prioritizing reads is gone.
1877             return True
1878         return False
1879
1880
1881 class StateTracker(object):
1882     """Main loop has state so complex it warrants this separate class."""
1883
1884     def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1885         """The state tracker initialisation.
1886
1887         Arguments:
1888             bgp_socket: socket to be used for sending / receiving
1889             generator: generator to be used for message generation
1890             timer: timer to be used for scheduling
1891             inqueue: user initiated messages queue
1892             storage: thread safe dict to store data for the rpc server
1893             cliargs: cli args from the user
1894         """
1895         # References to outside objects.
1896         self.socket = bgp_socket
1897         self.generator = generator
1898         self.timer = timer
1899         # Sub-trackers.
1900         self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
1901                                   l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
1902                                   rt_constrain=cliargs.rt_constrain, ipv6=cliargs.ipv6, grace=cliargs.grace,
1903                                   wait_for_read=cliargs.wfr)
1904         self.writer = WriteTracker(bgp_socket, generator, timer)
1905         # Prioritization state.
1906         self.prioritize_writing = False
1907         # In general, we prioritize reading over writing. But in order
1908         # not to get blocked by neverending reads, we should
1909         # check whether we are not risking running out of holdtime.
1910         # So in some situations, this field is set to True to attempt
1911         # finishing sending a message, after which this field resets
1912         # back to False.
1913         # TODO: Alternative is to switch fairly between reading and
1914         # writing (called round robin from now on).
1915         # Message counting is done in generator.
1916         self.inqueue = inqueue
1917
1918     def perform_one_loop_iteration(self):
1919         """ The main loop iteration
1920
1921         Notes:
1922             Calculates priority, resolves all conditions, calls
1923             appropriate method and returns to caller to repeat.
1924         """
1925         self.timer.snapshot()
1926         if not self.prioritize_writing:
1927             if self.timer.is_time_for_my_keepalive():
1928                 if not self.writer.sending_message:
1929                     # We need to schedule a keepalive ASAP.
1930                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1931                     logger.info("KEEP ALIVE is sent.")
1932                 # We are sending a message now, so let's prioritize it.
1933                 self.prioritize_writing = True
1934
1935         try:
1936             msg = self.inqueue.get_nowait()
1937             logger.info("Received message: {}".format(msg))
1938             msgbin = binascii.unhexlify(msg)
1939             self.writer.enqueue_message_for_sending(msgbin)
1940         except Queue.Empty:
1941             pass
1942         # Now we know what our priorities are, we have to check
1943         # which actions are available.
1944         # socket.socket() returns three lists,
1945         # we store them to list of lists.
1946         list_list = select.select([self.socket], [self.socket], [self.socket],
1947                                   self.timer.report_timedelta)
1948         read_list, write_list, except_list = list_list
1949         # Lists are unpacked, each is either [] or [self.socket],
1950         # so we will test them as boolean.
1951         if except_list:
1952             logger.error("Exceptional state on the socket.")
1953             raise RuntimeError("Exceptional state on socket", self.socket)
1954         # We will do either read or write.
1955         if not (self.prioritize_writing and write_list):
1956             # Either we have no reason to rush writes,
1957             # or the socket is not writable.
1958             # We are focusing on reading here.
1959             if read_list:  # there is something to read indeed
1960                 # In this case we want to read chunk of message
1961                 # and repeat the select,
1962                 self.reader.read_message_chunk()
1963                 return
1964             # We were focusing on reading, but nothing to read was there.
1965             # Good time to check peer for hold timer.
1966             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1967             # Quiet on the read front, we can have attempt to write.
1968         if write_list:
1969             # Either we really want to reset peer's view of our hold
1970             # timer, or there was nothing to read.
1971             # Were we in the middle of sending a message?
1972             if self.writer.sending_message:
1973                 # Was it the end of a message?
1974                 whole = self.writer.send_message_chunk_is_whole()
1975                 # We were pressed to send something and we did it.
1976                 if self.prioritize_writing and whole:
1977                     # We prioritize reading again.
1978                     self.prioritize_writing = False
1979                 return
1980             # Finally to check if still update messages to be generated.
1981             if self.generator.remaining_prefixes:
1982                 msg_out = self.generator.compose_update_message()
1983                 if not self.generator.remaining_prefixes:
1984                     # We have just finished update generation,
1985                     # end-of-rib is due.
1986                     logger.info("All update messages generated.")
1987                     logger.info("Storing performance results.")
1988                     self.generator.store_results()
1989                     logger.info("Finally an END-OF-RIB is sent.")
1990                     msg_out += self.generator.update_message(wr_prefixes=[],
1991                                                              nlri_prefixes=[],
1992                                                              end_of_rib=True)
1993                 self.writer.enqueue_message_for_sending(msg_out)
1994                 # Attempt for real sending to be done in next iteration.
1995                 return
1996             # Nothing to write anymore.
1997             # To avoid busy loop, we do idle waiting here.
1998             self.reader.wait_for_read()
1999             return
2000         # We can neither read nor write.
2001         logger.warning("Input and output both blocked for " +
2002                        str(self.timer.report_timedelta) + " seconds.")
2003         # FIXME: Are we sure select has been really waiting
2004         # the whole period?
2005         return
2006
2007
2008 def create_logger(loglevel, logfile):
2009     """Create logger object
2010
2011     Arguments:
2012         :loglevel: log level
2013         :logfile: log file name
2014     Returns:
2015         :return: logger object
2016     """
2017     logger = logging.getLogger("logger")
2018     log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
2019     console_handler = logging.StreamHandler()
2020     file_handler = logging.FileHandler(logfile, mode="w")
2021     console_handler.setFormatter(log_formatter)
2022     file_handler.setFormatter(log_formatter)
2023     logger.addHandler(console_handler)
2024     logger.addHandler(file_handler)
2025     logger.setLevel(loglevel)
2026     return logger
2027
2028
2029 def job(arguments, inqueue, storage):
2030     """One time initialisation and iterations looping.
2031     Notes:
2032         Establish BGP connection and run iterations.
2033
2034     Arguments:
2035         :arguments: Command line arguments
2036         :inqueue: Data to be sent from play.py
2037         :storage: Shared dict for rpc server
2038     Returns:
2039         :return: None
2040     """
2041     bgp_socket = establish_connection(arguments)
2042     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
2043     # Receive open message before sending anything.
2044     # FIXME: Add parameter to send default open message first,
2045     # to work with "you first" peers.
2046     msg_in = read_open_message(bgp_socket)
2047     logger.info(binascii.hexlify(msg_in))
2048     storage['open'] = binascii.hexlify(msg_in)
2049     timer = TimeTracker(msg_in)
2050     generator = MessageGenerator(arguments)
2051     msg_out = generator.open_message()
2052     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
2053     # Send our open message to the peer.
2054     bgp_socket.send(msg_out)
2055     # Wait for confirming keepalive.
2056     # TODO: Surely in just one packet?
2057     # Using exact keepalive length to not to see possible updates.
2058     msg_in = bgp_socket.recv(19)
2059     if msg_in != generator.keepalive_message():
2060         error_msg = "Open not confirmed by keepalive, instead got"
2061         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
2062         raise MessageError(error_msg, msg_in)
2063     timer.reset_peer_hold_time()
2064     # Send the keepalive to indicate the connection is accepted.
2065     timer.snapshot()  # Remember this time.
2066     msg_out = generator.keepalive_message()
2067     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
2068     bgp_socket.send(msg_out)
2069     # Use the remembered time.
2070     timer.reset_my_keepalive_time(timer.snapshot_time)
2071     # End of initial handshake phase.
2072     state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
2073     while True:  # main reactor loop
2074         state.perform_one_loop_iteration()
2075
2076
2077 class Rpcs:
2078     '''Handler for SimpleXMLRPCServer'''
2079
2080     def __init__(self, sendqueue, storage):
2081         '''Init method
2082
2083         Arguments:
2084             :sendqueue: queue for data to be sent towards odl
2085             :storage: thread safe dict
2086         '''
2087         self.queue = sendqueue
2088         self.storage = storage
2089
2090     def send(self, text):
2091         '''Data to be sent
2092
2093         Arguments:
2094             :text: hes string of the data to be sent
2095         '''
2096         self.queue.put(text)
2097
2098     def get(self, text=''):
2099         '''Reads data form the storage
2100
2101         - returns stored data or an empty string, at the moment only
2102           'update' is stored
2103
2104         Arguments:
2105             :text: a key to the storage to get the data
2106         Returns:
2107             :data: stored data
2108         '''
2109         with self.storage as stor:
2110             return stor.get(text, '')
2111
2112     def clean(self, text=''):
2113         '''Cleans data form the storage
2114
2115         Arguments:
2116             :text: a key to the storage to clean the data
2117         '''
2118         with self.storage as stor:
2119             if text in stor:
2120                 del stor[text]
2121
2122
2123 def threaded_job(arguments):
2124     """Run the job threaded
2125
2126     Arguments:
2127         :arguments: Command line arguments
2128     Returns:
2129         :return: None
2130     """
2131     amount_left = arguments.amount
2132     utils_left = arguments.multiplicity
2133     prefix_current = arguments.firstprefix
2134     myip_current = arguments.myip
2135     port = arguments.port
2136     thread_args = []
2137     rpcqueue = Queue.Queue()
2138     storage = SafeDict()
2139
2140     while 1:
2141         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
2142         amount_left -= amount_per_util
2143         utils_left -= 1
2144
2145         args = deepcopy(arguments)
2146         args.amount = amount_per_util
2147         args.firstprefix = prefix_current
2148         args.myip = myip_current
2149         thread_args.append(args)
2150
2151         if not utils_left:
2152             break
2153         prefix_current += amount_per_util * 16
2154         myip_current += 1
2155
2156     try:
2157         # Create threads
2158         for t in thread_args:
2159             thread.start_new_thread(job, (t, rpcqueue, storage))
2160     except Exception:
2161         print "Error: unable to start thread."
2162         raise SystemExit(2)
2163
2164     if arguments.usepeerip:
2165         ip = arguments.peerip
2166     else:
2167         ip = arguments.myip
2168     rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
2169     rpcserver.register_instance(Rpcs(rpcqueue, storage))
2170     rpcserver.serve_forever()
2171
2172
2173 if __name__ == "__main__":
2174     arguments = parse_arguments()
2175     logger = create_logger(arguments.loglevel, arguments.logfile)
2176     threaded_job(arguments)