Auto-generated patch by python-black
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
16 import argparse
17 import binascii
18 import ipaddr
19 import logging
20 import Queue
21 import select
22 import socket
23 import struct
24 import thread
25 import threading
26 import time
27
28
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
33
34
class SafeDict(dict):
    """Thread safe dictionary

    The object will serve as thread safe data storage.
    It should be used with "with" statement.

    Notes:
        Only code running inside the "with" block holds the lock; plain
        item access outside of such a block is not guarded in any way.
    """

    def __init__(self, *p_arg, **n_arg):
        """Create the dictionary together with its guarding lock.

        Arguments:
            :p_arg: positional arguments, forwarded to dict()
            :n_arg: named arguments, forwarded to dict()
        """
        # Forward the initial content; it used to be accepted but dropped.
        super(SafeDict, self).__init__(*p_arg, **n_arg)
        self._lock = threading.Lock()

    def __enter__(self):
        """Block until the lock is acquired, then return the dictionary."""
        self._lock.acquire()
        return self

    def __exit__(self, type, value, traceback):
        """Release the lock.

        Nothing is returned, so an exception raised inside the "with"
        block keeps propagating to the caller.
        """
        self._lock.release()
52
53
def _parse_bool_argument(value):
    """Convert a textual command-line value into a real boolean.

    argparse hands the raw string to the "type" callable and bool() of any
    non-empty string (including "False") is True, so bool itself cannot be
    used as the converter.

    Arguments:
        :value: text given on the command line
    Returns:
        :return: True or False
    Raises:
        argparse.ArgumentTypeError: when the text is not a known boolean.
    """
    text = str(value).strip().lower()
    if text in ("true", "t", "yes", "y", "on", "1"):
        return True
    # Empty string stays False, the same as bool("") used to give.
    if text in ("false", "f", "no", "n", "off", "0", ""):
        return False
    raise argparse.ArgumentTypeError("Boolean value expected, got " + repr(value))


def parse_arguments():
    """Use argparse to get arguments,

    Returns:
        :return: args object.
    Notes:
        The process exits with status 1 when --multiplicity is not positive.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = "Amount of IP prefixes to generate. (negative means infinite)."
    parser.add_argument("--amount", default="1", type=int, help=str_help)
    str_help = "Rpc server port."
    parser.add_argument("--port", default="8000", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default="0", type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default="0", type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # The default has to be one of the choices (a string), not a list.
    parser.add_argument(
        "--updates", choices=["single", "separate"], default="separate", help=str_help
    )
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument(
        "--firstprefix", default="8.0.1.0", type=ipaddr.IPv4Address, help=str_help
    )
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = (
        "Numeric IP Address to bind to and derive BGP ID from. "
        + "Default value only suitable for listening."
    )
    parser.add_argument(
        "--myip", default="0.0.0.0", type=ipaddr.IPv4Address, help=str_help
    )
    str_help = (
        "TCP port to bind to when listening or initiating connection. "
        + "Default only suitable for initiating."
    )
    parser.add_argument("--myport", default="0", type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument(
        "--nexthop",
        default="192.0.2.1",
        type=ipaddr.IPv4Address,
        dest="nexthop",
        help=str_help,
    )
    str_help = "Identifier of the route originator."
    parser.add_argument(
        "--originator",
        default=None,
        type=ipaddr.IPv4Address,
        dest="originator",
        help=str_help,
    )
    str_help = "Cluster list item identifier."
    parser.add_argument(
        "--cluster",
        default=None,
        type=ipaddr.IPv4Address,
        dest="cluster",
        help=str_help,
    )
    str_help = (
        "Numeric IP Address to try to connect to. "
        + "Currently no effect in listening mode."
    )
    parser.add_argument(
        "--peerip", default="127.0.0.2", type=ipaddr.IPv4Address, help=str_help
    )
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default="179", type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default="180", type=int, help=str_help)
    str_help = "Log level (--error, --warning, --info, --debug)"
    # All four switches write to the same destination; the last one wins.
    for flag, level in (
        ("--error", logging.ERROR),
        ("--warning", logging.WARNING),
        ("--info", logging.INFO),
        ("--debug", logging.DEBUG),
    ):
        parser.add_argument(
            flag,
            dest="loglevel",
            action="store_const",
            const=level,
            default=logging.INFO,
            help=str_help,
        )
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default="1000", type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument(
        "--rfc4760", default=True, type=_parse_bool_argument, help=str_help
    )
    str_help = "Using peerip instead of myip for xmlrpc server"
    parser.add_argument(
        "--usepeerip", default=False, action="store_true", help=str_help
    )
    str_help = "Link-State NLRI supported"
    parser.add_argument(
        "--bgpls", default=False, type=_parse_bool_argument, help=str_help
    )
    str_help = "Link-State NLRI: Identifier"
    parser.add_argument("-lsid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID"
    parser.add_argument("-lstid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID"
    parser.add_argument("-lspid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
    parser.add_argument(
        "--lstsaddr", default="1.2.3.4", type=ipaddr.IPv4Address, help=str_help
    )
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
    parser.add_argument(
        "--lsteaddr", default="5.6.7.8", type=ipaddr.IPv4Address, help=str_help
    )
    str_help = "Link-State NLRI: Identifier Step"
    parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID Step"
    parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID Step"
    parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
    parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
    parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
    str_help = "How many play utilities are to be started."
    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
    # Shared tail of several help texts; argparse re-wraps whitespace in help
    # output, so the rendered text is the same as with line continuations.
    no_decoding_note = (
        "Enabling this flag makes the script not decoding the update mesage, "
        "because of not supported decoding for these elements."
    )
    str_help = (
        "Open message includes multiprotocol extension capability l2vpn-evpn. "
        + no_decoding_note
    )
    parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
    # The help texts of --grace and --mvpn used to be attached to each other.
    str_help = (
        "Open message includes Graceful-restart capability, containing AFI/SAFIS: "
        "IPV4-Unicast, IPV6-Unicast, BGP-LS "
        + no_decoding_note
    )
    parser.add_argument("--grace", default="8", type=int, help=str_help)
    str_help = (
        "Open message includes Multicast in MPLS/BGP IP VPNs arguments. "
        + no_decoding_note
    )
    parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
    str_help = "Open message includes L3VPN-MULTICAST arguments. " + no_decoding_note
    parser.add_argument(
        "--l3vpn_mcast", default=False, action="store_true", help=str_help
    )
    str_help = (
        "Open message includes L3VPN-UNICAST arguments, without message decoding."
    )
    parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
    str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
    parser.add_argument(
        "--rt_constrain", default=False, action="store_true", help=str_help
    )
    str_help = "Open message includes ipv6-unicast family, without message decoding."
    parser.add_argument("--ipv6", default=False, action="store_true", help=str_help)
    str_help = "Add all supported families without message decoding."
    parser.add_argument("--allf", default=False, action="store_true", help=str_help)
    parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
    str_help = "Skipping well known attributes for update message"
    parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        # Single string, so the output is the same with and without
        # print_function (a multi-argument call prints a tuple in Python 2).
        print("Multiplicity " + str(arguments.multiplicity) + " is not positive.")
        raise SystemExit(1)
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
248
249
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.listen: wait for the peer instead of connecting to it
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port
    Returns:
        :return: socket.
    Notes:
        No socket is left open when the connection attempt fails; the
        original exception is propagated to the caller.
    """
    # socket does not speak ipaddr, hence str(); bind needs a single tuple.
    local_address = (str(arguments.myip), arguments.myport)
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listening_socket.bind(local_address)
            listening_socket.listen(1)
            bgp_socket, _ = listening_socket.accept()
            # TODO: Verify client IP is controller IP.
        finally:
            # Only one peer is served, the listener is not needed any more
            # (and must not leak when bind/listen/accept fails).
            listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # bind to force specified address and port
            talking_socket.bind(local_address)
            talking_socket.connect((str(arguments.peerip), arguments.peerport))
        except Exception:
            # Do not leak the half-configured socket on failure.
            talking_socket.close()
            raise
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
289
290
def get_short_int_from_message(message, offset=16):
    """Read an unsigned 2-byte number stored in the message.

    The byte at the offset is the more significant one, the following
    byte is the less significant one.

    Arguments:
        :message: given message
        :offset: position of the first of the two bytes inside the message
    Returns:
        :return: the decoded short_int value.
    Notes:
        default offset value is the BGP message size offset.
    """
    upper_part = ord(message[offset]) << 8
    return upper_part + ord(message[offset + 1])
306
307
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Each prefix is stored as one byte holding the prefix length in bits,
    followed by only as many address bytes as are needed to hold that many
    bits. The missing trailing bytes are filled with zeroes here.

    Arguments:
        :prefixes_hex: raw (binary) string with the encoded prefixes
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Raises:
        struct.error: when a prefix is longer than 32 bits or the data
            ends in the middle of a prefix.
    """
    # bytearray yields integers on indexing for both str and bytes input.
    data = bytearray(prefixes_hex)
    prefix_list = []
    offset = 0
    while offset < len(data):
        prefix_bit_len = data[offset]
        # Number of bytes needed to hold prefix_bit_len bits (0 for /0).
        prefix_len = (prefix_bit_len + 7) // 8
        octets = data[offset + 1 : offset + 1 + prefix_len]
        if prefix_bit_len > 32 or len(octets) != prefix_len:
            # Malformed input used to surface as struct.error (from
            # struct.unpack), the exception type is kept for callers.
            raise struct.error(
                "Malformed IPv4 prefix (length " + str(prefix_bit_len) + " bits)"
                " at offset " + str(offset)
            )
        # Prefixes shorter than /25 occupy less than four bytes on the wire.
        padded = list(octets) + [0] * (4 - prefix_len)
        prefix = ".".join(str(octet) for octet in padded)
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
        offset += 1 + prefix_len
    return prefix_list
327
328
class MessageError(ValueError):
    """ValueError variant which reports the offending message hexlified."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Pass everything to the ValueError initialisation and keep
        the textual comment and the raw message as attributes.

        Arguments:
            :text: textual description of the problem
            :message: raw message which caused the error
            :args: any additional values, only forwarded to ValueError
        """
        super(MessageError, self).__init__(text, message, *args)
        self.msg = message
        self.text = text

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: the text followed by the hexlified message
        Notes:
            A placeholder string stands in for an empty message.
        """
        hex_dump = binascii.hexlify(self.msg)
        readable = "(empty message)" if hex_dump == "" else hex_dump
        return "".join((self.text, ": ", readable))
