1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
class SafeDict(dict):
    '''Thread safe dictionary

    The object will serve as thread safe data storage.
    It should be used with "with" statement: entering the context acquires
    the internal lock, leaving the context releases it again.
    '''

    def __init__(self, *p_arg, **n_arg):
        """Create the dictionary and its lock.

        Arguments:
            :p_arg: positional arguments, handed over to dict()
            :n_arg: keyword arguments, handed over to dict()
        """
        # Hand the initial content over to dict() so that it is not lost.
        super(SafeDict, self).__init__(*p_arg, **n_arg)
        self._lock = threading.Lock()

    def __enter__(self):
        """Acquire the lock and return the dictionary itself."""
        self._lock.acquire()
        return self

    def __exit__(self, type, value, traceback):
        """Release the lock; an exception raised in the body is propagated."""
        self._lock.release()
def parse_arguments():
    """Use argparse to get arguments.

    Returns:
        :return: args object (argparse namespace with all the options).

    Raises:
        SystemExit: with status 1 when --multiplicity is not positive;
            argparse itself exits when the command line is not valid.
    """

    def flag_value(text):
        """Convert textual value of a boolean option to bool.

        Plain bool() is not usable as an argparse type, because every
        non-empty string (including "False") is true for it.
        """
        return str(text).strip().lower() not in (
            "", "0", "false", "no", "off", "n", "f")

    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = 'Amount of IP prefixes to generate. (negative means "infinite").'
    parser.add_argument("--amount", default="1", type=int, help=str_help)
    str_help = "Rpc server port."
    parser.add_argument("--port", default="8000", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default="0", type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default="0", type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # The default has to be one of the choices (a string, not a list).
    parser.add_argument("--updates", choices=["single", "separate"],
                        default="separate", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from. "
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection. "
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default="0", type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = "Identifier of the route originator."
    parser.add_argument("--originator", default=None,
                        type=ipaddr.IPv4Address, dest="originator", help=str_help)
    str_help = "Cluster list item identifier."
    parser.add_argument("--cluster", default=None,
                        type=ipaddr.IPv4Address, dest="cluster", help=str_help)
    str_help = ("Numeric IP Address to try to connect to. "
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default="179", type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default="180", type=int, help=str_help)
    # All four switches store into the same destination, the last one wins.
    str_help = "Log level (--error, --warning, --info, --debug)"
    for switch, level in (("--error", logging.ERROR),
                          ("--warning", logging.WARNING),
                          ("--info", logging.INFO),
                          ("--debug", logging.DEBUG)):
        parser.add_argument(switch, dest="loglevel", action="store_const",
                            const=level, default=logging.INFO,
                            help=str_help)
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default="1000", type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default=True, type=flag_value, help=str_help)
    str_help = "Using peerip instead of myip for xmlrpc server"
    parser.add_argument("--usepeerip", default=False, action="store_true", help=str_help)
    str_help = "Link-State NLRI supported"
    parser.add_argument("--bgpls", default=False, type=flag_value, help=str_help)
    str_help = "Link-State NLRI: Identifier"
    parser.add_argument("-lsid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID"
    parser.add_argument("-lstid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID"
    parser.add_argument("-lspid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
    parser.add_argument("--lstsaddr", default="1.2.3.4",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
    parser.add_argument("--lsteaddr", default="5.6.7.8",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "Link-State NLRI: Identifier Step"
    parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID Step"
    parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID Step"
    parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
    parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
    parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
    str_help = "How many play utilities are to be started."
    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
    str_help = ("Open message includes multiprotocol extension capability l2vpn-evpn. "
                "Enabling this flag makes the script not decoding the update mesage, "
                "because of not supported decoding for these elements.")
    parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
    # The help texts of --grace and --mvpn used to be swapped.
    str_help = ("Open message includes Graceful-restart capability, containing AFI/SAFIS: "
                "IPV4-Unicast, IPV6-Unicast, BGP-LS "
                "Enabling this flag makes the script not decoding the update mesage, "
                "because of not supported decoding for these elements.")
    parser.add_argument("--grace", default="8", type=int, help=str_help)
    str_help = ("Open message includes Multicast in MPLS/BGP IP VPNs arguments. "
                "Enabling this flag makes the script not decoding the update mesage, "
                "because of not supported decoding for these elements.")
    parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
    str_help = ("Open message includes L3VPN-MULTICAST arguments. "
                "Enabling this flag makes the script not decoding the update mesage, "
                "because of not supported decoding for these elements.")
    parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
    str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding."
    parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
    str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
    parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help)
    str_help = "Open message includes ipv6-unicast family, without message decoding."
    parser.add_argument("--ipv6", default=False, action="store_true", help=str_help)
    str_help = "Add all supported families without message decoding."
    parser.add_argument("--allf", default=False, action="store_true", help=str_help)
    parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
    str_help = "Skipping well known attributes for update message"
    parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        # Single pre-formatted argument prints the same text on Python 2 and 3.
        print("Multiplicity %s is not positive." % (arguments.multiplicity,))
        raise SystemExit(1)
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.listen: accept a connection instead of initiating it
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port
    Returns:
        :return: socket connected to the peer.
    Notes:
        Sockets created here are closed again when the connection attempt
        fails, the original exception is propagated to the caller.
    """
    # bind needs single tuple as argument; socket does not speak ipaddr,
    # hence str()
    local_address = (str(arguments.myip), arguments.myport)
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listening_socket.bind(local_address)
            listening_socket.listen(1)
            bgp_socket, _ = listening_socket.accept()
            # TODO: Verify client IP is cotroller IP.
        finally:
            # Only one peer is served, the listening socket is not needed
            # anymore (and must not leak when accept() fails).
            listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # bind to force specified address and port
            talking_socket.bind(local_address)
            talking_socket.connect((str(arguments.peerip), arguments.peerport))
        except Exception:
            # Do not leak the descriptor when bind() or connect() fails.
            talking_socket.close()
            raise
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract 2-bytes number from provided message.

    Arguments:
        :message: given message (str, bytes or bytearray)
        :offset: offset of the short_int inside the message
    Returns:
        :return: required short_int value (big-endian).
    Raises:
        IndexError: when the message ends before offset + 1.
    Notes:
        default offset value is the BGP message size offset.
    """
    high_byte = message[offset]
    low_byte = message[offset + 1]
    if not isinstance(high_byte, int):
        # Indexing a str yields one-character strings, while bytes (on
        # Python 3) and bytearray yield integers directly.
        high_byte = ord(high_byte)
        low_byte = ord(low_byte)
    return high_byte * 256 + low_byte
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Arguments:
        :prefixes_hex: list of prefixes to be decoded, as raw bytes in the
            wire format (one length octet in bits followed by the minimal
            number of prefix octets)
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Raises:
        struct.error: when a prefix is truncated or longer than 32 bits.
    """
    data = bytearray(prefixes_hex)
    prefix_list = []
    offset = 0
    while offset < len(data):
        prefix_bit_len = data[offset]
        # Number of octets needed to hold prefix_bit_len bits.
        prefix_len = (prefix_bit_len + 7) // 8
        prefix_octets = bytes(data[offset + 1: offset + 1 + prefix_len])
        # Only the significant octets are on the wire, so the address is
        # padded with zeros up to 4 octets. Truncated or too long prefix
        # still does not have 4 octets and is refused by unpack.
        prefix_octets += b"\x00" * (4 - prefix_len)
        prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_octets))
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.

        Arguments:
            :text: textual description of the problem
            :message: raw (binary) message which caused the error
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: human readable message as string
        Notes:
            Use a placeholder string if the message is to be empty.
        """
        message = binascii.hexlify(self.msg)
        if not isinstance(message, str):
            # hexlify gives bytes on Python 3, those cannot be appended
            # to the textual comment directly.
            message = message.decode("ascii")
        if not message:
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Raises:
        MessageError: when the received data is shorter than the minimal
            OPEN message, or when its length does not match the length
            field of the message header.
    Notes:
        Performs just basic incomming message checks
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # Some validation.
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        error_msg = (
            "Message length (" + str(len(msg_in)) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)"
        )
        logger.error("%s: %s", error_msg, binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # reported_length is a number, it has to be converted explicitly
        # (the concatenation used to raise TypeError instead of reporting).
        error_msg = (
            "Expected message length (" + str(reported_length) +
            ") does not match actual length (" + str(len(msg_in)) + ")"
        )
        logger.error("%s: %s", error_msg, binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    logger.info("Open message received.")
    return msg_in
336 class MessageGenerator(object):
337 """Class which generates messages, holds states and configuration values."""
339 # TODO: Define bgp marker as a class (constant) variable.
def __init__(self, args):
    """Initialisation according to command-line args.

    Arguments:
        :args: argsparser's Namespace object which contains command-line
            options for MesageGenerator initialisation
    Notes:
        Stores the default values used later on for message generation,
        derives the prefix index range used after the pre-fill phase,
        resets the counters and logs the resulting setup.
    """
    total = args.amount
    self.total_prefix_amount = total
    # Number of update messages left to be sent.
    self.remaining_prefixes = total

    # New parameters initialisation
    self.port = args.port
    self.iteration = 0
    self.prefix_base_default = args.firstprefix
    self.prefix_length_default = args.prefixlen
    self.wr_prefixes_default = []
    self.nlri_prefixes_default = []
    self.version_default = 4
    self.my_autonomous_system_default = args.asnumber
    self.hold_time_default = args.holdtime  # Local hold time.
    self.bgp_identifier_default = int(args.myip)
    self.next_hop_default = args.nexthop
    self.originator_id_default = args.originator
    self.cluster_list_item_default = args.cluster
    self.single_update_default = args.updates == "single"
    self.randomize_updates_default = args.updates == "random"

    # Negative withdrawal count makes no sense, zero is used instead.
    to_del = args.withdraw
    if to_del < 0:
        to_del = 0
    to_add = args.insert
    if to_add <= to_del:
        # total number of prefixes must grow to avoid infinite test loop
        to_add = to_del + 1
    self.prefix_count_to_add_default = to_add
    self.prefix_count_to_del_default = to_del
    self.slot_size_default = to_add

    self.remaining_prefixes_threshold = total - args.prefill
    self.results_file_name_default = args.results
    self.performance_threshold_default = args.threshold
    self.rfc4760 = args.rfc4760
    self.bgpls = args.bgpls
    self.evpn = args.evpn
    self.mvpn = args.mvpn
    self.l3vpn_mcast = args.l3vpn_mcast
    self.l3vpn = args.l3vpn
    self.rt_constrain = args.rt_constrain
    self.ipv6 = args.ipv6
    self.allf = args.allf
    self.skipattr = args.skipattr
    self.grace = args.grace

    # Default values when BGP-LS Attributes are used
    if self.bgpls:
        self.prefix_count_to_add_default = 1
        self.prefix_count_to_del_default = 0
    self.ls_nlri_default = {
        "Identifier": args.lsid,
        "TunnelID": args.lstid,
        "LSPID": args.lspid,
        "IPv4TunnelSenderAddress": args.lstsaddr,
        "IPv4TunnelEndPointAddress": args.lsteaddr,
    }
    self.lsid_step = args.lsidstep
    self.lstid_step = args.lstidstep
    self.lspid_step = args.lspidstep
    self.lstsaddr_step = args.lstsaddrstep
    self.lsteaddr_step = args.lsteaddrstep

    # Default values used for randomized part
    added_per_slot = self.prefix_count_to_add_default
    growth_per_slot = added_per_slot - self.prefix_count_to_del_default
    threshold = self.remaining_prefixes_threshold
    # Slots of the pre-fill phase (S1) and of the combined phase (S2);
    # the prefix indexes of S1 start at 0, S2 follows right after S1.
    s1_slots = (total - threshold - 1) // added_per_slot + 1
    s2_slots = (threshold - 1) // growth_per_slot + 1
    s2_first_index = s1_slots * added_per_slot
    s2_last_index = s2_first_index + s2_slots * growth_per_slot - 1
    self.slot_gap_default = s1_slots
    self.randomize_lowest_default = s2_first_index
    self.randomize_highest_default = s2_last_index

    # Initialising counters
    self.phase1_start_time = 0
    self.phase1_stop_time = 0
    self.phase2_start_time = 0
    self.phase2_stop_time = 0
    self.phase1_updates_sent = 0
    self.phase2_updates_sent = 0
    self.updates_sent = 0

    # Flags needed for the MessageGenerator performance optimization.
    # Calling logger methods each iteration even with proper log level set
    # slows down significantly the MessageGenerator performance.
    # Measured total generation time (1M updates, dry run, error log level):
    # - logging based on basic logger features: 36,2s
    # - logging based on advanced logger features (lazy logging): 21,2s
    # - conditional calling of logger methods enclosed inside condition: 8,6s
    self.log_info = args.loglevel <= logging.INFO
    self.log_debug = args.loglevel <= logging.DEBUG

    # Report the setup.
    report = logger.info
    report("Generator initialisation")
    report(" Target total number of prefixes to be introduced: " +
           str(total))
    report(" Prefix base: " + str(self.prefix_base_default) + "/" +
           str(self.prefix_length_default))
    report(" My Autonomous System number: " +
           str(self.my_autonomous_system_default))
    report(" My Hold Time: " + str(self.hold_time_default))
    report(" My BGP Identifier: " + str(self.bgp_identifier_default))
    report(" Next Hop: " + str(self.next_hop_default))
    report(" Originator ID: " + str(self.originator_id_default))
    report(" Cluster list: " + str(self.cluster_list_item_default))
    report(" Prefix count to be inserted at once: " +
           str(self.prefix_count_to_add_default))
    report(" Prefix count to be withdrawn at once: " +
           str(self.prefix_count_to_del_default))
    report(" Fast pre-fill up to " + str(total - threshold) + " prefixes")
    report(" Remaining number of prefixes to be processed " +
           "in parallel with withdrawals: " + str(threshold))
    logger.debug(" Prefix index range used after pre-fill procedure [" +
                 str(s2_first_index) + ", " + str(s2_last_index) + "]")
    if not self.single_update_default:
        report(" Two separate UPDATEs will be generated " +
               "for each NLRI & WITHDRAWN lists")
    else:
        report(" Common single UPDATE will be generated " +
               "for both NLRI & WITHDRAWN lists")
    if self.randomize_updates_default:
        report(" Generation of UPDATE messages will be randomized")
    report(" Let\'s go ...\n")
480 # TODO: Notification for hold timer expiration can be handy.
def store_results(self, file_name=None, threshold=None):
    """ Stores specified results into files based on file_name value.

    Arguments:
        :param file_name: Trailing (common) part of result file names
        :param threshold: Minimum number of sent updates needed for each
                          result to be included into result csv file
                          (mainly needed because of the result accuracy)
    Returns:
        :return: n/a
    Notes:
        Two files are written, "totals-" + file_name and
        "performance-" + file_name. A phase without measurable duration
        is included in the totals only, as no rate can be computed for it.
    """
    # default values handling
    if file_name is None:
        file_name = self.results_file_name_default
    if threshold is None:
        threshold = self.performance_threshold_default

    def phase_results(updates_sent, start_time, stop_time):
        """Return (total, updates per second) of one phase.

        None stands for a value which is not to be reported.
        """
        if updates_sent < threshold:
            return None, None
        duration = stop_time - start_time
        if duration <= 0:
            # The clock did not advance (or the phase was not started
            # at all); dividing would raise ZeroDivisionError.
            return updates_sent, None
        return updates_sent, int(updates_sent / duration)

    # performance calculation
    totals1, performance1 = phase_results(self.phase1_updates_sent,
                                          self.phase1_start_time,
                                          self.phase1_stop_time)
    totals2, performance2 = phase_results(self.phase2_updates_sent,
                                          self.phase2_start_time,
                                          self.phase2_stop_time)

    logger.info("#" * 10 + " Final results " + "#" * 10)
    logger.info("Number of iterations: " + str(self.iteration))
    logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
                str(self.phase1_updates_sent))
    logger.info("The pre-fill phase duration: " +
                str(self.phase1_stop_time - self.phase1_start_time) + "s")
    logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
                str(self.phase2_updates_sent))
    logger.info("The 2nd test phase duration: " +
                str(self.phase2_stop_time - self.phase2_start_time) + "s")
    logger.info("Threshold for performance reporting: " + str(threshold))

    # making labels
    phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
                    " route(s) per UPDATE")
    if self.single_update_default:
        phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                              "/-" + str(self.prefix_count_to_del_default) +
                              " routes per UPDATE")
    else:
        phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                              "/-" + str(self.prefix_count_to_del_default) +
                              " routes in two UPDATEs")

    # collecting capacity and performance results
    totals = {}
    performance = {}
    for label, total, rate in ((phase1_label, totals1, performance1),
                               (phase2_label, totals2, performance2)):
        if total is not None:
            totals[label] = total
        if rate is not None:
            performance[label] = rate
    self.write_results_to_file(totals, "totals-" + file_name)
    self.write_results_to_file(performance, "performance-" + file_name)
def write_results_to_file(self, results, file_name):
    """Writes results to the csv plot file consumable by Jenkins.

    Arguments:
        :param results: dictionary of results, the (sorted) keys make
            the header line and the values make the data line
        :param file_name: Name of the (csv) file to be created
    Returns:
        :return: none
    """
    with open(file_name, "wt") as csv_file:
        labels = sorted(results)
        first_line = ", ".join(labels)
        second_line = ", ".join(str(results[label]) for label in labels)
        csv_file.write(first_line + "\n")
        csv_file.write(second_line + "\n")
        logger.info("Message generator performance results stored in " +
                    file_name + ":")
        logger.info(" " + first_line)
        logger.info(" " + second_line)
# Return pseudo-randomized (reproducible) index for selected range
def randomize_index(self, index, lowest=None, highest=None):
    """Calculates pseudo-randomized index from selected range.

    Arguments:
        :param index: input index
        :param lowest: the lowest index from the randomized area
        :param highest: the highest index from the randomized area
    Returns:
        :return: the (pseudo)randomized index
    Notes:
        Created just as a frame for future generator enhancement.
        For now the order inside of the range is simply reversed.
    """
    # default values handling
    lowest = self.randomize_lowest_default if lowest is None else lowest
    highest = self.randomize_highest_default if highest is None else highest
    # we are out of the randomized range -> nothing to do
    if index < lowest or index > highest:
        return index
    # we are in the randomized range -> mirror the index inside the range
    return highest - (index - lowest)
def get_ls_nlri_values(self, index):
    """Generates LS-NLRI parameters.
    http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03

    Arguments:
        :param index: index (iteration)
    Returns:
        :return: dictionary of LS NLRI parameters and values
    Notes:
        Each value is the configured default increased by the integer
        quotient of the index and the step of the parameter.
    """
    # parameter name -> step configured for it
    steps = (
        ("Identifier", self.lsid_step),
        ("IPv4TunnelSenderAddress", self.lstsaddr_step),
        ("TunnelID", self.lstid_step),
        ("LSPID", self.lspid_step),
        ("IPv4TunnelEndPointAddress", self.lsteaddr_step),
    )
    defaults = self.ls_nlri_default
    ls_nlri_values = {}
    for name, step in steps:
        ls_nlri_values[name] = defaults[name] + index // step
    return ls_nlri_values
def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
                    prefix_len=None, prefix_count=None, randomize=None):
    """Generates list of IP address prefixes.

    Arguments:
        :param slot_index: index of group of prefix addresses
        :param slot_size: size of group of prefix addresses
            in [number of included prefixes]
        :param prefix_base: IP address of the first prefix
            (slot_index = 0, prefix_index = 0)
        :param prefix_len: length of the prefix in bits
            (the same as size of netmask)
        :param prefix_count: number of prefixes to be returned
            from the specified slot
        :param randomize: whether the prefix indexes are to be passed
            through randomize_index()
    Returns:
        :return: list of generated IP address prefixes
    """
    # default values handling
    slot_size = self.slot_size_default if slot_size is None else slot_size
    if prefix_base is None:
        prefix_base = self.prefix_base_default
    if prefix_len is None:
        prefix_len = self.prefix_length_default
    prefix_count = slot_size if prefix_count is None else prefix_count
    if randomize is None:
        randomize = self.randomize_updates_default
    # generating list of prefixes
    first_index = slot_index * slot_size
    indexes = list(range(first_index, first_index + prefix_count))
    if randomize:
        indexes = [self.randomize_index(position) for position in indexes]
    # distance between the bases of two neighbouring prefixes
    prefix_gap = 2 ** (32 - prefix_len)
    prefixes = [prefix_base + position * prefix_gap for position in indexes]
    if self.log_debug:
        logger.debug(" Prefix slot index: " + str(slot_index))
        logger.debug(" Prefix slot size: " + str(slot_size))
        logger.debug(" Prefix count: " + str(prefix_count))
        logger.debug(" Prefix indexes: " + str(indexes))
        logger.debug(" Prefix list: " + str(prefixes))
    return prefixes
