1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myport. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
36 """Thread safe dictionary
38 The object will serve as thread safe data storage.
39 It should be used with "with" statement.
42 def __init__(self, *p_arg, **n_arg):
43 super(SafeDict, self).__init__()
44 self._lock = threading.Lock()
50 def __exit__(self, type, value, traceback):
def _parse_bool(value):
    """Translate a textual command-line boolean into a real bool.

    ``type=bool`` treats every non-empty string (including "False") as
    True, so the accepted spellings are matched explicitly instead.
    """
    token = str(value).strip().lower()
    if token in ("1", "true", "t", "yes", "y", "on"):
        return True
    if token in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError("expected a boolean value, got %r" % (value,))


def parse_arguments():
    """Use argparse to get arguments.

    Returns:
        :return: argparse Namespace holding the parsed command-line options.
    Raises:
        SystemExit: when --multiplicity is not positive, or when argparse
            itself rejects the command line.
    """
    parser = argparse.ArgumentParser()
    ipv4 = ipaddr.IPv4Address
    no_decode = (
        " Enabling this flag makes the script not decoding the update mesage,"
        " because of not supported decoding for these elements."
    )
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = "Amount of IP prefixes to generate. (negative means infinite)."
    parser.add_argument("--amount", default="1", type=int, help=str_help)
    str_help = "Rpc server port."
    parser.add_argument("--port", default="8000", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default="0", type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default="0", type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # The default must be one of the choices (a plain string, not a list).
    parser.add_argument(
        "--updates", choices=["single", "separate"], default="separate", help=str_help
    )
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0", type=ipv4, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = (
        "Numeric IP Address to bind to and derive BGP ID from. "
        "Default value only suitable for listening."
    )
    parser.add_argument("--myip", default="0.0.0.0", type=ipv4, help=str_help)
    str_help = (
        "TCP port to bind to when listening or initiating connection. "
        "Default only suitable for initiating."
    )
    parser.add_argument("--myport", default="0", type=int, help=str_help)
    # NOTE(review): the option names and defaults of --nexthop, --originator
    # and --cluster are not visible in the reviewed excerpt; they are restored
    # from the attribute names used by MessageGenerator - confirm the defaults.
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument(
        "--nexthop", default="192.0.2.1", type=ipv4, dest="nexthop", help=str_help
    )
    str_help = "Identifier of the route originator."
    parser.add_argument(
        "--originator", default=None, type=ipv4, dest="originator", help=str_help
    )
    str_help = "Cluster list item identifier."
    parser.add_argument(
        "--cluster", default=None, type=ipv4, dest="cluster", help=str_help
    )
    str_help = (
        "Numeric IP Address to try to connect to. "
        "Currently no effect in listening mode."
    )
    parser.add_argument("--peerip", default="127.0.0.2", type=ipv4, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default="179", type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default="180", type=int, help=str_help)
    str_help = "Log level (--error, --warning, --info, --debug)"
    # All four flags store into the same "loglevel" destination.
    # NOTE(review): only the WARNING constant is visible in the excerpt; the
    # other flag/level pairs follow the help text above - confirm.
    log_levels = (
        ("--error", logging.ERROR),
        ("--warning", logging.WARNING),
        ("--info", logging.INFO),
        ("--debug", logging.DEBUG),
    )
    for flag, level in log_levels:
        parser.add_argument(
            flag,
            dest="loglevel",
            action="store_const",
            const=level,
            default=logging.INFO,
            help=str_help,
        )
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default="1000", type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default=True, type=_parse_bool, help=str_help)
    str_help = "Using peerip instead of myip for xmlrpc server"
    parser.add_argument(
        "--usepeerip", default=False, action="store_true", help=str_help
    )
    str_help = "Link-State NLRI supported"
    parser.add_argument("--bgpls", default=False, type=_parse_bool, help=str_help)
    str_help = "Link-State NLRI: Identifier"
    parser.add_argument("-lsid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID"
    parser.add_argument("-lstid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID"
    parser.add_argument("-lspid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
    parser.add_argument("--lstsaddr", default="1.2.3.4", type=ipv4, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
    parser.add_argument("--lsteaddr", default="5.6.7.8", type=ipv4, help=str_help)
    str_help = "Link-State NLRI: Identifier Step"
    parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID Step"
    parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID Step"
    parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
    parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
    parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
    str_help = "How many play utilities are to be started."
    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
    str_help = (
        "Open message includes multiprotocol extension capability l2vpn-evpn."
        + no_decode
    )
    parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
    # --grace and --mvpn previously carried each other's help text.
    str_help = (
        "Open message includes Graceful-restart capability, containing AFI/SAFIS:"
        " IPV4-Unicast, IPV6-Unicast, BGP-LS." + no_decode
    )
    parser.add_argument("--grace", default="8", type=int, help=str_help)
    str_help = (
        "Open message includes Multicast in MPLS/BGP IP VPNs arguments." + no_decode
    )
    parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
    str_help = "Open message includes L3VPN-MULTICAST arguments." + no_decode
    parser.add_argument(
        "--l3vpn_mcast", default=False, action="store_true", help=str_help
    )
    str_help = (
        "Open message includes L3VPN-UNICAST arguments, without message decoding."
    )
    parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
    str_help = (
        "Open message includes ROUTE-TARGET-CONSTRAIN arguments, "
        "without message decoding."
    )
    parser.add_argument(
        "--rt_constrain", default=False, action="store_true", help=str_help
    )
    str_help = "Open message includes ipv6-unicast family, without message decoding."
    parser.add_argument("--ipv6", default=False, action="store_true", help=str_help)
    str_help = "Add all supported families without message decoding."
    parser.add_argument("--allf", default=False, action="store_true", help=str_help)
    parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
    str_help = "Skipping well known attributes for update message"
    parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        print("Multiplicity", arguments.multiplicity, "is not positive.")
        raise SystemExit(1)
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.listen: accept a connection instead of initiating it
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port
    Returns:
        :return: socket connected to the peer.
    Notes:
        Any socket created here is closed again when the connection
        attempt fails, so a failed attempt does not leak a descriptor.
    """
    # socket does not speak ipaddr, hence str()
    local_address = (str(arguments.myip), arguments.myport)
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # bind need single tuple as argument
            listening_socket.bind(local_address)
            listening_socket.listen(1)
            bgp_socket, _ = listening_socket.accept()
            # TODO: Verify client IP is controller IP.
        finally:
            # Only one peer is served; the listener is not needed afterwards.
            listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # bind to force specified address and port
            talking_socket.bind(local_address)
            talking_socket.connect((str(arguments.peerip), arguments.peerport))
        except Exception:
            talking_socket.close()
            raise
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract 2-bytes number from provided message.

    Arguments:
        :message: given message (byte string)
        :offset: offset of the short_int inside the message
    Returns:
        :return: required short_int value (big-endian, unsigned).
    Raises:
        IndexError: when the message does not hold two bytes at the offset.
    Notes:
        default offset value is the BGP message size offset.
    """
    try:
        # unpack_from accepts both Python 2 str and Python 3 bytes, unlike
        # ord() applied to a single indexed element.
        (short_int,) = struct.unpack_from(">H", message, offset)
    except struct.error:
        # Keep the failure type of plain indexing past the end of the message.
        raise IndexError("message too short to read 2 bytes at offset %d" % offset)
    return short_int
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Arguments:
        :prefixes_hex: byte string holding <length, prefix> tuples
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Raises:
        struct.error: when a length octet exceeds 32 bits or the data ends
            in the middle of a prefix.
    Notes:
        Each prefix occupies only ceil(length / 8) octets on the wire,
        so shorter prefixes are padded with zero octets before formatting.
    """
    raw = bytearray(prefixes_hex)
    prefix_list = []
    offset = 0
    while offset < len(raw):
        prefix_bit_len = raw[offset]
        if prefix_bit_len > 32:
            raise struct.error("IPv4 prefix length %d exceeds 32" % prefix_bit_len)
        # Number of octets needed to carry prefix_bit_len bits.
        prefix_len = (prefix_bit_len + 7) // 8
        octets = raw[offset + 1 : offset + 1 + prefix_len]
        if len(octets) != prefix_len:
            raise struct.error("prefix data truncated at offset %d" % offset)
        # Trailing octets are omitted from the message; restore them as zeros.
        octets.extend(b"\x00" * (4 - prefix_len))
        prefix = ".".join(str(octet) for octet in octets)
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
        offset += 1 + prefix_len
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.

        Arguments:
            :text: textual description of the problem
            :message: raw (byte string) message which caused the error
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: human readable message as string
        Notes:
            Use a placeholder string if the message is to be empty.
        """
        # A missing payload is reported the same way as an empty one.
        message = binascii.hexlify(self.msg or b"")
        if not isinstance(message, str):
            # hexlify returns bytes on Python 3; the digits are plain ASCII.
            message = message.decode("ascii")
        if not message:
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Raises:
        MessageError: when the received data is shorter than a minimal OPEN
            message or its length differs from the length it reports.
    Notes:
        Performs just basic incoming message checks
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # Some validation.
    # 37 is minimal length of open message with 4-byte AS number.
    if len(msg_in) < 37:
        error = MessageError(
            "Message length (" + str(len(msg_in)) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)",
            msg_in,
        )
        # The exception renders itself as "<text>: <hexlified message>".
        logger.error(str(error))
        raise error
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        error = MessageError(
            "Expected message length ("
            + str(reported_length)
            + ") does not match actual length ("
            + str(len(msg_in))
            + ")",
            msg_in,
        )
        # Same formatting as above, so the hex dump is separated by ": ".
        logger.error(str(error))
        raise error
    logger.info("Open message received.")
    return msg_in
394 class MessageGenerator(object):
395 """Class which generates messages, holds states and configuration values."""
397 # TODO: Define bgp marker as a class (constant) variable.
398 def __init__(self, args):
399 """Initialisation according to command-line args.
402 :args: argsparser's Namespace object which contains command-line
403 options for MesageGenerator initialisation
405 Calculates and stores default values used later on for
408 self.total_prefix_amount = args.amount
409 # Number of update messages left to be sent.
410 self.remaining_prefixes = self.total_prefix_amount
412 # New parameters initialisation
413 self.port = args.port
415 self.prefix_base_default = args.firstprefix
416 self.prefix_length_default = args.prefixlen
417 self.wr_prefixes_default = []
418 self.nlri_prefixes_default = []
419 self.version_default = 4
420 self.my_autonomous_system_default = args.asnumber
421 self.hold_time_default = args.holdtime # Local hold time.
422 self.bgp_identifier_default = int(args.myip)
423 self.next_hop_default = args.nexthop
424 self.originator_id_default = args.originator
425 self.cluster_list_item_default = args.cluster
426 self.single_update_default = args.updates == "single"
427 self.randomize_updates_default = args.updates == "random"
428 self.prefix_count_to_add_default = args.insert
429 self.prefix_count_to_del_default = args.withdraw
430 if self.prefix_count_to_del_default < 0:
431 self.prefix_count_to_del_default = 0
432 if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
433 # total number of prefixes must grow to avoid infinite test loop
434 self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
435 self.slot_size_default = self.prefix_count_to_add_default
436 self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
437 self.results_file_name_default = args.results
438 self.performance_threshold_default = args.threshold
439 self.rfc4760 = args.rfc4760
440 self.bgpls = args.bgpls
441 self.evpn = args.evpn
442 self.mvpn = args.mvpn
443 self.l3vpn_mcast = args.l3vpn_mcast
444 self.l3vpn = args.l3vpn
445 self.rt_constrain = args.rt_constrain
446 self.ipv6 = args.ipv6
447 self.allf = args.allf
448 self.skipattr = args.skipattr
449 self.grace = args.grace
450 # Default values when BGP-LS Attributes are used
452 self.prefix_count_to_add_default = 1
453 self.prefix_count_to_del_default = 0
454 self.ls_nlri_default = {
455 "Identifier": args.lsid,
456 "TunnelID": args.lstid,
458 "IPv4TunnelSenderAddress": args.lstsaddr,
459 "IPv4TunnelEndPointAddress": args.lsteaddr,
461 self.lsid_step = args.lsidstep
462 self.lstid_step = args.lstidstep
463 self.lspid_step = args.lspidstep
464 self.lstsaddr_step = args.lstsaddrstep
465 self.lsteaddr_step = args.lsteaddrstep
466 # Default values used for randomized part
468 self.total_prefix_amount - self.remaining_prefixes_threshold - 1
469 ) / self.prefix_count_to_add_default + 1
470 s2_slots = (self.remaining_prefixes_threshold - 1) / (
471 self.prefix_count_to_add_default - self.prefix_count_to_del_default
474 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
475 s2_first_index = s1_slots * self.prefix_count_to_add_default
479 * (self.prefix_count_to_add_default - self.prefix_count_to_del_default)
482 self.slot_gap_default = (
483 self.total_prefix_amount - self.remaining_prefixes_threshold - 1
484 ) / self.prefix_count_to_add_default + 1
485 self.randomize_lowest_default = s2_first_index
486 self.randomize_highest_default = s2_last_index
487 # Initialising counters
488 self.phase1_start_time = 0
489 self.phase1_stop_time = 0
490 self.phase2_start_time = 0
491 self.phase2_stop_time = 0
492 self.phase1_updates_sent = 0
493 self.phase2_updates_sent = 0
494 self.updates_sent = 0
496 self.log_info = args.loglevel <= logging.INFO
497 self.log_debug = args.loglevel <= logging.DEBUG
499 Flags needed for the MessageGenerator performance optimization.
500 Calling logger methods each iteration even with proper log level set
501 slows down significantly the MessageGenerator performance.
502 Measured total generation time (1M updates, dry run, error log level):
503 - logging based on basic logger features: 36,2s
504 - logging based on advanced logger features (lazy logging): 21,2s
505 - conditional calling of logger methods enclosed inside condition: 8,6s
508 logger.info("Generator initialisation")
510 " Target total number of prefixes to be introduced: "
511 + str(self.total_prefix_amount)
515 + str(self.prefix_base_default)
517 + str(self.prefix_length_default)
520 " My Autonomous System number: " + str(self.my_autonomous_system_default)
522 logger.info(" My Hold Time: " + str(self.hold_time_default))
523 logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
524 logger.info(" Next Hop: " + str(self.next_hop_default))
525 logger.info(" Originator ID: " + str(self.originator_id_default))
526 logger.info(" Cluster list: " + str(self.cluster_list_item_default))
528 " Prefix count to be inserted at once: "
529 + str(self.prefix_count_to_add_default)
532 " Prefix count to be withdrawn at once: "
533 + str(self.prefix_count_to_del_default)
536 " Fast pre-fill up to "
537 + str(self.total_prefix_amount - self.remaining_prefixes_threshold)
541 " Remaining number of prefixes to be processed "
542 + "in parallel with withdrawals: "
543 + str(self.remaining_prefixes_threshold)
546 " Prefix index range used after pre-fill procedure ["
547 + str(self.randomize_lowest_default)
549 + str(self.randomize_highest_default)
552 if self.single_update_default:
554 " Common single UPDATE will be generated "
555 + "for both NLRI & WITHDRAWN lists"
559 " Two separate UPDATEs will be generated "
560 + "for each NLRI & WITHDRAWN lists"
562 if self.randomize_updates_default:
563 logger.info(" Generation of UPDATE messages will be randomized")
564 logger.info(" Let's go ...\n")
566 # TODO: Notification for hold timer expiration can be handy.
568 def store_results(self, file_name=None, threshold=None):
569 """ Stores specified results into files based on file_name value.
572 :param file_name: Trailing (common) part of result file names
573 :param threshold: Minimum number of sent updates needed for each
574 result to be included into result csv file
575 (mainly needed because of the result accuracy)
579 # default values handling
580 # TODO optimize default values handling (use e.g. dicionary.update() approach)
581 if file_name is None:
582 file_name = self.results_file_name_default
583 if threshold is None:
584 threshold = self.performance_threshold_default
585 # performance calculation
586 if self.phase1_updates_sent >= threshold:
587 totals1 = self.phase1_updates_sent
589 self.phase1_updates_sent
590 / (self.phase1_stop_time - self.phase1_start_time)
595 if self.phase2_updates_sent >= threshold:
596 totals2 = self.phase2_updates_sent
598 self.phase2_updates_sent
599 / (self.phase2_stop_time - self.phase2_start_time)
605 logger.info("#" * 10 + " Final results " + "#" * 10)
606 logger.info("Number of iterations: " + str(self.iteration))
608 "Number of UPDATE messages sent in the pre-fill phase: "
609 + str(self.phase1_updates_sent)
612 "The pre-fill phase duration: "
613 + str(self.phase1_stop_time - self.phase1_start_time)
617 "Number of UPDATE messages sent in the 2nd test phase: "
618 + str(self.phase2_updates_sent)
621 "The 2nd test phase duration: "
622 + str(self.phase2_stop_time - self.phase2_start_time)
625 logger.info("Threshold for performance reporting: " + str(threshold))
629 "pre-fill " + str(self.prefix_count_to_add_default) + " route(s) per UPDATE"
631 if self.single_update_default:
632 phase2_label = "+" + (
633 str(self.prefix_count_to_add_default)
635 + str(self.prefix_count_to_del_default)
636 + " routes per UPDATE"
639 phase2_label = "+" + (
640 str(self.prefix_count_to_add_default)
642 + str(self.prefix_count_to_del_default)
643 + " routes in two UPDATEs"
645 # collecting capacity and performance results
648 if totals1 is not None:
649 totals[phase1_label] = totals1
650 performance[phase1_label] = performance1
651 if totals2 is not None:
652 totals[phase2_label] = totals2
653 performance[phase2_label] = performance2
654 self.write_results_to_file(totals, "totals-" + file_name)
655 self.write_results_to_file(performance, "performance-" + file_name)
def write_results_to_file(self, results, file_name):
    """Writes results to the csv plot file consumable by Jenkins.

    Arguments:
        :param results: dictionary mapping column label (string) to value
        :param file_name: Name of the (csv) file to be created
    Returns:
        :return: none
    Notes:
        The file holds two lines: the sorted labels and, in the same
        order, their values; both are separated by ", ".
    """
    labels = sorted(results)
    first_line = ", ".join(labels)
    second_line = ", ".join(str(results[label]) for label in labels)
    # The context manager releases the file handle even when a write fails.
    with open(file_name, "wt") as f:
        f.write(first_line + "\n")
        f.write(second_line + "\n")
    logger.info("Message generator performance results stored in " + file_name + ":")
    logger.info(" " + first_line)
    logger.info(" " + second_line)
684 # Return pseudo-randomized (reproducible) index for selected range
def randomize_index(self, index, lowest=None, highest=None):
    """Calculates pseudo-randomized index from selected range.

    Arguments:
        :param index: input index
        :param lowest: the lowest index from the randomized area
            (self.randomize_lowest_default when None)
        :param highest: the highest index from the randomized area
            (self.randomize_highest_default when None)
    Returns:
        :return: the (pseudo)randomized index
    Notes:
        Created just as a frame for future generator enhancement.
        The mapping is reproducible: indexes inside the range are mirrored
        within it, indexes outside are returned unchanged.
    """
    # Explicitly supplied bounds take precedence over the stored defaults.
    if lowest is None:
        lowest = self.randomize_lowest_default
    if highest is None:
        highest = self.randomize_highest_default
    if lowest <= index <= highest:
        # we are in the randomized range -> shuffle it inside
        # the range (now just reverse the order)
        return highest - (index - lowest)
    # we are out of the randomized range -> nothing to do
    return index
def get_ls_nlri_values(self, index):
    """Generates LS-NLRI parameters.
    http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03

    Arguments:
        :param index: index (iteration)
    Returns:
        :return: dictionary of LS NLRI parameters and values
    Notes:
        Every value is its configured default advanced by one for each
        full "step" iterations. Floor division keeps the increments
        integral regardless of the division mode in effect.
    """
    defaults = self.ls_nlri_default
    # generating list of LS NLRI parameters
    ls_nlri_values = {
        "Identifier": defaults["Identifier"] + index // self.lsid_step,
        "IPv4TunnelSenderAddress": (
            defaults["IPv4TunnelSenderAddress"] + index // self.lstsaddr_step
        ),
        "TunnelID": defaults["TunnelID"] + index // self.lstid_step,
        "LSPID": defaults["LSPID"] + index // self.lspid_step,
        "IPv4TunnelEndPointAddress": (
            defaults["IPv4TunnelEndPointAddress"] + index // self.lsteaddr_step
        ),
    }
    return ls_nlri_values
751 """Generates list of IP address prefixes.
754 :param slot_index: index of group of prefix addresses
755 :param slot_size: size of group of prefix addresses
756 in [number of included prefixes]
757 :param prefix_base: IP address of the first prefix
758 (slot_index = 0, prefix_index = 0)
759 :param prefix_len: length of the prefix in bites
760 (the same as size of netmask)
761 :param prefix_count: number of prefixes to be returned
762 from the specified slot
764 :return: list of generated IP address prefixes
766 # default values handling
767 # TODO optimize default values handling (use e.g. dicionary.update() approach)
768 if slot_size is None:
769 slot_size = self.slot_size_default
770 if prefix_base is None:
771 prefix_base = self.prefix_base_default
772 if prefix_len is None:
773 prefix_len = self.prefix_length_default
774 if prefix_count is None:
775 prefix_count = slot_size
776 if randomize is None:
777 randomize = self.randomize_updates_default
778 # generating list of prefixes
781 prefix_gap = 2 ** (32 - prefix_len)
782 for i in range(prefix_count):
783 prefix_index = slot_index * slot_size + i
785 prefix_index = self.randomize_index(prefix_index)
786 indexes.append(prefix_index)
787 prefixes.append(prefix_base + prefix_index * prefix_gap)
789 logger.debug(" Prefix slot index: " + str(slot_index))
790 logger.debug(" Prefix slot size: " + str(slot_size))
791 logger.debug(" Prefix count: " + str(prefix_count))
792 logger.debug(" Prefix indexes: " + str(indexes))
793 logger.debug(" Prefix list: " + str(prefixes))
796 def compose_update_message(
797 self, prefix_count_to_add=None, prefix_count_to_del=None
799 """Composes an UPDATE message
802 :param prefix_count_to_add: # of prefixes to put into NLRI list
803 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
805 :return: encoded UPDATE message in HEX
807 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
808 lists or common message wich includes both prefix lists.
809 Updates global counters.
811 # default values handling
812 # TODO optimize default values handling (use e.g. dicionary.update() approach)
813 if prefix_count_to_add is None:
814 prefix_count_to_add = self.prefix_count_to_add_default
815 if prefix_count_to_del is None:
816 prefix_count_to_del = self.prefix_count_to_del_default
818 if self.log_info and not (self.iteration % 1000):
821 + str(self.iteration)
822 + " - total remaining prefixes: "
823 + str(self.remaining_prefixes)
827 "#" * 10 + " Iteration: " + str(self.iteration) + " " + "#" * 10
829 logger.debug("Remaining prefixes: " + str(self.remaining_prefixes))
830 # scenario type & one-shot counter
831 straightforward_scenario = (
832 self.remaining_prefixes > self.remaining_prefixes_threshold
834 if straightforward_scenario:
835 prefix_count_to_del = 0
837 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
838 if not self.phase1_start_time:
839 self.phase1_start_time = time.time()
842 logger.debug("--- COMBINED SCENARIO ---")
843 if not self.phase2_start_time:
844 self.phase2_start_time = time.time()
845 # tailor the number of prefixes if needed
846 prefix_count_to_add = prefix_count_to_del + min(
847 prefix_count_to_add - prefix_count_to_del, self.remaining_prefixes
849 # prefix slots selection for insertion and withdrawal
850 slot_index_to_add = self.iteration
851 slot_index_to_del = slot_index_to_add - self.slot_gap_default
852 # getting lists of prefixes for insertion in this iteration
854 logger.debug("Prefixes to be inserted in this iteration:")
855 prefix_list_to_add = self.get_prefix_list(
856 slot_index_to_add, prefix_count=prefix_count_to_add
858 # getting lists of prefixes for withdrawal in this iteration
860 logger.debug("Prefixes to be withdrawn in this iteration:")
861 prefix_list_to_del = self.get_prefix_list(
862 slot_index_to_del, prefix_count=prefix_count_to_del
864 # generating the UPDATE mesage with LS-NLRI only
866 ls_nlri = self.get_ls_nlri_values(self.iteration)
867 msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[], **ls_nlri)
869 # generating the UPDATE message with prefix lists
870 if self.single_update_default:
871 # Send prefixes to be introduced and withdrawn
872 # in one UPDATE message
873 msg_out = self.update_message(
874 wr_prefixes=prefix_list_to_del, nlri_prefixes=prefix_list_to_add
877 # Send prefixes to be introduced and withdrawn
878 # in separate UPDATE messages (if needed)
879 msg_out = self.update_message(
880 wr_prefixes=[], nlri_prefixes=prefix_list_to_add
882 if prefix_count_to_del:
883 msg_out += self.update_message(
884 wr_prefixes=prefix_list_to_del, nlri_prefixes=[]
886 # updating counters - who knows ... maybe I am last time here ;)
887 if straightforward_scenario:
888 self.phase1_stop_time = time.time()
889 self.phase1_updates_sent = self.updates_sent
891 self.phase2_stop_time = time.time()
892 self.phase2_updates_sent = self.updates_sent - self.phase1_updates_sent
893 # updating totals for the next iteration
895 self.remaining_prefixes -= prefix_count_to_add - prefix_count_to_del
896 # returning the encoded message
899 # Section of message encoders
904 my_autonomous_system=None,
908 """Generates an OPEN Message (rfc4271#section-4.2)
911 :param version: see the rfc4271#section-4.2
912 :param my_autonomous_system: see the rfc4271#section-4.2
913 :param hold_time: see the rfc4271#section-4.2
914 :param bgp_identifier: see the rfc4271#section-4.2
916 :return: encoded OPEN message in HEX
919 # default values handling
920 # TODO optimize default values handling (use e.g. dicionary.update() approach)
922 version = self.version_default
923 if my_autonomous_system is None:
924 my_autonomous_system = self.my_autonomous_system_default
925 if hold_time is None:
926 hold_time = self.hold_time_default
927 if bgp_identifier is None:
928 bgp_identifier = self.bgp_identifier_default
931 marker_hex = "\xFF" * 16
935 type_hex = struct.pack("B", type)
938 version_hex = struct.pack("B", version)
940 # my_autonomous_system
941 # AS_TRANS value, 23456 decadic.
942 my_autonomous_system_2_bytes = 23456
943 # AS number is mappable to 2 bytes
944 if my_autonomous_system < 65536:
945 my_autonomous_system_2_bytes = my_autonomous_system
946 my_autonomous_system_hex_2_bytes = struct.pack(">H", my_autonomous_system)
949 hold_time_hex = struct.pack(">H", hold_time)
952 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
954 # Optional Parameters
955 optional_parameters_hex = ""
956 if self.rfc4760 or self.allf:
957 optional_parameter_hex = (
958 "\x02" # Param type ("Capability Ad")
959 "\x06" # Length (6 bytes)
960 "\x01" # Capability type (NLRI Unicast),
961 # see RFC 4760, secton 8
962 "\x04" # Capability value length
963 "\x00\x01" # AFI (Ipv4)
965 "\x01" # SAFI (Unicast)
967 optional_parameters_hex += optional_parameter_hex
969 if self.ipv6 or self.allf:
970 optional_parameter_hex = (
971 "\x02" # Param type ("Capability Ad")
972 "\x06" # Length (6 bytes)
973 "\x01" # Multiprotocol extetension capability,
974 "\x04" # Capability value length
975 "\x00\x02" # AFI (IPV6)
977 "\x01" # SAFI (UNICAST)
979 optional_parameters_hex += optional_parameter_hex
981 if self.bgpls or self.allf:
982 optional_parameter_hex = (
983 "\x02" # Param type ("Capability Ad")
984 "\x06" # Length (6 bytes)
985 "\x01" # Capability type (NLRI Unicast),
986 # see RFC 4760, secton 8
987 "\x04" # Capability value length
988 "\x40\x04" # AFI (BGP-LS)
990 "\x47" # SAFI (BGP-LS)
992 optional_parameters_hex += optional_parameter_hex
994 if self.evpn or self.allf:
995 optional_parameter_hex = (
996 "\x02" # Param type ("Capability Ad")
997 "\x06" # Length (6 bytes)
998 "\x01" # Multiprotocol extetension capability,
999 "\x04" # Capability value length
1000 "\x00\x19" # AFI (L2-VPN)
1002 "\x46" # SAFI (EVPN)
1004 optional_parameters_hex += optional_parameter_hex
1006 if self.mvpn or self.allf:
1007 optional_parameter_hex = (
1008 "\x02" # Param type ("Capability Ad")
1009 "\x06" # Length (6 bytes)
1010 "\x01" # Multiprotocol extetension capability,
1011 "\x04" # Capability value length
1012 "\x00\x01" # AFI (IPV4)
1014 "\x05" # SAFI (MCAST-VPN)
1016 optional_parameters_hex += optional_parameter_hex
1017 optional_parameter_hex = (
1018 "\x02" # Param type ("Capability Ad")
1019 "\x06" # Length (6 bytes)
1020 "\x01" # Multiprotocol extetension capability,
1021 "\x04" # Capability value length
1022 "\x00\x02" # AFI (IPV6)
1024 "\x05" # SAFI (MCAST-VPN)
1026 optional_parameters_hex += optional_parameter_hex
1028 if self.l3vpn_mcast or self.allf:
1029 optional_parameter_hex = (
1030 "\x02" # Param type ("Capability Ad")
1031 "\x06" # Length (6 bytes)
1032 "\x01" # Multiprotocol extetension capability,
1033 "\x04" # Capability value length
1034 "\x00\x01" # AFI (IPV4)
1036 "\x81" # SAFI (L3VPN-MCAST)
1038 optional_parameters_hex += optional_parameter_hex
1039 optional_parameter_hex = (
1040 "\x02" # Param type ("Capability Ad")
1041 "\x06" # Length (6 bytes)
1042 "\x01" # Multiprotocol extetension capability,
1043 "\x04" # Capability value length
1044 "\x00\x02" # AFI (IPV6)
1046 "\x81" # SAFI (L3VPN-MCAST)
1048 optional_parameters_hex += optional_parameter_hex
1050 if self.l3vpn or self.allf:
1051 optional_parameter_hex = (
1052 "\x02" # Param type ("Capability Ad")
1053 "\x06" # Length (6 bytes)
1054 "\x01" # Multiprotocol extetension capability,
1055 "\x04" # Capability value length
1056 "\x00\x01" # AFI (IPV4)
1058 "\x80" # SAFI (L3VPN-UNICAST)
1060 optional_parameters_hex += optional_parameter_hex
1061 optional_parameter_hex = (
1062 "\x02" # Param type ("Capability Ad")
1063 "\x06" # Length (6 bytes)
1064 "\x01" # Multiprotocol extetension capability,
1065 "\x04" # Capability value length
1066 "\x00\x02" # AFI (IPV6)
1068 "\x80" # SAFI (L3VPN-UNICAST)
1070 optional_parameters_hex += optional_parameter_hex
1072 if self.rt_constrain or self.allf:
1073 optional_parameter_hex = (
1074 "\x02" # Param type ("Capability Ad")
1075 "\x06" # Length (6 bytes)
1076 "\x01" # Multiprotocol extetension capability,
1077 "\x04" # Capability value length
1078 "\x00\x01" # AFI (IPV4)
1080 "\x84" # SAFI (ROUTE-TARGET-CONSTRAIN)
1082 optional_parameters_hex += optional_parameter_hex
1084 optional_parameter_hex = (
1085 "\x02" # Param type ("Capability Ad")
1086 "\x06" # Length (6 bytes)
1087 "\x41" # "32 bit AS Numbers Support"
1088 # (see RFC 6793, section 3)
1089 "\x04" # Capability value length
1091 optional_parameter_hex += struct.pack(
1092 ">I", my_autonomous_system
1093 ) # My AS in 32 bit format
1094 optional_parameters_hex += optional_parameter_hex
1097 b = list(bin(self.grace)[2:])
1098 b = b + [0] * (3 - len(b))
1101 restart_flag = "\x80\x05"
1103 restart_flag = "\x00\x05"
1109 ll_gr = "\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
1113 logger.debug("Grace parameters list: {}".format(b))
1114 # "\x02" Param type ("Capability Ad")
1115 # :param length: Length of whole message
1116 # "\x40" Graceful-restart capability
1117 # "\x06" Length (6 bytes)
1118 # "\x00" Restart Flag (customizable - turned on when grace == 2,3,6,7)
1119 # "\x05" Restart timer (5sec)
1120 # "\x00\x01" AFI (IPV4)
1121 # "\x01" SAFI (Unicast)
1122 # "\x00" Ipv4 Flag (customizable - turned on when grace == 1,3,5,7)
1123 # "\x47\x07\x00\x01\x01\x00\x00\x00\x1e" ipv4 ll-graceful-restart capability, timer 30sec
1124 # ll-gr turned on when grace is between 4-7
1125 optional_parameter_hex = "\x02{}\x40\x06{}\x00\x01\x01{}{}".format(
1126 length, restart_flag, ipv4_flag, ll_gr
1128 optional_parameters_hex += optional_parameter_hex
1130 # Optional Parameters Length
1131 optional_parameters_length = len(optional_parameters_hex)
1132 optional_parameters_length_hex = struct.pack("B", optional_parameters_length)
1134 # Length (big-endian)
1140 + len(my_autonomous_system_hex_2_bytes)
1141 + len(hold_time_hex)
1142 + len(bgp_identifier_hex)
1143 + len(optional_parameters_length_hex)
1144 + len(optional_parameters_hex)
1146 length_hex = struct.pack(">H", length)
1154 + my_autonomous_system_hex_2_bytes
1156 + bgp_identifier_hex
1157 + optional_parameters_length_hex
1158 + optional_parameters_hex
1162 logger.debug("OPEN message encoding")
1163 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1165 " Length=" + str(length) + " (0x" + binascii.hexlify(length_hex) + ")"
1168 " Type=" + str(type) + " (0x" + binascii.hexlify(type_hex) + ")"
1174 + binascii.hexlify(version_hex)
1178 " My Autonomous System="
1179 + str(my_autonomous_system_2_bytes)
1181 + binascii.hexlify(my_autonomous_system_hex_2_bytes)
1188 + binascii.hexlify(hold_time_hex)
1193 + str(bgp_identifier)
1195 + binascii.hexlify(bgp_identifier_hex)
1199 " Optional Parameters Length="
1200 + str(optional_parameters_length)
1202 + binascii.hexlify(optional_parameters_length_hex)
1206 " Optional Parameters=0x" + binascii.hexlify(optional_parameters_hex)
1208 logger.debug("OPEN message encoded: 0x%s", binascii.b2a_hex(message_hex))
1216 wr_prefix_length=None,
1217 nlri_prefix_length=None,
1218 my_autonomous_system=None,
1221 cluster_list_item=None,
1225 """Generates an UPDATE Message (rfc4271#section-4.3)
1228 :param wr_prefixes: see the rfc4271#section-4.3
1229 :param nlri_prefixes: see the rfc4271#section-4.3
1230 :param wr_prefix_length: see the rfc4271#section-4.3
1231 :param nlri_prefix_length: see the rfc4271#section-4.3
1232 :param my_autonomous_system: see the rfc4271#section-4.3
1233 :param next_hop: see the rfc4271#section-4.3
1235 :return: encoded UPDATE message in HEX
1238 # default values handling
# Every argument left as None is replaced by the matching *_default
# attribute of this generator instance.
1239 # TODO optimize default values handling (use e.g. dicionary.update() approach)
1240 if wr_prefixes is None:
1241 wr_prefixes = self.wr_prefixes_default
1242 if nlri_prefixes is None:
1243 nlri_prefixes = self.nlri_prefixes_default
1244 if wr_prefix_length is None:
1245 wr_prefix_length = self.prefix_length_default
1246 if nlri_prefix_length is None:
1247 nlri_prefix_length = self.prefix_length_default
1248 if my_autonomous_system is None:
1249 my_autonomous_system = self.my_autonomous_system_default
1250 if next_hop is None:
1251 next_hop = self.next_hop_default
1252 if originator_id is None:
1253 originator_id = self.originator_id_default
1254 if cluster_list_item is None:
1255 cluster_list_item = self.cluster_list_item_default
# The BGP-LS NLRI fields start from a copy of the defaults and are then
# overridden by whatever keyword arguments the caller supplied.
1256 ls_nlri = self.ls_nlri_default.copy()
1257 ls_nlri.update(ls_nlri_params)
# Marker: 16 octets of 0xFF open every BGP message.
1260 marker_hex = "\xFF" * 16
# NOTE(review): `type` (here) and `bytes` (below) shadow Python builtins.
1264 type_hex = struct.pack("B", type)
# Withdrawn Routes: one (length octet, truncated prefix) pair per prefix.
1267 withdrawn_routes_hex = ""
# Number of octets needed to carry wr_prefix_length bits.
# NOTE(review): relies on Python 2 integer `/`; under Python 3 this is a float
# and the `[:bytes]` slice below raises TypeError - `//` would be needed.
1269 bytes = ((wr_prefix_length - 1) / 8) + 1
1270 for prefix in wr_prefixes:
1271 withdrawn_route_hex = (
1272 struct.pack("B", wr_prefix_length)
# The prefix is packed as a 32-bit big-endian integer and cut down to the
# significant leading octets.
1273 + struct.pack(">I", int(prefix))[:bytes]
1275 withdrawn_routes_hex += withdrawn_route_hex
1277 # Withdrawn Routes Length
1278 withdrawn_routes_length = len(withdrawn_routes_hex)
1279 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1281 # TODO: to replace hardcoded string by encoding?
# Path attributes are emitted only when self.skipattr is false.
1283 path_attributes_hex = ""
1284 if not self.skipattr:
1285 path_attributes_hex += (
1286 "\x40" # Flags ("Well-Known")
1287 "\x01" # Type (ORIGIN)
1289 "\x00" # Origin: IGP
1291 path_attributes_hex += (
1292 "\x40" # Flags ("Well-Known")
1293 "\x02" # Type (AS_PATH)
1295 "\x02" # AS segment type (AS_SEQUENCE)
1296 "\x01" # AS segment length (1)
# The single AS in the path is written as a 4-octet number.
1298 my_as_hex = struct.pack(">I", my_autonomous_system)
1299 path_attributes_hex += my_as_hex # AS segment (4 bytes)
1300 path_attributes_hex += (
1301 "\x40" # Flags ("Well-Known")
1302 "\x05" # Type (LOCAL_PREF)
1304 "\x00\x00\x00\x64" # (100)
# NEXT_HOP is only added when there is something to announce.
1306 if nlri_prefixes != []:
1307 path_attributes_hex += (
1308 "\x40" # Flags ("Well-Known")
1309 "\x03" # Type (NEXT_HOP)
1312 next_hop_hex = struct.pack(">I", int(next_hop))
1313 path_attributes_hex += next_hop_hex # IP address of the next hop (4 bytes)
1314 if originator_id is not None:
1315 path_attributes_hex += (
1316 "\x80" # Flags ("Optional, non-transitive")
1317 "\x09" # Type (ORIGINATOR_ID)
1319 ) # ORIGINATOR_ID (4 bytes)
1320 path_attributes_hex += struct.pack(">I", int(originator_id))
1321 if cluster_list_item is not None:
1322 path_attributes_hex += (
1323 "\x80" # Flags ("Optional, non-transitive")
1324 "\x0a" # Type (CLUSTER_LIST)
1326 ) # one CLUSTER_LIST item (4 bytes)
1327 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
# BGP-LS: one MP_REACH_NLRI attribute carrying a single IPv4 TE LSP NLRI.
# The declared attribute length 34 = AFI(2) + SAFI(1) + next-hop length(1)
# + next hop(4) + reserved(1) + NLRI type(2) + NLRI length(2) + NLRI body(21).
1329 if self.bgpls and not end_of_rib:
1330 path_attributes_hex += (
1331 "\x80" # Flags ("Optional, non-transitive")
1332 "\x0e" # Type (MP_REACH_NLRI)
1333 "\x22" # Length (34)
1334 "\x40\x04" # AFI (BGP-LS)
1335 "\x47" # SAFI (BGP-LS)
1336 "\x04" # Next Hop Length (4)
1338 path_attributes_hex += struct.pack(">I", int(next_hop))
1339 path_attributes_hex += "\x00" # Reserved
1340 path_attributes_hex += (
1341 "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1342 "\x00\x15" # LS-NLRI.TotalNLRILength (21)
1343 "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
# NLRI body: protocol id(1) + identifier(8) + sender(4) + tunnel id(2)
# + LSP id(2) + end point(4) = 21 octets, matching TotalNLRILength above.
1345 path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1346 path_attributes_hex += struct.pack(
1347 ">I", int(ls_nlri["IPv4TunnelSenderAddress"])
1349 path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1350 path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1351 path_attributes_hex += struct.pack(
1352 ">I", int(ls_nlri["IPv4TunnelEndPointAddress"])
1355 # Total Path Attributes Length
1356 total_path_attributes_length = len(path_attributes_hex)
1357 total_path_attributes_length_hex = struct.pack(
1358 ">H", total_path_attributes_length
1361 # Network Layer Reachability Information
# Same (length octet, truncated prefix) encoding as the withdrawn routes;
# the same Python 2 `/` caveat applies.
1364 bytes = ((nlri_prefix_length - 1) / 8) + 1
1365 for prefix in nlri_prefixes:
1367 struct.pack("B", nlri_prefix_length)
1368 + struct.pack(">I", int(prefix))[:bytes]
1370 nlri_hex += nlri_prefix_hex
1372 # Length (big-endian)
1377 + len(withdrawn_routes_length_hex)
1378 + len(withdrawn_routes_hex)
1379 + len(total_path_attributes_length_hex)
1380 + len(path_attributes_hex)
1383 length_hex = struct.pack(">H", length)
1390 + withdrawn_routes_length_hex
1391 + withdrawn_routes_hex
1392 + total_path_attributes_length_hex
1393 + path_attributes_hex
# When graceful restart is exercised (grace is neither 8 nor 0) and the caller
# asked for end_of_rib, the assembled message is discarded and replaced by a
# fixed 23-octet UPDATE: length 0x0017, type 0x02, both length fields zero.
1397 if self.grace != 8 and self.grace != 0 and end_of_rib:
1398 message_hex = marker_hex + binascii.unhexlify("00170200000000")
1401 logger.debug("UPDATE message encoding")
1402 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1404 " Length=" + str(length) + " (0x" + binascii.hexlify(length_hex) + ")"
1407 " Type=" + str(type) + " (0x" + binascii.hexlify(type_hex) + ")"
1410 " withdrawn_routes_length="
1411 + str(withdrawn_routes_length)
1413 + binascii.hexlify(withdrawn_routes_length_hex)
1417 " Withdrawn_Routes="
1420 + str(wr_prefix_length)
1422 + binascii.hexlify(withdrawn_routes_hex)
# Attribute details are logged only when at least one attribute was encoded.
1425 if total_path_attributes_length:
1427 " Total Path Attributes Length="
1428 + str(total_path_attributes_length)
1430 + binascii.hexlify(total_path_attributes_length_hex)
1436 + binascii.hexlify(path_attributes_hex)
1439 logger.debug(" Origin=IGP")
1440 logger.debug(" AS path=" + str(my_autonomous_system))
1441 logger.debug(" Next hop=" + str(next_hop))
1442 if originator_id is not None:
1443 logger.debug(" Originator id=" + str(originator_id))
1444 if cluster_list_item is not None:
1445 logger.debug(" Cluster list=" + str(cluster_list_item))
1447 logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
1449 " Network Layer Reachability Information="
1450 + str(nlri_prefixes)
1452 + str(nlri_prefix_length)
1454 + binascii.hexlify(nlri_hex)
1457 logger.debug("UPDATE message encoded: 0x" + binascii.b2a_hex(message_hex))
# Every generated UPDATE is counted on the generator.
1460 self.updates_sent += 1
1461 # returning encoded message
def notification_message(self, error_code, error_subcode, data_hex=b""):
    """Generates a NOTIFICATION Message (rfc4271#section-4.5)

    The message is the common BGP header (16 marker octets, 2-octet
    big-endian total length, 1-octet type) followed by the error code,
    the error subcode and the optional diagnostic data.

    Arguments:
        :param error_code: see the rfc4271#section-4.5 (0-255)
        :param error_subcode: see the rfc4271#section-4.5 (0-255)
        :param data_hex: see the rfc4271#section-4.5 (raw byte string)
    Returns:
        :return: encoded NOTIFICATION message as a byte string
    Raises:
        struct.error: if error_code or error_subcode does not fit one octet
    """
    # Byte literals are used throughout: on Python 2 they are plain str
    # (so nothing changes for existing callers), on Python 3 they keep the
    # concatenation with struct.pack() output valid.

    # Marker
    marker_hex = b"\xFF" * 16

    # Type (3 == NOTIFICATION)
    msg_type = 3
    type_hex = struct.pack("B", msg_type)

    # Error Code
    error_code_hex = struct.pack("B", error_code)

    # Error Subcode
    error_subcode_hex = struct.pack("B", error_subcode)

    # Everything that follows the 2-octet length field.
    payload_hex = type_hex + error_code_hex + error_subcode_hex + data_hex

    # Length (big-endian); the "+ 2" accounts for the length field itself.
    length = len(marker_hex) + 2 + len(payload_hex)
    length_hex = struct.pack(">H", length)

    # NOTIFICATION Message
    message_hex = marker_hex + length_hex + payload_hex

    # NOTE(review): the guard line is not visible in the reviewed listing;
    # presumably it is ``if self.log_debug:`` - confirm. Falling back to
    # True only means the (level-filtered) debug calls are always made.
    if getattr(self, "log_debug", True):

        def as_hex(raw):
            """Return the hexadecimal text form of a byte string."""
            return binascii.hexlify(raw).decode("ascii")

        logger.debug("NOTIFICATION message encoding")
        logger.debug(" Marker=0x%s", as_hex(marker_hex))
        logger.debug(" Length=%s (0x%s)", length, as_hex(length_hex))
        logger.debug(" Type=%s (0x%s)", msg_type, as_hex(type_hex))
        logger.debug(" Error Code=%s (0x%s)", error_code, as_hex(error_code_hex))
        logger.debug(
            " Error Subcode=%s (0x%s)", error_subcode, as_hex(error_subcode_hex)
        )
        logger.debug(" Data= (0x%s)", as_hex(data_hex))
        logger.debug("NOTIFICATION message encoded: 0x%s", as_hex(message_hex))

    return message_hex
def keepalive_message(self):
    """Generates a KEEP ALIVE Message (rfc4271#section-4.4)

    A KEEP ALIVE message consists of the BGP header only: 16 marker
    octets, the 2-octet big-endian total length and the 1-octet type.

    Returns:
        :return: encoded KEEP ALIVE message as a 19-octet byte string
    """
    # Byte literals: plain str on Python 2, and they keep the concatenation
    # with struct.pack() output valid on Python 3.

    # Marker
    marker_hex = b"\xFF" * 16

    # Type (4 == KEEP ALIVE)
    msg_type = 4
    type_hex = struct.pack("B", msg_type)

    # Length (big-endian); the "+ 2" accounts for the length field itself.
    length = len(marker_hex) + 2 + len(type_hex)
    length_hex = struct.pack(">H", length)

    # KEEP ALIVE Message
    message_hex = marker_hex + length_hex + type_hex

    # NOTE(review): the guard line is not visible in the reviewed listing;
    # presumably it is ``if self.log_debug:`` - confirm. Falling back to
    # True only means the (level-filtered) debug calls are always made.
    if getattr(self, "log_debug", True):

        def as_hex(raw):
            """Return the hexadecimal text form of a byte string."""
            return binascii.hexlify(raw).decode("ascii")

        logger.debug("KEEP ALIVE message encoding")
        logger.debug(" Marker=0x%s", as_hex(marker_hex))
        logger.debug(" Length=%s (0x%s)", length, as_hex(length_hex))
        logger.debug(" Type=%s (0x%s)", msg_type, as_hex(type_hex))
        logger.debug("KEEP ALIVE message encoded: 0x%s", as_hex(message_hex))

    return message_hex
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and
    for the hold time granted to the peer.

    Absolute points in time are seconds as returned by time.time();
    relative durations are always named ``*_timedelta``.
    """

    def __init__(self, msg_in):
        """Initialisation. based on defaults and OPEN message from peer.

        Arguments:
            msg_in: the OPEN message received from peer.
        Raises:
            ValueError: if the negotiated hold time is non-zero but
                smaller than 3 seconds.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        hold_timedelta = 180  # Not an attribute of self yet.
        # The hold time is the 2-octet field at offset 22 of an OPEN message
        # (16 marker + 2 length + 1 type + 1 version + 2 AS number).
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        if hold_timedelta > peer_hold_timedelta:
            hold_timedelta = peer_hold_timedelta
        if hold_timedelta != 0 and hold_timedelta < 3:
            # One formatted message instead of a tuple of arguments, so that
            # str(exception) reads the same as the log line.
            message = "Invalid hold timedelta value: %s" % hold_timedelta
            logger.error(message)
            raise ValueError(message)
        # If we do not hear from peer this long, we assume it has died.
        self.hold_timedelta = hold_timedelta
        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # The same as calling snapshot(), but also declares a field.
        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        self.snapshot_time = time.time()
        # At this time point, peer will be declared dead.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this point, we should be sending keepalive message.
        self.my_keepalive_time = None  # to be set later

    def snapshot(self):
        """Store current time in instance data to use later."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time

        Arguments:
            :keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurence

        Returns:
            False when keepalives are switched off (hold time 0), True when
            the last snapshot is at or past the scheduled keepalive time or
            no keepalive has been scheduled yet.
        """
        if self.hold_timedelta == 0:
            return False
        if self.my_keepalive_time is None:
            # Nothing scheduled yet. Python 2 orders None below every number,
            # so the comparison below used to be True here; keep that result
            # explicitly instead of raising TypeError on Python 3.
            return True
        return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Set the time of the next expected or to be sent KEEP ALIVE

        Returns:
            absolute time of the nearest timer event; one day after the last
            snapshot when keepalives are switched off.
        """
        if self.hold_timedelta == 0:
            return self.snapshot_time + 86400
        if self.my_keepalive_time is None:
            # No keepalive scheduled yet: the peer hold timer is the only
            # pending event (min() with None is not a usable time).
            return self.peer_hold_time
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time.

        Arguments:
            snapshot_time: absolute time to compare with the peer deadline
        Raises:
            RuntimeError: if snapshot_time is past self.peer_hold_time
        """
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta == 0:
            return
        # time.time() may be too strict
        if snapshot_time > self.peer_hold_time:
            logger.error("Peer has overstepped the hold timer.")
            raise RuntimeError("Peer has overstepped the hold timer.")
            # TODO: Include hold_timedelta?
            # TODO: Add notification sending (attempt). That means
            # move to write tracker.
1661 class ReadTracker(object):
1662 """Class for tracking read of mesages chunk by chunk and
1681 """The reader initialisation.
1684 bgp_socket: socket to be used for sending
1685 timer: timer to be used for scheduling
1686 storage: thread safe dict
1687 evpn: flag that evpn functionality is tested
1688 mvpn: flag that mvpn functionality is tested
1689 grace: flag that grace-restart functionality is tested
1690 l3vpn_mcast: flag that l3vpn_mcast functionality is tested
1691 l3vpn: flag that l3vpn unicast functionality is tested
1692 rt_constrain: flag that rt-constrain functionality is tested
1693 allf: flag for all family testing.
# NOTE(review): the signature of this constructor is not visible in the
# reviewed listing; parameter order and defaults must be taken from the
# original file. The body also uses wait_for_read, which is undocumented above.
1695 # References to outside objects.
1696 self.socket = bgp_socket
1698 # BGP marker length plus length field length.
# 16 marker octets + 2 length octets; the type octet is read with the body.
1699 self.header_length = 18
1700 # TODO: make it class (constant) attribute
1701 # Computation of where next chunk ends depends on whether
1702 # we are beyond length field.
1703 self.reading_header = True
1704 # Countdown towards next size computation.
1705 self.bytes_to_read = self.header_length
1706 # Incremental buffer for message under read.
1708 # Initialising counters
# rx_idle_time accumulates the seconds spent blocked in wait_for_read();
# rx_activity_detected records whether the last wait was shorter than 1 s.
1709 self.updates_received = 0
1710 self.prefixes_introduced = 0
1711 self.prefixes_withdrawn = 0
1712 self.rx_idle_time = 0
1713 self.rx_activity_detected = True
1714 self.storage = storage
1717 self.l3vpn_mcast = l3vpn_mcast
1719 self.rt_constrain = rt_constrain
# Upper bound (seconds) for a single wait in wait_for_read().
1722 self.wfr = wait_for_read
1725 def read_message_chunk(self):
1726 """Read up to one message
1729 Currently it does not return anything.
# Reading is a two-phase countdown: first the 18-octet header (marker plus
# length), then the rest of the message whose size the header announced.
# Each call performs exactly one recv() of at most the remaining octets.
1731 # TODO: We could return the whole message, currently not needed.
1732 # We assume the socket is readable.
# NOTE(review): if recv() returns an empty string (peer closed the
# connection), bytes_to_read does not change and no error is reported here.
1733 chunk_message = self.socket.recv(self.bytes_to_read)
1734 self.msg_in += chunk_message
1735 self.bytes_to_read -= len(chunk_message)
1736 # TODO: bytes_to_read < 0 is not possible, right?
1737 if not self.bytes_to_read:
1738 # Finished reading a logical block.
1739 if self.reading_header:
1740 # The logical block was a BGP header.
1741 # Now we know the size of the message.
1742 self.reading_header = False
# NOTE(review): a length field below 18 would make this negative; the value
# is not validated.
1743 self.bytes_to_read = (
1744 get_short_int_from_message(self.msg_in) - self.header_length
1746 else: # We have finished reading the body of the message.
1747 # Peer has just proven it is still alive.
1748 self.timer.reset_peer_hold_time()
1749 # TODO: Do we want to count received messages?
1750 # This version ignores the received message.
1751 # TODO: Should we do validation and exit on anything
1752 # besides update or keepalive?
1753 # Prepare state for reading another message.
# The type octet directly follows the header (index 18).
# NOTE(review): compared against one-character str literals, which only
# works on Python 2 (indexing bytes yields an int on Python 3).
1754 message_type_hex = self.msg_in[self.header_length]
1755 if message_type_hex == "\x01":
1757 "OPEN message received: 0x%s", binascii.b2a_hex(self.msg_in)
1759 elif message_type_hex == "\x02":
1761 "UPDATE message received: 0x%s", binascii.b2a_hex(self.msg_in)
# UPDATE is the only message type that is decoded further.
1763 self.decode_update_message(self.msg_in)
1764 elif message_type_hex == "\x03":
1766 "NOTIFICATION message received: 0x%s",
1767 binascii.b2a_hex(self.msg_in),
1769 elif message_type_hex == "\x04":
1771 "KEEP ALIVE message received: 0x%s",
1772 binascii.b2a_hex(self.msg_in),
1776 "Unexpected message received: 0x%s",
1777 binascii.b2a_hex(self.msg_in),
# Back to header-reading state for the next message.
1780 self.reading_header = True
1781 self.bytes_to_read = self.header_length
1782 # We should not act upon peer_hold_time if we are reading
1783 # something right now.
1786 def decode_path_attributes(self, path_attributes_hex):
1787 """Decode the Path Attributes field (rfc4271#section-4.3)
1790 :path_attributes: path_attributes field to be decoded in hex
# Walks the attribute list front to back: each iteration consumes one
# (flags, type code, length, value) record and logs it. Only MP_REACH_NLRI
# and MP_UNREACH_NLRI change state (the prefix counters).
1794 hex_to_decode = path_attributes_hex
1796 while len(hex_to_decode):
1797 attr_flags_hex = hex_to_decode[0]
1798 attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1799 # attr_optional_bit = attr_flags & 128
1800 # attr_transitive_bit = attr_flags & 64
1801 # attr_partial_bit = attr_flags & 32
1802 attr_extended_length_bit = attr_flags & 16
1804 attr_type_code_hex = hex_to_decode[1]
1805 attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
# Extended-length attributes carry a 2-octet length (value starts at
# offset 4), all others a 1-octet length (value starts at offset 3).
1807 if attr_extended_length_bit:
1808 attr_length_hex = hex_to_decode[2:4]
1809 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1810 attr_value_hex = hex_to_decode[4 : 4 + attr_length]
1811 hex_to_decode = hex_to_decode[4 + attr_length :]
1813 attr_length_hex = hex_to_decode[2]
1814 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1815 attr_value_hex = hex_to_decode[3 : 3 + attr_length]
1816 hex_to_decode = hex_to_decode[3 + attr_length :]
# Type codes 1-10 below are only logged, never interpreted.
1818 if attr_type_code == 1:
1820 "Attribute type=1 (ORIGIN, flags:0x%s)",
1821 binascii.b2a_hex(attr_flags_hex),
1823 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1824 elif attr_type_code == 2:
1826 "Attribute type=2 (AS_PATH, flags:0x%s)",
1827 binascii.b2a_hex(attr_flags_hex),
1829 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1830 elif attr_type_code == 3:
1832 "Attribute type=3 (NEXT_HOP, flags:0x%s)",
1833 binascii.b2a_hex(attr_flags_hex),
1835 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1836 elif attr_type_code == 4:
1838 "Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1839 binascii.b2a_hex(attr_flags_hex),
1841 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1842 elif attr_type_code == 5:
1844 "Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1845 binascii.b2a_hex(attr_flags_hex),
1847 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1848 elif attr_type_code == 6:
1850 "Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1851 binascii.b2a_hex(attr_flags_hex),
1853 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1854 elif attr_type_code == 7:
1856 "Attribute type=7 (AGGREGATOR, flags:0x%s)",
1857 binascii.b2a_hex(attr_flags_hex),
1859 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1860 elif attr_type_code == 9: # rfc4456#section-8
1862 "Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1863 binascii.b2a_hex(attr_flags_hex),
1865 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1866 elif attr_type_code == 10: # rfc4456#section-8
1868 "Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1869 binascii.b2a_hex(attr_flags_hex),
1871 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1872 elif attr_type_code == 14: # rfc4760#section-3
1874 "Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1875 binascii.b2a_hex(attr_flags_hex),
1877 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
# Value layout: AFI(2), SAFI(1), next-hop length(1), next hop, reserved(1),
# then the NLRI up to the end of the attribute.
1878 address_family_identifier_hex = attr_value_hex[0:2]
1880 " Address Family Identifier=0x%s",
1881 binascii.b2a_hex(address_family_identifier_hex),
1883 subsequent_address_family_identifier_hex = attr_value_hex[2]
1885 " Subsequent Address Family Identifier=0x%s",
1886 binascii.b2a_hex(subsequent_address_family_identifier_hex),
1888 next_hop_netaddr_len_hex = attr_value_hex[3]
1889 next_hop_netaddr_len = int(
1890 binascii.b2a_hex(next_hop_netaddr_len_hex), 16
1893 " Length of Next Hop Network Address=%s (0x%s)",
1894 next_hop_netaddr_len,
1895 binascii.b2a_hex(next_hop_netaddr_len_hex),
1897 next_hop_netaddr_hex = attr_value_hex[4 : 4 + next_hop_netaddr_len]
# NOTE(review): "BBBB" requires exactly 4 octets, so any next hop that is
# not 4 octets long (e.g. an IPv6 one) raises struct.error here.
1898 next_hop_netaddr = ".".join(
1899 str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex)
1902 " Network Address of Next Hop=%s (0x%s)",
1904 binascii.b2a_hex(next_hop_netaddr_hex),
1906 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1907 logger.debug(" Reserved=0x%s", binascii.b2a_hex(reserved_hex))
1908 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1 :]
1910 " Network Layer Reachability Information=0x%s",
1911 binascii.b2a_hex(nlri_hex),
1913 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1914 logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
1915 for prefix in nlri_prefix_list:
1916 logger.debug(" nlri_prefix_received: %s", prefix)
1917 self.prefixes_introduced += len(nlri_prefix_list) # update counter
1918 elif attr_type_code == 15: # rfc4760#section-4
1920 "Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1921 binascii.b2a_hex(attr_flags_hex),
1923 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
# Value layout: AFI(2), SAFI(1), then the withdrawn routes.
1924 address_family_identifier_hex = attr_value_hex[0:2]
1926 " Address Family Identifier=0x%s",
1927 binascii.b2a_hex(address_family_identifier_hex),
1929 subsequent_address_family_identifier_hex = attr_value_hex[2]
1931 " Subsequent Address Family Identifier=0x%s",
1932 binascii.b2a_hex(subsequent_address_family_identifier_hex),
1934 wd_hex = attr_value_hex[3:]
1935 logger.debug(" Withdrawn Routes=0x%s", binascii.b2a_hex(wd_hex))
1936 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1937 logger.debug(" Withdrawn routes prefix list: %s", wdr_prefix_list)
1938 for prefix in wdr_prefix_list:
1939 logger.debug(" withdrawn_prefix_received: %s", prefix)
1940 self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
# Any other type code is reported as unknown.
# NOTE(review): the message below has a closing parenthesis without an
# opening one.
1943 "Unknown attribute type=%s, flags:0x%s)",
1945 binascii.b2a_hex(attr_flags_hex),
1948 "Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex)
1952 def decode_update_message(self, msg):
1953 """Decode an UPDATE message (rfc4271#section-4.3)
1956 :msg: message to be decoded in hex
# Stores the raw message (hex text) in the shared storage under "update",
# then - unless one of the address-family test flags asks to skip decoding -
# parses withdrawn routes, path attributes and NLRI and updates the counters.
1960 logger.debug("Decoding update message:")
1961 # message header - marker
1962 marker_hex = msg[:16]
1963 logger.debug("Message header marker: 0x%s", binascii.b2a_hex(marker_hex))
1964 # message header - message length
1965 msg_length_hex = msg[16:18]
1966 msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1968 "Message lenght: 0x%s (%s)", binascii.b2a_hex(msg_length_hex), msg_length
1970 # message header - message type
1971 msg_type_hex = msg[18:19]
1972 msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
# The storage object is used as a context manager around the write.
1974 with self.storage as stor:
1975 # this will replace the previously stored message
1976 stor["update"] = binascii.hexlify(msg)
# Each flag below short-circuits the decoding when set; the message has
# already been stored above.
1978 logger.debug("Evpn {}".format(self.evpn))
1980 logger.debug("Skipping update decoding due to evpn data expected")
1983 logger.debug("Graceful-restart {}".format(self.grace))
1986 "Skipping update decoding due to graceful-restart data expected"
1990 logger.debug("Mvpn {}".format(self.mvpn))
1992 logger.debug("Skipping update decoding due to mvpn data expected")
1995 logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
1996 if self.l3vpn_mcast:
1997 logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
2000 logger.debug("L3vpn-unicast {}".format(self.l3vpn))
# NOTE(review): this tests self.l3vpn_mcast although the log lines around it
# are about self.l3vpn; the l3vpn-unicast skip can therefore never trigger
# on its own. Looks like a copy-paste slip - should probably be self.l3vpn.
2001 if self.l3vpn_mcast:
2002 logger.debug("Skipping update decoding due to l3vpn-unicast data expected")
2005 logger.debug("Route-Target-Constrain {}".format(self.rt_constrain))
2006 if self.rt_constrain:
2008 "Skipping update decoding due to Route-Target-Constrain data expected"
2012 logger.debug("Ipv6-Unicast {}".format(self.ipv6))
2014 logger.debug("Skipping update decoding due to Ipv6 data expected")
2017 logger.debug("Allf {}".format(self.allf))
2019 logger.debug("Skipping update decoding")
2023 logger.debug("Message type: 0x%s (update)", binascii.b2a_hex(msg_type_hex))
2024 # withdrawn routes length
# The 2-octet withdrawn-routes length directly follows the 19-octet header.
2025 wdr_length_hex = msg[19:21]
2026 wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
2028 "Withdrawn routes lenght: 0x%s (%s)",
2029 binascii.b2a_hex(wdr_length_hex),
2033 wdr_hex = msg[21 : 21 + wdr_length]
2034 logger.debug("Withdrawn routes: 0x%s", binascii.b2a_hex(wdr_hex))
2035 wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
2036 logger.debug("Withdrawn routes prefix list: %s", wdr_prefix_list)
2037 for prefix in wdr_prefix_list:
2038 logger.debug("withdrawn_prefix_received: %s", prefix)
2039 # total path attribute length
2040 total_pa_length_offset = 21 + wdr_length
2041 total_pa_length_hex = msg[
2042 total_pa_length_offset : total_pa_length_offset + 2
2044 total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
2046 "Total path attribute lenght: 0x%s (%s)",
2047 binascii.b2a_hex(total_pa_length_hex),
2051 pa_offset = total_pa_length_offset + 2
2052 pa_hex = msg[pa_offset : pa_offset + total_pa_length]
2053 logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
# MP_REACH/MP_UNREACH prefixes are counted inside decode_path_attributes().
2054 self.decode_path_attributes(pa_hex)
2055 # network layer reachability information length
# 23 = 19 (header) + 2 (withdrawn routes length) + 2 (path attribute length);
# whatever remains after the two variable parts is NLRI.
2056 nlri_length = msg_length - 23 - total_pa_length - wdr_length
2057 logger.debug("Calculated NLRI length: %s", nlri_length)
2058 # network layer reachability information
2059 nlri_offset = pa_offset + total_pa_length
2060 nlri_hex = msg[nlri_offset : nlri_offset + nlri_length]
2061 logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
2062 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
2063 logger.debug("NLRI prefix list: %s", nlri_prefix_list)
2064 for prefix in nlri_prefix_list:
2065 logger.debug("nlri_prefix_received: %s", prefix)
# Counters for the plain (non multiprotocol) part of the message.
2067 self.updates_received += 1
2068 self.prefixes_introduced += len(nlri_prefix_list)
2069 self.prefixes_withdrawn += len(wdr_prefix_list)
2072 "Unexpeced message type 0x%s in 0x%s",
2073 binascii.b2a_hex(msg_type_hex),
2074 binascii.b2a_hex(msg),
2077 def wait_for_read(self):
2078 """Read message until timeout (next expected event).
2081 Used when no more updates has to be sent to avoid busy-wait.
2082 Currently it does not return anything.
# Blocks in select() until the socket becomes readable (or reports an
# exceptional condition) or the wait time runs out; nothing is read here.
2084 # Compute time to the first predictable state change
2085 event_time = self.timer.get_next_event_time()
2086 # snapshot_time would be imprecise
# The wait never exceeds self.wfr seconds, however far away the event is.
2087 wait_timedelta = min(event_time - time.time(), self.wfr)
2088 if wait_timedelta < 0:
2089 # The program got around to waiting to an event in "very near
2090 # future" so late that it became a "past" event, thus tell
2091 # "select" to not wait at all. Passing negative timedelta to
2092 # select() would lead to either waiting forever (for -1) or
2093 # select.error("Invalid parameter") (for everything else).
# NOTE(review): the statement that clamps wait_timedelta (presumably to 0)
# is not visible in the reviewed listing - confirm it is present.
2095 # And wait for event or something to read.
2097 if not self.rx_activity_detected or not (self.updates_received % 100):
2098 # right time to write statistics to the log (not for every update and
2099 # not too frequently to avoid having large log files)
2101 "total_received_update_message_counter: %s", self.updates_received
2104 "total_received_nlri_prefix_counter: %s", self.prefixes_introduced
2107 "total_received_withdrawn_prefix_counter: %s", self.prefixes_withdrawn
# Measure how long select() actually blocked.
2110 start_time = time.time()
2111 select.select([self.socket], [], [self.socket], wait_timedelta)
2112 timedelta = time.time() - start_time
2113 self.rx_idle_time += timedelta
# A wait shorter than one second counts as "activity detected".
2114 self.rx_activity_detected = timedelta < 1
2116 if not self.rx_activity_detected or not (self.updates_received % 100):
2117 # right time to write statistics to the log (not for every update and
2118 # not too frequently to avoid having large log files)
2119 logger.info("... idle for %.3fs", timedelta)
2120 logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them."""

    def __init__(self, bgp_socket, generator, timer):
        """The writter initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects,
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Really new fields.
        # TODO: Would attribute docstrings add anything substantial?
        # True while msg_out still holds data that was not handed to the socket.
        self.sending_message = False
        # Number of octets of msg_out still waiting to be sent.
        self.bytes_to_send = 0
        # Outgoing buffer. A byte literal is a plain str on Python 2 and stays
        # compatible with encoded messages on Python 3.
        self.msg_out = b""

    def enqueue_message_for_sending(self, message):
        """Enqueue message and change state.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send enqueued data from msg_out buffer

        Performs a single socket.send() and drops whatever part of the
        buffer the socket accepted.

        Returns:
            :return: true if no remaining data to send
        """
        # We assume there is a msg_out to send and socket is writable.
        # The snapshot is taken before sending so that the keepalive timer is
        # counted from the moment just before the data left.
        self.timer.snapshot()
        bytes_sent = self.socket.send(self.msg_out)
        # Forget the part of message that was sent.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if self.bytes_to_send:
            # Only a part was accepted by the socket; more calls are needed.
            return False
        # TODO: Is it possible to hit negative bytes_to_send?
        self.sending_message = False
        # We should have reset hold timer on peer side.
        self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
        # The possible reason for not prioritizing reads is gone.
        return True
class StateTracker(object):
    """Holder of the main loop state; one call performs one loop step."""

    def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
        """Wire the reader and writer sub-trackers together.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
            inqueue: user initiated messages queue
            storage: thread safe dict to store data for the rpc server
            cliargs: cli args from the user
        """
        # Collaborators supplied by the caller.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        self.inqueue = inqueue
        # Sub-trackers for the two directions of traffic.
        self.reader = ReadTracker(
            bgp_socket,
            timer,
            storage,
            evpn=cliargs.evpn,
            mvpn=cliargs.mvpn,
            l3vpn_mcast=cliargs.l3vpn_mcast,
            l3vpn=cliargs.l3vpn,
            allf=cliargs.allf,
            rt_constrain=cliargs.rt_constrain,
            ipv6=cliargs.ipv6,
            grace=cliargs.grace,
            wait_for_read=cliargs.wfr,
        )
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Reading is normally preferred over writing. To avoid being starved
        # by never-ending reads while our keepalive is due, this flag is
        # raised to push one message out; it drops back to False once that
        # message is completely sent.
        # TODO: Alternative is to switch fairly between reading and
        # writing (round robin).
        # Message counting is done in generator.
        self.prioritize_writing = False

    def perform_one_loop_iteration(self):
        """Run a single step of the main loop.

        Notes:
            Decides the read/write priority, polls the socket and performs
            exactly one action (read a chunk, send a chunk, enqueue a new
            message or idle-wait) before returning to the caller.
        """
        self.timer.snapshot()
        if not self.prioritize_writing and self.timer.is_time_for_my_keepalive():
            if not self.writer.sending_message:
                # Nothing is in flight, so a keepalive has to be scheduled.
                self.writer.enqueue_message_for_sending(
                    self.generator.keepalive_message()
                )
                logger.info("KEEP ALIVE is sent.")
            # Whatever is being sent now has to get out soon.
            self.prioritize_writing = True

        self._enqueue_pending_user_message()

        # select() reports readiness as three lists, each of them is
        # either [] or [self.socket], so they are used as booleans below.
        readable, writable, exceptional = select.select(
            [self.socket], [self.socket], [self.socket], self.timer.report_timedelta
        )
        if exceptional:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # Exactly one of read / write is performed per iteration.
        rushing_write = self.prioritize_writing and writable
        if not rushing_write:
            # No reason to rush writes, or the socket is not writable.
            if readable:
                # Consume one chunk and let the caller poll again.
                self.reader.read_message_chunk()
                return
            # Nothing arrived; a good moment to verify the peer hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
        if writable:
            self._handle_writable_socket()
            return
        # Neither direction was usable within the select timeout.
        logger.warning(
            "Input and output both blocked for "
            + str(self.timer.report_timedelta)
            + " seconds."
        )
        # FIXME: Are we sure select has been really waiting
        # the whole period?

    def _enqueue_pending_user_message(self):
        """Move at most one user supplied hex message to the writer."""
        try:
            hex_text = self.inqueue.get_nowait()
        except Queue.Empty:
            return
        logger.info("Received message: {}".format(hex_text))
        self.writer.enqueue_message_for_sending(binascii.unhexlify(hex_text))

    def _handle_writable_socket(self):
        """Perform the single write-side action for this iteration."""
        if self.writer.sending_message:
            # Continue the message in flight.
            finished = self.writer.send_message_chunk_is_whole()
            if self.prioritize_writing and finished:
                # The urgent message is out, reads are preferred again.
                self.prioritize_writing = False
            return
        if not self.generator.remaining_prefixes:
            # Nothing to write anymore; idle-wait instead of busy looping.
            self.reader.wait_for_read()
            return
        update = self.generator.compose_update_message()
        if not self.generator.remaining_prefixes:
            # That was the last update, so end-of-rib is due.
            logger.info("All update messages generated.")
            logger.info("Storing performance results.")
            self.generator.store_results()
            logger.info("Finally an END-OF-RIB is sent.")
            update += self.generator.update_message(
                wr_prefixes=[], nlri_prefixes=[], end_of_rib=True
            )
        # The real sending is attempted in the next iteration.
        self.writer.enqueue_message_for_sending(update)
def create_logger(loglevel, logfile):
    """Create and configure the logger named "logger".

    The logger writes every record both to the console and to logfile
    (which is truncated on opening).

    logging.getLogger() hands out the same object for the same name, so
    handlers installed by an earlier call are removed and closed first.
    Without that, a repeated call would emit every record several times
    and keep the previous log file open.

    Arguments:
        :loglevel: log level
        :logfile: log file name
    Returns:
        :return: logger object
    """
    logger = logging.getLogger("logger")
    log_formatter = logging.Formatter(
        "%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s"
    )
    # Build the new handlers before touching the logger, so a failure to
    # open logfile leaves the previous configuration intact.
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler(logfile, mode="w")
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()
    for handler in (console_handler, file_handler):
        handler.setFormatter(log_formatter)
        logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger
def job(arguments, inqueue, storage):
    """One time initialisation and iterations looping.

    Notes:
        Establish BGP connection, perform the OPEN / KEEPALIVE handshake
        and then run main loop iterations forever.

    Arguments:
        :arguments: Command line arguments
        :inqueue: Data to be sent from play.py
        :storage: Shared dict for rpc server
    Returns:
        :return: None (the reactor loop does not terminate normally)
    Raises:
        MessageError: when the peer does not confirm our OPEN by a keepalive
    """
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    logger.info(binascii.hexlify(msg_in))
    storage["open"] = binascii.hexlify(msg_in)
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: %s", binascii.hexlify(msg_out))
    # Send our open message to the peer. sendall() is used because send()
    # is allowed to transmit only a part of the buffer.
    bgp_socket.sendall(msg_out)
    # Wait for confirming keepalive.
    # TODO: Surely in just one packet?
    # Using exact keepalive length to not to see possible updates.
    msg_in = bgp_socket.recv(19)
    if msg_in != generator.keepalive_message():
        error_msg = "Open not confirmed by keepalive, instead got"
        logger.error("%s: %s", error_msg, binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: %s", binascii.hexlify(msg_out))
    bgp_socket.sendall(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
class Rpcs:
    """Methods offered to remote callers through SimpleXMLRPCServer."""

    def __init__(self, sendqueue, storage):
        """Remember the queue and the storage shared with the BGP threads.

        Arguments:
            :sendqueue: queue for data to be sent towards odl
            :storage: thread safe dict
        """
        self.storage = storage
        self.queue = sendqueue

    def send(self, text):
        """Queue data for sending.

        Arguments:
            :text: hex string of the data to be sent
        """
        self.queue.put(text)

    def get(self, text=""):
        """Read data from the storage.

        Arguments:
            :text: a key to the storage to get the data
        Returns:
            :data: stored data, or an empty string when the key is absent
        """
        with self.storage as shared:
            found = shared.get(text, "")
        return found

    def clean(self, text=""):
        """Remove data from the storage; an absent key is ignored.

        Arguments:
            :text: a key to the storage to clean the data
        """
        with self.storage as shared:
            shared.pop(text, None)
def threaded_job(arguments):
    """Run the job threaded

    The requested amount of prefixes is divided among
    arguments.multiplicity worker threads, each running job() with its
    own copy of the arguments. Afterwards the RPC server is started in
    the calling thread and serves forever.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None
    """
    amount_left = arguments.amount
    utils_left = arguments.multiplicity
    prefix_current = arguments.firstprefix
    myip_current = arguments.myip
    port = arguments.port
    thread_args = []
    rpcqueue = Queue.Queue()
    storage = SafeDict()

    while 1:
        # Explicit floor division keeps the share an integer regardless of
        # the division semantics of the interpreter.
        amount_per_util = (amount_left - 1) // utils_left + 1  # round up
        amount_left -= amount_per_util
        utils_left -= 1

        args = deepcopy(arguments)
        args.amount = amount_per_util
        args.firstprefix = prefix_current
        args.myip = myip_current
        thread_args.append(args)

        if not utils_left:
            break
        # The next worker starts after the block assigned to this one.
        prefix_current += amount_per_util * 16
        myip_current += 1

    try:
        # Create threads
        for t in thread_args:
            thread.start_new_thread(job, (t, rpcqueue, storage))
    except Exception:
        print("Error: unable to start thread.")
        raise SystemExit(2)

    if arguments.usepeerip:
        ip = arguments.peerip
    else:
        ip = arguments.myip
    rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
    rpcserver.register_instance(Rpcs(rpcqueue, storage))
    rpcserver.serve_forever()
# Script entry point: parse the command line, create the logger and hand
# control to threaded_job(), which starts the worker threads and then
# serves RPC requests.
if __name__ == "__main__":
    arguments = parse_arguments()
    # "logger" is bound at module level here; the classes and functions of
    # this file (e.g. StateTracker and job) refer to it as a global name.
    logger = create_logger(arguments.loglevel, arguments.logfile)
    threaded_job(arguments)