1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
24 from copy import deepcopy
27 __author__ = "Vratko Polak"
28 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
29 __license__ = "Eclipse Public License v1.0"
30 __email__ = "vrpolak@cisco.com"
33 def parse_arguments():
34 """Use argparse to get arguments,
39 parser = argparse.ArgumentParser()
40 # TODO: Should we use --argument-names-with-spaces?
41 str_help = "Autonomous System number use in the stream."
42 parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
43 # FIXME: We are acting as iBGP peer,
44 # we should mirror AS number from peer's open message.
45 str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
46 parser.add_argument("--amount", default="1", type=int, help=str_help)
47 str_help = "Maximum number of IP prefixes to be announced in one iteration"
48 parser.add_argument("--insert", default="1", type=int, help=str_help)
49 str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
50 parser.add_argument("--withdraw", default="0", type=int, help=str_help)
51 str_help = "The number of prefixes to process without withdrawals"
52 parser.add_argument("--prefill", default="0", type=int, help=str_help)
53 str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
54 parser.add_argument("--updates", choices=["single", "separate"],
55 default=["separate"], help=str_help)
56 str_help = "Base prefix IP address for prefix generation"
57 parser.add_argument("--firstprefix", default="8.0.1.0",
58 type=ipaddr.IPv4Address, help=str_help)
59 str_help = "The prefix length."
60 parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
61 str_help = "Listen for connection, instead of initiating it."
62 parser.add_argument("--listen", action="store_true", help=str_help)
63 str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
64 "Default value only suitable for listening.")
65 parser.add_argument("--myip", default="0.0.0.0",
66 type=ipaddr.IPv4Address, help=str_help)
67 str_help = ("TCP port to bind to when listening or initiating connection." +
68 "Default only suitable for initiating.")
69 parser.add_argument("--myport", default="0", type=int, help=str_help)
70 str_help = "The IP of the next hop to be placed into the update messages."
71 parser.add_argument("--nexthop", default="192.0.2.1",
72 type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
73 str_help = "Identifier of the route originator."
74 parser.add_argument("--originator", default=None,
75 type=ipaddr.IPv4Address, dest="originator", help=str_help)
76 str_help = "Cluster list item identifier."
77 parser.add_argument("--cluster", default=None,
78 type=ipaddr.IPv4Address, dest="cluster", help=str_help)
79 str_help = ("Numeric IP Address to try to connect to." +
80 "Currently no effect in listening mode.")
81 parser.add_argument("--peerip", default="127.0.0.2",
82 type=ipaddr.IPv4Address, help=str_help)
83 str_help = "TCP port to try to connect to. No effect in listening mode."
84 parser.add_argument("--peerport", default="179", type=int, help=str_help)
85 str_help = "Local hold time."
86 parser.add_argument("--holdtime", default="180", type=int, help=str_help)
87 str_help = "Log level (--error, --warning, --info, --debug)"
88 parser.add_argument("--error", dest="loglevel", action="store_const",
89 const=logging.ERROR, default=logging.INFO,
91 parser.add_argument("--warning", dest="loglevel", action="store_const",
92 const=logging.WARNING, default=logging.INFO,
94 parser.add_argument("--info", dest="loglevel", action="store_const",
95 const=logging.INFO, default=logging.INFO,
97 parser.add_argument("--debug", dest="loglevel", action="store_const",
98 const=logging.DEBUG, default=logging.INFO,
100 str_help = "Log file name"
101 parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
102 str_help = "Trailing part of the csv result files for plotting purposes"
103 parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
104 str_help = "Minimum number of updates to reach to include result into csv."
105 parser.add_argument("--threshold", default="1000", type=int, help=str_help)
106 str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
107 parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
108 str_help = "Link-State NLRI supported"
109 parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
110 str_help = "Link-State NLRI: Identifier"
111 parser.add_argument("-lsid", default="1", type=int, help=str_help)
112 str_help = "Link-State NLRI: Tunnel ID"
113 parser.add_argument("-lstid", default="1", type=int, help=str_help)
114 str_help = "Link-State NLRI: LSP ID"
115 parser.add_argument("-lspid", default="1", type=int, help=str_help)
116 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
117 parser.add_argument("--lstsaddr", default="1.2.3.4",
118 type=ipaddr.IPv4Address, help=str_help)
119 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
120 parser.add_argument("--lsteaddr", default="5.6.7.8",
121 type=ipaddr.IPv4Address, help=str_help)
122 str_help = "Link-State NLRI: Identifier Step"
123 parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
124 str_help = "Link-State NLRI: Tunnel ID Step"
125 parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
126 str_help = "Link-State NLRI: LSP ID Step"
127 parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
128 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
129 parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
130 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
131 parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
132 str_help = "How many play utilities are to be started."
133 parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
134 arguments = parser.parse_args()
135 if arguments.multiplicity < 1:
136 print "Multiplicity", arguments.multiplicity, "is not positive."
138 # TODO: Are sanity checks (such as asnumber>=0) required?
def establish_connection(arguments):
    """Establish connection to BGP peer.

    :arguments: following command-line argumets are used
        - arguments.listen: listen (server) vs. initiate (client) mode
        - arguments.myip: local IP address
        - arguments.myport: local port
        - arguments.peerip: remote IP address
        - arguments.peerport: remote port

    :return: socket connected to the BGP peer.
    """
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind need single tuple as argument
        listening_socket.bind((str(arguments.myip), arguments.myport))
        listening_socket.listen(1)
        bgp_socket, _ = listening_socket.accept()
        # TODO: Verify client IP is controller IP.
        listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind to force specified address and port
        talking_socket.bind((str(arguments.myip), arguments.myport))
        # socket does not accept ipaddr objects, hence str()
        talking_socket.connect((str(arguments.peerip), arguments.peerport))
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract 2-bytes number from provided message.

    :message: given message
    :offset: offset of the short_int inside the message

    :return: required short_int value.

    The default offset value is the BGP message size offset.
    """
    # Big-endian: first byte is the high byte.
    high_byte_int = ord(message[offset])
    low_byte_int = ord(message[offset + 1])
    short_int = high_byte_int * 256 + low_byte_int
    return short_int
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    :prefixes_hex: encoded prefix list (length byte followed by
        the significant prefix octets, repeated)

    :return: list of prefixes in the form of ip address (X.X.X.X/X)

    NOTE: struct.unpack("BBBB", ...) requires exactly 4 prefix octets,
    i.e. prefix lengths of 25-32 bits; shorter prefixes would raise
    struct.error (pre-existing limitation).
    """
    prefix_list = []
    offset = 0
    while offset < len(prefixes_hex):
        # 1-byte slice (not index) so the code works the same on
        # py2 str and py3 bytes.
        prefix_bit_len_hex = prefixes_hex[offset:offset + 1]
        prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
        # Number of octets needed to hold prefix_bit_len bits (floor division).
        prefix_len = ((prefix_bit_len - 1) // 8) + 1
        prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
        prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialise the error.

        Store and call super init for textual comment,
        store raw message which caused it.

        :param text: textual description of the problem
        :param message: the raw BGP message which caused the error
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        :return: human readable message as string

        Use a placeholder string if the message is to be empty.
        """
        message = binascii.hexlify(self.msg)
        if message == "":
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    :bgp_socket: the socket to be read

    :return: received OPEN message.

    :raises MessageError: when the message is too short or its declared
        length does not match the received length.

    Performs just basic incoming message checks.
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        error_msg = (
            "Message length (" + str(len(msg_in)) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)"
        )
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # str() added: concatenating the raw int to a string raised TypeError.
        error_msg = (
            "Expected message length (" + str(reported_length) +
            ") does not match actual length (" + str(len(msg_in)) + ")"
        )
        logger.error(error_msg + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    logger.info("Open message received.")
    return msg_in
283 class MessageGenerator(object):
284 """Class which generates messages, holds states and configuration values."""
286 # TODO: Define bgp marker as a class (constant) variable.
    def __init__(self, args):
        """Initialisation according to command-line args.

        :args: argsparser's Namespace object which contains command-line
            options for MessageGenerator initialisation

        Calculates and stores default values used later on for
        message generation.
        """
        self.total_prefix_amount = args.amount
        # Number of update messages left to be sent.
        self.remaining_prefixes = self.total_prefix_amount

        # New parameters initialisation
        self.prefix_base_default = args.firstprefix
        self.prefix_length_default = args.prefixlen
        self.wr_prefixes_default = []
        self.nlri_prefixes_default = []
        self.version_default = 4
        self.my_autonomous_system_default = args.asnumber
        self.hold_time_default = args.holdtime  # Local hold time.
        self.bgp_identifier_default = int(args.myip)
        self.next_hop_default = args.nexthop
        self.originator_id_default = args.originator
        self.cluster_list_item_default = args.cluster
        self.single_update_default = args.updates == "single"
        self.randomize_updates_default = args.updates == "random"
        self.prefix_count_to_add_default = args.insert
        self.prefix_count_to_del_default = args.withdraw
        # Negative withdraw counts make no sense; clamp to zero.
        if self.prefix_count_to_del_default < 0:
            self.prefix_count_to_del_default = 0
        if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
            # total number of prefixes must grow to avoid infinite test loop
            self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
        self.slot_size_default = self.prefix_count_to_add_default
        self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
        self.results_file_name_default = args.results
        self.performance_threshold_default = args.threshold
        self.rfc4760 = args.rfc4760
        self.bgpls = args.bgpls
        # Default values when BGP-LS Attributes are used
        # NOTE(review): this chunk appears elided; the two assignments below
        # presumably sit under an "if self.bgpls:" guard - confirm in VCS.
        self.prefix_count_to_add_default = 1
        self.prefix_count_to_del_default = 0
        # NOTE(review): an "LSPID" entry (read by get_ls_nlri_values)
        # appears elided from this dict literal.
        self.ls_nlri_default = {"Identifier": args.lsid,
                                "TunnelID": args.lstid,
                                "IPv4TunnelSenderAddress": args.lstsaddr,
                                "IPv4TunnelEndPointAddress": args.lsteaddr}
        self.lsid_step = args.lsidstep
        self.lstid_step = args.lstidstep
        self.lspid_step = args.lspidstep
        self.lstsaddr_step = args.lstsaddrstep
        self.lsteaddr_step = args.lsteaddrstep
        # Default values used for randomized part
        # s1: pre-fill phase slots; s2: combined (insert+withdraw) phase slots.
        s1_slots = ((self.total_prefix_amount -
                     self.remaining_prefixes_threshold - 1) /
                    self.prefix_count_to_add_default + 1)
        s2_slots = ((self.remaining_prefixes_threshold - 1) /
                    (self.prefix_count_to_add_default -
                     self.prefix_count_to_del_default) + 1)
        # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
        s2_first_index = s1_slots * self.prefix_count_to_add_default
        s2_last_index = (s2_first_index +
                         s2_slots * (self.prefix_count_to_add_default -
                                     self.prefix_count_to_del_default) - 1)
        self.slot_gap_default = ((self.total_prefix_amount -
                                  self.remaining_prefixes_threshold - 1) /
                                 self.prefix_count_to_add_default + 1)
        self.randomize_lowest_default = s2_first_index
        self.randomize_highest_default = s2_last_index
        # Initialising counters
        self.phase1_start_time = 0
        self.phase1_stop_time = 0
        self.phase2_start_time = 0
        self.phase2_stop_time = 0
        self.phase1_updates_sent = 0
        self.phase2_updates_sent = 0
        self.updates_sent = 0

        self.log_info = args.loglevel <= logging.INFO
        self.log_debug = args.loglevel <= logging.DEBUG
        """
        Flags needed for the MessageGenerator performance optimization.
        Calling logger methods each iteration even with proper log level set
        slows down significantly the MessageGenerator performance.
        Measured total generation time (1M updates, dry run, error log level):
        - logging based on basic logger features: 36,2s
        - logging based on advanced logger features (lazy logging): 21,2s
        - conditional calling of logger methods enclosed inside condition: 8,6s
        """

        logger.info("Generator initialisation")
        logger.info(" Target total number of prefixes to be introduced: " +
                    str(self.total_prefix_amount))
        logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
                    str(self.prefix_length_default))
        logger.info(" My Autonomous System number: " +
                    str(self.my_autonomous_system_default))
        logger.info(" My Hold Time: " + str(self.hold_time_default))
        logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
        logger.info(" Next Hop: " + str(self.next_hop_default))
        logger.info(" Originator ID: " + str(self.originator_id_default))
        logger.info(" Cluster list: " + str(self.cluster_list_item_default))
        logger.info(" Prefix count to be inserted at once: " +
                    str(self.prefix_count_to_add_default))
        logger.info(" Prefix count to be withdrawn at once: " +
                    str(self.prefix_count_to_del_default))
        logger.info(" Fast pre-fill up to " +
                    str(self.total_prefix_amount -
                        self.remaining_prefixes_threshold) + " prefixes")
        logger.info(" Remaining number of prefixes to be processed " +
                    "in parallel with withdrawals: " +
                    str(self.remaining_prefixes_threshold))
        logger.debug(" Prefix index range used after pre-fill procedure [" +
                     str(self.randomize_lowest_default) + ", " +
                     str(self.randomize_highest_default) + "]")
        if self.single_update_default:
            logger.info(" Common single UPDATE will be generated " +
                        "for both NLRI & WITHDRAWN lists")
        # NOTE(review): an "else:" branch appears elided before this call.
        logger.info(" Two separate UPDATEs will be generated " +
                    "for each NLRI & WITHDRAWN lists")
        if self.randomize_updates_default:
            logger.info(" Generation of UPDATE messages will be randomized")
        logger.info(" Let\'s go ...\n")