def compose_update_message(self, prefix_count_to_add=None,
                           prefix_count_to_del=None):
    """Composes an UPDATE message

    Arguments:
        :param prefix_count_to_add: # of prefixes to put into NLRI list
        :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
    Returns:
        :return: encoded UPDATE message in HEX
    Notes:
        Optionally generates separate UPDATEs for NLRI and WITHDRAWN
        lists or common message wich includes both prefix lists.
        Updates global counters.
    """
    # default values handling
    if prefix_count_to_add is None:
        prefix_count_to_add = self.prefix_count_to_add_default
    if prefix_count_to_del is None:
        prefix_count_to_del = self.prefix_count_to_del_default
    # logging
    if self.log_info and self.iteration % 1000 == 0:
        logger.info("Iteration: " + str(self.iteration) +
                    " - total remaining prefixes: " +
                    str(self.remaining_prefixes))
    if self.log_debug:
        logger.debug("#" * 10 + " Iteration: " +
                     str(self.iteration) + " " + "#" * 10)
        logger.debug("Remaining prefixes: " +
                     str(self.remaining_prefixes))
    # scenario type & one-shot counter
    in_prefill = self.remaining_prefixes > self.remaining_prefixes_threshold
    if in_prefill:
        # nothing is withdrawn during the pre-fill phase
        prefix_count_to_del = 0
        if self.log_debug:
            logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
        if not self.phase1_start_time:
            self.phase1_start_time = time.time()
    else:
        if self.log_debug:
            logger.debug("--- COMBINED SCENARIO ---")
        if not self.phase2_start_time:
            self.phase2_start_time = time.time()
    # tailor the number of prefixes if needed
    net_growth = min(prefix_count_to_add - prefix_count_to_del,
                     self.remaining_prefixes)
    prefix_count_to_add = prefix_count_to_del + net_growth
    # prefix slots selection for insertion and withdrawal
    add_slot = self.iteration
    del_slot = add_slot - self.slot_gap_default
    # getting lists of prefixes for insertion in this iteration
    if self.log_debug:
        logger.debug("Prefixes to be inserted in this iteration:")
    to_announce = self.get_prefix_list(add_slot,
                                       prefix_count=prefix_count_to_add)
    # getting lists of prefixes for withdrawal in this iteration
    if self.log_debug:
        logger.debug("Prefixes to be withdrawn in this iteration:")
    to_withdraw = self.get_prefix_list(del_slot,
                                       prefix_count=prefix_count_to_del)
    if self.bgpls:
        # generating the UPDATE mesage with LS-NLRI only
        ls_nlri = self.get_ls_nlri_values(self.iteration)
        msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
                                      **ls_nlri)
    elif self.single_update_default:
        # Send prefixes to be introduced and withdrawn
        # in one UPDATE message
        msg_out = self.update_message(wr_prefixes=to_withdraw,
                                      nlri_prefixes=to_announce)
    else:
        # Send prefixes to be introduced and withdrawn
        # in separate UPDATE messages (if needed)
        msg_out = self.update_message(wr_prefixes=[],
                                      nlri_prefixes=to_announce)
        if prefix_count_to_del:
            msg_out += self.update_message(wr_prefixes=to_withdraw,
                                           nlri_prefixes=[])
    # updating counters - this may be the last call of the phase
    if in_prefill:
        self.phase1_stop_time = time.time()
        self.phase1_updates_sent = self.updates_sent
    else:
        self.phase2_stop_time = time.time()
        self.phase2_updates_sent = (self.updates_sent -
                                    self.phase1_updates_sent)
    # updating totals for the next iteration
    self.iteration += 1
    self.remaining_prefixes -= net_growth
    # returning the encoded message
    return msg_out
