1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 __author__ = "Vratko Polak"
15 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
16 __license__ = "Eclipse Public License v1.0"
17 __email__ = "vrpolak@cisco.com"
def parse_arguments():
    """Use argparse to get command-line arguments.

    :return: argparse Namespace holding all BGP speaker options.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
    parser.add_argument("--amount", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default="0", type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default="0", type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # BUG FIX: the default used to be the list ["mixed"], which is not one of
    # the declared string choices; a plain string keeps args.updates a str
    # regardless of whether the user supplied the option.
    parser.add_argument("--updates", choices=["single", "mixed"],
                        default="mixed", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection." +
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default="0", type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = ("Numeric IP Address to try to connect to." +
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default="179", type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default="180", type=int, help=str_help)
    # The four options below all store into "loglevel"; the last one parsed
    # wins, default is INFO.
    str_help = "Log level (--error, --warning, --info, --debug)"
    parser.add_argument("--error", dest="loglevel", action="store_const",
                        const=logging.ERROR, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--warning", dest="loglevel", action="store_const",
                        const=logging.WARNING, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--info", dest="loglevel", action="store_const",
                        const=logging.INFO, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--debug", dest="loglevel", action="store_const",
                        const=logging.DEBUG, default=logging.INFO,
                        help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default="1000", type=int, help=str_help)
    arguments = parser.parse_args()
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
def establish_connection(arguments):
    """Establish a TCP connection to the BGP peer.

    :arguments: following command-line arguments are used
        - arguments.listen: when True, wait for peer to connect to us
        - arguments.myip: local IP address
        - arguments.myport: local port
        - arguments.peerip: remote IP address
        - arguments.peerport: remote port

    :return: socket connected to the BGP peer.
    """
    if arguments.listen:
        stdout_logger.info("Connecting in the listening mode.")
        stdout_logger.debug("Local IP address: " + str(arguments.myip))
        stdout_logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind needs a single tuple as argument
        listening_socket.bind((str(arguments.myip), arguments.myport))
        listening_socket.listen(1)
        bgp_socket, _ = listening_socket.accept()
        # TODO: Verify client IP is controller IP.
        # Only one peer is served; stop accepting further connections.
        listening_socket.close()
    else:
        stdout_logger.info("Connecting in the talking mode.")
        stdout_logger.debug("Local IP address: " + str(arguments.myip))
        stdout_logger.debug("Local port: " + str(arguments.myport))
        stdout_logger.debug("Remote IP address: " + str(arguments.peerip))
        stdout_logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind to force specified address and port
        talking_socket.bind((str(arguments.myip), arguments.myport))
        # socket does not speak ipaddr objects, hence str()
        talking_socket.connect((str(arguments.peerip), arguments.peerport))
        bgp_socket = talking_socket
    stdout_logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract a 2-byte big-endian number from the provided message.

    :message: given message
    :offset: offset of the short_int inside the message

    :return: required short_int value.

    The default offset value is the BGP message size offset.
    """
    high_byte_int = ord(message[offset])
    low_byte_int = ord(message[offset + 1])
    short_int = high_byte_int * 256 + low_byte_int
    return short_int
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialise the error.

        Store and call super init for textual comment,
        store raw message which caused it.
        """
        # Kept on the instance so __str__ can hexlify lazily.
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        :return: human readable message as string

        Uses a placeholder string if the message is empty.
        """
        message = binascii.hexlify(self.msg)
        if message == "":
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message.

    :bgp_socket: the socket to be read

    :return: received OPEN message.

    Performs just basic incoming message checks.
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        stdout_logger.error("Got something else than open with 4-byte AS number: " +
                            binascii.hexlify(msg_in))
        raise MessageError("Got something else than open with 4-byte AS number",
                           msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        stdout_logger.error("Message length is not " + str(reported_length) +
                            " as stated in " + binascii.hexlify(msg_in))
        # BUG FIX: reported_length is an int; concatenating it to a str used
        # to raise TypeError instead of the intended MessageError.
        raise MessageError("Message length is not " + str(reported_length) +
                           " as stated in ", msg_in)
    stdout_logger.info("Open message received.")
    return msg_in
215 class MessageGenerator(object):
216 """Class which generates messages, holds states and configuration values."""
218 # TODO: Define bgp marker as a class (constant) variable.
    def __init__(self, args):
        """Initialisation according to command-line args.

        :args: argparser's Namespace object which contains command-line
            options for MessageGenerator initialisation

        Calculates and stores default values used later on for
        message generation.
        """
        # NOTE(review): this excerpt is truncated; a few lines (an "else:"
        # branch and some initialisations, e.g. self.iteration) are missing.
        self.total_prefix_amount = args.amount
        # Number of update messages left to be sent.
        self.remaining_prefixes = self.total_prefix_amount

        # New parameters initialisation
        self.prefix_base_default = args.firstprefix
        self.prefix_length_default = args.prefixlen
        self.wr_prefixes_default = []
        self.nlri_prefixes_default = []
        self.version_default = 4
        self.my_autonomous_system_default = args.asnumber
        self.hold_time_default = args.holdtime  # Local hold time.
        self.bgp_identifier_default = int(args.myip)
        self.next_hop_default = args.nexthop
        self.single_update_default = args.updates == "single"
        # NOTE(review): "random" is not among the --updates choices
        # ("single"/"mixed"), so this flag appears to always be False here
        # -- confirm against the full original file.
        self.randomize_updates_default = args.updates == "random"
        self.prefix_count_to_add_default = args.insert
        self.prefix_count_to_del_default = args.withdraw
        # Negative withdraw counts make no sense; clamp to zero.
        if self.prefix_count_to_del_default < 0:
            self.prefix_count_to_del_default = 0
        if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
            # total number of prefixes must grow to avoid infinite test loop
            self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
        self.slot_size_default = self.prefix_count_to_add_default
        self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
        self.results_file_name_default = args.results
        self.performance_threshold_default = args.threshold
        # Default values used for randomized part
        # (Python 2 integer division is relied upon below.)
        s1_slots = ((self.total_prefix_amount -
                     self.remaining_prefixes_threshold - 1) /
                    self.prefix_count_to_add_default + 1)
        s2_slots = ((self.remaining_prefixes_threshold - 1) /
                    (self.prefix_count_to_add_default -
                     self.prefix_count_to_del_default) + 1)
        # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
        s2_first_index = s1_slots * self.prefix_count_to_add_default
        s2_last_index = (s2_first_index +
                         s2_slots * (self.prefix_count_to_add_default -
                                     self.prefix_count_to_del_default) - 1)
        self.slot_gap_default = ((self.total_prefix_amount -
                                  self.remaining_prefixes_threshold - 1) /
                                 self.prefix_count_to_add_default + 1)
        self.randomize_lowest_default = s2_first_index
        self.randomize_highest_default = s2_last_index

        # Initialising counters
        self.phase1_start_time = 0
        self.phase1_stop_time = 0
        self.phase2_start_time = 0
        self.phase2_stop_time = 0
        self.phase1_updates_sent = 0
        self.phase2_updates_sent = 0
        self.updates_sent = 0

        self.log_info = args.loglevel <= logging.INFO
        self.log_debug = args.loglevel <= logging.DEBUG
        # Flags needed for the MessageGenerator performance optimization.
        # Calling logger methods each iteration even with proper log level set
        # slows down significantly the MessageGenerator performance.
        # Measured total generation time (1M updates, dry run, error log level):
        # - logging based on basic logger features: 36,2s
        # - logging based on advanced logger features (lazy logging): 21,2s
        # - conditional calling of logger methods enclosed inside condition: 8,6s

        stdout_logger.info("Generator initialisation")
        stdout_logger.info(" Target total number of prefixes to be introduced: " +
                           str(self.total_prefix_amount))
        stdout_logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
                           str(self.prefix_length_default))
        stdout_logger.info(" My Autonomous System number: " +
                           str(self.my_autonomous_system_default))
        stdout_logger.info(" My Hold Time: " + str(self.hold_time_default))
        stdout_logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
        stdout_logger.info(" Next Hop: " + str(self.next_hop_default))
        stdout_logger.info(" Prefix count to be inserted at once: " +
                           str(self.prefix_count_to_add_default))
        stdout_logger.info(" Prefix count to be withdrawn at once: " +
                           str(self.prefix_count_to_del_default))
        stdout_logger.info(" Fast pre-fill up to " +
                           str(self.total_prefix_amount -
                               self.remaining_prefixes_threshold) + " prefixes")
        stdout_logger.info(" Remaining number of prefixes to be processed " +
                           "in parallel with withdrawals: " +
                           str(self.remaining_prefixes_threshold))
        stdout_logger.debug(" Prefix index range used after pre-fill procedure [" +
                            str(self.randomize_lowest_default) + ", " +
                            str(self.randomize_highest_default) + "]")
        if self.single_update_default:
            stdout_logger.info(" Common single UPDATE will be generated " +
                               "for both NLRI & WITHDRAWN lists")
        # NOTE(review): an "else:" line is missing from this excerpt; the
        # following info call belongs to the else branch.
            stdout_logger.info(" Two separate UPDATEs will be generated " +
                               "for each NLRI & WITHDRAWN lists")
        if self.randomize_updates_default:
            stdout_logger.info(" Generation of UPDATE messages will be randomized")
        stdout_logger.info(" Let\"s go ...\n")