417 # TODO: Notification for hold timer expiration can be handy.
    def store_results(self, file_name=None, threshold=None):
        """ Stores specified results into files based on file_name value.

        :param file_name: Trailing (common) part of result file names
        :param threshold: Minimum number of sent updates needed for each
                          result to be included into result csv file
                          (mainly needed because of the result accuracy)

        :return: none
        """
        # default values handling
        # TODO optimize default values handling (use e.g. dictionary.update() approach)
        if file_name is None:
            file_name = self.results_file_name_default
        if threshold is None:
            threshold = self.performance_threshold_default
        # performance calculation
        # NOTE(review): the "else" branches setting totals1/performance1 and
        # totals2/performance2 to None appear elided from this chunk.
        if self.phase1_updates_sent >= threshold:
            totals1 = self.phase1_updates_sent
            performance1 = int(self.phase1_updates_sent /
                               (self.phase1_stop_time - self.phase1_start_time))
        if self.phase2_updates_sent >= threshold:
            totals2 = self.phase2_updates_sent
            performance2 = int(self.phase2_updates_sent /
                               (self.phase2_stop_time - self.phase2_start_time))

        logger.info("#" * 10 + " Final results " + "#" * 10)
        logger.info("Number of iterations: " + str(self.iteration))
        logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
                    str(self.phase1_updates_sent))
        logger.info("The pre-fill phase duration: " +
                    str(self.phase1_stop_time - self.phase1_start_time) + "s")
        logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
                    str(self.phase2_updates_sent))
        logger.info("The 2nd test phase duration: " +
                    str(self.phase2_stop_time - self.phase2_start_time) + "s")
        logger.info("Threshold for performance reporting: " + str(threshold))

        # labels for the csv columns
        phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
                        " route(s) per UPDATE")
        if self.single_update_default:
            phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                                  "/-" + str(self.prefix_count_to_del_default) +
                                  " routes per UPDATE")
        # NOTE(review): an "else:" appears elided before this assignment.
        phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                              "/-" + str(self.prefix_count_to_del_default) +
                              " routes in two UPDATEs")
        # collecting capacity and performance results
        # NOTE(review): initialisation of the "totals" and "performance"
        # dictionaries appears elided here.
        if totals1 is not None:
            totals[phase1_label] = totals1
            performance[phase1_label] = performance1
        if totals2 is not None:
            totals[phase2_label] = totals2
            performance[phase2_label] = performance2
        self.write_results_to_file(totals, "totals-" + file_name)
        self.write_results_to_file(performance, "performance-" + file_name)