768 # Section of message encoders
def open_message(self, version=None, my_autonomous_system=None,
                 hold_time=None, bgp_identifier=None):
    """Generates an OPEN Message (rfc4271#section-4.2)

    Every argument left as None falls back to the matching
    self.<name>_default attribute.

    Arguments:
        :param version: see the rfc4271#section-4.2
        :param my_autonomous_system: see the rfc4271#section-4.2; a value
            which does not fit into 2 bytes is announced as AS_TRANS in
            the fixed header, the real value is carried by the
            "32 bit AS Numbers Support" capability
        :param hold_time: see the rfc4271#section-4.2
        :param bgp_identifier: see the rfc4271#section-4.2
    Returns:
        :return: encoded OPEN message in HEX
    """

    def multiprotocol_capability(afi_hex, safi_hex):
        """Optional parameter advertising one (AFI, SAFI) pair."""
        return (
            "\x02"  # Param type ("Capability Ad")
            "\x06"  # Length (6 bytes)
            "\x01"  # Capability type (Multiprotocol extension),
                    # see RFC 4760, section 8
            "\x04"  # Capability value length
            + afi_hex  # AFI (2 bytes)
            + "\x00"  # (reserved)
            + safi_hex  # SAFI (1 byte)
        )

    # default values handling
    # TODO optimize default values handling (use e.g. dictionary.update() approach)
    if version is None:
        version = self.version_default
    if my_autonomous_system is None:
        my_autonomous_system = self.my_autonomous_system_default
    if hold_time is None:
        hold_time = self.hold_time_default
    if bgp_identifier is None:
        bgp_identifier = self.bgp_identifier_default

    # Marker
    marker_hex = "\xFF" * 16

    # Type (1 = OPEN); not named "type" to keep the builtin usable
    msg_type = 1
    type_hex = struct.pack("B", msg_type)

    # version
    version_hex = struct.pack("B", version)

    # my_autonomous_system
    # AS_TRANS value, 23456 decadic.
    my_autonomous_system_2_bytes = 23456
    # AS number is mappable to 2 bytes
    if my_autonomous_system < 65536:
        my_autonomous_system_2_bytes = my_autonomous_system
    # Pack the 2-byte representation; packing the raw AS number would
    # raise struct.error for any 4-byte AS.
    my_autonomous_system_hex_2_bytes = struct.pack(
        ">H", my_autonomous_system_2_bytes)

    # Hold Time
    hold_time_hex = struct.pack(">H", hold_time)

    # BGP Identifier
    bgp_identifier_hex = struct.pack(">I", bgp_identifier)

    # Optional Parameters
    # (enabling flag, AFI, SAFI); the order is the order on the wire.
    address_families = (
        (self.rfc4760, "\x00\x01", "\x01"),  # IPV4, UNICAST
        (self.ipv6, "\x00\x02", "\x01"),  # IPV6, UNICAST
        (self.bgpls, "\x40\x04", "\x47"),  # BGP-LS, BGP-LS
        (self.evpn, "\x00\x19", "\x46"),  # L2-VPN, EVPN
        (self.mvpn, "\x00\x01", "\x05"),  # IPV4, MCAST-VPN
        (self.mvpn, "\x00\x02", "\x05"),  # IPV6, MCAST-VPN
        (self.l3vpn_mcast, "\x00\x01", "\x81"),  # IPV4, L3VPN-MCAST
        (self.l3vpn_mcast, "\x00\x02", "\x81"),  # IPV6, L3VPN-MCAST
        (self.l3vpn, "\x00\x01", "\x80"),  # IPV4, L3VPN-UNICAST
        (self.l3vpn, "\x00\x02", "\x80"),  # IPV6, L3VPN-UNICAST
        (self.rt_constrain, "\x00\x01", "\x84"),  # IPV4, ROUTE-TARGET-CONSTRAIN
    )
    all_families = self.allf
    optional_parameters_hex = ""
    for enabled, afi_hex, safi_hex in address_families:
        if enabled or all_families:
            optional_parameters_hex += multiprotocol_capability(afi_hex,
                                                                safi_hex)

    optional_parameters_hex += (
        "\x02"  # Param type ("Capability Ad")
        "\x06"  # Length (6 bytes)
        "\x41"  # "32 bit AS Numbers Support"
                # (see RFC 6793, section 3)
        "\x04"  # Capability value length
        + struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
    )

    if self.grace != 8:
        # self.grace is a 3 bit mask:
        #   bit 0 (grace == 1,3,5,7) - Ipv4 (forwarding state) flag
        #   bit 1 (grace == 2,3,6,7) - Restart flag
        #   bit 2 (grace == 4..7)    - ll-graceful-restart capability
        restart_flag = "\x80\x05" if self.grace & 2 else "\x00\x05"
        ipv4_flag = "\x80" if self.grace & 1 else "\x00"
        if self.grace & 4:
            # ipv4 ll-graceful-restart capability, timer 30sec
            ll_gr = "\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
        else:
            ll_gr = ""
        # [ll-gr bit, restart bit, ipv4 bit]
        b = list("{:03b}".format(self.grace))
        logger.debug("Grace parameters list: {}".format(b))
        capabilities_hex = (
            "\x40"  # Graceful-restart capability
            "\x06"  # Length (6 bytes)
            + restart_flag  # Restart Flag + Restart timer (5sec)
            + "\x00\x01"  # AFI (IPV4)
            + "\x01"  # SAFI (Unicast)
            + ipv4_flag  # Ipv4 Flag
            + ll_gr
        )
        optional_parameters_hex += (
            "\x02"  # Param type ("Capability Ad")
            + struct.pack("B", len(capabilities_hex))  # Length of the capabilities
            + capabilities_hex
        )

    # Optional Parameters Length
    optional_parameters_length = len(optional_parameters_hex)
    optional_parameters_length_hex = struct.pack("B",
                                                 optional_parameters_length)

    # Length (big-endian); the "2" is the size of the length field itself
    length = (
        len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
        len(my_autonomous_system_hex_2_bytes) +
        len(hold_time_hex) + len(bgp_identifier_hex) +
        len(optional_parameters_length_hex) +
        len(optional_parameters_hex)
    )
    length_hex = struct.pack(">H", length)

    # OPEN Message
    message_hex = (
        marker_hex +
        length_hex +
        type_hex +
        version_hex +
        my_autonomous_system_hex_2_bytes +
        hold_time_hex +
        bgp_identifier_hex +
        optional_parameters_length_hex +
        optional_parameters_hex
    )

    logger.debug("OPEN message encoding")
    logger.debug(" Marker=0x%s", binascii.hexlify(marker_hex))
    logger.debug(" Length=%s (0x%s)", length, binascii.hexlify(length_hex))
    logger.debug(" Type=%s (0x%s)", msg_type, binascii.hexlify(type_hex))
    logger.debug(" Version=%s (0x%s)", version,
                 binascii.hexlify(version_hex))
    logger.debug(" My Autonomous System=%s (0x%s)",
                 my_autonomous_system_2_bytes,
                 binascii.hexlify(my_autonomous_system_hex_2_bytes))
    logger.debug(" Hold Time=%s (0x%s)", hold_time,
                 binascii.hexlify(hold_time_hex))
    logger.debug(" BGP Identifier=%s (0x%s)", bgp_identifier,
                 binascii.hexlify(bgp_identifier_hex))
    logger.debug(" Optional Parameters Length=%s (0x%s)",
                 optional_parameters_length,
                 binascii.hexlify(optional_parameters_length_hex))
    logger.debug(" Optional Parameters=0x%s",
                 binascii.hexlify(optional_parameters_hex))
    logger.debug("OPEN message encoded: 0x%s",
                 binascii.b2a_hex(message_hex))

    return message_hex