330 # TODO: Notification for hold timer expiration can be handy.
    def store_results(self, file_name=None, threshold=None):
        """ Stores specified results into files based on file_name value.

        :param file_name: Trailing (common) part of result file names
        :param threshold: Minimum number of sent updates needed for each
            result to be included into result csv file
            (mainly needed because of the result accuracy)
        """
        # NOTE(review): this excerpt is truncated; the "else:" branches that
        # presumably set totals1/2 and performance1/2 to None, and the
        # initialisation of the totals/performance dicts, are missing.
        # default values handling
        if file_name is None:
            file_name = self.results_file_name_default
        if threshold is None:
            threshold = self.performance_threshold_default
        # performance calculation
        if self.phase1_updates_sent >= threshold:
            totals1 = self.phase1_updates_sent
            # updates per second, truncated to int (Python 2 division)
            performance1 = int(self.phase1_updates_sent /
                               (self.phase1_stop_time - self.phase1_start_time))
        if self.phase2_updates_sent >= threshold:
            totals2 = self.phase2_updates_sent
            performance2 = int(self.phase2_updates_sent /
                               (self.phase2_stop_time - self.phase2_start_time))

        # summary logging
        stdout_logger.info("#" * 10 + " Final results " + "#" * 10)
        stdout_logger.info("Number of iterations: " + str(self.iteration))
        stdout_logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
                           str(self.phase1_updates_sent))
        stdout_logger.info("The pre-fill phase duration: " +
                           str(self.phase1_stop_time - self.phase1_start_time) + "s")
        stdout_logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
                           str(self.phase2_updates_sent))
        stdout_logger.info("The 2nd test phase duration: " +
                           str(self.phase2_stop_time - self.phase2_start_time) + "s")
        stdout_logger.info("Threshold for performance reporting: " + str(threshold))

        # labels used as csv column headers
        phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
                        " route(s) per UPDATE")
        if self.single_update_default:
            phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                                  "/-" + str(self.prefix_count_to_del_default) +
                                  " routes per UPDATE")
        # NOTE(review): an "else:" line is missing from this excerpt; the
        # following assignment belongs to the else branch.
            phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                                  "/-" + str(self.prefix_count_to_del_default) +
                                  " routes in two UPDATEs")
        # collecting capacity and performance results
        if totals1 is not None:
            totals[phase1_label] = totals1
            performance[phase1_label] = performance1
        if totals2 is not None:
            totals[phase2_label] = totals2
            performance[phase2_label] = performance2
        self.write_results_to_file(totals, "totals-" + file_name)
        self.write_results_to_file(performance, "performance-" + file_name)