354
355
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Raises:
        MessageError: when the received data is shorter than 37 bytes or
            when its size differs from the length stated in the message.
    Notes:
        Performs just basic incoming message checks.
        Everything returned by a single recv() call is treated
        as the OPEN message.
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # Some validation.
    actual_length = len(msg_in)
    if actual_length < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        error_msg = (
            "Message length (" + str(actual_length) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)"
        )
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if actual_length != reported_length:
        # reported_length is a number, it has to be converted explicitly;
        # otherwise TypeError is raised instead of the intended MessageError.
        error_msg = (
            "Expected message length ("
            + str(reported_length)
            + ") does not match actual length ("
            + str(actual_length)
            + ")"
        )
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    logger.info("Open message received.")
    return msg_in
392
393
394 class MessageGenerator(object):
395     """Class which generates messages, holds states and configuration values."""
396
397     # TODO: Define bgp marker as a class (constant) variable.
398     def __init__(self, args):
399         """Initialisation according to command-line args.
400
401         Arguments:
402             :args: argsparser's Namespace object which contains command-line
403                 options for MesageGenerator initialisation
404         Notes:
405             Calculates and stores default values used later on for
406             message generation.
407         """
408         self.total_prefix_amount = args.amount
409         # Number of update messages left to be sent.
410         self.remaining_prefixes = self.total_prefix_amount
411
412         # New parameters initialisation
413         self.port = args.port
414         self.iteration = 0
415         self.prefix_base_default = args.firstprefix
416         self.prefix_length_default = args.prefixlen
417         self.wr_prefixes_default = []
418         self.nlri_prefixes_default = []
419         self.version_default = 4
420         self.my_autonomous_system_default = args.asnumber
421         self.hold_time_default = args.holdtime  # Local hold time.
422         self.bgp_identifier_default = int(args.myip)
423         self.next_hop_default = args.nexthop
424         self.originator_id_default = args.originator
425         self.cluster_list_item_default = args.cluster
426         self.single_update_default = args.updates == "single"
427         self.randomize_updates_default = args.updates == "random"
428         self.prefix_count_to_add_default = args.insert
429         self.prefix_count_to_del_default = args.withdraw
430         if self.prefix_count_to_del_default < 0:
431             self.prefix_count_to_del_default = 0
432         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
433             # total number of prefixes must grow to avoid infinite test loop
434             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
435         self.slot_size_default = self.prefix_count_to_add_default
436         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
437         self.results_file_name_default = args.results
438         self.performance_threshold_default = args.threshold
439         self.rfc4760 = args.rfc4760
440         self.bgpls = args.bgpls
441         self.evpn = args.evpn
442         self.mvpn = args.mvpn
443         self.l3vpn_mcast = args.l3vpn_mcast
444         self.l3vpn = args.l3vpn
445         self.rt_constrain = args.rt_constrain
446         self.ipv6 = args.ipv6
447         self.allf = args.allf
448         self.skipattr = args.skipattr
449         self.grace = args.grace
450         # Default values when BGP-LS Attributes are used
451         if self.bgpls:
452             self.prefix_count_to_add_default = 1
453             self.prefix_count_to_del_default = 0
454         self.ls_nlri_default = {
455             "Identifier": args.lsid,
456             "TunnelID": args.lstid,
457             "LSPID": args.lspid,
458             "IPv4TunnelSenderAddress": args.lstsaddr,
459             "IPv4TunnelEndPointAddress": args.lsteaddr,
460         }
461         self.lsid_step = args.lsidstep
462         self.lstid_step = args.lstidstep
463         self.lspid_step = args.lspidstep
464         self.lstsaddr_step = args.lstsaddrstep
465         self.lsteaddr_step = args.lsteaddrstep
466         # Default values used for randomized part
467         s1_slots = (
468             self.total_prefix_amount - self.remaining_prefixes_threshold - 1
469         ) / self.prefix_count_to_add_default + 1
470         s2_slots = (self.remaining_prefixes_threshold - 1) / (
471             self.prefix_count_to_add_default - self.prefix_count_to_del_default
472         ) + 1
473         # S1_First_Index = 0
474         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
475         s2_first_index = s1_slots * self.prefix_count_to_add_default
476         s2_last_index = (
477             s2_first_index
478             + s2_slots
479             * (self.prefix_count_to_add_default - self.prefix_count_to_del_default)
480             - 1
481         )
482         self.slot_gap_default = (
483             self.total_prefix_amount - self.remaining_prefixes_threshold - 1
484         ) / self.prefix_count_to_add_default + 1
485         self.randomize_lowest_default = s2_first_index
486         self.randomize_highest_default = s2_last_index
487         # Initialising counters
488         self.phase1_start_time = 0
489         self.phase1_stop_time = 0
490         self.phase2_start_time = 0
491         self.phase2_stop_time = 0
492         self.phase1_updates_sent = 0
493         self.phase2_updates_sent = 0
494         self.updates_sent = 0
495
496         self.log_info = args.loglevel <= logging.INFO
497         self.log_debug = args.loglevel <= logging.DEBUG
498         """
499         Flags needed for the MessageGenerator performance optimization.
500         Calling logger methods each iteration even with proper log level set
501         slows down significantly the MessageGenerator performance.
502         Measured total generation time (1M updates, dry run, error log level):
503         - logging based on basic logger features: 36,2s
504         - logging based on advanced logger features (lazy logging): 21,2s
505         - conditional calling of logger methods enclosed inside condition: 8,6s
506         """
507
508         logger.info("Generator initialisation")
509         logger.info(
510             "  Target total number of prefixes to be introduced: "
511             + str(self.total_prefix_amount)
512         )
513         logger.info(
514             "  Prefix base: "
515             + str(self.prefix_base_default)
516             + "/"
517             + str(self.prefix_length_default)
518         )
519         logger.info(
520             "  My Autonomous System number: " + str(self.my_autonomous_system_default)
521         )
522         logger.info("  My Hold Time: " + str(self.hold_time_default))
523         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
524         logger.info("  Next Hop: " + str(self.next_hop_default))
525         logger.info("  Originator ID: " + str(self.originator_id_default))
526         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
527         logger.info(
528             "  Prefix count to be inserted at once: "
529             + str(self.prefix_count_to_add_default)
530         )
531         logger.info(
532             "  Prefix count to be withdrawn at once: "
533             + str(self.prefix_count_to_del_default)
534         )
535         logger.info(
536             "  Fast pre-fill up to "
537             + str(self.total_prefix_amount - self.remaining_prefixes_threshold)
538             + " prefixes"
539         )
540         logger.info(
541             "  Remaining number of prefixes to be processed "
542             + "in parallel with withdrawals: "
543             + str(self.remaining_prefixes_threshold)
544         )
545         logger.debug(
546             "  Prefix index range used after pre-fill procedure ["
547             + str(self.randomize_lowest_default)
548             + ", "
549             + str(self.randomize_highest_default)
550             + "]"
551         )
552         if self.single_update_default:
553             logger.info(
554                 "  Common single UPDATE will be generated "
555                 + "for both NLRI & WITHDRAWN lists"
556             )
557         else:
558             logger.info(
559                 "  Two separate UPDATEs will be generated "
560                 + "for each NLRI & WITHDRAWN lists"
561             )
562         if self.randomize_updates_default:
563             logger.info("  Generation of UPDATE messages will be randomized")
564         logger.info("  Let's go ...\n")
565
566         # TODO: Notification for hold timer expiration can be handy.
567
568     def store_results(self, file_name=None, threshold=None):
569         """ Stores specified results into files based on file_name value.
570
571         Arguments:
572             :param file_name: Trailing (common) part of result file names
573             :param threshold: Minimum number of sent updates needed for each
574                               result to be included into result csv file
575                               (mainly needed because of the result accuracy)
576         Returns:
577             :return: n/a
578         """
579         # default values handling
580         # TODO optimize default values handling (use e.g. dicionary.update() approach)
581         if file_name is None:
582             file_name = self.results_file_name_default
583         if threshold is None:
584             threshold = self.performance_threshold_default
585         # performance calculation
586         if self.phase1_updates_sent >= threshold:
587             totals1 = self.phase1_updates_sent
588             performance1 = int(
589                 self.phase1_updates_sent
590                 / (self.phase1_stop_time - self.phase1_start_time)
591             )
592         else:
593             totals1 = None
594             performance1 = None
595         if self.phase2_updates_sent >= threshold:
596             totals2 = self.phase2_updates_sent
597             performance2 = int(
598                 self.phase2_updates_sent
599                 / (self.phase2_stop_time - self.phase2_start_time)
600             )
601         else:
602             totals2 = None
603             performance2 = None
604
605         logger.info("#" * 10 + " Final results " + "#" * 10)
606         logger.info("Number of iterations: " + str(self.iteration))
607         logger.info(
608             "Number of UPDATE messages sent in the pre-fill phase: "
609             + str(self.phase1_updates_sent)
610         )
611         logger.info(
612             "The pre-fill phase duration: "
613             + str(self.phase1_stop_time - self.phase1_start_time)
614             + "s"
615         )
616         logger.info(
617             "Number of UPDATE messages sent in the 2nd test phase: "
618             + str(self.phase2_updates_sent)
619         )
620         logger.info(
621             "The 2nd test phase duration: "
622             + str(self.phase2_stop_time - self.phase2_start_time)
623             + "s"
624         )
625         logger.info("Threshold for performance reporting: " + str(threshold))
626
627         # making labels
628         phase1_label = (
629             "pre-fill " + str(self.prefix_count_to_add_default) + " route(s) per UPDATE"
630         )
631         if self.single_update_default:
632             phase2_label = "+" + (
633                 str(self.prefix_count_to_add_default)
634                 + "/-"
635                 + str(self.prefix_count_to_del_default)
636                 + " routes per UPDATE"
637             )
638         else:
639             phase2_label = "+" + (
640                 str(self.prefix_count_to_add_default)
641                 + "/-"
642                 + str(self.prefix_count_to_del_default)
643                 + " routes in two UPDATEs"
644             )
645         # collecting capacity and performance results
646         totals = {}
647         performance = {}
648         if totals1 is not None:
649             totals[phase1_label] = totals1
650             performance[phase1_label] = performance1
651         if totals2 is not None:
652             totals[phase2_label] = totals2
653             performance[phase2_label] = performance2
654         self.write_results_to_file(totals, "totals-" + file_name)
655         self.write_results_to_file(performance, "performance-" + file_name)
656
657     def write_results_to_file(self, results, file_name):
658         """Writes results to the csv plot file consumable by Jenkins.
659
660         Arguments:
661             :param file_name: Name of the (csv) file to be created
662         Returns:
663             :return: none
664         """
665         first_line = ""
666         second_line = ""
667         f = open(file_name, "wt")
668         try:
669             for key in sorted(results):
670                 first_line += key + ", "
671                 second_line += str(results[key]) + ", "
672             first_line = first_line[:-2]
673             second_line = second_line[:-2]
674             f.write(first_line + "\n")
675             f.write(second_line + "\n")
676             logger.info(
677                 "Message generator performance results stored in " + file_name + ":"
678             )
679             logger.info("  " + first_line)
680             logger.info("  " + second_line)
681         finally:
682             f.close()
683
684     # Return pseudo-randomized (reproducible) index for selected range
685     def randomize_index(self, index, lowest=None, highest=None):
686         """Calculates pseudo-randomized index from selected range.
687
688         Arguments:
689             :param index: input index
690             :param lowest: the lowes index from the randomized area
691             :param highest: the highest index from the randomized area
692         Returns:
693             :return: the (pseudo)randomized index
694         Notes:
695             Created just as a fame for future generator enhancement.
696         """
697         # default values handling
698         # TODO optimize default values handling (use e.g. dicionary.update() approach)
699         if lowest is None:
700             lowest = self.randomize_lowest_default
701         if highest is None:
702             highest = self.randomize_highest_default
703         # randomize
704         if (index >= lowest) and (index <= highest):
705             # we are in the randomized range -> shuffle it inside
706             # the range (now just reverse the order)
707             new_index = highest - (index - lowest)
708         else:
709             # we are out of the randomized range -> nothing to do
710             new_index = index
711         return new_index
712
713     def get_ls_nlri_values(self, index):
714         """Generates LS-NLRI parameters.
715         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
716
717         Arguments:
718             :param index: index (iteration)
719         Returns:
720             :return: dictionary of LS NLRI parameters and values
721         """
722         # generating list of LS NLRI parameters
723         identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
724         ipv4_tunnel_sender_address = (
725             self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
726         )
727         tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
728         lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
729         ipv4_tunnel_endpoint_address = (
730             self.ls_nlri_default["IPv4TunnelEndPointAddress"]
731             + index / self.lsteaddr_step
732         )
733         ls_nlri_values = {
734             "Identifier": identifier,
735             "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
736             "TunnelID": tunnel_id,
737             "LSPID": lsp_id,
738             "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address,
739         }
740         return ls_nlri_values
741
742     def get_prefix_list(
743         self,
744         slot_index,
745         slot_size=None,
746         prefix_base=None,
747         prefix_len=None,
748         prefix_count=None,
749         randomize=None,
750     ):
751         """Generates list of IP address prefixes.
752
753         Arguments:
754             :param slot_index: index of group of prefix addresses
755             :param slot_size: size of group of prefix addresses
756                 in [number of included prefixes]
757             :param prefix_base: IP address of the first prefix
758                 (slot_index = 0, prefix_index = 0)
759             :param prefix_len: length of the prefix in bites
760                 (the same as size of netmask)
761             :param prefix_count: number of prefixes to be returned
762                 from the specified slot
763         Returns:
764             :return: list of generated IP address prefixes
765         """
766         # default values handling
767         # TODO optimize default values handling (use e.g. dicionary.update() approach)
768         if slot_size is None:
769             slot_size = self.slot_size_default
770         if prefix_base is None:
771             prefix_base = self.prefix_base_default
772         if prefix_len is None:
773             prefix_len = self.prefix_length_default
774         if prefix_count is None:
775             prefix_count = slot_size
776         if randomize is None:
777             randomize = self.randomize_updates_default
778         # generating list of prefixes
779         indexes = []
780         prefixes = []
781         prefix_gap = 2 ** (32 - prefix_len)
782         for i in range(prefix_count):
783             prefix_index = slot_index * slot_size + i
784             if randomize:
785                 prefix_index = self.randomize_index(prefix_index)
786             indexes.append(prefix_index)
787             prefixes.append(prefix_base + prefix_index * prefix_gap)
788         if self.log_debug:
789             logger.debug("  Prefix slot index: " + str(slot_index))
790             logger.debug("  Prefix slot size: " + str(slot_size))
791             logger.debug("  Prefix count: " + str(prefix_count))
792             logger.debug("  Prefix indexes: " + str(indexes))
793             logger.debug("  Prefix list: " + str(prefixes))
794         return prefixes
795
796     def compose_update_message(
797         self, prefix_count_to_add=None, prefix_count_to_del=None
798     ):
799         """Composes an UPDATE message
800
801         Arguments:
802             :param prefix_count_to_add: # of prefixes to put into NLRI list
803             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
804         Returns:
805             :return: encoded UPDATE message in HEX
806         Notes:
807             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
808             lists or common message wich includes both prefix lists.
809             Updates global counters.
810         """
811         # default values handling
812         # TODO optimize default values handling (use e.g. dicionary.update() approach)
813         if prefix_count_to_add is None:
814             prefix_count_to_add = self.prefix_count_to_add_default
815         if prefix_count_to_del is None:
816             prefix_count_to_del = self.prefix_count_to_del_default
817         # logging
818         if self.log_info and not (self.iteration % 1000):
819             logger.info(
820                 "Iteration: "
821                 + str(self.iteration)
822                 + " - total remaining prefixes: "
823                 + str(self.remaining_prefixes)
824             )
825         if self.log_debug:
826             logger.debug(
827                 "#" * 10 + " Iteration: " + str(self.iteration) + " " + "#" * 10
828             )
829             logger.debug("Remaining prefixes: " + str(self.remaining_prefixes))
830         # scenario type & one-shot counter
831         straightforward_scenario = (
832             self.remaining_prefixes > self.remaining_prefixes_threshold
833         )
834         if straightforward_scenario:
835             prefix_count_to_del = 0
836             if self.log_debug:
837                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
838             if not self.phase1_start_time:
839                 self.phase1_start_time = time.time()
840         else:
841             if self.log_debug:
842                 logger.debug("--- COMBINED SCENARIO ---")
843             if not self.phase2_start_time:
844                 self.phase2_start_time = time.time()
845         # tailor the number of prefixes if needed
846         prefix_count_to_add = prefix_count_to_del + min(
847             prefix_count_to_add - prefix_count_to_del, self.remaining_prefixes
848         )
849         # prefix slots selection for insertion and withdrawal
850         slot_index_to_add = self.iteration
851         slot_index_to_del = slot_index_to_add - self.slot_gap_default
852         # getting lists of prefixes for insertion in this iteration
853         if self.log_debug:
854             logger.debug("Prefixes to be inserted in this iteration:")
855         prefix_list_to_add = self.get_prefix_list(
856             slot_index_to_add, prefix_count=prefix_count_to_add
857         )
858         # getting lists of prefixes for withdrawal in this iteration
859         if self.log_debug:
860             logger.debug("Prefixes to be withdrawn in this iteration:")
861         prefix_list_to_del = self.get_prefix_list(
862             slot_index_to_del, prefix_count=prefix_count_to_del
863         )
864         # generating the UPDATE mesage with LS-NLRI only
865         if self.bgpls:
866             ls_nlri = self.get_ls_nlri_values(self.iteration)
867             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[], **ls_nlri)
868         else:
869             # generating the UPDATE message with prefix lists
870             if self.single_update_default:
871                 # Send prefixes to be introduced and withdrawn
872                 # in one UPDATE message
873                 msg_out = self.update_message(
874                     wr_prefixes=prefix_list_to_del, nlri_prefixes=prefix_list_to_add
875                 )
876             else:
877                 # Send prefixes to be introduced and withdrawn
878                 # in separate UPDATE messages (if needed)
879                 msg_out = self.update_message(
880                     wr_prefixes=[], nlri_prefixes=prefix_list_to_add
881                 )
882                 if prefix_count_to_del:
883                     msg_out += self.update_message(
884                         wr_prefixes=prefix_list_to_del, nlri_prefixes=[]
885                     )
886         # updating counters - who knows ... maybe I am last time here ;)
887         if straightforward_scenario:
888             self.phase1_stop_time = time.time()
889             self.phase1_updates_sent = self.updates_sent
890         else:
891             self.phase2_stop_time = time.time()
892             self.phase2_updates_sent = self.updates_sent - self.phase1_updates_sent
893         # updating totals for the next iteration
894         self.iteration += 1
895         self.remaining_prefixes -= prefix_count_to_add - prefix_count_to_del
896         # returning the encoded message
897         return msg_out
898
899     # Section of message encoders
900
901     def open_message(
902         self,
903         version=None,
904         my_autonomous_system=None,
905         hold_time=None,
906         bgp_identifier=None,
907     ):
908         """Generates an OPEN Message (rfc4271#section-4.2)
909
910         Arguments:
911             :param version: see the rfc4271#section-4.2
912             :param my_autonomous_system: see the rfc4271#section-4.2
913             :param hold_time: see the rfc4271#section-4.2
914             :param bgp_identifier: see the rfc4271#section-4.2
915         Returns:
916             :return: encoded OPEN message in HEX
917         """
918
919         # default values handling
920         # TODO optimize default values handling (use e.g. dicionary.update() approach)
921         if version is None:
922             version = self.version_default
923         if my_autonomous_system is None:
924             my_autonomous_system = self.my_autonomous_system_default
925         if hold_time is None:
926             hold_time = self.hold_time_default
927         if bgp_identifier is None:
928             bgp_identifier = self.bgp_identifier_default
929
930         # Marker
931         marker_hex = "\xFF" * 16
932
933         # Type
934         type = 1
935         type_hex = struct.pack("B", type)
936
937         # version
938         version_hex = struct.pack("B", version)
939
940         # my_autonomous_system
941         # AS_TRANS value, 23456 decadic.
942         my_autonomous_system_2_bytes = 23456
943         # AS number is mappable to 2 bytes
944         if my_autonomous_system < 65536:
945             my_autonomous_system_2_bytes = my_autonomous_system
946         my_autonomous_system_hex_2_bytes = struct.pack(">H", my_autonomous_system)
947
948         # Hold Time
949         hold_time_hex = struct.pack(">H", hold_time)
950
951         # BGP Identifier
952         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
953
954         # Optional Parameters
955         optional_parameters_hex = ""
956         if self.rfc4760 or self.allf:
957             optional_parameter_hex = (
958                 "\x02"  # Param type ("Capability Ad")
959                 "\x06"  # Length (6 bytes)
960                 "\x01"  # Capability type (NLRI Unicast),
961                 # see RFC 4760, secton 8
962                 "\x04"  # Capability value length
963                 "\x00\x01"  # AFI (Ipv4)
964                 "\x00"  # (reserved)
965                 "\x01"  # SAFI (Unicast)
966             )
967             optional_parameters_hex += optional_parameter_hex
968
969         if self.ipv6 or self.allf:
970             optional_parameter_hex = (
971                 "\x02"  # Param type ("Capability Ad")
972                 "\x06"  # Length (6 bytes)
973                 "\x01"  # Multiprotocol extetension capability,
974                 "\x04"  # Capability value length
975                 "\x00\x02"  # AFI (IPV6)
976                 "\x00"  # (reserved)
977                 "\x01"  # SAFI (UNICAST)
978             )
979             optional_parameters_hex += optional_parameter_hex
980
981         if self.bgpls or self.allf:
982             optional_parameter_hex = (
983                 "\x02"  # Param type ("Capability Ad")
984                 "\x06"  # Length (6 bytes)
985                 "\x01"  # Capability type (NLRI Unicast),
986                 # see RFC 4760, secton 8
987                 "\x04"  # Capability value length
988                 "\x40\x04"  # AFI (BGP-LS)
989                 "\x00"  # (reserved)
990                 "\x47"  # SAFI (BGP-LS)
991             )
992             optional_parameters_hex += optional_parameter_hex
993
994         if self.evpn or self.allf:
995             optional_parameter_hex = (
996                 "\x02"  # Param type ("Capability Ad")
997                 "\x06"  # Length (6 bytes)
998                 "\x01"  # Multiprotocol extetension capability,
999                 "\x04"  # Capability value length
1000                 "\x00\x19"  # AFI (L2-VPN)
1001                 "\x00"  # (reserved)
1002                 "\x46"  # SAFI (EVPN)
1003             )
1004             optional_parameters_hex += optional_parameter_hex
1005
1006         if self.mvpn or self.allf:
1007             optional_parameter_hex = (
1008                 "\x02"  # Param type ("Capability Ad")
1009                 "\x06"  # Length (6 bytes)
1010                 "\x01"  # Multiprotocol extetension capability,
1011                 "\x04"  # Capability value length
1012                 "\x00\x01"  # AFI (IPV4)
1013                 "\x00"  # (reserved)
1014                 "\x05"  # SAFI (MCAST-VPN)
1015             )
1016             optional_parameters_hex += optional_parameter_hex
1017             optional_parameter_hex = (
1018                 "\x02"  # Param type ("Capability Ad")
1019                 "\x06"  # Length (6 bytes)
1020                 "\x01"  # Multiprotocol extetension capability,
1021                 "\x04"  # Capability value length
1022                 "\x00\x02"  # AFI (IPV6)
1023                 "\x00"  # (reserved)
1024                 "\x05"  # SAFI (MCAST-VPN)
1025             )
1026             optional_parameters_hex += optional_parameter_hex
1027
1028         if self.l3vpn_mcast or self.allf:
1029             optional_parameter_hex = (
1030                 "\x02"  # Param type ("Capability Ad")
1031                 "\x06"  # Length (6 bytes)
1032                 "\x01"  # Multiprotocol extetension capability,
1033                 "\x04"  # Capability value length
1034                 "\x00\x01"  # AFI (IPV4)
1035                 "\x00"  # (reserved)
1036                 "\x81"  # SAFI (L3VPN-MCAST)
1037             )
1038             optional_parameters_hex += optional_parameter_hex
1039             optional_parameter_hex = (
1040                 "\x02"  # Param type ("Capability Ad")
1041                 "\x06"  # Length (6 bytes)
1042                 "\x01"  # Multiprotocol extetension capability,
1043                 "\x04"  # Capability value length
1044                 "\x00\x02"  # AFI (IPV6)
1045                 "\x00"  # (reserved)
1046                 "\x81"  # SAFI (L3VPN-MCAST)
1047             )
1048             optional_parameters_hex += optional_parameter_hex
1049
1050         if self.l3vpn or self.allf:
1051             optional_parameter_hex = (
1052                 "\x02"  # Param type ("Capability Ad")
1053                 "\x06"  # Length (6 bytes)
1054                 "\x01"  # Multiprotocol extetension capability,
1055                 "\x04"  # Capability value length
1056                 "\x00\x01"  # AFI (IPV4)
1057                 "\x00"  # (reserved)
1058                 "\x80"  # SAFI (L3VPN-UNICAST)
1059             )
1060             optional_parameters_hex += optional_parameter_hex
1061             optional_parameter_hex = (
1062                 "\x02"  # Param type ("Capability Ad")
1063                 "\x06"  # Length (6 bytes)
1064                 "\x01"  # Multiprotocol extetension capability,
1065                 "\x04"  # Capability value length
1066                 "\x00\x02"  # AFI (IPV6)
1067                 "\x00"  # (reserved)
1068                 "\x80"  # SAFI (L3VPN-UNICAST)
1069             )
1070             optional_parameters_hex += optional_parameter_hex
1071
1072         if self.rt_constrain or self.allf:
1073             optional_parameter_hex = (
1074                 "\x02"  # Param type ("Capability Ad")
1075                 "\x06"  # Length (6 bytes)
1076                 "\x01"  # Multiprotocol extetension capability,
1077                 "\x04"  # Capability value length
1078                 "\x00\x01"  # AFI (IPV4)
1079                 "\x00"  # (reserved)
1080                 "\x84"  # SAFI (ROUTE-TARGET-CONSTRAIN)
1081             )
1082             optional_parameters_hex += optional_parameter_hex
1083
1084         optional_parameter_hex = (
1085             "\x02"  # Param type ("Capability Ad")
1086             "\x06"  # Length (6 bytes)
1087             "\x41"  # "32 bit AS Numbers Support"
1088             # (see RFC 6793, section 3)
1089             "\x04"  # Capability value length
1090         )
1091         optional_parameter_hex += struct.pack(
1092             ">I", my_autonomous_system
1093         )  # My AS in 32 bit format
1094         optional_parameters_hex += optional_parameter_hex
1095
1096         if self.grace != 8:
1097             b = list(bin(self.grace)[2:])
1098             b = b + [0] * (3 - len(b))
1099             length = "\x08"
1100             if b[1] == "1":
1101                 restart_flag = "\x80\x05"
1102             else:
1103                 restart_flag = "\x00\x05"
1104             if b[2] == "1":
1105                 ipv4_flag = "\x80"
1106             else:
1107                 ipv4_flag = "\x00"
1108             if b[0] == "1":
1109                 ll_gr = "\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
1110                 length = "\x11"
1111             else:
1112                 ll_gr = ""
1113             logger.debug("Grace parameters list: {}".format(b))
1114             # "\x02" Param type ("Capability Ad")
1115             # :param length: Length of whole message
1116             # "\x40" Graceful-restart capability
1117             # "\x06" Length (6 bytes)
1118             # "\x00" Restart Flag (customizable - turned on when grace == 2,3,6,7)
1119             # "\x05" Restart timer (5sec)
1120             # "\x00\x01" AFI (IPV4)
1121             # "\x01"  SAFI (Unicast)
1122             # "\x00"  Ipv4 Flag (customizable - turned on when grace == 1,3,5,7)
1123             # "\x47\x07\x00\x01\x01\x00\x00\x00\x1e" ipv4 ll-graceful-restart capability, timer 30sec
1124             # ll-gr turned on when grace is between 4-7
1125             optional_parameter_hex = "\x02{}\x40\x06{}\x00\x01\x01{}{}".format(
1126                 length, restart_flag, ipv4_flag, ll_gr
1127             )
1128             optional_parameters_hex += optional_parameter_hex
1129
1130         # Optional Parameters Length
1131         optional_parameters_length = len(optional_parameters_hex)
1132         optional_parameters_length_hex = struct.pack("B", optional_parameters_length)
1133
1134         # Length (big-endian)
1135         length = (
1136             len(marker_hex)
1137             + 2
1138             + len(type_hex)
1139             + len(version_hex)
1140             + len(my_autonomous_system_hex_2_bytes)
1141             + len(hold_time_hex)
1142             + len(bgp_identifier_hex)
1143             + len(optional_parameters_length_hex)
1144             + len(optional_parameters_hex)
1145         )
1146         length_hex = struct.pack(">H", length)
1147
1148         # OPEN Message
1149         message_hex = (
1150             marker_hex
1151             + length_hex
1152             + type_hex
1153             + version_hex
1154             + my_autonomous_system_hex_2_bytes
1155             + hold_time_hex
1156             + bgp_identifier_hex
1157             + optional_parameters_length_hex
1158             + optional_parameters_hex
1159         )
1160
1161         if self.log_debug:
1162             logger.debug("OPEN message encoding")
1163             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1164             logger.debug(
1165                 "  Length=" + str(length) + " (0x" + binascii.hexlify(length_hex) + ")"
1166             )
1167             logger.debug(
1168                 "  Type=" + str(type) + " (0x" + binascii.hexlify(type_hex) + ")"
1169             )
1170             logger.debug(
1171                 "  Version="
1172                 + str(version)
1173                 + " (0x"
1174                 + binascii.hexlify(version_hex)
1175                 + ")"
1176             )
1177             logger.debug(
1178                 "  My Autonomous System="
1179                 + str(my_autonomous_system_2_bytes)
1180                 + " (0x"
1181                 + binascii.hexlify(my_autonomous_system_hex_2_bytes)
1182                 + ")"
1183             )
1184             logger.debug(
1185                 "  Hold Time="
1186                 + str(hold_time)
1187                 + " (0x"
1188                 + binascii.hexlify(hold_time_hex)
1189                 + ")"
1190             )
1191             logger.debug(
1192                 "  BGP Identifier="
1193                 + str(bgp_identifier)
1194                 + " (0x"
1195                 + binascii.hexlify(bgp_identifier_hex)
1196                 + ")"
1197             )
1198             logger.debug(
1199                 "  Optional Parameters Length="
1200                 + str(optional_parameters_length)
1201                 + " (0x"
1202                 + binascii.hexlify(optional_parameters_length_hex)
1203                 + ")"
1204             )
1205             logger.debug(
1206                 "  Optional Parameters=0x" + binascii.hexlify(optional_parameters_hex)
1207             )
1208             logger.debug("OPEN message encoded: 0x%s", binascii.b2a_hex(message_hex))
1209
1210         return message_hex
1211
1212     def update_message(
1213         self,
1214         wr_prefixes=None,
1215         nlri_prefixes=None,
1216         wr_prefix_length=None,
1217         nlri_prefix_length=None,
1218         my_autonomous_system=None,
1219         next_hop=None,
1220         originator_id=None,
1221         cluster_list_item=None,
1222         end_of_rib=False,
1223         **ls_nlri_params
1224     ):
1225         """Generates an UPDATE Message (rfc4271#section-4.3)
1226
1227         Arguments:
1228             :param wr_prefixes: see the rfc4271#section-4.3
1229             :param nlri_prefixes: see the rfc4271#section-4.3
1230             :param wr_prefix_length: see the rfc4271#section-4.3
1231             :param nlri_prefix_length: see the rfc4271#section-4.3
1232             :param my_autonomous_system: see the rfc4271#section-4.3
1233             :param next_hop: see the rfc4271#section-4.3
1234         Returns:
1235             :return: encoded UPDATE message in HEX
1236         """
1237
1238         # default values handling
1239         # TODO optimize default values handling (use e.g. dicionary.update() approach)
1240         if wr_prefixes is None:
1241             wr_prefixes = self.wr_prefixes_default
1242         if nlri_prefixes is None:
1243             nlri_prefixes = self.nlri_prefixes_default
1244         if wr_prefix_length is None:
1245             wr_prefix_length = self.prefix_length_default
1246         if nlri_prefix_length is None:
1247             nlri_prefix_length = self.prefix_length_default
1248         if my_autonomous_system is None:
1249             my_autonomous_system = self.my_autonomous_system_default
1250         if next_hop is None:
1251             next_hop = self.next_hop_default
1252         if originator_id is None:
1253             originator_id = self.originator_id_default
1254         if cluster_list_item is None:
1255             cluster_list_item = self.cluster_list_item_default
1256         ls_nlri = self.ls_nlri_default.copy()
1257         ls_nlri.update(ls_nlri_params)
1258
1259         # Marker
1260         marker_hex = "\xFF" * 16
1261
1262         # Type
1263         type = 2
1264         type_hex = struct.pack("B", type)
1265
1266         # Withdrawn Routes
1267         withdrawn_routes_hex = ""
1268         if not self.bgpls:
1269             bytes = ((wr_prefix_length - 1) / 8) + 1
1270             for prefix in wr_prefixes:
1271                 withdrawn_route_hex = (
1272                     struct.pack("B", wr_prefix_length)
1273                     + struct.pack(">I", int(prefix))[:bytes]
1274                 )
1275                 withdrawn_routes_hex += withdrawn_route_hex
1276
1277         # Withdrawn Routes Length
1278         withdrawn_routes_length = len(withdrawn_routes_hex)
1279         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1280
1281         # TODO: to replace hardcoded string by encoding?
1282         # Path Attributes
1283         path_attributes_hex = ""
1284         if not self.skipattr:
1285             path_attributes_hex += (
1286                 "\x40"  # Flags ("Well-Known")
1287                 "\x01"  # Type (ORIGIN)
1288                 "\x01"  # Length (1)
1289                 "\x00"  # Origin: IGP
1290             )
1291             path_attributes_hex += (
1292                 "\x40"  # Flags ("Well-Known")
1293                 "\x02"  # Type (AS_PATH)
1294                 "\x06"  # Length (6)
1295                 "\x02"  # AS segment type (AS_SEQUENCE)
1296                 "\x01"  # AS segment length (1)
1297             )
1298             my_as_hex = struct.pack(">I", my_autonomous_system)
1299             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
1300             path_attributes_hex += (
1301                 "\x40"  # Flags ("Well-Known")
1302                 "\x05"  # Type (LOCAL_PREF)
1303                 "\x04"  # Length (4)
1304                 "\x00\x00\x00\x64"  # (100)
1305             )
1306         if nlri_prefixes != []:
1307             path_attributes_hex += (
1308                 "\x40"  # Flags ("Well-Known")
1309                 "\x03"  # Type (NEXT_HOP)
1310                 "\x04"  # Length (4)
1311             )
1312             next_hop_hex = struct.pack(">I", int(next_hop))
1313             path_attributes_hex += next_hop_hex  # IP address of the next hop (4 bytes)
1314             if originator_id is not None:
1315                 path_attributes_hex += (
1316                     "\x80"  # Flags ("Optional, non-transitive")
1317                     "\x09"  # Type (ORIGINATOR_ID)
1318                     "\x04"  # Length (4)
1319                 )  # ORIGINATOR_ID (4 bytes)
1320                 path_attributes_hex += struct.pack(">I", int(originator_id))
1321             if cluster_list_item is not None:
1322                 path_attributes_hex += (
1323                     "\x80"  # Flags ("Optional, non-transitive")
1324                     "\x0a"  # Type (CLUSTER_LIST)
1325                     "\x04"  # Length (4)
1326                 )  # one CLUSTER_LIST item (4 bytes)
1327                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1328
1329         if self.bgpls and not end_of_rib:
1330             path_attributes_hex += (
1331                 "\x80"  # Flags ("Optional, non-transitive")
1332                 "\x0e"  # Type (MP_REACH_NLRI)
1333                 "\x22"  # Length (34)
1334                 "\x40\x04"  # AFI (BGP-LS)
1335                 "\x47"  # SAFI (BGP-LS)
1336                 "\x04"  # Next Hop Length (4)
1337             )
1338             path_attributes_hex += struct.pack(">I", int(next_hop))
1339             path_attributes_hex += "\x00"  # Reserved
1340             path_attributes_hex += (
1341                 "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1342                 "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
1343                 "\x07"  # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1344             )
1345             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1346             path_attributes_hex += struct.pack(
1347                 ">I", int(ls_nlri["IPv4TunnelSenderAddress"])
1348             )
1349             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1350             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1351             path_attributes_hex += struct.pack(
1352                 ">I", int(ls_nlri["IPv4TunnelEndPointAddress"])
1353             )
1354
1355         # Total Path Attributes Length
1356         total_path_attributes_length = len(path_attributes_hex)
1357         total_path_attributes_length_hex = struct.pack(
1358             ">H", total_path_attributes_length
1359         )
1360
1361         # Network Layer Reachability Information
1362         nlri_hex = ""
1363         if not self.bgpls:
1364             bytes = ((nlri_prefix_length - 1) / 8) + 1
1365             for prefix in nlri_prefixes:
1366                 nlri_prefix_hex = (
1367                     struct.pack("B", nlri_prefix_length)
1368                     + struct.pack(">I", int(prefix))[:bytes]
1369                 )
1370                 nlri_hex += nlri_prefix_hex
1371
1372         # Length (big-endian)
1373         length = (
1374             len(marker_hex)
1375             + 2
1376             + len(type_hex)
1377             + len(withdrawn_routes_length_hex)
1378             + len(withdrawn_routes_hex)
1379             + len(total_path_attributes_length_hex)
1380             + len(path_attributes_hex)
1381             + len(nlri_hex)
1382         )
1383         length_hex = struct.pack(">H", length)
1384
1385         # UPDATE Message
1386         message_hex = (
1387             marker_hex
1388             + length_hex
1389             + type_hex
1390             + withdrawn_routes_length_hex
1391             + withdrawn_routes_hex
1392             + total_path_attributes_length_hex
1393             + path_attributes_hex
1394             + nlri_hex
1395         )
1396
1397         if self.grace != 8 and self.grace != 0 and end_of_rib:
1398             message_hex = marker_hex + binascii.unhexlify("00170200000000")
1399
1400         if self.log_debug:
1401             logger.debug("UPDATE message encoding")
1402             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1403             logger.debug(
1404                 "  Length=" + str(length) + " (0x" + binascii.hexlify(length_hex) + ")"
1405             )
1406             logger.debug(
1407                 "  Type=" + str(type) + " (0x" + binascii.hexlify(type_hex) + ")"
1408             )
1409             logger.debug(
1410                 "  withdrawn_routes_length="
1411                 + str(withdrawn_routes_length)
1412                 + " (0x"
1413                 + binascii.hexlify(withdrawn_routes_length_hex)
1414                 + ")"
1415             )
1416             logger.debug(
1417                 "  Withdrawn_Routes="
1418                 + str(wr_prefixes)
1419                 + "/"
1420                 + str(wr_prefix_length)
1421                 + " (0x"
1422                 + binascii.hexlify(withdrawn_routes_hex)
1423                 + ")"
1424             )
1425             if total_path_attributes_length:
1426                 logger.debug(
1427                     "  Total Path Attributes Length="
1428                     + str(total_path_attributes_length)
1429                     + " (0x"
1430                     + binascii.hexlify(total_path_attributes_length_hex)
1431                     + ")"
1432                 )
1433                 logger.debug(
1434                     "  Path Attributes="
1435                     + "(0x"
1436                     + binascii.hexlify(path_attributes_hex)
1437                     + ")"
1438                 )
1439                 logger.debug("    Origin=IGP")
1440                 logger.debug("    AS path=" + str(my_autonomous_system))
1441                 logger.debug("    Next hop=" + str(next_hop))
1442                 if originator_id is not None:
1443                     logger.debug("    Originator id=" + str(originator_id))
1444                 if cluster_list_item is not None:
1445                     logger.debug("    Cluster list=" + str(cluster_list_item))
1446                 if self.bgpls:
1447                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1448             logger.debug(
1449                 "  Network Layer Reachability Information="
1450                 + str(nlri_prefixes)
1451                 + "/"
1452                 + str(nlri_prefix_length)
1453                 + " (0x"
1454                 + binascii.hexlify(nlri_hex)
1455                 + ")"
1456             )
1457             logger.debug("UPDATE message encoded: 0x" + binascii.b2a_hex(message_hex))
1458
1459         # updating counter
1460         self.updates_sent += 1
1461         # returning encoded message
1462         return message_hex
1463
1464     def notification_message(self, error_code, error_subcode, data_hex=""):
1465         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1466
1467         Arguments:
1468             :param error_code: see the rfc4271#section-4.5
1469             :param error_subcode: see the rfc4271#section-4.5
1470             :param data_hex: see the rfc4271#section-4.5
1471         Returns:
1472             :return: encoded NOTIFICATION message in HEX
1473         """
1474
1475         # Marker
1476         marker_hex = "\xFF" * 16
1477
1478         # Type
1479         type = 3
1480         type_hex = struct.pack("B", type)
1481
1482         # Error Code
1483         error_code_hex = struct.pack("B", error_code)
1484
1485         # Error Subode
1486         error_subcode_hex = struct.pack("B", error_subcode)
1487
1488         # Length (big-endian)
1489         length = (
1490             len(marker_hex)
1491             + 2
1492             + len(type_hex)
1493             + len(error_code_hex)
1494             + len(error_subcode_hex)
1495             + len(data_hex)
1496         )
1497         length_hex = struct.pack(">H", length)
1498
1499         # NOTIFICATION Message
1500         message_hex = (
1501             marker_hex
1502             + length_hex
1503             + type_hex
1504             + error_code_hex
1505             + error_subcode_hex
1506             + data_hex
1507         )
1508
1509         if self.log_debug:
1510             logger.debug("NOTIFICATION message encoding")
1511             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1512             logger.debug(
1513                 "  Length=" + str(length) + " (0x" + binascii.hexlify(length_hex) + ")"
1514             )
1515             logger.debug(
1516                 "  Type=" + str(type) + " (0x" + binascii.hexlify(type_hex) + ")"
1517             )
1518             logger.debug(
1519                 "  Error Code="
1520                 + str(error_code)
1521                 + " (0x"
1522                 + binascii.hexlify(error_code_hex)
1523                 + ")"
1524             )
1525             logger.debug(
1526                 "  Error Subode="
1527                 + str(error_subcode)
1528                 + " (0x"
1529                 + binascii.hexlify(error_subcode_hex)
1530                 + ")"
1531             )
1532             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1533             logger.debug(
1534                 "NOTIFICATION message encoded: 0x%s", binascii.b2a_hex(message_hex)
1535             )
1536
1537         return message_hex
1538
1539     def keepalive_message(self):
1540         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1541
1542         Returns:
1543             :return: encoded KEEP ALIVE message in HEX
1544         """
1545
1546         # Marker
1547         marker_hex = "\xFF" * 16
1548
1549         # Type
1550         type = 4
1551         type_hex = struct.pack("B", type)
1552
1553         # Length (big-endian)
1554         length = len(marker_hex) + 2 + len(type_hex)
1555         length_hex = struct.pack(">H", length)
1556
1557         # KEEP ALIVE Message
1558         message_hex = marker_hex + length_hex + type_hex
1559
1560         if self.log_debug:
1561             logger.debug("KEEP ALIVE message encoding")
1562             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
1563             logger.debug(
1564                 "  Length=" + str(length) + " (0x" + binascii.hexlify(length_hex) + ")"
1565             )
1566             logger.debug(
1567                 "  Type=" + str(type) + " (0x" + binascii.hexlify(type_hex) + ")"
1568             )
1569             logger.debug(
1570                 "KEEP ALIVE message encoded: 0x%s", binascii.b2a_hex(message_hex)
1571             )
1572
1573         return message_hex
1574
1575
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and
    peer's hold time.

    Note: Relative time is always named timedelta, to stress that
    the (non-delta) time is absolute.
    """

    def __init__(self, msg_in):
        """Initialisation based on defaults and OPEN message from peer.

        Arguments:
            msg_in: the OPEN message received from peer.
        Raises:
            ValueError: when the negotiated hold time is nonzero
                but smaller than 3 seconds.
        """
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        my_hold_timedelta = 180
        # The value is read at offset 22 of the OPEN message
        # (presumably the Hold Time field of rfc4271#section-4.2).
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        hold_timedelta = min(my_hold_timedelta, peer_hold_timedelta)
        if hold_timedelta != 0 and hold_timedelta < 3:
            # One formatted string, so that str() of the exception reads
            # the same as the logged line (not as a tuple of arguments).
            error_text = "Invalid hold timedelta value: " + str(hold_timedelta)
            logger.error(error_text)
            raise ValueError(error_text)
        # If we do not hear from peer this long, we assume it has died.
        # Zero means the keepalive checking is off.
        self.hold_timedelta = hold_timedelta
        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        # The same as calling snapshot(), but also declares a field.
        self.snapshot_time = time.time()
        # At this time point, peer will be declared dead.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this point, we should be sending keepalive message.
        # Stays None until reset_my_keepalive_time() is called.
        self.my_keepalive_time = None

    def snapshot(self):
        """Store current time in instance data to use later."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time

        Arguments:
            :keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurrence

        Returns:
            :return: False when the hold time is 0 (keepalives are off),
                otherwise whether the stored snapshot time has reached
                my keepalive time
        """
        if self.hold_timedelta == 0:
            return False
        # NOTE(review): my_keepalive_time is None until
        # reset_my_keepalive_time() is called; callers are presumably
        # expected to call it first.
        return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Get the time of the next expected or to be sent KEEP ALIVE

        Returns:
            :return: absolute time of the nearest timer event; one day
                after the stored snapshot when the hold time is 0
        """
        if self.hold_timedelta == 0:
            return self.snapshot_time + 86400
        if self.my_keepalive_time is None:
            # My keepalive has not been scheduled yet, so the only known
            # event is the peer's hold time. min() must not see None.
            return self.peer_hold_time
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time.

        Arguments:
            :snapshot_time: the time to compare the peer's hold time with
        Raises:
            RuntimeError: when snapshot_time is past the peer's hold time
        """
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta == 0:
            return
        # time.time() may be too strict
        if snapshot_time > self.peer_hold_time:
            logger.error("Peer has overstepped the hold timer.")
            raise RuntimeError("Peer has overstepped the hold timer.")
            # TODO: Include hold_timedelta?
            # TODO: Add notification sending (attempt). That means
            # move to write tracker.
1659
1660
class ReadTracker(object):
    """Class for tracking read of messages chunk by chunk and
    for idle waiting.
    """

    # Names of the path attribute type codes which
    # decode_path_attributes() recognises; any other code is logged
    # as an unknown attribute.
    _PATH_ATTRIBUTE_NAMES = {
        1: "ORIGIN",
        2: "AS_PATH",
        3: "NEXT_HOP",
        4: "MULTI_EXIT_DISC",
        5: "LOCAL_PREF",
        6: "ATOMIC_AGGREGATE",
        7: "AGGREGATOR",
        9: "ORIGINATOR_ID",  # rfc4456#section-8
        10: "CLUSTER_LIST",  # rfc4456#section-8
        14: "MP_REACH_NLRI",  # rfc4760#section-3
        15: "MP_UNREACH_NLRI",  # rfc4760#section-4
    }

    def __init__(
        self,
        bgp_socket,
        timer,
        storage,
        evpn=False,
        mvpn=False,
        l3vpn_mcast=False,
        allf=False,
        l3vpn=False,
        rt_constrain=False,
        ipv6=False,
        grace=8,
        wait_for_read=10,
    ):
        """The reader initialisation.

        Arguments:
            bgp_socket: socket to be used for reading
            timer: timer to be used for scheduling
            storage: thread safe dict
            evpn: flag that evpn functionality is tested
            mvpn: flag that mvpn functionality is tested
            l3vpn_mcast: flag that l3vpn_mcast functionality is tested
            allf: flag for all family testing.
            l3vpn: flag that l3vpn unicast functionality is tested
            rt_constrain: flag that rt-constrain functionality is tested
            ipv6: flag that ipv6 unicast functionality is tested
            grace: graceful-restart setting; updates are decoded only
                when it equals 8 (the default)
            wait_for_read: upper bound (in seconds) of a single wait
                performed by wait_for_read()

        Any of the flags being set makes decode_update_message() store
        the received update without decoding it.
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.timer = timer
        # BGP marker length plus length field length.
        self.header_length = 18
        # TODO: make it class (constant) attribute
        # Computation of where next chunk ends depends on whether
        # we are beyond length field.
        self.reading_header = True
        # Countdown towards next size computation.
        self.bytes_to_read = self.header_length
        # Incremental buffer for message under read.
        self.msg_in = ""
        # Initialising counters
        self.updates_received = 0
        self.prefixes_introduced = 0
        self.prefixes_withdrawn = 0
        self.rx_idle_time = 0
        self.rx_activity_detected = True
        self.storage = storage
        self.evpn = evpn
        self.mvpn = mvpn
        self.l3vpn_mcast = l3vpn_mcast
        self.l3vpn = l3vpn
        self.rt_constrain = rt_constrain
        self.ipv6 = ipv6
        self.allf = allf
        self.wfr = wait_for_read
        self.grace = grace

    def read_message_chunk(self):
        """Read up to one message

        One call performs one recv() of at most the number of bytes
        missing from the block under read (header or message body).

        Note:
            Currently it does not return anything.
        """
        # TODO: We could return the whole message, currently not needed.
        # We assume the socket is readable.
        chunk_message = self.socket.recv(self.bytes_to_read)
        self.msg_in += chunk_message
        self.bytes_to_read -= len(chunk_message)
        # TODO: bytes_to_read < 0 is not possible, right?
        if not self.bytes_to_read:
            # Finished reading a logical block.
            if self.reading_header:
                # The logical block was a BGP header.
                # Now we know the size of the message.
                self.reading_header = False
                self.bytes_to_read = (
                    get_short_int_from_message(self.msg_in) - self.header_length
                )
            else:  # We have finished reading the body of the message.
                # Peer has just proven it is still alive.
                self.timer.reset_peer_hold_time()
                # TODO: Do we want to count received messages?
                # This version ignores the received message.
                # TODO: Should we do validation and exit on anything
                # besides update or keepalive?
                # The type octet is the first one behind marker and length.
                message_type_hex = self.msg_in[self.header_length]
                if message_type_hex == "\x01":
                    logger.info(
                        "OPEN message received: 0x%s", binascii.b2a_hex(self.msg_in)
                    )
                elif message_type_hex == "\x02":
                    logger.debug(
                        "UPDATE message received: 0x%s", binascii.b2a_hex(self.msg_in)
                    )
                    self.decode_update_message(self.msg_in)
                elif message_type_hex == "\x03":
                    logger.info(
                        "NOTIFICATION message received: 0x%s",
                        binascii.b2a_hex(self.msg_in),
                    )
                elif message_type_hex == "\x04":
                    logger.info(
                        "KEEP ALIVE message received: 0x%s",
                        binascii.b2a_hex(self.msg_in),
                    )
                else:
                    logger.warning(
                        "Unexpected message received: 0x%s",
                        binascii.b2a_hex(self.msg_in),
                    )
                # Prepare state for reading another message.
                self.msg_in = ""
                self.reading_header = True
                self.bytes_to_read = self.header_length
        # We should not act upon peer_hold_time if we are reading
        # something right now.
        return

    def _decode_mp_reach_nlri(self, attr_value_hex):
        """Log the fields of MP_REACH_NLRI and count the announced prefixes.

        Arguments:
            :attr_value_hex: value of the attribute (rfc4760#section-3)
        """
        address_family_identifier_hex = attr_value_hex[0:2]
        logger.debug(
            "  Address Family Identifier=0x%s",
            binascii.b2a_hex(address_family_identifier_hex),
        )
        subsequent_address_family_identifier_hex = attr_value_hex[2]
        logger.debug(
            "  Subsequent Address Family Identifier=0x%s",
            binascii.b2a_hex(subsequent_address_family_identifier_hex),
        )
        next_hop_netaddr_len_hex = attr_value_hex[3]
        next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
        logger.debug(
            "  Length of Next Hop Network Address=%s (0x%s)",
            next_hop_netaddr_len,
            binascii.b2a_hex(next_hop_netaddr_len_hex),
        )
        next_hop_netaddr_hex = attr_value_hex[4 : 4 + next_hop_netaddr_len]
        # The format below takes exactly four octets, i.e. struct.error
        # is raised for a next hop of any other length.
        next_hop_netaddr = ".".join(
            str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex)
        )
        logger.debug(
            "  Network Address of Next Hop=%s (0x%s)",
            next_hop_netaddr,
            binascii.b2a_hex(next_hop_netaddr_hex),
        )
        reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
        logger.debug("  Reserved=0x%s", binascii.b2a_hex(reserved_hex))
        nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1 :]
        logger.debug(
            "  Network Layer Reachability Information=0x%s",
            binascii.b2a_hex(nlri_hex),
        )
        nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
        logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
        for prefix in nlri_prefix_list:
            logger.debug("  nlri_prefix_received: %s", prefix)
        self.prefixes_introduced += len(nlri_prefix_list)  # update counter

    def _decode_mp_unreach_nlri(self, attr_value_hex):
        """Log the fields of MP_UNREACH_NLRI and count the withdrawn prefixes.

        Arguments:
            :attr_value_hex: value of the attribute (rfc4760#section-4)
        """
        address_family_identifier_hex = attr_value_hex[0:2]
        logger.debug(
            "  Address Family Identifier=0x%s",
            binascii.b2a_hex(address_family_identifier_hex),
        )
        subsequent_address_family_identifier_hex = attr_value_hex[2]
        logger.debug(
            "  Subsequent Address Family Identifier=0x%s",
            binascii.b2a_hex(subsequent_address_family_identifier_hex),
        )
        wd_hex = attr_value_hex[3:]
        logger.debug("  Withdrawn Routes=0x%s", binascii.b2a_hex(wd_hex))
        wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
        logger.debug("  Withdrawn routes prefix list: %s", wdr_prefix_list)
        for prefix in wdr_prefix_list:
            logger.debug("  withdrawn_prefix_received: %s", prefix)
        self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter

    def decode_path_attributes(self, path_attributes_hex):
        """Decode the Path Attributes field (rfc4271#section-4.3)

        Every attribute is logged. Prefixes carried in MP_REACH_NLRI and
        MP_UNREACH_NLRI are added to the prefixes_introduced and
        prefixes_withdrawn counters.

        Arguments:
            :path_attributes: path_attributes field to be decoded in hex
        Returns:
            :return: None
        """
        hex_to_decode = path_attributes_hex

        while hex_to_decode:
            attr_flags_hex = hex_to_decode[0]
            attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
            # Other flag bits (optional=128, transitive=64, partial=32)
            # are not needed here.
            attr_extended_length_bit = attr_flags & 16

            attr_type_code_hex = hex_to_decode[1]
            attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)

            # The length takes two octets when the extended length bit
            # is set, one octet otherwise.
            if attr_extended_length_bit:
                attr_length_hex = hex_to_decode[2:4]
                attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
                attr_value_hex = hex_to_decode[4 : 4 + attr_length]
                hex_to_decode = hex_to_decode[4 + attr_length :]
            else:
                attr_length_hex = hex_to_decode[2]
                attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
                attr_value_hex = hex_to_decode[3 : 3 + attr_length]
                hex_to_decode = hex_to_decode[3 + attr_length :]

            attr_name = self._PATH_ATTRIBUTE_NAMES.get(attr_type_code)
            if attr_name is None:
                logger.debug(
                    "Unknown attribute type=%s, flags:0x%s)",
                    attr_type_code,
                    binascii.b2a_hex(attr_flags_hex),
                )
                logger.debug(
                    "Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex)
                )
                continue

            logger.debug(
                "Attribute type=%s (%s, flags:0x%s)",
                attr_type_code,
                attr_name,
                binascii.b2a_hex(attr_flags_hex),
            )
            logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            # Only the multiprotocol attributes carry prefixes to count.
            if attr_type_code == 14:
                self._decode_mp_reach_nlri(attr_value_hex)
            elif attr_type_code == 15:
                self._decode_mp_unreach_nlri(attr_value_hex)
        return None

    def decode_update_message(self, msg):
        """Decode an UPDATE message (rfc4271#section-4.3)

        The message is always stored (hexlified) under the "update" key
        of the storage. It is decoded and counted only if none of the
        tested-functionality flags is set and grace equals 8.

        Arguments:
            :msg: message to be decoded in hex
        Returns:
            :return: None
        """
        logger.debug("Decoding update message:")
        # message header - marker
        marker_hex = msg[:16]
        logger.debug("Message header marker: 0x%s", binascii.b2a_hex(marker_hex))
        # message header - message length
        msg_length_hex = msg[16:18]
        msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
        logger.debug(
            "Message lenght: 0x%s (%s)", binascii.b2a_hex(msg_length_hex), msg_length
        )
        # message header - message type
        msg_type_hex = msg[18:19]
        msg_type = int(binascii.b2a_hex(msg_type_hex), 16)

        with self.storage as stor:
            # this will replace the previously stored message
            stor["update"] = binascii.hexlify(msg)

        # Each entry: (log label, logged value, skip decoding?, reason).
        # Keeping the logged value and the tested value side by side
        # guards against testing a different flag than the one reported.
        skip_checks = (
            (
                "Evpn {}",
                self.evpn,
                self.evpn,
                "Skipping update decoding due to evpn data expected",
            ),
            (
                "Graceful-restart {}",
                self.grace,
                self.grace != 8,
                "Skipping update decoding due to graceful-restart data expected",
            ),
            (
                "Mvpn {}",
                self.mvpn,
                self.mvpn,
                "Skipping update decoding due to mvpn data expected",
            ),
            (
                "L3vpn-mcast {}",
                self.l3vpn_mcast,
                self.l3vpn_mcast,
                "Skipping update decoding due to l3vpn_mcast data expected",
            ),
            (
                "L3vpn-unicast {}",
                self.l3vpn,
                self.l3vpn,
                "Skipping update decoding due to l3vpn-unicast data expected",
            ),
            (
                "Route-Target-Constrain {}",
                self.rt_constrain,
                self.rt_constrain,
                "Skipping update decoding due to Route-Target-Constrain data expected",
            ),
            (
                "Ipv6-Unicast {}",
                self.ipv6,
                self.ipv6,
                "Skipping update decoding due to Ipv6 data expected",
            ),
            ("Allf {}", self.allf, self.allf, "Skipping update decoding"),
        )
        for label, value, skip_decoding, reason in skip_checks:
            logger.debug(label.format(value))
            if skip_decoding:
                logger.debug(reason)
                return

        if msg_type != 2:
            logger.error(
                "Unexpeced message type 0x%s in 0x%s",
                binascii.b2a_hex(msg_type_hex),
                binascii.b2a_hex(msg),
            )
            return

        logger.debug("Message type: 0x%s (update)", binascii.b2a_hex(msg_type_hex))
        # withdrawn routes length
        wdr_length_hex = msg[19:21]
        wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
        logger.debug(
            "Withdrawn routes lenght: 0x%s (%s)",
            binascii.b2a_hex(wdr_length_hex),
            wdr_length,
        )
        # withdrawn routes
        wdr_hex = msg[21 : 21 + wdr_length]
        logger.debug("Withdrawn routes: 0x%s", binascii.b2a_hex(wdr_hex))
        wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
        logger.debug("Withdrawn routes prefix list: %s", wdr_prefix_list)
        for prefix in wdr_prefix_list:
            logger.debug("withdrawn_prefix_received: %s", prefix)
        # total path attribute length
        total_pa_length_offset = 21 + wdr_length
        total_pa_length_hex = msg[total_pa_length_offset : total_pa_length_offset + 2]
        total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
        logger.debug(
            "Total path attribute lenght: 0x%s (%s)",
            binascii.b2a_hex(total_pa_length_hex),
            total_pa_length,
        )
        # path attributes
        pa_offset = total_pa_length_offset + 2
        pa_hex = msg[pa_offset : pa_offset + total_pa_length]
        logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
        self.decode_path_attributes(pa_hex)
        # network layer reachability information length; 23 octets are
        # taken by the header (19) and the two length fields (2 + 2).
        nlri_length = msg_length - 23 - total_pa_length - wdr_length
        logger.debug("Calculated NLRI length: %s", nlri_length)
        # network layer reachability information
        nlri_offset = pa_offset + total_pa_length
        nlri_hex = msg[nlri_offset : nlri_offset + nlri_length]
        logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
        nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
        logger.debug("NLRI prefix list: %s", nlri_prefix_list)
        for prefix in nlri_prefix_list:
            logger.debug("nlri_prefix_received: %s", prefix)
        # Updating counters
        self.updates_received += 1
        self.prefixes_introduced += len(nlri_prefix_list)
        self.prefixes_withdrawn += len(wdr_prefix_list)

    def _is_time_to_log_statistics(self):
        """Tell whether the statistics should be written to the log now.

        Not for every update and not too frequently, to avoid having
        large log files.
        """
        return not self.rx_activity_detected or not (self.updates_received % 100)

    def wait_for_read(self):
        """Read message until timeout (next expected event).

        Note:
            Used when no more updates has to be sent to avoid busy-wait.
            Currently it does not return anything.
        """
        # Compute time to the first predictable state change
        event_time = self.timer.get_next_event_time()
        # snapshot_time would be imprecise
        wait_timedelta = min(event_time - time.time(), self.wfr)
        if wait_timedelta < 0:
            # The program got around to waiting to an event in "very near
            # future" so late that it became a "past" event, thus tell
            # "select" to not wait at all. Passing negative timedelta to
            # select() would lead to either waiting forever (for -1) or
            # select.error("Invalid parameter") (for everything else).
            wait_timedelta = 0

        if self._is_time_to_log_statistics():
            logger.info(
                "total_received_update_message_counter: %s", self.updates_received
            )
            logger.info(
                "total_received_nlri_prefix_counter: %s", self.prefixes_introduced
            )
            logger.info(
                "total_received_withdrawn_prefix_counter: %s", self.prefixes_withdrawn
            )

        # And wait for event or something to read.
        start_time = time.time()
        select.select([self.socket], [], [self.socket], wait_timedelta)
        timedelta = time.time() - start_time
        self.rx_idle_time += timedelta
        # A wait shorter than one second counts as receive activity.
        self.rx_activity_detected = timedelta < 1

        # Evaluated again, as rx_activity_detected has just been updated.
        if self._is_time_to_log_statistics():
            logger.info("... idle for %.3fs", timedelta)
            logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
        return
