1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 __author__ = "Vratko Polak"
15 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
16 __license__ = "Eclipse Public License v1.0"
17 __email__ = "vrpolak@cisco.com"
def parse_arguments():
    """Use argparse to get command-line arguments.

    Returns:
        :return: args object holding all parsed options.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = "Amount of IP prefixes to generate. (negative means infinite)."
    parser.add_argument("--amount", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default="0", type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default="0", type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # Default must be the string "separate" (not a list): downstream code
    # compares args.updates against plain strings ("single", "random").
    parser.add_argument("--updates", choices=["single", "separate"],
                        default="separate", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection." +
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default="0", type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = ("Numeric IP Address to try to connect to." +
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default="179", type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default="180", type=int, help=str_help)
    str_help = "Log level (--error, --warning, --info, --debug)"
    parser.add_argument("--error", dest="loglevel", action="store_const",
                        const=logging.ERROR, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--warning", dest="loglevel", action="store_const",
                        const=logging.WARNING, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--info", dest="loglevel", action="store_const",
                        const=logging.INFO, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--debug", dest="loglevel", action="store_const",
                        const=logging.DEBUG, default=logging.INFO,
                        help=str_help)
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default="1000", type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
    arguments = parser.parse_args()
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
def establish_connection(arguments):
    """Establish TCP connection to (or from) the BGP peer.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port
            - arguments.listen: True to accept a connection, False to dial out

    Returns:
        :return: established socket.
    """
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind need single tuple as argument
        listening_socket.bind((str(arguments.myip), arguments.myport))
        listening_socket.listen(1)
        bgp_socket, _ = listening_socket.accept()
        # TODO: Verify client IP is controller IP.
        # Close the listener: only a single peer connection is served.
        listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind to force specified address and port
        talking_socket.bind((str(arguments.myip), arguments.myport))
        # socket does not speak ipaddr objects, hence str()
        talking_socket.connect((str(arguments.peerip), arguments.peerport))
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract a 2-byte big-endian number from the provided message.

    Arguments:
        :message: given message (raw octets as a string)
        :offset: offset of the short_int inside the message

    Returns:
        :return: required short_int value.

    Notes:
        Default offset value is the BGP message size offset.
    """
    high_byte_int = ord(message[offset])
    low_byte_int = ord(message[offset + 1])
    short_int = high_byte_int * 256 + low_byte_int
    return short_int
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3).

    Arguments:
        :prefixes_hex: list of prefixes to be decoded in hex

    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    """
    prefix_list = []
    offset = 0
    while offset < len(prefixes_hex):
        prefix_bit_len_hex = prefixes_hex[offset]
        prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
        # Octet count implied by the bit length; '//' makes the integer
        # division explicit (identical to '/' on Python 2 ints).
        prefix_len = ((prefix_bit_len - 1) // 8) + 1
        prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
        # NOTE(review): "BBBB" assumes 4 encoded octets per prefix
        # (prefix_bit_len > 24) — holds for the /28 prefixes this tool
        # generates; verify before feeding shorter prefixes.
        prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: human readable message as string

        Notes:
            Use a placeholder string if the message is to be empty.
        """
        message = binascii.hexlify(self.msg)
        if not message:
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message.

    Arguments:
        :bgp_socket: the socket to be read

    Returns:
        :return: received OPEN message.

    Notes:
        Performs just basic incoming message checks.
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        logger.error("Got something else than open with 4-byte AS number: " +
                     binascii.hexlify(msg_in))
        raise MessageError("Got something else than open with 4-byte AS number", msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        logger.error("Message length is not " + str(reported_length) +
                     " as stated in " + binascii.hexlify(msg_in))
        # str() is required here: concatenating str and int raised
        # TypeError instead of the intended MessageError.
        raise MessageError("Message length is not " + str(reported_length) +
                           " as stated in ", msg_in)
    logger.info("Open message received.")
    return msg_in
239 class MessageGenerator(object):
240 """Class which generates messages, holds states and configuration values."""
242 # TODO: Define bgp marker as a class (constant) variable.
243 def __init__(self, args):
244 """Initialisation according to command-line args.
247 :args: argsparser's Namespace object which contains command-line
248 options for MesageGenerator initialisation
250 Calculates and stores default values used later on for
253 self.total_prefix_amount = args.amount
254 # Number of update messages left to be sent.
255 self.remaining_prefixes = self.total_prefix_amount
257 # New parameters initialisation
259 self.prefix_base_default = args.firstprefix
260 self.prefix_length_default = args.prefixlen
261 self.wr_prefixes_default = []
262 self.nlri_prefixes_default = []
263 self.version_default = 4
264 self.my_autonomous_system_default = args.asnumber
265 self.hold_time_default = args.holdtime # Local hold time.
266 self.bgp_identifier_default = int(args.myip)
267 self.next_hop_default = args.nexthop
268 self.single_update_default = args.updates == "single"
269 self.randomize_updates_default = args.updates == "random"
270 self.prefix_count_to_add_default = args.insert
271 self.prefix_count_to_del_default = args.withdraw
272 if self.prefix_count_to_del_default < 0:
273 self.prefix_count_to_del_default = 0
274 if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
275 # total number of prefixes must grow to avoid infinite test loop
276 self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
277 self.slot_size_default = self.prefix_count_to_add_default
278 self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
279 self.results_file_name_default = args.results
280 self.performance_threshold_default = args.threshold
281 self.rfc4760 = args.rfc4760 == "yes"
282 # Default values used for randomized part
283 s1_slots = ((self.total_prefix_amount -
284 self.remaining_prefixes_threshold - 1) /
285 self.prefix_count_to_add_default + 1)
286 s2_slots = ((self.remaining_prefixes_threshold - 1) /
287 (self.prefix_count_to_add_default -
288 self.prefix_count_to_del_default) + 1)
290 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
291 s2_first_index = s1_slots * self.prefix_count_to_add_default
292 s2_last_index = (s2_first_index +
293 s2_slots * (self.prefix_count_to_add_default -
294 self.prefix_count_to_del_default) - 1)
295 self.slot_gap_default = ((self.total_prefix_amount -
296 self.remaining_prefixes_threshold - 1) /
297 self.prefix_count_to_add_default + 1)
298 self.randomize_lowest_default = s2_first_index
299 self.randomize_highest_default = s2_last_index
301 # Initialising counters
302 self.phase1_start_time = 0
303 self.phase1_stop_time = 0
304 self.phase2_start_time = 0
305 self.phase2_stop_time = 0
306 self.phase1_updates_sent = 0
307 self.phase2_updates_sent = 0
308 self.updates_sent = 0
310 self.log_info = args.loglevel <= logging.INFO
311 self.log_debug = args.loglevel <= logging.DEBUG
313 Flags needed for the MessageGenerator performance optimization.
314 Calling logger methods each iteration even with proper log level set
315 slows down significantly the MessageGenerator performance.
316 Measured total generation time (1M updates, dry run, error log level):
317 - logging based on basic logger features: 36,2s
318 - logging based on advanced logger features (lazy logging): 21,2s
319 - conditional calling of logger methods enclosed inside condition: 8,6s
322 logger.info("Generator initialisation")
323 logger.info(" Target total number of prefixes to be introduced: " +
324 str(self.total_prefix_amount))
325 logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
326 str(self.prefix_length_default))
327 logger.info(" My Autonomous System number: " +
328 str(self.my_autonomous_system_default))
329 logger.info(" My Hold Time: " + str(self.hold_time_default))
330 logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
331 logger.info(" Next Hop: " + str(self.next_hop_default))
332 logger.info(" Prefix count to be inserted at once: " +
333 str(self.prefix_count_to_add_default))
334 logger.info(" Prefix count to be withdrawn at once: " +
335 str(self.prefix_count_to_del_default))
336 logger.info(" Fast pre-fill up to " +
337 str(self.total_prefix_amount -
338 self.remaining_prefixes_threshold) + " prefixes")
339 logger.info(" Remaining number of prefixes to be processed " +
340 "in parallel with withdrawals: " +
341 str(self.remaining_prefixes_threshold))
342 logger.debug(" Prefix index range used after pre-fill procedure [" +
343 str(self.randomize_lowest_default) + ", " +
344 str(self.randomize_highest_default) + "]")
345 if self.single_update_default:
346 logger.info(" Common single UPDATE will be generated " +
347 "for both NLRI & WITHDRAWN lists")
349 logger.info(" Two separate UPDATEs will be generated " +
350 "for each NLRI & WITHDRAWN lists")
351 if self.randomize_updates_default:
352 logger.info(" Generation of UPDATE messages will be randomized")
353 logger.info(" Let\'s go ...\n")
355 # TODO: Notification for hold timer expiration can be handy.
357 def store_results(self, file_name=None, threshold=None):
358 """ Stores specified results into files based on file_name value.
361 :param file_name: Trailing (common) part of result file names
362 :param threshold: Minimum number of sent updates needed for each
363 result to be included into result csv file
364 (mainly needed because of the result accuracy)
368 # default values handling
369 if file_name is None:
370 file_name = self.results_file_name_default
371 if threshold is None:
372 threshold = self.performance_threshold_default
373 # performance calculation
374 if self.phase1_updates_sent >= threshold:
375 totals1 = self.phase1_updates_sent
376 performance1 = int(self.phase1_updates_sent /
377 (self.phase1_stop_time - self.phase1_start_time))
381 if self.phase2_updates_sent >= threshold:
382 totals2 = self.phase2_updates_sent
383 performance2 = int(self.phase2_updates_sent /
384 (self.phase2_stop_time - self.phase2_start_time))
389 logger.info("#" * 10 + " Final results " + "#" * 10)
390 logger.info("Number of iterations: " + str(self.iteration))
391 logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
392 str(self.phase1_updates_sent))
393 logger.info("The pre-fill phase duration: " +
394 str(self.phase1_stop_time - self.phase1_start_time) + "s")
395 logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
396 str(self.phase2_updates_sent))
397 logger.info("The 2nd test phase duration: " +
398 str(self.phase2_stop_time - self.phase2_start_time) + "s")
399 logger.info("Threshold for performance reporting: " + str(threshold))
402 phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
403 " route(s) per UPDATE")
404 if self.single_update_default:
405 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
406 "/-" + str(self.prefix_count_to_del_default) +
407 " routes per UPDATE")
409 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
410 "/-" + str(self.prefix_count_to_del_default) +
411 " routes in two UPDATEs")
412 # collecting capacity and performance results
415 if totals1 is not None:
416 totals[phase1_label] = totals1
417 performance[phase1_label] = performance1
418 if totals2 is not None:
419 totals[phase2_label] = totals2
420 performance[phase2_label] = performance2
421 self.write_results_to_file(totals, "totals-" + file_name)
422 self.write_results_to_file(performance, "performance-" + file_name)
424 def write_results_to_file(self, results, file_name):
425 """Writes results to the csv plot file consumable by Jenkins.
428 :param file_name: Name of the (csv) file to be created
434 f = open(file_name, "wt")
436 for key in sorted(results):
437 first_line += key + ", "
438 second_line += str(results[key]) + ", "
439 first_line = first_line[:-2]
440 second_line = second_line[:-2]
441 f.write(first_line + "\n")
442 f.write(second_line + "\n")
443 logger.info("Message generator performance results stored in " +
445 logger.info(" " + first_line)
446 logger.info(" " + second_line)
450 # Return pseudo-randomized (reproducible) index for selected range
451 def randomize_index(self, index, lowest=None, highest=None):
452 """Calculates pseudo-randomized index from selected range.
455 :param index: input index
456 :param lowest: the lowes index from the randomized area
457 :param highest: the highest index from the randomized area
459 :return: the (pseudo)randomized index
461 Created just as a fame for future generator enhancement.
463 # default values handling
465 lowest = self.randomize_lowest_default
467 highest = self.randomize_highest_default
469 if (index >= lowest) and (index <= highest):
470 # we are in the randomized range -> shuffle it inside
471 # the range (now just reverse the order)
472 new_index = highest - (index - lowest)
474 # we are out of the randomized range -> nothing to do
478 # Get list of prefixes
479 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
480 prefix_len=None, prefix_count=None, randomize=None):
481 """Generates list of IP address prefixes.
484 :param slot_index: index of group of prefix addresses
485 :param slot_size: size of group of prefix addresses
486 in [number of included prefixes]
487 :param prefix_base: IP address of the first prefix
488 (slot_index = 0, prefix_index = 0)
489 :param prefix_len: length of the prefix in bites
490 (the same as size of netmask)
491 :param prefix_count: number of prefixes to be returned
492 from the specified slot
494 :return: list of generated IP address prefixes
496 # default values handling
497 if slot_size is None:
498 slot_size = self.slot_size_default
499 if prefix_base is None:
500 prefix_base = self.prefix_base_default
501 if prefix_len is None:
502 prefix_len = self.prefix_length_default
503 if prefix_count is None:
504 prefix_count = slot_size
505 if randomize is None:
506 randomize = self.randomize_updates_default
507 # generating list of prefixes
510 prefix_gap = 2 ** (32 - prefix_len)
511 for i in range(prefix_count):
512 prefix_index = slot_index * slot_size + i
514 prefix_index = self.randomize_index(prefix_index)
515 indexes.append(prefix_index)
516 prefixes.append(prefix_base + prefix_index * prefix_gap)
518 logger.debug(" Prefix slot index: " + str(slot_index))
519 logger.debug(" Prefix slot size: " + str(slot_size))
520 logger.debug(" Prefix count: " + str(prefix_count))
521 logger.debug(" Prefix indexes: " + str(indexes))
522 logger.debug(" Prefix list: " + str(prefixes))
525 def compose_update_message(self, prefix_count_to_add=None,
526 prefix_count_to_del=None):
527 """Composes an UPDATE message
530 :param prefix_count_to_add: # of prefixes to put into NLRI list
531 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
533 :return: encoded UPDATE message in HEX
535 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
536 lists or common message wich includes both prefix lists.
537 Updates global counters.
539 # default values handling
540 if prefix_count_to_add is None:
541 prefix_count_to_add = self.prefix_count_to_add_default
542 if prefix_count_to_del is None:
543 prefix_count_to_del = self.prefix_count_to_del_default
545 if self.log_info and not (self.iteration % 1000):
546 logger.info("Iteration: " + str(self.iteration) +
547 " - total remaining prefixes: " +
548 str(self.remaining_prefixes))
550 logger.debug("#" * 10 + " Iteration: " +
551 str(self.iteration) + " " + "#" * 10)
552 logger.debug("Remaining prefixes: " +
553 str(self.remaining_prefixes))
554 # scenario type & one-shot counter
555 straightforward_scenario = (self.remaining_prefixes >
556 self.remaining_prefixes_threshold)
557 if straightforward_scenario:
558 prefix_count_to_del = 0
560 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
561 if not self.phase1_start_time:
562 self.phase1_start_time = time.time()
565 logger.debug("--- COMBINED SCENARIO ---")
566 if not self.phase2_start_time:
567 self.phase2_start_time = time.time()
568 # tailor the number of prefixes if needed
569 prefix_count_to_add = (prefix_count_to_del +
570 min(prefix_count_to_add - prefix_count_to_del,
571 self.remaining_prefixes))
572 # prefix slots selection for insertion and withdrawal
573 slot_index_to_add = self.iteration
574 slot_index_to_del = slot_index_to_add - self.slot_gap_default
575 # getting lists of prefixes for insertion in this iteration
577 logger.debug("Prefixes to be inserted in this iteration:")
578 prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
579 prefix_count=prefix_count_to_add)
580 # getting lists of prefixes for withdrawal in this iteration
582 logger.debug("Prefixes to be withdrawn in this iteration:")
583 prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
584 prefix_count=prefix_count_to_del)
585 # generating the mesage
586 if self.single_update_default:
587 # Send prefixes to be introduced and withdrawn
588 # in one UPDATE message
589 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
590 nlri_prefixes=prefix_list_to_add)
592 # Send prefixes to be introduced and withdrawn
593 # in separate UPDATE messages (if needed)
594 msg_out = self.update_message(wr_prefixes=[],
595 nlri_prefixes=prefix_list_to_add)
596 if prefix_count_to_del:
597 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
599 # updating counters - who knows ... maybe I am last time here ;)
600 if straightforward_scenario:
601 self.phase1_stop_time = time.time()
602 self.phase1_updates_sent = self.updates_sent
604 self.phase2_stop_time = time.time()
605 self.phase2_updates_sent = (self.updates_sent -
606 self.phase1_updates_sent)
607 # updating totals for the next iteration
609 self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
610 # returning the encoded message
613 # Section of message encoders
615 def open_message(self, version=None, my_autonomous_system=None,
616 hold_time=None, bgp_identifier=None):
617 """Generates an OPEN Message (rfc4271#section-4.2)
620 :param version: see the rfc4271#section-4.2
621 :param my_autonomous_system: see the rfc4271#section-4.2
622 :param hold_time: see the rfc4271#section-4.2
623 :param bgp_identifier: see the rfc4271#section-4.2
625 :return: encoded OPEN message in HEX
628 # Default values handling
630 version = self.version_default
631 if my_autonomous_system is None:
632 my_autonomous_system = self.my_autonomous_system_default
633 if hold_time is None:
634 hold_time = self.hold_time_default
635 if bgp_identifier is None:
636 bgp_identifier = self.bgp_identifier_default
639 marker_hex = "\xFF" * 16
643 type_hex = struct.pack("B", type)
646 version_hex = struct.pack("B", version)
648 # my_autonomous_system
649 # AS_TRANS value, 23456 decadic.
650 my_autonomous_system_2_bytes = 23456
651 # AS number is mappable to 2 bytes
652 if my_autonomous_system < 65536:
653 my_autonomous_system_2_bytes = my_autonomous_system
654 my_autonomous_system_hex_2_bytes = struct.pack(">H",
655 my_autonomous_system)
658 hold_time_hex = struct.pack(">H", hold_time)
661 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
663 # Optional Parameters
664 optional_parameters_hex = ""
666 optional_parameter_hex = (
667 "\x02" # Param type ("Capability Ad")
668 "\x06" # Length (6 bytes)
669 "\x01" # Capability type (NLRI Unicast),
670 # see RFC 4760, secton 8
671 "\x04" # Capability value length
672 "\x00\x01" # AFI (Ipv4)
674 "\x01" # SAFI (Unicast)
676 optional_parameters_hex += optional_parameter_hex
678 optional_parameter_hex = (
679 "\x02" # Param type ("Capability Ad")
680 "\x06" # Length (6 bytes)
681 "\x41" # "32 bit AS Numbers Support"
682 # (see RFC 6793, section 3)
683 "\x04" # Capability value length
684 # My AS in 32 bit format
685 + struct.pack(">I", my_autonomous_system)
687 optional_parameters_hex += optional_parameter_hex
689 # Optional Parameters Length
690 optional_parameters_length = len(optional_parameters_hex)
691 optional_parameters_length_hex = struct.pack("B",
692 optional_parameters_length)
694 # Length (big-endian)
696 len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
697 len(my_autonomous_system_hex_2_bytes) +
698 len(hold_time_hex) + len(bgp_identifier_hex) +
699 len(optional_parameters_length_hex) +
700 len(optional_parameters_hex)
702 length_hex = struct.pack(">H", length)
710 my_autonomous_system_hex_2_bytes +
713 optional_parameters_length_hex +
714 optional_parameters_hex
718 logger.debug("OPEN message encoding")
719 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
720 logger.debug(" Length=" + str(length) + " (0x" +
721 binascii.hexlify(length_hex) + ")")
722 logger.debug(" Type=" + str(type) + " (0x" +
723 binascii.hexlify(type_hex) + ")")
724 logger.debug(" Version=" + str(version) + " (0x" +
725 binascii.hexlify(version_hex) + ")")
726 logger.debug(" My Autonomous System=" +
727 str(my_autonomous_system_2_bytes) + " (0x" +
728 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
730 logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
731 binascii.hexlify(hold_time_hex) + ")")
732 logger.debug(" BGP Identifier=" + str(bgp_identifier) +
733 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
734 logger.debug(" Optional Parameters Length=" +
735 str(optional_parameters_length) + " (0x" +
736 binascii.hexlify(optional_parameters_length_hex) +
738 logger.debug(" Optional Parameters=0x" +
739 binascii.hexlify(optional_parameters_hex))
740 logger.debug("OPEN message encoded: 0x%s",
741 binascii.b2a_hex(message_hex))
745 def update_message(self, wr_prefixes=None, nlri_prefixes=None,
746 wr_prefix_length=None, nlri_prefix_length=None,
747 my_autonomous_system=None, next_hop=None):
748 """Generates an UPDATE Message (rfc4271#section-4.3)
751 :param wr_prefixes: see the rfc4271#section-4.3
752 :param nlri_prefixes: see the rfc4271#section-4.3
753 :param wr_prefix_length: see the rfc4271#section-4.3
754 :param nlri_prefix_length: see the rfc4271#section-4.3
755 :param my_autonomous_system: see the rfc4271#section-4.3
756 :param next_hop: see the rfc4271#section-4.3
758 :return: encoded UPDATE message in HEX
761 # Default values handling
762 if wr_prefixes is None:
763 wr_prefixes = self.wr_prefixes_default
764 if nlri_prefixes is None:
765 nlri_prefixes = self.nlri_prefixes_default
766 if wr_prefix_length is None:
767 wr_prefix_length = self.prefix_length_default
768 if nlri_prefix_length is None:
769 nlri_prefix_length = self.prefix_length_default
770 if my_autonomous_system is None:
771 my_autonomous_system = self.my_autonomous_system_default
773 next_hop = self.next_hop_default
776 marker_hex = "\xFF" * 16
780 type_hex = struct.pack("B", type)
783 bytes = ((wr_prefix_length - 1) / 8) + 1
784 withdrawn_routes_hex = ""
785 for prefix in wr_prefixes:
786 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
787 struct.pack(">I", int(prefix))[:bytes])
788 withdrawn_routes_hex += withdrawn_route_hex
790 # Withdrawn Routes Length
791 withdrawn_routes_length = len(withdrawn_routes_hex)
792 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
794 # TODO: to replace hardcoded string by encoding?
796 if nlri_prefixes != []:
797 path_attributes_hex = (
798 "\x40" # Flags ("Well-Known")
799 "\x01" # Type (ORIGIN)
802 "\x40" # Flags ("Well-Known")
803 "\x02" # Type (AS_PATH)
805 "\x02" # AS segment type (AS_SEQUENCE)
806 "\x01" # AS segment length (1)
807 # AS segment (4 bytes)
808 + struct.pack(">I", my_autonomous_system) +
809 "\x40" # Flags ("Well-Known")
810 "\x03" # Type (NEXT_HOP)
812 # IP address of the next hop (4 bytes)
813 + struct.pack(">I", int(next_hop))
816 path_attributes_hex = ""
818 # Total Path Attributes Length
819 total_path_attributes_length = len(path_attributes_hex)
820 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
822 # Network Layer Reachability Information
823 bytes = ((nlri_prefix_length - 1) / 8) + 1
825 for prefix in nlri_prefixes:
826 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
827 struct.pack(">I", int(prefix))[:bytes])
828 nlri_hex += nlri_prefix_hex
830 # Length (big-endian)
832 len(marker_hex) + 2 + len(type_hex) +
833 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
834 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
836 length_hex = struct.pack(">H", length)
843 withdrawn_routes_length_hex +
844 withdrawn_routes_hex +
845 total_path_attributes_length_hex +
846 path_attributes_hex +
851 logger.debug("UPDATE message encoding")
852 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
853 logger.debug(" Length=" + str(length) + " (0x" +
854 binascii.hexlify(length_hex) + ")")
855 logger.debug(" Type=" + str(type) + " (0x" +
856 binascii.hexlify(type_hex) + ")")
857 logger.debug(" withdrawn_routes_length=" +
858 str(withdrawn_routes_length) + " (0x" +
859 binascii.hexlify(withdrawn_routes_length_hex) + ")")
860 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
861 str(wr_prefix_length) + " (0x" +
862 binascii.hexlify(withdrawn_routes_hex) + ")")
863 logger.debug(" Total Path Attributes Length=" +
864 str(total_path_attributes_length) + " (0x" +
865 binascii.hexlify(total_path_attributes_length_hex) +
867 logger.debug(" Path Attributes=" + "(0x" +
868 binascii.hexlify(path_attributes_hex) + ")")
869 logger.debug(" Network Layer Reachability Information=" +
870 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
871 " (0x" + binascii.hexlify(nlri_hex) + ")")
872 logger.debug("UPDATE message encoded: 0x" +
873 binascii.b2a_hex(message_hex))
876 self.updates_sent += 1
877 # returning encoded message
880 def notification_message(self, error_code, error_subcode, data_hex=""):
881 """Generates a NOTIFICATION Message (rfc4271#section-4.5)
884 :param error_code: see the rfc4271#section-4.5
885 :param error_subcode: see the rfc4271#section-4.5
886 :param data_hex: see the rfc4271#section-4.5
888 :return: encoded NOTIFICATION message in HEX
892 marker_hex = "\xFF" * 16
896 type_hex = struct.pack("B", type)
899 error_code_hex = struct.pack("B", error_code)
902 error_subcode_hex = struct.pack("B", error_subcode)
904 # Length (big-endian)
905 length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
906 len(error_subcode_hex) + len(data_hex))
907 length_hex = struct.pack(">H", length)
909 # NOTIFICATION Message
920 logger.debug("NOTIFICATION message encoding")
921 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
922 logger.debug(" Length=" + str(length) + " (0x" +
923 binascii.hexlify(length_hex) + ")")
924 logger.debug(" Type=" + str(type) + " (0x" +
925 binascii.hexlify(type_hex) + ")")
926 logger.debug(" Error Code=" + str(error_code) + " (0x" +
927 binascii.hexlify(error_code_hex) + ")")
928 logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
929 binascii.hexlify(error_subcode_hex) + ")")
930 logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
931 logger.debug("NOTIFICATION message encoded: 0x%s",
932 binascii.b2a_hex(message_hex))
936 def keepalive_message(self):
937 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
940 :return: encoded KEEP ALIVE message in HEX
944 marker_hex = "\xFF" * 16
948 type_hex = struct.pack("B", type)
950 # Length (big-endian)
951 length = len(marker_hex) + 2 + len(type_hex)
952 length_hex = struct.pack(">H", length)
962 logger.debug("KEEP ALIVE message encoding")
963 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
964 logger.debug(" Length=" + str(length) + " (0x" +
965 binascii.hexlify(length_hex) + ")")
966 logger.debug(" Type=" + str(type) + " (0x" +
967 binascii.hexlify(type_hex) + ")")
968 logger.debug("KEEP ALIVE message encoded: 0x%s",
969 binascii.b2a_hex(message_hex))
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and
    peer's hold time.
    """

    def __init__(self, msg_in):
        """Initialisation. based on defaults and OPEN message from peer.

        Arguments:
            msg_in: the OPEN message received from peer.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        hold_timedelta = 180  # Not an attribute of self yet.
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        if hold_timedelta > peer_hold_timedelta:
            hold_timedelta = peer_hold_timedelta
        # rfc4271#section-4.2: hold time is either 0 or at least 3 seconds.
        if hold_timedelta != 0 and hold_timedelta < 3:
            logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
            raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
        self.hold_timedelta = hold_timedelta
        # If we do not hear from peer this long, we assume it has died.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        # The same as calling snapshot(), but also declares a field.
        self.snapshot_time = time.time()
        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this time point, peer will be declared dead.
        self.my_keepalive_time = None  # to be set later
        # At this point, we should be sending keepalive message.

    def snapshot(self):
        """Store current time in instance data to use later."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time

        Arguments:
            :keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurence"""
        if self.hold_timedelta == 0:
            # Hold time 0 turns keepalive handling off entirely.
            return False
        return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Set the time of the next expected or to be sent KEEP ALIVE"""
        if self.hold_timedelta == 0:
            # No keepalive traffic expected; report a far-future time
            # (one day ahead) so the main loop is not woken needlessly.
            return self.snapshot_time + 86400
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time."""
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta != 0:
            # time.time() may be too strict
            if snapshot_time > self.peer_hold_time:
                logger.error("Peer has overstepped the hold timer.")
                raise RuntimeError("Peer has overstepped the hold timer.")
                # TODO: Include hold_timedelta?
                # TODO: Add notification sending (attempt). That means
                # move to write tracker.
class ReadTracker(object):
    """Class for tracking read of mesages chunk by chunk and
    for idle waiting.
    """

    def __init__(self, bgp_socket, timer):
        """The reader initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        # NOTE(review): "self.timer = timer" appears to be missing here;
        # self.timer is read in read_message_chunk() -- confirm upstream.
        # BGP marker length plus length field length.
        self.header_length = 18
        # TODO: make it class (constant) attribute
        # Computation of where next chunk ends depends on whether
        # we are beyond length field.
        self.reading_header = True
        # Countdown towards next size computation.
        self.bytes_to_read = self.header_length
        # Incremental buffer for message under read.
        # NOTE(review): initialisation of self.msg_in (empty buffer)
        # appears to be missing here -- confirm upstream.
        # Initialising counters
        self.updates_received = 0
        self.prefixes_introduced = 0
        self.prefixes_withdrawn = 0
        self.rx_idle_time = 0
        self.rx_activity_detected = True

    def read_message_chunk(self):
        """Read up to one message

        Note:
            Currently it does not return anything.
        """
        # TODO: We could return the whole message, currently not needed.
        # We assume the socket is readable.
        chunk_message = self.socket.recv(self.bytes_to_read)
        self.msg_in += chunk_message
        self.bytes_to_read -= len(chunk_message)
        # TODO: bytes_to_read < 0 is not possible, right?
        if not self.bytes_to_read:
            # Finished reading a logical block.
            if self.reading_header:
                # The logical block was a BGP header.
                # Now we know the size of the message.
                self.reading_header = False
                # NOTE(review): this statement is truncated (unbalanced
                # parenthesis); the subtracted header length and the
                # closing parenthesis appear to be missing.
                self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
            else:  # We have finished reading the body of the message.
                # Peer has just proven it is still alive.
                self.timer.reset_peer_hold_time()
                # TODO: Do we want to count received messages?
                # This version ignores the received message.
                # TODO: Should we do validation and exit on anything
                # besides update or keepalive?
                # Prepare state for reading another message.
                # Dispatch on the BGP message type byte (rfc4271#section-4.1).
                message_type_hex = self.msg_in[self.header_length]
                if message_type_hex == "\x01":
                    logger.info("OPEN message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x02":
                    logger.debug("UPDATE message received: 0x%s",
                                 binascii.b2a_hex(self.msg_in))
                    self.decode_update_message(self.msg_in)
                elif message_type_hex == "\x03":
                    logger.info("NOTIFICATION message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x04":
                    logger.info("KEEP ALIVE message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                # NOTE(review): an "else:" guarding the warning below and a
                # reset of self.msg_in to an empty buffer appear missing.
                    logger.warning("Unexpected message received: 0x%s",
                                   binascii.b2a_hex(self.msg_in))
                self.reading_header = True
                self.bytes_to_read = self.header_length
        # We should not act upon peer_hold_time if we are reading
        # something right now.

    def decode_path_attributes(self, path_attributes_hex):
        """Decode the Path Attributes field (rfc4271#section-4.3)

        Arguments:
            :path_attributes: path_attributes field to be decoded in hex
        """
        hex_to_decode = path_attributes_hex

        # Each iteration consumes one attribute (flags, type, length, value).
        while len(hex_to_decode):
            attr_flags_hex = hex_to_decode[0]
            attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
            # attr_optional_bit = attr_flags & 128
            # attr_transitive_bit = attr_flags & 64
            # attr_partial_bit = attr_flags & 32
            attr_extended_length_bit = attr_flags & 16

            attr_type_code_hex = hex_to_decode[1]
            attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)

            if attr_extended_length_bit:
                # Extended Length flag set: the length field is 2 bytes wide.
                attr_length_hex = hex_to_decode[2:4]
                attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
                attr_value_hex = hex_to_decode[4:4 + attr_length]
                hex_to_decode = hex_to_decode[4 + attr_length:]
            # NOTE(review): the "else:" introducing this 1-byte-length
            # branch appears to be missing -- confirm upstream.
                attr_length_hex = hex_to_decode[2]
                attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
                attr_value_hex = hex_to_decode[3:3 + attr_length]
                hex_to_decode = hex_to_decode[3 + attr_length:]

            if attr_type_code == 1:
                logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 2:
                logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 3:
                logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 4:
                logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 5:
                logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 6:
                logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 7:
                logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 14:  # rfc4760#section-3
                logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
                address_family_identifier_hex = attr_value_hex[0:2]
                logger.debug("  Address Family Identifier = 0x%s",
                             binascii.b2a_hex(address_family_identifier_hex))
                subsequent_address_family_identifier_hex = attr_value_hex[2]
                logger.debug("  Subsequent Address Family Identifier = 0x%s",
                             binascii.b2a_hex(subsequent_address_family_identifier_hex))
                next_hop_netaddr_len_hex = attr_value_hex[3]
                next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
                logger.debug("  Length of Next Hop Network Address = 0x%s (%s)",
                             binascii.b2a_hex(next_hop_netaddr_len_hex),
                             next_hop_netaddr_len)
                next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
                logger.debug("  Network Address of Next Hop = 0x%s",
                             binascii.b2a_hex(next_hop_netaddr_hex))
                reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
                logger.debug("  Reserved = 0x%s",
                             binascii.b2a_hex(reserved_hex))
                nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
                logger.debug("  Network Layer Reachability Information = 0x%s",
                             binascii.b2a_hex(nlri_hex))
                nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
                logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
                for prefix in nlri_prefix_list:
                    logger.debug("  nlri_prefix_received: %s", prefix)
                self.prefixes_introduced += len(nlri_prefix_list)  # update counter
            elif attr_type_code == 15:  # rfc4760#section-4
                logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
                address_family_identifier_hex = attr_value_hex[0:2]
                logger.debug("  Address Family Identifier = 0x%s",
                             binascii.b2a_hex(address_family_identifier_hex))
                subsequent_address_family_identifier_hex = attr_value_hex[2]
                logger.debug("  Subsequent Address Family Identifier = 0x%s",
                             binascii.b2a_hex(subsequent_address_family_identifier_hex))
                wd_hex = attr_value_hex[3:]
                logger.debug("  Withdrawn Routes = 0x%s",
                             binascii.b2a_hex(wd_hex))
                wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
                # NOTE(review): the call below is truncated; its second
                # argument (wdr_prefix_list) and closing parenthesis are
                # missing -- confirm upstream.
                logger.debug("  Withdrawn routes prefix list: %s",
                for prefix in wdr_prefix_list:
                    logger.debug("  withdrawn_prefix_received: %s", prefix)
                self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
            # NOTE(review): an "else:" for unknown attribute types appears
            # to be missing before the two lines below.
                logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code,
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))

    def decode_update_message(self, msg):
        """Decode an UPDATE message (rfc4271#section-4.3)

        Arguments:
            :msg: message to be decoded in hex
        """
        logger.debug("Decoding update message:")
        # message header - marker
        marker_hex = msg[:16]
        logger.debug("Message header marker: 0x%s",
                     binascii.b2a_hex(marker_hex))
        # message header - message length
        msg_length_hex = msg[16:18]
        msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
        logger.debug("Message lenght: 0x%s (%s)",
                     binascii.b2a_hex(msg_length_hex), msg_length)
        # message header - message type
        msg_type_hex = msg[18:19]
        msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
        # NOTE(review): an "if msg_type == 2:" guard for the UPDATE branch
        # appears to be missing here -- confirm upstream.
        logger.debug("Message type: 0x%s (update)",
                     binascii.b2a_hex(msg_type_hex))
        # withdrawn routes length
        wdr_length_hex = msg[19:21]
        wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
        logger.debug("Withdrawn routes lenght: 0x%s (%s)",
                     binascii.b2a_hex(wdr_length_hex), wdr_length)
        # withdrawn routes
        wdr_hex = msg[21:21 + wdr_length]
        logger.debug("Withdrawn routes: 0x%s",
                     binascii.b2a_hex(wdr_hex))
        wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
        # NOTE(review): the call below is truncated; its second argument
        # (wdr_prefix_list) and closing parenthesis are missing.
        logger.debug("Withdrawn routes prefix list: %s",
        for prefix in wdr_prefix_list:
            logger.debug("withdrawn_prefix_received: %s", prefix)
        # total path attribute length
        total_pa_length_offset = 21 + wdr_length
        total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
        total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
        logger.debug("Total path attribute lenght: 0x%s (%s)",
                     binascii.b2a_hex(total_pa_length_hex), total_pa_length)
        # path attributes
        pa_offset = total_pa_length_offset + 2
        pa_hex = msg[pa_offset:pa_offset+total_pa_length]
        logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
        self.decode_path_attributes(pa_hex)
        # network layer reachability information length
        # (23 = 19-byte header + 2-byte WDR length + 2-byte PA length)
        nlri_length = msg_length - 23 - total_pa_length - wdr_length
        logger.debug("Calculated NLRI length: %s", nlri_length)
        # network layer reachability information
        nlri_offset = pa_offset + total_pa_length
        nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
        logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
        nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
        logger.debug("NLRI prefix list: %s", nlri_prefix_list)
        for prefix in nlri_prefix_list:
            logger.debug("nlri_prefix_received: %s", prefix)
        # updating counters
        self.updates_received += 1
        self.prefixes_introduced += len(nlri_prefix_list)
        self.prefixes_withdrawn += len(wdr_prefix_list)
        # NOTE(review): an "else:" for non-UPDATE message types appears
        # to be missing before the error log below.
        logger.error("Unexpeced message type 0x%s in 0x%s",
                     binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))

    def wait_for_read(self):
        """Read message until timeout (next expected event).

        Note:
            Used when no more updates has to be sent to avoid busy-wait.
            Currently it does not return anything.
        """
        # Compute time to the first predictable state change
        event_time = self.timer.get_next_event_time()
        # snapshot_time would be imprecise
        wait_timedelta = min(event_time - time.time(), 10)
        if wait_timedelta < 0:
            # The program got around to waiting to an event in "very near
            # future" so late that it became a "past" event, thus tell
            # "select" to not wait at all. Passing negative timedelta to
            # select() would lead to either waiting forever (for -1) or
            # select.error("Invalid parameter") (for everything else).
            # NOTE(review): "wait_timedelta = 0" appears to be missing here.
        # And wait for event or something to read.

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every update and
            # not too frequently to avoid having large log files)
            logger.info("total_received_update_message_counter: %s",
                        self.updates_received)
            logger.info("total_received_nlri_prefix_counter: %s",
                        self.prefixes_introduced)
            logger.info("total_received_withdrawn_prefix_counter: %s",
                        self.prefixes_withdrawn)

        start_time = time.time()
        select.select([self.socket], [], [self.socket], wait_timedelta)
        timedelta = time.time() - start_time
        self.rx_idle_time += timedelta
        # Less than a second of idle waiting means the peer is active.
        self.rx_activity_detected = timedelta < 1

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every update and
            # not too frequently to avoid having large log files)
            logger.info("... idle for %.3fs", timedelta)
            logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them."""

    def __init__(self, bgp_socket, generator, timer):
        """The writter initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects,
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Really new fields.
        # TODO: Would attribute docstrings add anything substantial?
        # True while there is still unsent data in msg_out.
        self.sending_message = False
        # Number of bytes of msg_out not yet handed to the socket.
        self.bytes_to_send = 0
        # Outgoing buffer; messages are appended, sent chunks are dropped.
        self.msg_out = ""

    def enqueue_message_for_sending(self, message):
        """Enqueue message and change state.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send enqueued data from msg_out buffer

        Returns:
            :return: true if no remaining data to send
        """
        # We assume there is a msg_out to send and socket is writable.
        # print "going to send", repr(self.msg_out)
        self.timer.snapshot()
        bytes_sent = self.socket.send(self.msg_out)
        # Forget the part of message that was sent.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if not self.bytes_to_send:
            # TODO: Is it possible to hit negative bytes_to_send?
            self.sending_message = False
            # We should have reset hold timer on peer side.
            self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
            # The possible reason for not prioritizing reads is gone.
            return True
        return False
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class."""

    def __init__(self, bgp_socket, generator, timer):
        """The state tracker initialisation.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        # NOTE(review): "self.timer = timer" appears to be missing here;
        # self.timer is used by perform_one_loop_iteration().
        self.reader = ReadTracker(bgp_socket, timer)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        self.prioritize_writing = False
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.

    def perform_one_loop_iteration(self):
        """ The main loop iteration

        Calculates priority, resolves all conditions, calls
        appropriate method and returns to caller to repeat.
        """
        self.timer.snapshot()
        if not self.prioritize_writing:
            if self.timer.is_time_for_my_keepalive():
                if not self.writer.sending_message:
                    # We need to schedule a keepalive ASAP.
                    self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
                    logger.info("KEEP ALIVE is sent.")
                # We are sending a message now, so let's prioritize it.
                self.prioritize_writing = True
        # Now we know what our priorities are, we have to check
        # which actions are available.
        # socket.socket() returns three lists,
        # we store them to list of lists.
        list_list = select.select([self.socket], [self.socket], [self.socket],
                                  self.timer.report_timedelta)
        read_list, write_list, except_list = list_list
        # Lists are unpacked, each is either [] or [self.socket],
        # so we will test them as boolean.
        # NOTE(review): an "if except_list:" guard for the two lines below
        # appears to be missing -- confirm upstream.
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
                # NOTE(review): a "return" appears to be missing here.
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        # NOTE(review): an "if write_list:" guard for the branch below
        # appears to be missing -- confirm upstream.
            # Either we really want to reset peer's view of our hold
            # timer, or there was nothing to read.
            # Were we in the middle of sending a message?
            if self.writer.sending_message:
                # Was it the end of a message?
                whole = self.writer.send_message_chunk_is_whole()
                # We were pressed to send something and we did it.
                if self.prioritize_writing and whole:
                    # We prioritize reading again.
                    self.prioritize_writing = False
                # NOTE(review): a "return" appears to be missing here.
            # Finally to check if still update messages to be generated.
            if self.generator.remaining_prefixes:
                msg_out = self.generator.compose_update_message()
                if not self.generator.remaining_prefixes:
                    # We have just finished update generation,
                    # end-of-rib is due.
                    logger.info("All update messages generated.")
                    logger.info("Storing performance results.")
                    self.generator.store_results()
                    logger.info("Finally an END-OF-RIB is sent.")
                    # NOTE(review): the call below is truncated; its
                    # remaining arguments and closing parenthesis are
                    # missing -- confirm upstream.
                    msg_out += self.generator.update_message(wr_prefixes=[],
                self.writer.enqueue_message_for_sending(msg_out)
                # Attempt for real sending to be done in next iteration.
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
        # We can neither read nor write.
        logger.warning("Input and output both blocked for " +
                       str(self.timer.report_timedelta) + " seconds.")
        # FIXME: Are we sure select has been really waiting
if __name__ == "__main__":
    """ One time initialisation and iterations looping.

    Note:
        Establish BGP connection and run iterations.
    """
    arguments = parse_arguments()
    # Logging goes both to console and to the file given by --logfile.
    logger = logging.getLogger("logger")
    log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler(arguments.logfile, mode="w")
    console_handler.setFormatter(log_formatter)
    file_handler.setFormatter(log_formatter)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.setLevel(arguments.loglevel)
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
    # Send our open message to the peer.
    bgp_socket.send(msg_out)
    # Wait for confirming keepalive.
    # TODO: Surely in just one packet?
    # Using exact keepalive length to not to see possible updates.
    msg_in = bgp_socket.recv(19)
    if msg_in != generator.keepalive_message():
        logger.error("Open not confirmed by keepalive, instead got " +
                     binascii.hexlify(msg_in))
        # NOTE(review): this raise is truncated; the trailing argument
        # (likely msg_in) and closing parenthesis appear to be missing.
        raise MessageError("Open not confirmed by keepalive, instead got",
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
    bgp_socket.send(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()