1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 from copy import deepcopy
15 from xmlrpc.server import SimpleXMLRPCServer
28 __author__ = "Vratko Polak"
29 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
30 __license__ = "Eclipse Public License v1.0"
31 __email__ = "vrpolak@cisco.com"
class SafeDict(dict):
    """Thread safe dictionary.

    The object will serve as thread safe data storage.
    It should be used with "with" statement: the internal lock is held
    for the duration of the "with" body.
    """

    def __init__(self, *p_arg, **n_arg):
        """Create the dictionary together with the lock guarding it.

        Arguments:
            :p_arg: positional arguments forwarded to dict()
            :n_arg: keyword arguments forwarded to dict()
        """
        # Forward the arguments; previously they were accepted but dropped,
        # so SafeDict({"a": 1}) produced an empty dictionary.
        super(SafeDict, self).__init__(*p_arg, **n_arg)
        self._lock = threading.Lock()

    def __enter__(self):
        """Acquire the lock and hand the dictionary to the "with" body."""
        self._lock.acquire()
        return self

    def __exit__(self, type, value, traceback):
        """Release the lock; exceptions from the body are not suppressed."""
        self._lock.release()
53 def parse_arguments():
54 """Use argparse to get arguments,
59 parser = argparse.ArgumentParser()
60 # TODO: Should we use --argument-names-with-spaces?
61 str_help = "Autonomous System number use in the stream."
62 parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
63 # FIXME: We are acting as iBGP peer,
64 # we should mirror AS number from peer's open message.
65 str_help = "Amount of IP prefixes to generate. (negative means " "infinite" ")."
66 parser.add_argument("--amount", default="1", type=int, help=str_help)
67 str_help = "Rpc server port."
68 parser.add_argument("--port", default="8000", type=int, help=str_help)
69 str_help = "Maximum number of IP prefixes to be announced in one iteration"
70 parser.add_argument("--insert", default="1", type=int, help=str_help)
71 str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
72 parser.add_argument("--withdraw", default="0", type=int, help=str_help)
73 str_help = "The number of prefixes to process without withdrawals"
74 parser.add_argument("--prefill", default="0", type=int, help=str_help)
75 str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
77 "--updates", choices=["single", "separate"], default=["separate"], help=str_help
79 str_help = "Base prefix IP address for prefix generation"
81 "--firstprefix", default="8.0.1.0", type=ipaddr.IPv4Address, help=str_help
83 str_help = "The prefix length."
84 parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
85 str_help = "Listen for connection, instead of initiating it."
86 parser.add_argument("--listen", action="store_true", help=str_help)
88 "Numeric IP Address to bind to and derive BGP ID from."
89 + "Default value only suitable for listening."
92 "--myip", default="0.0.0.0", type=ipaddr.IPv4Address, help=str_help
95 "TCP port to bind to when listening or initiating connection."
96 + "Default only suitable for initiating."
98 parser.add_argument("--myport", default="0", type=int, help=str_help)
99 str_help = "The IP of the next hop to be placed into the update messages."
103 type=ipaddr.IPv4Address,
107 str_help = "Identifier of the route originator."
111 type=ipaddr.IPv4Address,
115 str_help = "Cluster list item identifier."
119 type=ipaddr.IPv4Address,
124 "Numeric IP Address to try to connect to."
125 + "Currently no effect in listening mode."
128 "--peerip", default="127.0.0.2", type=ipaddr.IPv4Address, help=str_help
130 str_help = "TCP port to try to connect to. No effect in listening mode."
131 parser.add_argument("--peerport", default="179", type=int, help=str_help)
132 str_help = "Local hold time."
133 parser.add_argument("--holdtime", default="180", type=int, help=str_help)
134 str_help = "Log level (--error, --warning, --info, --debug)"
138 action="store_const",
140 default=logging.INFO,
146 action="store_const",
147 const=logging.WARNING,
148 default=logging.INFO,
154 action="store_const",
156 default=logging.INFO,
162 action="store_const",
164 default=logging.INFO,
167 str_help = "Log file name"
168 parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
169 str_help = "Trailing part of the csv result files for plotting purposes"
170 parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
171 str_help = "Minimum number of updates to reach to include result into csv."
172 parser.add_argument("--threshold", default="1000", type=int, help=str_help)
173 str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
174 parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
175 str_help = "Using peerip instead of myip for xmlrpc server"
177 "--usepeerip", default=False, action="store_true", help=str_help
179 str_help = "Link-State NLRI supported"
180 parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
181 str_help = "Link-State NLRI: Identifier"
182 parser.add_argument("-lsid", default="1", type=int, help=str_help)
183 str_help = "Link-State NLRI: Tunnel ID"
184 parser.add_argument("-lstid", default="1", type=int, help=str_help)
185 str_help = "Link-State NLRI: LSP ID"
186 parser.add_argument("-lspid", default="1", type=int, help=str_help)
187 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
189 "--lstsaddr", default="1.2.3.4", type=ipaddr.IPv4Address, help=str_help
191 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
193 "--lsteaddr", default="5.6.7.8", type=ipaddr.IPv4Address, help=str_help
195 str_help = "Link-State NLRI: Identifier Step"
196 parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
197 str_help = "Link-State NLRI: Tunnel ID Step"
198 parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
199 str_help = "Link-State NLRI: LSP ID Step"
200 parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
201 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
202 parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
203 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
204 parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
205 str_help = "How many play utilities are to be started."
206 parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
207 str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
208 Enabling this flag makes the script not decoding the update mesage, because of not\
209 supported decoding for these elements."
210 parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
211 str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
212 Enabling this flag makes the script not decoding the update mesage, because of not\
213 supported decoding for these elements."
214 parser.add_argument("--grace", default="8", type=int, help=str_help)
215 str_help = "Open message includes Graceful-restart capability, containing AFI/SAFIS:\
216 IPV4-Unicast, IPV6-Unicast, BGP-LS\
217 Enabling this flag makes the script not decoding the update mesage, because of not\
218 supported decoding for these elements."
219 parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
220 str_help = "Open message includes L3VPN-MULTICAST arguments.\
221 Enabling this flag makes the script not decoding the update mesage, because of not\
222 supported decoding for these elements."
224 "--l3vpn_mcast", default=False, action="store_true", help=str_help
227 "Open message includes L3VPN-UNICAST arguments, without message decoding."
229 parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
230 str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
232 "--rt_constrain", default=False, action="store_true", help=str_help
234 str_help = "Open message includes ipv6-unicast family, without message decoding."
235 parser.add_argument("--ipv6", default=False, action="store_true", help=str_help)
236 str_help = "Add all supported families without message decoding."
237 parser.add_argument("--allf", default=False, action="store_true", help=str_help)
238 parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
239 str_help = "Skipping well known attributes for update message"
240 parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
241 arguments = parser.parse_args()
242 if arguments.multiplicity < 1:
243 print("Multiplicity", arguments.multiplicity, "is not positive.")
245 # TODO: Are sanity checks (such as asnumber>=0) required?
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.listen: wait for the peer instead of connecting to it
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port
    Returns:
        :return: connected TCP socket.
    Notes:
        Sockets created here are closed again when the connection attempt
        fails, the exception is then propagated to the caller.
    """
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # bind need single tuple as argument
            listening_socket.bind((str(arguments.myip), arguments.myport))
            listening_socket.listen(1)
            bgp_socket, _ = listening_socket.accept()
            # TODO: Verify client IP is controller IP.
        finally:
            # Only one peer is served, so the listener is no longer needed
            # whether accept() succeeded or not.
            listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # bind to force specified address and port
            talking_socket.bind((str(arguments.myip), arguments.myport))
            # socket does not speak ipaddr, hence str()
            talking_socket.connect((str(arguments.peerip), arguments.peerport))
        except Exception:
            # Do not leak the descriptor when bind/connect fails.
            talking_socket.close()
            raise
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract 2-bytes number from provided message.

    Arguments:
        :message: given message (bytes-like; indexing must yield integers)
        :offset: offset of the short_int inside the message
    Returns:
        :return: required short_int value (big-endian, 0..65535).
    Raises:
        IndexError: when the message is shorter than offset + 2 bytes.
    Notes:
        default offset value is the BGP message size offset.
    """
    high_byte_int = message[offset]
    low_byte_int = message[offset + 1]
    # Network byte order: the first byte is the most significant one.
    return (high_byte_int << 8) | low_byte_int
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Arguments:
        :prefixes_hex: list of prefixes to be decoded, as raw bytes; each
            entry is one length octet (in bits) followed by the minimal
            number of octets needed to hold that many bits
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Raises:
        struct.error: when a prefix is longer than 32 bits or the data
            ends in the middle of a prefix.
    """
    offset = 0
    prefix_list = []
    while offset < len(prefixes_hex):
        prefix_bit_len = prefixes_hex[offset]
        if prefix_bit_len > 32:
            raise struct.error(
                "IPv4 prefix length " + str(prefix_bit_len) + " exceeds 32 bits"
            )
        # Number of octets actually present on the wire (0 for a /0 prefix).
        prefix_len = (prefix_bit_len + 7) // 8
        prefix_hex = prefixes_hex[offset + 1 : offset + 1 + prefix_len]
        # unpack raises struct.error when the data is truncated
        octets = struct.unpack(str(prefix_len) + "B", prefix_hex)
        # Trailing octets are not transmitted, restore them as zeros.
        octets += (0,) * (4 - prefix_len)
        prefix = ".".join(str(i) for i in octets)
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.

        Arguments:
            :text: human readable description of the problem
            :message: raw (bytes) message which caused the error
            :args: any further arguments, passed to ValueError unchanged
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: human readable message as string
        Notes:
            Use a placeholder string if the message is to be empty.
        """
        # A missing (None) message is treated like an empty one, so that
        # formatting the exception never raises on its own.
        message = binascii.hexlify(self.msg).decode() if self.msg else ""
        if not message:
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Raises:
        MessageError: when the received data is shorter than 37 bytes or
            its length differs from the length field of the BGP header.
    Notes:
        Performs just basic incoming message checks
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        error_msg = (
            "Message length (" + str(len(msg_in)) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)"
        )
        logger.error(error_msg + ": " + binascii.hexlify(msg_in).decode())
        raise MessageError(error_msg, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        error_msg = (
            "Expected message length ("
            + str(reported_length)
            + ") does not match actual length ("
            + str(len(msg_in))
            + ")"
        )
        # Same "text: hexdump" layout as the length check above.
        logger.error(error_msg + ": " + binascii.hexlify(msg_in).decode())
        raise MessageError(error_msg, msg_in)
    logger.info("Open message received.")
    return msg_in
393 class MessageGenerator(object):
394 """Class which generates messages, holds states and configuration values."""
396 # TODO: Define bgp marker as a class (constant) variable.
397 def __init__(self, args):
398 """Initialisation according to command-line args.
401 :args: argsparser's Namespace object which contains command-line
402 options for MessageGenerator initialisation
404 Calculates and stores default values used later on for
407 self.total_prefix_amount = args.amount
408 # Number of update messages left to be sent.
409 self.remaining_prefixes = self.total_prefix_amount
411 # New parameters initialisation
412 self.port = args.port
414 self.prefix_base_default = args.firstprefix
415 self.prefix_length_default = args.prefixlen
416 self.wr_prefixes_default = []
417 self.nlri_prefixes_default = []
418 self.version_default = 4
419 self.my_autonomous_system_default = args.asnumber
420 self.hold_time_default = args.holdtime # Local hold time.
421 self.bgp_identifier_default = int(args.myip)
422 self.next_hop_default = args.nexthop
423 self.originator_id_default = args.originator
424 self.cluster_list_item_default = args.cluster
425 self.single_update_default = args.updates == "single"
426 self.randomize_updates_default = args.updates == "random"
427 self.prefix_count_to_add_default = args.insert
428 self.prefix_count_to_del_default = args.withdraw
429 if self.prefix_count_to_del_default < 0:
430 self.prefix_count_to_del_default = 0
431 if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
432 # total number of prefixes must grow to avoid infinite test loop
433 self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
434 self.slot_size_default = self.prefix_count_to_add_default
435 self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
436 self.results_file_name_default = args.results
437 self.performance_threshold_default = args.threshold
438 self.rfc4760 = args.rfc4760
439 self.bgpls = args.bgpls
440 self.evpn = args.evpn
441 self.mvpn = args.mvpn
442 self.l3vpn_mcast = args.l3vpn_mcast
443 self.l3vpn = args.l3vpn
444 self.rt_constrain = args.rt_constrain
445 self.ipv6 = args.ipv6
446 self.allf = args.allf
447 self.skipattr = args.skipattr
448 self.grace = args.grace
449 # Default values when BGP-LS Attributes are used
451 self.prefix_count_to_add_default = 1
452 self.prefix_count_to_del_default = 0
453 self.ls_nlri_default = {
454 "Identifier": args.lsid,
455 "TunnelID": args.lstid,
457 "IPv4TunnelSenderAddress": args.lstsaddr,
458 "IPv4TunnelEndPointAddress": args.lsteaddr,
460 self.lsid_step = args.lsidstep
461 self.lstid_step = args.lstidstep
462 self.lspid_step = args.lspidstep
463 self.lstsaddr_step = args.lstsaddrstep
464 self.lsteaddr_step = args.lsteaddrstep
465 # Default values used for randomized part
468 (self.total_prefix_amount - self.remaining_prefixes_threshold - 1)
469 / self.prefix_count_to_add_default
475 (self.remaining_prefixes_threshold - 1)
476 / (self.prefix_count_to_add_default - self.prefix_count_to_del_default)
481 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
482 s2_first_index = s1_slots * self.prefix_count_to_add_default
486 * (self.prefix_count_to_add_default - self.prefix_count_to_del_default)
489 self.slot_gap_default = (
491 (self.total_prefix_amount - self.remaining_prefixes_threshold - 1)
492 / self.prefix_count_to_add_default
496 self.randomize_lowest_default = s2_first_index
497 self.randomize_highest_default = s2_last_index
498 # Initialising counters
499 self.phase1_start_time = 0
500 self.phase1_stop_time = 0
501 self.phase2_start_time = 0
502 self.phase2_stop_time = 0
503 self.phase1_updates_sent = 0
504 self.phase2_updates_sent = 0
505 self.updates_sent = 0
507 self.log_info = args.loglevel <= logging.INFO
508 self.log_debug = args.loglevel <= logging.DEBUG
510 Flags needed for the MessageGenerator performance optimization.
511 Calling logger methods each iteration even with proper log level set
512 slows down significantly the MessageGenerator performance.
513 Measured total generation time (1M updates, dry run, error log level):
514 - logging based on basic logger features: 36,2s
515 - logging based on advanced logger features (lazy logging): 21,2s
516 - conditional calling of logger methods enclosed inside condition: 8,6s
519 logger.info("Generator initialisation")
521 " Target total number of prefixes to be introduced: "
522 + str(self.total_prefix_amount)
526 + str(self.prefix_base_default)
528 + str(self.prefix_length_default)
531 " My Autonomous System number: " + str(self.my_autonomous_system_default)
533 logger.info(" My Hold Time: " + str(self.hold_time_default))
534 logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
535 logger.info(" Next Hop: " + str(self.next_hop_default))
536 logger.info(" Originator ID: " + str(self.originator_id_default))
537 logger.info(" Cluster list: " + str(self.cluster_list_item_default))
539 " Prefix count to be inserted at once: "
540 + str(self.prefix_count_to_add_default)
543 " Prefix count to be withdrawn at once: "
544 + str(self.prefix_count_to_del_default)
547 " Fast pre-fill up to "
548 + str(self.total_prefix_amount - self.remaining_prefixes_threshold)
552 " Remaining number of prefixes to be processed "
553 + "in parallel with withdrawals: "
554 + str(self.remaining_prefixes_threshold)
557 " Prefix index range used after pre-fill procedure ["
558 + str(self.randomize_lowest_default)
560 + str(self.randomize_highest_default)
563 if self.single_update_default:
565 " Common single UPDATE will be generated "
566 + "for both NLRI & WITHDRAWN lists"
570 " Two separate UPDATEs will be generated "
571 + "for each NLRI & WITHDRAWN lists"
573 if self.randomize_updates_default:
574 logger.info(" Generation of UPDATE messages will be randomized")
575 logger.info(" Let's go ...\n")
577 # TODO: Notification for hold timer expiration can be handy.
579 def store_results(self, file_name=None, threshold=None):
580 """Stores specified results into files based on file_name value.
583 :param file_name: Trailing (common) part of result file names
584 :param threshold: Minimum number of sent updates needed for each
585 result to be included into result csv file
586 (mainly needed because of the result accuracy)
590 # default values handling
591 # TODO optimize default values handling (use e.g. dictionary.update() approach)
592 if file_name is None:
593 file_name = self.results_file_name_default
594 if threshold is None:
595 threshold = self.performance_threshold_default
596 # performance calculation
597 if self.phase1_updates_sent >= threshold:
598 totals1 = self.phase1_updates_sent
600 self.phase1_updates_sent
601 / (self.phase1_stop_time - self.phase1_start_time)
606 if self.phase2_updates_sent >= threshold:
607 totals2 = self.phase2_updates_sent
609 self.phase2_updates_sent
610 / (self.phase2_stop_time - self.phase2_start_time)
616 logger.info("#" * 10 + " Final results " + "#" * 10)
617 logger.info("Number of iterations: " + str(self.iteration))
619 "Number of UPDATE messages sent in the pre-fill phase: "
620 + str(self.phase1_updates_sent)
623 "The pre-fill phase duration: "
624 + str(self.phase1_stop_time - self.phase1_start_time)
628 "Number of UPDATE messages sent in the 2nd test phase: "
629 + str(self.phase2_updates_sent)
632 "The 2nd test phase duration: "
633 + str(self.phase2_stop_time - self.phase2_start_time)
636 logger.info("Threshold for performance reporting: " + str(threshold))
640 "pre-fill " + str(self.prefix_count_to_add_default) + " route(s) per UPDATE"
642 if self.single_update_default:
643 phase2_label = "+" + (
644 str(self.prefix_count_to_add_default)
646 + str(self.prefix_count_to_del_default)
647 + " routes per UPDATE"
650 phase2_label = "+" + (
651 str(self.prefix_count_to_add_default)
653 + str(self.prefix_count_to_del_default)
654 + " routes in two UPDATEs"
656 # collecting capacity and performance results
659 if totals1 is not None:
660 totals[phase1_label] = totals1
661 performance[phase1_label] = performance1
662 if totals2 is not None:
663 totals[phase2_label] = totals2
664 performance[phase2_label] = performance2
665 self.write_results_to_file(totals, "totals-" + file_name)
666 self.write_results_to_file(performance, "performance-" + file_name)
668 def write_results_to_file(self, results, file_name):
669 """Writes results to the csv plot file consumable by Jenkins.
672 :param file_name: Name of the (csv) file to be created
678 f = open(file_name, "wt")
680 for key in sorted(results):
681 first_line += key + ", "
682 second_line += str(results[key]) + ", "
683 first_line = first_line[:-2]
684 second_line = second_line[:-2]
685 f.write(first_line + "\n")
686 f.write(second_line + "\n")
688 "Message generator performance results stored in " + file_name + ":"
690 logger.info(" " + first_line)
691 logger.info(" " + second_line)
695 # Return pseudo-randomized (reproducible) index for selected range
696 def randomize_index(self, index, lowest=None, highest=None):
697 """Calculates pseudo-randomized index from selected range.
700 :param index: input index
701 :param lowest: the lowes index from the randomized area
702 :param highest: the highest index from the randomized area
704 :return: the (pseudo)randomized index
706 Created just as a fame for future generator enhancement.
708 # default values handling
709 # TODO optimize default values handling (use e.g. dicionary.update() approach)
711 lowest = self.randomize_lowest_default
713 highest = self.randomize_highest_default
715 if (index >= lowest) and (index <= highest):
716 # we are in the randomized range -> shuffle it inside
717 # the range (now just reverse the order)
718 new_index = highest - (index - lowest)
720 # we are out of the randomized range -> nothing to do
724 def get_ls_nlri_values(self, index):
725 """Generates LS-NLRI parameters.
726 http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
729 :param index: index (iteration)
731 :return: dictionary of LS NLRI parameters and values
733 # generating list of LS NLRI parameters
734 identifier = self.ls_nlri_default["Identifier"] + int(index / self.lsid_step)
735 ipv4_tunnel_sender_address = self.ls_nlri_default[
736 "IPv4TunnelSenderAddress"
737 ] + int(index / self.lstsaddr_step)
738 tunnel_id = self.ls_nlri_default["TunnelID"] + int(index / self.lstid_step)
739 lsp_id = self.ls_nlri_default["LSPID"] + int(index / self.lspid_step)
740 ipv4_tunnel_endpoint_address = self.ls_nlri_default[
741 "IPv4TunnelEndPointAddress"
742 ] + int(index / self.lsteaddr_step)
744 "Identifier": identifier,
745 "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
746 "TunnelID": tunnel_id,
748 "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address,
750 return ls_nlri_values
761 """Generates list of IP address prefixes.
764 :param slot_index: index of group of prefix addresses
765 :param slot_size: size of group of prefix addresses
766 in [number of included prefixes]
767 :param prefix_base: IP address of the first prefix
768 (slot_index = 0, prefix_index = 0)
769 :param prefix_len: length of the prefix in bits
770 (the same as size of netmask)
771 :param prefix_count: number of prefixes to be returned
772 from the specified slot
774 :return: list of generated IP address prefixes
776 # default values handling
777 # TODO optimize default values handling (use e.g. dicionary.update() approach)
778 if slot_size is None:
779 slot_size = self.slot_size_default
780 if prefix_base is None:
781 prefix_base = self.prefix_base_default
782 if prefix_len is None:
783 prefix_len = self.prefix_length_default
784 if prefix_count is None:
785 prefix_count = slot_size
786 if randomize is None:
787 randomize = self.randomize_updates_default
788 # generating list of prefixes
791 prefix_gap = 2 ** (32 - prefix_len)
792 for i in range(prefix_count):
793 prefix_index = slot_index * slot_size + i
795 prefix_index = self.randomize_index(prefix_index)
796 indexes.append(prefix_index)
797 prefixes.append(prefix_base + prefix_index * prefix_gap)
799 logger.debug(" Prefix slot index: " + str(slot_index))
800 logger.debug(" Prefix slot size: " + str(slot_size))
801 logger.debug(" Prefix count: " + str(prefix_count))
802 logger.debug(" Prefix indexes: " + str(indexes))
803 logger.debug(" Prefix list: " + str(prefixes))
806 def compose_update_message(
807 self, prefix_count_to_add=None, prefix_count_to_del=None
809 """Composes an UPDATE message
812 :param prefix_count_to_add: # of prefixes to put into NLRI list
813 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
815 :return: encoded UPDATE message in HEX
817 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
818 lists or common message which includes both prefix lists.
819 Updates global counters.
821 # default values handling
822 # TODO optimize default values handling (use e.g. dicionary.update() approach)
823 if prefix_count_to_add is None:
824 prefix_count_to_add = self.prefix_count_to_add_default
825 if prefix_count_to_del is None:
826 prefix_count_to_del = self.prefix_count_to_del_default
828 if self.log_info and not (self.iteration % 1000):
831 + str(self.iteration)
832 + " - total remaining prefixes: "
833 + str(self.remaining_prefixes)
837 "#" * 10 + " Iteration: " + str(self.iteration) + " " + "#" * 10
839 logger.debug("Remaining prefixes: " + str(self.remaining_prefixes))
840 # scenario type & one-shot counter
841 straightforward_scenario = (
842 self.remaining_prefixes > self.remaining_prefixes_threshold
844 if straightforward_scenario:
845 prefix_count_to_del = 0
847 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
848 if not self.phase1_start_time:
849 self.phase1_start_time = time.time()
852 logger.debug("--- COMBINED SCENARIO ---")
853 if not self.phase2_start_time:
854 self.phase2_start_time = time.time()
855 # tailor the number of prefixes if needed
856 prefix_count_to_add = prefix_count_to_del + min(
857 prefix_count_to_add - prefix_count_to_del, self.remaining_prefixes
859 # prefix slots selection for insertion and withdrawal
860 slot_index_to_add = self.iteration
861 slot_index_to_del = slot_index_to_add - self.slot_gap_default
862 # getting lists of prefixes for insertion in this iteration
864 logger.debug("Prefixes to be inserted in this iteration:")
865 prefix_list_to_add = self.get_prefix_list(
866 slot_index_to_add, prefix_count=prefix_count_to_add
868 # getting lists of prefixes for withdrawal in this iteration
870 logger.debug("Prefixes to be withdrawn in this iteration:")
871 prefix_list_to_del = self.get_prefix_list(
872 slot_index_to_del, prefix_count=prefix_count_to_del
874 # generating the UPDATE message with LS-NLRI only
876 ls_nlri = self.get_ls_nlri_values(self.iteration)
877 msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[], **ls_nlri)
879 # generating the UPDATE message with prefix lists
880 if self.single_update_default:
881 # Send prefixes to be introduced and withdrawn
882 # in one UPDATE message
883 msg_out = self.update_message(
884 wr_prefixes=prefix_list_to_del, nlri_prefixes=prefix_list_to_add
887 # Send prefixes to be introduced and withdrawn
888 # in separate UPDATE messages (if needed)
889 msg_out = self.update_message(
890 wr_prefixes=[], nlri_prefixes=prefix_list_to_add
892 if prefix_count_to_del:
893 msg_out += self.update_message(
894 wr_prefixes=prefix_list_to_del, nlri_prefixes=[]
896 # updating counters - who knows ... maybe I am last time here ;)
897 if straightforward_scenario:
898 self.phase1_stop_time = time.time()
899 self.phase1_updates_sent = self.updates_sent
901 self.phase2_stop_time = time.time()
902 self.phase2_updates_sent = self.updates_sent - self.phase1_updates_sent
903 # updating totals for the next iteration
905 self.remaining_prefixes -= prefix_count_to_add - prefix_count_to_del
906 # returning the encoded message
909 # Section of message encoders
914 my_autonomous_system=None,
918 """Generates an OPEN Message (rfc4271#section-4.2)
921 :param version: see the rfc4271#section-4.2
922 :param my_autonomous_system: see the rfc4271#section-4.2
923 :param hold_time: see the rfc4271#section-4.2
924 :param bgp_identifier: see the rfc4271#section-4.2
926 :return: encoded OPEN message in HEX
929 # default values handling
930 # TODO optimize default values handling (use e.g. dicionary.update() approach)
932 version = self.version_default
933 if my_autonomous_system is None:
934 my_autonomous_system = self.my_autonomous_system_default
935 if hold_time is None:
936 hold_time = self.hold_time_default
937 if bgp_identifier is None:
938 bgp_identifier = self.bgp_identifier_default
941 marker_hex = b"\xFF" * 16
945 type_hex = struct.pack("B", type)
948 version_hex = struct.pack("B", version)
950 # my_autonomous_system
951 # AS_TRANS value, 23456 decadic.
952 my_autonomous_system_2_bytes = 23456
953 # AS number is mappable to 2 bytes
954 if my_autonomous_system < 65536:
955 my_autonomous_system_2_bytes = my_autonomous_system
956 my_autonomous_system_hex_2_bytes = struct.pack(">H", my_autonomous_system)
959 hold_time_hex = struct.pack(">H", hold_time)
962 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
964 # Optional Parameters
965 optional_parameters_hex = b""
966 if self.rfc4760 or self.allf:
967 optional_parameter_hex = (
968 b"\x02" # Param type ("Capability Ad")
969 b"\x06" # Length (6 bytes)
970 b"\x01" # Capability type (NLRI Unicast),
971 # see RFC 4760, section 8
972 b"\x04" # Capability value length
973 b"\x00\x01" # AFI (Ipv4)
975 b"\x01" # SAFI (Unicast)
977 optional_parameters_hex += optional_parameter_hex
979 if self.ipv6 or self.allf:
980 optional_parameter_hex = (
981 b"\x02" # Param type ("Capability Ad")
982 b"\x06" # Length (6 bytes)
983 b"\x01" # Multiprotocol extetension capability,
984 b"\x04" # Capability value length
985 b"\x00\x02" # AFI (IPV6)
987 b"\x01" # SAFI (UNICAST)
989 optional_parameters_hex += optional_parameter_hex
991 if self.bgpls or self.allf:
992 optional_parameter_hex = (
993 b"\x02" # Param type ("Capability Ad")
994 b"\x06" # Length (6 bytes)
995 b"\x01" # Capability type (NLRI Unicast),
996 # see RFC 4760, secton 8
997 b"\x04" # Capability value length
998 b"\x40\x04" # AFI (BGP-LS)
1000 b"\x47" # SAFI (BGP-LS)
1002 optional_parameters_hex += optional_parameter_hex
1004 if self.evpn or self.allf:
1005 optional_parameter_hex = (
1006 b"\x02" # Param type ("Capability Ad")
1007 b"\x06" # Length (6 bytes)
1008 b"\x01" # Multiprotocol extetension capability,
1009 b"\x04" # Capability value length
1010 b"\x00\x19" # AFI (L2-VPN)
1011 b"\x00" # (reserved)
1012 b"\x46" # SAFI (EVPN)
1014 optional_parameters_hex += optional_parameter_hex
1016 if self.mvpn or self.allf:
1017 optional_parameter_hex = (
1018 b"\x02" # Param type ("Capability Ad")
1019 b"\x06" # Length (6 bytes)
1020 b"\x01" # Multiprotocol extetension capability,
1021 b"\x04" # Capability value length
1022 b"\x00\x01" # AFI (IPV4)
1023 b"\x00" # (reserved)
1024 b"\x05" # SAFI (MCAST-VPN)
1026 optional_parameters_hex += optional_parameter_hex
1027 optional_parameter_hex = (
1028 b"\x02" # Param type ("Capability Ad")
1029 b"\x06" # Length (6 bytes)
1030 b"\x01" # Multiprotocol extetension capability,
1031 b"\x04" # Capability value length
1032 b"\x00\x02" # AFI (IPV6)
1033 b"\x00" # (reserved)
1034 b"\x05" # SAFI (MCAST-VPN)
1036 optional_parameters_hex += optional_parameter_hex
1038 if self.l3vpn_mcast or self.allf:
1039 optional_parameter_hex = (
1040 b"\x02" # Param type ("Capability Ad")
1041 b"\x06" # Length (6 bytes)
1042 b"\x01" # Multiprotocol extetension capability,
1043 b"\x04" # Capability value length
1044 b"\x00\x01" # AFI (IPV4)
1045 b"\x00" # (reserved)
1046 b"\x81" # SAFI (L3VPN-MCAST)
1048 optional_parameters_hex += optional_parameter_hex
1049 optional_parameter_hex = (
1050 b"\x02" # Param type ("Capability Ad")
1051 b"\x06" # Length (6 bytes)
1052 b"\x01" # Multiprotocol extetension capability,
1053 b"\x04" # Capability value length
1054 b"\x00\x02" # AFI (IPV6)
1055 b"\x00" # (reserved)
1056 b"\x81" # SAFI (L3VPN-MCAST)
1058 optional_parameters_hex += optional_parameter_hex
1060 if self.l3vpn or self.allf:
1061 optional_parameter_hex = (
1062 b"\x02" # Param type ("Capability Ad")
1063 b"\x06" # Length (6 bytes)
1064 b"\x01" # Multiprotocol extetension capability,
1065 b"\x04" # Capability value length
1066 b"\x00\x01" # AFI (IPV4)
1067 b"\x00" # (reserved)
1068 b"\x80" # SAFI (L3VPN-UNICAST)
1070 optional_parameters_hex += optional_parameter_hex
1071 optional_parameter_hex = (
1072 b"\x02" # Param type ("Capability Ad")
1073 b"\x06" # Length (6 bytes)
1074 b"\x01" # Multiprotocol extetension capability,
1075 b"\x04" # Capability value length
1076 b"\x00\x02" # AFI (IPV6)
1077 b"\x00" # (reserved)
1078 b"\x80" # SAFI (L3VPN-UNICAST)
1080 optional_parameters_hex += optional_parameter_hex
1082 if self.rt_constrain or self.allf:
1083 optional_parameter_hex = (
1084 b"\x02" # Param type ("Capability Ad")
1085 b"\x06" # Length (6 bytes)
1086 b"\x01" # Multiprotocol extetension capability,
1087 b"\x04" # Capability value length
1088 b"\x00\x01" # AFI (IPV4)
1089 b"\x00" # (reserved)
1090 b"\x84" # SAFI (ROUTE-TARGET-CONSTRAIN)
1092 optional_parameters_hex += optional_parameter_hex
1094 optional_parameter_hex = (
1095 b"\x02" # Param type ("Capability Ad")
1096 b"\x06" # Length (6 bytes)
1097 b"\x41" # "32 bit AS Numbers Support"
1098 # (see RFC 6793, section 3)
1099 b"\x04" # Capability value length
1101 optional_parameter_hex += struct.pack(
1102 ">I", my_autonomous_system
1103 ) # My AS in 32 bit format
1104 optional_parameters_hex += optional_parameter_hex
1107 b = list(bin(self.grace)[2:])
1108 b = b + [0] * (3 - len(b))
1111 restart_flag = b"\x80\x05"
1113 restart_flag = b"\x00\x05"
1119 ll_gr = b"\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
1123 logger.debug("Grace parameters list: {}".format(b))
1124 # "\x02" Param type ("Capability Ad")
1125 # :param length: Length of whole message
1126 # "\x40" Graceful-restart capability
1127 # "\x06" Length (6 bytes)
1128 # "\x00" Restart Flag (customizable - turned on when grace == 2,3,6,7)
1129 # "\x05" Restart timer (5sec)
1130 # "\x00\x01" AFI (IPV4)
1131 # "\x01" SAFI (Unicast)
1132 # "\x00" Ipv4 Flag (customizable - turned on when grace == 1,3,5,7)
1133 # "\x47\x07\x00\x01\x01\x00\x00\x00\x1e" ipv4 ll-graceful-restart capability, timer 30sec
1134 # ll-gr turned on when grace is between 4-7
1135 optional_parameter_hex = (
1136 f"\x02{length}\x40\x06{restart_flag}\x00\x01\x01{ipv4_flag}{ll_gr}"
1138 optional_parameters_hex += optional_parameter_hex
1140 # Optional Parameters Length
1141 optional_parameters_length = len(optional_parameters_hex)
1142 optional_parameters_length_hex = struct.pack("B", optional_parameters_length)
1144 # Length (big-endian)
1150 + len(my_autonomous_system_hex_2_bytes)
1151 + len(hold_time_hex)
1152 + len(bgp_identifier_hex)
1153 + len(optional_parameters_length_hex)
1154 + len(optional_parameters_hex)
1156 length_hex = struct.pack(">H", length)
1164 + my_autonomous_system_hex_2_bytes
1166 + bgp_identifier_hex
1167 + optional_parameters_length_hex
1168 + optional_parameters_hex
1172 logger.debug("OPEN message encoding")
1173 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex).decode())
1178 + binascii.hexlify(length_hex).decode()
1185 + binascii.hexlify(type_hex).decode()
1192 + binascii.hexlify(version_hex).decode()
1196 " My Autonomous System="
1197 + str(my_autonomous_system_2_bytes)
1199 + binascii.hexlify(my_autonomous_system_hex_2_bytes).decode()
1206 + binascii.hexlify(hold_time_hex).decode()
1211 + str(bgp_identifier)
1213 + binascii.hexlify(bgp_identifier_hex).decode()
1217 " Optional Parameters Length="
1218 + str(optional_parameters_length)
1220 + binascii.hexlify(optional_parameters_length_hex).decode()
1224 " Optional Parameters=0x"
1225 + binascii.hexlify(optional_parameters_hex).decode()
1227 logger.debug("OPEN message encoded: 0x%s", binascii.b2a_hex(message_hex))
1235 wr_prefix_length=None,
1236 nlri_prefix_length=None,
1237 my_autonomous_system=None,
1240 cluster_list_item=None,
1244 """Generates an UPDATE Message (rfc4271#section-4.3)
1247 :param wr_prefixes: see the rfc4271#section-4.3
1248 :param nlri_prefixes: see the rfc4271#section-4.3
1249 :param wr_prefix_length: see the rfc4271#section-4.3
1250 :param nlri_prefix_length: see the rfc4271#section-4.3
1251 :param my_autonomous_system: see the rfc4271#section-4.3
1252 :param next_hop: see the rfc4271#section-4.3
1254 :return: encoded UPDATE message in HEX
1257 # default values handling
1258 # TODO optimize default values handling (use e.g. dictionary.update() approach)
1259 if wr_prefixes is None:
1260 wr_prefixes = self.wr_prefixes_default
1261 if nlri_prefixes is None:
1262 nlri_prefixes = self.nlri_prefixes_default
1263 if wr_prefix_length is None:
1264 wr_prefix_length = self.prefix_length_default
1265 if nlri_prefix_length is None:
1266 nlri_prefix_length = self.prefix_length_default
1267 if my_autonomous_system is None:
1268 my_autonomous_system = self.my_autonomous_system_default
1269 if next_hop is None:
1270 next_hop = self.next_hop_default
1271 if originator_id is None:
1272 originator_id = self.originator_id_default
1273 if cluster_list_item is None:
1274 cluster_list_item = self.cluster_list_item_default
1275 ls_nlri = self.ls_nlri_default.copy()
1276 ls_nlri.update(ls_nlri_params)
1279 marker_hex = b"\xFF" * 16
1283 type_hex = struct.pack("B", type)
1286 withdrawn_routes_hex = b""
1288 bytes = int((wr_prefix_length - 1) / 8) + 1
1289 for prefix in wr_prefixes:
1290 withdrawn_route_hex = (
1291 struct.pack("B", wr_prefix_length)
1292 + struct.pack(">I", int(prefix))[:bytes]
1294 withdrawn_routes_hex += withdrawn_route_hex
1296 # Withdrawn Routes Length
1297 withdrawn_routes_length = len(withdrawn_routes_hex)
1298 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1300 # TODO: to replace hardcoded string by encoding?
1302 path_attributes_hex = b""
1303 if not self.skipattr:
1304 path_attributes_hex += (
1305 b"\x40" # Flags ("Well-Known")
1306 b"\x01" # Type (ORIGIN)
1307 b"\x01" # Length (1)
1308 b"\x00" # Origin: IGP
1310 path_attributes_hex += (
1311 b"\x40" # Flags ("Well-Known")
1312 b"\x02" # Type (AS_PATH)
1313 b"\x06" # Length (6)
1314 b"\x02" # AS segment type (AS_SEQUENCE)
1315 b"\x01" # AS segment length (1)
1317 my_as_hex = struct.pack(">I", my_autonomous_system)
1318 path_attributes_hex += my_as_hex # AS segment (4 bytes)
1319 path_attributes_hex += (
1320 b"\x40" # Flags ("Well-Known")
1321 b"\x05" # Type (LOCAL_PREF)
1322 b"\x04" # Length (4)
1323 b"\x00\x00\x00\x64" # (100)
1325 if nlri_prefixes != []:
1326 path_attributes_hex += (
1327 b"\x40" # Flags ("Well-Known")
1328 b"\x03" # Type (NEXT_HOP)
1329 b"\x04" # Length (4)
1331 next_hop_hex = struct.pack(">I", int(next_hop))
1332 path_attributes_hex += next_hop_hex # IP address of the next hop (4 bytes)
1333 if originator_id is not None:
1334 path_attributes_hex += (
1335 b"\x80" # Flags ("Optional, non-transitive")
1336 b"\x09" # Type (ORIGINATOR_ID)
1337 b"\x04" # Length (4)
1338 ) # ORIGINATOR_ID (4 bytes)
1339 path_attributes_hex += struct.pack(">I", int(originator_id))
1340 if cluster_list_item is not None:
1341 path_attributes_hex += (
1342 b"\x80" # Flags ("Optional, non-transitive")
1343 b"\x0a" # Type (CLUSTER_LIST)
1344 b"\x04" # Length (4)
1345 ) # one CLUSTER_LIST item (4 bytes)
1346 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1348 if self.bgpls and not end_of_rib:
1349 path_attributes_hex += (
1350 b"\x80" # Flags ("Optional, non-transitive")
1351 b"\x0e" # Type (MP_REACH_NLRI)
1352 b"\x22" # Length (34)
1353 b"\x40\x04" # AFI (BGP-LS)
1354 b"\x47" # SAFI (BGP-LS)
1355 b"\x04" # Next Hop Length (4)
1357 path_attributes_hex += struct.pack(">I", int(next_hop))
1358 path_attributes_hex += b"\x00" # Reserved
1359 path_attributes_hex += (
1360 b"\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1361 b"\x00\x15" # LS-NLRI.TotalNLRILength (21)
1362 b"\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1364 path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1365 path_attributes_hex += struct.pack(
1366 ">I", int(ls_nlri["IPv4TunnelSenderAddress"])
1368 path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1369 path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1370 path_attributes_hex += struct.pack(
1371 ">I", int(ls_nlri["IPv4TunnelEndPointAddress"])
1374 # Total Path Attributes Length
1375 total_path_attributes_length = len(path_attributes_hex)
1376 total_path_attributes_length_hex = struct.pack(
1377 ">H", total_path_attributes_length
1380 # Network Layer Reachability Information
1383 bytes = int((nlri_prefix_length - 1) / 8) + 1
1384 for prefix in nlri_prefixes:
1386 struct.pack("B", nlri_prefix_length)
1387 + struct.pack(">I", int(prefix))[:bytes]
1389 nlri_hex += nlri_prefix_hex
1391 # Length (big-endian)
1396 + len(withdrawn_routes_length_hex)
1397 + len(withdrawn_routes_hex)
1398 + len(total_path_attributes_length_hex)
1399 + len(path_attributes_hex)
1402 length_hex = struct.pack(">H", length)
1409 + withdrawn_routes_length_hex
1410 + withdrawn_routes_hex
1411 + total_path_attributes_length_hex
1412 + path_attributes_hex
1416 if self.grace != 8 and self.grace != 0 and end_of_rib:
1417 message_hex = marker_hex + binascii.unhexlify("00170200000000")
1420 logger.debug("UPDATE message encoding")
1421 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex).decode())
1426 + binascii.hexlify(length_hex).decode()
1433 + binascii.hexlify(type_hex).decode()
1437 " withdrawn_routes_length="
1438 + str(withdrawn_routes_length)
1440 + binascii.hexlify(withdrawn_routes_length_hex).decode()
1444 " Withdrawn_Routes="
1447 + str(wr_prefix_length)
1449 + binascii.hexlify(withdrawn_routes_hex).decode()
1452 if total_path_attributes_length:
1454 " Total Path Attributes Length="
1455 + str(total_path_attributes_length)
1457 + binascii.hexlify(total_path_attributes_length_hex).decode()
1463 + binascii.hexlify(path_attributes_hex).decode()
1466 logger.debug(" Origin=IGP")
1467 logger.debug(" AS path=" + str(my_autonomous_system))
1468 logger.debug(" Next hop=" + str(next_hop))
1469 if originator_id is not None:
1470 logger.debug(" Originator id=" + str(originator_id))
1471 if cluster_list_item is not None:
1472 logger.debug(" Cluster list=" + str(cluster_list_item))
1474 logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
1476 " Network Layer Reachability Information="
1477 + str(nlri_prefixes)
1479 + str(nlri_prefix_length)
1481 + binascii.hexlify(nlri_hex).decode()
1485 "UPDATE message encoded: 0x" + binascii.b2a_hex(message_hex).decode()
1489 self.updates_sent += 1
1490 # returning encoded message
def notification_message(self, error_code, error_subcode, data_hex=b""):
    """Generates a NOTIFICATION Message (rfc4271#section-4.5)

    Arguments:
        :param error_code: see the rfc4271#section-4.5
        :param error_subcode: see the rfc4271#section-4.5
        :param data_hex: see the rfc4271#section-4.5
    Returns:
        :return: encoded NOTIFICATION message in HEX
    """
    # Marker
    marker_hex = b"\xFF" * 16

    # Type (3 = NOTIFICATION, rfc4271#section-4.1); named msg_type so the
    # builtin type() is not shadowed (and never packed by accident).
    msg_type = 3
    type_hex = struct.pack("B", msg_type)

    # Error Code
    error_code_hex = struct.pack("B", error_code)

    # Error Subcode
    error_subcode_hex = struct.pack("B", error_subcode)

    # Everything that follows the marker and the 2-byte length field.
    payload_hex = type_hex + error_code_hex + error_subcode_hex + data_hex

    # Length (big-endian); the "+ 2" accounts for the length field itself.
    length = len(marker_hex) + 2 + len(payload_hex)
    length_hex = struct.pack(">H", length)

    # NOTIFICATION Message
    message_hex = marker_hex + length_hex + payload_hex

    # NOTE(review): the line guarding this debug dump is not visible in the
    # listing; presumably it is "if self.log_debug:" - confirm. Falling back
    # to True keeps the dump unconditional when the attribute is absent.
    if getattr(self, "log_debug", True):
        logger.debug("NOTIFICATION message encoding")
        logger.debug(" Marker=0x%s", binascii.hexlify(marker_hex).decode())
        logger.debug(
            " Length=%s (0x%s)", length, binascii.hexlify(length_hex).decode()
        )
        logger.debug(
            " Type=%s (0x%s)", msg_type, binascii.hexlify(type_hex).decode()
        )
        logger.debug(
            " Error Code=%s (0x%s)",
            error_code,
            binascii.hexlify(error_code_hex).decode(),
        )
        logger.debug(
            " Error Subcode=%s (0x%s)",
            error_subcode,
            binascii.hexlify(error_subcode_hex).decode(),
        )
        logger.debug(" Data= (0x%s)", binascii.hexlify(data_hex).decode())
        # .decode() so the log shows plain hex digits instead of a bytes repr,
        # matching the KEEP ALIVE and UPDATE encoders.
        logger.debug(
            "NOTIFICATION message encoded: 0x%s",
            binascii.b2a_hex(message_hex).decode(),
        )

    return message_hex
def keepalive_message(self):
    """Generates a KEEP ALIVE Message (rfc4271#section-4.4)

    Returns:
        :return: encoded KEEP ALIVE message in HEX
    """
    # Marker
    marker_hex = b"\xFF" * 16

    # Type (4 = KEEPALIVE, rfc4271#section-4.1); named msg_type so the
    # builtin type() is not shadowed (and never packed by accident).
    msg_type = 4
    type_hex = struct.pack("B", msg_type)

    # Length (big-endian); the "+ 2" accounts for the length field itself.
    length = len(marker_hex) + 2 + len(type_hex)
    length_hex = struct.pack(">H", length)

    # KEEP ALIVE Message (header only, there is no body)
    message_hex = marker_hex + length_hex + type_hex

    # NOTE(review): the line guarding this debug dump is not visible in the
    # listing; presumably it is "if self.log_debug:" - confirm. Falling back
    # to True keeps the dump unconditional when the attribute is absent.
    if getattr(self, "log_debug", True):
        logger.debug("KEEP ALIVE message encoding")
        logger.debug(" Marker=0x%s", binascii.hexlify(marker_hex).decode())
        logger.debug(
            " Length=%s (0x%s)", length, binascii.hexlify(length_hex).decode()
        )
        logger.debug(
            " Type=%s (0x%s)", msg_type, binascii.hexlify(type_hex).decode()
        )
        logger.debug(
            "KEEP ALIVE message encoded: 0x%s",
            binascii.b2a_hex(message_hex).decode(),
        )

    return message_hex
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and
    for the hold time of the peer.
    """

    def __init__(self, msg_in):
        """Initialisation. based on defaults and OPEN message from peer.

        Arguments:
            msg_in: the OPEN message received from peer.
        Raises:
            ValueError: the negotiated hold time is non-zero but below 3.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        # TODO: Make the default value (180) configurable,
        # default value could mirror what peer said.
        # Offset 22 = marker (16) + length (2) + type (1) + version (1)
        # + my AS (2), i.e. the Hold Time field of OPEN (rfc4271#section-4.2).
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        hold_timedelta = min(180, peer_hold_timedelta)
        # Zero disables keepalives; otherwise at least 3 seconds are required.
        if hold_timedelta != 0 and hold_timedelta < 3:
            error_text = "Invalid hold timedelta value: " + str(hold_timedelta)
            logger.error(error_text)
            # One formatted message instead of a tuple of arguments.
            raise ValueError(error_text)
        self.hold_timedelta = hold_timedelta
        # If we do not hear from peer this long, we assume it has died.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        # The same as calling snapshot(), but also declares a field.
        self.snapshot_time = time.time()
        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this time point, peer will be declared dead.
        self.my_keepalive_time = None  # to be set later
        # At this point, we should be sending keepalive message.

    def snapshot(self):
        """Store current time in instance data to use later."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time

        Arguments:
            :keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurrence"""
        # Hold time = 0 means keepalives are switched off entirely.
        if self.hold_timedelta == 0:
            return False
        return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Return the time of the next expected or to be sent KEEP ALIVE"""
        if self.hold_timedelta == 0:
            # No timer event will ever fire; report a point one day ahead.
            return self.snapshot_time + 86400
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time."""
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta == 0:
            return
        # time.time() may be too strict
        if snapshot_time > self.peer_hold_time:
            logger.error("Peer has overstepped the hold timer.")
            raise RuntimeError("Peer has overstepped the hold timer.")
            # TODO: Include hold_timedelta?
            # TODO: Add notification sending (attempt). That means
            # move to write tracker.
1707 class ReadTracker(object):
1708 """Class for tracking read of mesages chunk by chunk and
1727 """The reader initialisation.
1730 bgp_socket: socket to be used for sending
1731 timer: timer to be used for scheduling
1732 storage: thread safe dict
1733 evpn: flag that evpn functionality is tested
1734 mvpn: flag that mvpn functionality is tested
1735 grace: flag that grace-restart functionality is tested
1736 l3vpn_mcast: flag that l3vpn_mcast functionality is tested
1737 l3vpn: flag that l3vpn unicast functionality is tested
1738 rt_constrain: flag that rt-constrain functionality is tested
1739 allf: flag for all family testing.
1741 # References to outside objects.
1742 self.socket = bgp_socket
1744 # BGP marker length plus length field length.
1745 self.header_length = 18
1746 # TODO: make it class (constant) attribute
1747 # Computation of where next chunk ends depends on whether
1748 # we are beyond length field.
1749 self.reading_header = True
1750 # Countdown towards next size computation.
1751 self.bytes_to_read = self.header_length
1752 # Incremental buffer for message under read.
1754 # Initialising counters
1755 self.updates_received = 0
1756 self.prefixes_introduced = 0
1757 self.prefixes_withdrawn = 0
1758 self.rx_idle_time = 0
1759 self.rx_activity_detected = True
1760 self.storage = storage
1763 self.l3vpn_mcast = l3vpn_mcast
1765 self.rt_constrain = rt_constrain
1768 self.wfr = wait_for_read
def read_message_chunk(self):
    """Read up to one message

    Reads at most self.bytes_to_read bytes from the socket. A message is
    consumed in two logical blocks: the header (marker plus length field)
    and then the rest of the message.

    Note:
        Currently it does not return anything.
    """
    # TODO: We could return the whole message, currently not needed.
    # We assume the socket is readable.
    # NOTE(review): an empty chunk (peer closed the connection) is not
    # detected here; presumably the hold timer ends the session - confirm.
    chunk_message = self.socket.recv(self.bytes_to_read)
    self.msg_in += chunk_message
    self.bytes_to_read -= len(chunk_message)
    # TODO: bytes_to_read < 0 is not possible, right?
    if self.bytes_to_read:
        # The current logical block is still incomplete.
        # We should not act upon peer_hold_time if we are reading
        # something right now.
        return

    if self.reading_header:
        # The logical block was a BGP header.
        # Now we know the size of the message.
        self.reading_header = False
        self.bytes_to_read = (
            get_short_int_from_message(self.msg_in) - self.header_length
        )
        return

    # We have finished reading the body of the message.
    # Peer has just proven it is still alive.
    self.timer.reset_peer_hold_time()
    # TODO: Do we want to count received messages?
    # TODO: Should we do validation and exit on anything
    # besides update or keepalive?
    # The type byte is the first byte after marker and length.
    message_type_hex = self.msg_in[self.header_length : self.header_length + 1]
    # NOTE(review): the logging calls are not visible in the listing; the
    # levels below (UPDATE at debug, unexpected types at warning, the rest
    # at info) are an assumption - confirm.
    received_log = {
        b"\x01": (logger.info, "OPEN message received: 0x%s"),
        b"\x02": (logger.debug, "UPDATE message received: 0x%s"),
        b"\x03": (logger.info, "NOTIFICATION message received: 0x%s"),
        b"\x04": (logger.info, "KEEP ALIVE message received: 0x%s"),
    }
    log_call, template = received_log.get(
        message_type_hex, (logger.warning, "Unexpected message received: 0x%s")
    )
    log_call(template, binascii.b2a_hex(self.msg_in).decode())
    if message_type_hex == b"\x02":
        self.decode_update_message(self.msg_in)

    # Prepare state for reading another message.
    self.msg_in = b""
    self.reading_header = True
    self.bytes_to_read = self.header_length
def decode_path_attributes(self, path_attributes_hex):
    """Decode the Path Attributes field (rfc4271#section-4.3)

    Every attribute is logged at debug level. For MP_REACH_NLRI and
    MP_UNREACH_NLRI the carried prefixes are also added to the
    prefixes_introduced / prefixes_withdrawn counters.

    Arguments:
        :path_attributes_hex: path attributes field to be decoded (bytes)
    Returns:
        :return: None
    """
    # Attribute type codes this decoder can name.
    known_attributes = {
        1: "ORIGIN",
        2: "AS_PATH",
        3: "NEXT_HOP",
        4: "MULTI_EXIT_DISC",
        5: "LOCAL_PREF",
        6: "ATOMIC_AGGREGATE",
        7: "AGGREGATOR",
        9: "ORIGINATOR_ID",  # rfc4456#section-8
        10: "CLUSTER_LIST",  # rfc4456#section-8
        14: "MP_REACH_NLRI",  # rfc4760#section-3
        15: "MP_UNREACH_NLRI",  # rfc4760#section-4
    }
    hex_to_decode = path_attributes_hex

    while len(hex_to_decode):
        attr_flags_hex = hex_to_decode[0:1]
        attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
        # attr_optional_bit = attr_flags & 128
        # attr_transitive_bit = attr_flags & 64
        # attr_partial_bit = attr_flags & 32
        attr_extended_length_bit = attr_flags & 16

        attr_type_code_hex = hex_to_decode[1:2]
        attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)

        # The length field is 2 bytes with the extended-length bit set,
        # 1 byte otherwise; the value follows it immediately.
        length_size = 2 if attr_extended_length_bit else 1
        value_start = 2 + length_size
        attr_length_hex = hex_to_decode[2:value_start]
        attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
        value_end = value_start + attr_length
        attr_value_hex = hex_to_decode[value_start:value_end]
        hex_to_decode = hex_to_decode[value_end:]

        attr_name = known_attributes.get(attr_type_code)
        if attr_name is None:
            logger.debug(
                "Unknown attribute type=%s, flags:0x%s)",
                attr_type_code,
                binascii.b2a_hex(attr_flags_hex),
            )
            logger.debug(
                "Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex)
            )
            continue

        logger.debug(
            "Attribute type=%s (%s, flags:0x%s)",
            attr_type_code,
            attr_name,
            binascii.b2a_hex(attr_flags_hex),
        )
        logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))

        if attr_type_code == 14:
            address_family_identifier_hex = attr_value_hex[0:2]
            logger.debug(
                " Address Family Identifier=0x%s",
                binascii.b2a_hex(address_family_identifier_hex),
            )
            subsequent_address_family_identifier_hex = attr_value_hex[2:3]
            logger.debug(
                " Subsequent Address Family Identifier=0x%s",
                binascii.b2a_hex(subsequent_address_family_identifier_hex),
            )
            next_hop_netaddr_len_hex = attr_value_hex[3:4]
            next_hop_netaddr_len = int(
                binascii.b2a_hex(next_hop_netaddr_len_hex), 16
            )
            logger.debug(
                " Length of Next Hop Network Address=%s (0x%s)",
                next_hop_netaddr_len,
                binascii.b2a_hex(next_hop_netaddr_len_hex),
            )
            next_hop_end = 4 + next_hop_netaddr_len
            next_hop_netaddr_hex = attr_value_hex[4:next_hop_end]
            if len(next_hop_netaddr_hex) == 4:
                # IPv4 next hop: render as dotted quad.
                next_hop_netaddr = ".".join(
                    str(octet) for octet in bytearray(next_hop_netaddr_hex)
                )
            else:
                # Other lengths (e.g. a 16-byte IPv6 next hop) used to
                # raise struct.error; show them as plain hex instead.
                next_hop_netaddr = binascii.hexlify(next_hop_netaddr_hex).decode()
            logger.debug(
                " Network Address of Next Hop=%s (0x%s)",
                next_hop_netaddr,
                binascii.b2a_hex(next_hop_netaddr_hex),
            )
            # One reserved byte sits between the next hop and the NLRI.
            reserved_hex = attr_value_hex[next_hop_end : next_hop_end + 1]
            logger.debug(" Reserved=0x%s", binascii.b2a_hex(reserved_hex))
            nlri_hex = attr_value_hex[next_hop_end + 1 :]
            logger.debug(
                " Network Layer Reachability Information=0x%s",
                binascii.b2a_hex(nlri_hex),
            )
            nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
            logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
            for prefix in nlri_prefix_list:
                logger.debug(" nlri_prefix_received: %s", prefix)
            self.prefixes_introduced += len(nlri_prefix_list)  # update counter
        elif attr_type_code == 15:
            address_family_identifier_hex = attr_value_hex[0:2]
            logger.debug(
                " Address Family Identifier=0x%s",
                binascii.b2a_hex(address_family_identifier_hex),
            )
            subsequent_address_family_identifier_hex = attr_value_hex[2:3]
            logger.debug(
                " Subsequent Address Family Identifier=0x%s",
                binascii.b2a_hex(subsequent_address_family_identifier_hex),
            )
            wd_hex = attr_value_hex[3:]
            logger.debug(" Withdrawn Routes=0x%s", binascii.b2a_hex(wd_hex))
            wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
            logger.debug(" Withdrawn routes prefix list: %s", wdr_prefix_list)
            for prefix in wdr_prefix_list:
                logger.debug(" withdrawn_prefix_received: %s", prefix)
            self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
    return None
def decode_update_message(self, msg):
    """Decode an UPDATE message (rfc4271#section-4.3)

    The raw message is always stored (hex encoded) under the "update" key
    of self.storage. Decoding is then skipped when any non-default address
    family or graceful-restart mode is being tested; otherwise the message
    is decoded, logged and the receive counters are updated.

    Arguments:
        :msg: message to be decoded in hex
    Returns:
        :return: None
    """
    logger.debug("Decoding update message:")
    # message header - marker
    marker_hex = msg[:16]
    logger.debug("Message header marker: 0x%s", binascii.b2a_hex(marker_hex))
    # message header - message length
    msg_length_hex = msg[16:18]
    msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
    logger.debug(
        "Message lenght: 0x%s (%s)", binascii.b2a_hex(msg_length_hex), msg_length
    )
    # message header - message type
    msg_type_hex = msg[18:19]
    msg_type = int(binascii.b2a_hex(msg_type_hex), 16)

    with self.storage as stor:
        # this will replace the previously stored message
        stor["update"] = binascii.hexlify(msg).decode()

    # (state banner, skip decoding?, reason logged when skipping)
    # NOTE(review): the graceful-restart condition is not visible in the
    # listing; "grace != 8" is assumed (8 presumably means "off") - confirm.
    skip_checks = (
        (
            "Evpn {}".format(self.evpn),
            self.evpn,
            "Skipping update decoding due to evpn data expected",
        ),
        (
            "Graceful-restart {}".format(self.grace),
            self.grace != 8,
            "Skipping update decoding due to graceful-restart data expected",
        ),
        (
            "Mvpn {}".format(self.mvpn),
            self.mvpn,
            "Skipping update decoding due to mvpn data expected",
        ),
        (
            "L3vpn-mcast {}".format(self.l3vpn_mcast),
            self.l3vpn_mcast,
            "Skipping update decoding due to l3vpn_mcast data expected",
        ),
        (
            "L3vpn-unicast {}".format(self.l3vpn),
            # Was testing self.l3vpn_mcast (copy-paste slip), so the
            # l3vpn unicast flag never caused a skip by itself.
            self.l3vpn,
            "Skipping update decoding due to l3vpn-unicast data expected",
        ),
        (
            "Route-Target-Constrain {}".format(self.rt_constrain),
            self.rt_constrain,
            "Skipping update decoding due to Route-Target-Constrain data expected",
        ),
        (
            "Ipv6-Unicast {}".format(self.ipv6),
            self.ipv6,
            "Skipping update decoding due to Ipv6 data expected",
        ),
        (
            "Allf {}".format(self.allf),
            self.allf,
            "Skipping update decoding",
        ),
    )
    for banner, skip, reason in skip_checks:
        logger.debug(banner)
        if skip:
            logger.debug(reason)
            return

    if msg_type != 2:
        # NOTE(review): log level of this branch is not visible - confirm.
        logger.error(
            "Unexpeced message type 0x%s in 0x%s",
            binascii.b2a_hex(msg_type_hex),
            binascii.b2a_hex(msg),
        )
        return

    logger.debug("Message type: 0x%s (update)", binascii.b2a_hex(msg_type_hex))
    # withdrawn routes length
    wdr_length_hex = msg[19:21]
    wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
    logger.debug(
        "Withdrawn routes lenght: 0x%s (%s)",
        binascii.b2a_hex(wdr_length_hex),
        wdr_length,
    )
    # withdrawn routes
    wdr_hex = msg[21 : 21 + wdr_length]
    logger.debug("Withdrawn routes: 0x%s", binascii.b2a_hex(wdr_hex))
    wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
    logger.debug("Withdrawn routes prefix list: %s", wdr_prefix_list)
    for prefix in wdr_prefix_list:
        logger.debug("withdrawn_prefix_received: %s", prefix)
    # total path attribute length
    total_pa_length_offset = 21 + wdr_length
    total_pa_length_hex = msg[total_pa_length_offset : total_pa_length_offset + 2]
    total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
    logger.debug(
        "Total path attribute lenght: 0x%s (%s)",
        binascii.b2a_hex(total_pa_length_hex),
        total_pa_length,
    )
    # path attributes
    pa_offset = total_pa_length_offset + 2
    pa_hex = msg[pa_offset : pa_offset + total_pa_length]
    logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
    self.decode_path_attributes(pa_hex)
    # network layer reachability information length
    # 23 = header (19) + withdrawn length field (2) + attribute length field (2)
    nlri_length = msg_length - 23 - total_pa_length - wdr_length
    logger.debug("Calculated NLRI length: %s", nlri_length)
    # network layer reachability information
    nlri_offset = pa_offset + total_pa_length
    nlri_hex = msg[nlri_offset : nlri_offset + nlri_length]
    logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
    nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
    logger.debug("NLRI prefix list: %s", nlri_prefix_list)
    for prefix in nlri_prefix_list:
        logger.debug("nlri_prefix_received: %s", prefix)
    # Updating counters
    self.updates_received += 1
    self.prefixes_introduced += len(nlri_prefix_list)
    self.prefixes_withdrawn += len(wdr_prefix_list)
def wait_for_read(self):
    """Read message until timeout (next expected event).

    Blocks in select() until the socket becomes readable (or reports an
    exceptional condition) or the timeout expires, then accounts the time
    spent waiting as receive idle time.

    Note:
        Used when no more updates has to be sent to avoid busy-wait.
        Currently it does not return anything.
    """
    # Compute time to the first predictable state change
    event_time = self.timer.get_next_event_time()
    # snapshot_time would be imprecise
    # The program may get around to waiting for an event in "very near
    # future" so late that it became a "past" event; clamp to zero to tell
    # "select" to not wait at all, as it does not accept negative timeouts.
    wait_timedelta = max(0, min(event_time - time.time(), self.wfr))

    # And wait for event or something to read.
    if not self.rx_activity_detected or not (self.updates_received % 100):
        # right time to write statistics to the log (not for every update and
        # not too frequently to avoid having large log files)
        logger.info(
            "total_received_update_message_counter: %s", self.updates_received
        )
        logger.info(
            "total_received_nlri_prefix_counter: %s", self.prefixes_introduced
        )
        logger.info(
            "total_received_withdrawn_prefix_counter: %s", self.prefixes_withdrawn
        )

    start_time = time.time()
    select.select([self.socket], [], [self.socket], wait_timedelta)
    timedelta = time.time() - start_time
    self.rx_idle_time += timedelta
    # A wait shorter than one second counts as receive activity.
    self.rx_activity_detected = timedelta < 1

    # Re-evaluated on purpose: rx_activity_detected has just been updated.
    if not self.rx_activity_detected or not (self.updates_received % 100):
        # right time to write statistics to the log (not for every update and
        # not too frequently to avoid having large log files)
        logger.info("... idle for %.3fs", timedelta)
        logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
    return
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them."""

    def __init__(self, bgp_socket, generator, timer):
        """The writer initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Really new fields.
        # TODO: Would attribute docstrings add anything substantial?
        self.sending_message = False
        self.bytes_to_send = 0
        # Outgoing byte buffer; messages are appended, sent chunks removed.
        self.msg_out = b""

    def enqueue_message_for_sending(self, message):
        """Enqueue message and change state.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send enqueued data from msg_out buffer

        Returns:
            :return: true if no remaining data to send
        """
        # We assume there is a msg_out to send and socket is writable.
        self.timer.snapshot()
        # send() may accept only a part of the buffer.
        bytes_sent = self.socket.send(self.msg_out)
        # Forget the part of message that was sent.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if not self.bytes_to_send:
            # TODO: Is it possible to hit negative bytes_to_send?
            self.sending_message = False
            # We should have reset hold timer on peer side.
            self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
            # The possible reason for not prioritizing reads is gone.
            return True
        return False
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class."""

    def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
        """The state tracker initialisation.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
            inqueue: user initiated messages queue
            storage: thread safe dict to store data for the rpc server
            cliargs: cli args from the user
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Sub-trackers.
        # NOTE(review): the positional arguments and the evpn / mvpn / allf /
        # ipv6 keywords were restored from a damaged copy of this call;
        # confirm them against ReadTracker.__init__.
        self.reader = ReadTracker(
            bgp_socket,
            timer,
            storage,
            evpn=cliargs.evpn,
            mvpn=cliargs.mvpn,
            l3vpn_mcast=cliargs.l3vpn_mcast,
            l3vpn=cliargs.l3vpn,
            allf=cliargs.allf,
            rt_constrain=cliargs.rt_constrain,
            ipv6=cliargs.ipv6,
            grace=cliargs.grace,
            wait_for_read=cliargs.wfr,
        )
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        self.prioritize_writing = False
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # back to False.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.
        self.inqueue = inqueue

    def perform_one_loop_iteration(self):
        """The main loop iteration

        Notes:
            Calculates priority, resolves all conditions, calls
            appropriate method and returns to caller to repeat.
            Each call performs at most one action: a read, a write,
            enqueueing the next update, or an idle wait.
        Raises:
            :RuntimeError: when select reports an exceptional socket state
        """
        self.timer.snapshot()
        if not self.prioritize_writing:
            if self.timer.is_time_for_my_keepalive():
                if not self.writer.sending_message:
                    # We need to schedule a keepalive ASAP.
                    self.writer.enqueue_message_for_sending(
                        self.generator.keepalive_message()
                    )
                    logger.info("KEEP ALIVE is sent.")
                # We are sending a message now, so let's prioritize it.
                self.prioritize_writing = True

        # Pick up at most one user initiated (hex encoded) message.
        try:
            msg = self.inqueue.get_nowait()
            logger.info("Received message: {}".format(msg))
            msgbin = binascii.unhexlify(msg)
            self.writer.enqueue_message_for_sending(msgbin)
        except queue.Empty:
            pass
        # Now we know what our priorities are, we have to check
        # which actions are available.
        # select.select() returns three lists,
        # we store them to list of lists.
        list_list = select.select(
            [self.socket], [self.socket], [self.socket], self.timer.report_timedelta
        )
        read_list, write_list, except_list = list_list
        # Lists are unpacked, each is either [] or [self.socket],
        # so we will test them as boolean.
        if except_list:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
                return
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        if write_list:
            # Either we really want to reset peer's view of our hold
            # timer, or there was nothing to read.
            # Were we in the middle of sending a message?
            if self.writer.sending_message:
                # Was it the end of a message?
                whole = self.writer.send_message_chunk_is_whole()
                # We were pressed to send something and we did it.
                if self.prioritize_writing and whole:
                    # We prioritize reading again.
                    self.prioritize_writing = False
                return
            # Finally to check if still update messages to be generated.
            if self.generator.remaining_prefixes:
                msg_out = self.generator.compose_update_message()
                if not self.generator.remaining_prefixes:
                    # We have just finished update generation,
                    # end-of-rib is due.
                    logger.info("All update messages generated.")
                    logger.info("Storing performance results.")
                    self.generator.store_results()
                    logger.info("Finally an END-OF-RIB is sent.")
                    msg_out += self.generator.update_message(
                        wr_prefixes=[], nlri_prefixes=[], end_of_rib=True
                    )
                self.writer.enqueue_message_for_sending(msg_out)
                # Attempt for real sending to be done in next iteration.
                return
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # We can neither read nor write.
        # NOTE(review): log level and the trailing " seconds." were restored
        # from a damaged copy of this statement; confirm.
        logger.warning(
            "Input and output both blocked for "
            + str(self.timer.report_timedelta)
            + " seconds."
        )
        # FIXME: Are we sure select has been really waiting
        # the whole period?
        return
def create_logger(loglevel, logfile):
    """Create logger object

    The logger named "logger" gets one console handler and one file
    handler (the log file is truncated on open), both using the same
    "<time> <level> BGP-<thread name>: <message>" format.

    Arguments:
        :loglevel: log level
        :logfile: log file name
    Returns:
        :return: logger object
    """
    logger = logging.getLogger("logger")
    log_formatter = logging.Formatter(
        "%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s"
    )
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler(logfile, mode="w")
    console_handler.setFormatter(log_formatter)
    file_handler.setFormatter(log_formatter)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.setLevel(loglevel)
    return logger
def job(arguments, inqueue, storage):
    """One time initialisation and iterations looping.

    Notes:
        Establish BGP connection and run iterations.
        The handshake is: receive the peer's OPEN, send our OPEN, receive
        the confirming KEEPALIVE, send our own KEEPALIVE.

    Arguments:
        :arguments: Command line arguments
        :inqueue: Data to be sent from play.py
        :storage: Shared dict for rpc server
    Returns:
        :return: None (the reactor loop does not end on its own)
    Raises:
        :MessageError: when our OPEN is not confirmed by a KEEPALIVE
    """
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    # Hexlify once; the same text is logged and stored for the rpc server.
    open_hex = binascii.hexlify(msg_in).decode()
    logger.info(open_hex)
    storage["open"] = open_hex
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out).decode())
    # Send our open message to the peer.
    bgp_socket.send(msg_out)
    # Wait for confirming keepalive.
    # TODO: Surely in just one packet?
    # Using exact keepalive length to not to see possible updates.
    msg_in = bgp_socket.recv(19)
    if msg_in != generator.keepalive_message():
        error_msg = "Open not confirmed by keepalive, instead got"
        logger.error(error_msg + ": " + binascii.hexlify(msg_in).decode())
        raise MessageError(error_msg, msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out).decode())
    bgp_socket.send(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
class Rpcs:
    """Handler for SimpleXMLRPCServer"""

    def __init__(self, sendqueue, storage):
        """Init method

        Arguments:
            :sendqueue: queue for data to be sent towards odl
            :storage: thread safe dict
        """
        self.queue = sendqueue
        self.storage = storage

    def send(self, text):
        """Data to be sent

        Arguments:
            :text: hex string of the data to be sent
        """
        self.queue.put(text)

    def get(self, text=""):
        """Reads data from the storage

        Arguments:
            :text: a key to the storage to get the data
        Returns:
            :return: stored data, or an empty string when the key is absent
        """
        with self.storage as stor:
            return stor.get(text, "")

    def clean(self, text=""):
        """Cleans data from the storage

        Arguments:
            :text: a key to the storage to clean the data
        """
        with self.storage as stor:
            # NOTE(review): this body was restored from a damaged copy;
            # removing an absent key is assumed to be a no-op. Confirm.
            if text in stor:
                del stor[text]
def threaded_job(arguments):
    """Run the job threaded

    The requested amount of prefixes is split between
    arguments.multiplicity worker threads, each running job() with its own
    copy of the arguments. The calling thread then serves the XML-RPC
    interface forever.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None
    """
    amount_left = arguments.amount
    utils_left = arguments.multiplicity
    prefix_current = arguments.firstprefix
    myip_current = arguments.myip
    port = arguments.port
    thread_args = []
    rpcqueue = queue.Queue()
    storage = SafeDict()

    # NOTE(review): the loop skeleton (loop head, counter updates, break)
    # was restored from a damaged copy of this function; confirm.
    while 1:
        amount_per_util = int((amount_left - 1) / utils_left) + 1  # round up
        amount_left -= amount_per_util
        utils_left -= 1

        args = deepcopy(arguments)
        args.amount = amount_per_util
        args.firstprefix = prefix_current
        args.myip = myip_current
        thread_args.append(args)

        if not utils_left:
            break
        # presumably each generated prefix spans 16 addresses - TODO confirm
        prefix_current += amount_per_util * 16
        myip_current += 1

    try:
        # Create threads
        for t in thread_args:
            threading.Thread(
                target=job, args=(t, rpcqueue, storage), daemon=True
            ).start()
    except Exception:
        # NOTE(review): exception type and exit code were restored from a
        # damaged copy; confirm.
        print("Error: unable to start thread.")
        raise SystemExit(2)

    if arguments.usepeerip:
        ip = arguments.peerip
    else:
        ip = arguments.myip
    rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
    rpcserver.register_instance(Rpcs(rpcqueue, storage))
    rpcserver.serve_forever()
if __name__ == "__main__":
    arguments = parse_arguments()
    # "logger" is bound at module level on purpose: the classes and
    # functions of this file log through this global name.
    logger = create_logger(arguments.loglevel, arguments.logfile)
    threaded_job(arguments)