Mvpn functional tests
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 import argparse
17 import binascii
18 import ipaddr
19 import logging
20 import Queue
21 import select
22 import socket
23 import struct
24 import thread
25 import threading
26 import time
27
28
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
33
34
35 class SafeDict(dict):
36     '''Thread safe dictionary
37
38     The object will serve as thread safe data storage.
39     It should be used with "with" statement.
40     '''
41
42     def __init__(self, * p_arg, ** n_arg):
43         super(SafeDict, self).__init__()
44         self._lock = threading.Lock()
45
46     def __enter__(self):
47         self._lock.acquire()
48         return self
49
50     def __exit__(self, type, value, traceback):
51         self._lock.release()
52
53
54 def parse_arguments():
55     """Use argparse to get arguments,
56
57     Returns:
58         :return: args object.
59     """
60     parser = argparse.ArgumentParser()
61     # TODO: Should we use --argument-names-with-spaces?
62     str_help = "Autonomous System number use in the stream."
63     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64     # FIXME: We are acting as iBGP peer,
65     # we should mirror AS number from peer's open message.
66     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67     parser.add_argument("--amount", default="1", type=int, help=str_help)
68     str_help = "Maximum number of IP prefixes to be announced in one iteration"
69     parser.add_argument("--insert", default="1", type=int, help=str_help)
70     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
71     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
72     str_help = "The number of prefixes to process without withdrawals"
73     parser.add_argument("--prefill", default="0", type=int, help=str_help)
74     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
75     parser.add_argument("--updates", choices=["single", "separate"],
76                         default=["separate"], help=str_help)
77     str_help = "Base prefix IP address for prefix generation"
78     parser.add_argument("--firstprefix", default="8.0.1.0",
79                         type=ipaddr.IPv4Address, help=str_help)
80     str_help = "The prefix length."
81     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
82     str_help = "Listen for connection, instead of initiating it."
83     parser.add_argument("--listen", action="store_true", help=str_help)
84     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
85                 "Default value only suitable for listening.")
86     parser.add_argument("--myip", default="0.0.0.0",
87                         type=ipaddr.IPv4Address, help=str_help)
88     str_help = ("TCP port to bind to when listening or initiating connection." +
89                 "Default only suitable for initiating.")
90     parser.add_argument("--myport", default="0", type=int, help=str_help)
91     str_help = "The IP of the next hop to be placed into the update messages."
92     parser.add_argument("--nexthop", default="192.0.2.1",
93                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
94     str_help = "Identifier of the route originator."
95     parser.add_argument("--originator", default=None,
96                         type=ipaddr.IPv4Address, dest="originator", help=str_help)
97     str_help = "Cluster list item identifier."
98     parser.add_argument("--cluster", default=None,
99                         type=ipaddr.IPv4Address, dest="cluster", help=str_help)
100     str_help = ("Numeric IP Address to try to connect to." +
101                 "Currently no effect in listening mode.")
102     parser.add_argument("--peerip", default="127.0.0.2",
103                         type=ipaddr.IPv4Address, help=str_help)
104     str_help = "TCP port to try to connect to. No effect in listening mode."
105     parser.add_argument("--peerport", default="179", type=int, help=str_help)
106     str_help = "Local hold time."
107     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
108     str_help = "Log level (--error, --warning, --info, --debug)"
109     parser.add_argument("--error", dest="loglevel", action="store_const",
110                         const=logging.ERROR, default=logging.INFO,
111                         help=str_help)
112     parser.add_argument("--warning", dest="loglevel", action="store_const",
113                         const=logging.WARNING, default=logging.INFO,
114                         help=str_help)
115     parser.add_argument("--info", dest="loglevel", action="store_const",
116                         const=logging.INFO, default=logging.INFO,
117                         help=str_help)
118     parser.add_argument("--debug", dest="loglevel", action="store_const",
119                         const=logging.DEBUG, default=logging.INFO,
120                         help=str_help)
121     str_help = "Log file name"
122     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
123     str_help = "Trailing part of the csv result files for plotting purposes"
124     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
125     str_help = "Minimum number of updates to reach to include result into csv."
126     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
127     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
128     parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
129     str_help = "Link-State NLRI supported"
130     parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
131     str_help = "Link-State NLRI: Identifier"
132     parser.add_argument("-lsid", default="1", type=int, help=str_help)
133     str_help = "Link-State NLRI: Tunnel ID"
134     parser.add_argument("-lstid", default="1", type=int, help=str_help)
135     str_help = "Link-State NLRI: LSP ID"
136     parser.add_argument("-lspid", default="1", type=int, help=str_help)
137     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
138     parser.add_argument("--lstsaddr", default="1.2.3.4",
139                         type=ipaddr.IPv4Address, help=str_help)
140     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
141     parser.add_argument("--lsteaddr", default="5.6.7.8",
142                         type=ipaddr.IPv4Address, help=str_help)
143     str_help = "Link-State NLRI: Identifier Step"
144     parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
145     str_help = "Link-State NLRI: Tunnel ID Step"
146     parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
147     str_help = "Link-State NLRI: LSP ID Step"
148     parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
149     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
150     parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
151     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
152     parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
153     str_help = "How many play utilities are to be started."
154     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
155     str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
156 Enabling this flag makes the script not decoding the update mesage, because of not\
157 supported decoding for these elements."
158     parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
159     str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
160     Enabling this flag makes the script not decoding the update mesage, because of not\
161     supported decoding for these elements."
162     parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
163     parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
164     str_help = "Skipping well known attributes for update message"
165     parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
166     arguments = parser.parse_args()
167     if arguments.multiplicity < 1:
168         print "Multiplicity", arguments.multiplicity, "is not positive."
169         raise SystemExit(1)
170     # TODO: Are sanity checks (such as asnumber>=0) required?
171     return arguments
172
173
174 def establish_connection(arguments):
175     """Establish connection to BGP peer.
176
177     Arguments:
178         :arguments: following command-line argumets are used
179             - arguments.myip: local IP address
180             - arguments.myport: local port
181             - arguments.peerip: remote IP address
182             - arguments.peerport: remote port
183     Returns:
184         :return: socket.
185     """
186     if arguments.listen:
187         logger.info("Connecting in the listening mode.")
188         logger.debug("Local IP address: " + str(arguments.myip))
189         logger.debug("Local port: " + str(arguments.myport))
190         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
191         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
192         # bind need single tuple as argument
193         listening_socket.bind((str(arguments.myip), arguments.myport))
194         listening_socket.listen(1)
195         bgp_socket, _ = listening_socket.accept()
196         # TODO: Verify client IP is cotroller IP.
197         listening_socket.close()
198     else:
199         logger.info("Connecting in the talking mode.")
200         logger.debug("Local IP address: " + str(arguments.myip))
201         logger.debug("Local port: " + str(arguments.myport))
202         logger.debug("Remote IP address: " + str(arguments.peerip))
203         logger.debug("Remote port: " + str(arguments.peerport))
204         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
205         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
206         # bind to force specified address and port
207         talking_socket.bind((str(arguments.myip), arguments.myport))
208         # socket does not spead ipaddr, hence str()
209         talking_socket.connect((str(arguments.peerip), arguments.peerport))
210         bgp_socket = talking_socket
211     logger.info("Connected to ODL.")
212     return bgp_socket
213
214
215 def get_short_int_from_message(message, offset=16):
216     """Extract 2-bytes number from provided message.
217
218     Arguments:
219         :message: given message
220         :offset: offset of the short_int inside the message
221     Returns:
222         :return: required short_inf value.
223     Notes:
224         default offset value is the BGP message size offset.
225     """
226     high_byte_int = ord(message[offset])
227     low_byte_int = ord(message[offset + 1])
228     short_int = high_byte_int * 256 + low_byte_int
229     return short_int
230
231
232 def get_prefix_list_from_hex(prefixes_hex):
233     """Get decoded list of prefixes (rfc4271#section-4.3)
234
235     Arguments:
236         :prefixes_hex: list of prefixes to be decoded in hex
237     Returns:
238         :return: list of prefixes in the form of ip address (X.X.X.X/X)
239     """
240     prefix_list = []
241     offset = 0
242     while offset < len(prefixes_hex):
243         prefix_bit_len_hex = prefixes_hex[offset]
244         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
245         prefix_len = ((prefix_bit_len - 1) / 8) + 1
246         prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
247         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
248         offset += 1 + prefix_len
249         prefix_list.append(prefix + "/" + str(prefix_bit_len))
250     return prefix_list
251
252
253 class MessageError(ValueError):
254     """Value error with logging optimized for hexlified messages."""
255
256     def __init__(self, text, message, *args):
257         """Initialisation.
258
259         Store and call super init for textual comment,
260         store raw message which caused it.
261         """
262         self.text = text
263         self.msg = message
264         super(MessageError, self).__init__(text, message, *args)
265
266     def __str__(self):
267         """Generate human readable error message.
268
269         Returns:
270             :return: human readable message as string
271         Notes:
272             Use a placeholder string if the message is to be empty.
273         """
274         message = binascii.hexlify(self.msg)
275         if message == "":
276             message = "(empty message)"
277         return self.text + ": " + message
278
279
280 def read_open_message(bgp_socket):
281     """Receive peer's OPEN message
282
283     Arguments:
284         :bgp_socket: the socket to be read
285     Returns:
286         :return: received OPEN message.
287     Notes:
288         Performs just basic incomming message checks
289     """
290     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
291     # TODO: Can the incoming open message be split in more than one packet?
292     # Some validation.
293     if len(msg_in) < 37:
294         # 37 is minimal length of open message with 4-byte AS number.
295         error_msg = (
296             "Message length (" + str(len(msg_in)) + ") is smaller than "
297             "minimal length of OPEN message with 4-byte AS number (37)"
298         )
299         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
300         raise MessageError(error_msg, msg_in)
301     # TODO: We could check BGP marker, but it is defined only later;
302     # decide what to do.
303     reported_length = get_short_int_from_message(msg_in)
304     if len(msg_in) != reported_length:
305         error_msg = (
306             "Expected message length (" + reported_length +
307             ") does not match actual length (" + str(len(msg_in)) + ")"
308         )
309         logger.error(error_msg + binascii.hexlify(msg_in))
310         raise MessageError(error_msg, msg_in)
311     logger.info("Open message received.")
312     return msg_in
313
314
315 class MessageGenerator(object):
316     """Class which generates messages, holds states and configuration values."""
317
318     # TODO: Define bgp marker as a class (constant) variable.
319     def __init__(self, args):
320         """Initialisation according to command-line args.
321
322         Arguments:
323             :args: argsparser's Namespace object which contains command-line
324                 options for MesageGenerator initialisation
325         Notes:
326             Calculates and stores default values used later on for
327             message geeration.
328         """
329         self.total_prefix_amount = args.amount
330         # Number of update messages left to be sent.
331         self.remaining_prefixes = self.total_prefix_amount
332
333         # New parameters initialisation
334         self.iteration = 0
335         self.prefix_base_default = args.firstprefix
336         self.prefix_length_default = args.prefixlen
337         self.wr_prefixes_default = []
338         self.nlri_prefixes_default = []
339         self.version_default = 4
340         self.my_autonomous_system_default = args.asnumber
341         self.hold_time_default = args.holdtime  # Local hold time.
342         self.bgp_identifier_default = int(args.myip)
343         self.next_hop_default = args.nexthop
344         self.originator_id_default = args.originator
345         self.cluster_list_item_default = args.cluster
346         self.single_update_default = args.updates == "single"
347         self.randomize_updates_default = args.updates == "random"
348         self.prefix_count_to_add_default = args.insert
349         self.prefix_count_to_del_default = args.withdraw
350         if self.prefix_count_to_del_default < 0:
351             self.prefix_count_to_del_default = 0
352         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
353             # total number of prefixes must grow to avoid infinite test loop
354             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
355         self.slot_size_default = self.prefix_count_to_add_default
356         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
357         self.results_file_name_default = args.results
358         self.performance_threshold_default = args.threshold
359         self.rfc4760 = args.rfc4760
360         self.bgpls = args.bgpls
361         self.evpn = args.evpn
362         self.mvpn = args.mvpn
363         self.skipattr = args.skipattr
364         # Default values when BGP-LS Attributes are used
365         if self.bgpls:
366             self.prefix_count_to_add_default = 1
367             self.prefix_count_to_del_default = 0
368         self.ls_nlri_default = {"Identifier": args.lsid,
369                                 "TunnelID": args.lstid,
370                                 "LSPID": args.lspid,
371                                 "IPv4TunnelSenderAddress": args.lstsaddr,
372                                 "IPv4TunnelEndPointAddress": args.lsteaddr}
373         self.lsid_step = args.lsidstep
374         self.lstid_step = args.lstidstep
375         self.lspid_step = args.lspidstep
376         self.lstsaddr_step = args.lstsaddrstep
377         self.lsteaddr_step = args.lsteaddrstep
378         # Default values used for randomized part
379         s1_slots = ((self.total_prefix_amount -
380                      self.remaining_prefixes_threshold - 1) /
381                     self.prefix_count_to_add_default + 1)
382         s2_slots = ((self.remaining_prefixes_threshold - 1) /
383                     (self.prefix_count_to_add_default -
384                      self.prefix_count_to_del_default) + 1)
385         # S1_First_Index = 0
386         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
387         s2_first_index = s1_slots * self.prefix_count_to_add_default
388         s2_last_index = (s2_first_index +
389                          s2_slots * (self.prefix_count_to_add_default -
390                                      self.prefix_count_to_del_default) - 1)
391         self.slot_gap_default = ((self.total_prefix_amount -
392                                   self.remaining_prefixes_threshold - 1) /
393                                  self.prefix_count_to_add_default + 1)
394         self.randomize_lowest_default = s2_first_index
395         self.randomize_highest_default = s2_last_index
396         # Initialising counters
397         self.phase1_start_time = 0
398         self.phase1_stop_time = 0
399         self.phase2_start_time = 0
400         self.phase2_stop_time = 0
401         self.phase1_updates_sent = 0
402         self.phase2_updates_sent = 0
403         self.updates_sent = 0
404
405         self.log_info = args.loglevel <= logging.INFO
406         self.log_debug = args.loglevel <= logging.DEBUG
407         """
408         Flags needed for the MessageGenerator performance optimization.
409         Calling logger methods each iteration even with proper log level set
410         slows down significantly the MessageGenerator performance.
411         Measured total generation time (1M updates, dry run, error log level):
412         - logging based on basic logger features: 36,2s
413         - logging based on advanced logger features (lazy logging): 21,2s
414         - conditional calling of logger methods enclosed inside condition: 8,6s
415         """
416
417         logger.info("Generator initialisation")
418         logger.info("  Target total number of prefixes to be introduced: " +
419                     str(self.total_prefix_amount))
420         logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
421                     str(self.prefix_length_default))
422         logger.info("  My Autonomous System number: " +
423                     str(self.my_autonomous_system_default))
424         logger.info("  My Hold Time: " + str(self.hold_time_default))
425         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
426         logger.info("  Next Hop: " + str(self.next_hop_default))
427         logger.info("  Originator ID: " + str(self.originator_id_default))
428         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
429         logger.info("  Prefix count to be inserted at once: " +
430                     str(self.prefix_count_to_add_default))
431         logger.info("  Prefix count to be withdrawn at once: " +
432                     str(self.prefix_count_to_del_default))
433         logger.info("  Fast pre-fill up to " +
434                     str(self.total_prefix_amount -
435                         self.remaining_prefixes_threshold) + " prefixes")
436         logger.info("  Remaining number of prefixes to be processed " +
437                     "in parallel with withdrawals: " +
438                     str(self.remaining_prefixes_threshold))
439         logger.debug("  Prefix index range used after pre-fill procedure [" +
440                      str(self.randomize_lowest_default) + ", " +
441                      str(self.randomize_highest_default) + "]")
442         if self.single_update_default:
443             logger.info("  Common single UPDATE will be generated " +
444                         "for both NLRI & WITHDRAWN lists")
445         else:
446             logger.info("  Two separate UPDATEs will be generated " +
447                         "for each NLRI & WITHDRAWN lists")
448         if self.randomize_updates_default:
449             logger.info("  Generation of UPDATE messages will be randomized")
450         logger.info("  Let\'s go ...\n")
451
452         # TODO: Notification for hold timer expiration can be handy.
453
454     def store_results(self, file_name=None, threshold=None):
455         """ Stores specified results into files based on file_name value.
456
457         Arguments:
458             :param file_name: Trailing (common) part of result file names
459             :param threshold: Minimum number of sent updates needed for each
460                               result to be included into result csv file
461                               (mainly needed because of the result accuracy)
462         Returns:
463             :return: n/a
464         """
465         # default values handling
466         # TODO optimize default values handling (use e.g. dicionary.update() approach)
467         if file_name is None:
468             file_name = self.results_file_name_default
469         if threshold is None:
470             threshold = self.performance_threshold_default
471         # performance calculation
472         if self.phase1_updates_sent >= threshold:
473             totals1 = self.phase1_updates_sent
474             performance1 = int(self.phase1_updates_sent /
475                                (self.phase1_stop_time - self.phase1_start_time))
476         else:
477             totals1 = None
478             performance1 = None
479         if self.phase2_updates_sent >= threshold:
480             totals2 = self.phase2_updates_sent
481             performance2 = int(self.phase2_updates_sent /
482                                (self.phase2_stop_time - self.phase2_start_time))
483         else:
484             totals2 = None
485             performance2 = None
486
487         logger.info("#" * 10 + " Final results " + "#" * 10)
488         logger.info("Number of iterations: " + str(self.iteration))
489         logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
490                     str(self.phase1_updates_sent))
491         logger.info("The pre-fill phase duration: " +
492                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
493         logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
494                     str(self.phase2_updates_sent))
495         logger.info("The 2nd test phase duration: " +
496                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
497         logger.info("Threshold for performance reporting: " + str(threshold))
498
499         # making labels
500         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
501                         " route(s) per UPDATE")
502         if self.single_update_default:
503             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
504                                   "/-" + str(self.prefix_count_to_del_default) +
505                                   " routes per UPDATE")
506         else:
507             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
508                                   "/-" + str(self.prefix_count_to_del_default) +
509                                   " routes in two UPDATEs")
510         # collecting capacity and performance results
511         totals = {}
512         performance = {}
513         if totals1 is not None:
514             totals[phase1_label] = totals1
515             performance[phase1_label] = performance1
516         if totals2 is not None:
517             totals[phase2_label] = totals2
518             performance[phase2_label] = performance2
519         self.write_results_to_file(totals, "totals-" + file_name)
520         self.write_results_to_file(performance, "performance-" + file_name)
521
522     def write_results_to_file(self, results, file_name):
523         """Writes results to the csv plot file consumable by Jenkins.
524
525         Arguments:
526             :param file_name: Name of the (csv) file to be created
527         Returns:
528             :return: none
529         """
530         first_line = ""
531         second_line = ""
532         f = open(file_name, "wt")
533         try:
534             for key in sorted(results):
535                 first_line += key + ", "
536                 second_line += str(results[key]) + ", "
537             first_line = first_line[:-2]
538             second_line = second_line[:-2]
539             f.write(first_line + "\n")
540             f.write(second_line + "\n")
541             logger.info("Message generator performance results stored in " +
542                         file_name + ":")
543             logger.info("  " + first_line)
544             logger.info("  " + second_line)
545         finally:
546             f.close()
547
548     # Return pseudo-randomized (reproducible) index for selected range
549     def randomize_index(self, index, lowest=None, highest=None):
550         """Calculates pseudo-randomized index from selected range.
551
552         Arguments:
553             :param index: input index
554             :param lowest: the lowes index from the randomized area
555             :param highest: the highest index from the randomized area
556         Returns:
557             :return: the (pseudo)randomized index
558         Notes:
559             Created just as a fame for future generator enhancement.
560         """
561         # default values handling
562         # TODO optimize default values handling (use e.g. dicionary.update() approach)
563         if lowest is None:
564             lowest = self.randomize_lowest_default
565         if highest is None:
566             highest = self.randomize_highest_default
567         # randomize
568         if (index >= lowest) and (index <= highest):
569             # we are in the randomized range -> shuffle it inside
570             # the range (now just reverse the order)
571             new_index = highest - (index - lowest)
572         else:
573             # we are out of the randomized range -> nothing to do
574             new_index = index
575         return new_index
576
577     def get_ls_nlri_values(self, index):
578         """Generates LS-NLRI parameters.
579         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
580
581         Arguments:
582             :param index: index (iteration)
583         Returns:
584             :return: dictionary of LS NLRI parameters and values
585         """
586         # generating list of LS NLRI parameters
587         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
588         ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
589         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
590         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
591         ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
592         ls_nlri_values = {"Identifier": identifier,
593                           "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
594                           "TunnelID": tunnel_id, "LSPID": lsp_id,
595                           "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
596         return ls_nlri_values
597
598     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
599                         prefix_len=None, prefix_count=None, randomize=None):
600         """Generates list of IP address prefixes.
601
602         Arguments:
603             :param slot_index: index of group of prefix addresses
604             :param slot_size: size of group of prefix addresses
605                 in [number of included prefixes]
606             :param prefix_base: IP address of the first prefix
607                 (slot_index = 0, prefix_index = 0)
608             :param prefix_len: length of the prefix in bites
609                 (the same as size of netmask)
610             :param prefix_count: number of prefixes to be returned
611                 from the specified slot
612         Returns:
613             :return: list of generated IP address prefixes
614         """
615         # default values handling
616         # TODO optimize default values handling (use e.g. dicionary.update() approach)
617         if slot_size is None:
618             slot_size = self.slot_size_default
619         if prefix_base is None:
620             prefix_base = self.prefix_base_default
621         if prefix_len is None:
622             prefix_len = self.prefix_length_default
623         if prefix_count is None:
624             prefix_count = slot_size
625         if randomize is None:
626             randomize = self.randomize_updates_default
627         # generating list of prefixes
628         indexes = []
629         prefixes = []
630         prefix_gap = 2 ** (32 - prefix_len)
631         for i in range(prefix_count):
632             prefix_index = slot_index * slot_size + i
633             if randomize:
634                 prefix_index = self.randomize_index(prefix_index)
635             indexes.append(prefix_index)
636             prefixes.append(prefix_base + prefix_index * prefix_gap)
637         if self.log_debug:
638             logger.debug("  Prefix slot index: " + str(slot_index))
639             logger.debug("  Prefix slot size: " + str(slot_size))
640             logger.debug("  Prefix count: " + str(prefix_count))
641             logger.debug("  Prefix indexes: " + str(indexes))
642             logger.debug("  Prefix list: " + str(prefixes))
643         return prefixes
644
645     def compose_update_message(self, prefix_count_to_add=None,
646                                prefix_count_to_del=None):
647         """Composes an UPDATE message
648
649         Arguments:
650             :param prefix_count_to_add: # of prefixes to put into NLRI list
651             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
652         Returns:
653             :return: encoded UPDATE message in HEX
654         Notes:
655             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
656             lists or common message wich includes both prefix lists.
657             Updates global counters.
658         """
659         # default values handling
660         # TODO optimize default values handling (use e.g. dicionary.update() approach)
661         if prefix_count_to_add is None:
662             prefix_count_to_add = self.prefix_count_to_add_default
663         if prefix_count_to_del is None:
664             prefix_count_to_del = self.prefix_count_to_del_default
665         # logging
666         if self.log_info and not (self.iteration % 1000):
667             logger.info("Iteration: " + str(self.iteration) +
668                         " - total remaining prefixes: " +
669                         str(self.remaining_prefixes))
670         if self.log_debug:
671             logger.debug("#" * 10 + " Iteration: " +
672                          str(self.iteration) + " " + "#" * 10)
673             logger.debug("Remaining prefixes: " +
674                          str(self.remaining_prefixes))
675         # scenario type & one-shot counter
676         straightforward_scenario = (self.remaining_prefixes >
677                                     self.remaining_prefixes_threshold)
678         if straightforward_scenario:
679             prefix_count_to_del = 0
680             if self.log_debug:
681                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
682             if not self.phase1_start_time:
683                 self.phase1_start_time = time.time()
684         else:
685             if self.log_debug:
686                 logger.debug("--- COMBINED SCENARIO ---")
687             if not self.phase2_start_time:
688                 self.phase2_start_time = time.time()
689         # tailor the number of prefixes if needed
690         prefix_count_to_add = (prefix_count_to_del +
691                                min(prefix_count_to_add - prefix_count_to_del,
692                                    self.remaining_prefixes))
693         # prefix slots selection for insertion and withdrawal
694         slot_index_to_add = self.iteration
695         slot_index_to_del = slot_index_to_add - self.slot_gap_default
696         # getting lists of prefixes for insertion in this iteration
697         if self.log_debug:
698             logger.debug("Prefixes to be inserted in this iteration:")
699         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
700                                                   prefix_count=prefix_count_to_add)
701         # getting lists of prefixes for withdrawal in this iteration
702         if self.log_debug:
703             logger.debug("Prefixes to be withdrawn in this iteration:")
704         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
705                                                   prefix_count=prefix_count_to_del)
706         # generating the UPDATE mesage with LS-NLRI only
707         if self.bgpls:
708             ls_nlri = self.get_ls_nlri_values(self.iteration)
709             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
710                                           **ls_nlri)
711         else:
712             # generating the UPDATE message with prefix lists
713             if self.single_update_default:
714                 # Send prefixes to be introduced and withdrawn
715                 # in one UPDATE message
716                 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
717                                               nlri_prefixes=prefix_list_to_add)
718             else:
719                 # Send prefixes to be introduced and withdrawn
720                 # in separate UPDATE messages (if needed)
721                 msg_out = self.update_message(wr_prefixes=[],
722                                               nlri_prefixes=prefix_list_to_add)
723                 if prefix_count_to_del:
724                     msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
725                                                    nlri_prefixes=[])
726         # updating counters - who knows ... maybe I am last time here ;)
727         if straightforward_scenario:
728             self.phase1_stop_time = time.time()
729             self.phase1_updates_sent = self.updates_sent
730         else:
731             self.phase2_stop_time = time.time()
732             self.phase2_updates_sent = (self.updates_sent -
733                                         self.phase1_updates_sent)
734         # updating totals for the next iteration
735         self.iteration += 1
736         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
737         # returning the encoded message
738         return msg_out
739
740     # Section of message encoders
741
742     def open_message(self, version=None, my_autonomous_system=None,
743                      hold_time=None, bgp_identifier=None):
744         """Generates an OPEN Message (rfc4271#section-4.2)
745
746         Arguments:
747             :param version: see the rfc4271#section-4.2
748             :param my_autonomous_system: see the rfc4271#section-4.2
749             :param hold_time: see the rfc4271#section-4.2
750             :param bgp_identifier: see the rfc4271#section-4.2
751         Returns:
752             :return: encoded OPEN message in HEX
753         """
754
755         # default values handling
756         # TODO optimize default values handling (use e.g. dicionary.update() approach)
757         if version is None:
758             version = self.version_default
759         if my_autonomous_system is None:
760             my_autonomous_system = self.my_autonomous_system_default
761         if hold_time is None:
762             hold_time = self.hold_time_default
763         if bgp_identifier is None:
764             bgp_identifier = self.bgp_identifier_default
765
766         # Marker
767         marker_hex = "\xFF" * 16
768
769         # Type
770         type = 1
771         type_hex = struct.pack("B", type)
772
773         # version
774         version_hex = struct.pack("B", version)
775
776         # my_autonomous_system
777         # AS_TRANS value, 23456 decadic.
778         my_autonomous_system_2_bytes = 23456
779         # AS number is mappable to 2 bytes
780         if my_autonomous_system < 65536:
781             my_autonomous_system_2_bytes = my_autonomous_system
782         my_autonomous_system_hex_2_bytes = struct.pack(">H",
783                                                        my_autonomous_system)
784
785         # Hold Time
786         hold_time_hex = struct.pack(">H", hold_time)
787
788         # BGP Identifier
789         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
790
791         # Optional Parameters
792         optional_parameters_hex = ""
793         if self.rfc4760:
794             optional_parameter_hex = (
795                 "\x02"  # Param type ("Capability Ad")
796                 "\x06"  # Length (6 bytes)
797                 "\x01"  # Capability type (NLRI Unicast),
798                         # see RFC 4760, secton 8
799                 "\x04"  # Capability value length
800                 "\x00\x01"  # AFI (Ipv4)
801                 "\x00"  # (reserved)
802                 "\x01"  # SAFI (Unicast)
803             )
804             optional_parameters_hex += optional_parameter_hex
805
806         if self.bgpls:
807             optional_parameter_hex = (
808                 "\x02"  # Param type ("Capability Ad")
809                 "\x06"  # Length (6 bytes)
810                 "\x01"  # Capability type (NLRI Unicast),
811                         # see RFC 4760, secton 8
812                 "\x04"  # Capability value length
813                 "\x40\x04"  # AFI (BGP-LS)
814                 "\x00"  # (reserved)
815                 "\x47"  # SAFI (BGP-LS)
816             )
817             optional_parameters_hex += optional_parameter_hex
818
819         if self.evpn:
820             optional_parameter_hex = (
821                 "\x02"  # Param type ("Capability Ad")
822                 "\x06"  # Length (6 bytes)
823                 "\x01"  # Multiprotocol extetension capability,
824                 "\x04"  # Capability value length
825                 "\x00\x19"  # AFI (L2-VPN)
826                 "\x00"  # (reserved)
827                 "\x46"  # SAFI (EVPN)
828             )
829             optional_parameters_hex += optional_parameter_hex
830
831         if self.mvpn:
832             optional_parameter_hex = (
833                 "\x02"  # Param type ("Capability Ad")
834                 "\x06"  # Length (6 bytes)
835                 "\x01"  # Multiprotocol extetension capability,
836                 "\x04"  # Capability value length
837                 "\x00\x01"  # AFI (IPV4)
838                 "\x00"  # (reserved)
839                 "\x05"  # SAFI (MCAST-VPN)
840             )
841             optional_parameters_hex += optional_parameter_hex
842             optional_parameter_hex = (
843                 "\x02"  # Param type ("Capability Ad")
844                 "\x06"  # Length (6 bytes)
845                 "\x01"  # Multiprotocol extetension capability,
846                 "\x04"  # Capability value length
847                 "\x00\x02"  # AFI (IPV6)
848                 "\x00"  # (reserved)
849                 "\x05"  # SAFI (MCAST-VPN)
850             )
851             optional_parameters_hex += optional_parameter_hex
852
853         optional_parameter_hex = (
854             "\x02"  # Param type ("Capability Ad")
855             "\x06"  # Length (6 bytes)
856             "\x41"  # "32 bit AS Numbers Support"
857                     # (see RFC 6793, section 3)
858             "\x04"  # Capability value length
859         )
860         optional_parameter_hex += (
861             struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
862         )
863         optional_parameters_hex += optional_parameter_hex
864
865         # Optional Parameters Length
866         optional_parameters_length = len(optional_parameters_hex)
867         optional_parameters_length_hex = struct.pack("B",
868                                                      optional_parameters_length)
869
870         # Length (big-endian)
871         length = (
872             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
873             len(my_autonomous_system_hex_2_bytes) +
874             len(hold_time_hex) + len(bgp_identifier_hex) +
875             len(optional_parameters_length_hex) +
876             len(optional_parameters_hex)
877         )
878         length_hex = struct.pack(">H", length)
879
880         # OPEN Message
881         message_hex = (
882             marker_hex +
883             length_hex +
884             type_hex +
885             version_hex +
886             my_autonomous_system_hex_2_bytes +
887             hold_time_hex +
888             bgp_identifier_hex +
889             optional_parameters_length_hex +
890             optional_parameters_hex
891         )
892
893         if self.log_debug:
894             logger.debug("OPEN message encoding")
895             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
896             logger.debug("  Length=" + str(length) + " (0x" +
897                          binascii.hexlify(length_hex) + ")")
898             logger.debug("  Type=" + str(type) + " (0x" +
899                          binascii.hexlify(type_hex) + ")")
900             logger.debug("  Version=" + str(version) + " (0x" +
901                          binascii.hexlify(version_hex) + ")")
902             logger.debug("  My Autonomous System=" +
903                          str(my_autonomous_system_2_bytes) + " (0x" +
904                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
905                          ")")
906             logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
907                          binascii.hexlify(hold_time_hex) + ")")
908             logger.debug("  BGP Identifier=" + str(bgp_identifier) +
909                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
910             logger.debug("  Optional Parameters Length=" +
911                          str(optional_parameters_length) + " (0x" +
912                          binascii.hexlify(optional_parameters_length_hex) +
913                          ")")
914             logger.debug("  Optional Parameters=0x" +
915                          binascii.hexlify(optional_parameters_hex))
916             logger.debug("OPEN message encoded: 0x%s",
917                          binascii.b2a_hex(message_hex))
918
919         return message_hex
920
921     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
922                        wr_prefix_length=None, nlri_prefix_length=None,
923                        my_autonomous_system=None, next_hop=None,
924                        originator_id=None, cluster_list_item=None,
925                        end_of_rib=False, **ls_nlri_params):
926         """Generates an UPDATE Message (rfc4271#section-4.3)
927
928         Arguments:
929             :param wr_prefixes: see the rfc4271#section-4.3
930             :param nlri_prefixes: see the rfc4271#section-4.3
931             :param wr_prefix_length: see the rfc4271#section-4.3
932             :param nlri_prefix_length: see the rfc4271#section-4.3
933             :param my_autonomous_system: see the rfc4271#section-4.3
934             :param next_hop: see the rfc4271#section-4.3
935         Returns:
936             :return: encoded UPDATE message in HEX
937         """
938
939         # default values handling
940         # TODO optimize default values handling (use e.g. dicionary.update() approach)
941         if wr_prefixes is None:
942             wr_prefixes = self.wr_prefixes_default
943         if nlri_prefixes is None:
944             nlri_prefixes = self.nlri_prefixes_default
945         if wr_prefix_length is None:
946             wr_prefix_length = self.prefix_length_default
947         if nlri_prefix_length is None:
948             nlri_prefix_length = self.prefix_length_default
949         if my_autonomous_system is None:
950             my_autonomous_system = self.my_autonomous_system_default
951         if next_hop is None:
952             next_hop = self.next_hop_default
953         if originator_id is None:
954             originator_id = self.originator_id_default
955         if cluster_list_item is None:
956             cluster_list_item = self.cluster_list_item_default
957         ls_nlri = self.ls_nlri_default.copy()
958         ls_nlri.update(ls_nlri_params)
959
960         # Marker
961         marker_hex = "\xFF" * 16
962
963         # Type
964         type = 2
965         type_hex = struct.pack("B", type)
966
967         # Withdrawn Routes
968         withdrawn_routes_hex = ""
969         if not self.bgpls:
970             bytes = ((wr_prefix_length - 1) / 8) + 1
971             for prefix in wr_prefixes:
972                 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
973                                        struct.pack(">I", int(prefix))[:bytes])
974                 withdrawn_routes_hex += withdrawn_route_hex
975
976         # Withdrawn Routes Length
977         withdrawn_routes_length = len(withdrawn_routes_hex)
978         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
979
980         # TODO: to replace hardcoded string by encoding?
981         # Path Attributes
982         path_attributes_hex = ""
983         if not self.skipattr:
984             path_attributes_hex += (
985                 "\x40"  # Flags ("Well-Known")
986                 "\x01"  # Type (ORIGIN)
987                 "\x01"  # Length (1)
988                 "\x00"  # Origin: IGP
989             )
990             path_attributes_hex += (
991                 "\x40"  # Flags ("Well-Known")
992                 "\x02"  # Type (AS_PATH)
993                 "\x06"  # Length (6)
994                 "\x02"  # AS segment type (AS_SEQUENCE)
995                 "\x01"  # AS segment length (1)
996             )
997             my_as_hex = struct.pack(">I", my_autonomous_system)
998             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
999             path_attributes_hex += (
1000                 "\x40"  # Flags ("Well-Known")
1001                 "\x05"  # Type (LOCAL_PREF)
1002                 "\x04"  # Length (4)
1003                 "\x00\x00\x00\x64"  # (100)
1004             )
1005         if nlri_prefixes != []:
1006             path_attributes_hex += (
1007                 "\x40"  # Flags ("Well-Known")
1008                 "\x03"  # Type (NEXT_HOP)
1009                 "\x04"  # Length (4)
1010             )
1011             next_hop_hex = struct.pack(">I", int(next_hop))
1012             path_attributes_hex += (
1013                 next_hop_hex  # IP address of the next hop (4 bytes)
1014             )
1015             if originator_id is not None:
1016                 path_attributes_hex += (
1017                     "\x80"  # Flags ("Optional, non-transitive")
1018                     "\x09"  # Type (ORIGINATOR_ID)
1019                     "\x04"  # Length (4)
1020                 )           # ORIGINATOR_ID (4 bytes)
1021                 path_attributes_hex += struct.pack(">I", int(originator_id))
1022             if cluster_list_item is not None:
1023                 path_attributes_hex += (
1024                     "\x80"  # Flags ("Optional, non-transitive")
1025                     "\x0a"  # Type (CLUSTER_LIST)
1026                     "\x04"  # Length (4)
1027                 )           # one CLUSTER_LIST item (4 bytes)
1028                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1029
1030         if self.bgpls and not end_of_rib:
1031             path_attributes_hex += (
1032                 "\x80"  # Flags ("Optional, non-transitive")
1033                 "\x0e"  # Type (MP_REACH_NLRI)
1034                 "\x22"  # Length (34)
1035                 "\x40\x04"  # AFI (BGP-LS)
1036                 "\x47"  # SAFI (BGP-LS)
1037                 "\x04"  # Next Hop Length (4)
1038             )
1039             path_attributes_hex += struct.pack(">I", int(next_hop))
1040             path_attributes_hex += "\x00"           # Reserved
1041             path_attributes_hex += (
1042                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1043                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
1044                 "\x07"      # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1045             )
1046             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1047             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1048             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1049             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1050             path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1051
1052         # Total Path Attributes Length
1053         total_path_attributes_length = len(path_attributes_hex)
1054         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1055
1056         # Network Layer Reachability Information
1057         nlri_hex = ""
1058         if not self.bgpls:
1059             bytes = ((nlri_prefix_length - 1) / 8) + 1
1060             for prefix in nlri_prefixes:
1061                 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1062                                    struct.pack(">I", int(prefix))[:bytes])
1063                 nlri_hex += nlri_prefix_hex
1064
1065         # Length (big-endian)
1066         length = (
1067             len(marker_hex) + 2 + len(type_hex) +
1068             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1069             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1070             len(nlri_hex))
1071         length_hex = struct.pack(">H", length)
1072
1073         # UPDATE Message
1074         message_hex = (
1075             marker_hex +
1076             length_hex +
1077             type_hex +
1078             withdrawn_routes_length_hex +
1079             withdrawn_routes_hex +
1080             total_path_attributes_length_hex +
1081             path_attributes_hex +
1082             nlri_hex
1083         )
1084
1085         if self.log_debug:
1086             logger.debug("UPDATE message encoding")
1087             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1088             logger.debug("  Length=" + str(length) + " (0x" +
1089                          binascii.hexlify(length_hex) + ")")
1090             logger.debug("  Type=" + str(type) + " (0x" +
1091                          binascii.hexlify(type_hex) + ")")
1092             logger.debug("  withdrawn_routes_length=" +
1093                          str(withdrawn_routes_length) + " (0x" +
1094                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
1095             logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1096                          str(wr_prefix_length) + " (0x" +
1097                          binascii.hexlify(withdrawn_routes_hex) + ")")
1098             if total_path_attributes_length:
1099                 logger.debug("  Total Path Attributes Length=" +
1100                              str(total_path_attributes_length) + " (0x" +
1101                              binascii.hexlify(total_path_attributes_length_hex) + ")")
1102                 logger.debug("  Path Attributes=" + "(0x" +
1103                              binascii.hexlify(path_attributes_hex) + ")")
1104                 logger.debug("    Origin=IGP")
1105                 logger.debug("    AS path=" + str(my_autonomous_system))
1106                 logger.debug("    Next hop=" + str(next_hop))
1107                 if originator_id is not None:
1108                     logger.debug("    Originator id=" + str(originator_id))
1109                 if cluster_list_item is not None:
1110                     logger.debug("    Cluster list=" + str(cluster_list_item))
1111                 if self.bgpls:
1112                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1113             logger.debug("  Network Layer Reachability Information=" +
1114                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1115                          " (0x" + binascii.hexlify(nlri_hex) + ")")
1116             logger.debug("UPDATE message encoded: 0x" +
1117                          binascii.b2a_hex(message_hex))
1118
1119         # updating counter
1120         self.updates_sent += 1
1121         # returning encoded message
1122         return message_hex
1123
1124     def notification_message(self, error_code, error_subcode, data_hex=""):
1125         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1126
1127         Arguments:
1128             :param error_code: see the rfc4271#section-4.5
1129             :param error_subcode: see the rfc4271#section-4.5
1130             :param data_hex: see the rfc4271#section-4.5
1131         Returns:
1132             :return: encoded NOTIFICATION message in HEX
1133         """
1134
1135         # Marker
1136         marker_hex = "\xFF" * 16
1137
1138         # Type
1139         type = 3
1140         type_hex = struct.pack("B", type)
1141
1142         # Error Code
1143         error_code_hex = struct.pack("B", error_code)
1144
1145         # Error Subode
1146         error_subcode_hex = struct.pack("B", error_subcode)
1147
1148         # Length (big-endian)
1149         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1150                   len(error_subcode_hex) + len(data_hex))
1151         length_hex = struct.pack(">H", length)
1152
1153         # NOTIFICATION Message
1154         message_hex = (
1155             marker_hex +
1156             length_hex +
1157             type_hex +
1158             error_code_hex +
1159             error_subcode_hex +
1160             data_hex
1161         )
1162
1163         if self.log_debug:
1164             logger.debug("NOTIFICATION message encoding")
1165             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1166             logger.debug("  Length=" + str(length) + " (0x" +
1167                          binascii.hexlify(length_hex) + ")")
1168             logger.debug("  Type=" + str(type) + " (0x" +
1169                          binascii.hexlify(type_hex) + ")")
1170             logger.debug("  Error Code=" + str(error_code) + " (0x" +
1171                          binascii.hexlify(error_code_hex) + ")")
1172             logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
1173                          binascii.hexlify(error_subcode_hex) + ")")
1174             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1175             logger.debug("NOTIFICATION message encoded: 0x%s",
1176                          binascii.b2a_hex(message_hex))
1177
1178         return message_hex
1179
1180     def keepalive_message(self):
1181         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1182
1183         Returns:
1184             :return: encoded KEEP ALIVE message in HEX
1185         """
1186
1187         # Marker
1188         marker_hex = "\xFF" * 16
1189
1190         # Type
1191         type = 4
1192         type_hex = struct.pack("B", type)
1193
1194         # Length (big-endian)
1195         length = len(marker_hex) + 2 + len(type_hex)
1196         length_hex = struct.pack(">H", length)
1197
1198         # KEEP ALIVE Message
1199         message_hex = (
1200             marker_hex +
1201             length_hex +
1202             type_hex
1203         )
1204
1205         if self.log_debug:
1206             logger.debug("KEEP ALIVE message encoding")
1207             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1208             logger.debug("  Length=" + str(length) + " (0x" +
1209                          binascii.hexlify(length_hex) + ")")
1210             logger.debug("  Type=" + str(type) + " (0x" +
1211                          binascii.hexlify(type_hex) + ")")
1212             logger.debug("KEEP ALIVE message encoded: 0x%s",
1213                          binascii.b2a_hex(message_hex))
1214
1215         return message_hex
1216
1217
1218 class TimeTracker(object):
1219     """Class for tracking timers, both for my keepalives and
1220     peer's hold time.
1221     """
1222
1223     def __init__(self, msg_in):
1224         """Initialisation. based on defaults and OPEN message from peer.
1225
1226         Arguments:
1227             msg_in: the OPEN message received from peer.
1228         """
1229         # Note: Relative time is always named timedelta, to stress that
1230         # the (non-delta) time is absolute.
1231         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1232         # Upper bound for being stuck in the same state, we should
1233         # at least report something before continuing.
1234         # Negotiate the hold timer by taking the smaller
1235         # of the 2 values (mine and the peer's).
1236         hold_timedelta = 180  # Not an attribute of self yet.
1237         # TODO: Make the default value configurable,
1238         # default value could mirror what peer said.
1239         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1240         if hold_timedelta > peer_hold_timedelta:
1241             hold_timedelta = peer_hold_timedelta
1242         if hold_timedelta != 0 and hold_timedelta < 3:
1243             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1244             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1245         self.hold_timedelta = hold_timedelta
1246         # If we do not hear from peer this long, we assume it has died.
1247         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1248         # Upper limit for duration between messages, to avoid being
1249         # declared to be dead.
1250         # The same as calling snapshot(), but also declares a field.
1251         self.snapshot_time = time.time()
1252         # Sometimes we need to store time. This is where to get
1253         # the value from afterwards. Time_keepalive may be too strict.
1254         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1255         # At this time point, peer will be declared dead.
1256         self.my_keepalive_time = None  # to be set later
1257         # At this point, we should be sending keepalive message.
1258
1259     def snapshot(self):
1260         """Store current time in instance data to use later."""
1261         # Read as time before something interesting was called.
1262         self.snapshot_time = time.time()
1263
1264     def reset_peer_hold_time(self):
1265         """Move hold time to future as peer has just proven it still lives."""
1266         self.peer_hold_time = time.time() + self.hold_timedelta
1267
1268     # Some methods could rely on self.snapshot_time, but it is better
1269     # to require user to provide it explicitly.
1270     def reset_my_keepalive_time(self, keepalive_time):
1271         """Calculate and set the next my KEEP ALIVE timeout time
1272
1273         Arguments:
1274             :keepalive_time: the initial value of the KEEP ALIVE timer
1275         """
1276         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1277
1278     def is_time_for_my_keepalive(self):
1279         """Check for my KEEP ALIVE timeout occurence"""
1280         if self.hold_timedelta == 0:
1281             return False
1282         return self.snapshot_time >= self.my_keepalive_time
1283
1284     def get_next_event_time(self):
1285         """Set the time of the next expected or to be sent KEEP ALIVE"""
1286         if self.hold_timedelta == 0:
1287             return self.snapshot_time + 86400
1288         return min(self.my_keepalive_time, self.peer_hold_time)
1289
1290     def check_peer_hold_time(self, snapshot_time):
1291         """Raise error if nothing was read from peer until specified time."""
1292         # Hold time = 0 means keepalive checking off.
1293         if self.hold_timedelta != 0:
1294             # time.time() may be too strict
1295             if snapshot_time > self.peer_hold_time:
1296                 logger.error("Peer has overstepped the hold timer.")
1297                 raise RuntimeError("Peer has overstepped the hold timer.")
1298                 # TODO: Include hold_timedelta?
1299                 # TODO: Add notification sending (attempt). That means
1300                 # move to write tracker.
1301
1302
1303 class ReadTracker(object):
1304     """Class for tracking read of mesages chunk by chunk and
1305     for idle waiting.
1306     """
1307
1308     def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False, wait_for_read=10):
1309         """The reader initialisation.
1310
1311         Arguments:
1312             bgp_socket: socket to be used for sending
1313             timer: timer to be used for scheduling
1314             storage: thread safe dict
1315             evpn: flag that evpn functionality is tested
1316             mvpn: flag that mvpn functionality is tested
1317         """
1318         # References to outside objects.
1319         self.socket = bgp_socket
1320         self.timer = timer
1321         # BGP marker length plus length field length.
1322         self.header_length = 18
1323         # TODO: make it class (constant) attribute
1324         # Computation of where next chunk ends depends on whether
1325         # we are beyond length field.
1326         self.reading_header = True
1327         # Countdown towards next size computation.
1328         self.bytes_to_read = self.header_length
1329         # Incremental buffer for message under read.
1330         self.msg_in = ""
1331         # Initialising counters
1332         self.updates_received = 0
1333         self.prefixes_introduced = 0
1334         self.prefixes_withdrawn = 0
1335         self.rx_idle_time = 0
1336         self.rx_activity_detected = True
1337         self.storage = storage
1338         self.evpn = evpn
1339         self.mvpn = mvpn
1340         self.wfr = wait_for_read
1341
1342     def read_message_chunk(self):
1343         """Read up to one message
1344
1345         Note:
1346             Currently it does not return anything.
1347         """
1348         # TODO: We could return the whole message, currently not needed.
1349         # We assume the socket is readable.
1350         logger.info("READING MESSAGE")
1351         chunk_message = self.socket.recv(self.bytes_to_read)
1352         self.msg_in += chunk_message
1353         self.bytes_to_read -= len(chunk_message)
1354         # TODO: bytes_to_read < 0 is not possible, right?
1355         if not self.bytes_to_read:
1356             # Finished reading a logical block.
1357             if self.reading_header:
1358                 # The logical block was a BGP header.
1359                 # Now we know the size of the message.
1360                 self.reading_header = False
1361                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1362                                       self.header_length)
1363             else:  # We have finished reading the body of the message.
1364                 # Peer has just proven it is still alive.
1365                 self.timer.reset_peer_hold_time()
1366                 # TODO: Do we want to count received messages?
1367                 # This version ignores the received message.
1368                 # TODO: Should we do validation and exit on anything
1369                 # besides update or keepalive?
1370                 # Prepare state for reading another message.
1371                 message_type_hex = self.msg_in[self.header_length]
1372                 if message_type_hex == "\x01":
1373                     logger.info("OPEN message received: 0x%s",
1374                                 binascii.b2a_hex(self.msg_in))
1375                 elif message_type_hex == "\x02":
1376                     logger.debug("UPDATE message received: 0x%s",
1377                                  binascii.b2a_hex(self.msg_in))
1378                     self.decode_update_message(self.msg_in)
1379                 elif message_type_hex == "\x03":
1380                     logger.info("NOTIFICATION message received: 0x%s",
1381                                 binascii.b2a_hex(self.msg_in))
1382                 elif message_type_hex == "\x04":
1383                     logger.info("KEEP ALIVE message received: 0x%s",
1384                                 binascii.b2a_hex(self.msg_in))
1385                 else:
1386                     logger.warning("Unexpected message received: 0x%s",
1387                                    binascii.b2a_hex(self.msg_in))
1388                 self.msg_in = ""
1389                 self.reading_header = True
1390                 self.bytes_to_read = self.header_length
1391         # We should not act upon peer_hold_time if we are reading
1392         # something right now.
1393         return
1394
1395     def decode_path_attributes(self, path_attributes_hex):
1396         """Decode the Path Attributes field (rfc4271#section-4.3)
1397
1398         Arguments:
1399             :path_attributes: path_attributes field to be decoded in hex
1400         Returns:
1401             :return: None
1402         """
1403         hex_to_decode = path_attributes_hex
1404
1405         while len(hex_to_decode):
1406             attr_flags_hex = hex_to_decode[0]
1407             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1408 #            attr_optional_bit = attr_flags & 128
1409 #            attr_transitive_bit = attr_flags & 64
1410 #            attr_partial_bit = attr_flags & 32
1411             attr_extended_length_bit = attr_flags & 16
1412
1413             attr_type_code_hex = hex_to_decode[1]
1414             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1415
1416             if attr_extended_length_bit:
1417                 attr_length_hex = hex_to_decode[2:4]
1418                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1419                 attr_value_hex = hex_to_decode[4:4 + attr_length]
1420                 hex_to_decode = hex_to_decode[4 + attr_length:]
1421             else:
1422                 attr_length_hex = hex_to_decode[2]
1423                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1424                 attr_value_hex = hex_to_decode[3:3 + attr_length]
1425                 hex_to_decode = hex_to_decode[3 + attr_length:]
1426
1427             if attr_type_code == 1:
1428                 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1429                              binascii.b2a_hex(attr_flags_hex))
1430                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1431             elif attr_type_code == 2:
1432                 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1433                              binascii.b2a_hex(attr_flags_hex))
1434                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1435             elif attr_type_code == 3:
1436                 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1437                              binascii.b2a_hex(attr_flags_hex))
1438                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1439             elif attr_type_code == 4:
1440                 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1441                              binascii.b2a_hex(attr_flags_hex))
1442                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1443             elif attr_type_code == 5:
1444                 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1445                              binascii.b2a_hex(attr_flags_hex))
1446                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1447             elif attr_type_code == 6:
1448                 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1449                              binascii.b2a_hex(attr_flags_hex))
1450                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1451             elif attr_type_code == 7:
1452                 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1453                              binascii.b2a_hex(attr_flags_hex))
1454                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1455             elif attr_type_code == 9:  # rfc4456#section-8
1456                 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1457                              binascii.b2a_hex(attr_flags_hex))
1458                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1459             elif attr_type_code == 10:  # rfc4456#section-8
1460                 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1461                              binascii.b2a_hex(attr_flags_hex))
1462                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1463             elif attr_type_code == 14:  # rfc4760#section-3
1464                 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1465                              binascii.b2a_hex(attr_flags_hex))
1466                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1467                 address_family_identifier_hex = attr_value_hex[0:2]
1468                 logger.debug("  Address Family Identifier=0x%s",
1469                              binascii.b2a_hex(address_family_identifier_hex))
1470                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1471                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1472                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1473                 next_hop_netaddr_len_hex = attr_value_hex[3]
1474                 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1475                 logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
1476                              next_hop_netaddr_len,
1477                              binascii.b2a_hex(next_hop_netaddr_len_hex))
1478                 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1479                 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1480                 logger.debug("  Network Address of Next Hop=%s (0x%s)",
1481                              next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1482                 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1483                 logger.debug("  Reserved=0x%s",
1484                              binascii.b2a_hex(reserved_hex))
1485                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1486                 logger.debug("  Network Layer Reachability Information=0x%s",
1487                              binascii.b2a_hex(nlri_hex))
1488                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1489                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1490                 for prefix in nlri_prefix_list:
1491                     logger.debug("  nlri_prefix_received: %s", prefix)
1492                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1493             elif attr_type_code == 15:  # rfc4760#section-4
1494                 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1495                              binascii.b2a_hex(attr_flags_hex))
1496                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1497                 address_family_identifier_hex = attr_value_hex[0:2]
1498                 logger.debug("  Address Family Identifier=0x%s",
1499                              binascii.b2a_hex(address_family_identifier_hex))
1500                 subsequent_address_family_identifier_hex = attr_value_hex[2]
1501                 logger.debug("  Subsequent Address Family Identifier=0x%s",
1502                              binascii.b2a_hex(subsequent_address_family_identifier_hex))
1503                 wd_hex = attr_value_hex[3:]
1504                 logger.debug("  Withdrawn Routes=0x%s",
1505                              binascii.b2a_hex(wd_hex))
1506                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1507                 logger.debug("  Withdrawn routes prefix list: %s",
1508                              wdr_prefix_list)
1509                 for prefix in wdr_prefix_list:
1510                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1511                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1512             else:
1513                 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1514                              binascii.b2a_hex(attr_flags_hex))
1515                 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1516         return None
1517
1518     def decode_update_message(self, msg):
1519         """Decode an UPDATE message (rfc4271#section-4.3)
1520
1521         Arguments:
1522             :msg: message to be decoded in hex
1523         Returns:
1524             :return: None
1525         """
1526         logger.debug("Decoding update message:")
1527         # message header - marker
1528         marker_hex = msg[:16]
1529         logger.debug("Message header marker: 0x%s",
1530                      binascii.b2a_hex(marker_hex))
1531         # message header - message length
1532         msg_length_hex = msg[16:18]
1533         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1534         logger.debug("Message lenght: 0x%s (%s)",
1535                      binascii.b2a_hex(msg_length_hex), msg_length)
1536         # message header - message type
1537         msg_type_hex = msg[18:19]
1538         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1539
1540         with self.storage as stor:
1541             # this will replace the previously stored message
1542             stor['update'] = binascii.hexlify(msg)
1543
1544         logger.debug("Evpn {}".format(self.evpn))
1545         if self.evpn:
1546             logger.debug("Skipping update decoding due to evpn data expected")
1547             return
1548
1549         logger.debug("Mvpn {}".format(self.mvpn))
1550         if self.mvpn:
1551             logger.debug("Skipping update decoding due to mvpn data expected")
1552             return
1553
1554         if msg_type == 2:
1555             logger.debug("Message type: 0x%s (update)",
1556                          binascii.b2a_hex(msg_type_hex))
1557             # withdrawn routes length
1558             wdr_length_hex = msg[19:21]
1559             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1560             logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1561                          binascii.b2a_hex(wdr_length_hex), wdr_length)
1562             # withdrawn routes
1563             wdr_hex = msg[21:21 + wdr_length]
1564             logger.debug("Withdrawn routes: 0x%s",
1565                          binascii.b2a_hex(wdr_hex))
1566             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1567             logger.debug("Withdrawn routes prefix list: %s",
1568                          wdr_prefix_list)
1569             for prefix in wdr_prefix_list:
1570                 logger.debug("withdrawn_prefix_received: %s", prefix)
1571             # total path attribute length
1572             total_pa_length_offset = 21 + wdr_length
1573             total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1574             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1575             logger.debug("Total path attribute lenght: 0x%s (%s)",
1576                          binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1577             # path attributes
1578             pa_offset = total_pa_length_offset + 2
1579             pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1580             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1581             self.decode_path_attributes(pa_hex)
1582             # network layer reachability information length
1583             nlri_length = msg_length - 23 - total_pa_length - wdr_length
1584             logger.debug("Calculated NLRI length: %s", nlri_length)
1585             # network layer reachability information
1586             nlri_offset = pa_offset + total_pa_length
1587             nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1588             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1589             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1590             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1591             for prefix in nlri_prefix_list:
1592                 logger.debug("nlri_prefix_received: %s", prefix)
1593             # Updating counters
1594             self.updates_received += 1
1595             self.prefixes_introduced += len(nlri_prefix_list)
1596             self.prefixes_withdrawn += len(wdr_prefix_list)
1597         else:
1598             logger.error("Unexpeced message type 0x%s in 0x%s",
1599                          binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1600
1601     def wait_for_read(self):
1602         """Read message until timeout (next expected event).
1603
1604         Note:
1605             Used when no more updates has to be sent to avoid busy-wait.
1606             Currently it does not return anything.
1607         """
1608         # Compute time to the first predictable state change
1609         event_time = self.timer.get_next_event_time()
1610         # snapshot_time would be imprecise
1611         wait_timedelta = min(event_time - time.time(), self.wfr)
1612         if wait_timedelta < 0:
1613             # The program got around to waiting to an event in "very near
1614             # future" so late that it became a "past" event, thus tell
1615             # "select" to not wait at all. Passing negative timedelta to
1616             # select() would lead to either waiting forever (for -1) or
1617             # select.error("Invalid parameter") (for everything else).
1618             wait_timedelta = 0
1619         # And wait for event or something to read.
1620
1621         if not self.rx_activity_detected or not (self.updates_received % 100):
1622             # right time to write statistics to the log (not for every update and
1623             # not too frequently to avoid having large log files)
1624             logger.info("total_received_update_message_counter: %s",
1625                         self.updates_received)
1626             logger.info("total_received_nlri_prefix_counter: %s",
1627                         self.prefixes_introduced)
1628             logger.info("total_received_withdrawn_prefix_counter: %s",
1629                         self.prefixes_withdrawn)
1630
1631         start_time = time.time()
1632         select.select([self.socket], [], [self.socket], wait_timedelta)
1633         timedelta = time.time() - start_time
1634         self.rx_idle_time += timedelta
1635         self.rx_activity_detected = timedelta < 1
1636
1637         if not self.rx_activity_detected or not (self.updates_received % 100):
1638             # right time to write statistics to the log (not for every update and
1639             # not too frequently to avoid having large log files)
1640             logger.info("... idle for %.3fs", timedelta)
1641             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1642         return
1643
1644
1645 class WriteTracker(object):
1646     """Class tracking enqueueing messages and sending chunks of them."""
1647
1648     def __init__(self, bgp_socket, generator, timer):
1649         """The writter initialisation.
1650
1651         Arguments:
1652             bgp_socket: socket to be used for sending
1653             generator: generator to be used for message generation
1654             timer: timer to be used for scheduling
1655         """
1656         # References to outside objects,
1657         self.socket = bgp_socket
1658         self.generator = generator
1659         self.timer = timer
1660         # Really new fields.
1661         # TODO: Would attribute docstrings add anything substantial?
1662         self.sending_message = False
1663         self.bytes_to_send = 0
1664         self.msg_out = ""
1665
1666     def enqueue_message_for_sending(self, message):
1667         """Enqueue message and change state.
1668
1669         Arguments:
1670             message: message to be enqueued into the msg_out buffer
1671         """
1672         self.msg_out += message
1673         self.bytes_to_send += len(message)
1674         self.sending_message = True
1675
1676     def send_message_chunk_is_whole(self):
1677         """Send enqueued data from msg_out buffer
1678
1679         Returns:
1680             :return: true if no remaining data to send
1681         """
1682         # We assume there is a msg_out to send and socket is writable.
1683         # print "going to send", repr(self.msg_out)
1684         self.timer.snapshot()
1685         bytes_sent = self.socket.send(self.msg_out)
1686         # Forget the part of message that was sent.
1687         self.msg_out = self.msg_out[bytes_sent:]
1688         self.bytes_to_send -= bytes_sent
1689         if not self.bytes_to_send:
1690             # TODO: Is it possible to hit negative bytes_to_send?
1691             self.sending_message = False
1692             # We should have reset hold timer on peer side.
1693             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1694             # The possible reason for not prioritizing reads is gone.
1695             return True
1696         return False
1697
1698
1699 class StateTracker(object):
1700     """Main loop has state so complex it warrants this separate class."""
1701
1702     def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1703         """The state tracker initialisation.
1704
1705         Arguments:
1706             bgp_socket: socket to be used for sending / receiving
1707             generator: generator to be used for message generation
1708             timer: timer to be used for scheduling
1709             inqueue: user initiated messages queue
1710             storage: thread safe dict to store data for the rpc server
1711             cliargs: cli args from the user
1712         """
1713         # References to outside objects.
1714         self.socket = bgp_socket
1715         self.generator = generator
1716         self.timer = timer
1717         # Sub-trackers.
1718         self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn,
1719                                   mvpn=cliargs.mvpn, wait_for_read=cliargs.wfr)
1720         self.writer = WriteTracker(bgp_socket, generator, timer)
1721         # Prioritization state.
1722         self.prioritize_writing = False
1723         # In general, we prioritize reading over writing. But in order
1724         # not to get blocked by neverending reads, we should
1725         # check whether we are not risking running out of holdtime.
1726         # So in some situations, this field is set to True to attempt
1727         # finishing sending a message, after which this field resets
1728         # back to False.
1729         # TODO: Alternative is to switch fairly between reading and
1730         # writing (called round robin from now on).
1731         # Message counting is done in generator.
1732         self.inqueue = inqueue
1733
1734     def perform_one_loop_iteration(self):
1735         """ The main loop iteration
1736
1737         Notes:
1738             Calculates priority, resolves all conditions, calls
1739             appropriate method and returns to caller to repeat.
1740         """
1741         self.timer.snapshot()
1742         if not self.prioritize_writing:
1743             if self.timer.is_time_for_my_keepalive():
1744                 if not self.writer.sending_message:
1745                     # We need to schedule a keepalive ASAP.
1746                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1747                     logger.info("KEEP ALIVE is sent.")
1748                 # We are sending a message now, so let's prioritize it.
1749                 self.prioritize_writing = True
1750
1751         try:
1752             msg = self.inqueue.get_nowait()
1753             logger.info("Received message: {}".format(msg))
1754             msgbin = binascii.unhexlify(msg)
1755             self.writer.enqueue_message_for_sending(msgbin)
1756         except Queue.Empty:
1757             pass
1758         # Now we know what our priorities are, we have to check
1759         # which actions are available.
1760         # socket.socket() returns three lists,
1761         # we store them to list of lists.
1762         list_list = select.select([self.socket], [self.socket], [self.socket],
1763                                   self.timer.report_timedelta)
1764         read_list, write_list, except_list = list_list
1765         # Lists are unpacked, each is either [] or [self.socket],
1766         # so we will test them as boolean.
1767         if except_list:
1768             logger.error("Exceptional state on the socket.")
1769             raise RuntimeError("Exceptional state on socket", self.socket)
1770         # We will do either read or write.
1771         if not (self.prioritize_writing and write_list):
1772             # Either we have no reason to rush writes,
1773             # or the socket is not writable.
1774             # We are focusing on reading here.
1775             if read_list:  # there is something to read indeed
1776                 # In this case we want to read chunk of message
1777                 # and repeat the select,
1778                 self.reader.read_message_chunk()
1779                 return
1780             # We were focusing on reading, but nothing to read was there.
1781             # Good time to check peer for hold timer.
1782             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1783             # Quiet on the read front, we can have attempt to write.
1784         if write_list:
1785             # Either we really want to reset peer's view of our hold
1786             # timer, or there was nothing to read.
1787             # Were we in the middle of sending a message?
1788             if self.writer.sending_message:
1789                 # Was it the end of a message?
1790                 whole = self.writer.send_message_chunk_is_whole()
1791                 # We were pressed to send something and we did it.
1792                 if self.prioritize_writing and whole:
1793                     # We prioritize reading again.
1794                     self.prioritize_writing = False
1795                 return
1796             # Finally to check if still update messages to be generated.
1797             if self.generator.remaining_prefixes:
1798                 msg_out = self.generator.compose_update_message()
1799                 if not self.generator.remaining_prefixes:
1800                     # We have just finished update generation,
1801                     # end-of-rib is due.
1802                     logger.info("All update messages generated.")
1803                     logger.info("Storing performance results.")
1804                     self.generator.store_results()
1805                     logger.info("Finally an END-OF-RIB is sent.")
1806                     msg_out += self.generator.update_message(wr_prefixes=[],
1807                                                              nlri_prefixes=[],
1808                                                              end_of_rib=True)
1809                 self.writer.enqueue_message_for_sending(msg_out)
1810                 # Attempt for real sending to be done in next iteration.
1811                 return
1812             # Nothing to write anymore.
1813             # To avoid busy loop, we do idle waiting here.
1814             self.reader.wait_for_read()
1815             return
1816         # We can neither read nor write.
1817         logger.warning("Input and output both blocked for " +
1818                        str(self.timer.report_timedelta) + " seconds.")
1819         # FIXME: Are we sure select has been really waiting
1820         # the whole period?
1821         return
1822
1823
1824 def create_logger(loglevel, logfile):
1825     """Create logger object
1826
1827     Arguments:
1828         :loglevel: log level
1829         :logfile: log file name
1830     Returns:
1831         :return: logger object
1832     """
1833     logger = logging.getLogger("logger")
1834     log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1835     console_handler = logging.StreamHandler()
1836     file_handler = logging.FileHandler(logfile, mode="w")
1837     console_handler.setFormatter(log_formatter)
1838     file_handler.setFormatter(log_formatter)
1839     logger.addHandler(console_handler)
1840     logger.addHandler(file_handler)
1841     logger.setLevel(loglevel)
1842     return logger
1843
1844
1845 def job(arguments, inqueue, storage):
1846     """One time initialisation and iterations looping.
1847     Notes:
1848         Establish BGP connection and run iterations.
1849
1850     Arguments:
1851         :arguments: Command line arguments
1852         :inqueue: Data to be sent from play.py
1853         :storage: Shared dict for rpc server
1854     Returns:
1855         :return: None
1856     """
1857     bgp_socket = establish_connection(arguments)
1858     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1859     # Receive open message before sending anything.
1860     # FIXME: Add parameter to send default open message first,
1861     # to work with "you first" peers.
1862     msg_in = read_open_message(bgp_socket)
1863     timer = TimeTracker(msg_in)
1864     generator = MessageGenerator(arguments)
1865     msg_out = generator.open_message()
1866     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1867     # Send our open message to the peer.
1868     bgp_socket.send(msg_out)
1869     # Wait for confirming keepalive.
1870     # TODO: Surely in just one packet?
1871     # Using exact keepalive length to not to see possible updates.
1872     msg_in = bgp_socket.recv(19)
1873     if msg_in != generator.keepalive_message():
1874         error_msg = "Open not confirmed by keepalive, instead got"
1875         logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1876         raise MessageError(error_msg, msg_in)
1877     timer.reset_peer_hold_time()
1878     # Send the keepalive to indicate the connection is accepted.
1879     timer.snapshot()  # Remember this time.
1880     msg_out = generator.keepalive_message()
1881     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1882     bgp_socket.send(msg_out)
1883     # Use the remembered time.
1884     timer.reset_my_keepalive_time(timer.snapshot_time)
1885     # End of initial handshake phase.
1886     state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
1887     while True:  # main reactor loop
1888         state.perform_one_loop_iteration()
1889
1890
1891 class Rpcs:
1892     '''Handler for SimpleXMLRPCServer'''
1893
1894     def __init__(self, sendqueue, storage):
1895         '''Init method
1896
1897         Arguments:
1898             :sendqueue: queue for data to be sent towards odl
1899             :storage: thread safe dict
1900         '''
1901         self.queue = sendqueue
1902         self.storage = storage
1903
1904     def send(self, text):
1905         '''Data to be sent
1906
1907         Arguments:
1908             :text: hes string of the data to be sent
1909         '''
1910         self.queue.put(text)
1911
1912     def get(self, text=''):
1913         '''Reads data form the storage
1914
1915         - returns stored data or an empty string, at the moment only
1916           'update' is stored
1917
1918         Arguments:
1919             :text: a key to the storage to get the data
1920         Returns:
1921             :data: stored data
1922         '''
1923         with self.storage as stor:
1924             return stor.get(text, '')
1925
1926     def clean(self, text=''):
1927         '''Cleans data form the storage
1928
1929         Arguments:
1930             :text: a key to the storage to clean the data
1931         '''
1932         with self.storage as stor:
1933             if text in stor:
1934                 del stor[text]
1935
1936
1937 def threaded_job(arguments):
1938     """Run the job threaded
1939
1940     Arguments:
1941         :arguments: Command line arguments
1942     Returns:
1943         :return: None
1944     """
1945     amount_left = arguments.amount
1946     utils_left = arguments.multiplicity
1947     prefix_current = arguments.firstprefix
1948     myip_current = arguments.myip
1949     thread_args = []
1950     rpcqueue = Queue.Queue()
1951     storage = SafeDict()
1952
1953     while 1:
1954         amount_per_util = (amount_left - 1) / utils_left + 1  # round up
1955         amount_left -= amount_per_util
1956         utils_left -= 1
1957
1958         args = deepcopy(arguments)
1959         args.amount = amount_per_util
1960         args.firstprefix = prefix_current
1961         args.myip = myip_current
1962         thread_args.append(args)
1963
1964         if not utils_left:
1965             break
1966         prefix_current += amount_per_util * 16
1967         myip_current += 1
1968
1969     try:
1970         # Create threads
1971         for t in thread_args:
1972             thread.start_new_thread(job, (t, rpcqueue, storage))
1973     except Exception:
1974         print "Error: unable to start thread."
1975         raise SystemExit(2)
1976
1977     rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
1978     rpcserver.register_instance(Rpcs(rpcqueue, storage))
1979     rpcserver.serve_forever()
1980
1981
1982 if __name__ == "__main__":
1983     arguments = parse_arguments()
1984     logger = create_logger(arguments.loglevel, arguments.logfile)
1985     threaded_job(arguments)