2122
2123
class WriteTracker(object):
    """Keeps the outgoing buffer and pushes it to the socket piece by piece."""

    def __init__(self, bgp_socket, generator, timer):
        """Set up the writer with an empty outgoing buffer.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # Outgoing state; nothing is waiting to be sent at the start.
        self.msg_out = ""
        self.bytes_to_send = 0
        self.sending_message = False
        # Collaborators owned by the caller.
        self.timer = timer
        self.generator = generator
        self.socket = bgp_socket

    def enqueue_message_for_sending(self, message):
        """Append a message to the outgoing buffer and mark sending as pending.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.sending_message = True
        self.bytes_to_send += len(message)
        self.msg_out += message

    def send_message_chunk_is_whole(self):
        """Hand the outgoing buffer to the socket and drop what got sent.

        Returns:
            :return: true if no remaining data to send
        """
        # The caller guarantees there is data in msg_out and that
        # the socket is writable.
        self.timer.snapshot()
        sent_count = self.socket.send(self.msg_out)
        # Only the unsent tail of the buffer is kept.
        self.bytes_to_send -= sent_count
        self.msg_out = self.msg_out[sent_count:]
        if self.bytes_to_send:
            # TODO: Is it possible to hit negative bytes_to_send?
            return False
        # Everything went out. The peer should have reset its hold
        # timer, so my keepalive is scheduled anew from the snapshot
        # taken before sending.
        self.sending_message = False
        self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
        # The possible reason for not prioritizing reads is gone.
        return True