487 def write_results_to_file(self, results, file_name):
488 """Writes results to the csv plot file consumable by Jenkins.
491 :param file_name: Name of the (csv) file to be created
497 f = open(file_name, "wt")
499 for key in sorted(results):
500 first_line += key + ", "
501 second_line += str(results[key]) + ", "
502 first_line = first_line[:-2]
503 second_line = second_line[:-2]
504 f.write(first_line + "\n")
505 f.write(second_line + "\n")
506 logger.info("Message generator performance results stored in " +
508 logger.info(" " + first_line)
509 logger.info(" " + second_line)
513 # Return pseudo-randomized (reproducible) index for selected range
514 def randomize_index(self, index, lowest=None, highest=None):
515 """Calculates pseudo-randomized index from selected range.
518 :param index: input index
519 :param lowest: the lowes index from the randomized area
520 :param highest: the highest index from the randomized area
522 :return: the (pseudo)randomized index
524 Created just as a fame for future generator enhancement.
526 # default values handling
527 # TODO optimize default values handling (use e.g. dicionary.update() approach)
529 lowest = self.randomize_lowest_default
531 highest = self.randomize_highest_default
533 if (index >= lowest) and (index <= highest):
534 # we are in the randomized range -> shuffle it inside
535 # the range (now just reverse the order)
536 new_index = highest - (index - lowest)
538 # we are out of the randomized range -> nothing to do
542 def get_ls_nlri_values(self, index):
543 """Generates LS-NLRI parameters.
544 http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
547 :param index: index (iteration)
549 :return: dictionary of LS NLRI parameters and values
551 # generating list of LS NLRI parameters
552 identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
553 ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
554 tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
555 lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
556 ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
557 ls_nlri_values = {"Identifier": identifier,
558 "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
559 "TunnelID": tunnel_id, "LSPID": lsp_id,
560 "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
561 return ls_nlri_values
563 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
564 prefix_len=None, prefix_count=None, randomize=None):
565 """Generates list of IP address prefixes.
568 :param slot_index: index of group of prefix addresses
569 :param slot_size: size of group of prefix addresses
570 in [number of included prefixes]
571 :param prefix_base: IP address of the first prefix
572 (slot_index = 0, prefix_index = 0)
573 :param prefix_len: length of the prefix in bites
574 (the same as size of netmask)
575 :param prefix_count: number of prefixes to be returned
576 from the specified slot
578 :return: list of generated IP address prefixes
580 # default values handling
581 # TODO optimize default values handling (use e.g. dicionary.update() approach)
582 if slot_size is None:
583 slot_size = self.slot_size_default
584 if prefix_base is None:
585 prefix_base = self.prefix_base_default
586 if prefix_len is None:
587 prefix_len = self.prefix_length_default
588 if prefix_count is None:
589 prefix_count = slot_size
590 if randomize is None:
591 randomize = self.randomize_updates_default
592 # generating list of prefixes
595 prefix_gap = 2 ** (32 - prefix_len)
596 for i in range(prefix_count):
597 prefix_index = slot_index * slot_size + i
599 prefix_index = self.randomize_index(prefix_index)
600 indexes.append(prefix_index)
601 prefixes.append(prefix_base + prefix_index * prefix_gap)
603 logger.debug(" Prefix slot index: " + str(slot_index))
604 logger.debug(" Prefix slot size: " + str(slot_size))
605 logger.debug(" Prefix count: " + str(prefix_count))
606 logger.debug(" Prefix indexes: " + str(indexes))
607 logger.debug(" Prefix list: " + str(prefixes))
    def compose_update_message(self, prefix_count_to_add=None,
                               prefix_count_to_del=None):
        """Composes an UPDATE message

        :param prefix_count_to_add: # of prefixes to put into NLRI list
        :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list

        :return: encoded UPDATE message in HEX

        Optionally generates separate UPDATEs for NLRI and WITHDRAWN
        lists or common message which includes both prefix lists.
        Updates global counters.
        """
        # default values handling
        # TODO optimize default values handling (use e.g. dictionary.update() approach)
        if prefix_count_to_add is None:
            prefix_count_to_add = self.prefix_count_to_add_default
        if prefix_count_to_del is None:
            prefix_count_to_del = self.prefix_count_to_del_default
        # NOTE(review): an iteration-counter update (self.iteration is read
        # below but never visibly set) appears elided around here.
        if self.log_info and not (self.iteration % 1000):
            logger.info("Iteration: " + str(self.iteration) +
                        " - total remaining prefixes: " +
                        str(self.remaining_prefixes))
        # NOTE(review): an "if self.log_debug:" guard appears elided here.
        logger.debug("#" * 10 + " Iteration: " +
                     str(self.iteration) + " " + "#" * 10)
        logger.debug("Remaining prefixes: " +
                     str(self.remaining_prefixes))
        # scenario type & one-shot counter
        straightforward_scenario = (self.remaining_prefixes >
                                    self.remaining_prefixes_threshold)
        if straightforward_scenario:
            prefix_count_to_del = 0
            # NOTE(review): an "if self.log_debug:" guard appears elided here.
            logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
            if not self.phase1_start_time:
                self.phase1_start_time = time.time()
        # NOTE(review): an "else:" branch appears elided before this line.
            logger.debug("--- COMBINED SCENARIO ---")
            if not self.phase2_start_time:
                self.phase2_start_time = time.time()
        # tailor the number of prefixes if needed
        prefix_count_to_add = (prefix_count_to_del +
                               min(prefix_count_to_add - prefix_count_to_del,
                                   self.remaining_prefixes))
        # prefix slots selection for insertion and withdrawal
        slot_index_to_add = self.iteration
        slot_index_to_del = slot_index_to_add - self.slot_gap_default
        # getting lists of prefixes for insertion in this iteration
        # NOTE(review): an "if self.log_debug:" guard appears elided here.
        logger.debug("Prefixes to be inserted in this iteration:")
        prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
                                                  prefix_count=prefix_count_to_add)
        # getting lists of prefixes for withdrawal in this iteration
        # NOTE(review): an "if self.log_debug:" guard appears elided here.
        logger.debug("Prefixes to be withdrawn in this iteration:")
        prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
                                                  prefix_count=prefix_count_to_del)
        # generating the UPDATE mesage with LS-NLRI only
        # NOTE(review): an "if self.bgpls:" guard appears elided here.
        ls_nlri = self.get_ls_nlri_values(self.iteration)
        # NOTE(review): the closing arguments of this call (likely "**ls_nlri)")
        # and an "else:" appear elided after the next line.
        msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
        # generating the UPDATE message with prefix lists
            if self.single_update_default:
                # Send prefixes to be introduced and withdrawn
                # in one UPDATE message
                msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
                                              nlri_prefixes=prefix_list_to_add)
            # NOTE(review): an "else:" branch appears elided here.
                # Send prefixes to be introduced and withdrawn
                # in separate UPDATE messages (if needed)
                msg_out = self.update_message(wr_prefixes=[],
                                              nlri_prefixes=prefix_list_to_add)
                if prefix_count_to_del:
                    # NOTE(review): closing arguments of this call appear elided.
                    msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
        # updating counters - who knows ... maybe I am last time here ;)
        if straightforward_scenario:
            self.phase1_stop_time = time.time()
            self.phase1_updates_sent = self.updates_sent
        # NOTE(review): an "else:" branch appears elided before this line.
            self.phase2_stop_time = time.time()
            self.phase2_updates_sent = (self.updates_sent -
                                        self.phase1_updates_sent)
        # updating totals for the next iteration
        # NOTE(review): an updates_sent/iteration increment appears elided here.
        self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
        # returning the encoded message
        # NOTE(review): the final "return msg_out" appears elided.