399 def write_results_to_file(self, results, file_name):
400 """Writes results to the csv plot file consumable by Jenkins.
403 :param file_name: Name of the (csv) file to be created
409 f = open(file_name, "wt")
411 for key in sorted(results):
412 first_line += key + ", "
413 second_line += str(results[key]) + ", "
414 first_line = first_line[:-2]
415 second_line = second_line[:-2]
416 f.write(first_line + "\n")
417 f.write(second_line + "\n")
418 stdout_logger.info("Message generator performance results stored in " +
420 stdout_logger.info(" " + first_line)
421 stdout_logger.info(" " + second_line)
425 # Return pseudo-randomized (reproducible) index for selected range
426 def randomize_index(self, index, lowest=None, highest=None):
427 """Calculates pseudo-randomized index from selected range.
430 :param index: input index
431 :param lowest: the lowes index from the randomized area
432 :param highest: the highest index from the randomized area
434 :return: the (pseudo)randomized index
436 Created just as a fame for future generator enhancement.
438 # default values handling
440 lowest = self.randomize_lowest_default
442 highest = self.randomize_highest_default
444 if (index >= lowest) and (index <= highest):
445 # we are in the randomized range -> shuffle it inside
446 # the range (now just reverse the order)
447 new_index = highest - (index - lowest)
449 # we are out of the randomized range -> nothing to do
453 # Get list of prefixes
454 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
455 prefix_len=None, prefix_count=None, randomize=None):
456 """Generates list of IP address prefixes.
459 :param slot_index: index of group of prefix addresses
460 :param slot_size: size of group of prefix addresses
461 in [number of included prefixes]
462 :param prefix_base: IP address of the first prefix
463 (slot_index = 0, prefix_index = 0)
464 :param prefix_len: length of the prefix in bites
465 (the same as size of netmask)
466 :param prefix_count: number of prefixes to be returned
467 from the specified slot
469 :return: list of generated IP address prefixes
471 # default values handling
472 if slot_size is None:
473 slot_size = self.slot_size_default
474 if prefix_base is None:
475 prefix_base = self.prefix_base_default
476 if prefix_len is None:
477 prefix_len = self.prefix_length_default
478 if prefix_count is None:
479 prefix_count = slot_size
480 if randomize is None:
481 randomize = self.randomize_updates_default
482 # generating list of prefixes
485 prefix_gap = 2 ** (32 - prefix_len)
486 for i in range(prefix_count):
487 prefix_index = slot_index * slot_size + i
489 prefix_index = self.randomize_index(prefix_index)
490 indexes.append(prefix_index)
491 prefixes.append(prefix_base + prefix_index * prefix_gap)
493 stdout_logger.debug(" Prefix slot index: " + str(slot_index))
494 stdout_logger.debug(" Prefix slot size: " + str(slot_size))
495 stdout_logger.debug(" Prefix count: " + str(prefix_count))
496 stdout_logger.debug(" Prefix indexes: " + str(indexes))
497 stdout_logger.debug(" Prefix list: " + str(prefixes))
    def compose_update_message(self, prefix_count_to_add=None,
                               prefix_count_to_del=None):
        """Composes an UPDATE message.

        :param prefix_count_to_add: # of prefixes to put into NLRI list
        :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list

        :return: encoded UPDATE message in HEX

        Optionally generates separate UPDATEs for NLRI and WITHDRAWN
        lists or common message which includes both prefix lists.
        Updates global counters.
        """
        # NOTE(review): this excerpt is truncated; "else:" lines, some
        # "if self.log_debug:" guards, closing parentheses, the iteration
        # counter update and the final "return msg_out" are missing.
        # default values handling
        if prefix_count_to_add is None:
            prefix_count_to_add = self.prefix_count_to_add_default
        if prefix_count_to_del is None:
            prefix_count_to_del = self.prefix_count_to_del_default
        # progress report only every 1000th iteration to keep generation fast
        if self.log_info and not (self.iteration % 1000):
            stdout_logger.info("Iteration: " + str(self.iteration) +
                               " - total remaining prefixes: " +
                               str(self.remaining_prefixes))
        # NOTE(review): "if self.log_debug:" guard appears to be missing here.
            stdout_logger.debug("#" * 10 + " Iteration: " +
                                str(self.iteration) + " " + "#" * 10)
            stdout_logger.debug("Remaining prefixes: " +
                                str(self.remaining_prefixes))
        # scenario type & one-shot counter
        straightforward_scenario = (self.remaining_prefixes >
                                    self.remaining_prefixes_threshold)
        if straightforward_scenario:
            # pre-fill phase: no withdrawals at all
            prefix_count_to_del = 0
            stdout_logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
            if not self.phase1_start_time:
                self.phase1_start_time = time.time()
        # NOTE(review): an "else:" line is missing from this excerpt; the
        # following statements belong to the combined-scenario branch.
            stdout_logger.debug("--- COMBINED SCENARIO ---")
            if not self.phase2_start_time:
                self.phase2_start_time = time.time()
            # tailor the number of prefixes if needed
            prefix_count_to_add = (prefix_count_to_del +
                                   min(prefix_count_to_add - prefix_count_to_del,
                                       self.remaining_prefixes))
        # prefix slots selection for insertion and withdrawal
        slot_index_to_add = self.iteration
        # withdraw from a slot that was inserted slot_gap_default iterations ago
        slot_index_to_del = slot_index_to_add - self.slot_gap_default
        # getting lists of prefixes for insertion in this iteration
        stdout_logger.debug("Prefixes to be inserted in this iteration:")
        prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
                                                  prefix_count=prefix_count_to_add)
        # getting lists of prefixes for withdrawal in this iteration
        stdout_logger.debug("Prefixes to be withdrawn in this iteration:")
        prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
                                                  prefix_count=prefix_count_to_del)
        # generating the message
        if self.single_update_default:
            # Send prefixes to be introduced and withdrawn
            # in one UPDATE message
            msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
                                          nlri_prefixes=prefix_list_to_add)
        # NOTE(review): an "else:" line is missing from this excerpt; the
        # following statements belong to the else branch.
            # Send prefixes to be introduced and withdrawn
            # in separate UPDATE messages (if needed)
            msg_out = self.update_message(wr_prefixes=[],
                                          nlri_prefixes=prefix_list_to_add)
            if prefix_count_to_del:
                # NOTE(review): closing argument/parenthesis of this call is
                # missing from this excerpt.
                msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
        # updating counters - who knows ... maybe I am last time here ;)
        if straightforward_scenario:
            self.phase1_stop_time = time.time()
            self.phase1_updates_sent = self.updates_sent
        # NOTE(review): an "else:" line is missing from this excerpt.
            self.phase2_stop_time = time.time()
            self.phase2_updates_sent = (self.updates_sent -
                                        self.phase1_updates_sent)
        # updating totals for the next iteration
        self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
        # returning the encoded message
        # NOTE(review): "return msg_out" line is missing from this excerpt.
588 # Section of message encoders
    def open_message(self, version=None, my_autonomous_system=None,
                     hold_time=None, bgp_identifier=None):
        """Generates an OPEN Message (rfc4271#section-4.2).

        :param version: see the rfc4271#section-4.2
        :param my_autonomous_system: see the rfc4271#section-4.2
        :param hold_time: see the rfc4271#section-4.2
        :param bgp_identifier: see the rfc4271#section-4.2

        :return: encoded OPEN message in HEX
        """
        # NOTE(review): this excerpt is truncated; guard lines, the "type"
        # assignment, parts of the length/message_hex assemblies and the
        # final return are missing. Comments below flag the obvious gaps.
        # Default values handling
        # NOTE(review): "if version is None:" guard line is missing here.
            version = self.version_default
        if my_autonomous_system is None:
            my_autonomous_system = self.my_autonomous_system_default
        if hold_time is None:
            hold_time = self.hold_time_default
        if bgp_identifier is None:
            bgp_identifier = self.bgp_identifier_default

        # Marker: 16 bytes of 0xFF (RFC 4271, section 4.1).
        marker_hex = "\xFF" * 16
        # NOTE(review): assignment of local "type" (OPEN message type code)
        # is not visible in this excerpt.
        type_hex = struct.pack("B", type)
        # Version (1 byte)
        version_hex = struct.pack("B", version)

        # my_autonomous_system
        # AS_TRANS value, 23456 decimal.
        my_autonomous_system_2_bytes = 23456
        # AS number is mappable to 2 bytes
        if my_autonomous_system < 65536:
            my_autonomous_system_2_bytes = my_autonomous_system
        # NOTE(review): this packs the raw AS number, not the 2-byte-mapped
        # my_autonomous_system_2_bytes computed above; for AS >= 65536 the
        # ">H" pack would fail -- confirm against the original file.
        my_autonomous_system_hex_2_bytes = struct.pack(">H",
                                                       my_autonomous_system)
        # Hold Time (2 bytes, big-endian)
        hold_time_hex = struct.pack(">H", hold_time)
        # BGP Identifier (4 bytes, big-endian)
        bgp_identifier_hex = struct.pack(">I", bgp_identifier)

        # Optional Parameters
        # NOTE(review): some capability bytes and the closing parenthesis of
        # this literal are missing from this excerpt.
        optional_parameters_hex = (
            "\x02"  # Param type ("Capability Ad")
            "\x06"  # Length (6 bytes)
            "\x01"  # Capability type (NLRI Unicast),
                    # see RFC 4760, section 8
            "\x04"  # Capability value length
            "\x00\x01"  # AFI (Ipv4)
            "\x01"  # SAFI (Unicast)
            "\x02"  # Param type ("Capability Ad")
            "\x06"  # Length (6 bytes)
            "\x41"  # "32 bit AS Numbers Support"
                    # (see RFC 6793, section 3)
            "\x04"  # Capability value length
            # My AS in 32 bit format
            + struct.pack(">I", my_autonomous_system)
        # Optional Parameters Length
        optional_parameters_length = len(optional_parameters_hex)
        optional_parameters_length_hex = struct.pack("B",
                                                     optional_parameters_length)

        # Length (big-endian)
        # NOTE(review): the "length = (" opening line is missing here.
            len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
            len(my_autonomous_system_hex_2_bytes) +
            len(hold_time_hex) + len(bgp_identifier_hex) +
            len(optional_parameters_length_hex) +
            len(optional_parameters_hex)
        length_hex = struct.pack(">H", length)

        # OPEN Message assembly
        # NOTE(review): the beginning of the "message_hex = (" concatenation
        # is missing from this excerpt.
            my_autonomous_system_hex_2_bytes +
            optional_parameters_length_hex +
            optional_parameters_hex

        # Logging ("if self.log_debug:" guard not visible in this excerpt;
        # some closing-parenthesis continuation lines are also missing).
        stdout_logger.debug("OPEN Message encoding")
        stdout_logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
        stdout_logger.debug(" Length=" + str(length) + " (0x" +
                            binascii.hexlify(length_hex) + ")")
        stdout_logger.debug(" Type=" + str(type) + " (0x" +
                            binascii.hexlify(type_hex) + ")")
        stdout_logger.debug(" Version=" + str(version) + " (0x" +
                            binascii.hexlify(version_hex) + ")")
        stdout_logger.debug(" My Autonomous System=" +
                            str(my_autonomous_system_2_bytes) + " (0x" +
                            binascii.hexlify(my_autonomous_system_hex_2_bytes) +
        stdout_logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
                            binascii.hexlify(hold_time_hex) + ")")
        stdout_logger.debug(" BGP Identifier=" + str(bgp_identifier) +
                            " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
        stdout_logger.debug(" Optional Parameters Length=" +
                            str(optional_parameters_length) + " (0x" +
                            binascii.hexlify(optional_parameters_length_hex) +
        stdout_logger.debug(" Optional Parameters=0x" +
                            binascii.hexlify(optional_parameters_hex))
        stdout_logger.debug(" OPEN Message encoded: 0x" +
                            binascii.b2a_hex(message_hex))
        # NOTE(review): "return message_hex" line is missing from this excerpt.
    def update_message(self, wr_prefixes=None, nlri_prefixes=None,
                       wr_prefix_length=None, nlri_prefix_length=None,
                       my_autonomous_system=None, next_hop=None):
        """Generates an UPDATE Message (rfc4271#section-4.3).

        :param wr_prefixes: see the rfc4271#section-4.3
        :param nlri_prefixes: see the rfc4271#section-4.3
        :param wr_prefix_length: see the rfc4271#section-4.3
        :param nlri_prefix_length: see the rfc4271#section-4.3
        :param my_autonomous_system: see the rfc4271#section-4.3
        :param next_hop: see the rfc4271#section-4.3

        :return: encoded UPDATE message in HEX
        """
        # NOTE(review): this excerpt is truncated; guard lines, the "type"
        # assignment, some attribute bytes, the length/message assemblies and
        # the final return are missing. Comments below flag obvious gaps.
        # Default values handling
        if wr_prefixes is None:
            wr_prefixes = self.wr_prefixes_default
        if nlri_prefixes is None:
            nlri_prefixes = self.nlri_prefixes_default
        if wr_prefix_length is None:
            wr_prefix_length = self.prefix_length_default
        if nlri_prefix_length is None:
            nlri_prefix_length = self.prefix_length_default
        if my_autonomous_system is None:
            my_autonomous_system = self.my_autonomous_system_default
        # NOTE(review): "if next_hop is None:" guard line is missing here.
            next_hop = self.next_hop_default

        # Marker: 16 bytes of 0xFF (RFC 4271, section 4.1).
        marker_hex = "\xFF" * 16
        # NOTE(review): assignment of local "type" (UPDATE message type code)
        # is not visible in this excerpt.
        type_hex = struct.pack("B", type)

        # Withdrawn Routes
        # NOTE(review): "bytes" shadows the builtin; it is the octet count
        # needed for wr_prefix_length bits (Python 2 integer division).
        bytes = ((wr_prefix_length - 1) / 8) + 1
        withdrawn_routes_hex = ""
        for prefix in wr_prefixes:
            # one byte of prefix length followed by the significant octets
            withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
                                   struct.pack(">I", int(prefix))[:bytes])
            withdrawn_routes_hex += withdrawn_route_hex

        # Withdrawn Routes Length
        withdrawn_routes_length = len(withdrawn_routes_hex)
        withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)

        # TODO: to replace hardcoded string by encoding?
        # Path attributes are only generated when NLRI prefixes are announced.
        if nlri_prefixes != []:
            # NOTE(review): several attribute bytes, the closing parenthesis
            # and the "else:" line are missing from this literal in this
            # excerpt.
            path_attributes_hex = (
                "\x40"  # Flags ("Well-Known")
                "\x01"  # Type (ORIGIN)
                "\x40"  # Flags ("Well-Known")
                "\x02"  # Type (AS_PATH)
                "\x02"  # AS segment type (AS_SEQUENCE)
                "\x01"  # AS segment length (1)
                # AS segment (4 bytes)
                + struct.pack(">I", my_autonomous_system) +
                "\x40"  # Flags ("Well-Known")
                "\x03"  # Type (NEXT_HOP)
                # IP address of the next hop (4 bytes)
                + struct.pack(">I", int(next_hop))
            path_attributes_hex = ""

        # Total Path Attributes Length
        total_path_attributes_length = len(path_attributes_hex)
        total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)

        # Network Layer Reachability Information
        bytes = ((nlri_prefix_length - 1) / 8) + 1
        # NOTE(review): initialisation of nlri_hex ("") is missing from this
        # excerpt.
        for prefix in nlri_prefixes:
            nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
                               struct.pack(">I", int(prefix))[:bytes])
            nlri_hex += nlri_prefix_hex

        # Length (big-endian)
        # NOTE(review): the "length = (" opening and closing lines are
        # missing from this excerpt.
            len(marker_hex) + 2 + len(type_hex) +
            len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
            len(total_path_attributes_length_hex) + len(path_attributes_hex) +
        length_hex = struct.pack(">H", length)

        # UPDATE Message assembly
        # NOTE(review): the "message_hex = (" opening and closing lines are
        # missing from this excerpt.
            withdrawn_routes_length_hex +
            withdrawn_routes_hex +
            total_path_attributes_length_hex +
            path_attributes_hex +

        # Logging ("if self.log_debug:" guard not visible in this excerpt;
        # one closing-parenthesis continuation line is also missing).
        stdout_logger.debug("UPDATE Message encoding")
        stdout_logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
        stdout_logger.debug(" Length=" + str(length) + " (0x" +
                            binascii.hexlify(length_hex) + ")")
        stdout_logger.debug(" Type=" + str(type) + " (0x" +
                            binascii.hexlify(type_hex) + ")")
        stdout_logger.debug(" withdrawn_routes_length=" +
                            str(withdrawn_routes_length) + " (0x" +
                            binascii.hexlify(withdrawn_routes_length_hex) + ")")
        stdout_logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
                            str(wr_prefix_length) + " (0x" +
                            binascii.hexlify(withdrawn_routes_hex) + ")")
        stdout_logger.debug(" Total Path Attributes Length=" +
                            str(total_path_attributes_length) + " (0x" +
                            binascii.hexlify(total_path_attributes_length_hex) +
        stdout_logger.debug(" Path Attributes=" + "(0x" +
                            binascii.hexlify(path_attributes_hex) + ")")
        stdout_logger.debug(" Network Layer Reachability Information=" +
                            str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
                            " (0x" + binascii.hexlify(nlri_hex) + ")")
        stdout_logger.debug(" UPDATE Message encoded: 0x" +
                            binascii.b2a_hex(message_hex))

        # Updating the sent-updates counter used for performance reporting.
        self.updates_sent += 1
        # returning encoded message
        # NOTE(review): "return message_hex" line is missing from this excerpt.
    def notification_message(self, error_code, error_subcode, data_hex=""):
        """Generates a NOTIFICATION Message (rfc4271#section-4.5).

        :param error_code: see the rfc4271#section-4.5
        :param error_subcode: see the rfc4271#section-4.5
        :param data_hex: see the rfc4271#section-4.5

        :return: encoded NOTIFICATION message in HEX
        """
        # NOTE(review): this excerpt is truncated; the "type" assignment, the
        # message_hex assembly and the final return are missing.
        # Marker: 16 bytes of 0xFF (RFC 4271, section 4.1).
        marker_hex = "\xFF" * 16
        # NOTE(review): assignment of local "type" (NOTIFICATION message type
        # code) is not visible in this excerpt.
        type_hex = struct.pack("B", type)
        # Error Code (1 byte)
        error_code_hex = struct.pack("B", error_code)
        # Error Subcode (1 byte)
        error_subcode_hex = struct.pack("B", error_subcode)

        # Length (big-endian)
        length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
                  len(error_subcode_hex) + len(data_hex))
        length_hex = struct.pack(">H", length)

        # NOTIFICATION Message
        # NOTE(review): the "message_hex = (...)" assembly lines are missing
        # from this excerpt.

        # Logging ("if self.log_debug:" guard not visible in this excerpt).
        stdout_logger.debug("NOTIFICATION Message encoding")
        stdout_logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
        stdout_logger.debug(" Length=" + str(length) + " (0x" +
                            binascii.hexlify(length_hex) + ")")
        stdout_logger.debug(" Type=" + str(type) + " (0x" +
                            binascii.hexlify(type_hex) + ")")
        stdout_logger.debug(" Error Code=" + str(error_code) + " (0x" +
                            binascii.hexlify(error_code_hex) + ")")
        # NOTE(review): "Subode" typo below is in a runtime log string,
        # deliberately left untouched here.
        stdout_logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
                            binascii.hexlify(error_subcode_hex) + ")")
        stdout_logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
        stdout_logger.debug(" NOTIFICATION Message encoded: 0x" +
                            binascii.b2a_hex(message_hex))
        # NOTE(review): "return message_hex" line is missing from this excerpt.
    def keepalive_message(self):
        """Generates a KEEP ALIVE Message (rfc4271#section-4.4).

        :return: encoded KEEP ALIVE message in HEX
        """
        # NOTE(review): this excerpt is truncated; the "type" assignment, the
        # message_hex assembly and the final return are missing.
        # Marker: 16 bytes of 0xFF (RFC 4271, section 4.1).
        marker_hex = "\xFF" * 16
        # NOTE(review): assignment of local "type" (KEEPALIVE message type
        # code) is not visible in this excerpt.
        type_hex = struct.pack("B", type)

        # Length (big-endian)
        length = len(marker_hex) + 2 + len(type_hex)
        length_hex = struct.pack(">H", length)

        # NOTE(review): the "message_hex = (...)" assembly lines are missing
        # from this excerpt.

        # Logging ("if self.log_debug:" guard not visible in this excerpt).
        stdout_logger.debug("KEEP ALIVE Message encoding")
        stdout_logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
        stdout_logger.debug(" Length=" + str(length) + " (0x" +
                            binascii.hexlify(length_hex) + ")")
        stdout_logger.debug(" Type=" + str(type) + " (0x" +
                            binascii.hexlify(type_hex) + ")")
        stdout_logger.debug(" KEEP ALIVE Message encoded: 0x" +
                            binascii.b2a_hex(message_hex))
        # NOTE(review): "return message_hex" line is missing from this excerpt.
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and peer's hold time."""

    def __init__(self, msg_in):
        """Initialisation based on defaults and OPEN message from peer.

        :param msg_in: the OPEN message received from peer.
        :raises ValueError: when the negotiated hold time is nonzero but
            below the RFC 4271 minimum of 3 seconds.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        hold_timedelta = 180  # Not an attribute of self yet.
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        if hold_timedelta > peer_hold_timedelta:
            hold_timedelta = peer_hold_timedelta
        # Zero disables hold-time tracking; otherwise at least 3 seconds.
        if hold_timedelta != 0 and hold_timedelta < 3:
            stdout_logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
            raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
        self.hold_timedelta = hold_timedelta
        # If we do not hear from peer this long, we assume it has died.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        # The same as calling snapshot(), but also declares a field.
        self.snapshot_time = time.time()
        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this time point, peer will be declared dead.
        self.my_keepalive_time = None  # to be set later
        # At this point, we should be sending keepalive message.

    def snapshot(self):
        """Store current time in instance data to use later."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time.

        :param keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurrence.

        :return: True when a KEEP ALIVE message should be sent now.
        """
        if self.hold_timedelta == 0:
            # Hold timer disabled, so keepalives are never due
            # (mirrors the far-future event in get_next_event_time()).
            return False
        return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Return the time of the next expected or to be sent KEEP ALIVE."""
        if self.hold_timedelta == 0:
            # Keepalive tracking is off, report an event far (1 day) away.
            return self.snapshot_time + 86400
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time.

        :param snapshot_time: time against which peer hold time is compared
        :raises RuntimeError: when the peer's hold timer has expired
        """
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta != 0:
            # time.time() may be too strict
            if snapshot_time > self.peer_hold_time:
                stdout_logger.error("Peer has overstepped the hold timer.")
                raise RuntimeError("Peer has overstepped the hold timer.")
                # TODO: Include hold_timedelta?
                # TODO: Add notification sending (attempt). That means
                # move to write tracker.
class ReadTracker(object):
    """Class for tracking reads of messages chunk by chunk and for idle waiting."""

    def __init__(self, bgp_socket, timer):
        """The reader initialisation.

        :param bgp_socket: socket to be used for receiving
        :param timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.timer = timer
        # BGP marker length plus length field length.
        self.header_length = 18
        # TODO: make it class (constant) attribute
        # Computation of where next chunk ends depends on whether
        # we are beyond length field.
        self.reading_header = True
        # Countdown towards next size computation.
        self.bytes_to_read = self.header_length
        # Incremental buffer for message under read.
        self.msg_in = ""

    def read_message_chunk(self):
        """Read up to one message chunk from the socket.

        Currently it does not return anything.
        """
        # TODO: We could return the whole message, currently not needed.
        # We assume the socket is readable.
        chunk_message = self.socket.recv(self.bytes_to_read)
        self.msg_in += chunk_message
        self.bytes_to_read -= len(chunk_message)
        # TODO: bytes_to_read < 0 is not possible, right?
        if not self.bytes_to_read:
            # Finished reading a logical block.
            if self.reading_header:
                # The logical block was a BGP header.
                # Now we know the size of the message; the Length field
                # counts the whole message, so subtract the header we
                # already have.
                self.reading_header = False
                self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
                                      self.header_length)
            else:  # We have finished reading the body of the message.
                # Peer has just proven it is still alive.
                self.timer.reset_peer_hold_time()
                # TODO: Do we want to count received messages?
                # This version ignores the received message.
                # TODO: Should we do validation and exit on anything
                # besides update or keepalive?
                # Log the message by its Type octet (rfc4271#section-4.1).
                message_type_hex = self.msg_in[self.header_length]
                if message_type_hex == "\x01":
                    stdout_logger.info("OPEN message received: 0x%s",
                                       binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x02":
                    stdout_logger.debug("UPDATE message received: 0x%s",
                                        binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x03":
                    stdout_logger.info("NOTIFICATION message received: 0x%s",
                                       binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x04":
                    stdout_logger.info("KEEP ALIVE message received: 0x%s",
                                       binascii.b2a_hex(self.msg_in))
                else:
                    stdout_logger.warning("Unexpected message received: 0x%s",
                                          binascii.b2a_hex(self.msg_in))
                # Prepare state for reading another message.
                self.msg_in = ""
                self.reading_header = True
                self.bytes_to_read = self.header_length
        # We should not act upon peer_hold_time if we are reading
        # something right now.

    def wait_for_read(self):
        """Block until the socket is readable or the next timer event is due.

        Used when no more updates have to be sent to avoid busy-wait.
        Currently it does not return anything.
        """
        # Compute time to the first predictable state change
        event_time = self.timer.get_next_event_time()
        # snapshot_time would be imprecise
        wait_timedelta = event_time - time.time()
        if wait_timedelta < 0:
            # The program got around to waiting to an event in "very near
            # future" so late that it became a "past" event, thus tell
            # "select" to not wait at all. Passing negative timedelta to
            # select() would lead to either waiting forever (for -1) or
            # select.error("Invalid parameter") (for everything else).
            wait_timedelta = 0
        # And wait for event or something to read.
        select.select([self.socket], [], [self.socket], wait_timedelta)
        # Not checking anything, that will be done in next iteration.
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them."""

    def __init__(self, bgp_socket, generator, timer):
        """The writer initialisation.

        :param bgp_socket: socket to be used for sending
        :param generator: generator to be used for message generation
        :param timer: timer to be used for scheduling
        """
        # References to outside objects,
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Really new fields.
        # TODO: Would attribute docstrings add anything substantial?
        self.sending_message = False
        self.bytes_to_send = 0
        # Buffer of data enqueued but not yet sent.
        self.msg_out = ""

    def enqueue_message_for_sending(self, message):
        """Enqueue message and change state.

        :param message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send enqueued data from msg_out buffer.

        :return: True if no remaining data to send
        """
        # We assume there is a msg_out to send and socket is writable.
        self.timer.snapshot()
        bytes_sent = self.socket.send(self.msg_out)
        # Forget the part of message that was sent.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if not self.bytes_to_send:
            # TODO: Is it possible to hit negative bytes_to_send?
            self.sending_message = False
            # We should have reset hold timer on peer side.
            self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
            # The possible reason for not prioritizing reads is gone.
            return True
        return False
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class."""

    def __init__(self, bgp_socket, generator, timer):
        """The state tracker initialisation.

        :param bgp_socket: socket to be used for sending / receiving
        :param generator: generator to be used for message generation
        :param timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Sub-trackers for the two halves of the connection.
        self.reader = ReadTracker(bgp_socket, timer)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        self.prioritize_writing = False
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # back to False.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.

    def perform_one_loop_iteration(self):
        """The main loop iteration.

        Calculates priority, resolves all conditions, calls
        appropriate method and returns to caller to repeat.

        :raises RuntimeError: when the socket reports an exceptional state.
        """
        self.timer.snapshot()
        if not self.prioritize_writing:
            if self.timer.is_time_for_my_keepalive():
                if not self.writer.sending_message:
                    # We need to schedule a keepalive ASAP.
                    self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
                # We are sending a message now, so let's prioritize it.
                self.prioritize_writing = True
        # Now we know what our priorities are, we have to check
        # which actions are available.
        # select.select() returns three lists,
        # we store them to list of lists.
        list_list = select.select([self.socket], [self.socket], [self.socket],
                                  self.timer.report_timedelta)
        read_list, write_list, except_list = list_list
        # Lists are unpacked, each is either [] or [self.socket],
        # so we will test them as boolean.
        if except_list:
            stdout_logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
                return
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        if write_list:
            # Either we really want to reset peer's view of our hold
            # timer, or there was nothing to read.
            # Were we in the middle of sending a message?
            if self.writer.sending_message:
                # Was it the end of a message?
                whole = self.writer.send_message_chunk_is_whole()
                # We were pressed to send something and we did it.
                if self.prioritize_writing and whole:
                    # We prioritize reading again.
                    self.prioritize_writing = False
                return
            # Finally to check if still update messages to be generated.
            if self.generator.remaining_prefixes:
                msg_out = self.generator.compose_update_message()
                if not self.generator.remaining_prefixes:
                    # We have just finished update generation,
                    # end-of-rib is due.
                    stdout_logger.info("All update messages generated.")
                    stdout_logger.info("Storing performance results.")
                    self.generator.store_results()
                    stdout_logger.info("Finally an END-OF-RIB is going to be sent.")
                    # END-OF-RIB is an UPDATE with empty lists.
                    msg_out += self.generator.update_message(wr_prefixes=[],
                                                             nlri_prefixes=[])
                self.writer.enqueue_message_for_sending(msg_out)
                # Attempt for real sending to be done in next iteration.
                return
            # Nothing to write anymore, except occasional keepalives.
            stdout_logger.info("Everything has been done." +
                               "Now just waiting for possible incomming message.")
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # We can neither read nor write.
        stdout_logger.warning("Input and output both blocked for " +
                              str(self.timer.report_timedelta) + " seconds.")
        # FIXME: Are we sure select has been really waiting
        # the whole period? Is it possible the socket becomes
        # writable only later? NOTE(review): branch structure around the
        # read/write prioritization was partly elided in the reviewed
        # view and reconstructed from the surrounding comments -- confirm
        # against the full file.
if __name__ == "__main__":
    """One time initialisation and iterations looping.

    Establish BGP connection and run iterations.
    """
    arguments = parse_arguments()
    # Log to stdout-compatible handler; level comes from the CLI.
    logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s")
    stdout_logger = logging.getLogger("stdout_logger")
    stdout_logger.setLevel(arguments.loglevel)
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    # Negotiate timers based on the peer's OPEN.
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    stdout_logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
    # Send our open message to the peer.
    bgp_socket.send(msg_out)
    # Wait for confirming keepalive.
    # TODO: Surely in just one packet?
    # Using exact keepalive length to not to see possible updates.
    msg_in = bgp_socket.recv(19)
    if msg_in != generator.keepalive_message():
        stdout_logger.error("Open not confirmed by keepalive, instead got " +
                            binascii.hexlify(msg_in))
        raise MessageError("Open not confirmed by keepalive, instead got",
                           binascii.hexlify(msg_in))
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    stdout_logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
    bgp_socket.send(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()