2175
2176
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class.

    The tracker owns one ReadTracker and one WriteTracker working on the
    same socket and decides, once per perform_one_loop_iteration() call,
    whether to read, to send, to generate a new update or to idle.
    """

    def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
        """The state tracker initialisation.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
            inqueue: user initiated messages queue
            storage: thread safe dict to store data for the rpc server
            cliargs: cli args from the user; the attributes evpn, mvpn,
                l3vpn_mcast, l3vpn, allf, rt_constrain, ipv6, grace and
                wfr are read here and handed over to the ReadTracker
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Sub-trackers.
        # The storage is only passed to the reader; it is not kept here.
        self.reader = ReadTracker(
            bgp_socket,
            timer,
            storage,
            evpn=cliargs.evpn,
            mvpn=cliargs.mvpn,
            l3vpn_mcast=cliargs.l3vpn_mcast,
            l3vpn=cliargs.l3vpn,
            allf=cliargs.allf,
            rt_constrain=cliargs.rt_constrain,
            ipv6=cliargs.ipv6,
            grace=cliargs.grace,
            wait_for_read=cliargs.wfr,
        )
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        self.prioritize_writing = False
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # back to False.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.
        # Queue of hex encoded messages requested by the user.
        self.inqueue = inqueue

    def perform_one_loop_iteration(self):
        """The main loop iteration.

        Notes:
            Calculates priority, resolves all conditions, calls
            appropriate method and returns to caller to repeat.
            Besides enqueueing (a keepalive and/or one user message),
            each call ends after a single read, send, update generation
            or idle wait, or after a warning when the socket is neither
            readable nor writable.

        Raises:
            RuntimeError: when select reports an exceptional state
                on the socket
        """
        self.timer.snapshot()
        if not self.prioritize_writing:
            if self.timer.is_time_for_my_keepalive():
                if not self.writer.sending_message:
                    # We need to schedule a keepalive ASAP.
                    # Note the message is only enqueued here, the real
                    # sending happens in the write branch below.
                    self.writer.enqueue_message_for_sending(
                        self.generator.keepalive_message()
                    )
                    logger.info("KEEP ALIVE is sent.")
                # We are sending a message now, so let's prioritize it.
                self.prioritize_writing = True

        # Pick up at most one user initiated message per iteration.
        # The message has to be a hex string, as it is converted to
        # binary by unhexlify; only the empty queue case is handled here.
        try:
            msg = self.inqueue.get_nowait()
            logger.info("Received message: {}".format(msg))
            msgbin = binascii.unhexlify(msg)
            self.writer.enqueue_message_for_sending(msgbin)
        except Queue.Empty:
            pass
        # Now we know what our priorities are, we have to check
        # which actions are available.
        # select.select() returns three lists,
        # we store them to list of lists.
        list_list = select.select(
            [self.socket], [self.socket], [self.socket], self.timer.report_timedelta
        )
        read_list, write_list, except_list = list_list
        # Lists are unpacked, each is either [] or [self.socket],
        # so we will test them as boolean.
        if except_list:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
                return
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        if write_list:
            # Either we really want to reset peer's view of our hold
            # timer, or there was nothing to read.
            # Were we in the middle of sending a message?
            if self.writer.sending_message:
                # Was it the end of a message?
                whole = self.writer.send_message_chunk_is_whole()
                # We were pressed to send something and we did it.
                if self.prioritize_writing and whole:
                    # We prioritize reading again.
                    self.prioritize_writing = False
                return
            # Finally to check if still update messages to be generated.
            if self.generator.remaining_prefixes:
                msg_out = self.generator.compose_update_message()
                if not self.generator.remaining_prefixes:
                    # We have just finished update generation,
                    # end-of-rib is due.
                    logger.info("All update messages generated.")
                    logger.info("Storing performance results.")
                    self.generator.store_results()
                    logger.info("Finally an END-OF-RIB is sent.")
                    # The end-of-rib marker travels in the same buffer,
                    # right behind the last generated update.
                    msg_out += self.generator.update_message(
                        wr_prefixes=[], nlri_prefixes=[], end_of_rib=True
                    )
                self.writer.enqueue_message_for_sending(msg_out)
                # Attempt for real sending to be done in next iteration.
                return
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # We can neither read nor write.
        logger.warning(
            "Input and output both blocked for "
            + str(self.timer.report_timedelta)
            + " seconds."
        )
        # FIXME: Are we sure select has been really waiting
        # the whole period?
        return
