1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
36 '''Thread safe dictionary
38 The object will serve as thread safe data storage.
39 It should be used with "with" statement.
42 def __init__(self, * p_arg, ** n_arg):
43 super(SafeDict, self).__init__()
44 self._lock = threading.Lock()
50 def __exit__(self, type, value, traceback):
54 def parse_arguments():
55 """Use argparse to get arguments,
60 parser = argparse.ArgumentParser()
61 # TODO: Should we use --argument-names-with-spaces?
62 str_help = "Autonomous System number use in the stream."
63 parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64 # FIXME: We are acting as iBGP peer,
65 # we should mirror AS number from peer's open message.
66 str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67 parser.add_argument("--amount", default="1", type=int, help=str_help)
68 str_help = "Rpc server port."
69 parser.add_argument("--port", default="8000", type=int, help=str_help)
70 str_help = "Maximum number of IP prefixes to be announced in one iteration"
71 parser.add_argument("--insert", default="1", type=int, help=str_help)
72 str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
73 parser.add_argument("--withdraw", default="0", type=int, help=str_help)
74 str_help = "The number of prefixes to process without withdrawals"
75 parser.add_argument("--prefill", default="0", type=int, help=str_help)
76 str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
77 parser.add_argument("--updates", choices=["single", "separate"],
78 default=["separate"], help=str_help)
79 str_help = "Base prefix IP address for prefix generation"
80 parser.add_argument("--firstprefix", default="8.0.1.0",
81 type=ipaddr.IPv4Address, help=str_help)
82 str_help = "The prefix length."
83 parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
84 str_help = "Listen for connection, instead of initiating it."
85 parser.add_argument("--listen", action="store_true", help=str_help)
86 str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
87 "Default value only suitable for listening.")
88 parser.add_argument("--myip", default="0.0.0.0",
89 type=ipaddr.IPv4Address, help=str_help)
90 str_help = ("TCP port to bind to when listening or initiating connection." +
91 "Default only suitable for initiating.")
92 parser.add_argument("--myport", default="0", type=int, help=str_help)
93 str_help = "The IP of the next hop to be placed into the update messages."
94 parser.add_argument("--nexthop", default="192.0.2.1",
95 type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
96 str_help = "Identifier of the route originator."
97 parser.add_argument("--originator", default=None,
98 type=ipaddr.IPv4Address, dest="originator", help=str_help)
99 str_help = "Cluster list item identifier."
100 parser.add_argument("--cluster", default=None,
101 type=ipaddr.IPv4Address, dest="cluster", help=str_help)
102 str_help = ("Numeric IP Address to try to connect to." +
103 "Currently no effect in listening mode.")
104 parser.add_argument("--peerip", default="127.0.0.2",
105 type=ipaddr.IPv4Address, help=str_help)
106 str_help = "TCP port to try to connect to. No effect in listening mode."
107 parser.add_argument("--peerport", default="179", type=int, help=str_help)
108 str_help = "Local hold time."
109 parser.add_argument("--holdtime", default="180", type=int, help=str_help)
110 str_help = "Log level (--error, --warning, --info, --debug)"
111 parser.add_argument("--error", dest="loglevel", action="store_const",
112 const=logging.ERROR, default=logging.INFO,
114 parser.add_argument("--warning", dest="loglevel", action="store_const",
115 const=logging.WARNING, default=logging.INFO,
117 parser.add_argument("--info", dest="loglevel", action="store_const",
118 const=logging.INFO, default=logging.INFO,
120 parser.add_argument("--debug", dest="loglevel", action="store_const",
121 const=logging.DEBUG, default=logging.INFO,
123 str_help = "Log file name"
124 parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
125 str_help = "Trailing part of the csv result files for plotting purposes"
126 parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
127 str_help = "Minimum number of updates to reach to include result into csv."
128 parser.add_argument("--threshold", default="1000", type=int, help=str_help)
129 str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
130 parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
131 str_help = "Using peerip instead of myip for xmlrpc server"
132 parser.add_argument("--usepeerip", default=False, action="store_true", help=str_help)
133 str_help = "Link-State NLRI supported"
134 parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
135 str_help = "Link-State NLRI: Identifier"
136 parser.add_argument("-lsid", default="1", type=int, help=str_help)
137 str_help = "Link-State NLRI: Tunnel ID"
138 parser.add_argument("-lstid", default="1", type=int, help=str_help)
139 str_help = "Link-State NLRI: LSP ID"
140 parser.add_argument("-lspid", default="1", type=int, help=str_help)
141 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
142 parser.add_argument("--lstsaddr", default="1.2.3.4",
143 type=ipaddr.IPv4Address, help=str_help)
144 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
145 parser.add_argument("--lsteaddr", default="5.6.7.8",
146 type=ipaddr.IPv4Address, help=str_help)
147 str_help = "Link-State NLRI: Identifier Step"
148 parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
149 str_help = "Link-State NLRI: Tunnel ID Step"
150 parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
151 str_help = "Link-State NLRI: LSP ID Step"
152 parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
153 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
154 parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
155 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
156 parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
157 str_help = "How many play utilities are to be started."
158 parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
159 str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
160 Enabling this flag makes the script not decoding the update mesage, because of not\
161 supported decoding for these elements."
162 parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
163 str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
164 Enabling this flag makes the script not decoding the update mesage, because of not\
165 supported decoding for these elements."
166 parser.add_argument("--grace", default="8", type=int, help=str_help)
167 str_help = "Open message includes Graceful-restart capability, containing AFI/SAFIS:\
168 IPV4-Unicast, IPV6-Unicast, BGP-LS\
169 Enabling this flag makes the script not decoding the update mesage, because of not\
170 supported decoding for these elements."
171 parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
172 str_help = "Open message includes L3VPN-MULTICAST arguments.\
173 Enabling this flag makes the script not decoding the update mesage, because of not\
174 supported decoding for these elements."
175 parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
176 str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding."
177 parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
178 str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
179 parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help)
180 str_help = "Open message includes ipv6-unicast family, without message decoding."
181 parser.add_argument("--ipv6", default=False, action="store_true", help=str_help)
182 str_help = "Add all supported families without message decoding."
183 parser.add_argument("--allf", default=False, action="store_true", help=str_help)
184 parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
185 str_help = "Skipping well known attributes for update message"
186 parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
187 arguments = parser.parse_args()
188 if arguments.multiplicity < 1:
189 print("Multiplicity", arguments.multiplicity, "is not positive.")
191 # TODO: Are sanity checks (such as asnumber>=0) required?
195 def establish_connection(arguments):
196 """Establish connection to BGP peer.
199 :arguments: following command-line arguments are used
200 - arguments.myip: local IP address
201 - arguments.myport: local port
202 - arguments.peerip: remote IP address
203 - arguments.peerport: remote port
208 logger.info("Connecting in the listening mode.")
209 logger.debug("Local IP address: " + str(arguments.myip))
210 logger.debug("Local port: " + str(arguments.myport))
211 listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
212 listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
213 # bind need single tuple as argument
214 listening_socket.bind((str(arguments.myip), arguments.myport))
215 listening_socket.listen(1)
216 bgp_socket, _ = listening_socket.accept()
217 # TODO: Verify client IP is cotroller IP.
218 listening_socket.close()
220 logger.info("Connecting in the talking mode.")
221 logger.debug("Local IP address: " + str(arguments.myip))
222 logger.debug("Local port: " + str(arguments.myport))
223 logger.debug("Remote IP address: " + str(arguments.peerip))
224 logger.debug("Remote port: " + str(arguments.peerport))
225 talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
226 talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
227 # bind to force specified address and port
228 talking_socket.bind((str(arguments.myip), arguments.myport))
229 # socket does not spead ipaddr, hence str()
230 talking_socket.connect((str(arguments.peerip), arguments.peerport))
231 bgp_socket = talking_socket
232 logger.info("Connected to ODL.")
236 def get_short_int_from_message(message, offset=16):
237 """Extract 2-bytes number from provided message.
240 :message: given message
241 :offset: offset of the short_int inside the message
243 :return: required short_inf value.
245 default offset value is the BGP message size offset.
247 high_byte_int = ord(message[offset])
248 low_byte_int = ord(message[offset + 1])
249 short_int = high_byte_int * 256 + low_byte_int
253 def get_prefix_list_from_hex(prefixes_hex):
254 """Get decoded list of prefixes (rfc4271#section-4.3)
257 :prefixes_hex: list of prefixes to be decoded in hex
259 :return: list of prefixes in the form of ip address (X.X.X.X/X)
263 while offset < len(prefixes_hex):
264 prefix_bit_len_hex = prefixes_hex[offset]
265 prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
266 prefix_len = ((prefix_bit_len - 1) / 8) + 1
267 prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
268 prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
269 offset += 1 + prefix_len
270 prefix_list.append(prefix + "/" + str(prefix_bit_len))
274 class MessageError(ValueError):
275 """Value error with logging optimized for hexlified messages."""
277 def __init__(self, text, message, *args):
280 Store and call super init for textual comment,
281 store raw message which caused it.
285 super(MessageError, self).__init__(text, message, *args)
288 """Generate human readable error message.
291 :return: human readable message as string
293 Use a placeholder string if the message is to be empty.
295 message = binascii.hexlify(self.msg)
297 message = "(empty message)"
298 return self.text + ": " + message
301 def read_open_message(bgp_socket):
302 """Receive peer's OPEN message
305 :bgp_socket: the socket to be read
307 :return: received OPEN message.
309 Performs just basic incomming message checks
311 msg_in = bgp_socket.recv(65535) # TODO: Is smaller buffer size safe?
312 # TODO: Can the incoming open message be split in more than one packet?
315 # 37 is minimal length of open message with 4-byte AS number.
317 "Message length (" + str(len(msg_in)) + ") is smaller than "
318 "minimal length of OPEN message with 4-byte AS number (37)"
320 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
321 raise MessageError(error_msg, msg_in)
322 # TODO: We could check BGP marker, but it is defined only later;
324 reported_length = get_short_int_from_message(msg_in)
325 if len(msg_in) != reported_length:
327 "Expected message length (" + reported_length +
328 ") does not match actual length (" + str(len(msg_in)) + ")"
330 logger.error(error_msg + binascii.hexlify(msg_in))
331 raise MessageError(error_msg, msg_in)
332 logger.info("Open message received.")
336 class MessageGenerator(object):
337 """Class which generates messages, holds states and configuration values."""
339 # TODO: Define bgp marker as a class (constant) variable.
340 def __init__(self, args):
341 """Initialisation according to command-line args.
344 :args: argsparser's Namespace object which contains command-line
345 options for MesageGenerator initialisation
347 Calculates and stores default values used later on for
350 self.total_prefix_amount = args.amount
351 # Number of update messages left to be sent.
352 self.remaining_prefixes = self.total_prefix_amount
354 # New parameters initialisation
355 self.port = args.port
357 self.prefix_base_default = args.firstprefix
358 self.prefix_length_default = args.prefixlen
359 self.wr_prefixes_default = []
360 self.nlri_prefixes_default = []
361 self.version_default = 4
362 self.my_autonomous_system_default = args.asnumber
363 self.hold_time_default = args.holdtime # Local hold time.
364 self.bgp_identifier_default = int(args.myip)
365 self.next_hop_default = args.nexthop
366 self.originator_id_default = args.originator
367 self.cluster_list_item_default = args.cluster
368 self.single_update_default = args.updates == "single"
369 self.randomize_updates_default = args.updates == "random"
370 self.prefix_count_to_add_default = args.insert
371 self.prefix_count_to_del_default = args.withdraw
372 if self.prefix_count_to_del_default < 0:
373 self.prefix_count_to_del_default = 0
374 if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
375 # total number of prefixes must grow to avoid infinite test loop
376 self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
377 self.slot_size_default = self.prefix_count_to_add_default
378 self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
379 self.results_file_name_default = args.results
380 self.performance_threshold_default = args.threshold
381 self.rfc4760 = args.rfc4760
382 self.bgpls = args.bgpls
383 self.evpn = args.evpn
384 self.mvpn = args.mvpn
385 self.l3vpn_mcast = args.l3vpn_mcast
386 self.l3vpn = args.l3vpn
387 self.rt_constrain = args.rt_constrain
388 self.ipv6 = args.ipv6
389 self.allf = args.allf
390 self.skipattr = args.skipattr
391 self.grace = args.grace
392 # Default values when BGP-LS Attributes are used
394 self.prefix_count_to_add_default = 1
395 self.prefix_count_to_del_default = 0
396 self.ls_nlri_default = {"Identifier": args.lsid,
397 "TunnelID": args.lstid,
399 "IPv4TunnelSenderAddress": args.lstsaddr,
400 "IPv4TunnelEndPointAddress": args.lsteaddr}
401 self.lsid_step = args.lsidstep
402 self.lstid_step = args.lstidstep
403 self.lspid_step = args.lspidstep
404 self.lstsaddr_step = args.lstsaddrstep
405 self.lsteaddr_step = args.lsteaddrstep
406 # Default values used for randomized part
407 s1_slots = ((self.total_prefix_amount -
408 self.remaining_prefixes_threshold - 1) /
409 self.prefix_count_to_add_default + 1)
411 (self.remaining_prefixes_threshold - 1)
412 / (self.prefix_count_to_add_default - self.prefix_count_to_del_default)
416 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
417 s2_first_index = s1_slots * self.prefix_count_to_add_default
418 s2_last_index = (s2_first_index +
419 s2_slots * (self.prefix_count_to_add_default -
420 self.prefix_count_to_del_default) - 1)
421 self.slot_gap_default = ((self.total_prefix_amount -
422 self.remaining_prefixes_threshold - 1) /
423 self.prefix_count_to_add_default + 1)
424 self.randomize_lowest_default = s2_first_index
425 self.randomize_highest_default = s2_last_index
426 # Initialising counters
427 self.phase1_start_time = 0
428 self.phase1_stop_time = 0
429 self.phase2_start_time = 0
430 self.phase2_stop_time = 0
431 self.phase1_updates_sent = 0
432 self.phase2_updates_sent = 0
433 self.updates_sent = 0
435 self.log_info = args.loglevel <= logging.INFO
436 self.log_debug = args.loglevel <= logging.DEBUG
438 Flags needed for the MessageGenerator performance optimization.
439 Calling logger methods each iteration even with proper log level set
440 slows down significantly the MessageGenerator performance.
441 Measured total generation time (1M updates, dry run, error log level):
442 - logging based on basic logger features: 36,2s
443 - logging based on advanced logger features (lazy logging): 21,2s
444 - conditional calling of logger methods enclosed inside condition: 8,6s
447 logger.info("Generator initialisation")
448 logger.info(" Target total number of prefixes to be introduced: " +
449 str(self.total_prefix_amount))
450 logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
451 str(self.prefix_length_default))
452 logger.info(" My Autonomous System number: " +
453 str(self.my_autonomous_system_default))
454 logger.info(" My Hold Time: " + str(self.hold_time_default))
455 logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
456 logger.info(" Next Hop: " + str(self.next_hop_default))
457 logger.info(" Originator ID: " + str(self.originator_id_default))
458 logger.info(" Cluster list: " + str(self.cluster_list_item_default))
459 logger.info(" Prefix count to be inserted at once: " +
460 str(self.prefix_count_to_add_default))
461 logger.info(" Prefix count to be withdrawn at once: " +
462 str(self.prefix_count_to_del_default))
463 logger.info(" Fast pre-fill up to " +
464 str(self.total_prefix_amount -
465 self.remaining_prefixes_threshold) + " prefixes")
466 logger.info(" Remaining number of prefixes to be processed " +
467 "in parallel with withdrawals: " +
468 str(self.remaining_prefixes_threshold))
469 logger.debug(" Prefix index range used after pre-fill procedure [" +
470 str(self.randomize_lowest_default) + ", " +
471 str(self.randomize_highest_default) + "]")
472 if self.single_update_default:
473 logger.info(" Common single UPDATE will be generated " +
474 "for both NLRI & WITHDRAWN lists")
476 logger.info(" Two separate UPDATEs will be generated " +
477 "for each NLRI & WITHDRAWN lists")
478 if self.randomize_updates_default:
479 logger.info(" Generation of UPDATE messages will be randomized")
480 logger.info(" Let\'s go ...\n")
482 # TODO: Notification for hold timer expiration can be handy.
484 def store_results(self, file_name=None, threshold=None):
485 """ Stores specified results into files based on file_name value.
488 :param file_name: Trailing (common) part of result file names
489 :param threshold: Minimum number of sent updates needed for each
490 result to be included into result csv file
491 (mainly needed because of the result accuracy)
495 # default values handling
496 # TODO optimize default values handling (use e.g. dicionary.update() approach)
497 if file_name is None:
498 file_name = self.results_file_name_default
499 if threshold is None:
500 threshold = self.performance_threshold_default
501 # performance calculation
502 if self.phase1_updates_sent >= threshold:
503 totals1 = self.phase1_updates_sent
504 performance1 = int(self.phase1_updates_sent /
505 (self.phase1_stop_time - self.phase1_start_time))
509 if self.phase2_updates_sent >= threshold:
510 totals2 = self.phase2_updates_sent
511 performance2 = int(self.phase2_updates_sent /
512 (self.phase2_stop_time - self.phase2_start_time))
517 logger.info("#" * 10 + " Final results " + "#" * 10)
518 logger.info("Number of iterations: " + str(self.iteration))
519 logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
520 str(self.phase1_updates_sent))
521 logger.info("The pre-fill phase duration: " +
522 str(self.phase1_stop_time - self.phase1_start_time) + "s")
523 logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
524 str(self.phase2_updates_sent))
525 logger.info("The 2nd test phase duration: " +
526 str(self.phase2_stop_time - self.phase2_start_time) + "s")
527 logger.info("Threshold for performance reporting: " + str(threshold))
530 phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
531 " route(s) per UPDATE")
532 if self.single_update_default:
533 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
534 "/-" + str(self.prefix_count_to_del_default) +
535 " routes per UPDATE")
537 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
538 "/-" + str(self.prefix_count_to_del_default) +
539 " routes in two UPDATEs")
540 # collecting capacity and performance results
543 if totals1 is not None:
544 totals[phase1_label] = totals1
545 performance[phase1_label] = performance1
546 if totals2 is not None:
547 totals[phase2_label] = totals2
548 performance[phase2_label] = performance2
549 self.write_results_to_file(totals, "totals-" + file_name)
550 self.write_results_to_file(performance, "performance-" + file_name)
552 def write_results_to_file(self, results, file_name):
553 """Writes results to the csv plot file consumable by Jenkins.
556 :param file_name: Name of the (csv) file to be created
562 f = open(file_name, "wt")
564 for key in sorted(results):
565 first_line += key + ", "
566 second_line += str(results[key]) + ", "
567 first_line = first_line[:-2]
568 second_line = second_line[:-2]
569 f.write(first_line + "\n")
570 f.write(second_line + "\n")
571 logger.info("Message generator performance results stored in " +
573 logger.info(" " + first_line)
574 logger.info(" " + second_line)
578 # Return pseudo-randomized (reproducible) index for selected range
579 def randomize_index(self, index, lowest=None, highest=None):
580 """Calculates pseudo-randomized index from selected range.
583 :param index: input index
584 :param lowest: the lowes index from the randomized area
585 :param highest: the highest index from the randomized area
587 :return: the (pseudo)randomized index
589 Created just as a fame for future generator enhancement.
591 # default values handling
592 # TODO optimize default values handling (use e.g. dicionary.update() approach)
594 lowest = self.randomize_lowest_default
596 highest = self.randomize_highest_default
598 if (index >= lowest) and (index <= highest):
599 # we are in the randomized range -> shuffle it inside
600 # the range (now just reverse the order)
601 new_index = highest - (index - lowest)
603 # we are out of the randomized range -> nothing to do
607 def get_ls_nlri_values(self, index):
608 """Generates LS-NLRI parameters.
609 http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
612 :param index: index (iteration)
614 :return: dictionary of LS NLRI parameters and values
616 # generating list of LS NLRI parameters
617 identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
618 ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
619 tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
620 lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
621 ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
622 ls_nlri_values = {"Identifier": identifier,
623 "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
624 "TunnelID": tunnel_id, "LSPID": lsp_id,
625 "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
626 return ls_nlri_values
628 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
629 prefix_len=None, prefix_count=None, randomize=None):
630 """Generates list of IP address prefixes.
633 :param slot_index: index of group of prefix addresses
634 :param slot_size: size of group of prefix addresses
635 in [number of included prefixes]
636 :param prefix_base: IP address of the first prefix
637 (slot_index = 0, prefix_index = 0)
638 :param prefix_len: length of the prefix in bites
639 (the same as size of netmask)
640 :param prefix_count: number of prefixes to be returned
641 from the specified slot
643 :return: list of generated IP address prefixes
645 # default values handling
646 # TODO optimize default values handling (use e.g. dicionary.update() approach)
647 if slot_size is None:
648 slot_size = self.slot_size_default
649 if prefix_base is None:
650 prefix_base = self.prefix_base_default
651 if prefix_len is None:
652 prefix_len = self.prefix_length_default
653 if prefix_count is None:
654 prefix_count = slot_size
655 if randomize is None:
656 randomize = self.randomize_updates_default
657 # generating list of prefixes
660 prefix_gap = 2 ** (32 - prefix_len)
661 for i in range(prefix_count):
662 prefix_index = slot_index * slot_size + i
664 prefix_index = self.randomize_index(prefix_index)
665 indexes.append(prefix_index)
666 prefixes.append(prefix_base + prefix_index * prefix_gap)
668 logger.debug(" Prefix slot index: " + str(slot_index))
669 logger.debug(" Prefix slot size: " + str(slot_size))
670 logger.debug(" Prefix count: " + str(prefix_count))
671 logger.debug(" Prefix indexes: " + str(indexes))
672 logger.debug(" Prefix list: " + str(prefixes))
675 def compose_update_message(self, prefix_count_to_add=None,
676 prefix_count_to_del=None):
677 """Composes an UPDATE message
680 :param prefix_count_to_add: # of prefixes to put into NLRI list
681 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
683 :return: encoded UPDATE message in HEX
685 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
686 lists or common message wich includes both prefix lists.
687 Updates global counters.
689 # default values handling
690 # TODO optimize default values handling (use e.g. dicionary.update() approach)
691 if prefix_count_to_add is None:
692 prefix_count_to_add = self.prefix_count_to_add_default
693 if prefix_count_to_del is None:
694 prefix_count_to_del = self.prefix_count_to_del_default
696 if self.log_info and not (self.iteration % 1000):
697 logger.info("Iteration: " + str(self.iteration) +
698 " - total remaining prefixes: " +
699 str(self.remaining_prefixes))
701 logger.debug("#" * 10 + " Iteration: " +
702 str(self.iteration) + " " + "#" * 10)
703 logger.debug("Remaining prefixes: " +
704 str(self.remaining_prefixes))
705 # scenario type & one-shot counter
706 straightforward_scenario = (self.remaining_prefixes >
707 self.remaining_prefixes_threshold)
708 if straightforward_scenario:
709 prefix_count_to_del = 0
711 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
712 if not self.phase1_start_time:
713 self.phase1_start_time = time.time()
716 logger.debug("--- COMBINED SCENARIO ---")
717 if not self.phase2_start_time:
718 self.phase2_start_time = time.time()
719 # tailor the number of prefixes if needed
720 prefix_count_to_add = (
722 + min(prefix_count_to_add - prefix_count_to_del, self.remaining_prefixes)
724 # prefix slots selection for insertion and withdrawal
725 slot_index_to_add = self.iteration
726 slot_index_to_del = slot_index_to_add - self.slot_gap_default
727 # getting lists of prefixes for insertion in this iteration
729 logger.debug("Prefixes to be inserted in this iteration:")
730 prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
731 prefix_count=prefix_count_to_add)
732 # getting lists of prefixes for withdrawal in this iteration
734 logger.debug("Prefixes to be withdrawn in this iteration:")
735 prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
736 prefix_count=prefix_count_to_del)
737 # generating the UPDATE mesage with LS-NLRI only
739 ls_nlri = self.get_ls_nlri_values(self.iteration)
740 msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
743 # generating the UPDATE message with prefix lists
744 if self.single_update_default:
745 # Send prefixes to be introduced and withdrawn
746 # in one UPDATE message
747 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
748 nlri_prefixes=prefix_list_to_add)
750 # Send prefixes to be introduced and withdrawn
751 # in separate UPDATE messages (if needed)
752 msg_out = self.update_message(wr_prefixes=[],
753 nlri_prefixes=prefix_list_to_add)
754 if prefix_count_to_del:
755 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
757 # updating counters - who knows ... maybe I am last time here ;)
758 if straightforward_scenario:
759 self.phase1_stop_time = time.time()
760 self.phase1_updates_sent = self.updates_sent
762 self.phase2_stop_time = time.time()
763 self.phase2_updates_sent = (self.updates_sent -
764 self.phase1_updates_sent)
765 # updating totals for the next iteration
767 self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
768 # returning the encoded message
771 # Section of message encoders
773 def open_message(self, version=None, my_autonomous_system=None,
774 hold_time=None, bgp_identifier=None):
775 """Generates an OPEN Message (rfc4271#section-4.2)
778 :param version: see the rfc4271#section-4.2
779 :param my_autonomous_system: see the rfc4271#section-4.2
780 :param hold_time: see the rfc4271#section-4.2
781 :param bgp_identifier: see the rfc4271#section-4.2
783 :return: encoded OPEN message in HEX
786 # default values handling
787 # TODO optimize default values handling (use e.g. dicionary.update() approach)
789 version = self.version_default
790 if my_autonomous_system is None:
791 my_autonomous_system = self.my_autonomous_system_default
792 if hold_time is None:
793 hold_time = self.hold_time_default
794 if bgp_identifier is None:
795 bgp_identifier = self.bgp_identifier_default
798 marker_hex = "\xFF" * 16
802 type_hex = struct.pack("B", type)
805 version_hex = struct.pack("B", version)
807 # my_autonomous_system
808 # AS_TRANS value, 23456 decadic.
809 my_autonomous_system_2_bytes = 23456
810 # AS number is mappable to 2 bytes
811 if my_autonomous_system < 65536:
812 my_autonomous_system_2_bytes = my_autonomous_system
813 my_autonomous_system_hex_2_bytes = struct.pack(">H",
814 my_autonomous_system)
817 hold_time_hex = struct.pack(">H", hold_time)
820 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
822 # Optional Parameters
823 optional_parameters_hex = ""
824 if self.rfc4760 or self.allf:
825 optional_parameter_hex = (
826 "\x02" # Param type ("Capability Ad")
827 "\x06" # Length (6 bytes)
828 "\x01" # Capability type (NLRI Unicast),
829 # see RFC 4760, secton 8
830 "\x04" # Capability value length
831 "\x00\x01" # AFI (Ipv4)
833 "\x01" # SAFI (Unicast)
835 optional_parameters_hex += optional_parameter_hex
837 if self.ipv6 or self.allf:
838 optional_parameter_hex = (
839 "\x02" # Param type ("Capability Ad")
840 "\x06" # Length (6 bytes)
841 "\x01" # Multiprotocol extetension capability,
842 "\x04" # Capability value length
843 "\x00\x02" # AFI (IPV6)
845 "\x01" # SAFI (UNICAST)
847 optional_parameters_hex += optional_parameter_hex
849 if self.bgpls or self.allf:
850 optional_parameter_hex = (
851 "\x02" # Param type ("Capability Ad")
852 "\x06" # Length (6 bytes)
853 "\x01" # Capability type (NLRI Unicast),
854 # see RFC 4760, secton 8
855 "\x04" # Capability value length
856 "\x40\x04" # AFI (BGP-LS)
858 "\x47" # SAFI (BGP-LS)
860 optional_parameters_hex += optional_parameter_hex
862 if self.evpn or self.allf:
863 optional_parameter_hex = (
864 "\x02" # Param type ("Capability Ad")
865 "\x06" # Length (6 bytes)
866 "\x01" # Multiprotocol extetension capability,
867 "\x04" # Capability value length
868 "\x00\x19" # AFI (L2-VPN)
872 optional_parameters_hex += optional_parameter_hex
874 if self.mvpn or self.allf:
875 optional_parameter_hex = (
876 "\x02" # Param type ("Capability Ad")
877 "\x06" # Length (6 bytes)
878 "\x01" # Multiprotocol extetension capability,
879 "\x04" # Capability value length
880 "\x00\x01" # AFI (IPV4)
882 "\x05" # SAFI (MCAST-VPN)
884 optional_parameters_hex += optional_parameter_hex
885 optional_parameter_hex = (
886 "\x02" # Param type ("Capability Ad")
887 "\x06" # Length (6 bytes)
888 "\x01" # Multiprotocol extetension capability,
889 "\x04" # Capability value length
890 "\x00\x02" # AFI (IPV6)
892 "\x05" # SAFI (MCAST-VPN)
894 optional_parameters_hex += optional_parameter_hex
896 if self.l3vpn_mcast or self.allf:
897 optional_parameter_hex = (
898 "\x02" # Param type ("Capability Ad")
899 "\x06" # Length (6 bytes)
900 "\x01" # Multiprotocol extetension capability,
901 "\x04" # Capability value length
902 "\x00\x01" # AFI (IPV4)
904 "\x81" # SAFI (L3VPN-MCAST)
906 optional_parameters_hex += optional_parameter_hex
907 optional_parameter_hex = (
908 "\x02" # Param type ("Capability Ad")
909 "\x06" # Length (6 bytes)
910 "\x01" # Multiprotocol extetension capability,
911 "\x04" # Capability value length
912 "\x00\x02" # AFI (IPV6)
914 "\x81" # SAFI (L3VPN-MCAST)
916 optional_parameters_hex += optional_parameter_hex
918 if self.l3vpn or self.allf:
919 optional_parameter_hex = (
920 "\x02" # Param type ("Capability Ad")
921 "\x06" # Length (6 bytes)
922 "\x01" # Multiprotocol extetension capability,
923 "\x04" # Capability value length
924 "\x00\x01" # AFI (IPV4)
926 "\x80" # SAFI (L3VPN-UNICAST)
928 optional_parameters_hex += optional_parameter_hex
929 optional_parameter_hex = (
930 "\x02" # Param type ("Capability Ad")
931 "\x06" # Length (6 bytes)
932 "\x01" # Multiprotocol extetension capability,
933 "\x04" # Capability value length
934 "\x00\x02" # AFI (IPV6)
936 "\x80" # SAFI (L3VPN-UNICAST)
938 optional_parameters_hex += optional_parameter_hex
940 if self.rt_constrain or self.allf:
941 optional_parameter_hex = (
942 "\x02" # Param type ("Capability Ad")
943 "\x06" # Length (6 bytes)
944 "\x01" # Multiprotocol extetension capability,
945 "\x04" # Capability value length
946 "\x00\x01" # AFI (IPV4)
948 "\x84" # SAFI (ROUTE-TARGET-CONSTRAIN)
950 optional_parameters_hex += optional_parameter_hex
952 optional_parameter_hex = (
953 "\x02" # Param type ("Capability Ad")
954 "\x06" # Length (6 bytes)
955 "\x41" # "32 bit AS Numbers Support"
956 # (see RFC 6793, section 3)
957 "\x04" # Capability value length
959 optional_parameter_hex += (
960 struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
962 optional_parameters_hex += optional_parameter_hex
965 b = list(bin(self.grace)[2:])
966 b = b + [0] * (3 - len(b))
969 restart_flag = "\x80\x05"
971 restart_flag = "\x00\x05"
977 ll_gr = "\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
981 logger.debug("Grace parameters list: {}".format(b))
982 # "\x02" Param type ("Capability Ad")
983 # :param length: Length of whole message
984 # "\x40" Graceful-restart capability
985 # "\x06" Length (6 bytes)
986 # "\x00" Restart Flag (customizable - turned on when grace == 2,3,6,7)
987 # "\x05" Restart timer (5sec)
988 # "\x00\x01" AFI (IPV4)
989 # "\x01" SAFI (Unicast)
990 # "\x00" Ipv4 Flag (customizable - turned on when grace == 1,3,5,7)
991 # "\x47\x07\x00\x01\x01\x00\x00\x00\x1e" ipv4 ll-graceful-restart capability, timer 30sec
992 # ll-gr turned on when grace is between 4-7
993 optional_parameter_hex = "\x02{}\x40\x06{}\x00\x01\x01{}{}".format(
994 length, restart_flag, ipv4_flag, ll_gr)
995 optional_parameters_hex += optional_parameter_hex
997 # Optional Parameters Length
998 optional_parameters_length = len(optional_parameters_hex)
999 optional_parameters_length_hex = struct.pack("B",
1000 optional_parameters_length)
1002 # Length (big-endian)
1004 len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
1005 len(my_autonomous_system_hex_2_bytes) +
1006 len(hold_time_hex) + len(bgp_identifier_hex) +
1007 len(optional_parameters_length_hex) +
1008 len(optional_parameters_hex)
1010 length_hex = struct.pack(">H", length)
1018 my_autonomous_system_hex_2_bytes +
1020 bgp_identifier_hex +
1021 optional_parameters_length_hex +
1022 optional_parameters_hex
1026 logger.debug("OPEN message encoding")
1027 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1028 logger.debug(" Length=" + str(length) + " (0x" +
1029 binascii.hexlify(length_hex) + ")")
1030 logger.debug(" Type=" + str(type) + " (0x" +
1031 binascii.hexlify(type_hex) + ")")
1032 logger.debug(" Version=" + str(version) + " (0x" +
1033 binascii.hexlify(version_hex) + ")")
1034 logger.debug(" My Autonomous System=" +
1035 str(my_autonomous_system_2_bytes) + " (0x" +
1036 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
1038 logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
1039 binascii.hexlify(hold_time_hex) + ")")
1040 logger.debug(" BGP Identifier=" + str(bgp_identifier) +
1041 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
1042 logger.debug(" Optional Parameters Length=" +
1043 str(optional_parameters_length) + " (0x" +
1044 binascii.hexlify(optional_parameters_length_hex) +
1046 logger.debug(" Optional Parameters=0x" +
1047 binascii.hexlify(optional_parameters_hex))
1048 logger.debug("OPEN message encoded: 0x%s",
1049 binascii.b2a_hex(message_hex))
1053 def update_message(self, wr_prefixes=None, nlri_prefixes=None,
1054 wr_prefix_length=None, nlri_prefix_length=None,
1055 my_autonomous_system=None, next_hop=None,
1056 originator_id=None, cluster_list_item=None,
1057 end_of_rib=False, **ls_nlri_params):
1058 """Generates an UPDATE Message (rfc4271#section-4.3)
1061 :param wr_prefixes: see the rfc4271#section-4.3
1062 :param nlri_prefixes: see the rfc4271#section-4.3
1063 :param wr_prefix_length: see the rfc4271#section-4.3
1064 :param nlri_prefix_length: see the rfc4271#section-4.3
1065 :param my_autonomous_system: see the rfc4271#section-4.3
1066 :param next_hop: see the rfc4271#section-4.3
1068 :return: encoded UPDATE message in HEX
1071 # default values handling
1072 # TODO optimize default values handling (use e.g. dicionary.update() approach)
1073 if wr_prefixes is None:
1074 wr_prefixes = self.wr_prefixes_default
1075 if nlri_prefixes is None:
1076 nlri_prefixes = self.nlri_prefixes_default
1077 if wr_prefix_length is None:
1078 wr_prefix_length = self.prefix_length_default
1079 if nlri_prefix_length is None:
1080 nlri_prefix_length = self.prefix_length_default
1081 if my_autonomous_system is None:
1082 my_autonomous_system = self.my_autonomous_system_default
1083 if next_hop is None:
1084 next_hop = self.next_hop_default
1085 if originator_id is None:
1086 originator_id = self.originator_id_default
1087 if cluster_list_item is None:
1088 cluster_list_item = self.cluster_list_item_default
1089 ls_nlri = self.ls_nlri_default.copy()
1090 ls_nlri.update(ls_nlri_params)
1093 marker_hex = "\xFF" * 16
1097 type_hex = struct.pack("B", type)
1100 withdrawn_routes_hex = ""
1102 bytes = ((wr_prefix_length - 1) / 8) + 1
1103 for prefix in wr_prefixes:
1104 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
1105 struct.pack(">I", int(prefix))[:bytes])
1106 withdrawn_routes_hex += withdrawn_route_hex
1108 # Withdrawn Routes Length
1109 withdrawn_routes_length = len(withdrawn_routes_hex)
1110 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1112 # TODO: to replace hardcoded string by encoding?
1114 path_attributes_hex = ""
1115 if not self.skipattr:
1116 path_attributes_hex += (
1117 "\x40" # Flags ("Well-Known")
1118 "\x01" # Type (ORIGIN)
1120 "\x00" # Origin: IGP
1122 path_attributes_hex += (
1123 "\x40" # Flags ("Well-Known")
1124 "\x02" # Type (AS_PATH)
1126 "\x02" # AS segment type (AS_SEQUENCE)
1127 "\x01" # AS segment length (1)
1129 my_as_hex = struct.pack(">I", my_autonomous_system)
1130 path_attributes_hex += my_as_hex # AS segment (4 bytes)
1131 path_attributes_hex += (
1132 "\x40" # Flags ("Well-Known")
1133 "\x05" # Type (LOCAL_PREF)
1135 "\x00\x00\x00\x64" # (100)
1137 if nlri_prefixes != []:
1138 path_attributes_hex += (
1139 "\x40" # Flags ("Well-Known")
1140 "\x03" # Type (NEXT_HOP)
1143 next_hop_hex = struct.pack(">I", int(next_hop))
1144 path_attributes_hex += (
1145 next_hop_hex # IP address of the next hop (4 bytes)
1147 if originator_id is not None:
1148 path_attributes_hex += (
1149 "\x80" # Flags ("Optional, non-transitive")
1150 "\x09" # Type (ORIGINATOR_ID)
1152 ) # ORIGINATOR_ID (4 bytes)
1153 path_attributes_hex += struct.pack(">I", int(originator_id))
1154 if cluster_list_item is not None:
1155 path_attributes_hex += (
1156 "\x80" # Flags ("Optional, non-transitive")
1157 "\x0a" # Type (CLUSTER_LIST)
1159 ) # one CLUSTER_LIST item (4 bytes)
1160 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1162 if self.bgpls and not end_of_rib:
1163 path_attributes_hex += (
1164 "\x80" # Flags ("Optional, non-transitive")
1165 "\x0e" # Type (MP_REACH_NLRI)
1166 "\x22" # Length (34)
1167 "\x40\x04" # AFI (BGP-LS)
1168 "\x47" # SAFI (BGP-LS)
1169 "\x04" # Next Hop Length (4)
1171 path_attributes_hex += struct.pack(">I", int(next_hop))
1172 path_attributes_hex += "\x00" # Reserved
1173 path_attributes_hex += (
1174 "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1175 "\x00\x15" # LS-NLRI.TotalNLRILength (21)
1176 "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1178 path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1179 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1180 path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1181 path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1182 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1184 # Total Path Attributes Length
1185 total_path_attributes_length = len(path_attributes_hex)
1186 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1188 # Network Layer Reachability Information
1191 bytes = ((nlri_prefix_length - 1) / 8) + 1
1192 for prefix in nlri_prefixes:
1193 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1194 struct.pack(">I", int(prefix))[:bytes])
1195 nlri_hex += nlri_prefix_hex
1197 # Length (big-endian)
1199 len(marker_hex) + 2 + len(type_hex) +
1200 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1201 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1203 length_hex = struct.pack(">H", length)
1210 withdrawn_routes_length_hex +
1211 withdrawn_routes_hex +
1212 total_path_attributes_length_hex +
1213 path_attributes_hex +
1217 if self.grace != 8 and self.grace != 0 and end_of_rib:
1218 message_hex = (marker_hex + binascii.unhexlify("00170200000000"))
1221 logger.debug("UPDATE message encoding")
1222 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1223 logger.debug(" Length=" + str(length) + " (0x" +
1224 binascii.hexlify(length_hex) + ")")
1225 logger.debug(" Type=" + str(type) + " (0x" +
1226 binascii.hexlify(type_hex) + ")")
1227 logger.debug(" withdrawn_routes_length=" +
1228 str(withdrawn_routes_length) + " (0x" +
1229 binascii.hexlify(withdrawn_routes_length_hex) + ")")
1230 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1231 str(wr_prefix_length) + " (0x" +
1232 binascii.hexlify(withdrawn_routes_hex) + ")")
1233 if total_path_attributes_length:
1234 logger.debug(" Total Path Attributes Length=" +
1235 str(total_path_attributes_length) + " (0x" +
1236 binascii.hexlify(total_path_attributes_length_hex) + ")")
1237 logger.debug(" Path Attributes=" + "(0x" +
1238 binascii.hexlify(path_attributes_hex) + ")")
1239 logger.debug(" Origin=IGP")
1240 logger.debug(" AS path=" + str(my_autonomous_system))
1241 logger.debug(" Next hop=" + str(next_hop))
1242 if originator_id is not None:
1243 logger.debug(" Originator id=" + str(originator_id))
1244 if cluster_list_item is not None:
1245 logger.debug(" Cluster list=" + str(cluster_list_item))
1247 logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
1248 logger.debug(" Network Layer Reachability Information=" +
1249 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1250 " (0x" + binascii.hexlify(nlri_hex) + ")")
1251 logger.debug("UPDATE message encoded: 0x" +
1252 binascii.b2a_hex(message_hex))
1255 self.updates_sent += 1
1256 # returning encoded message
1259 def notification_message(self, error_code, error_subcode, data_hex=""):
1260 """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1263 :param error_code: see the rfc4271#section-4.5
1264 :param error_subcode: see the rfc4271#section-4.5
1265 :param data_hex: see the rfc4271#section-4.5
1267 :return: encoded NOTIFICATION message in HEX
1271 marker_hex = "\xFF" * 16
1275 type_hex = struct.pack("B", type)
1278 error_code_hex = struct.pack("B", error_code)
1281 error_subcode_hex = struct.pack("B", error_subcode)
1283 # Length (big-endian)
1284 length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1285 len(error_subcode_hex) + len(data_hex))
1286 length_hex = struct.pack(">H", length)
1288 # NOTIFICATION Message
1299 logger.debug("NOTIFICATION message encoding")
1300 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1301 logger.debug(" Length=" + str(length) + " (0x" +
1302 binascii.hexlify(length_hex) + ")")
1303 logger.debug(" Type=" + str(type) + " (0x" +
1304 binascii.hexlify(type_hex) + ")")
1305 logger.debug(" Error Code=" + str(error_code) + " (0x" +
1306 binascii.hexlify(error_code_hex) + ")")
1307 logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
1308 binascii.hexlify(error_subcode_hex) + ")")
1309 logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1310 logger.debug("NOTIFICATION message encoded: 0x%s",
1311 binascii.b2a_hex(message_hex))
1315 def keepalive_message(self):
1316 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1319 :return: encoded KEEP ALIVE message in HEX
1323 marker_hex = "\xFF" * 16
1327 type_hex = struct.pack("B", type)
1329 # Length (big-endian)
1330 length = len(marker_hex) + 2 + len(type_hex)
1331 length_hex = struct.pack(">H", length)
1333 # KEEP ALIVE Message
1341 logger.debug("KEEP ALIVE message encoding")
1342 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1343 logger.debug(" Length=" + str(length) + " (0x" +
1344 binascii.hexlify(length_hex) + ")")
1345 logger.debug(" Type=" + str(type) + " (0x" +
1346 binascii.hexlify(type_hex) + ")")
1347 logger.debug("KEEP ALIVE message encoded: 0x%s",
1348 binascii.b2a_hex(message_hex))
1353 class TimeTracker(object):
1354 """Class for tracking timers, both for my keepalives and
1358 def __init__(self, msg_in):
1359 """Initialisation. based on defaults and OPEN message from peer.
1362 msg_in: the OPEN message received from peer.
1364 # Note: Relative time is always named timedelta, to stress that
1365 # the (non-delta) time is absolute.
1366 self.report_timedelta = 1.0 # In seconds. TODO: Configurable?
1367 # Upper bound for being stuck in the same state, we should
1368 # at least report something before continuing.
1369 # Negotiate the hold timer by taking the smaller
1370 # of the 2 values (mine and the peer's).
1371 hold_timedelta = 180 # Not an attribute of self yet.
1372 # TODO: Make the default value configurable,
1373 # default value could mirror what peer said.
1374 peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1375 if hold_timedelta > peer_hold_timedelta:
1376 hold_timedelta = peer_hold_timedelta
1377 if hold_timedelta != 0 and hold_timedelta < 3:
1378 logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1379 raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1380 self.hold_timedelta = hold_timedelta
1381 # If we do not hear from peer this long, we assume it has died.
1382 self.keepalive_timedelta = int(hold_timedelta / 3.0)
1383 # Upper limit for duration between messages, to avoid being
1384 # declared to be dead.
1385 # The same as calling snapshot(), but also declares a field.
1386 self.snapshot_time = time.time()
1387 # Sometimes we need to store time. This is where to get
1388 # the value from afterwards. Time_keepalive may be too strict.
1389 self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1390 # At this time point, peer will be declared dead.
1391 self.my_keepalive_time = None # to be set later
1392 # At this point, we should be sending keepalive message.
1395 """Store current time in instance data to use later."""
1396 # Read as time before something interesting was called.
1397 self.snapshot_time = time.time()
1399 def reset_peer_hold_time(self):
1400 """Move hold time to future as peer has just proven it still lives."""
1401 self.peer_hold_time = time.time() + self.hold_timedelta
1403 # Some methods could rely on self.snapshot_time, but it is better
1404 # to require user to provide it explicitly.
1405 def reset_my_keepalive_time(self, keepalive_time):
1406 """Calculate and set the next my KEEP ALIVE timeout time
1409 :keepalive_time: the initial value of the KEEP ALIVE timer
1411 self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1413 def is_time_for_my_keepalive(self):
1414 """Check for my KEEP ALIVE timeout occurence"""
1415 if self.hold_timedelta == 0:
1417 return self.snapshot_time >= self.my_keepalive_time
1419 def get_next_event_time(self):
1420 """Set the time of the next expected or to be sent KEEP ALIVE"""
1421 if self.hold_timedelta == 0:
1422 return self.snapshot_time + 86400
1423 return min(self.my_keepalive_time, self.peer_hold_time)
1425 def check_peer_hold_time(self, snapshot_time):
1426 """Raise error if nothing was read from peer until specified time."""
1427 # Hold time = 0 means keepalive checking off.
1428 if self.hold_timedelta != 0:
1429 # time.time() may be too strict
1430 if snapshot_time > self.peer_hold_time:
1431 logger.error("Peer has overstepped the hold timer.")
1432 raise RuntimeError("Peer has overstepped the hold timer.")
1433 # TODO: Include hold_timedelta?
1434 # TODO: Add notification sending (attempt). That means
1435 # move to write tracker.
1438 class ReadTracker(object):
1439 """Class for tracking read of mesages chunk by chunk and
1443 def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
1444 l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
1445 ipv6=False, grace=8, wait_for_read=10):
1446 """The reader initialisation.
1449 bgp_socket: socket to be used for sending
1450 timer: timer to be used for scheduling
1451 storage: thread safe dict
1452 evpn: flag that evpn functionality is tested
1453 mvpn: flag that mvpn functionality is tested
1454 grace: flag that grace-restart functionality is tested
1455 l3vpn_mcast: flag that l3vpn_mcast functionality is tested
1456 l3vpn: flag that l3vpn unicast functionality is tested
1457 rt_constrain: flag that rt-constrain functionality is tested
1458 allf: flag for all family testing.
1460 # References to outside objects.
1461 self.socket = bgp_socket
1463 # BGP marker length plus length field length.
1464 self.header_length = 18
1465 # TODO: make it class (constant) attribute
1466 # Computation of where next chunk ends depends on whether
1467 # we are beyond length field.
1468 self.reading_header = True
1469 # Countdown towards next size computation.
1470 self.bytes_to_read = self.header_length
1471 # Incremental buffer for message under read.
1473 # Initialising counters
1474 self.updates_received = 0
1475 self.prefixes_introduced = 0
1476 self.prefixes_withdrawn = 0
1477 self.rx_idle_time = 0
1478 self.rx_activity_detected = True
1479 self.storage = storage
1482 self.l3vpn_mcast = l3vpn_mcast
1484 self.rt_constrain = rt_constrain
1487 self.wfr = wait_for_read
1490 def read_message_chunk(self):
1491 """Read up to one message
1494 Currently it does not return anything.
1496 # TODO: We could return the whole message, currently not needed.
1497 # We assume the socket is readable.
1498 chunk_message = self.socket.recv(self.bytes_to_read)
1499 self.msg_in += chunk_message
1500 self.bytes_to_read -= len(chunk_message)
1501 # TODO: bytes_to_read < 0 is not possible, right?
1502 if not self.bytes_to_read:
1503 # Finished reading a logical block.
1504 if self.reading_header:
1505 # The logical block was a BGP header.
1506 # Now we know the size of the message.
1507 self.reading_header = False
1508 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1510 else: # We have finished reading the body of the message.
1511 # Peer has just proven it is still alive.
1512 self.timer.reset_peer_hold_time()
1513 # TODO: Do we want to count received messages?
1514 # This version ignores the received message.
1515 # TODO: Should we do validation and exit on anything
1516 # besides update or keepalive?
1517 # Prepare state for reading another message.
1518 message_type_hex = self.msg_in[self.header_length]
1519 if message_type_hex == "\x01":
1520 logger.info("OPEN message received: 0x%s",
1521 binascii.b2a_hex(self.msg_in))
1522 elif message_type_hex == "\x02":
1523 logger.debug("UPDATE message received: 0x%s",
1524 binascii.b2a_hex(self.msg_in))
1525 self.decode_update_message(self.msg_in)
1526 elif message_type_hex == "\x03":
1527 logger.info("NOTIFICATION message received: 0x%s",
1528 binascii.b2a_hex(self.msg_in))
1529 elif message_type_hex == "\x04":
1530 logger.info("KEEP ALIVE message received: 0x%s",
1531 binascii.b2a_hex(self.msg_in))
1533 logger.warning("Unexpected message received: 0x%s",
1534 binascii.b2a_hex(self.msg_in))
1536 self.reading_header = True
1537 self.bytes_to_read = self.header_length
1538 # We should not act upon peer_hold_time if we are reading
1539 # something right now.
1542 def decode_path_attributes(self, path_attributes_hex):
1543 """Decode the Path Attributes field (rfc4271#section-4.3)
1546 :path_attributes: path_attributes field to be decoded in hex
1550 hex_to_decode = path_attributes_hex
1552 while len(hex_to_decode):
1553 attr_flags_hex = hex_to_decode[0]
1554 attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1555 # attr_optional_bit = attr_flags & 128
1556 # attr_transitive_bit = attr_flags & 64
1557 # attr_partial_bit = attr_flags & 32
1558 attr_extended_length_bit = attr_flags & 16
1560 attr_type_code_hex = hex_to_decode[1]
1561 attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1563 if attr_extended_length_bit:
1564 attr_length_hex = hex_to_decode[2:4]
1565 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1566 attr_value_hex = hex_to_decode[4:4 + attr_length]
1567 hex_to_decode = hex_to_decode[4 + attr_length:]
1569 attr_length_hex = hex_to_decode[2]
1570 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1571 attr_value_hex = hex_to_decode[3:3 + attr_length]
1572 hex_to_decode = hex_to_decode[3 + attr_length:]
1574 if attr_type_code == 1:
1575 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1576 binascii.b2a_hex(attr_flags_hex))
1577 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1578 elif attr_type_code == 2:
1579 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1580 binascii.b2a_hex(attr_flags_hex))
1581 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1582 elif attr_type_code == 3:
1583 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1584 binascii.b2a_hex(attr_flags_hex))
1585 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1586 elif attr_type_code == 4:
1587 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1588 binascii.b2a_hex(attr_flags_hex))
1589 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1590 elif attr_type_code == 5:
1591 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1592 binascii.b2a_hex(attr_flags_hex))
1593 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1594 elif attr_type_code == 6:
1595 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1596 binascii.b2a_hex(attr_flags_hex))
1597 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1598 elif attr_type_code == 7:
1599 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1600 binascii.b2a_hex(attr_flags_hex))
1601 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1602 elif attr_type_code == 9: # rfc4456#section-8
1603 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1604 binascii.b2a_hex(attr_flags_hex))
1605 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1606 elif attr_type_code == 10: # rfc4456#section-8
1607 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1608 binascii.b2a_hex(attr_flags_hex))
1609 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1610 elif attr_type_code == 14: # rfc4760#section-3
1611 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1612 binascii.b2a_hex(attr_flags_hex))
1613 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1614 address_family_identifier_hex = attr_value_hex[0:2]
1615 logger.debug(" Address Family Identifier=0x%s",
1616 binascii.b2a_hex(address_family_identifier_hex))
1617 subsequent_address_family_identifier_hex = attr_value_hex[2]
1618 logger.debug(" Subsequent Address Family Identifier=0x%s",
1619 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1620 next_hop_netaddr_len_hex = attr_value_hex[3]
1621 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1622 logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
1623 next_hop_netaddr_len,
1624 binascii.b2a_hex(next_hop_netaddr_len_hex))
1625 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1626 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1627 logger.debug(" Network Address of Next Hop=%s (0x%s)",
1628 next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1629 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1630 logger.debug(" Reserved=0x%s",
1631 binascii.b2a_hex(reserved_hex))
1632 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1633 logger.debug(" Network Layer Reachability Information=0x%s",
1634 binascii.b2a_hex(nlri_hex))
1635 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1636 logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
1637 for prefix in nlri_prefix_list:
1638 logger.debug(" nlri_prefix_received: %s", prefix)
1639 self.prefixes_introduced += len(nlri_prefix_list) # update counter
1640 elif attr_type_code == 15: # rfc4760#section-4
1641 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1642 binascii.b2a_hex(attr_flags_hex))
1643 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1644 address_family_identifier_hex = attr_value_hex[0:2]
1645 logger.debug(" Address Family Identifier=0x%s",
1646 binascii.b2a_hex(address_family_identifier_hex))
1647 subsequent_address_family_identifier_hex = attr_value_hex[2]
1648 logger.debug(" Subsequent Address Family Identifier=0x%s",
1649 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1650 wd_hex = attr_value_hex[3:]
1651 logger.debug(" Withdrawn Routes=0x%s",
1652 binascii.b2a_hex(wd_hex))
1653 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1654 logger.debug(" Withdrawn routes prefix list: %s",
1656 for prefix in wdr_prefix_list:
1657 logger.debug(" withdrawn_prefix_received: %s", prefix)
1658 self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
1660 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1661 binascii.b2a_hex(attr_flags_hex))
1662 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1665 def decode_update_message(self, msg):
1666 """Decode an UPDATE message (rfc4271#section-4.3)
1669 :msg: message to be decoded in hex
1673 logger.debug("Decoding update message:")
1674 # message header - marker
1675 marker_hex = msg[:16]
1676 logger.debug("Message header marker: 0x%s",
1677 binascii.b2a_hex(marker_hex))
1678 # message header - message length
1679 msg_length_hex = msg[16:18]
1680 msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1681 logger.debug("Message lenght: 0x%s (%s)",
1682 binascii.b2a_hex(msg_length_hex), msg_length)
1683 # message header - message type
1684 msg_type_hex = msg[18:19]
1685 msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1687 with self.storage as stor:
1688 # this will replace the previously stored message
1689 stor['update'] = binascii.hexlify(msg)
1691 logger.debug("Evpn {}".format(self.evpn))
1693 logger.debug("Skipping update decoding due to evpn data expected")
1696 logger.debug("Graceful-restart {}".format(self.grace))
1698 logger.debug("Skipping update decoding due to graceful-restart data expected")
1701 logger.debug("Mvpn {}".format(self.mvpn))
1703 logger.debug("Skipping update decoding due to mvpn data expected")
1706 logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
1707 if self.l3vpn_mcast:
1708 logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
1711 logger.debug("L3vpn-unicast {}".format(self.l3vpn))
1712 if self.l3vpn_mcast:
1713 logger.debug("Skipping update decoding due to l3vpn-unicast data expected")
1716 logger.debug("Route-Target-Constrain {}".format(self.rt_constrain))
1717 if self.rt_constrain:
1718 logger.debug("Skipping update decoding due to Route-Target-Constrain data expected")
1721 logger.debug("Ipv6-Unicast {}".format(self.ipv6))
1723 logger.debug("Skipping update decoding due to Ipv6 data expected")
1726 logger.debug("Allf {}".format(self.allf))
1728 logger.debug("Skipping update decoding")
1732 logger.debug("Message type: 0x%s (update)",
1733 binascii.b2a_hex(msg_type_hex))
1734 # withdrawn routes length
1735 wdr_length_hex = msg[19:21]
1736 wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1737 logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1738 binascii.b2a_hex(wdr_length_hex), wdr_length)
1740 wdr_hex = msg[21:21 + wdr_length]
1741 logger.debug("Withdrawn routes: 0x%s",
1742 binascii.b2a_hex(wdr_hex))
1743 wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1744 logger.debug("Withdrawn routes prefix list: %s",
1746 for prefix in wdr_prefix_list:
1747 logger.debug("withdrawn_prefix_received: %s", prefix)
1748 # total path attribute length
1749 total_pa_length_offset = 21 + wdr_length
1750 total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1751 total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1752 logger.debug("Total path attribute lenght: 0x%s (%s)",
1753 binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1755 pa_offset = total_pa_length_offset + 2
1756 pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1757 logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1758 self.decode_path_attributes(pa_hex)
1759 # network layer reachability information length
1760 nlri_length = msg_length - 23 - total_pa_length - wdr_length
1761 logger.debug("Calculated NLRI length: %s", nlri_length)
1762 # network layer reachability information
1763 nlri_offset = pa_offset + total_pa_length
1764 nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1765 logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1766 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1767 logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1768 for prefix in nlri_prefix_list:
1769 logger.debug("nlri_prefix_received: %s", prefix)
1771 self.updates_received += 1
1772 self.prefixes_introduced += len(nlri_prefix_list)
1773 self.prefixes_withdrawn += len(wdr_prefix_list)
1775 logger.error("Unexpeced message type 0x%s in 0x%s",
1776 binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1778 def wait_for_read(self):
1779 """Read message until timeout (next expected event).
1782 Used when no more updates has to be sent to avoid busy-wait.
1783 Currently it does not return anything.
1785 # Compute time to the first predictable state change
1786 event_time = self.timer.get_next_event_time()
1787 # snapshot_time would be imprecise
1788 wait_timedelta = min(event_time - time.time(), self.wfr)
1789 if wait_timedelta < 0:
1790 # The program got around to waiting to an event in "very near
1791 # future" so late that it became a "past" event, thus tell
1792 # "select" to not wait at all. Passing negative timedelta to
1793 # select() would lead to either waiting forever (for -1) or
1794 # select.error("Invalid parameter") (for everything else).
1796 # And wait for event or something to read.
1798 if not self.rx_activity_detected or not (self.updates_received % 100):
1799 # right time to write statistics to the log (not for every update and
1800 # not too frequently to avoid having large log files)
1801 logger.info("total_received_update_message_counter: %s",
1802 self.updates_received)
1803 logger.info("total_received_nlri_prefix_counter: %s",
1804 self.prefixes_introduced)
1805 logger.info("total_received_withdrawn_prefix_counter: %s",
1806 self.prefixes_withdrawn)
1808 start_time = time.time()
1809 select.select([self.socket], [], [self.socket], wait_timedelta)
1810 timedelta = time.time() - start_time
1811 self.rx_idle_time += timedelta
1812 self.rx_activity_detected = timedelta < 1
1814 if not self.rx_activity_detected or not (self.updates_received % 100):
1815 # right time to write statistics to the log (not for every update and
1816 # not too frequently to avoid having large log files)
1817 logger.info("... idle for %.3fs", timedelta)
1818 logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1822 class WriteTracker(object):
1823 """Class tracking enqueueing messages and sending chunks of them."""
1825 def __init__(self, bgp_socket, generator, timer):
1826 """The writter initialisation.
1829 bgp_socket: socket to be used for sending
1830 generator: generator to be used for message generation
1831 timer: timer to be used for scheduling
1833 # References to outside objects,
1834 self.socket = bgp_socket
1835 self.generator = generator
1837 # Really new fields.
1838 # TODO: Would attribute docstrings add anything substantial?
1839 self.sending_message = False
1840 self.bytes_to_send = 0
1843 def enqueue_message_for_sending(self, message):
1844 """Enqueue message and change state.
1847 message: message to be enqueued into the msg_out buffer
1849 self.msg_out += message
1850 self.bytes_to_send += len(message)
1851 self.sending_message = True
1853 def send_message_chunk_is_whole(self):
1854 """Send enqueued data from msg_out buffer
1857 :return: true if no remaining data to send
1859 # We assume there is a msg_out to send and socket is writable.
1860 self.timer.snapshot()
1861 bytes_sent = self.socket.send(self.msg_out)
1862 # Forget the part of message that was sent.
1863 self.msg_out = self.msg_out[bytes_sent:]
1864 self.bytes_to_send -= bytes_sent
1865 if not self.bytes_to_send:
1866 # TODO: Is it possible to hit negative bytes_to_send?
1867 self.sending_message = False
1868 # We should have reset hold timer on peer side.
1869 self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1870 # The possible reason for not prioritizing reads is gone.
1875 class StateTracker(object):
1876 """Main loop has state so complex it warrants this separate class."""
1878 def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1879 """The state tracker initialisation.
1882 bgp_socket: socket to be used for sending / receiving
1883 generator: generator to be used for message generation
1884 timer: timer to be used for scheduling
1885 inqueue: user initiated messages queue
1886 storage: thread safe dict to store data for the rpc server
1887 cliargs: cli args from the user
1889 # References to outside objects.
1890 self.socket = bgp_socket
1891 self.generator = generator
1894 self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
1895 l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
1896 rt_constrain=cliargs.rt_constrain, ipv6=cliargs.ipv6, grace=cliargs.grace,
1897 wait_for_read=cliargs.wfr)
1898 self.writer = WriteTracker(bgp_socket, generator, timer)
1899 # Prioritization state.
1900 self.prioritize_writing = False
1901 # In general, we prioritize reading over writing. But in order
1902 # not to get blocked by neverending reads, we should
1903 # check whether we are not risking running out of holdtime.
1904 # So in some situations, this field is set to True to attempt
1905 # finishing sending a message, after which this field resets
1907 # TODO: Alternative is to switch fairly between reading and
1908 # writing (called round robin from now on).
1909 # Message counting is done in generator.
1910 self.inqueue = inqueue
1912 def perform_one_loop_iteration(self):
1913 """ The main loop iteration
1916 Calculates priority, resolves all conditions, calls
1917 appropriate method and returns to caller to repeat.
1919 self.timer.snapshot()
1920 if not self.prioritize_writing:
1921 if self.timer.is_time_for_my_keepalive():
1922 if not self.writer.sending_message:
1923 # We need to schedule a keepalive ASAP.
1924 self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1925 logger.info("KEEP ALIVE is sent.")
1926 # We are sending a message now, so let's prioritize it.
1927 self.prioritize_writing = True
1930 msg = self.inqueue.get_nowait()
1931 logger.info("Received message: {}".format(msg))
1932 msgbin = binascii.unhexlify(msg)
1933 self.writer.enqueue_message_for_sending(msgbin)
1936 # Now we know what our priorities are, we have to check
1937 # which actions are available.
1938 # socket.socket() returns three lists,
1939 # we store them to list of lists.
1940 list_list = select.select([self.socket], [self.socket], [self.socket],
1941 self.timer.report_timedelta)
1942 read_list, write_list, except_list = list_list
1943 # Lists are unpacked, each is either [] or [self.socket],
1944 # so we will test them as boolean.
1946 logger.error("Exceptional state on the socket.")
1947 raise RuntimeError("Exceptional state on socket", self.socket)
1948 # We will do either read or write.
1949 if not (self.prioritize_writing and write_list):
1950 # Either we have no reason to rush writes,
1951 # or the socket is not writable.
1952 # We are focusing on reading here.
1953 if read_list: # there is something to read indeed
1954 # In this case we want to read chunk of message
1955 # and repeat the select,
1956 self.reader.read_message_chunk()
1958 # We were focusing on reading, but nothing to read was there.
1959 # Good time to check peer for hold timer.
1960 self.timer.check_peer_hold_time(self.timer.snapshot_time)
1961 # Quiet on the read front, we can have attempt to write.
1963 # Either we really want to reset peer's view of our hold
1964 # timer, or there was nothing to read.
1965 # Were we in the middle of sending a message?
1966 if self.writer.sending_message:
1967 # Was it the end of a message?
1968 whole = self.writer.send_message_chunk_is_whole()
1969 # We were pressed to send something and we did it.
1970 if self.prioritize_writing and whole:
1971 # We prioritize reading again.
1972 self.prioritize_writing = False
1974 # Finally to check if still update messages to be generated.
1975 if self.generator.remaining_prefixes:
1976 msg_out = self.generator.compose_update_message()
1977 if not self.generator.remaining_prefixes:
1978 # We have just finished update generation,
1979 # end-of-rib is due.
1980 logger.info("All update messages generated.")
1981 logger.info("Storing performance results.")
1982 self.generator.store_results()
1983 logger.info("Finally an END-OF-RIB is sent.")
1984 msg_out += self.generator.update_message(wr_prefixes=[],
1987 self.writer.enqueue_message_for_sending(msg_out)
1988 # Attempt for real sending to be done in next iteration.
1990 # Nothing to write anymore.
1991 # To avoid busy loop, we do idle waiting here.
1992 self.reader.wait_for_read()
1994 # We can neither read nor write.
1995 logger.warning("Input and output both blocked for " +
1996 str(self.timer.report_timedelta) + " seconds.")
1997 # FIXME: Are we sure select has been really waiting
2002 def create_logger(loglevel, logfile):
2003 """Create logger object
2006 :loglevel: log level
2007 :logfile: log file name
2009 :return: logger object
2011 logger = logging.getLogger("logger")
2012 log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
2013 console_handler = logging.StreamHandler()
2014 file_handler = logging.FileHandler(logfile, mode="w")
2015 console_handler.setFormatter(log_formatter)
2016 file_handler.setFormatter(log_formatter)
2017 logger.addHandler(console_handler)
2018 logger.addHandler(file_handler)
2019 logger.setLevel(loglevel)
2023 def job(arguments, inqueue, storage):
2024 """One time initialisation and iterations looping.
2026 Establish BGP connection and run iterations.
2029 :arguments: Command line arguments
2030 :inqueue: Data to be sent from play.py
2031 :storage: Shared dict for rpc server
2035 bgp_socket = establish_connection(arguments)
2036 # Initial handshake phase. TODO: Can it be also moved to StateTracker?
2037 # Receive open message before sending anything.
2038 # FIXME: Add parameter to send default open message first,
2039 # to work with "you first" peers.
2040 msg_in = read_open_message(bgp_socket)
2041 logger.info(binascii.hexlify(msg_in))
2042 storage['open'] = binascii.hexlify(msg_in)
2043 timer = TimeTracker(msg_in)
2044 generator = MessageGenerator(arguments)
2045 msg_out = generator.open_message()
2046 logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
2047 # Send our open message to the peer.
2048 bgp_socket.send(msg_out)
2049 # Wait for confirming keepalive.
2050 # TODO: Surely in just one packet?
2051 # Using exact keepalive length to not to see possible updates.
2052 msg_in = bgp_socket.recv(19)
2053 if msg_in != generator.keepalive_message():
2054 error_msg = "Open not confirmed by keepalive, instead got"
2055 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
2056 raise MessageError(error_msg, msg_in)
2057 timer.reset_peer_hold_time()
2058 # Send the keepalive to indicate the connection is accepted.
2059 timer.snapshot() # Remember this time.
2060 msg_out = generator.keepalive_message()
2061 logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
2062 bgp_socket.send(msg_out)
2063 # Use the remembered time.
2064 timer.reset_my_keepalive_time(timer.snapshot_time)
2065 # End of initial handshake phase.
2066 state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
2067 while True: # main reactor loop
2068 state.perform_one_loop_iteration()
2072 '''Handler for SimpleXMLRPCServer'''
2074 def __init__(self, sendqueue, storage):
2078 :sendqueue: queue for data to be sent towards odl
2079 :storage: thread safe dict
2081 self.queue = sendqueue
2082 self.storage = storage
2084 def send(self, text):
2088 :text: hes string of the data to be sent
2090 self.queue.put(text)
2092 def get(self, text=''):
2093 '''Reads data form the storage
2095 - returns stored data or an empty string, at the moment only
2099 :text: a key to the storage to get the data
2103 with self.storage as stor:
2104 return stor.get(text, '')
2106 def clean(self, text=''):
2107 '''Cleans data form the storage
2110 :text: a key to the storage to clean the data
2112 with self.storage as stor:
2117 def threaded_job(arguments):
2118 """Run the job threaded
2121 :arguments: Command line arguments
2125 amount_left = arguments.amount
2126 utils_left = arguments.multiplicity
2127 prefix_current = arguments.firstprefix
2128 myip_current = arguments.myip
2129 port = arguments.port
2131 rpcqueue = Queue.Queue()
2132 storage = SafeDict()
2135 amount_per_util = (amount_left - 1) / utils_left + 1 # round up
2136 amount_left -= amount_per_util
2139 args = deepcopy(arguments)
2140 args.amount = amount_per_util
2141 args.firstprefix = prefix_current
2142 args.myip = myip_current
2143 thread_args.append(args)
2147 prefix_current += amount_per_util * 16
2152 for t in thread_args:
2153 thread.start_new_thread(job, (t, rpcqueue, storage))
2155 print("Error: unable to start thread.")
2158 if arguments.usepeerip:
2159 ip = arguments.peerip
2162 rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
2163 rpcserver.register_instance(Rpcs(rpcqueue, storage))
2164 rpcserver.serve_forever()
2167 if __name__ == "__main__":
2168 arguments = parse_arguments()
2169 logger = create_logger(arguments.loglevel, arguments.logfile)
2170 threaded_job(arguments)