Remove variables for genius
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 from copy import deepcopy
15 from xmlrpc.server import SimpleXMLRPCServer
16 import argparse
17 import binascii
18 import ipaddr
19 import logging
20 import queue
21 import select
22 import socket
23 import struct
24 import threading
25 import time
26
27
28 __author__ = "Vratko Polak"
29 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
30 __license__ = "Eclipse Public License v1.0"
31 __email__ = "vrpolak@cisco.com"
32
33
34 class SafeDict(dict):
35     """Thread safe dictionary
36
37     The object will serve as thread safe data storage.
38     It should be used with "with" statement.
39     """
40
41     def __init__(self, *p_arg, **n_arg):
42         super(SafeDict, self).__init__()
43         self._lock = threading.Lock()
44
45     def __enter__(self):
46         self._lock.acquire()
47         return self
48
49     def __exit__(self, type, value, traceback):
50         self._lock.release()
51
52
53 def parse_arguments():
54     """Use argparse to get arguments,
55
56     Returns:
57         :return: args object.
58     """
59     parser = argparse.ArgumentParser()
60     # TODO: Should we use --argument-names-with-spaces?
61     str_help = "Autonomous System number use in the stream."
62     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
63     # FIXME: We are acting as iBGP peer,
64     # we should mirror AS number from peer's open message.
65     str_help = "Amount of IP prefixes to generate. (negative means " "infinite" ")."
66     parser.add_argument("--amount", default="1", type=int, help=str_help)
67     str_help = "Rpc server port."
68     parser.add_argument("--port", default="8000", type=int, help=str_help)
69     str_help = "Maximum number of IP prefixes to be announced in one iteration"
70     parser.add_argument("--insert", default="1", type=int, help=str_help)
71     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
72     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
73     str_help = "The number of prefixes to process without withdrawals"
74     parser.add_argument("--prefill", default="0", type=int, help=str_help)
75     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
76     parser.add_argument(
77         "--updates", choices=["single", "separate"], default=["separate"], help=str_help
78     )
79     str_help = "Base prefix IP address for prefix generation"
80     parser.add_argument(
81         "--firstprefix", default="8.0.1.0", type=ipaddr.IPv4Address, help=str_help
82     )
83     str_help = "The prefix length."
84     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
85     str_help = "Listen for connection, instead of initiating it."
86     parser.add_argument("--listen", action="store_true", help=str_help)
87     str_help = (
88         "Numeric IP Address to bind to and derive BGP ID from."
89         + "Default value only suitable for listening."
90     )
91     parser.add_argument(
92         "--myip", default="0.0.0.0", type=ipaddr.IPv4Address, help=str_help
93     )
94     str_help = (
95         "TCP port to bind to when listening or initiating connection."
96         + "Default only suitable for initiating."
97     )
98     parser.add_argument("--myport", default="0", type=int, help=str_help)
99     str_help = "The IP of the next hop to be placed into the update messages."
100     parser.add_argument(
101         "--nexthop",
102         default="192.0.2.1",
103         type=ipaddr.IPv4Address,
104         dest="nexthop",
105         help=str_help,
106     )
107     str_help = "Identifier of the route originator."
108     parser.add_argument(
109         "--originator",
110         default=None,
111         type=ipaddr.IPv4Address,
112         dest="originator",
113         help=str_help,
114     )
115     str_help = "Cluster list item identifier."
116     parser.add_argument(
117         "--cluster",
118         default=None,
119         type=ipaddr.IPv4Address,
120         dest="cluster",
121         help=str_help,
122     )
123     str_help = (
124         "Numeric IP Address to try to connect to."
125         + "Currently no effect in listening mode."
126     )
127     parser.add_argument(
128         "--peerip", default="127.0.0.2", type=ipaddr.IPv4Address, help=str_help
129     )
130     str_help = "TCP port to try to connect to. No effect in listening mode."
131     parser.add_argument("--peerport", default="179", type=int, help=str_help)
132     str_help = "Local hold time."
133     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
134     str_help = "Log level (--error, --warning, --info, --debug)"
135     parser.add_argument(
136         "--error",
137         dest="loglevel",
138         action="store_const",
139         const=logging.ERROR,
140         default=logging.INFO,
141         help=str_help,
142     )
143     parser.add_argument(
144         "--warning",
145         dest="loglevel",
146         action="store_const",
147         const=logging.WARNING,
148         default=logging.INFO,
149         help=str_help,
150     )
151     parser.add_argument(
152         "--info",
153         dest="loglevel",
154         action="store_const",
155         const=logging.INFO,
156         default=logging.INFO,
157         help=str_help,
158     )
159     parser.add_argument(
160         "--debug",
161         dest="loglevel",
162         action="store_const",
163         const=logging.DEBUG,
164         default=logging.INFO,
165         help=str_help,
166     )
167     str_help = "Log file name"
168     parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
169     str_help = "Trailing part of the csv result files for plotting purposes"
170     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
171     str_help = "Minimum number of updates to reach to include result into csv."
172     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
173     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
174     parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
175     str_help = "Using peerip instead of myip for xmlrpc server"
176     parser.add_argument(
177         "--usepeerip", default=False, action="store_true", help=str_help
178     )
179     str_help = "Link-State NLRI supported"
180     parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
181     str_help = "Link-State NLRI: Identifier"
182     parser.add_argument("-lsid", default="1", type=int, help=str_help)
183     str_help = "Link-State NLRI: Tunnel ID"
184     parser.add_argument("-lstid", default="1", type=int, help=str_help)
185     str_help = "Link-State NLRI: LSP ID"
186     parser.add_argument("-lspid", default="1", type=int, help=str_help)
187     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
188     parser.add_argument(
189         "--lstsaddr", default="1.2.3.4", type=ipaddr.IPv4Address, help=str_help
190     )
191     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
192     parser.add_argument(
193         "--lsteaddr", default="5.6.7.8", type=ipaddr.IPv4Address, help=str_help
194     )
195     str_help = "Link-State NLRI: Identifier Step"
196     parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
197     str_help = "Link-State NLRI: Tunnel ID Step"
198     parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
199     str_help = "Link-State NLRI: LSP ID Step"
200     parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
201     str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
202     parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
203     str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
204     parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
205     str_help = "How many play utilities are to be started."
206     parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
207     str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
208     Enabling this flag makes the script not decoding the update mesage, because of not\
209     supported decoding for these elements."
210     parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
211     str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
212     Enabling this flag makes the script not decoding the update mesage, because of not\
213     supported decoding for these elements."
214     parser.add_argument("--grace", default="8", type=int, help=str_help)
215     str_help = "Open message includes Graceful-restart capability, containing AFI/SAFIS:\
216     IPV4-Unicast, IPV6-Unicast, BGP-LS\
217     Enabling this flag makes the script not decoding the update mesage, because of not\
218     supported decoding for these elements."
219     parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
220     str_help = "Open message includes L3VPN-MULTICAST arguments.\
221     Enabling this flag makes the script not decoding the update mesage, because of not\
222     supported decoding for these elements."
223     parser.add_argument(
224         "--l3vpn_mcast", default=False, action="store_true", help=str_help
225     )
226     str_help = (
227         "Open message includes L3VPN-UNICAST arguments, without message decoding."
228     )
229     parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
230     str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
231     parser.add_argument(
232         "--rt_constrain", default=False, action="store_true", help=str_help
233     )
234     str_help = "Open message includes ipv6-unicast family, without message decoding."
235     parser.add_argument("--ipv6", default=False, action="store_true", help=str_help)
236     str_help = "Add all supported families without message decoding."
237     parser.add_argument("--allf", default=False, action="store_true", help=str_help)
238     parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
239     str_help = "Skipping well known attributes for update message"
240     parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
241     arguments = parser.parse_args()
242     if arguments.multiplicity < 1:
243         print("Multiplicity", arguments.multiplicity, "is not positive.")
244         raise SystemExit(1)
245     # TODO: Are sanity checks (such as asnumber>=0) required?
246     return arguments
247
248
249 def establish_connection(arguments):
250     """Establish connection to BGP peer.
251
252     Arguments:
253         :arguments: following command-line arguments are used
254             - arguments.myip: local IP address
255             - arguments.myport: local port
256             - arguments.peerip: remote IP address
257             - arguments.peerport: remote port
258     Returns:
259         :return: socket.
260     """
261     if arguments.listen:
262         logger.info("Connecting in the listening mode.")
263         logger.debug("Local IP address: " + str(arguments.myip))
264         logger.debug("Local port: " + str(arguments.myport))
265         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
266         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
267         # bind need single tuple as argument
268         listening_socket.bind((str(arguments.myip), arguments.myport))
269         listening_socket.listen(1)
270         bgp_socket, _ = listening_socket.accept()
271         # TODO: Verify client IP is cotroller IP.
272         listening_socket.close()
273     else:
274         logger.info("Connecting in the talking mode.")
275         logger.debug("Local IP address: " + str(arguments.myip))
276         logger.debug("Local port: " + str(arguments.myport))
277         logger.debug("Remote IP address: " + str(arguments.peerip))
278         logger.debug("Remote port: " + str(arguments.peerport))
279         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
280         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
281         # bind to force specified address and port
282         talking_socket.bind((str(arguments.myip), arguments.myport))
283         # socket does not spead ipaddr, hence str()
284         talking_socket.connect((str(arguments.peerip), arguments.peerport))
285         bgp_socket = talking_socket
286     logger.info("Connected to ODL.")
287     return bgp_socket
288
289
290 def get_short_int_from_message(message, offset=16):
291     """Extract 2-bytes number from provided message.
292
293     Arguments:
294         :message: given message
295         :offset: offset of the short_int inside the message
296     Returns:
297         :return: required short_inf value.
298     Notes:
299         default offset value is the BGP message size offset.
300     """
301     high_byte_int = message[offset]
302     low_byte_int = message[offset + 1]
303     short_int = high_byte_int * 256 + low_byte_int
304     return short_int
305
306
307 def get_prefix_list_from_hex(prefixes_hex):
308     """Get decoded list of prefixes (rfc4271#section-4.3)
309
310     Arguments:
311         :prefixes_hex: list of prefixes to be decoded in hex
312     Returns:
313         :return: list of prefixes in the form of ip address (X.X.X.X/X)
314     """
315     prefix_list = []
316     offset = 0
317     while offset < len(prefixes_hex):
318         prefix_bit_len_hex = prefixes_hex[offset : offset + 1]
319         prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
320         prefix_len = int((prefix_bit_len - 1) / 8) + 1
321         prefix_hex = prefixes_hex[offset + 1 : offset + 1 + prefix_len]
322         prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
323         offset += 1 + prefix_len
324         prefix_list.append(prefix + "/" + str(prefix_bit_len))
325     return prefix_list
326
327
328 class MessageError(ValueError):
329     """Value error with logging optimized for hexlified messages."""
330
331     def __init__(self, text, message, *args):
332         """Initialisation.
333
334         Store and call super init for textual comment,
335         store raw message which caused it.
336         """
337         self.text = text
338         self.msg = message
339         super(MessageError, self).__init__(text, message, *args)
340
341     def __str__(self):
342         """Generate human readable error message.
343
344         Returns:
345             :return: human readable message as string
346         Notes:
347             Use a placeholder string if the message is to be empty.
348         """
349         message = binascii.hexlify(self.msg).decode()
350         if message == "":
351             message = "(empty message)"
352         return self.text + ": " + message
353
354
355 def read_open_message(bgp_socket):
356     """Receive peer's OPEN message
357
358     Arguments:
359         :bgp_socket: the socket to be read
360     Returns:
361         :return: received OPEN message.
362     Notes:
363         Performs just basic incomming message checks
364     """
365     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
366     # TODO: Can the incoming open message be split in more than one packet?
367     # Some validation.
368     if len(msg_in) < 37:
369         # 37 is minimal length of open message with 4-byte AS number.
370         error_msg = (
371             "Message length (" + str(len(msg_in)) + ") is smaller than "
372             "minimal length of OPEN message with 4-byte AS number (37)"
373         )
374         logger.error(error_msg + ": " + binascii.hexlify(msg_in).decode())
375         raise MessageError(error_msg, msg_in)
376     # TODO: We could check BGP marker, but it is defined only later;
377     # decide what to do.
378     reported_length = get_short_int_from_message(msg_in)
379     if len(msg_in) != reported_length:
380         error_msg = (
381             "Expected message length ("
382             + reported_length
383             + ") does not match actual length ("
384             + str(len(msg_in))
385             + ")"
386         )
387         logger.error(error_msg + binascii.hexlify(msg_in).decode())
388         raise MessageError(error_msg, msg_in)
389     logger.info("Open message received.")
390     return msg_in
391
392
393 class MessageGenerator(object):
394     """Class which generates messages, holds states and configuration values."""
395
396     # TODO: Define bgp marker as a class (constant) variable.
397     def __init__(self, args):
398         """Initialisation according to command-line args.
399
400         Arguments:
401             :args: argsparser's Namespace object which contains command-line
402                 options for MesageGenerator initialisation
403         Notes:
404             Calculates and stores default values used later on for
405             message generation.
406         """
407         self.total_prefix_amount = args.amount
408         # Number of update messages left to be sent.
409         self.remaining_prefixes = self.total_prefix_amount
410
411         # New parameters initialisation
412         self.port = args.port
413         self.iteration = 0
414         self.prefix_base_default = args.firstprefix
415         self.prefix_length_default = args.prefixlen
416         self.wr_prefixes_default = []
417         self.nlri_prefixes_default = []
418         self.version_default = 4
419         self.my_autonomous_system_default = args.asnumber
420         self.hold_time_default = args.holdtime  # Local hold time.
421         self.bgp_identifier_default = int(args.myip)
422         self.next_hop_default = args.nexthop
423         self.originator_id_default = args.originator
424         self.cluster_list_item_default = args.cluster
425         self.single_update_default = args.updates == "single"
426         self.randomize_updates_default = args.updates == "random"
427         self.prefix_count_to_add_default = args.insert
428         self.prefix_count_to_del_default = args.withdraw
429         if self.prefix_count_to_del_default < 0:
430             self.prefix_count_to_del_default = 0
431         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
432             # total number of prefixes must grow to avoid infinite test loop
433             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
434         self.slot_size_default = self.prefix_count_to_add_default
435         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
436         self.results_file_name_default = args.results
437         self.performance_threshold_default = args.threshold
438         self.rfc4760 = args.rfc4760
439         self.bgpls = args.bgpls
440         self.evpn = args.evpn
441         self.mvpn = args.mvpn
442         self.l3vpn_mcast = args.l3vpn_mcast
443         self.l3vpn = args.l3vpn
444         self.rt_constrain = args.rt_constrain
445         self.ipv6 = args.ipv6
446         self.allf = args.allf
447         self.skipattr = args.skipattr
448         self.grace = args.grace
449         # Default values when BGP-LS Attributes are used
450         if self.bgpls:
451             self.prefix_count_to_add_default = 1
452             self.prefix_count_to_del_default = 0
453         self.ls_nlri_default = {
454             "Identifier": args.lsid,
455             "TunnelID": args.lstid,
456             "LSPID": args.lspid,
457             "IPv4TunnelSenderAddress": args.lstsaddr,
458             "IPv4TunnelEndPointAddress": args.lsteaddr,
459         }
460         self.lsid_step = args.lsidstep
461         self.lstid_step = args.lstidstep
462         self.lspid_step = args.lspidstep
463         self.lstsaddr_step = args.lstsaddrstep
464         self.lsteaddr_step = args.lsteaddrstep
465         # Default values used for randomized part
466         s1_slots = (
467             int(
468                 (self.total_prefix_amount - self.remaining_prefixes_threshold - 1)
469                 / self.prefix_count_to_add_default
470             )
471             + 1
472         )
473         s2_slots = (
474             int(
475                 (self.remaining_prefixes_threshold - 1)
476                 / (self.prefix_count_to_add_default - self.prefix_count_to_del_default)
477             )
478             + 1
479         )
480         # S1_First_Index = 0
481         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
482         s2_first_index = s1_slots * self.prefix_count_to_add_default
483         s2_last_index = (
484             s2_first_index
485             + s2_slots
486             * (self.prefix_count_to_add_default - self.prefix_count_to_del_default)
487             - 1
488         )
489         self.slot_gap_default = (
490             int(
491                 (self.total_prefix_amount - self.remaining_prefixes_threshold - 1)
492                 / self.prefix_count_to_add_default
493             )
494             + 1
495         )
496         self.randomize_lowest_default = s2_first_index
497         self.randomize_highest_default = s2_last_index
498         # Initialising counters
499         self.phase1_start_time = 0
500         self.phase1_stop_time = 0
501         self.phase2_start_time = 0
502         self.phase2_stop_time = 0
503         self.phase1_updates_sent = 0
504         self.phase2_updates_sent = 0
505         self.updates_sent = 0
506
507         self.log_info = args.loglevel <= logging.INFO
508         self.log_debug = args.loglevel <= logging.DEBUG
509         """
510         Flags needed for the MessageGenerator performance optimization.
511         Calling logger methods each iteration even with proper log level set
512         slows down significantly the MessageGenerator performance.
513         Measured total generation time (1M updates, dry run, error log level):
514         - logging based on basic logger features: 36,2s
515         - logging based on advanced logger features (lazy logging): 21,2s
516         - conditional calling of logger methods enclosed inside condition: 8,6s
517         """
518
519         logger.info("Generator initialisation")
520         logger.info(
521             "  Target total number of prefixes to be introduced: "
522             + str(self.total_prefix_amount)
523         )
524         logger.info(
525             "  Prefix base: "
526             + str(self.prefix_base_default)
527             + "/"
528             + str(self.prefix_length_default)
529         )
530         logger.info(
531             "  My Autonomous System number: " + str(self.my_autonomous_system_default)
532         )
533         logger.info("  My Hold Time: " + str(self.hold_time_default))
534         logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
535         logger.info("  Next Hop: " + str(self.next_hop_default))
536         logger.info("  Originator ID: " + str(self.originator_id_default))
537         logger.info("  Cluster list: " + str(self.cluster_list_item_default))
538         logger.info(
539             "  Prefix count to be inserted at once: "
540             + str(self.prefix_count_to_add_default)
541         )
542         logger.info(
543             "  Prefix count to be withdrawn at once: "
544             + str(self.prefix_count_to_del_default)
545         )
546         logger.info(
547             "  Fast pre-fill up to "
548             + str(self.total_prefix_amount - self.remaining_prefixes_threshold)
549             + " prefixes"
550         )
551         logger.info(
552             "  Remaining number of prefixes to be processed "
553             + "in parallel with withdrawals: "
554             + str(self.remaining_prefixes_threshold)
555         )
556         logger.debug(
557             "  Prefix index range used after pre-fill procedure ["
558             + str(self.randomize_lowest_default)
559             + ", "
560             + str(self.randomize_highest_default)
561             + "]"
562         )
563         if self.single_update_default:
564             logger.info(
565                 "  Common single UPDATE will be generated "
566                 + "for both NLRI & WITHDRAWN lists"
567             )
568         else:
569             logger.info(
570                 "  Two separate UPDATEs will be generated "
571                 + "for each NLRI & WITHDRAWN lists"
572             )
573         if self.randomize_updates_default:
574             logger.info("  Generation of UPDATE messages will be randomized")
575         logger.info("  Let's go ...\n")
576
577         # TODO: Notification for hold timer expiration can be handy.
578
579     def store_results(self, file_name=None, threshold=None):
580         """Stores specified results into files based on file_name value.
581
582         Arguments:
583             :param file_name: Trailing (common) part of result file names
584             :param threshold: Minimum number of sent updates needed for each
585                               result to be included into result csv file
586                               (mainly needed because of the result accuracy)
587         Returns:
588             :return: n/a
589         """
590         # default values handling
591         # TODO optimize default values handling (use e.g. dicionary.update() approach)
592         if file_name is None:
593             file_name = self.results_file_name_default
594         if threshold is None:
595             threshold = self.performance_threshold_default
596         # performance calculation
597         if self.phase1_updates_sent >= threshold:
598             totals1 = self.phase1_updates_sent
599             performance1 = int(
600                 self.phase1_updates_sent
601                 / (self.phase1_stop_time - self.phase1_start_time)
602             )
603         else:
604             totals1 = None
605             performance1 = None
606         if self.phase2_updates_sent >= threshold:
607             totals2 = self.phase2_updates_sent
608             performance2 = int(
609                 self.phase2_updates_sent
610                 / (self.phase2_stop_time - self.phase2_start_time)
611             )
612         else:
613             totals2 = None
614             performance2 = None
615
616         logger.info("#" * 10 + " Final results " + "#" * 10)
617         logger.info("Number of iterations: " + str(self.iteration))
618         logger.info(
619             "Number of UPDATE messages sent in the pre-fill phase: "
620             + str(self.phase1_updates_sent)
621         )
622         logger.info(
623             "The pre-fill phase duration: "
624             + str(self.phase1_stop_time - self.phase1_start_time)
625             + "s"
626         )
627         logger.info(
628             "Number of UPDATE messages sent in the 2nd test phase: "
629             + str(self.phase2_updates_sent)
630         )
631         logger.info(
632             "The 2nd test phase duration: "
633             + str(self.phase2_stop_time - self.phase2_start_time)
634             + "s"
635         )
636         logger.info("Threshold for performance reporting: " + str(threshold))
637
638         # making labels
639         phase1_label = (
640             "pre-fill " + str(self.prefix_count_to_add_default) + " route(s) per UPDATE"
641         )
642         if self.single_update_default:
643             phase2_label = "+" + (
644                 str(self.prefix_count_to_add_default)
645                 + "/-"
646                 + str(self.prefix_count_to_del_default)
647                 + " routes per UPDATE"
648             )
649         else:
650             phase2_label = "+" + (
651                 str(self.prefix_count_to_add_default)
652                 + "/-"
653                 + str(self.prefix_count_to_del_default)
654                 + " routes in two UPDATEs"
655             )
656         # collecting capacity and performance results
657         totals = {}
658         performance = {}
659         if totals1 is not None:
660             totals[phase1_label] = totals1
661             performance[phase1_label] = performance1
662         if totals2 is not None:
663             totals[phase2_label] = totals2
664             performance[phase2_label] = performance2
665         self.write_results_to_file(totals, "totals-" + file_name)
666         self.write_results_to_file(performance, "performance-" + file_name)
667
668     def write_results_to_file(self, results, file_name):
669         """Writes results to the csv plot file consumable by Jenkins.
670
671         Arguments:
672             :param file_name: Name of the (csv) file to be created
673         Returns:
674             :return: none
675         """
676         first_line = ""
677         second_line = ""
678         f = open(file_name, "wt")
679         try:
680             for key in sorted(results):
681                 first_line += key + ", "
682                 second_line += str(results[key]) + ", "
683             first_line = first_line[:-2]
684             second_line = second_line[:-2]
685             f.write(first_line + "\n")
686             f.write(second_line + "\n")
687             logger.info(
688                 "Message generator performance results stored in " + file_name + ":"
689             )
690             logger.info("  " + first_line)
691             logger.info("  " + second_line)
692         finally:
693             f.close()
694
695     # Return pseudo-randomized (reproducible) index for selected range
696     def randomize_index(self, index, lowest=None, highest=None):
697         """Calculates pseudo-randomized index from selected range.
698
699         Arguments:
700             :param index: input index
701             :param lowest: the lowes index from the randomized area
702             :param highest: the highest index from the randomized area
703         Returns:
704             :return: the (pseudo)randomized index
705         Notes:
706             Created just as a fame for future generator enhancement.
707         """
708         # default values handling
709         # TODO optimize default values handling (use e.g. dicionary.update() approach)
710         if lowest is None:
711             lowest = self.randomize_lowest_default
712         if highest is None:
713             highest = self.randomize_highest_default
714         # randomize
715         if (index >= lowest) and (index <= highest):
716             # we are in the randomized range -> shuffle it inside
717             # the range (now just reverse the order)
718             new_index = highest - (index - lowest)
719         else:
720             # we are out of the randomized range -> nothing to do
721             new_index = index
722         return new_index
723
724     def get_ls_nlri_values(self, index):
725         """Generates LS-NLRI parameters.
726         http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
727
728         Arguments:
729             :param index: index (iteration)
730         Returns:
731             :return: dictionary of LS NLRI parameters and values
732         """
733         # generating list of LS NLRI parameters
734         identifier = self.ls_nlri_default["Identifier"] + int(index / self.lsid_step)
735         ipv4_tunnel_sender_address = self.ls_nlri_default[
736             "IPv4TunnelSenderAddress"
737         ] + int(index / self.lstsaddr_step)
738         tunnel_id = self.ls_nlri_default["TunnelID"] + int(index / self.lstid_step)
739         lsp_id = self.ls_nlri_default["LSPID"] + int(index / self.lspid_step)
740         ipv4_tunnel_endpoint_address = self.ls_nlri_default[
741             "IPv4TunnelEndPointAddress"
742         ] + int(index / self.lsteaddr_step)
743         ls_nlri_values = {
744             "Identifier": identifier,
745             "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
746             "TunnelID": tunnel_id,
747             "LSPID": lsp_id,
748             "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address,
749         }
750         return ls_nlri_values
751
752     def get_prefix_list(
753         self,
754         slot_index,
755         slot_size=None,
756         prefix_base=None,
757         prefix_len=None,
758         prefix_count=None,
759         randomize=None,
760     ):
761         """Generates list of IP address prefixes.
762
763         Arguments:
764             :param slot_index: index of group of prefix addresses
765             :param slot_size: size of group of prefix addresses
766                 in [number of included prefixes]
767             :param prefix_base: IP address of the first prefix
768                 (slot_index = 0, prefix_index = 0)
769             :param prefix_len: length of the prefix in bites
770                 (the same as size of netmask)
771             :param prefix_count: number of prefixes to be returned
772                 from the specified slot
773         Returns:
774             :return: list of generated IP address prefixes
775         """
776         # default values handling
777         # TODO optimize default values handling (use e.g. dicionary.update() approach)
778         if slot_size is None:
779             slot_size = self.slot_size_default
780         if prefix_base is None:
781             prefix_base = self.prefix_base_default
782         if prefix_len is None:
783             prefix_len = self.prefix_length_default
784         if prefix_count is None:
785             prefix_count = slot_size
786         if randomize is None:
787             randomize = self.randomize_updates_default
788         # generating list of prefixes
789         indexes = []
790         prefixes = []
791         prefix_gap = 2 ** (32 - prefix_len)
792         for i in range(prefix_count):
793             prefix_index = slot_index * slot_size + i
794             if randomize:
795                 prefix_index = self.randomize_index(prefix_index)
796             indexes.append(prefix_index)
797             prefixes.append(prefix_base + prefix_index * prefix_gap)
798         if self.log_debug:
799             logger.debug("  Prefix slot index: " + str(slot_index))
800             logger.debug("  Prefix slot size: " + str(slot_size))
801             logger.debug("  Prefix count: " + str(prefix_count))
802             logger.debug("  Prefix indexes: " + str(indexes))
803             logger.debug("  Prefix list: " + str(prefixes))
804         return prefixes
805
806     def compose_update_message(
807         self, prefix_count_to_add=None, prefix_count_to_del=None
808     ):
809         """Composes an UPDATE message
810
811         Arguments:
812             :param prefix_count_to_add: # of prefixes to put into NLRI list
813             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
814         Returns:
815             :return: encoded UPDATE message in HEX
816         Notes:
817             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
818             lists or common message wich includes both prefix lists.
819             Updates global counters.
820         """
821         # default values handling
822         # TODO optimize default values handling (use e.g. dicionary.update() approach)
823         if prefix_count_to_add is None:
824             prefix_count_to_add = self.prefix_count_to_add_default
825         if prefix_count_to_del is None:
826             prefix_count_to_del = self.prefix_count_to_del_default
827         # logging
828         if self.log_info and not (self.iteration % 1000):
829             logger.info(
830                 "Iteration: "
831                 + str(self.iteration)
832                 + " - total remaining prefixes: "
833                 + str(self.remaining_prefixes)
834             )
835         if self.log_debug:
836             logger.debug(
837                 "#" * 10 + " Iteration: " + str(self.iteration) + " " + "#" * 10
838             )
839             logger.debug("Remaining prefixes: " + str(self.remaining_prefixes))
840         # scenario type & one-shot counter
841         straightforward_scenario = (
842             self.remaining_prefixes > self.remaining_prefixes_threshold
843         )
844         if straightforward_scenario:
845             prefix_count_to_del = 0
846             if self.log_debug:
847                 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
848             if not self.phase1_start_time:
849                 self.phase1_start_time = time.time()
850         else:
851             if self.log_debug:
852                 logger.debug("--- COMBINED SCENARIO ---")
853             if not self.phase2_start_time:
854                 self.phase2_start_time = time.time()
855         # tailor the number of prefixes if needed
856         prefix_count_to_add = prefix_count_to_del + min(
857             prefix_count_to_add - prefix_count_to_del, self.remaining_prefixes
858         )
859         # prefix slots selection for insertion and withdrawal
860         slot_index_to_add = self.iteration
861         slot_index_to_del = slot_index_to_add - self.slot_gap_default
862         # getting lists of prefixes for insertion in this iteration
863         if self.log_debug:
864             logger.debug("Prefixes to be inserted in this iteration:")
865         prefix_list_to_add = self.get_prefix_list(
866             slot_index_to_add, prefix_count=prefix_count_to_add
867         )
868         # getting lists of prefixes for withdrawal in this iteration
869         if self.log_debug:
870             logger.debug("Prefixes to be withdrawn in this iteration:")
871         prefix_list_to_del = self.get_prefix_list(
872             slot_index_to_del, prefix_count=prefix_count_to_del
873         )
874         # generating the UPDATE mesage with LS-NLRI only
875         if self.bgpls:
876             ls_nlri = self.get_ls_nlri_values(self.iteration)
877             msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[], **ls_nlri)
878         else:
879             # generating the UPDATE message with prefix lists
880             if self.single_update_default:
881                 # Send prefixes to be introduced and withdrawn
882                 # in one UPDATE message
883                 msg_out = self.update_message(
884                     wr_prefixes=prefix_list_to_del, nlri_prefixes=prefix_list_to_add
885                 )
886             else:
887                 # Send prefixes to be introduced and withdrawn
888                 # in separate UPDATE messages (if needed)
889                 msg_out = self.update_message(
890                     wr_prefixes=[], nlri_prefixes=prefix_list_to_add
891                 )
892                 if prefix_count_to_del:
893                     msg_out += self.update_message(
894                         wr_prefixes=prefix_list_to_del, nlri_prefixes=[]
895                     )
896         # updating counters - who knows ... maybe I am last time here ;)
897         if straightforward_scenario:
898             self.phase1_stop_time = time.time()
899             self.phase1_updates_sent = self.updates_sent
900         else:
901             self.phase2_stop_time = time.time()
902             self.phase2_updates_sent = self.updates_sent - self.phase1_updates_sent
903         # updating totals for the next iteration
904         self.iteration += 1
905         self.remaining_prefixes -= prefix_count_to_add - prefix_count_to_del
906         # returning the encoded message
907         return msg_out
908
909     # Section of message encoders
910
911     def open_message(
912         self,
913         version=None,
914         my_autonomous_system=None,
915         hold_time=None,
916         bgp_identifier=None,
917     ):
918         """Generates an OPEN Message (rfc4271#section-4.2)
919
920         Arguments:
921             :param version: see the rfc4271#section-4.2
922             :param my_autonomous_system: see the rfc4271#section-4.2
923             :param hold_time: see the rfc4271#section-4.2
924             :param bgp_identifier: see the rfc4271#section-4.2
925         Returns:
926             :return: encoded OPEN message in HEX
927         """
928
929         # default values handling
930         # TODO optimize default values handling (use e.g. dicionary.update() approach)
931         if version is None:
932             version = self.version_default
933         if my_autonomous_system is None:
934             my_autonomous_system = self.my_autonomous_system_default
935         if hold_time is None:
936             hold_time = self.hold_time_default
937         if bgp_identifier is None:
938             bgp_identifier = self.bgp_identifier_default
939
940         # Marker
941         marker_hex = b"\xFF" * 16
942
943         # Type
944         type = 1
945         type_hex = struct.pack("B", type)
946
947         # version
948         version_hex = struct.pack("B", version)
949
950         # my_autonomous_system
951         # AS_TRANS value, 23456 decadic.
952         my_autonomous_system_2_bytes = 23456
953         # AS number is mappable to 2 bytes
954         if my_autonomous_system < 65536:
955             my_autonomous_system_2_bytes = my_autonomous_system
956         my_autonomous_system_hex_2_bytes = struct.pack(">H", my_autonomous_system)
957
958         # Hold Time
959         hold_time_hex = struct.pack(">H", hold_time)
960
961         # BGP Identifier
962         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
963
964         # Optional Parameters
965         optional_parameters_hex = b""
966         if self.rfc4760 or self.allf:
967             optional_parameter_hex = (
968                 b"\x02"  # Param type ("Capability Ad")
969                 b"\x06"  # Length (6 bytes)
970                 b"\x01"  # Capability type (NLRI Unicast),
971                 # see RFC 4760, secton 8
972                 b"\x04"  # Capability value length
973                 b"\x00\x01"  # AFI (Ipv4)
974                 b"\x00"  # (reserved)
975                 b"\x01"  # SAFI (Unicast)
976             )
977             optional_parameters_hex += optional_parameter_hex
978
979         if self.ipv6 or self.allf:
980             optional_parameter_hex = (
981                 b"\x02"  # Param type ("Capability Ad")
982                 b"\x06"  # Length (6 bytes)
983                 b"\x01"  # Multiprotocol extetension capability,
984                 b"\x04"  # Capability value length
985                 b"\x00\x02"  # AFI (IPV6)
986                 b"\x00"  # (reserved)
987                 b"\x01"  # SAFI (UNICAST)
988             )
989             optional_parameters_hex += optional_parameter_hex
990
991         if self.bgpls or self.allf:
992             optional_parameter_hex = (
993                 b"\x02"  # Param type ("Capability Ad")
994                 b"\x06"  # Length (6 bytes)
995                 b"\x01"  # Capability type (NLRI Unicast),
996                 # see RFC 4760, secton 8
997                 b"\x04"  # Capability value length
998                 b"\x40\x04"  # AFI (BGP-LS)
999                 b"\x00"  # (reserved)
1000                 b"\x47"  # SAFI (BGP-LS)
1001             )
1002             optional_parameters_hex += optional_parameter_hex
1003
1004         if self.evpn or self.allf:
1005             optional_parameter_hex = (
1006                 b"\x02"  # Param type ("Capability Ad")
1007                 b"\x06"  # Length (6 bytes)
1008                 b"\x01"  # Multiprotocol extetension capability,
1009                 b"\x04"  # Capability value length
1010                 b"\x00\x19"  # AFI (L2-VPN)
1011                 b"\x00"  # (reserved)
1012                 b"\x46"  # SAFI (EVPN)
1013             )
1014             optional_parameters_hex += optional_parameter_hex
1015
1016         if self.mvpn or self.allf:
1017             optional_parameter_hex = (
1018                 b"\x02"  # Param type ("Capability Ad")
1019                 b"\x06"  # Length (6 bytes)
1020                 b"\x01"  # Multiprotocol extetension capability,
1021                 b"\x04"  # Capability value length
1022                 b"\x00\x01"  # AFI (IPV4)
1023                 b"\x00"  # (reserved)
1024                 b"\x05"  # SAFI (MCAST-VPN)
1025             )
1026             optional_parameters_hex += optional_parameter_hex
1027             optional_parameter_hex = (
1028                 b"\x02"  # Param type ("Capability Ad")
1029                 b"\x06"  # Length (6 bytes)
1030                 b"\x01"  # Multiprotocol extetension capability,
1031                 b"\x04"  # Capability value length
1032                 b"\x00\x02"  # AFI (IPV6)
1033                 b"\x00"  # (reserved)
1034                 b"\x05"  # SAFI (MCAST-VPN)
1035             )
1036             optional_parameters_hex += optional_parameter_hex
1037
1038         if self.l3vpn_mcast or self.allf:
1039             optional_parameter_hex = (
1040                 b"\x02"  # Param type ("Capability Ad")
1041                 b"\x06"  # Length (6 bytes)
1042                 b"\x01"  # Multiprotocol extetension capability,
1043                 b"\x04"  # Capability value length
1044                 b"\x00\x01"  # AFI (IPV4)
1045                 b"\x00"  # (reserved)
1046                 b"\x81"  # SAFI (L3VPN-MCAST)
1047             )
1048             optional_parameters_hex += optional_parameter_hex
1049             optional_parameter_hex = (
1050                 b"\x02"  # Param type ("Capability Ad")
1051                 b"\x06"  # Length (6 bytes)
1052                 b"\x01"  # Multiprotocol extetension capability,
1053                 b"\x04"  # Capability value length
1054                 b"\x00\x02"  # AFI (IPV6)
1055                 b"\x00"  # (reserved)
1056                 b"\x81"  # SAFI (L3VPN-MCAST)
1057             )
1058             optional_parameters_hex += optional_parameter_hex
1059
1060         if self.l3vpn or self.allf:
1061             optional_parameter_hex = (
1062                 b"\x02"  # Param type ("Capability Ad")
1063                 b"\x06"  # Length (6 bytes)
1064                 b"\x01"  # Multiprotocol extetension capability,
1065                 b"\x04"  # Capability value length
1066                 b"\x00\x01"  # AFI (IPV4)
1067                 b"\x00"  # (reserved)
1068                 b"\x80"  # SAFI (L3VPN-UNICAST)
1069             )
1070             optional_parameters_hex += optional_parameter_hex
1071             optional_parameter_hex = (
1072                 b"\x02"  # Param type ("Capability Ad")
1073                 b"\x06"  # Length (6 bytes)
1074                 b"\x01"  # Multiprotocol extetension capability,
1075                 b"\x04"  # Capability value length
1076                 b"\x00\x02"  # AFI (IPV6)
1077                 b"\x00"  # (reserved)
1078                 b"\x80"  # SAFI (L3VPN-UNICAST)
1079             )
1080             optional_parameters_hex += optional_parameter_hex
1081
1082         if self.rt_constrain or self.allf:
1083             optional_parameter_hex = (
1084                 b"\x02"  # Param type ("Capability Ad")
1085                 b"\x06"  # Length (6 bytes)
1086                 b"\x01"  # Multiprotocol extetension capability,
1087                 b"\x04"  # Capability value length
1088                 b"\x00\x01"  # AFI (IPV4)
1089                 b"\x00"  # (reserved)
1090                 b"\x84"  # SAFI (ROUTE-TARGET-CONSTRAIN)
1091             )
1092             optional_parameters_hex += optional_parameter_hex
1093
1094         optional_parameter_hex = (
1095             b"\x02"  # Param type ("Capability Ad")
1096             b"\x06"  # Length (6 bytes)
1097             b"\x41"  # "32 bit AS Numbers Support"
1098             # (see RFC 6793, section 3)
1099             b"\x04"  # Capability value length
1100         )
1101         optional_parameter_hex += struct.pack(
1102             ">I", my_autonomous_system
1103         )  # My AS in 32 bit format
1104         optional_parameters_hex += optional_parameter_hex
1105
1106         if self.grace != 8:
1107             b = list(bin(self.grace)[2:])
1108             b = b + [0] * (3 - len(b))
1109             length = b"\x08"
1110             if b[1] == "1":
1111                 restart_flag = b"\x80\x05"
1112             else:
1113                 restart_flag = b"\x00\x05"
1114             if b[2] == "1":
1115                 ipv4_flag = b"\x80"
1116             else:
1117                 ipv4_flag = b"\x00"
1118             if b[0] == "1":
1119                 ll_gr = b"\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
1120                 length = b"\x11"
1121             else:
1122                 ll_gr = b""
1123             logger.debug("Grace parameters list: {}".format(b))
1124             # "\x02" Param type ("Capability Ad")
1125             # :param length: Length of whole message
1126             # "\x40" Graceful-restart capability
1127             # "\x06" Length (6 bytes)
1128             # "\x00" Restart Flag (customizable - turned on when grace == 2,3,6,7)
1129             # "\x05" Restart timer (5sec)
1130             # "\x00\x01" AFI (IPV4)
1131             # "\x01"  SAFI (Unicast)
1132             # "\x00"  Ipv4 Flag (customizable - turned on when grace == 1,3,5,7)
1133             # "\x47\x07\x00\x01\x01\x00\x00\x00\x1e" ipv4 ll-graceful-restart capability, timer 30sec
1134             # ll-gr turned on when grace is between 4-7
1135             optional_parameter_hex = (
1136                 f"\x02{length}\x40\x06{restart_flag}\x00\x01\x01{ipv4_flag}{ll_gr}"
1137             )
1138             optional_parameters_hex += optional_parameter_hex
1139
1140         # Optional Parameters Length
1141         optional_parameters_length = len(optional_parameters_hex)
1142         optional_parameters_length_hex = struct.pack("B", optional_parameters_length)
1143
1144         # Length (big-endian)
1145         length = (
1146             len(marker_hex)
1147             + 2
1148             + len(type_hex)
1149             + len(version_hex)
1150             + len(my_autonomous_system_hex_2_bytes)
1151             + len(hold_time_hex)
1152             + len(bgp_identifier_hex)
1153             + len(optional_parameters_length_hex)
1154             + len(optional_parameters_hex)
1155         )
1156         length_hex = struct.pack(">H", length)
1157
1158         # OPEN Message
1159         message_hex = (
1160             marker_hex
1161             + length_hex
1162             + type_hex
1163             + version_hex
1164             + my_autonomous_system_hex_2_bytes
1165             + hold_time_hex
1166             + bgp_identifier_hex
1167             + optional_parameters_length_hex
1168             + optional_parameters_hex
1169         )
1170
1171         if self.log_debug:
1172             logger.debug("OPEN message encoding")
1173             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex).decode())
1174             logger.debug(
1175                 "  Length="
1176                 + str(length)
1177                 + " (0x"
1178                 + binascii.hexlify(length_hex).decode()
1179                 + ")"
1180             )
1181             logger.debug(
1182                 "  Type="
1183                 + str(type)
1184                 + " (0x"
1185                 + binascii.hexlify(type_hex).decode()
1186                 + ")"
1187             )
1188             logger.debug(
1189                 "  Version="
1190                 + str(version)
1191                 + " (0x"
1192                 + binascii.hexlify(version_hex).decode()
1193                 + ")"
1194             )
1195             logger.debug(
1196                 "  My Autonomous System="
1197                 + str(my_autonomous_system_2_bytes)
1198                 + " (0x"
1199                 + binascii.hexlify(my_autonomous_system_hex_2_bytes).decode()
1200                 + ")"
1201             )
1202             logger.debug(
1203                 "  Hold Time="
1204                 + str(hold_time)
1205                 + " (0x"
1206                 + binascii.hexlify(hold_time_hex).decode()
1207                 + ")"
1208             )
1209             logger.debug(
1210                 "  BGP Identifier="
1211                 + str(bgp_identifier)
1212                 + " (0x"
1213                 + binascii.hexlify(bgp_identifier_hex).decode()
1214                 + ")"
1215             )
1216             logger.debug(
1217                 "  Optional Parameters Length="
1218                 + str(optional_parameters_length)
1219                 + " (0x"
1220                 + binascii.hexlify(optional_parameters_length_hex).decode()
1221                 + ")"
1222             )
1223             logger.debug(
1224                 "  Optional Parameters=0x"
1225                 + binascii.hexlify(optional_parameters_hex).decode()
1226             )
1227             logger.debug("OPEN message encoded: 0x%s", binascii.b2a_hex(message_hex))
1228
1229         return message_hex
1230
1231     def update_message(
1232         self,
1233         wr_prefixes=None,
1234         nlri_prefixes=None,
1235         wr_prefix_length=None,
1236         nlri_prefix_length=None,
1237         my_autonomous_system=None,
1238         next_hop=None,
1239         originator_id=None,
1240         cluster_list_item=None,
1241         end_of_rib=False,
1242         **ls_nlri_params,
1243     ):
1244         """Generates an UPDATE Message (rfc4271#section-4.3)
1245
1246         Arguments:
1247             :param wr_prefixes: see the rfc4271#section-4.3
1248             :param nlri_prefixes: see the rfc4271#section-4.3
1249             :param wr_prefix_length: see the rfc4271#section-4.3
1250             :param nlri_prefix_length: see the rfc4271#section-4.3
1251             :param my_autonomous_system: see the rfc4271#section-4.3
1252             :param next_hop: see the rfc4271#section-4.3
1253         Returns:
1254             :return: encoded UPDATE message in HEX
1255         """
1256
1257         # default values handling
1258         # TODO optimize default values handling (use e.g. dicionary.update() approach)
1259         if wr_prefixes is None:
1260             wr_prefixes = self.wr_prefixes_default
1261         if nlri_prefixes is None:
1262             nlri_prefixes = self.nlri_prefixes_default
1263         if wr_prefix_length is None:
1264             wr_prefix_length = self.prefix_length_default
1265         if nlri_prefix_length is None:
1266             nlri_prefix_length = self.prefix_length_default
1267         if my_autonomous_system is None:
1268             my_autonomous_system = self.my_autonomous_system_default
1269         if next_hop is None:
1270             next_hop = self.next_hop_default
1271         if originator_id is None:
1272             originator_id = self.originator_id_default
1273         if cluster_list_item is None:
1274             cluster_list_item = self.cluster_list_item_default
1275         ls_nlri = self.ls_nlri_default.copy()
1276         ls_nlri.update(ls_nlri_params)
1277
1278         # Marker
1279         marker_hex = b"\xFF" * 16
1280
1281         # Type
1282         type = 2
1283         type_hex = struct.pack("B", type)
1284
1285         # Withdrawn Routes
1286         withdrawn_routes_hex = b""
1287         if not self.bgpls:
1288             bytes = int((wr_prefix_length - 1) / 8) + 1
1289             for prefix in wr_prefixes:
1290                 withdrawn_route_hex = (
1291                     struct.pack("B", wr_prefix_length)
1292                     + struct.pack(">I", int(prefix))[:bytes]
1293                 )
1294                 withdrawn_routes_hex += withdrawn_route_hex
1295
1296         # Withdrawn Routes Length
1297         withdrawn_routes_length = len(withdrawn_routes_hex)
1298         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1299
1300         # TODO: to replace hardcoded string by encoding?
1301         # Path Attributes
1302         path_attributes_hex = b""
1303         if not self.skipattr:
1304             path_attributes_hex += (
1305                 b"\x40"  # Flags ("Well-Known")
1306                 b"\x01"  # Type (ORIGIN)
1307                 b"\x01"  # Length (1)
1308                 b"\x00"  # Origin: IGP
1309             )
1310             path_attributes_hex += (
1311                 b"\x40"  # Flags ("Well-Known")
1312                 b"\x02"  # Type (AS_PATH)
1313                 b"\x06"  # Length (6)
1314                 b"\x02"  # AS segment type (AS_SEQUENCE)
1315                 b"\x01"  # AS segment length (1)
1316             )
1317             my_as_hex = struct.pack(">I", my_autonomous_system)
1318             path_attributes_hex += my_as_hex  # AS segment (4 bytes)
1319             path_attributes_hex += (
1320                 b"\x40"  # Flags ("Well-Known")
1321                 b"\x05"  # Type (LOCAL_PREF)
1322                 b"\x04"  # Length (4)
1323                 b"\x00\x00\x00\x64"  # (100)
1324             )
1325         if nlri_prefixes != []:
1326             path_attributes_hex += (
1327                 b"\x40"  # Flags ("Well-Known")
1328                 b"\x03"  # Type (NEXT_HOP)
1329                 b"\x04"  # Length (4)
1330             )
1331             next_hop_hex = struct.pack(">I", int(next_hop))
1332             path_attributes_hex += next_hop_hex  # IP address of the next hop (4 bytes)
1333             if originator_id is not None:
1334                 path_attributes_hex += (
1335                     b"\x80"  # Flags ("Optional, non-transitive")
1336                     b"\x09"  # Type (ORIGINATOR_ID)
1337                     b"\x04"  # Length (4)
1338                 )  # ORIGINATOR_ID (4 bytes)
1339                 path_attributes_hex += struct.pack(">I", int(originator_id))
1340             if cluster_list_item is not None:
1341                 path_attributes_hex += (
1342                     b"\x80"  # Flags ("Optional, non-transitive")
1343                     b"\x0a"  # Type (CLUSTER_LIST)
1344                     b"\x04"  # Length (4)
1345                 )  # one CLUSTER_LIST item (4 bytes)
1346                 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1347
1348         if self.bgpls and not end_of_rib:
1349             path_attributes_hex += (
1350                 b"\x80"  # Flags ("Optional, non-transitive")
1351                 b"\x0e"  # Type (MP_REACH_NLRI)
1352                 b"\x22"  # Length (34)
1353                 b"\x40\x04"  # AFI (BGP-LS)
1354                 b"\x47"  # SAFI (BGP-LS)
1355                 b"\x04"  # Next Hop Length (4)
1356             )
1357             path_attributes_hex += struct.pack(">I", int(next_hop))
1358             path_attributes_hex += b"\x00"  # Reserved
1359             path_attributes_hex += (
1360                 b"\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1361                 b"\x00\x15"  # LS-NLRI.TotalNLRILength (21)
1362                 b"\x07"  # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1363             )
1364             path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1365             path_attributes_hex += struct.pack(
1366                 ">I", int(ls_nlri["IPv4TunnelSenderAddress"])
1367             )
1368             path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1369             path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1370             path_attributes_hex += struct.pack(
1371                 ">I", int(ls_nlri["IPv4TunnelEndPointAddress"])
1372             )
1373
1374         # Total Path Attributes Length
1375         total_path_attributes_length = len(path_attributes_hex)
1376         total_path_attributes_length_hex = struct.pack(
1377             ">H", total_path_attributes_length
1378         )
1379
1380         # Network Layer Reachability Information
1381         nlri_hex = b""
1382         if not self.bgpls:
1383             bytes = int((nlri_prefix_length - 1) / 8) + 1
1384             for prefix in nlri_prefixes:
1385                 nlri_prefix_hex = (
1386                     struct.pack("B", nlri_prefix_length)
1387                     + struct.pack(">I", int(prefix))[:bytes]
1388                 )
1389                 nlri_hex += nlri_prefix_hex
1390
1391         # Length (big-endian)
1392         length = (
1393             len(marker_hex)
1394             + 2
1395             + len(type_hex)
1396             + len(withdrawn_routes_length_hex)
1397             + len(withdrawn_routes_hex)
1398             + len(total_path_attributes_length_hex)
1399             + len(path_attributes_hex)
1400             + len(nlri_hex)
1401         )
1402         length_hex = struct.pack(">H", length)
1403
1404         # UPDATE Message
1405         message_hex = (
1406             marker_hex
1407             + length_hex
1408             + type_hex
1409             + withdrawn_routes_length_hex
1410             + withdrawn_routes_hex
1411             + total_path_attributes_length_hex
1412             + path_attributes_hex
1413             + nlri_hex
1414         )
1415
1416         if self.grace != 8 and self.grace != 0 and end_of_rib:
1417             message_hex = marker_hex + binascii.unhexlify("00170200000000")
1418
1419         if self.log_debug:
1420             logger.debug("UPDATE message encoding")
1421             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex).decode())
1422             logger.debug(
1423                 "  Length="
1424                 + str(length)
1425                 + " (0x"
1426                 + binascii.hexlify(length_hex).decode()
1427                 + ")"
1428             )
1429             logger.debug(
1430                 "  Type="
1431                 + str(type)
1432                 + " (0x"
1433                 + binascii.hexlify(type_hex).decode()
1434                 + ")"
1435             )
1436             logger.debug(
1437                 "  withdrawn_routes_length="
1438                 + str(withdrawn_routes_length)
1439                 + " (0x"
1440                 + binascii.hexlify(withdrawn_routes_length_hex).decode()
1441                 + ")"
1442             )
1443             logger.debug(
1444                 "  Withdrawn_Routes="
1445                 + str(wr_prefixes)
1446                 + "/"
1447                 + str(wr_prefix_length)
1448                 + " (0x"
1449                 + binascii.hexlify(withdrawn_routes_hex).decode()
1450                 + ")"
1451             )
1452             if total_path_attributes_length:
1453                 logger.debug(
1454                     "  Total Path Attributes Length="
1455                     + str(total_path_attributes_length)
1456                     + " (0x"
1457                     + binascii.hexlify(total_path_attributes_length_hex).decode()
1458                     + ")"
1459                 )
1460                 logger.debug(
1461                     "  Path Attributes="
1462                     + "(0x"
1463                     + binascii.hexlify(path_attributes_hex).decode()
1464                     + ")"
1465                 )
1466                 logger.debug("    Origin=IGP")
1467                 logger.debug("    AS path=" + str(my_autonomous_system))
1468                 logger.debug("    Next hop=" + str(next_hop))
1469                 if originator_id is not None:
1470                     logger.debug("    Originator id=" + str(originator_id))
1471                 if cluster_list_item is not None:
1472                     logger.debug("    Cluster list=" + str(cluster_list_item))
1473                 if self.bgpls:
1474                     logger.debug("    MP_REACH_NLRI: %s", ls_nlri)
1475             logger.debug(
1476                 "  Network Layer Reachability Information="
1477                 + str(nlri_prefixes)
1478                 + "/"
1479                 + str(nlri_prefix_length)
1480                 + " (0x"
1481                 + binascii.hexlify(nlri_hex).decode()
1482                 + ")"
1483             )
1484             logger.debug(
1485                 "UPDATE message encoded: 0x" + binascii.b2a_hex(message_hex).decode()
1486             )
1487
1488         # updating counter
1489         self.updates_sent += 1
1490         # returning encoded message
1491         return message_hex
1492
1493     def notification_message(self, error_code, error_subcode, data_hex=b""):
1494         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1495
1496         Arguments:
1497             :param error_code: see the rfc4271#section-4.5
1498             :param error_subcode: see the rfc4271#section-4.5
1499             :param data_hex: see the rfc4271#section-4.5
1500         Returns:
1501             :return: encoded NOTIFICATION message in HEX
1502         """
1503
1504         # Marker
1505         marker_hex = b"\xFF" * 16
1506
1507         # Type
1508         type = 3
1509         type_hex = struct.pack("B", type)
1510
1511         # Error Code
1512         error_code_hex = struct.pack("B", error_code)
1513
1514         # Error Subode
1515         error_subcode_hex = struct.pack("B", error_subcode)
1516
1517         # Length (big-endian)
1518         length = (
1519             len(marker_hex)
1520             + 2
1521             + len(type_hex)
1522             + len(error_code_hex)
1523             + len(error_subcode_hex)
1524             + len(data_hex)
1525         )
1526         length_hex = struct.pack(">H", length)
1527
1528         # NOTIFICATION Message
1529         message_hex = (
1530             marker_hex
1531             + length_hex
1532             + type_hex
1533             + error_code_hex
1534             + error_subcode_hex
1535             + data_hex
1536         )
1537
1538         if self.log_debug:
1539             logger.debug("NOTIFICATION message encoding")
1540             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex).decode())
1541             logger.debug(
1542                 "  Length="
1543                 + str(length)
1544                 + " (0x"
1545                 + binascii.hexlify(length_hex).decode()
1546                 + ")"
1547             )
1548             logger.debug(
1549                 "  Type="
1550                 + str(type)
1551                 + " (0x"
1552                 + binascii.hexlify(type_hex).decode()
1553                 + ")"
1554             )
1555             logger.debug(
1556                 "  Error Code="
1557                 + str(error_code)
1558                 + " (0x"
1559                 + binascii.hexlify(error_code_hex).decode()
1560                 + ")"
1561             )
1562             logger.debug(
1563                 "  Error Subode="
1564                 + str(error_subcode)
1565                 + " (0x"
1566                 + binascii.hexlify(error_subcode_hex).decode()
1567                 + ")"
1568             )
1569             logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex).decode() + ")")
1570             logger.debug(
1571                 "NOTIFICATION message encoded: 0x%s", binascii.b2a_hex(message_hex)
1572             )
1573
1574         return message_hex
1575
1576     def keepalive_message(self):
1577         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1578
1579         Returns:
1580             :return: encoded KEEP ALIVE message in HEX
1581         """
1582
1583         # Marker
1584         marker_hex = b"\xFF" * 16
1585
1586         # Type
1587         type = 4
1588         type_hex = struct.pack("B", type)
1589
1590         # Length (big-endian)
1591         length = len(marker_hex) + 2 + len(type_hex)
1592         length_hex = struct.pack(">H", length)
1593
1594         # KEEP ALIVE Message
1595         message_hex = marker_hex + length_hex + type_hex
1596
1597         if self.log_debug:
1598             logger.debug("KEEP ALIVE message encoding")
1599             logger.debug("  Marker=0x" + binascii.hexlify(marker_hex).decode())
1600             logger.debug(
1601                 "  Length="
1602                 + str(length)
1603                 + " (0x"
1604                 + binascii.hexlify(length_hex).decode()
1605                 + ")"
1606             )
1607             logger.debug(
1608                 "  Type="
1609                 + str(type)
1610                 + " (0x"
1611                 + binascii.hexlify(type_hex).decode()
1612                 + ")"
1613             )
1614             logger.debug(
1615                 "KEEP ALIVE message encoded: 0x%s",
1616                 binascii.b2a_hex(message_hex).decode(),
1617             )
1618
1619         return message_hex
1620
1621
1622 class TimeTracker(object):
1623     """Class for tracking timers, both for my keepalives and
1624     peer's hold time.
1625     """
1626
1627     def __init__(self, msg_in):
1628         """Initialisation. based on defaults and OPEN message from peer.
1629
1630         Arguments:
1631             msg_in: the OPEN message received from peer.
1632         """
1633         # Note: Relative time is always named timedelta, to stress that
1634         # the (non-delta) time is absolute.
1635         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
1636         # Upper bound for being stuck in the same state, we should
1637         # at least report something before continuing.
1638         # Negotiate the hold timer by taking the smaller
1639         # of the 2 values (mine and the peer's).
1640         hold_timedelta = 180  # Not an attribute of self yet.
1641         # TODO: Make the default value configurable,
1642         # default value could mirror what peer said.
1643         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1644         if hold_timedelta > peer_hold_timedelta:
1645             hold_timedelta = peer_hold_timedelta
1646         if hold_timedelta != 0 and hold_timedelta < 3:
1647             logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1648             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1649         self.hold_timedelta = hold_timedelta
1650         # If we do not hear from peer this long, we assume it has died.
1651         self.keepalive_timedelta = int(hold_timedelta / 3.0)
1652         # Upper limit for duration between messages, to avoid being
1653         # declared to be dead.
1654         # The same as calling snapshot(), but also declares a field.
1655         self.snapshot_time = time.time()
1656         # Sometimes we need to store time. This is where to get
1657         # the value from afterwards. Time_keepalive may be too strict.
1658         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1659         # At this time point, peer will be declared dead.
1660         self.my_keepalive_time = None  # to be set later
1661         # At this point, we should be sending keepalive message.
1662
1663     def snapshot(self):
1664         """Store current time in instance data to use later."""
1665         # Read as time before something interesting was called.
1666         self.snapshot_time = time.time()
1667
1668     def reset_peer_hold_time(self):
1669         """Move hold time to future as peer has just proven it still lives."""
1670         self.peer_hold_time = time.time() + self.hold_timedelta
1671
1672     # Some methods could rely on self.snapshot_time, but it is better
1673     # to require user to provide it explicitly.
1674     def reset_my_keepalive_time(self, keepalive_time):
1675         """Calculate and set the next my KEEP ALIVE timeout time
1676
1677         Arguments:
1678             :keepalive_time: the initial value of the KEEP ALIVE timer
1679         """
1680         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1681
1682     def is_time_for_my_keepalive(self):
1683         """Check for my KEEP ALIVE timeout occurence"""
1684         if self.hold_timedelta == 0:
1685             return False
1686         return self.snapshot_time >= self.my_keepalive_time
1687
1688     def get_next_event_time(self):
1689         """Set the time of the next expected or to be sent KEEP ALIVE"""
1690         if self.hold_timedelta == 0:
1691             return self.snapshot_time + 86400
1692         return min(self.my_keepalive_time, self.peer_hold_time)
1693
1694     def check_peer_hold_time(self, snapshot_time):
1695         """Raise error if nothing was read from peer until specified time."""
1696         # Hold time = 0 means keepalive checking off.
1697         if self.hold_timedelta != 0:
1698             # time.time() may be too strict
1699             if snapshot_time > self.peer_hold_time:
1700                 logger.error("Peer has overstepped the hold timer.")
1701                 raise RuntimeError("Peer has overstepped the hold timer.")
1702                 # TODO: Include hold_timedelta?
1703                 # TODO: Add notification sending (attempt). That means
1704                 # move to write tracker.
1705
1706
1707 class ReadTracker(object):
1708     """Class for tracking read of mesages chunk by chunk and
1709     for idle waiting.
1710     """
1711
1712     def __init__(
1713         self,
1714         bgp_socket,
1715         timer,
1716         storage,
1717         evpn=False,
1718         mvpn=False,
1719         l3vpn_mcast=False,
1720         allf=False,
1721         l3vpn=False,
1722         rt_constrain=False,
1723         ipv6=False,
1724         grace=8,
1725         wait_for_read=10,
1726     ):
1727         """The reader initialisation.
1728
1729         Arguments:
1730             bgp_socket: socket to be used for sending
1731             timer: timer to be used for scheduling
1732             storage: thread safe dict
1733             evpn: flag that evpn functionality is tested
1734             mvpn: flag that mvpn functionality is tested
1735             grace: flag that grace-restart functionality is tested
1736             l3vpn_mcast: flag that l3vpn_mcast functionality is tested
1737             l3vpn: flag that l3vpn unicast functionality is tested
1738             rt_constrain: flag that rt-constrain functionality is tested
1739             allf: flag for all family testing.
1740         """
1741         # References to outside objects.
1742         self.socket = bgp_socket
1743         self.timer = timer
1744         # BGP marker length plus length field length.
1745         self.header_length = 18
1746         # TODO: make it class (constant) attribute
1747         # Computation of where next chunk ends depends on whether
1748         # we are beyond length field.
1749         self.reading_header = True
1750         # Countdown towards next size computation.
1751         self.bytes_to_read = self.header_length
1752         # Incremental buffer for message under read.
1753         self.msg_in = b""
1754         # Initialising counters
1755         self.updates_received = 0
1756         self.prefixes_introduced = 0
1757         self.prefixes_withdrawn = 0
1758         self.rx_idle_time = 0
1759         self.rx_activity_detected = True
1760         self.storage = storage
1761         self.evpn = evpn
1762         self.mvpn = mvpn
1763         self.l3vpn_mcast = l3vpn_mcast
1764         self.l3vpn = l3vpn
1765         self.rt_constrain = rt_constrain
1766         self.ipv6 = ipv6
1767         self.allf = allf
1768         self.wfr = wait_for_read
1769         self.grace = grace
1770
1771     def read_message_chunk(self):
1772         """Read up to one message
1773
1774         Note:
1775             Currently it does not return anything.
1776         """
1777         # TODO: We could return the whole message, currently not needed.
1778         # We assume the socket is readable.
1779         chunk_message = self.socket.recv(self.bytes_to_read)
1780         self.msg_in += chunk_message
1781         self.bytes_to_read -= len(chunk_message)
1782         # TODO: bytes_to_read < 0 is not possible, right?
1783         if not self.bytes_to_read:
1784             # Finished reading a logical block.
1785             if self.reading_header:
1786                 # The logical block was a BGP header.
1787                 # Now we know the size of the message.
1788                 self.reading_header = False
1789                 self.bytes_to_read = (
1790                     get_short_int_from_message(self.msg_in) - self.header_length
1791                 )
1792             else:  # We have finished reading the body of the message.
1793                 # Peer has just proven it is still alive.
1794                 self.timer.reset_peer_hold_time()
1795                 # TODO: Do we want to count received messages?
1796                 # This version ignores the received message.
1797                 # TODO: Should we do validation and exit on anything
1798                 # besides update or keepalive?
1799                 # Prepare state for reading another message.
1800                 message_type_hex = self.msg_in[
1801                     self.header_length : self.header_length + 1
1802                 ]
1803                 if message_type_hex == b"\x01":
1804                     logger.info(
1805                         "OPEN message received: 0x%s",
1806                         binascii.b2a_hex(self.msg_in).decode(),
1807                     )
1808                 elif message_type_hex == b"\x02":
1809                     logger.debug(
1810                         "UPDATE message received: 0x%s",
1811                         binascii.b2a_hex(self.msg_in).decode(),
1812                     )
1813                     self.decode_update_message(self.msg_in)
1814                 elif message_type_hex == b"\x03":
1815                     logger.info(
1816                         "NOTIFICATION message received: 0x%s",
1817                         binascii.b2a_hex(self.msg_in).decode(),
1818                     )
1819                 elif message_type_hex == b"\x04":
1820                     logger.info(
1821                         "KEEP ALIVE message received: 0x%s",
1822                         binascii.b2a_hex(self.msg_in).decode(),
1823                     )
1824                 else:
1825                     logger.warning(
1826                         "Unexpected message received: 0x%s",
1827                         binascii.b2a_hex(self.msg_in).decode(),
1828                     )
1829                 self.msg_in = b""
1830                 self.reading_header = True
1831                 self.bytes_to_read = self.header_length
1832         # We should not act upon peer_hold_time if we are reading
1833         # something right now.
1834         return
1835
1836     def decode_path_attributes(self, path_attributes_hex):
1837         """Decode the Path Attributes field (rfc4271#section-4.3)
1838
1839         Arguments:
1840             :path_attributes: path_attributes field to be decoded in hex
1841         Returns:
1842             :return: None
1843         """
1844         hex_to_decode = path_attributes_hex
1845
1846         while len(hex_to_decode):
1847             attr_flags_hex = hex_to_decode[0:1]
1848             attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1849             #            attr_optional_bit = attr_flags & 128
1850             #            attr_transitive_bit = attr_flags & 64
1851             #            attr_partial_bit = attr_flags & 32
1852             attr_extended_length_bit = attr_flags & 16
1853
1854             attr_type_code_hex = hex_to_decode[1:2]
1855             attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1856
1857             if attr_extended_length_bit:
1858                 attr_length_hex = hex_to_decode[2:4]
1859                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1860                 attr_value_hex = hex_to_decode[4 : 4 + attr_length]
1861                 hex_to_decode = hex_to_decode[4 + attr_length :]
1862             else:
1863                 attr_length_hex = hex_to_decode[2:3]
1864                 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1865                 attr_value_hex = hex_to_decode[3 : 3 + attr_length]
1866                 hex_to_decode = hex_to_decode[3 + attr_length :]
1867
1868             if attr_type_code == 1:
1869                 logger.debug(
1870                     "Attribute type=1 (ORIGIN, flags:0x%s)",
1871                     binascii.b2a_hex(attr_flags_hex),
1872                 )
1873                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1874             elif attr_type_code == 2:
1875                 logger.debug(
1876                     "Attribute type=2 (AS_PATH, flags:0x%s)",
1877                     binascii.b2a_hex(attr_flags_hex),
1878                 )
1879                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1880             elif attr_type_code == 3:
1881                 logger.debug(
1882                     "Attribute type=3 (NEXT_HOP, flags:0x%s)",
1883                     binascii.b2a_hex(attr_flags_hex),
1884                 )
1885                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1886             elif attr_type_code == 4:
1887                 logger.debug(
1888                     "Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1889                     binascii.b2a_hex(attr_flags_hex),
1890                 )
1891                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1892             elif attr_type_code == 5:
1893                 logger.debug(
1894                     "Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1895                     binascii.b2a_hex(attr_flags_hex),
1896                 )
1897                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1898             elif attr_type_code == 6:
1899                 logger.debug(
1900                     "Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1901                     binascii.b2a_hex(attr_flags_hex),
1902                 )
1903                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1904             elif attr_type_code == 7:
1905                 logger.debug(
1906                     "Attribute type=7 (AGGREGATOR, flags:0x%s)",
1907                     binascii.b2a_hex(attr_flags_hex),
1908                 )
1909                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1910             elif attr_type_code == 9:  # rfc4456#section-8
1911                 logger.debug(
1912                     "Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1913                     binascii.b2a_hex(attr_flags_hex),
1914                 )
1915                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1916             elif attr_type_code == 10:  # rfc4456#section-8
1917                 logger.debug(
1918                     "Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1919                     binascii.b2a_hex(attr_flags_hex),
1920                 )
1921                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1922             elif attr_type_code == 14:  # rfc4760#section-3
1923                 logger.debug(
1924                     "Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1925                     binascii.b2a_hex(attr_flags_hex),
1926                 )
1927                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1928                 address_family_identifier_hex = attr_value_hex[0:2]
1929                 logger.debug(
1930                     "  Address Family Identifier=0x%s",
1931                     binascii.b2a_hex(address_family_identifier_hex),
1932                 )
1933                 subsequent_address_family_identifier_hex = attr_value_hex[2:3]
1934                 logger.debug(
1935                     "  Subsequent Address Family Identifier=0x%s",
1936                     binascii.b2a_hex(subsequent_address_family_identifier_hex),
1937                 )
1938                 next_hop_netaddr_len_hex = attr_value_hex[3:4]
1939                 next_hop_netaddr_len = int(
1940                     binascii.b2a_hex(next_hop_netaddr_len_hex), 16
1941                 )
1942                 logger.debug(
1943                     "  Length of Next Hop Network Address=%s (0x%s)",
1944                     next_hop_netaddr_len,
1945                     binascii.b2a_hex(next_hop_netaddr_len_hex),
1946                 )
1947                 next_hop_netaddr_hex = attr_value_hex[4 : 4 + next_hop_netaddr_len]
1948                 next_hop_netaddr = ".".join(
1949                     str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex)
1950                 )
1951                 logger.debug(
1952                     "  Network Address of Next Hop=%s (0x%s)",
1953                     next_hop_netaddr,
1954                     binascii.b2a_hex(next_hop_netaddr_hex),
1955                 )
1956                 reserved_byte_pos = 4 + next_hop_netaddr_len
1957                 reserved_hex = attr_value_hex[reserved_byte_pos : reserved_byte_pos + 1]
1958                 logger.debug("  Reserved=0x%s", binascii.b2a_hex(reserved_hex))
1959                 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1 :]
1960                 logger.debug(
1961                     "  Network Layer Reachability Information=0x%s",
1962                     binascii.b2a_hex(nlri_hex),
1963                 )
1964                 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1965                 logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
1966                 for prefix in nlri_prefix_list:
1967                     logger.debug("  nlri_prefix_received: %s", prefix)
1968                 self.prefixes_introduced += len(nlri_prefix_list)  # update counter
1969             elif attr_type_code == 15:  # rfc4760#section-4
1970                 logger.debug(
1971                     "Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1972                     binascii.b2a_hex(attr_flags_hex),
1973                 )
1974                 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1975                 address_family_identifier_hex = attr_value_hex[0:2]
1976                 logger.debug(
1977                     "  Address Family Identifier=0x%s",
1978                     binascii.b2a_hex(address_family_identifier_hex),
1979                 )
1980                 subsequent_address_family_identifier_hex = attr_value_hex[2:3]
1981                 logger.debug(
1982                     "  Subsequent Address Family Identifier=0x%s",
1983                     binascii.b2a_hex(subsequent_address_family_identifier_hex),
1984                 )
1985                 wd_hex = attr_value_hex[3:]
1986                 logger.debug("  Withdrawn Routes=0x%s", binascii.b2a_hex(wd_hex))
1987                 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1988                 logger.debug("  Withdrawn routes prefix list: %s", wdr_prefix_list)
1989                 for prefix in wdr_prefix_list:
1990                     logger.debug("  withdrawn_prefix_received: %s", prefix)
1991                 self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
1992             else:
1993                 logger.debug(
1994                     "Unknown attribute type=%s, flags:0x%s)",
1995                     attr_type_code,
1996                     binascii.b2a_hex(attr_flags_hex),
1997                 )
1998                 logger.debug(
1999                     "Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex)
2000                 )
2001         return None
2002
2003     def decode_update_message(self, msg):
2004         """Decode an UPDATE message (rfc4271#section-4.3)
2005
2006         Arguments:
2007             :msg: message to be decoded in hex
2008         Returns:
2009             :return: None
2010         """
2011         logger.debug("Decoding update message:")
2012         # message header - marker
2013         marker_hex = msg[:16]
2014         logger.debug("Message header marker: 0x%s", binascii.b2a_hex(marker_hex))
2015         # message header - message length
2016         msg_length_hex = msg[16:18]
2017         msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
2018         logger.debug(
2019             "Message lenght: 0x%s (%s)", binascii.b2a_hex(msg_length_hex), msg_length
2020         )
2021         # message header - message type
2022         msg_type_hex = msg[18:19]
2023         msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
2024
2025         with self.storage as stor:
2026             # this will replace the previously stored message
2027             stor["update"] = binascii.hexlify(msg).decode()
2028
2029         logger.debug("Evpn {}".format(self.evpn))
2030         if self.evpn:
2031             logger.debug("Skipping update decoding due to evpn data expected")
2032             return
2033
2034         logger.debug("Graceful-restart {}".format(self.grace))
2035         if self.grace != 8:
2036             logger.debug(
2037                 "Skipping update decoding due to graceful-restart data expected"
2038             )
2039             return
2040
2041         logger.debug("Mvpn {}".format(self.mvpn))
2042         if self.mvpn:
2043             logger.debug("Skipping update decoding due to mvpn data expected")
2044             return
2045
2046         logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
2047         if self.l3vpn_mcast:
2048             logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
2049             return
2050
2051         logger.debug("L3vpn-unicast {}".format(self.l3vpn))
2052         if self.l3vpn_mcast:
2053             logger.debug("Skipping update decoding due to l3vpn-unicast data expected")
2054             return
2055
2056         logger.debug("Route-Target-Constrain {}".format(self.rt_constrain))
2057         if self.rt_constrain:
2058             logger.debug(
2059                 "Skipping update decoding due to Route-Target-Constrain data expected"
2060             )
2061             return
2062
2063         logger.debug("Ipv6-Unicast {}".format(self.ipv6))
2064         if self.ipv6:
2065             logger.debug("Skipping update decoding due to Ipv6 data expected")
2066             return
2067
2068         logger.debug("Allf {}".format(self.allf))
2069         if self.allf:
2070             logger.debug("Skipping update decoding")
2071             return
2072
2073         if msg_type == 2:
2074             logger.debug("Message type: 0x%s (update)", binascii.b2a_hex(msg_type_hex))
2075             # withdrawn routes length
2076             wdr_length_hex = msg[19:21]
2077             wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
2078             logger.debug(
2079                 "Withdrawn routes lenght: 0x%s (%s)",
2080                 binascii.b2a_hex(wdr_length_hex),
2081                 wdr_length,
2082             )
2083             # withdrawn routes
2084             wdr_hex = msg[21 : 21 + wdr_length]
2085             logger.debug("Withdrawn routes: 0x%s", binascii.b2a_hex(wdr_hex))
2086             wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
2087             logger.debug("Withdrawn routes prefix list: %s", wdr_prefix_list)
2088             for prefix in wdr_prefix_list:
2089                 logger.debug("withdrawn_prefix_received: %s", prefix)
2090             # total path attribute length
2091             total_pa_length_offset = 21 + wdr_length
2092             total_pa_length_hex = msg[
2093                 total_pa_length_offset : total_pa_length_offset + 2
2094             ]
2095             total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
2096             logger.debug(
2097                 "Total path attribute lenght: 0x%s (%s)",
2098                 binascii.b2a_hex(total_pa_length_hex),
2099                 total_pa_length,
2100             )
2101             # path attributes
2102             pa_offset = total_pa_length_offset + 2
2103             pa_hex = msg[pa_offset : pa_offset + total_pa_length]
2104             logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
2105             self.decode_path_attributes(pa_hex)
2106             # network layer reachability information length
2107             nlri_length = msg_length - 23 - total_pa_length - wdr_length
2108             logger.debug("Calculated NLRI length: %s", nlri_length)
2109             # network layer reachability information
2110             nlri_offset = pa_offset + total_pa_length
2111             nlri_hex = msg[nlri_offset : nlri_offset + nlri_length]
2112             logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
2113             nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
2114             logger.debug("NLRI prefix list: %s", nlri_prefix_list)
2115             for prefix in nlri_prefix_list:
2116                 logger.debug("nlri_prefix_received: %s", prefix)
2117             # Updating counters
2118             self.updates_received += 1
2119             self.prefixes_introduced += len(nlri_prefix_list)
2120             self.prefixes_withdrawn += len(wdr_prefix_list)
2121         else:
2122             logger.error(
2123                 "Unexpeced message type 0x%s in 0x%s",
2124                 binascii.b2a_hex(msg_type_hex),
2125                 binascii.b2a_hex(msg),
2126             )
2127
2128     def wait_for_read(self):
2129         """Read message until timeout (next expected event).
2130
2131         Note:
2132             Used when no more updates has to be sent to avoid busy-wait.
2133             Currently it does not return anything.
2134         """
2135         # Compute time to the first predictable state change
2136         event_time = self.timer.get_next_event_time()
2137         # snapshot_time would be imprecise
2138         wait_timedelta = min(event_time - time.time(), self.wfr)
2139         if wait_timedelta < 0:
2140             # The program got around to waiting to an event in "very near
2141             # future" so late that it became a "past" event, thus tell
2142             # "select" to not wait at all. Passing negative timedelta to
2143             # select() would lead to either waiting forever (for -1) or
2144             # select.error("Invalid parameter") (for everything else).
2145             wait_timedelta = 0
2146         # And wait for event or something to read.
2147
2148         if not self.rx_activity_detected or not (self.updates_received % 100):
2149             # right time to write statistics to the log (not for every update and
2150             # not too frequently to avoid having large log files)
2151             logger.info(
2152                 "total_received_update_message_counter: %s", self.updates_received
2153             )
2154             logger.info(
2155                 "total_received_nlri_prefix_counter: %s", self.prefixes_introduced
2156             )
2157             logger.info(
2158                 "total_received_withdrawn_prefix_counter: %s", self.prefixes_withdrawn
2159             )
2160
2161         start_time = time.time()
2162         select.select([self.socket], [], [self.socket], wait_timedelta)
2163         timedelta = time.time() - start_time
2164         self.rx_idle_time += timedelta
2165         self.rx_activity_detected = timedelta < 1
2166
2167         if not self.rx_activity_detected or not (self.updates_received % 100):
2168             # right time to write statistics to the log (not for every update and
2169             # not too frequently to avoid having large log files)
2170             logger.info("... idle for %.3fs", timedelta)
2171             logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
2172         return
2173
2174
2175 class WriteTracker(object):
2176     """Class tracking enqueueing messages and sending chunks of them."""
2177
2178     def __init__(self, bgp_socket, generator, timer):
2179         """The writter initialisation.
2180
2181         Arguments:
2182             bgp_socket: socket to be used for sending
2183             generator: generator to be used for message generation
2184             timer: timer to be used for scheduling
2185         """
2186         # References to outside objects,
2187         self.socket = bgp_socket
2188         self.generator = generator
2189         self.timer = timer
2190         # Really new fields.
2191         # TODO: Would attribute docstrings add anything substantial?
2192         self.sending_message = False
2193         self.bytes_to_send = 0
2194         self.msg_out = b""
2195
2196     def enqueue_message_for_sending(self, message):
2197         """Enqueue message and change state.
2198
2199         Arguments:
2200             message: message to be enqueued into the msg_out buffer
2201         """
2202         self.msg_out += message
2203         self.bytes_to_send += len(message)
2204         self.sending_message = True
2205
2206     def send_message_chunk_is_whole(self):
2207         """Send enqueued data from msg_out buffer
2208
2209         Returns:
2210             :return: true if no remaining data to send
2211         """
2212         # We assume there is a msg_out to send and socket is writable.
2213         self.timer.snapshot()
2214         bytes_sent = self.socket.send(self.msg_out)
2215         # Forget the part of message that was sent.
2216         self.msg_out = self.msg_out[bytes_sent:]
2217         self.bytes_to_send -= bytes_sent
2218         if not self.bytes_to_send:
2219             # TODO: Is it possible to hit negative bytes_to_send?
2220             self.sending_message = False
2221             # We should have reset hold timer on peer side.
2222             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
2223             # The possible reason for not prioritizing reads is gone.
2224             return True
2225         return False
2226
2227
2228 class StateTracker(object):
2229     """Main loop has state so complex it warrants this separate class."""
2230
2231     def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
2232         """The state tracker initialisation.
2233
2234         Arguments:
2235             bgp_socket: socket to be used for sending / receiving
2236             generator: generator to be used for message generation
2237             timer: timer to be used for scheduling
2238             inqueue: user initiated messages queue
2239             storage: thread safe dict to store data for the rpc server
2240             cliargs: cli args from the user
2241         """
2242         # References to outside objects.
2243         self.socket = bgp_socket
2244         self.generator = generator
2245         self.timer = timer
2246         # Sub-trackers.
2247         self.reader = ReadTracker(
2248             bgp_socket,
2249             timer,
2250             storage,
2251             evpn=cliargs.evpn,
2252             mvpn=cliargs.mvpn,
2253             l3vpn_mcast=cliargs.l3vpn_mcast,
2254             l3vpn=cliargs.l3vpn,
2255             allf=cliargs.allf,
2256             rt_constrain=cliargs.rt_constrain,
2257             ipv6=cliargs.ipv6,
2258             grace=cliargs.grace,
2259             wait_for_read=cliargs.wfr,
2260         )
2261         self.writer = WriteTracker(bgp_socket, generator, timer)
2262         # Prioritization state.
2263         self.prioritize_writing = False
2264         # In general, we prioritize reading over writing. But in order
2265         # not to get blocked by neverending reads, we should
2266         # check whether we are not risking running out of holdtime.
2267         # So in some situations, this field is set to True to attempt
2268         # finishing sending a message, after which this field resets
2269         # back to False.
2270         # TODO: Alternative is to switch fairly between reading and
2271         # writing (called round robin from now on).
2272         # Message counting is done in generator.
2273         self.inqueue = inqueue
2274
2275     def perform_one_loop_iteration(self):
2276         """The main loop iteration
2277
2278         Notes:
2279             Calculates priority, resolves all conditions, calls
2280             appropriate method and returns to caller to repeat.
2281         """
2282         self.timer.snapshot()
2283         if not self.prioritize_writing:
2284             if self.timer.is_time_for_my_keepalive():
2285                 if not self.writer.sending_message:
2286                     # We need to schedule a keepalive ASAP.
2287                     self.writer.enqueue_message_for_sending(
2288                         self.generator.keepalive_message()
2289                     )
2290                     logger.info("KEEP ALIVE is sent.")
2291                 # We are sending a message now, so let's prioritize it.
2292                 self.prioritize_writing = True
2293
2294         try:
2295             msg = self.inqueue.get_nowait()
2296             logger.info("Received message: {}".format(msg))
2297             msgbin = binascii.unhexlify(msg)
2298             self.writer.enqueue_message_for_sending(msgbin)
2299         except queue.Empty:
2300             pass
2301         # Now we know what our priorities are, we have to check
2302         # which actions are available.
2303         # socket.socket() returns three lists,
2304         # we store them to list of lists.
2305         list_list = select.select(
2306             [self.socket], [self.socket], [self.socket], self.timer.report_timedelta
2307         )
2308         read_list, write_list, except_list = list_list
2309         # Lists are unpacked, each is either [] or [self.socket],
2310         # so we will test them as boolean.
2311         if except_list:
2312             logger.error("Exceptional state on the socket.")
2313             raise RuntimeError("Exceptional state on socket", self.socket)
2314         # We will do either read or write.
2315         if not (self.prioritize_writing and write_list):
2316             # Either we have no reason to rush writes,
2317             # or the socket is not writable.
2318             # We are focusing on reading here.
2319             if read_list:  # there is something to read indeed
2320                 # In this case we want to read chunk of message
2321                 # and repeat the select,
2322                 self.reader.read_message_chunk()
2323                 return
2324             # We were focusing on reading, but nothing to read was there.
2325             # Good time to check peer for hold timer.
2326             self.timer.check_peer_hold_time(self.timer.snapshot_time)
2327             # Quiet on the read front, we can have attempt to write.
2328         if write_list:
2329             # Either we really want to reset peer's view of our hold
2330             # timer, or there was nothing to read.
2331             # Were we in the middle of sending a message?
2332             if self.writer.sending_message:
2333                 # Was it the end of a message?
2334                 whole = self.writer.send_message_chunk_is_whole()
2335                 # We were pressed to send something and we did it.
2336                 if self.prioritize_writing and whole:
2337                     # We prioritize reading again.
2338                     self.prioritize_writing = False
2339                 return
2340             # Finally to check if still update messages to be generated.
2341             if self.generator.remaining_prefixes:
2342                 msg_out = self.generator.compose_update_message()
2343                 if not self.generator.remaining_prefixes:
2344                     # We have just finished update generation,
2345                     # end-of-rib is due.
2346                     logger.info("All update messages generated.")
2347                     logger.info("Storing performance results.")
2348                     self.generator.store_results()
2349                     logger.info("Finally an END-OF-RIB is sent.")
2350                     msg_out += self.generator.update_message(
2351                         wr_prefixes=[], nlri_prefixes=[], end_of_rib=True
2352                     )
2353                 self.writer.enqueue_message_for_sending(msg_out)
2354                 # Attempt for real sending to be done in next iteration.
2355                 return
2356             # Nothing to write anymore.
2357             # To avoid busy loop, we do idle waiting here.
2358             self.reader.wait_for_read()
2359             return
2360         # We can neither read nor write.
2361         logger.warning(
2362             "Input and output both blocked for "
2363             + str(self.timer.report_timedelta)
2364             + " seconds."
2365         )
2366         # FIXME: Are we sure select has been really waiting
2367         # the whole period?
2368         return
2369
2370
2371 def create_logger(loglevel, logfile):
2372     """Create logger object
2373
2374     Arguments:
2375         :loglevel: log level
2376         :logfile: log file name
2377     Returns:
2378         :return: logger object
2379     """
2380     logger = logging.getLogger("logger")
2381     log_formatter = logging.Formatter(
2382         "%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s"
2383     )
2384     console_handler = logging.StreamHandler()
2385     file_handler = logging.FileHandler(logfile, mode="w")
2386     console_handler.setFormatter(log_formatter)
2387     file_handler.setFormatter(log_formatter)
2388     logger.addHandler(console_handler)
2389     logger.addHandler(file_handler)
2390     logger.setLevel(loglevel)
2391     return logger
2392
2393
2394 def job(arguments, inqueue, storage):
2395     """One time initialisation and iterations looping.
2396     Notes:
2397         Establish BGP connection and run iterations.
2398
2399     Arguments:
2400         :arguments: Command line arguments
2401         :inqueue: Data to be sent from play.py
2402         :storage: Shared dict for rpc server
2403     Returns:
2404         :return: None
2405     """
2406     bgp_socket = establish_connection(arguments)
2407     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
2408     # Receive open message before sending anything.
2409     # FIXME: Add parameter to send default open message first,
2410     # to work with "you first" peers.
2411     msg_in = read_open_message(bgp_socket)
2412     logger.info(binascii.hexlify(msg_in).decode())
2413     storage["open"] = binascii.hexlify(msg_in).decode()
2414     timer = TimeTracker(msg_in)
2415     generator = MessageGenerator(arguments)
2416     msg_out = generator.open_message()
2417     logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out).decode())
2418     # Send our open message to the peer.
2419     bgp_socket.send(msg_out)
2420     # Wait for confirming keepalive.
2421     # TODO: Surely in just one packet?
2422     # Using exact keepalive length to not to see possible updates.
2423     msg_in = bgp_socket.recv(19)
2424     if msg_in != generator.keepalive_message():
2425         error_msg = "Open not confirmed by keepalive, instead got"
2426         logger.error(error_msg + ": " + binascii.hexlify(msg_in).decode())
2427         raise MessageError(error_msg, msg_in)
2428     timer.reset_peer_hold_time()
2429     # Send the keepalive to indicate the connection is accepted.
2430     timer.snapshot()  # Remember this time.
2431     msg_out = generator.keepalive_message()
2432     logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out).decode())
2433     bgp_socket.send(msg_out)
2434     # Use the remembered time.
2435     timer.reset_my_keepalive_time(timer.snapshot_time)
2436     # End of initial handshake phase.
2437     state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
2438     while True:  # main reactor loop
2439         state.perform_one_loop_iteration()
2440
2441
2442 class Rpcs:
2443     """Handler for SimpleXMLRPCServer"""
2444
2445     def __init__(self, sendqueue, storage):
2446         """Init method
2447
2448         Arguments:
2449             :sendqueue: queue for data to be sent towards odl
2450             :storage: thread safe dict
2451         """
2452         self.queue = sendqueue
2453         self.storage = storage
2454
2455     def send(self, text):
2456         """Data to be sent
2457
2458         Arguments:
2459             :text: hes string of the data to be sent
2460         """
2461         self.queue.put(text)
2462
2463     def get(self, text=""):
2464         """Reads data form the storage
2465
2466         - returns stored data or an empty string, at the moment only
2467           'update' is stored
2468
2469         Arguments:
2470             :text: a key to the storage to get the data
2471         Returns:
2472             :data: stored data
2473         """
2474         with self.storage as stor:
2475             return stor.get(text, "")
2476
2477     def clean(self, text=""):
2478         """Cleans data form the storage
2479
2480         Arguments:
2481             :text: a key to the storage to clean the data
2482         """
2483         with self.storage as stor:
2484             if text in stor:
2485                 del stor[text]
2486
2487
2488 def threaded_job(arguments):
2489     """Run the job threaded
2490
2491     Arguments:
2492         :arguments: Command line arguments
2493     Returns:
2494         :return: None
2495     """
2496     amount_left = arguments.amount
2497     utils_left = arguments.multiplicity
2498     prefix_current = arguments.firstprefix
2499     myip_current = arguments.myip
2500     port = arguments.port
2501     thread_args = []
2502     rpcqueue = queue.Queue()
2503     storage = SafeDict()
2504
2505     while 1:
2506         amount_per_util = int((amount_left - 1) / utils_left) + 1  # round up
2507         amount_left -= amount_per_util
2508         utils_left -= 1
2509
2510         args = deepcopy(arguments)
2511         args.amount = amount_per_util
2512         args.firstprefix = prefix_current
2513         args.myip = myip_current
2514         thread_args.append(args)
2515
2516         if not utils_left:
2517             break
2518         prefix_current += amount_per_util * 16
2519         myip_current += 1
2520
2521     try:
2522         # Create threads
2523         for t in thread_args:
2524             threading.Thread(
2525                 target=job, args=(t, rpcqueue, storage), daemon=True
2526             ).start()
2527     except Exception:
2528         print("Error: unable to start thread.")
2529         raise SystemExit(2)
2530
2531     if arguments.usepeerip:
2532         ip = arguments.peerip
2533     else:
2534         ip = arguments.myip
2535     rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
2536     rpcserver.register_instance(Rpcs(rpcqueue, storage))
2537     rpcserver.serve_forever()
2538
2539
2540 if __name__ == "__main__":
2541     arguments = parse_arguments()
2542     logger = create_logger(arguments.loglevel, arguments.logfile)
2543     threaded_job(arguments)