def update_message(self, wr_prefixes=None, nlri_prefixes=None,
                   wr_prefix_length=None, nlri_prefix_length=None,
                   my_autonomous_system=None, next_hop=None,
                   originator_id=None, cluster_list_item=None,
                   end_of_rib=False, **ls_nlri_params):
    """Generates an UPDATE Message (rfc4271#section-4.3)

    Every argument left as None falls back to the matching
    self.<name>_default attribute (both prefix lengths share
    self.prefix_length_default).

    Arguments:
        :param wr_prefixes: see the rfc4271#section-4.3
        :param nlri_prefixes: see the rfc4271#section-4.3
        :param wr_prefix_length: see the rfc4271#section-4.3
        :param nlri_prefix_length: see the rfc4271#section-4.3
        :param my_autonomous_system: see the rfc4271#section-4.3
        :param next_hop: see the rfc4271#section-4.3
        :param originator_id: value of the ORIGINATOR_ID attribute,
            the attribute is left out when the value stays None
        :param cluster_list_item: the only item of the CLUSTER_LIST
            attribute, the attribute is left out when the value stays None
        :param end_of_rib: when set (and graceful restart is in use, i.e.
            self.grace is neither 0 nor 8) an empty End-of-RIB UPDATE
            is returned instead of the encoded prefixes
        :param ls_nlri_params: overrides for items of self.ls_nlri_default
    Returns:
        :return: encoded UPDATE message in HEX
    """

    # default values handling
    # TODO optimize default values handling (use e.g. dictionary.update() approach)
    if wr_prefixes is None:
        wr_prefixes = self.wr_prefixes_default
    if nlri_prefixes is None:
        nlri_prefixes = self.nlri_prefixes_default
    if wr_prefix_length is None:
        wr_prefix_length = self.prefix_length_default
    if nlri_prefix_length is None:
        nlri_prefix_length = self.prefix_length_default
    if my_autonomous_system is None:
        my_autonomous_system = self.my_autonomous_system_default
    if next_hop is None:
        next_hop = self.next_hop_default
    if originator_id is None:
        originator_id = self.originator_id_default
    if cluster_list_item is None:
        cluster_list_item = self.cluster_list_item_default
    ls_nlri = self.ls_nlri_default.copy()
    ls_nlri.update(ls_nlri_params)

    # Marker
    marker_hex = "\xFF" * 16

    # Type (2 = UPDATE); not named "type" to keep the builtin usable
    msg_type = 2
    type_hex = struct.pack("B", msg_type)

    # Withdrawn Routes
    withdrawn_routes_hex = ""
    if not self.bgpls:
        # Only the bytes covered by the prefix length are transmitted.
        prefix_bytes = ((wr_prefix_length - 1) // 8) + 1
        withdrawn_routes_hex = "".join(
            struct.pack("B", wr_prefix_length) +
            struct.pack(">I", int(prefix))[:prefix_bytes]
            for prefix in wr_prefixes)

    # Withdrawn Routes Length
    withdrawn_routes_length = len(withdrawn_routes_hex)
    withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)

    # TODO: to replace hardcoded string by encoding?
    # Path Attributes
    path_attributes_hex = ""
    if not self.skipattr:
        path_attributes_hex += (
            "\x40"  # Flags ("Well-Known")
            "\x01"  # Type (ORIGIN)
            "\x01"  # Length (1)
            "\x00"  # Origin: IGP
        )
        path_attributes_hex += (
            "\x40"  # Flags ("Well-Known")
            "\x02"  # Type (AS_PATH)
            "\x06"  # Length (6)
            "\x02"  # AS segment type (AS_SEQUENCE)
            "\x01"  # AS segment length (1)
        )
        my_as_hex = struct.pack(">I", my_autonomous_system)
        path_attributes_hex += my_as_hex  # AS segment (4 bytes)
        path_attributes_hex += (
            "\x40"  # Flags ("Well-Known")
            "\x05"  # Type (LOCAL_PREF)
            "\x04"  # Length (4)
            "\x00\x00\x00\x64"  # (100)
        )
        if nlri_prefixes != []:
            path_attributes_hex += (
                "\x40"  # Flags ("Well-Known")
                "\x03"  # Type (NEXT_HOP)
                "\x04"  # Length (4)
            )
            next_hop_hex = struct.pack(">I", int(next_hop))
            path_attributes_hex += (
                next_hop_hex  # IP address of the next hop (4 bytes)
            )
            if originator_id is not None:
                path_attributes_hex += (
                    "\x80"  # Flags ("Optional, non-transitive")
                    "\x09"  # Type (ORIGINATOR_ID)
                    "\x04"  # Length (4)
                )  # ORIGINATOR_ID (4 bytes)
                path_attributes_hex += struct.pack(">I", int(originator_id))
            if cluster_list_item is not None:
                path_attributes_hex += (
                    "\x80"  # Flags ("Optional, non-transitive")
                    "\x0a"  # Type (CLUSTER_LIST)
                    "\x04"  # Length (4)
                )  # one CLUSTER_LIST item (4 bytes)
                path_attributes_hex += struct.pack(">I", int(cluster_list_item))

    if self.bgpls and not end_of_rib:
        path_attributes_hex += (
            "\x80"  # Flags ("Optional, non-transitive")
            "\x0e"  # Type (MP_REACH_NLRI)
            "\x22"  # Length (34)
            "\x40\x04"  # AFI (BGP-LS)
            "\x47"  # SAFI (BGP-LS)
            "\x04"  # Next Hop Length (4)
        )
        path_attributes_hex += struct.pack(">I", int(next_hop))
        path_attributes_hex += "\x00"  # Reserved
        path_attributes_hex += (
            "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
            "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
            "\x07"  # LS-NLRI.Variable.ProtocolID (RSVP-TE)
        )
        path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
        path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
        path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
        path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
        path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))

    # Total Path Attributes Length
    total_path_attributes_length = len(path_attributes_hex)
    total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)

    # Network Layer Reachability Information
    nlri_hex = ""
    if not self.bgpls:
        prefix_bytes = ((nlri_prefix_length - 1) // 8) + 1
        nlri_hex = "".join(
            struct.pack("B", nlri_prefix_length) +
            struct.pack(">I", int(prefix))[:prefix_bytes]
            for prefix in nlri_prefixes)

    # Length (big-endian); the "2" is the size of the length field itself
    length = (
        len(marker_hex) + 2 + len(type_hex) +
        len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
        len(total_path_attributes_length_hex) + len(path_attributes_hex) +
        len(nlri_hex)
    )
    length_hex = struct.pack(">H", length)

    # UPDATE Message
    message_hex = (
        marker_hex +
        length_hex +
        type_hex +
        withdrawn_routes_length_hex +
        withdrawn_routes_hex +
        total_path_attributes_length_hex +
        path_attributes_hex +
        nlri_hex
    )

    if self.grace != 8 and self.grace != 0 and end_of_rib:
        # End-of-RIB marker: UPDATE of length 23 with no withdrawn routes
        # and no path attributes.
        message_hex = (marker_hex + binascii.unhexlify("00170200000000"))

    logger.debug("UPDATE message encoding")
    logger.debug(" Marker=0x%s", binascii.hexlify(marker_hex))
    logger.debug(" Length=%s (0x%s)", length, binascii.hexlify(length_hex))
    logger.debug(" Type=%s (0x%s)", msg_type, binascii.hexlify(type_hex))
    logger.debug(" withdrawn_routes_length=%s (0x%s)",
                 withdrawn_routes_length,
                 binascii.hexlify(withdrawn_routes_length_hex))
    logger.debug(" Withdrawn_Routes=%s/%s (0x%s)", wr_prefixes,
                 wr_prefix_length, binascii.hexlify(withdrawn_routes_hex))
    if total_path_attributes_length:
        logger.debug(" Total Path Attributes Length=%s (0x%s)",
                     total_path_attributes_length,
                     binascii.hexlify(total_path_attributes_length_hex))
        logger.debug(" Path Attributes=(0x%s)",
                     binascii.hexlify(path_attributes_hex))
        logger.debug(" Origin=IGP")
        logger.debug(" AS path=%s", my_autonomous_system)
        logger.debug(" Next hop=%s", next_hop)
        if originator_id is not None:
            logger.debug(" Originator id=%s", originator_id)
        if cluster_list_item is not None:
            logger.debug(" Cluster list=%s", cluster_list_item)
        if self.bgpls:
            logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
    logger.debug(" Network Layer Reachability Information=%s/%s (0x%s)",
                 nlri_prefixes, nlri_prefix_length,
                 binascii.hexlify(nlri_hex))
    logger.debug("UPDATE message encoded: 0x%s",
                 binascii.b2a_hex(message_hex))

    # updating counter
    self.updates_sent += 1
    # returning encoded message
    return message_hex
def notification_message(self, error_code, error_subcode, data_hex=""):
    """Generates a NOTIFICATION Message (rfc4271#section-4.5)

    Arguments:
        :param error_code: see the rfc4271#section-4.5
        :param error_subcode: see the rfc4271#section-4.5
        :param data_hex: see the rfc4271#section-4.5
    Returns:
        :return: encoded NOTIFICATION message in HEX
    """

    def hexed(raw):
        """Render raw bytes as the ' (0x..)' suffix used by the log lines."""
        return " (0x" + binascii.hexlify(raw) + ")"

    # Fixed fields: marker, message type (3 = NOTIFICATION), error codes
    marker_hex = "\xFF" * 16
    msg_type = 3
    type_hex = struct.pack("B", msg_type)
    error_code_hex = struct.pack("B", error_code)
    error_subcode_hex = struct.pack("B", error_subcode)

    # Length (big-endian); the "2" is the size of the length field itself
    body_hex = type_hex + error_code_hex + error_subcode_hex + data_hex
    length = len(marker_hex) + 2 + len(body_hex)
    length_hex = struct.pack(">H", length)

    # NOTIFICATION Message
    message_hex = marker_hex + length_hex + body_hex

    logger.debug("NOTIFICATION message encoding")
    logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
    logger.debug(" Length=" + str(length) + hexed(length_hex))
    logger.debug(" Type=" + str(msg_type) + hexed(type_hex))
    logger.debug(" Error Code=" + str(error_code) + hexed(error_code_hex))
    logger.debug(" Error Subode=" + str(error_subcode) +
                 hexed(error_subcode_hex))
    logger.debug(" Data=" + hexed(data_hex))
    logger.debug("NOTIFICATION message encoded: 0x%s",
                 binascii.b2a_hex(message_hex))

    return message_hex
def keepalive_message(self):
    """Generates a KEEP ALIVE Message (rfc4271#section-4.4)

    Returns:
        :return: encoded KEEP ALIVE message in HEX
    """

    def hexed(raw):
        """Render raw bytes as the ' (0x..)' suffix used by the log lines."""
        return " (0x" + binascii.hexlify(raw) + ")"

    # A KEEP ALIVE is just the header: marker, length, type (4)
    marker_hex = "\xFF" * 16
    msg_type = 4
    type_hex = struct.pack("B", msg_type)

    # Length (big-endian); the "2" is the size of the length field itself
    length = len(marker_hex) + 2 + len(type_hex)
    length_hex = struct.pack(">H", length)

    # KEEP ALIVE Message
    message_hex = marker_hex + length_hex + type_hex

    logger.debug("KEEP ALIVE message encoding")
    logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
    logger.debug(" Length=" + str(length) + hexed(length_hex))
    logger.debug(" Type=" + str(msg_type) + hexed(type_hex))
    logger.debug("KEEP ALIVE message encoded: 0x%s",
                 binascii.b2a_hex(message_hex))

    return message_hex
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and
    for the hold time of the peer.

    Absolute points in time are named *_time, durations are named
    *_timedelta. A negotiated hold time of 0 switches keepalive handling
    off completely.
    """

    def __init__(self, msg_in):
        """Initialisation. based on defaults and OPEN message from peer.

        Arguments:
            msg_in: the OPEN message received from peer.
        Raises:
            ValueError: when the negotiated hold time is 1 or 2 seconds.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        hold_timedelta = 180  # Not an attribute of self yet.
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        # Offset 22 is where the Hold Time field of an OPEN message starts.
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        if hold_timedelta > peer_hold_timedelta:
            hold_timedelta = peer_hold_timedelta
        if hold_timedelta != 0 and hold_timedelta < 3:
            logger.error("Invalid hold timedelta value: %s", hold_timedelta)
            # Single formatted message, so str(exception) stays readable.
            raise ValueError("Invalid hold timedelta value: %s" % hold_timedelta)
        self.hold_timedelta = hold_timedelta
        # If we do not hear from peer this long, we assume it has died.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        # The same as calling snapshot(), but also declares a field.
        self.snapshot_time = time.time()
        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this time point, peer will be declared dead.
        self.my_keepalive_time = None  # to be set later
        # At this point, we should be sending keepalive message.

    def snapshot(self):
        """Store current time in instance data to use later."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time

        Arguments:
            :keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurence"""
        if self.hold_timedelta == 0:
            # Keepalives are switched off.
            return False
        return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Return the time of the next expected or to be sent KEEP ALIVE"""
        if self.hold_timedelta == 0:
            # No keepalive handling; report an event one day ahead.
            return self.snapshot_time + 86400
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time."""
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta != 0:
            # time.time() may be too strict
            if snapshot_time > self.peer_hold_time:
                logger.error("Peer has overstepped the hold timer.")
                raise RuntimeError("Peer has overstepped the hold timer.")
                # TODO: Include hold_timedelta?
                # TODO: Add notification sending (attempt). That means
                # move to write tracker.
1435 class ReadTracker(object):
1436 """Class for tracking read of mesages chunk by chunk and
def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
             l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
             ipv6=False, grace=8, wait_for_read=10):
    """The reader initialisation.

    Arguments:
        bgp_socket: socket to be used for sending
        timer: timer to be used for scheduling
        storage: thread safe dict
        evpn: flag that evpn functionality is tested
        mvpn: flag that mvpn functionality is tested
        grace: flag that grace-restart functionality is tested
        l3vpn_mcast: flag that l3vpn_mcast functionality is tested
        l3vpn: flag that l3vpn unicast functionality is tested
        rt_constrain: flag that rt-constrain functionality is tested
        ipv6: flag that ipv6 unicast functionality is tested
        allf: flag for all family testing.
        wait_for_read: upper bound (in seconds) for a single wait
            in wait_for_read()
    """
    # References to outside objects.
    self.socket = bgp_socket
    self.timer = timer
    # BGP marker length plus length field length.
    self.header_length = 18
    # TODO: make it class (constant) attribute
    # Computation of where next chunk ends depends on whether
    # we are beyond length field.
    self.reading_header = True
    # Countdown towards next size computation.
    self.bytes_to_read = self.header_length
    # Incremental buffer for message under read.
    self.msg_in = ""
    # Initialising counters
    self.updates_received = 0
    self.prefixes_introduced = 0
    self.prefixes_withdrawn = 0
    self.rx_idle_time = 0
    self.rx_activity_detected = True
    self.storage = storage
    # Flags deciding whether received updates get decoded.
    self.evpn = evpn
    self.mvpn = mvpn
    self.l3vpn_mcast = l3vpn_mcast
    self.l3vpn = l3vpn
    self.rt_constrain = rt_constrain
    self.ipv6 = ipv6
    self.allf = allf
    self.wfr = wait_for_read
    self.grace = grace
def read_message_chunk(self):
    """Read up to one message

    A single recv() is issued for the bytes still missing from the block
    under read (the 18 byte header first, the rest of the message next).
    Once a whole message is collected it is logged, UPDATE messages are
    passed to decode_update_message() and the state is reset for the
    next header.

    Note:
        Currently it does not return anything.
    """
    # TODO: We could return the whole message, currently not needed.
    # We assume the socket is readable.
    logger.info("Receiving %d bytes", self.bytes_to_read)
    received = self.socket.recv(self.bytes_to_read)
    received_length = len(received)
    logger.info("Received %d bytes", received_length)
    self.msg_in += received
    logger.info("Current message is %d bytes", len(self.msg_in))
    self.bytes_to_read -= received_length
    logger.info("Checking %d bytes to read (header=%s)",
                self.bytes_to_read, self.reading_header)
    # TODO: bytes_to_read < 0 is not possible, right?
    if self.bytes_to_read == 0 and self.reading_header:
        # The logical block was a BGP header.
        # Now we know the size of the message.
        self.reading_header = False
        self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
                              self.header_length)
    elif self.bytes_to_read == 0:
        # We have finished reading the body of the message.
        # Peer has just proven it is still alive.
        self.timer.reset_peer_hold_time()
        # TODO: Do we want to count received messages?
        # This version ignores the received message.
        # TODO: Should we do validation and exit on anything
        # besides update or keepalive?
        # Message types which are only reported at info level.
        info_formats = {
            "\x01": "OPEN message received: 0x%s",
            "\x03": "NOTIFICATION message received: 0x%s",
            "\x04": "KEEP ALIVE message received: 0x%s",
        }
        message_type_hex = self.msg_in[self.header_length]
        if message_type_hex == "\x02":
            logger.debug("UPDATE message received: 0x%s",
                         binascii.b2a_hex(self.msg_in))
            self.decode_update_message(self.msg_in)
        elif message_type_hex in info_formats:
            logger.info(info_formats[message_type_hex],
                        binascii.b2a_hex(self.msg_in))
        else:
            logger.warning("Unexpected message received: 0x%s",
                           binascii.b2a_hex(self.msg_in))
        # Prepare state for reading another message.
        self.msg_in = ""
        self.reading_header = True
        self.bytes_to_read = self.header_length
    # We should not act upon peer_hold_time if we are reading
    # something right now.
    logger.info("Require %d bytes to read (header=%s)",
                self.bytes_to_read, self.reading_header)
    return
def decode_path_attributes(self, path_attributes_hex):
    """Decode the Path Attributes field (rfc4271#section-4.3)

    Every attribute is logged. For MP_REACH_NLRI and MP_UNREACH_NLRI
    the carried prefixes are listed as well and added to
    self.prefixes_introduced / self.prefixes_withdrawn.

    Arguments:
        :path_attributes: path_attributes field to be decoded in hex
    Returns:
        :return: None
    """
    # Type codes this decoder knows by name.
    attribute_names = {
        1: "ORIGIN",
        2: "AS_PATH",
        3: "NEXT_HOP",
        4: "MULTI_EXIT_DISC",
        5: "LOCAL_PREF",
        6: "ATOMIC_AGGREGATE",
        7: "AGGREGATOR",
        9: "ORIGINATOR_ID",  # rfc4456#section-8
        10: "CLUSTER_LIST",  # rfc4456#section-8
        14: "MP_REACH_NLRI",  # rfc4760#section-3
        15: "MP_UNREACH_NLRI",  # rfc4760#section-4
    }
    hex_to_decode = path_attributes_hex

    while len(hex_to_decode):
        attr_flags_hex = hex_to_decode[0:1]
        attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
        # attr_optional_bit = attr_flags & 128
        # attr_transitive_bit = attr_flags & 64
        # attr_partial_bit = attr_flags & 32
        attr_extended_length_bit = attr_flags & 16

        # flags (1 byte) + type code (1 byte) + length (2 bytes when the
        # extended length bit is set, 1 byte otherwise)
        header_size = 4 if attr_extended_length_bit else 3
        if len(hex_to_decode) < header_size:
            # Not enough data for an attribute header, nothing sane
            # can be decoded from the rest.
            logger.warning("Truncated path attribute: 0x%s",
                           binascii.b2a_hex(hex_to_decode))
            break

        attr_type_code_hex = hex_to_decode[1:2]
        attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
        attr_length_hex = hex_to_decode[2:header_size]
        attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
        attr_value_hex = hex_to_decode[header_size:header_size + attr_length]
        hex_to_decode = hex_to_decode[header_size + attr_length:]

        attr_name = attribute_names.get(attr_type_code)
        if attr_name is None:
            logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
                         binascii.b2a_hex(attr_flags_hex))
            logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            continue

        logger.debug("Attribute type=%s (%s, flags:0x%s)", attr_type_code,
                     attr_name, binascii.b2a_hex(attr_flags_hex))
        logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))

        if attr_type_code == 14:  # rfc4760#section-3
            address_family_identifier_hex = attr_value_hex[0:2]
            logger.debug(" Address Family Identifier=0x%s",
                         binascii.b2a_hex(address_family_identifier_hex))
            subsequent_address_family_identifier_hex = attr_value_hex[2:3]
            logger.debug(" Subsequent Address Family Identifier=0x%s",
                         binascii.b2a_hex(subsequent_address_family_identifier_hex))
            next_hop_netaddr_len_hex = attr_value_hex[3:4]
            next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
            logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
                         next_hop_netaddr_len,
                         binascii.b2a_hex(next_hop_netaddr_len_hex))
            next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
            if len(next_hop_netaddr_hex) == 4:
                next_hop_netaddr = ".".join(
                    str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
            else:
                # Not an IPv4 address (e.g. IPv6 or VPN next hop),
                # report it as plain hex.
                next_hop_netaddr = binascii.b2a_hex(next_hop_netaddr_hex)
            logger.debug(" Network Address of Next Hop=%s (0x%s)",
                         next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
            reserved_offset = 4 + next_hop_netaddr_len
            reserved_hex = attr_value_hex[reserved_offset:reserved_offset + 1]
            logger.debug(" Reserved=0x%s",
                         binascii.b2a_hex(reserved_hex))
            nlri_hex = attr_value_hex[reserved_offset + 1:]
            logger.debug(" Network Layer Reachability Information=0x%s",
                         binascii.b2a_hex(nlri_hex))
            nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
            logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
            for prefix in nlri_prefix_list:
                logger.debug(" nlri_prefix_received: %s", prefix)
            self.prefixes_introduced += len(nlri_prefix_list)  # update counter
        elif attr_type_code == 15:  # rfc4760#section-4
            address_family_identifier_hex = attr_value_hex[0:2]
            logger.debug(" Address Family Identifier=0x%s",
                         binascii.b2a_hex(address_family_identifier_hex))
            subsequent_address_family_identifier_hex = attr_value_hex[2:3]
            logger.debug(" Subsequent Address Family Identifier=0x%s",
                         binascii.b2a_hex(subsequent_address_family_identifier_hex))
            wd_hex = attr_value_hex[3:]
            logger.debug(" Withdrawn Routes=0x%s",
                         binascii.b2a_hex(wd_hex))
            wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
            logger.debug(" Withdrawn routes prefix list: %s",
                         wdr_prefix_list)
            for prefix in wdr_prefix_list:
                logger.debug(" withdrawn_prefix_received: %s", prefix)
            self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
def decode_update_message(self, msg):
    """Decode an UPDATE message (rfc4271#section-4.3)

    The hexlified message is always stored under the 'update' key of
    self.storage. Detailed decoding (and counter updates) is skipped
    when any of the address family / graceful-restart test flags is
    active.

    Arguments:
        :msg: message to be decoded in hex
    Returns:
        :return: None
    """
    logger.debug("Decoding update message:")
    # message header - marker
    marker_hex = msg[:16]
    logger.debug("Message header marker: 0x%s",
                 binascii.b2a_hex(marker_hex))
    # message header - message length
    msg_length_hex = msg[16:18]
    msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
    logger.debug("Message lenght: 0x%s (%s)",
                 binascii.b2a_hex(msg_length_hex), msg_length)
    # message header - message type
    msg_type_hex = msg[18:19]
    msg_type = int(binascii.b2a_hex(msg_type_hex), 16)

    if msg_type != 2:
        logger.error("Unexpeced message type 0x%s in 0x%s",
                     binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
        return

    with self.storage as stor:
        # this will replace the previously stored message
        stor['update'] = binascii.hexlify(msg)

    # (log label, logged value, skip decoding?, reason logged when skipping)
    # Checked in this order, the first active one ends the decoding.
    skip_checks = (
        ("Evpn {}", self.evpn, self.evpn,
         "Skipping update decoding due to evpn data expected"),
        ("Graceful-restart {}", self.grace, self.grace != 8,
         "Skipping update decoding due to graceful-restart data expected"),
        ("Mvpn {}", self.mvpn, self.mvpn,
         "Skipping update decoding due to mvpn data expected"),
        ("L3vpn-mcast {}", self.l3vpn_mcast, self.l3vpn_mcast,
         "Skipping update decoding due to l3vpn_mcast data expected"),
        # The unicast check has to look at l3vpn, not at l3vpn_mcast.
        ("L3vpn-unicast {}", self.l3vpn, self.l3vpn,
         "Skipping update decoding due to l3vpn-unicast data expected"),
        ("Route-Target-Constrain {}", self.rt_constrain, self.rt_constrain,
         "Skipping update decoding due to Route-Target-Constrain data expected"),
        ("Ipv6-Unicast {}", self.ipv6, self.ipv6,
         "Skipping update decoding due to Ipv6 data expected"),
        ("Allf {}", self.allf, self.allf,
         "Skipping update decoding"),
    )
    for label, value, skip, reason in skip_checks:
        logger.debug(label.format(value))
        if skip:
            logger.debug(reason)
            return

    logger.debug("Message type: 0x%s (update)",
                 binascii.b2a_hex(msg_type_hex))
    # withdrawn routes length
    wdr_length_hex = msg[19:21]
    wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
    logger.debug("Withdrawn routes lenght: 0x%s (%s)",
                 binascii.b2a_hex(wdr_length_hex), wdr_length)
    # withdrawn routes
    wdr_hex = msg[21:21 + wdr_length]
    logger.debug("Withdrawn routes: 0x%s",
                 binascii.b2a_hex(wdr_hex))
    wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
    logger.debug("Withdrawn routes prefix list: %s",
                 wdr_prefix_list)
    for prefix in wdr_prefix_list:
        logger.debug("withdrawn_prefix_received: %s", prefix)
    # total path attribute length
    total_pa_length_offset = 21 + wdr_length
    total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
    total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
    logger.debug("Total path attribute lenght: 0x%s (%s)",
                 binascii.b2a_hex(total_pa_length_hex), total_pa_length)
    # path attributes
    pa_offset = total_pa_length_offset + 2
    pa_hex = msg[pa_offset:pa_offset + total_pa_length]
    logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
    self.decode_path_attributes(pa_hex)
    # network layer reachability information length
    # 23 = header (19 bytes) + the two 2-byte length fields
    nlri_length = msg_length - 23 - total_pa_length - wdr_length
    logger.debug("Calculated NLRI length: %s", nlri_length)
    # network layer reachability information
    nlri_offset = pa_offset + total_pa_length
    nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
    logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
    nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
    logger.debug("NLRI prefix list: %s", nlri_prefix_list)
    for prefix in nlri_prefix_list:
        logger.debug("nlri_prefix_received: %s", prefix)
    # updating counters
    self.updates_received += 1
    self.prefixes_introduced += len(nlri_prefix_list)
    self.prefixes_withdrawn += len(wdr_prefix_list)
def wait_for_read(self):
    """Idle on the socket until it is readable or the next event is due.

    Notes:
        Used when no more updates have to be sent, to avoid busy-wait.
        The wait is bounded by the timer's next event time and by self.wfr.
        Receive counters are logged before the wait and idle counters
        after it, but only when no recent receive activity was detected
        or when the update counter is a multiple of 100.
        Currently it does not return anything.
    """
    # Compute time to the first predictable state change.
    event_time = self.timer.get_next_event_time()
    # time.time() is used here because snapshot_time would be imprecise.
    # The program may get around to waiting for an event in the "very near
    # future" so late that it became a "past" event; in that case tell
    # select() to not wait at all. A negative timeout must never reach
    # select(): it would either wait forever or be rejected as an
    # invalid parameter, hence the clamp at zero.
    wait_timedelta = max(min(event_time - time.time(), self.wfr), 0)

    if not self.rx_activity_detected or not (self.updates_received % 100):
        # Right time to write statistics to the log (not for every update
        # and not too frequently, to avoid having large log files).
        logger.info("total_received_update_message_counter: %s",
                    self.updates_received)
        logger.info("total_received_nlri_prefix_counter: %s",
                    self.prefixes_introduced)
        logger.info("total_received_withdrawn_prefix_counter: %s",
                    self.prefixes_withdrawn)

    # And wait for the event or for something to read.
    start_time = time.time()
    select.select([self.socket], [], [self.socket], wait_timedelta)
    timedelta = time.time() - start_time
    self.rx_idle_time += timedelta
    # A wait shorter than one second is treated as receive activity.
    self.rx_activity_detected = timedelta < 1

    if not self.rx_activity_detected or not (self.updates_received % 100):
        # Same throttling rule as for the receive counters above.
        logger.info("... idle for %.3fs", timedelta)
        logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them."""

    def __init__(self, bgp_socket, generator, timer):
        """The writer initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        # The timer is needed by send_message_chunk_is_whole().
        self.timer = timer
        # Really new fields.
        # TODO: Would attribute docstrings add anything substantial?
        self.sending_message = False
        self.bytes_to_send = 0
        # Outgoing buffer; enqueue appends to it, sending consumes it.
        self.msg_out = b""

    def enqueue_message_for_sending(self, message):
        """Enqueue message and change state.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send enqueued data from msg_out buffer.

        Returns:
            :return: True if no remaining data to send, False otherwise
        """
        # We assume there is a msg_out to send and socket is writable.
        self.timer.snapshot()
        bytes_sent = self.socket.send(self.msg_out)
        # Forget the part of message that was sent.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if self.bytes_to_send:
            # Something is still buffered, the caller has to call again.
            return False
        # TODO: Is it possible to hit negative bytes_to_send?
        self.sending_message = False
        # We should have reset hold timer on peer side.
        self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
        # The possible reason for not prioritizing reads is gone.
        return True
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class."""

    def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
        """The state tracker initialisation.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
            inqueue: user initiated messages queue
            storage: thread safe dict to store data for the rpc server
            cliargs: cli args from the user
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Sub-trackers.
        self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
                                  l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
                                  rt_constrain=cliargs.rt_constrain, ipv6=cliargs.ipv6, grace=cliargs.grace,
                                  wait_for_read=cliargs.wfr)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        self.prioritize_writing = False
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # back to False.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.
        self.inqueue = inqueue

    def perform_one_loop_iteration(self):
        """The main loop iteration.

        Notes:
            Calculates priority, resolves all conditions, calls
            appropriate method and returns to caller to repeat.
            Each call performs at most one action: read a chunk,
            send a chunk, enqueue a message, or idle-wait.

        Raises:
            RuntimeError: when select() reports an exceptional state
                on the socket
        """
        self.timer.snapshot()
        if not self.prioritize_writing:
            if self.timer.is_time_for_my_keepalive():
                if not self.writer.sending_message:
                    # We need to schedule a keepalive ASAP.
                    self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
                    logger.info("KEEP ALIVE is sent.")
                # We are sending a message now, so let's prioritize it.
                self.prioritize_writing = True

        # Pick up at most one user initiated message (hex string) per
        # iteration. An empty queue is the common case, not an error.
        try:
            msg = self.inqueue.get_nowait()
        except Queue.Empty:
            pass
        else:
            logger.info("Received message: {}".format(msg))
            msgbin = binascii.unhexlify(msg)
            self.writer.enqueue_message_for_sending(msgbin)

        # Now we know what our priorities are, we have to check
        # which actions are available.
        # select.select() returns three lists,
        # we unpack them right away.
        read_list, write_list, except_list = select.select(
            [self.socket], [self.socket], [self.socket],
            self.timer.report_timedelta)
        # Each list is either [] or [self.socket],
        # so we will test them as boolean.
        if except_list:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
                return
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        if write_list:
            # Either we really want to reset peer's view of our hold
            # timer, or there was nothing to read.
            # Were we in the middle of sending a message?
            if self.writer.sending_message:
                # Was it the end of a message?
                whole = self.writer.send_message_chunk_is_whole()
                # We were pressed to send something and we did it.
                if self.prioritize_writing and whole:
                    # We prioritize reading again.
                    self.prioritize_writing = False
                return
            # Finally to check if still update messages to be generated.
            if self.generator.remaining_prefixes:
                msg_out = self.generator.compose_update_message()
                if not self.generator.remaining_prefixes:
                    # We have just finished update generation,
                    # end-of-rib is due.
                    logger.info("All update messages generated.")
                    logger.info("Storing performance results.")
                    self.generator.store_results()
                    logger.info("Finally an END-OF-RIB is sent.")
                    msg_out += self.generator.update_message(wr_prefixes=[],
                                                             nlri_prefixes=[],
                                                             end_of_rib=True)
                self.writer.enqueue_message_for_sending(msg_out)
                # Attempt for real sending to be done in next iteration.
                return
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # We can neither read nor write.
        logger.warning("Input and output both blocked for " +
                       str(self.timer.report_timedelta) + " seconds.")
        # FIXME: Are we sure select has been really waiting
        # the whole period?
        return
def create_logger(loglevel, logfile):
    """Create logger object

    Notes:
        The logger is the one registered under the name "logger"; it gets
        one console handler and one file handler sharing the same format.
        The log file is opened with mode "w", so an existing file of that
        name is overwritten.

    Arguments:
        :loglevel: log level
        :logfile: log file name
    Returns:
        :return: logger object
    """
    logger = logging.getLogger("logger")
    log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
    handlers = (logging.StreamHandler(), logging.FileHandler(logfile, mode="w"))
    for handler in handlers:
        handler.setFormatter(log_formatter)
        logger.addHandler(handler)
    logger.setLevel(loglevel)
    # The caller stores the result as the module-wide logger.
    return logger
def job(arguments, inqueue, storage):
    """One time initialisation and iterations looping.

    Notes:
        Establish BGP connection and run iterations.
        The handshake is: receive the peer's OPEN, send our OPEN,
        receive the confirming KEEPALIVE, send our KEEPALIVE.
        After that the reactor loop runs without a regular exit.

    Arguments:
        :arguments: Command line arguments
        :inqueue: Data to be sent from play.py
        :storage: Shared dict for rpc server
    Returns:
        :return: None
    Raises:
        MessageError: when our OPEN is not confirmed by a KEEPALIVE
    """
    keepalive_length = 19
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    open_hex = binascii.hexlify(msg_in)
    logger.info(open_hex)
    # The storage is shared with the rpc server thread, so it is accessed
    # through its "with" protocol like everywhere else.
    with storage as stor:
        stor['open'] = open_hex
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: %s", binascii.hexlify(msg_out))
    # Send our open message to the peer.
    # sendall() is used because send() may transmit only a part of the data.
    bgp_socket.sendall(msg_out)
    # Wait for confirming keepalive.
    # Using exact keepalive length to not to see possible updates.
    # recv() may return fewer bytes than requested, so keep reading until
    # the whole length arrived or the peer closed the connection.
    msg_in = b""
    while len(msg_in) < keepalive_length:
        chunk = bgp_socket.recv(keepalive_length - len(msg_in))
        if not chunk:
            break
        msg_in += chunk
    if msg_in != generator.keepalive_message():
        error_msg = "Open not confirmed by keepalive, instead got"
        logger.error("%s: %s", error_msg, binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: %s", binascii.hexlify(msg_out))
    bgp_socket.sendall(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
class Rpcs:
    '''Handler for SimpleXMLRPCServer'''

    def __init__(self, sendqueue, storage):
        '''Init method

        Arguments:
            :sendqueue: queue for data to be sent towards odl
            :storage: thread safe dict
        '''
        self.queue = sendqueue
        self.storage = storage

    def send(self, text):
        '''Sends data towards odl

        Arguments:
            :text: hex string of the data to be sent
        '''
        self.queue.put(text)

    def get(self, text=''):
        '''Reads data from the storage

        - returns stored data or an empty string

        Arguments:
            :text: a key to the storage to get the data
        Returns:
            :return: data stored under the key, '' when the key is absent
        '''
        with self.storage as stor:
            return stor.get(text, '')

    def clean(self, text=''):
        '''Cleans data from the storage

        A key that is not present is silently ignored.

        Arguments:
            :text: a key to the storage to clean the data
        '''
        with self.storage as stor:
            stor.pop(text, None)
def threaded_job(arguments):
    """Run the job threaded

    Notes:
        The requested amount of prefixes is split among
        arguments.multiplicity utilities; every utility gets its own
        copy of the arguments with its own amount, first prefix and
        local ip, and runs job() in a separate thread. The calling
        thread then serves the rpc requests forever.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None
    Raises:
        SystemExit: when the worker threads can not be started
    """
    amount_left = arguments.amount
    utils_left = arguments.multiplicity
    prefix_current = arguments.firstprefix
    myip_current = arguments.myip
    port = arguments.port
    thread_args = []
    rpcqueue = Queue.Queue()
    storage = SafeDict()

    while True:
        # Explicit floor division keeps the result integral regardless
        # of the division semantics in effect.
        amount_per_util = (amount_left - 1) // utils_left + 1  # round up
        amount_left -= amount_per_util
        utils_left -= 1

        args = deepcopy(arguments)
        args.amount = amount_per_util
        args.firstprefix = prefix_current
        args.myip = myip_current
        thread_args.append(args)

        if not utils_left:
            break
        # The next utility starts right after the range of this one.
        prefix_current += amount_per_util * 16
        myip_current += 1

    try:
        # Create threads
        for t in thread_args:
            thread.start_new_thread(job, (t, rpcqueue, storage))
    except Exception:
        print("Error: unable to start thread.")
        raise SystemExit(2)

    if arguments.usepeerip:
        ip = arguments.peerip
    else:
        ip = arguments.myip
    rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
    rpcserver.register_instance(Rpcs(rpcqueue, storage))
    rpcserver.serve_forever()
if __name__ == "__main__":
    # Script entry point: parse the command line first, because both the
    # logger and the job need the parsed arguments.
    arguments = parse_arguments()
    # "logger" becomes a module-level name here; the functions and classes
    # of this file log through it, so it has to exist before any job starts.
    logger = create_logger(arguments.loglevel, arguments.logfile)
    # Starts the worker threads and then serves rpc requests.
    threaded_job(arguments)