2318
2319
def create_logger(loglevel, logfile):
    """Build the logger named "logger", writing to console and to a file.

    Arguments:
        :loglevel: threshold level to be set on the logger
        :logfile: name of the log file (opened in "w" mode)
    Returns:
        :return: the configured logger object
    """
    formatter = logging.Formatter(
        "%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s"
    )
    # Console first, file second; both share the same record layout.
    handlers = (logging.StreamHandler(), logging.FileHandler(logfile, mode="w"))
    result = logging.getLogger("logger")
    for handler in handlers:
        handler.setFormatter(formatter)
    for handler in handlers:
        result.addHandler(handler)
    result.setLevel(loglevel)
    return result
2341
2342
def job(arguments, inqueue, storage):
    """One time initialisation and iterations looping.

    Notes:
        Establish BGP connection, perform the initial handshake
        (receive OPEN, send OPEN, receive KEEPALIVE, send KEEPALIVE)
        and then run main loop iterations forever.

    Arguments:
        :arguments: Command line arguments
        :inqueue: Queue of user initiated messages to be sent to the peer
        :storage: Shared dict for rpc server; it has to support
            the "with" statement (as SafeDict does), because
            it is accessed from the rpc server thread as well
    Returns:
        :return: None (the main loop never finishes normally)
    Raises:
        MessageError: when the peer does not confirm our OPEN
            message by a KEEPALIVE message
    """
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    open_hex = binascii.hexlify(msg_in)
    logger.info(open_hex)
    # The storage is shared with the rpc server thread,
    # so it must not be touched without holding its lock.
    with storage as stor:
        stor["open"] = open_hex
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: %s", binascii.hexlify(msg_out))
    # Send our open message to the peer.
    bgp_socket.send(msg_out)
    # Wait for confirming keepalive.
    # Using exact keepalive length to not to see possible updates.
    # A stream socket may deliver less than requested in one call,
    # so keep reading until all 19 bytes arrive or the peer closes.
    msg_in = b""
    while len(msg_in) < 19:
        chunk = bgp_socket.recv(19 - len(msg_in))
        if not chunk:
            # Connection closed; the comparison below reports the error.
            break
        msg_in += chunk
    if msg_in != generator.keepalive_message():
        error_msg = "Open not confirmed by keepalive, instead got"
        logger.error("%s: %s", error_msg, binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: %s", binascii.hexlify(msg_out))
    bgp_socket.send(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
2389
2390
class Rpcs:
    """Methods exposed to remote callers through SimpleXMLRPCServer"""

    def __init__(self, sendqueue, storage):
        """Remember the shared objects

        Arguments:
            :sendqueue: queue for data to be sent towards odl
            :storage: thread safe dict
        """
        self.storage = storage
        self.queue = sendqueue

    def send(self, text):
        """Schedule data for sending

        Arguments:
            :text: hex string of the data to be sent
        """
        enqueue = self.queue.put
        enqueue(text)

    def get(self, text=""):
        """Read data from the storage

        Arguments:
            :text: a key to the storage to get the data
        Returns:
            :data: stored data, or an empty string if the key is missing
        """
        with self.storage as stor:
            found = stor.get(text, "")
        return found

    def clean(self, text=""):
        """Remove data from the storage, missing key is not an error

        Arguments:
            :text: a key to the storage to clean the data
        """
        with self.storage as stor:
            stor.pop(text, None)
2435
2436
def _split_arguments(arguments):
    """Create one argument set per job thread, sharing the amount evenly.

    Arguments:
        :arguments: Command line arguments; amount, multiplicity,
            firstprefix and myip are read
    Returns:
        :return: list of argument copies, one per thread, each with
            its own amount, firstprefix and myip
    """
    amount_left = arguments.amount
    utils_left = arguments.multiplicity
    prefix_current = arguments.firstprefix
    myip_current = arguments.myip
    thread_args = []

    while True:
        # Round up, so earlier threads take the bigger shares.
        # Floor division keeps the result an integer regardless
        # of which division semantics is in effect.
        amount_per_util = (amount_left - 1) // utils_left + 1
        amount_left -= amount_per_util
        utils_left -= 1

        args = deepcopy(arguments)
        args.amount = amount_per_util
        args.firstprefix = prefix_current
        args.myip = myip_current
        thread_args.append(args)

        if not utils_left:
            return thread_args
        # NOTE(review): presumably consecutive prefixes are 16 addresses
        # apart, hence the multiplier - confirm against the generator.
        prefix_current += amount_per_util * 16
        myip_current += 1


def threaded_job(arguments):
    """Run the job threaded

    Notes:
        Starts one job thread per requested multiplicity, then serves
        the XML-RPC interface from the calling thread forever.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None
    Raises:
        SystemExit: with code 2 when a thread could not be started
    """
    port = arguments.port
    # Both objects are shared by all the job threads and the rpc server.
    rpcqueue = Queue.Queue()
    storage = SafeDict()
    thread_args = _split_arguments(arguments)

    try:
        # Create threads
        for t in thread_args:
            thread.start_new_thread(job, (t, rpcqueue, storage))
    except Exception:
        print("Error: unable to start thread.")
        raise SystemExit(2)

    if arguments.usepeerip:
        ip = arguments.peerip
    else:
        ip = arguments.myip
    rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
    rpcserver.register_instance(Rpcs(rpcqueue, storage))
    rpcserver.serve_forever()
2485
2486
if __name__ == "__main__":
    # Script entry point: parse the command line, set up logging
    # and start the job threads together with the rpc server.
    arguments = parse_arguments()
    # The assignment creates the module level name "logger",
    # which functions and classes of this file refer to as a global.
    logger = create_logger(arguments.loglevel, arguments.logfile)
    threaded_job(arguments)