705 # Section of message encoders
    def open_message(self, version=None, my_autonomous_system=None,
                     hold_time=None, bgp_identifier=None):
        """Generates an OPEN Message (rfc4271#section-4.2)

        :param version: see the rfc4271#section-4.2
        :param my_autonomous_system: see the rfc4271#section-4.2
        :param hold_time: see the rfc4271#section-4.2
        :param bgp_identifier: see the rfc4271#section-4.2

        :return: encoded OPEN message in HEX
        """

        # default values handling
        # TODO optimize default values handling (use e.g. dictionary.update() approach)
        # NOTE(review): an "if version is None:" guard appears elided here.
        version = self.version_default
        if my_autonomous_system is None:
            my_autonomous_system = self.my_autonomous_system_default
        if hold_time is None:
            hold_time = self.hold_time_default
        if bgp_identifier is None:
            bgp_identifier = self.bgp_identifier_default

        # Marker: 16 bytes of 0xFF (rfc4271#section-4.1)
        marker_hex = "\xFF" * 16

        # Type
        # NOTE(review): an assignment like "type = 1" (OPEN) appears elided;
        # "type" is packed below.
        type_hex = struct.pack("B", type)

        # Version
        version_hex = struct.pack("B", version)

        # my_autonomous_system
        # AS_TRANS value, 23456 decadic.
        my_autonomous_system_2_bytes = 23456
        # AS number is mappable to 2 bytes
        if my_autonomous_system < 65536:
            my_autonomous_system_2_bytes = my_autonomous_system
        # NOTE(review): packing my_autonomous_system (not the _2_bytes
        # variable) with ">H" raises struct.error for AS numbers > 65535;
        # likely should pack my_autonomous_system_2_bytes - confirm upstream.
        my_autonomous_system_hex_2_bytes = struct.pack(">H",
                                                       my_autonomous_system)

        # Hold Time (2 bytes, big-endian)
        hold_time_hex = struct.pack(">H", hold_time)

        # BGP Identifier (4 bytes, big-endian)
        bgp_identifier_hex = struct.pack(">I", bgp_identifier)

        # Optional Parameters
        optional_parameters_hex = ""
        # NOTE(review): an "if self.rfc4760:" guard appears elided here.
        optional_parameter_hex = (
            "\x02"  # Param type ("Capability Ad")
            "\x06"  # Length (6 bytes)
            "\x01"  # Capability type (NLRI Unicast),
                    # see RFC 4760, section 8
            "\x04"  # Capability value length
            "\x00\x01"  # AFI (Ipv4)
            # NOTE(review): a reserved byte and the closing ")" appear elided.
            "\x01"  # SAFI (Unicast)
        optional_parameters_hex += optional_parameter_hex

        # NOTE(review): an "if self.bgpls:" guard appears elided here.
        optional_parameter_hex = (
            "\x02"  # Param type ("Capability Ad")
            "\x06"  # Length (6 bytes)
            "\x01"  # Capability type (NLRI Unicast),
                    # see RFC 4760, section 8
            "\x04"  # Capability value length
            "\x40\x04"  # AFI (BGP-LS)
            # NOTE(review): a reserved byte and the closing ")" appear elided.
            "\x47"  # SAFI (BGP-LS)
        optional_parameters_hex += optional_parameter_hex

        optional_parameter_hex = (
            "\x02"  # Param type ("Capability Ad")
            "\x06"  # Length (6 bytes)
            "\x41"  # "32 bit AS Numbers Support"
                    # (see RFC 6793, section 3)
            "\x04"  # Capability value length
        # NOTE(review): a closing ")" appears elided here.
        optional_parameter_hex += (
            struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
        optional_parameters_hex += optional_parameter_hex

        # Optional Parameters Length
        optional_parameters_length = len(optional_parameters_hex)
        optional_parameters_length_hex = struct.pack("B",
                                                     optional_parameters_length)

        # Length (big-endian)
        # NOTE(review): the assignment opener ("length = (") appears elided.
            len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
            len(my_autonomous_system_hex_2_bytes) +
            len(hold_time_hex) + len(bgp_identifier_hex) +
            len(optional_parameters_length_hex) +
            len(optional_parameters_hex)
        length_hex = struct.pack(">H", length)

        # Message assembly
        # NOTE(review): "message_hex = (" and several concatenated components
        # (marker, length, type, version, hold time, identifier) appear elided.
            my_autonomous_system_hex_2_bytes +
            optional_parameters_length_hex +
            optional_parameters_hex

        # NOTE(review): an "if self.log_debug:" guard appears elided here.
        logger.debug("OPEN message encoding")
        logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
        logger.debug(" Length=" + str(length) + " (0x" +
                     binascii.hexlify(length_hex) + ")")
        logger.debug(" Type=" + str(type) + " (0x" +
                     binascii.hexlify(type_hex) + ")")
        logger.debug(" Version=" + str(version) + " (0x" +
                     binascii.hexlify(version_hex) + ")")
        logger.debug(" My Autonomous System=" +
                     str(my_autonomous_system_2_bytes) + " (0x" +
                     binascii.hexlify(my_autonomous_system_hex_2_bytes) +
        logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
                     binascii.hexlify(hold_time_hex) + ")")
        logger.debug(" BGP Identifier=" + str(bgp_identifier) +
                     " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
        logger.debug(" Optional Parameters Length=" +
                     str(optional_parameters_length) + " (0x" +
                     binascii.hexlify(optional_parameters_length_hex) +
        logger.debug(" Optional Parameters=0x" +
                     binascii.hexlify(optional_parameters_hex))
        logger.debug("OPEN message encoded: 0x%s",
                     binascii.b2a_hex(message_hex))
        # NOTE(review): the final "return message_hex" appears elided.
    def update_message(self, wr_prefixes=None, nlri_prefixes=None,
                       wr_prefix_length=None, nlri_prefix_length=None,
                       my_autonomous_system=None, next_hop=None,
                       originator_id=None, cluster_list_item=None,
                       end_of_rib=False, **ls_nlri_params):
        """Generates an UPDATE Message (rfc4271#section-4.3)

        :param wr_prefixes: see the rfc4271#section-4.3
        :param nlri_prefixes: see the rfc4271#section-4.3
        :param wr_prefix_length: see the rfc4271#section-4.3
        :param nlri_prefix_length: see the rfc4271#section-4.3
        :param my_autonomous_system: see the rfc4271#section-4.3
        :param next_hop: see the rfc4271#section-4.3

        :return: encoded UPDATE message in HEX
        """

        # default values handling
        # TODO optimize default values handling (use e.g. dictionary.update() approach)
        if wr_prefixes is None:
            wr_prefixes = self.wr_prefixes_default
        if nlri_prefixes is None:
            nlri_prefixes = self.nlri_prefixes_default
        if wr_prefix_length is None:
            wr_prefix_length = self.prefix_length_default
        if nlri_prefix_length is None:
            nlri_prefix_length = self.prefix_length_default
        if my_autonomous_system is None:
            my_autonomous_system = self.my_autonomous_system_default
        # NOTE(review): an "if next_hop is None:" guard appears elided here.
        next_hop = self.next_hop_default
        if originator_id is None:
            originator_id = self.originator_id_default
        if cluster_list_item is None:
            cluster_list_item = self.cluster_list_item_default
        # per-call LS-NLRI values override the configured defaults
        ls_nlri = self.ls_nlri_default.copy()
        ls_nlri.update(ls_nlri_params)

        # Marker: 16 bytes of 0xFF (rfc4271#section-4.1)
        marker_hex = "\xFF" * 16

        # Type
        # NOTE(review): an assignment like "type = 2" (UPDATE) appears elided.
        type_hex = struct.pack("B", type)

        # Withdrawn Routes
        withdrawn_routes_hex = ""
        # NOTE(review): a guard such as "if not end_of_rib:" may be elided here.
        # number of octets needed per withdrawn prefix of this length
        bytes = ((wr_prefix_length - 1) / 8) + 1
        for prefix in wr_prefixes:
            withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
                                   struct.pack(">I", int(prefix))[:bytes])
            withdrawn_routes_hex += withdrawn_route_hex

        # Withdrawn Routes Length
        withdrawn_routes_length = len(withdrawn_routes_hex)
        withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)

        # TODO: to replace hardcoded string by encoding?
        path_attributes_hex = ""
        if nlri_prefixes != []:
            # NOTE(review): attribute length/value bytes and closing ")"
            # appear elided inside the literals below.
            path_attributes_hex += (
                "\x40"  # Flags ("Well-Known")
                "\x01"  # Type (ORIGIN)
            path_attributes_hex += (
                "\x40"  # Flags ("Well-Known")
                "\x02"  # Type (AS_PATH)
                "\x02"  # AS segment type (AS_SEQUENCE)
                "\x01"  # AS segment length (1)
            my_as_hex = struct.pack(">I", my_autonomous_system)
            path_attributes_hex += my_as_hex  # AS segment (4 bytes)
            path_attributes_hex += (
                "\x40"  # Flags ("Well-Known")
                "\x03"  # Type (NEXT_HOP)
            next_hop_hex = struct.pack(">I", int(next_hop))
            path_attributes_hex += (
                next_hop_hex  # IP address of the next hop (4 bytes)
            if originator_id is not None:
            # (definition continues beyond this chunk)
940 path_attributes_hex += (
941 "\x80" # Flags ("Optional, non-transitive")
942 "\x09" # Type (ORIGINATOR_ID)
944 ) # ORIGINATOR_ID (4 bytes)
945 path_attributes_hex += struct.pack(">I", int(originator_id))
946 if cluster_list_item is not None:
947 path_attributes_hex += (
948 "\x80" # Flags ("Optional, non-transitive")
949 "\x09" # Type (CLUSTER_LIST)
951 ) # one CLUSTER_LIST item (4 bytes)
952 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
954 if self.bgpls and not end_of_rib:
955 path_attributes_hex += (
956 "\x80" # Flags ("Optional, non-transitive")
957 "\x0e" # Type (MP_REACH_NLRI)
959 "\x40\x04" # AFI (BGP-LS)
960 "\x47" # SAFI (BGP-LS)
961 "\x04" # Next Hop Length (4)
963 path_attributes_hex += struct.pack(">I", int(next_hop))
964 path_attributes_hex += "\x00" # Reserved
965 path_attributes_hex += (
966 "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
967 "\x00\x15" # LS-NLRI.TotalNLRILength (21)
968 "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
970 path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
971 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
972 path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
973 path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
974 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
976 # Total Path Attributes Length
977 total_path_attributes_length = len(path_attributes_hex)
978 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
980 # Network Layer Reachability Information
983 bytes = ((nlri_prefix_length - 1) / 8) + 1
984 for prefix in nlri_prefixes:
985 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
986 struct.pack(">I", int(prefix))[:bytes])
987 nlri_hex += nlri_prefix_hex
989 # Length (big-endian)
991 len(marker_hex) + 2 + len(type_hex) +
992 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
993 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
995 length_hex = struct.pack(">H", length)
1002 withdrawn_routes_length_hex +
1003 withdrawn_routes_hex +
1004 total_path_attributes_length_hex +
1005 path_attributes_hex +
1010 logger.debug("UPDATE message encoding")
1011 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1012 logger.debug(" Length=" + str(length) + " (0x" +
1013 binascii.hexlify(length_hex) + ")")
1014 logger.debug(" Type=" + str(type) + " (0x" +
1015 binascii.hexlify(type_hex) + ")")
1016 logger.debug(" withdrawn_routes_length=" +
1017 str(withdrawn_routes_length) + " (0x" +
1018 binascii.hexlify(withdrawn_routes_length_hex) + ")")
1019 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1020 str(wr_prefix_length) + " (0x" +
1021 binascii.hexlify(withdrawn_routes_hex) + ")")
1022 if total_path_attributes_length:
1023 logger.debug(" Total Path Attributes Length=" +
1024 str(total_path_attributes_length) + " (0x" +
1025 binascii.hexlify(total_path_attributes_length_hex) + ")")
1026 logger.debug(" Path Attributes=" + "(0x" +
1027 binascii.hexlify(path_attributes_hex) + ")")
1028 logger.debug(" Origin=IGP")
1029 logger.debug(" AS path=" + str(my_autonomous_system))
1030 logger.debug(" Next hop=" + str(next_hop))
1031 if originator_id is not None:
1032 logger.debug(" Originator id=" + str(originator_id))
1033 if cluster_list_item is not None:
1034 logger.debug(" Cluster list=" + str(cluster_list_item))
1036 logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
1037 logger.debug(" Network Layer Reachability Information=" +
1038 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1039 " (0x" + binascii.hexlify(nlri_hex) + ")")
1040 logger.debug("UPDATE message encoded: 0x" +
1041 binascii.b2a_hex(message_hex))
1044 self.updates_sent += 1
1045 # returning encoded message
1048 def notification_message(self, error_code, error_subcode, data_hex=""):
1049 """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1052 :param error_code: see the rfc4271#section-4.5
1053 :param error_subcode: see the rfc4271#section-4.5
1054 :param data_hex: see the rfc4271#section-4.5
1056 :return: encoded NOTIFICATION message in HEX
1060 marker_hex = "\xFF" * 16
1064 type_hex = struct.pack("B", type)
1067 error_code_hex = struct.pack("B", error_code)
1070 error_subcode_hex = struct.pack("B", error_subcode)
1072 # Length (big-endian)
1073 length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1074 len(error_subcode_hex) + len(data_hex))
1075 length_hex = struct.pack(">H", length)
1077 # NOTIFICATION Message
1088 logger.debug("NOTIFICATION message encoding")
1089 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1090 logger.debug(" Length=" + str(length) + " (0x" +
1091 binascii.hexlify(length_hex) + ")")
1092 logger.debug(" Type=" + str(type) + " (0x" +
1093 binascii.hexlify(type_hex) + ")")
1094 logger.debug(" Error Code=" + str(error_code) + " (0x" +
1095 binascii.hexlify(error_code_hex) + ")")
1096 logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
1097 binascii.hexlify(error_subcode_hex) + ")")
1098 logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1099 logger.debug("NOTIFICATION message encoded: 0x%s",
1100 binascii.b2a_hex(message_hex))
1104 def keepalive_message(self):
1105 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1108 :return: encoded KEEP ALIVE message in HEX
1112 marker_hex = "\xFF" * 16
1116 type_hex = struct.pack("B", type)
1118 # Length (big-endian)
1119 length = len(marker_hex) + 2 + len(type_hex)
1120 length_hex = struct.pack(">H", length)
1122 # KEEP ALIVE Message
1130 logger.debug("KEEP ALIVE message encoding")
1131 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1132 logger.debug(" Length=" + str(length) + " (0x" +
1133 binascii.hexlify(length_hex) + ")")
1134 logger.debug(" Type=" + str(type) + " (0x" +
1135 binascii.hexlify(type_hex) + ")")
1136 logger.debug("KEEP ALIVE message encoded: 0x%s",
1137 binascii.b2a_hex(message_hex))
1142 class TimeTracker(object):
1143 """Class for tracking timers, both for my keepalives and
1147 def __init__(self, msg_in):
1148 """Initialisation. based on defaults and OPEN message from peer.
1151 msg_in: the OPEN message received from peer.
1153 # Note: Relative time is always named timedelta, to stress that
1154 # the (non-delta) time is absolute.
1155 self.report_timedelta = 1.0 # In seconds. TODO: Configurable?
1156 # Upper bound for being stuck in the same state, we should
1157 # at least report something before continuing.
1158 # Negotiate the hold timer by taking the smaller
1159 # of the 2 values (mine and the peer's).
1160 hold_timedelta = 180 # Not an attribute of self yet.
1161 # TODO: Make the default value configurable,
1162 # default value could mirror what peer said.
1163 peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1164 if hold_timedelta > peer_hold_timedelta:
1165 hold_timedelta = peer_hold_timedelta
1166 if hold_timedelta != 0 and hold_timedelta < 3:
1167 logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1168 raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1169 self.hold_timedelta = hold_timedelta
1170 # If we do not hear from peer this long, we assume it has died.
1171 self.keepalive_timedelta = int(hold_timedelta / 3.0)
1172 # Upper limit for duration between messages, to avoid being
1173 # declared to be dead.
1174 # The same as calling snapshot(), but also declares a field.
1175 self.snapshot_time = time.time()
1176 # Sometimes we need to store time. This is where to get
1177 # the value from afterwards. Time_keepalive may be too strict.
1178 self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1179 # At this time point, peer will be declared dead.
1180 self.my_keepalive_time = None # to be set later
1181 # At this point, we should be sending keepalive message.
1184 """Store current time in instance data to use later."""
1185 # Read as time before something interesting was called.
1186 self.snapshot_time = time.time()
1188 def reset_peer_hold_time(self):
1189 """Move hold time to future as peer has just proven it still lives."""
1190 self.peer_hold_time = time.time() + self.hold_timedelta
1192 # Some methods could rely on self.snapshot_time, but it is better
1193 # to require user to provide it explicitly.
1194 def reset_my_keepalive_time(self, keepalive_time):
1195 """Calculate and set the next my KEEP ALIVE timeout time
1198 :keepalive_time: the initial value of the KEEP ALIVE timer
1200 self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1202 def is_time_for_my_keepalive(self):
1203 """Check for my KEEP ALIVE timeout occurence"""
1204 if self.hold_timedelta == 0:
1206 return self.snapshot_time >= self.my_keepalive_time
1208 def get_next_event_time(self):
1209 """Set the time of the next expected or to be sent KEEP ALIVE"""
1210 if self.hold_timedelta == 0:
1211 return self.snapshot_time + 86400
1212 return min(self.my_keepalive_time, self.peer_hold_time)
1214 def check_peer_hold_time(self, snapshot_time):
1215 """Raise error if nothing was read from peer until specified time."""
1216 # Hold time = 0 means keepalive checking off.
1217 if self.hold_timedelta != 0:
1218 # time.time() may be too strict
1219 if snapshot_time > self.peer_hold_time:
1220 logger.error("Peer has overstepped the hold timer.")
1221 raise RuntimeError("Peer has overstepped the hold timer.")
1222 # TODO: Include hold_timedelta?
1223 # TODO: Add notification sending (attempt). That means
1224 # move to write tracker.
1227 class ReadTracker(object):
1228 """Class for tracking read of mesages chunk by chunk and
1232 def __init__(self, bgp_socket, timer):
1233 """The reader initialisation.
1236 bgp_socket: socket to be used for sending
1237 timer: timer to be used for scheduling
1239 # References to outside objects.
1240 self.socket = bgp_socket
1242 # BGP marker length plus length field length.
1243 self.header_length = 18
1244 # TODO: make it class (constant) attribute
1245 # Computation of where next chunk ends depends on whether
1246 # we are beyond length field.
1247 self.reading_header = True
1248 # Countdown towards next size computation.
1249 self.bytes_to_read = self.header_length
1250 # Incremental buffer for message under read.
1252 # Initialising counters
1253 self.updates_received = 0
1254 self.prefixes_introduced = 0
1255 self.prefixes_withdrawn = 0
1256 self.rx_idle_time = 0
1257 self.rx_activity_detected = True
1259 def read_message_chunk(self):
1260 """Read up to one message
1263 Currently it does not return anything.
1265 # TODO: We could return the whole message, currently not needed.
1266 # We assume the socket is readable.
1267 chunk_message = self.socket.recv(self.bytes_to_read)
1268 self.msg_in += chunk_message
1269 self.bytes_to_read -= len(chunk_message)
1270 # TODO: bytes_to_read < 0 is not possible, right?
1271 if not self.bytes_to_read:
1272 # Finished reading a logical block.
1273 if self.reading_header:
1274 # The logical block was a BGP header.
1275 # Now we know the size of the message.
1276 self.reading_header = False
1277 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1279 else: # We have finished reading the body of the message.
1280 # Peer has just proven it is still alive.
1281 self.timer.reset_peer_hold_time()
1282 # TODO: Do we want to count received messages?
1283 # This version ignores the received message.
1284 # TODO: Should we do validation and exit on anything
1285 # besides update or keepalive?
1286 # Prepare state for reading another message.
1287 message_type_hex = self.msg_in[self.header_length]
1288 if message_type_hex == "\x01":
1289 logger.info("OPEN message received: 0x%s",
1290 binascii.b2a_hex(self.msg_in))
1291 elif message_type_hex == "\x02":
1292 logger.debug("UPDATE message received: 0x%s",
1293 binascii.b2a_hex(self.msg_in))
1294 self.decode_update_message(self.msg_in)
1295 elif message_type_hex == "\x03":
1296 logger.info("NOTIFICATION message received: 0x%s",
1297 binascii.b2a_hex(self.msg_in))
1298 elif message_type_hex == "\x04":
1299 logger.info("KEEP ALIVE message received: 0x%s",
1300 binascii.b2a_hex(self.msg_in))
1302 logger.warning("Unexpected message received: 0x%s",
1303 binascii.b2a_hex(self.msg_in))
1305 self.reading_header = True
1306 self.bytes_to_read = self.header_length
1307 # We should not act upon peer_hold_time if we are reading
1308 # something right now.
1311 def decode_path_attributes(self, path_attributes_hex):
1312 """Decode the Path Attributes field (rfc4271#section-4.3)
1315 :path_attributes: path_attributes field to be decoded in hex
1319 hex_to_decode = path_attributes_hex
1321 while len(hex_to_decode):
1322 attr_flags_hex = hex_to_decode[0]
1323 attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1324 # attr_optional_bit = attr_flags & 128
1325 # attr_transitive_bit = attr_flags & 64
1326 # attr_partial_bit = attr_flags & 32
1327 attr_extended_length_bit = attr_flags & 16
1329 attr_type_code_hex = hex_to_decode[1]
1330 attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1332 if attr_extended_length_bit:
1333 attr_length_hex = hex_to_decode[2:4]
1334 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1335 attr_value_hex = hex_to_decode[4:4 + attr_length]
1336 hex_to_decode = hex_to_decode[4 + attr_length:]
1338 attr_length_hex = hex_to_decode[2]
1339 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1340 attr_value_hex = hex_to_decode[3:3 + attr_length]
1341 hex_to_decode = hex_to_decode[3 + attr_length:]
1343 if attr_type_code == 1:
1344 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1345 binascii.b2a_hex(attr_flags_hex))
1346 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1347 elif attr_type_code == 2:
1348 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1349 binascii.b2a_hex(attr_flags_hex))
1350 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1351 elif attr_type_code == 3:
1352 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1353 binascii.b2a_hex(attr_flags_hex))
1354 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1355 elif attr_type_code == 4:
1356 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1357 binascii.b2a_hex(attr_flags_hex))
1358 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1359 elif attr_type_code == 5:
1360 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1361 binascii.b2a_hex(attr_flags_hex))
1362 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1363 elif attr_type_code == 6:
1364 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1365 binascii.b2a_hex(attr_flags_hex))
1366 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1367 elif attr_type_code == 7:
1368 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1369 binascii.b2a_hex(attr_flags_hex))
1370 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1371 elif attr_type_code == 9: # rfc4456#section-8
1372 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1373 binascii.b2a_hex(attr_flags_hex))
1374 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1375 elif attr_type_code == 10: # rfc4456#section-8
1376 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1377 binascii.b2a_hex(attr_flags_hex))
1378 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1379 elif attr_type_code == 14: # rfc4760#section-3
1380 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1381 binascii.b2a_hex(attr_flags_hex))
1382 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1383 address_family_identifier_hex = attr_value_hex[0:2]
1384 logger.debug(" Address Family Identifier=0x%s",
1385 binascii.b2a_hex(address_family_identifier_hex))
1386 subsequent_address_family_identifier_hex = attr_value_hex[2]
1387 logger.debug(" Subsequent Address Family Identifier=0x%s",
1388 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1389 next_hop_netaddr_len_hex = attr_value_hex[3]
1390 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1391 logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
1392 next_hop_netaddr_len,
1393 binascii.b2a_hex(next_hop_netaddr_len_hex))
1394 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1395 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1396 logger.debug(" Network Address of Next Hop=%s (0x%s)",
1397 next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1398 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1399 logger.debug(" Reserved=0x%s",
1400 binascii.b2a_hex(reserved_hex))
1401 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1402 logger.debug(" Network Layer Reachability Information=0x%s",
1403 binascii.b2a_hex(nlri_hex))
1404 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1405 logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
1406 for prefix in nlri_prefix_list:
1407 logger.debug(" nlri_prefix_received: %s", prefix)
1408 self.prefixes_introduced += len(nlri_prefix_list) # update counter
1409 elif attr_type_code == 15: # rfc4760#section-4
1410 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1411 binascii.b2a_hex(attr_flags_hex))
1412 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1413 address_family_identifier_hex = attr_value_hex[0:2]
1414 logger.debug(" Address Family Identifier=0x%s",
1415 binascii.b2a_hex(address_family_identifier_hex))
1416 subsequent_address_family_identifier_hex = attr_value_hex[2]
1417 logger.debug(" Subsequent Address Family Identifier=0x%s",
1418 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1419 wd_hex = attr_value_hex[3:]
1420 logger.debug(" Withdrawn Routes=0x%s",
1421 binascii.b2a_hex(wd_hex))
1422 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1423 logger.debug(" Withdrawn routes prefix list: %s",
1425 for prefix in wdr_prefix_list:
1426 logger.debug(" withdrawn_prefix_received: %s", prefix)
1427 self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
1429 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1430 binascii.b2a_hex(attr_flags_hex))
1431 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1434 def decode_update_message(self, msg):
1435 """Decode an UPDATE message (rfc4271#section-4.3)
1438 :msg: message to be decoded in hex
1442 logger.debug("Decoding update message:")
1443 # message header - marker
1444 marker_hex = msg[:16]
1445 logger.debug("Message header marker: 0x%s",
1446 binascii.b2a_hex(marker_hex))
1447 # message header - message length
1448 msg_length_hex = msg[16:18]
1449 msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1450 logger.debug("Message lenght: 0x%s (%s)",
1451 binascii.b2a_hex(msg_length_hex), msg_length)
1452 # message header - message type
1453 msg_type_hex = msg[18:19]
1454 msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1456 logger.debug("Message type: 0x%s (update)",
1457 binascii.b2a_hex(msg_type_hex))
1458 # withdrawn routes length
1459 wdr_length_hex = msg[19:21]
1460 wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1461 logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1462 binascii.b2a_hex(wdr_length_hex), wdr_length)
1464 wdr_hex = msg[21:21 + wdr_length]
1465 logger.debug("Withdrawn routes: 0x%s",
1466 binascii.b2a_hex(wdr_hex))
1467 wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1468 logger.debug("Withdrawn routes prefix list: %s",
1470 for prefix in wdr_prefix_list:
1471 logger.debug("withdrawn_prefix_received: %s", prefix)
1472 # total path attribute length
1473 total_pa_length_offset = 21 + wdr_length
1474 total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1475 total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1476 logger.debug("Total path attribute lenght: 0x%s (%s)",
1477 binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1479 pa_offset = total_pa_length_offset + 2
1480 pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1481 logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1482 self.decode_path_attributes(pa_hex)
1483 # network layer reachability information length
1484 nlri_length = msg_length - 23 - total_pa_length - wdr_length
1485 logger.debug("Calculated NLRI length: %s", nlri_length)
1486 # network layer reachability information
1487 nlri_offset = pa_offset + total_pa_length
1488 nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1489 logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1490 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1491 logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1492 for prefix in nlri_prefix_list:
1493 logger.debug("nlri_prefix_received: %s", prefix)
1495 self.updates_received += 1
1496 self.prefixes_introduced += len(nlri_prefix_list)
1497 self.prefixes_withdrawn += len(wdr_prefix_list)
1499 logger.error("Unexpeced message type 0x%s in 0x%s",
1500 binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1502 def wait_for_read(self):
1503 """Read message until timeout (next expected event).
1506 Used when no more updates has to be sent to avoid busy-wait.
1507 Currently it does not return anything.
1509 # Compute time to the first predictable state change
1510 event_time = self.timer.get_next_event_time()
1511 # snapshot_time would be imprecise
1512 wait_timedelta = min(event_time - time.time(), 10)
1513 if wait_timedelta < 0:
1514 # The program got around to waiting to an event in "very near
1515 # future" so late that it became a "past" event, thus tell
1516 # "select" to not wait at all. Passing negative timedelta to
1517 # select() would lead to either waiting forever (for -1) or
1518 # select.error("Invalid parameter") (for everything else).
1520 # And wait for event or something to read.
1522 if not self.rx_activity_detected or not (self.updates_received % 100):
1523 # right time to write statistics to the log (not for every update and
1524 # not too frequently to avoid having large log files)
1525 logger.info("total_received_update_message_counter: %s",
1526 self.updates_received)
1527 logger.info("total_received_nlri_prefix_counter: %s",
1528 self.prefixes_introduced)
1529 logger.info("total_received_withdrawn_prefix_counter: %s",
1530 self.prefixes_withdrawn)
1532 start_time = time.time()
1533 select.select([self.socket], [], [self.socket], wait_timedelta)
1534 timedelta = time.time() - start_time
1535 self.rx_idle_time += timedelta
1536 self.rx_activity_detected = timedelta < 1
1538 if not self.rx_activity_detected or not (self.updates_received % 100):
1539 # right time to write statistics to the log (not for every update and
1540 # not too frequently to avoid having large log files)
1541 logger.info("... idle for %.3fs", timedelta)
1542 logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1546 class WriteTracker(object):
1547 """Class tracking enqueueing messages and sending chunks of them."""
1549 def __init__(self, bgp_socket, generator, timer):
1550 """The writter initialisation.
1553 bgp_socket: socket to be used for sending
1554 generator: generator to be used for message generation
1555 timer: timer to be used for scheduling
1557 # References to outside objects,
1558 self.socket = bgp_socket
1559 self.generator = generator
1561 # Really new fields.
1562 # TODO: Would attribute docstrings add anything substantial?
1563 self.sending_message = False
1564 self.bytes_to_send = 0
1567 def enqueue_message_for_sending(self, message):
1568 """Enqueue message and change state.
1571 message: message to be enqueued into the msg_out buffer
1573 self.msg_out += message
1574 self.bytes_to_send += len(message)
1575 self.sending_message = True
1577 def send_message_chunk_is_whole(self):
1578 """Send enqueued data from msg_out buffer
1581 :return: true if no remaining data to send
1583 # We assume there is a msg_out to send and socket is writable.
1584 # print "going to send", repr(self.msg_out)
1585 self.timer.snapshot()
1586 bytes_sent = self.socket.send(self.msg_out)
1587 # Forget the part of message that was sent.
1588 self.msg_out = self.msg_out[bytes_sent:]
1589 self.bytes_to_send -= bytes_sent
1590 if not self.bytes_to_send:
1591 # TODO: Is it possible to hit negative bytes_to_send?
1592 self.sending_message = False
1593 # We should have reset hold timer on peer side.
1594 self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1595 # The possible reason for not prioritizing reads is gone.
1600 class StateTracker(object):
1601 """Main loop has state so complex it warrants this separate class."""
1603 def __init__(self, bgp_socket, generator, timer):
1604 """The state tracker initialisation.
1607 bgp_socket: socket to be used for sending / receiving
1608 generator: generator to be used for message generation
1609 timer: timer to be used for scheduling
1611 # References to outside objects.
1612 self.socket = bgp_socket
1613 self.generator = generator
1616 self.reader = ReadTracker(bgp_socket, timer)
1617 self.writer = WriteTracker(bgp_socket, generator, timer)
1618 # Prioritization state.
1619 self.prioritize_writing = False
1620 # In general, we prioritize reading over writing. But in order
1621 # not to get blocked by neverending reads, we should
1622 # check whether we are not risking running out of holdtime.
1623 # So in some situations, this field is set to True to attempt
1624 # finishing sending a message, after which this field resets
1626 # TODO: Alternative is to switch fairly between reading and
1627 # writing (called round robin from now on).
1628 # Message counting is done in generator.
1630 def perform_one_loop_iteration(self):
1631 """ The main loop iteration
1634 Calculates priority, resolves all conditions, calls
1635 appropriate method and returns to caller to repeat.
1637 self.timer.snapshot()
1638 if not self.prioritize_writing:
1639 if self.timer.is_time_for_my_keepalive():
1640 if not self.writer.sending_message:
1641 # We need to schedule a keepalive ASAP.
1642 self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1643 logger.info("KEEP ALIVE is sent.")
1644 # We are sending a message now, so let's prioritize it.
1645 self.prioritize_writing = True
1646 # Now we know what our priorities are, we have to check
1647 # which actions are available.
1648 # socket.socket() returns three lists,
1649 # we store them to list of lists.
1650 list_list = select.select([self.socket], [self.socket], [self.socket],
1651 self.timer.report_timedelta)
1652 read_list, write_list, except_list = list_list
1653 # Lists are unpacked, each is either [] or [self.socket],
1654 # so we will test them as boolean.
1656 logger.error("Exceptional state on the socket.")
1657 raise RuntimeError("Exceptional state on socket", self.socket)
1658 # We will do either read or write.
1659 if not (self.prioritize_writing and write_list):
1660 # Either we have no reason to rush writes,
1661 # or the socket is not writable.
1662 # We are focusing on reading here.
1663 if read_list: # there is something to read indeed
1664 # In this case we want to read chunk of message
1665 # and repeat the select,
1666 self.reader.read_message_chunk()
1668 # We were focusing on reading, but nothing to read was there.
1669 # Good time to check peer for hold timer.
1670 self.timer.check_peer_hold_time(self.timer.snapshot_time)
1671 # Quiet on the read front, we can have attempt to write.
1673 # Either we really want to reset peer's view of our hold
1674 # timer, or there was nothing to read.
1675 # Were we in the middle of sending a message?
1676 if self.writer.sending_message:
1677 # Was it the end of a message?
1678 whole = self.writer.send_message_chunk_is_whole()
1679 # We were pressed to send something and we did it.
1680 if self.prioritize_writing and whole:
1681 # We prioritize reading again.
1682 self.prioritize_writing = False
1684 # Finally to check if still update messages to be generated.
1685 if self.generator.remaining_prefixes:
1686 msg_out = self.generator.compose_update_message()
1687 if not self.generator.remaining_prefixes:
1688 # We have just finished update generation,
1689 # end-of-rib is due.
1690 logger.info("All update messages generated.")
1691 logger.info("Storing performance results.")
1692 self.generator.store_results()
1693 logger.info("Finally an END-OF-RIB is sent.")
1694 msg_out += self.generator.update_message(wr_prefixes=[],
1697 self.writer.enqueue_message_for_sending(msg_out)
1698 # Attempt for real sending to be done in next iteration.
1700 # Nothing to write anymore.
1701 # To avoid busy loop, we do idle waiting here.
1702 self.reader.wait_for_read()
1704 # We can neither read nor write.
1705 logger.warning("Input and output both blocked for " +
1706 str(self.timer.report_timedelta) + " seconds.")
1707 # FIXME: Are we sure select has been really waiting
1712 def create_logger(loglevel, logfile):
1713 """Create logger object
1716 :loglevel: log level
1717 :logfile: log file name
1719 :return: logger object
1721 logger = logging.getLogger("logger")
1722 log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1723 console_handler = logging.StreamHandler()
1724 file_handler = logging.FileHandler(logfile, mode="w")
1725 console_handler.setFormatter(log_formatter)
1726 file_handler.setFormatter(log_formatter)
1727 logger.addHandler(console_handler)
1728 logger.addHandler(file_handler)
1729 logger.setLevel(loglevel)
1734 """One time initialisation and iterations looping.
1736 Establish BGP connection and run iterations.
1739 :arguments: Command line arguments
1743 bgp_socket = establish_connection(arguments)
1744 # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1745 # Receive open message before sending anything.
1746 # FIXME: Add parameter to send default open message first,
1747 # to work with "you first" peers.
1748 msg_in = read_open_message(bgp_socket)
1749 timer = TimeTracker(msg_in)
1750 generator = MessageGenerator(arguments)
1751 msg_out = generator.open_message()
1752 logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1753 # Send our open message to the peer.
1754 bgp_socket.send(msg_out)
1755 # Wait for confirming keepalive.
1756 # TODO: Surely in just one packet?
1757 # Using exact keepalive length to not to see possible updates.
1758 msg_in = bgp_socket.recv(19)
1759 if msg_in != generator.keepalive_message():
1760 error_msg = "Open not confirmed by keepalive, instead got"
1761 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1762 raise MessageError(error_msg, msg_in)
1763 timer.reset_peer_hold_time()
1764 # Send the keepalive to indicate the connection is accepted.
1765 timer.snapshot() # Remember this time.
1766 msg_out = generator.keepalive_message()
1767 logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1768 bgp_socket.send(msg_out)
1769 # Use the remembered time.
1770 timer.reset_my_keepalive_time(timer.snapshot_time)
1771 # End of initial handshake phase.
1772 state = StateTracker(bgp_socket, generator, timer)
1773 while True: # main reactor loop
1774 state.perform_one_loop_iteration()
1777 def threaded_job(arguments):
1778 """Run the job threaded
1781 :arguments: Command line arguments
1785 amount_left = arguments.amount
1786 utils_left = arguments.multiplicity
1787 prefix_current = arguments.firstprefix
1788 myip_current = arguments.myip
1792 amount_per_util = (amount_left - 1) / utils_left + 1 # round up
1793 amount_left -= amount_per_util
1796 args = deepcopy(arguments)
1797 args.amount = amount_per_util
1798 args.firstprefix = prefix_current
1799 args.myip = myip_current
1800 thread_args.append(args)
1804 prefix_current += amount_per_util * 16
1809 for t in thread_args:
1810 thread.start_new_thread(job, (t,))
1812 print "Error: unable to start thread."
1815 # Work remains forever
if __name__ == "__main__":
    # Script entry point: parse CLI options first, then build the
    # module-level logger (job/threaded_job read it as a global),
    # and finally hand control to the threaded driver, which loops
    # forever.
    arguments = parse_arguments()
    logger = create_logger(arguments.loglevel, arguments.logfile)
    threaded_job(arguments)