1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 __author__ = "Vratko Polak"
15 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
16 __license__ = "Eclipse Public License v1.0"
17 __email__ = "vrpolak@cisco.com"
29 from copy import deepcopy
def parse_arguments():
    """Use argparse to get command-line arguments for the BGP speaker.

    :return: argparse Namespace holding all option values.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
    parser.add_argument("--amount", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default="0", type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default="0", type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # NOTE(review): the default is a one-element list, not the plain string
    # "separate"; downstream comparisons (args.updates == "single") are then
    # False for the default, which happens to select the "separate" path —
    # confirm this is intended.
    parser.add_argument("--updates", choices=["single", "separate"],
                        default=["separate"], help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection." +
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default="0", type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = ("Numeric IP Address to try to connect to." +
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default="179", type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default="180", type=int, help=str_help)
    str_help = "Log level (--error, --warning, --info, --debug)"
    # The four flags below all store into the same "loglevel" destination;
    # the last one given on the command line wins.
    parser.add_argument("--error", dest="loglevel", action="store_const",
                        const=logging.ERROR, default=logging.INFO,
    parser.add_argument("--warning", dest="loglevel", action="store_const",
                        const=logging.WARNING, default=logging.INFO,
    parser.add_argument("--info", dest="loglevel", action="store_const",
                        const=logging.INFO, default=logging.INFO,
    parser.add_argument("--debug", dest="loglevel", action="store_const",
                        const=logging.DEBUG, default=logging.INFO,
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default="1000", type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
    str_help = "How many play utilities are to be started."
    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        print "Multiplicity", arguments.multiplicity, "is not positive."
    # TODO: Are sanity checks (such as asnumber>=0) required?
def establish_connection(arguments):
    """Establish a TCP connection to the BGP peer.

    :arguments: the following command-line arguments are used
        - arguments.myip: local IP address
        - arguments.myport: local port
        - arguments.peerip: remote IP address
        - arguments.peerport: remote port
    """
        # Passive mode: wait for the controller to connect to us.
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # SO_REUSEADDR lets consecutive runs rebind without TIME_WAIT delays.
        listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind need single tuple as argument
        listening_socket.bind((str(arguments.myip), arguments.myport))
        listening_socket.listen(1)
        bgp_socket, _ = listening_socket.accept()
        # TODO: Verify client IP is controller IP.
        listening_socket.close()
        # Active mode: initiate the TCP session towards the peer ourselves.
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind to force specified address and port
        talking_socket.bind((str(arguments.myip), arguments.myport))
        # socket does not speak ipaddr, hence str()
        talking_socket.connect((str(arguments.peerip), arguments.peerport))
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
def get_short_int_from_message(message, offset=16):
    """Extract a two-byte big-endian unsigned number from the message.

    :message: given message (raw byte string)
    :offset: offset of the short_int inside the message

    :return: required short_int value.

    The default offset value is the BGP message size offset.
    """
    high_byte_int = ord(message[offset])
    low_byte_int = ord(message[offset + 1])
    short_int = high_byte_int * 256 + low_byte_int
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3).

    :prefixes_hex: list of prefixes to be decoded in hex

    :return: list of prefixes in the form of ip address (X.X.X.X/X)
    """
    while offset < len(prefixes_hex):
        # First octet on the wire is the prefix length in bits.
        prefix_bit_len_hex = prefixes_hex[offset]
        prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
        # Number of octets actually carried for this prefix (ceil(bits/8)).
        prefix_len = ((prefix_bit_len - 1) / 8) + 1
        prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
        # NOTE(review): unpack("BBBB") requires exactly 4 octets, but
        # prefix_hex holds only prefix_len octets — prefixes shorter than
        # /25 would make this raise struct.error; confirm expected inputs.
        prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialise the error.

        Store and call super init for textual comment,
        store raw message which caused it.
        """
        super(MessageError, self).__init__(text, message, *args)

        """Generate human readable error message.

        :return: human readable message as string

        Use a placeholder string if the message is to be empty.
        """
        # Hexlify the raw message so binary garbage logs cleanly.
        message = binascii.hexlify(self.msg)
        message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive the peer's OPEN message and do basic validation.

    :bgp_socket: the socket to be read

    :return: received OPEN message.

    Performs just basic incoming message checks.
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
        # 37 is minimal length of open message with 4-byte AS number.
        logger.error("Got something else than open with 4-byte AS number: " +
                     binascii.hexlify(msg_in))
        raise MessageError("Got something else than open with 4-byte AS number", msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        logger.error("Message length is not " + str(reported_length) +
                     " as stated in " + binascii.hexlify(msg_in))
        # NOTE(review): reported_length is an int here, so the concatenation
        # below raises TypeError instead of the intended MessageError;
        # it should be str(reported_length) as in the logger call above.
        raise MessageError("Message length is not " + reported_length +
                           " as stated in ", msg_in)
    logger.info("Open message received.")
class MessageGenerator(object):
    """Class which generates messages, holds states and configuration values."""

    # TODO: Define bgp marker as a class (constant) variable.
    def __init__(self, args):
        """Initialisation according to command-line args.

        :args: argsparser's Namespace object which contains command-line
            options for MessageGenerator initialisation

        Calculates and stores default values used later on for
        message generation.
        """
        self.total_prefix_amount = args.amount
        # Number of update messages left to be sent.
        self.remaining_prefixes = self.total_prefix_amount

        # New parameters initialisation
        self.prefix_base_default = args.firstprefix
        self.prefix_length_default = args.prefixlen
        self.wr_prefixes_default = []
        self.nlri_prefixes_default = []
        self.version_default = 4
        self.my_autonomous_system_default = args.asnumber
        self.hold_time_default = args.holdtime  # Local hold time.
        # BGP Identifier is derived from the local IP address.
        self.bgp_identifier_default = int(args.myip)
        self.next_hop_default = args.nexthop
        self.single_update_default = args.updates == "single"
        # NOTE(review): "random" is not among the argparse choices for
        # --updates, so this flag can never become True — confirm intended.
        self.randomize_updates_default = args.updates == "random"
        self.prefix_count_to_add_default = args.insert
        self.prefix_count_to_del_default = args.withdraw
        if self.prefix_count_to_del_default < 0:
            self.prefix_count_to_del_default = 0
        if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
            # total number of prefixes must grow to avoid infinite test loop
            self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
        self.slot_size_default = self.prefix_count_to_add_default
        self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
        self.results_file_name_default = args.results
        self.performance_threshold_default = args.threshold
        self.rfc4760 = args.rfc4760 == "yes"
        # Default values used for randomized part
        # s1 = pre-fill phase slots, s2 = combined (add+withdraw) phase slots.
        s1_slots = ((self.total_prefix_amount -
                     self.remaining_prefixes_threshold - 1) /
                    self.prefix_count_to_add_default + 1)
        s2_slots = ((self.remaining_prefixes_threshold - 1) /
                    (self.prefix_count_to_add_default -
                     self.prefix_count_to_del_default) + 1)
        # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
        s2_first_index = s1_slots * self.prefix_count_to_add_default
        s2_last_index = (s2_first_index +
                         s2_slots * (self.prefix_count_to_add_default -
                                     self.prefix_count_to_del_default) - 1)
        self.slot_gap_default = ((self.total_prefix_amount -
                                  self.remaining_prefixes_threshold - 1) /
                                 self.prefix_count_to_add_default + 1)
        self.randomize_lowest_default = s2_first_index
        self.randomize_highest_default = s2_last_index

        # Initialising counters
        self.phase1_start_time = 0
        self.phase1_stop_time = 0
        self.phase2_start_time = 0
        self.phase2_stop_time = 0
        self.phase1_updates_sent = 0
        self.phase2_updates_sent = 0
        self.updates_sent = 0

        self.log_info = args.loglevel <= logging.INFO
        self.log_debug = args.loglevel <= logging.DEBUG
        # Flags needed for the MessageGenerator performance optimization.
        # Calling logger methods each iteration even with proper log level set
        # slows down significantly the MessageGenerator performance.
        # Measured total generation time (1M updates, dry run, error log level):
        # - logging based on basic logger features: 36,2s
        # - logging based on advanced logger features (lazy logging): 21,2s
        # - conditional calling of logger methods enclosed inside condition: 8,6s

        logger.info("Generator initialisation")
        logger.info(" Target total number of prefixes to be introduced: " +
                    str(self.total_prefix_amount))
        logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
                    str(self.prefix_length_default))
        logger.info(" My Autonomous System number: " +
                    str(self.my_autonomous_system_default))
        logger.info(" My Hold Time: " + str(self.hold_time_default))
        logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
        logger.info(" Next Hop: " + str(self.next_hop_default))
        logger.info(" Prefix count to be inserted at once: " +
                    str(self.prefix_count_to_add_default))
        logger.info(" Prefix count to be withdrawn at once: " +
                    str(self.prefix_count_to_del_default))
        logger.info(" Fast pre-fill up to " +
                    str(self.total_prefix_amount -
                        self.remaining_prefixes_threshold) + " prefixes")
        logger.info(" Remaining number of prefixes to be processed " +
                    "in parallel with withdrawals: " +
                    str(self.remaining_prefixes_threshold))
        logger.debug(" Prefix index range used after pre-fill procedure [" +
                     str(self.randomize_lowest_default) + ", " +
                     str(self.randomize_highest_default) + "]")
        if self.single_update_default:
            logger.info(" Common single UPDATE will be generated " +
                        "for both NLRI & WITHDRAWN lists")
            logger.info(" Two separate UPDATEs will be generated " +
                        "for each NLRI & WITHDRAWN lists")
        if self.randomize_updates_default:
            logger.info(" Generation of UPDATE messages will be randomized")
        logger.info(" Let\'s go ...\n")

        # TODO: Notification for hold timer expiration can be handy.
    def store_results(self, file_name=None, threshold=None):
        """ Stores specified results into files based on file_name value.

        :param file_name: Trailing (common) part of result file names
        :param threshold: Minimum number of sent updates needed for each
            result to be included into result csv file
            (mainly needed because of the result accuracy)
        """
        # default values handling
        if file_name is None:
            file_name = self.results_file_name_default
        if threshold is None:
            threshold = self.performance_threshold_default
        # performance calculation
        # (updates per second, integer; phase skipped below threshold)
        if self.phase1_updates_sent >= threshold:
            totals1 = self.phase1_updates_sent
            performance1 = int(self.phase1_updates_sent /
                               (self.phase1_stop_time - self.phase1_start_time))
        if self.phase2_updates_sent >= threshold:
            totals2 = self.phase2_updates_sent
            performance2 = int(self.phase2_updates_sent /
                               (self.phase2_stop_time - self.phase2_start_time))

        logger.info("#" * 10 + " Final results " + "#" * 10)
        logger.info("Number of iterations: " + str(self.iteration))
        logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
                    str(self.phase1_updates_sent))
        logger.info("The pre-fill phase duration: " +
                    str(self.phase1_stop_time - self.phase1_start_time) + "s")
        logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
                    str(self.phase2_updates_sent))
        logger.info("The 2nd test phase duration: " +
                    str(self.phase2_stop_time - self.phase2_start_time) + "s")
        logger.info("Threshold for performance reporting: " + str(threshold))

        # labels for the csv columns
        phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
                        " route(s) per UPDATE")
        if self.single_update_default:
            phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                                  "/-" + str(self.prefix_count_to_del_default) +
                                  " routes per UPDATE")
            phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                                  "/-" + str(self.prefix_count_to_del_default) +
                                  " routes in two UPDATEs")
        # collecting capacity and performance results
        if totals1 is not None:
            totals[phase1_label] = totals1
            performance[phase1_label] = performance1
        if totals2 is not None:
            totals[phase2_label] = totals2
            performance[phase2_label] = performance2
        self.write_results_to_file(totals, "totals-" + file_name)
        self.write_results_to_file(performance, "performance-" + file_name)
    def write_results_to_file(self, results, file_name):
        """Writes results to the csv plot file consumable by Jenkins.

        :param results: dictionary of result label -> value pairs
        :param file_name: Name of the (csv) file to be created
        """
        f = open(file_name, "wt")
        # First csv line holds labels, second holds the matching values.
        for key in sorted(results):
            first_line += key + ", "
            second_line += str(results[key]) + ", "
        # strip the trailing ", " separator from both lines
        first_line = first_line[:-2]
        second_line = second_line[:-2]
        f.write(first_line + "\n")
        f.write(second_line + "\n")
        logger.info("Message generator performance results stored in " +
        logger.info(" " + first_line)
        logger.info(" " + second_line)
    # Return pseudo-randomized (reproducible) index for selected range
    def randomize_index(self, index, lowest=None, highest=None):
        """Calculates pseudo-randomized index from selected range.

        :param index: input index
        :param lowest: the lowest index from the randomized area
        :param highest: the highest index from the randomized area

        :return: the (pseudo)randomized index

        Created just as a frame for future generator enhancement.
        """
        # default values handling
            lowest = self.randomize_lowest_default
            highest = self.randomize_highest_default
        if (index >= lowest) and (index <= highest):
            # we are in the randomized range -> shuffle it inside
            # the range (now just reverse the order)
            new_index = highest - (index - lowest)
            # we are out of the randomized range -> nothing to do
    # Get list of prefixes
    def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
                        prefix_len=None, prefix_count=None, randomize=None):
        """Generates list of IP address prefixes.

        :param slot_index: index of group of prefix addresses
        :param slot_size: size of group of prefix addresses
            in [number of included prefixes]
        :param prefix_base: IP address of the first prefix
            (slot_index = 0, prefix_index = 0)
        :param prefix_len: length of the prefix in bits
            (the same as size of netmask)
        :param prefix_count: number of prefixes to be returned
            from the specified slot
        :param randomize: whether the prefix index should be pseudo-randomized

        :return: list of generated IP address prefixes
        """
        # default values handling
        if slot_size is None:
            slot_size = self.slot_size_default
        if prefix_base is None:
            prefix_base = self.prefix_base_default
        if prefix_len is None:
            prefix_len = self.prefix_length_default
        if prefix_count is None:
            prefix_count = slot_size
        if randomize is None:
            randomize = self.randomize_updates_default
        # generating list of prefixes
        # address distance between two consecutive prefixes of this length
        prefix_gap = 2 ** (32 - prefix_len)
        for i in range(prefix_count):
            prefix_index = slot_index * slot_size + i
                prefix_index = self.randomize_index(prefix_index)
            indexes.append(prefix_index)
            prefixes.append(prefix_base + prefix_index * prefix_gap)
            logger.debug(" Prefix slot index: " + str(slot_index))
            logger.debug(" Prefix slot size: " + str(slot_size))
            logger.debug(" Prefix count: " + str(prefix_count))
            logger.debug(" Prefix indexes: " + str(indexes))
            logger.debug(" Prefix list: " + str(prefixes))
    def compose_update_message(self, prefix_count_to_add=None,
                               prefix_count_to_del=None):
        """Composes an UPDATE message

        :param prefix_count_to_add: # of prefixes to put into NLRI list
        :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list

        :return: encoded UPDATE message in HEX

        Optionally generates separate UPDATEs for NLRI and WITHDRAWN
        lists or common message which includes both prefix lists.
        Updates global counters.
        """
        # default values handling
        if prefix_count_to_add is None:
            prefix_count_to_add = self.prefix_count_to_add_default
        if prefix_count_to_del is None:
            prefix_count_to_del = self.prefix_count_to_del_default
        # progress report once per 1000 iterations only (performance)
        if self.log_info and not (self.iteration % 1000):
            logger.info("Iteration: " + str(self.iteration) +
                        " - total remaining prefixes: " +
                        str(self.remaining_prefixes))
            logger.debug("#" * 10 + " Iteration: " +
                         str(self.iteration) + " " + "#" * 10)
            logger.debug("Remaining prefixes: " +
                         str(self.remaining_prefixes))
        # scenario type & one-shot counter
        straightforward_scenario = (self.remaining_prefixes >
                                    self.remaining_prefixes_threshold)
        if straightforward_scenario:
            # pre-fill phase: announcements only, no withdrawals
            prefix_count_to_del = 0
                logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
            if not self.phase1_start_time:
                self.phase1_start_time = time.time()
                logger.debug("--- COMBINED SCENARIO ---")
            if not self.phase2_start_time:
                self.phase2_start_time = time.time()
            # tailor the number of prefixes if needed
            prefix_count_to_add = (prefix_count_to_del +
                                   min(prefix_count_to_add - prefix_count_to_del,
                                       self.remaining_prefixes))
        # prefix slots selection for insertion and withdrawal
        slot_index_to_add = self.iteration
        slot_index_to_del = slot_index_to_add - self.slot_gap_default
        # getting lists of prefixes for insertion in this iteration
            logger.debug("Prefixes to be inserted in this iteration:")
        prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
                                                  prefix_count=prefix_count_to_add)
        # getting lists of prefixes for withdrawal in this iteration
            logger.debug("Prefixes to be withdrawn in this iteration:")
        prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
                                                  prefix_count=prefix_count_to_del)
        # generating the message
        if self.single_update_default:
            # Send prefixes to be introduced and withdrawn
            # in one UPDATE message
            msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
                                          nlri_prefixes=prefix_list_to_add)
            # Send prefixes to be introduced and withdrawn
            # in separate UPDATE messages (if needed)
            msg_out = self.update_message(wr_prefixes=[],
                                          nlri_prefixes=prefix_list_to_add)
            if prefix_count_to_del:
                msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
        # updating counters - who knows ... maybe I am last time here ;)
        if straightforward_scenario:
            self.phase1_stop_time = time.time()
            self.phase1_updates_sent = self.updates_sent
            self.phase2_stop_time = time.time()
            self.phase2_updates_sent = (self.updates_sent -
                                        self.phase1_updates_sent)
        # updating totals for the next iteration
        self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
        # returning the encoded message
    # Section of message encoders

    def open_message(self, version=None, my_autonomous_system=None,
                     hold_time=None, bgp_identifier=None):
        """Generates an OPEN Message (rfc4271#section-4.2)

        :param version: see the rfc4271#section-4.2
        :param my_autonomous_system: see the rfc4271#section-4.2
        :param hold_time: see the rfc4271#section-4.2
        :param bgp_identifier: see the rfc4271#section-4.2

        :return: encoded OPEN message in HEX
        """

        # Default values handling
            version = self.version_default
        if my_autonomous_system is None:
            my_autonomous_system = self.my_autonomous_system_default
        if hold_time is None:
            hold_time = self.hold_time_default
        if bgp_identifier is None:
            bgp_identifier = self.bgp_identifier_default

        # Marker: 16 octets of all ones (rfc4271 message header)
        marker_hex = "\xFF" * 16

        type_hex = struct.pack("B", type)

        version_hex = struct.pack("B", version)

        # my_autonomous_system
        # AS_TRANS value, 23456 decadic.
        my_autonomous_system_2_bytes = 23456
        # AS number is mappable to 2 bytes
        if my_autonomous_system < 65536:
            my_autonomous_system_2_bytes = my_autonomous_system
        # NOTE(review): the 2-byte field is packed from my_autonomous_system,
        # not from my_autonomous_system_2_bytes — for AS numbers >= 65536
        # this does not fall back to AS_TRANS as the logic above prepares
        # (and ">H" would raise struct.error); confirm intended source value.
        my_autonomous_system_hex_2_bytes = struct.pack(">H",
                                                       my_autonomous_system)

        hold_time_hex = struct.pack(">H", hold_time)

        bgp_identifier_hex = struct.pack(">I", bgp_identifier)

        # Optional Parameters
        optional_parameters_hex = ""
            optional_parameter_hex = (
                "\x02"  # Param type ("Capability Ad")
                "\x06"  # Length (6 bytes)
                "\x01"  # Capability type (NLRI Unicast),
                        # see RFC 4760, section 8
                "\x04"  # Capability value length
                "\x00\x01"  # AFI (Ipv4)
                "\x01"  # SAFI (Unicast)
            optional_parameters_hex += optional_parameter_hex

            optional_parameter_hex = (
                "\x02"  # Param type ("Capability Ad")
                "\x06"  # Length (6 bytes)
                "\x41"  # "32 bit AS Numbers Support"
                        # (see RFC 6793, section 3)
                "\x04"  # Capability value length
                # My AS in 32 bit format
                + struct.pack(">I", my_autonomous_system)
            optional_parameters_hex += optional_parameter_hex

        # Optional Parameters Length
        optional_parameters_length = len(optional_parameters_hex)
        optional_parameters_length_hex = struct.pack("B",
                                                     optional_parameters_length)

        # Length (big-endian)
            len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
            len(my_autonomous_system_hex_2_bytes) +
            len(hold_time_hex) + len(bgp_identifier_hex) +
            len(optional_parameters_length_hex) +
            len(optional_parameters_hex)
        length_hex = struct.pack(">H", length)

            my_autonomous_system_hex_2_bytes +
            optional_parameters_length_hex +
            optional_parameters_hex

            logger.debug("OPEN message encoding")
            logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
            logger.debug(" Length=" + str(length) + " (0x" +
                         binascii.hexlify(length_hex) + ")")
            logger.debug(" Type=" + str(type) + " (0x" +
                         binascii.hexlify(type_hex) + ")")
            logger.debug(" Version=" + str(version) + " (0x" +
                         binascii.hexlify(version_hex) + ")")
            logger.debug(" My Autonomous System=" +
                         str(my_autonomous_system_2_bytes) + " (0x" +
                         binascii.hexlify(my_autonomous_system_hex_2_bytes) +
            logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
                         binascii.hexlify(hold_time_hex) + ")")
            logger.debug(" BGP Identifier=" + str(bgp_identifier) +
                         " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
            logger.debug(" Optional Parameters Length=" +
                         str(optional_parameters_length) + " (0x" +
                         binascii.hexlify(optional_parameters_length_hex) +
            logger.debug(" Optional Parameters=0x" +
                         binascii.hexlify(optional_parameters_hex))
            logger.debug("OPEN message encoded: 0x%s",
                         binascii.b2a_hex(message_hex))
    def update_message(self, wr_prefixes=None, nlri_prefixes=None,
                       wr_prefix_length=None, nlri_prefix_length=None,
                       my_autonomous_system=None, next_hop=None):
        """Generates an UPDATE Message (rfc4271#section-4.3)

        :param wr_prefixes: see the rfc4271#section-4.3
        :param nlri_prefixes: see the rfc4271#section-4.3
        :param wr_prefix_length: see the rfc4271#section-4.3
        :param nlri_prefix_length: see the rfc4271#section-4.3
        :param my_autonomous_system: see the rfc4271#section-4.3
        :param next_hop: see the rfc4271#section-4.3

        :return: encoded UPDATE message in HEX
        """
        # Default values handling
        if wr_prefixes is None:
            wr_prefixes = self.wr_prefixes_default
        if nlri_prefixes is None:
            nlri_prefixes = self.nlri_prefixes_default
        if wr_prefix_length is None:
            wr_prefix_length = self.prefix_length_default
        if nlri_prefix_length is None:
            nlri_prefix_length = self.prefix_length_default
        if my_autonomous_system is None:
            my_autonomous_system = self.my_autonomous_system_default
            next_hop = self.next_hop_default

        marker_hex = "\xFF" * 16

        type_hex = struct.pack("B", type)

        # Withdrawn Routes
        # NOTE: "bytes" shadows the builtin; octets needed per prefix.
        bytes = ((wr_prefix_length - 1) / 8) + 1
        withdrawn_routes_hex = ""
        for prefix in wr_prefixes:
            withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
                                   struct.pack(">I", int(prefix))[:bytes])
            withdrawn_routes_hex += withdrawn_route_hex

        # Withdrawn Routes Length
        withdrawn_routes_length = len(withdrawn_routes_hex)
        withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)

        # TODO: to replace hardcoded string by encoding?
        # Path attributes are mandatory only when NLRI is present.
        if nlri_prefixes != []:
            path_attributes_hex = (
                "\x40"  # Flags ("Well-Known")
                "\x01"  # Type (ORIGIN)
                "\x40"  # Flags ("Well-Known")
                "\x02"  # Type (AS_PATH)
                "\x02"  # AS segment type (AS_SEQUENCE)
                "\x01"  # AS segment length (1)
                # AS segment (4 bytes)
                + struct.pack(">I", my_autonomous_system) +
                "\x40"  # Flags ("Well-Known")
                "\x03"  # Type (NEXT_HOP)
                # IP address of the next hop (4 bytes)
                + struct.pack(">I", int(next_hop))
            path_attributes_hex = ""

        # Total Path Attributes Length
        total_path_attributes_length = len(path_attributes_hex)
        total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)

        # Network Layer Reachability Information
        bytes = ((nlri_prefix_length - 1) / 8) + 1
        for prefix in nlri_prefixes:
            nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
                               struct.pack(">I", int(prefix))[:bytes])
            nlri_hex += nlri_prefix_hex

        # Length (big-endian)
            len(marker_hex) + 2 + len(type_hex) +
            len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
            len(total_path_attributes_length_hex) + len(path_attributes_hex) +
        length_hex = struct.pack(">H", length)

            withdrawn_routes_length_hex +
            withdrawn_routes_hex +
            total_path_attributes_length_hex +
            path_attributes_hex +

            logger.debug("UPDATE message encoding")
            logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
            logger.debug(" Length=" + str(length) + " (0x" +
                         binascii.hexlify(length_hex) + ")")
            logger.debug(" Type=" + str(type) + " (0x" +
                         binascii.hexlify(type_hex) + ")")
            logger.debug(" withdrawn_routes_length=" +
                         str(withdrawn_routes_length) + " (0x" +
                         binascii.hexlify(withdrawn_routes_length_hex) + ")")
            logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
                         str(wr_prefix_length) + " (0x" +
                         binascii.hexlify(withdrawn_routes_hex) + ")")
            logger.debug(" Total Path Attributes Length=" +
                         str(total_path_attributes_length) + " (0x" +
                         binascii.hexlify(total_path_attributes_length_hex) +
            logger.debug(" Path Attributes=" + "(0x" +
                         binascii.hexlify(path_attributes_hex) + ")")
            logger.debug(" Network Layer Reachability Information=" +
                         str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
                         " (0x" + binascii.hexlify(nlri_hex) + ")")
            logger.debug("UPDATE message encoded: 0x" +
                         binascii.b2a_hex(message_hex))

        # updating the global counter
        self.updates_sent += 1
        # returning encoded message
    def notification_message(self, error_code, error_subcode, data_hex=""):
        """Generates a NOTIFICATION Message (rfc4271#section-4.5)

        :param error_code: see the rfc4271#section-4.5
        :param error_subcode: see the rfc4271#section-4.5
        :param data_hex: see the rfc4271#section-4.5

        :return: encoded NOTIFICATION message in HEX
        """
        marker_hex = "\xFF" * 16

        type_hex = struct.pack("B", type)

        error_code_hex = struct.pack("B", error_code)

        error_subcode_hex = struct.pack("B", error_subcode)

        # Length (big-endian)
        length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
                  len(error_subcode_hex) + len(data_hex))
        length_hex = struct.pack(">H", length)

        # NOTIFICATION Message
            logger.debug("NOTIFICATION message encoding")
            logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
            logger.debug(" Length=" + str(length) + " (0x" +
                         binascii.hexlify(length_hex) + ")")
            logger.debug(" Type=" + str(type) + " (0x" +
                         binascii.hexlify(type_hex) + ")")
            logger.debug(" Error Code=" + str(error_code) + " (0x" +
                         binascii.hexlify(error_code_hex) + ")")
            # NOTE(review): "Subode" in the log text is a typo for "Subcode"
            # (runtime string, left unchanged here).
            logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
                         binascii.hexlify(error_subcode_hex) + ")")
            logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
            logger.debug("NOTIFICATION message encoded: 0x%s",
                         binascii.b2a_hex(message_hex))
944 def keepalive_message(self):
945 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
948 :return: encoded KEEP ALIVE message in HEX
952 marker_hex = "\xFF" * 16
956 type_hex = struct.pack("B", type)
958 # Length (big-endian)
959 length = len(marker_hex) + 2 + len(type_hex)
960 length_hex = struct.pack(">H", length)
970 logger.debug("KEEP ALIVE message encoding")
971 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
972 logger.debug(" Length=" + str(length) + " (0x" +
973 binascii.hexlify(length_hex) + ")")
974 logger.debug(" Type=" + str(type) + " (0x" +
975 binascii.hexlify(type_hex) + ")")
976 logger.debug("KEEP ALIVE message encoded: 0x%s",
977 binascii.b2a_hex(message_hex))
982 class TimeTracker(object):
983 """Class for tracking timers, both for my keepalives and
987 def __init__(self, msg_in):
988 """Initialisation. based on defaults and OPEN message from peer.
991 msg_in: the OPEN message received from peer.
993 # Note: Relative time is always named timedelta, to stress that
994 # the (non-delta) time is absolute.
995 self.report_timedelta = 1.0 # In seconds. TODO: Configurable?
996 # Upper bound for being stuck in the same state, we should
997 # at least report something before continuing.
998 # Negotiate the hold timer by taking the smaller
999 # of the 2 values (mine and the peer's).
1000 hold_timedelta = 180 # Not an attribute of self yet.
1001 # TODO: Make the default value configurable,
1002 # default value could mirror what peer said.
1003 peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1004 if hold_timedelta > peer_hold_timedelta:
1005 hold_timedelta = peer_hold_timedelta
1006 if hold_timedelta != 0 and hold_timedelta < 3:
1007 logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1008 raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1009 self.hold_timedelta = hold_timedelta
1010 # If we do not hear from peer this long, we assume it has died.
1011 self.keepalive_timedelta = int(hold_timedelta / 3.0)
1012 # Upper limit for duration between messages, to avoid being
1013 # declared to be dead.
1014 # The same as calling snapshot(), but also declares a field.
1015 self.snapshot_time = time.time()
1016 # Sometimes we need to store time. This is where to get
1017 # the value from afterwards. Time_keepalive may be too strict.
1018 self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1019 # At this time point, peer will be declared dead.
1020 self.my_keepalive_time = None # to be set later
1021 # At this point, we should be sending keepalive message.
1024 """Store current time in instance data to use later."""
1025 # Read as time before something interesting was called.
1026 self.snapshot_time = time.time()
1028 def reset_peer_hold_time(self):
1029 """Move hold time to future as peer has just proven it still lives."""
1030 self.peer_hold_time = time.time() + self.hold_timedelta
1032 # Some methods could rely on self.snapshot_time, but it is better
1033 # to require user to provide it explicitly.
1034 def reset_my_keepalive_time(self, keepalive_time):
1035 """Calculate and set the next my KEEP ALIVE timeout time
1038 :keepalive_time: the initial value of the KEEP ALIVE timer
1040 self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1042 def is_time_for_my_keepalive(self):
1043 """Check for my KEEP ALIVE timeout occurence"""
1044 if self.hold_timedelta == 0:
1046 return self.snapshot_time >= self.my_keepalive_time
1048 def get_next_event_time(self):
1049 """Set the time of the next expected or to be sent KEEP ALIVE"""
1050 if self.hold_timedelta == 0:
1051 return self.snapshot_time + 86400
1052 return min(self.my_keepalive_time, self.peer_hold_time)
1054 def check_peer_hold_time(self, snapshot_time):
1055 """Raise error if nothing was read from peer until specified time."""
1056 # Hold time = 0 means keepalive checking off.
1057 if self.hold_timedelta != 0:
1058 # time.time() may be too strict
1059 if snapshot_time > self.peer_hold_time:
1060 logger.error("Peer has overstepped the hold timer.")
1061 raise RuntimeError("Peer has overstepped the hold timer.")
1062 # TODO: Include hold_timedelta?
1063 # TODO: Add notification sending (attempt). That means
1064 # move to write tracker.
1067 class ReadTracker(object):
1068 """Class for tracking read of mesages chunk by chunk and
1072 def __init__(self, bgp_socket, timer):
1073 """The reader initialisation.
1076 bgp_socket: socket to be used for sending
1077 timer: timer to be used for scheduling
1079 # References to outside objects.
1080 self.socket = bgp_socket
1082 # BGP marker length plus length field length.
1083 self.header_length = 18
1084 # TODO: make it class (constant) attribute
1085 # Computation of where next chunk ends depends on whether
1086 # we are beyond length field.
1087 self.reading_header = True
1088 # Countdown towards next size computation.
1089 self.bytes_to_read = self.header_length
1090 # Incremental buffer for message under read.
1092 # Initialising counters
1093 self.updates_received = 0
1094 self.prefixes_introduced = 0
1095 self.prefixes_withdrawn = 0
1096 self.rx_idle_time = 0
1097 self.rx_activity_detected = True
1099 def read_message_chunk(self):
1100 """Read up to one message
1103 Currently it does not return anything.
1105 # TODO: We could return the whole message, currently not needed.
1106 # We assume the socket is readable.
1107 chunk_message = self.socket.recv(self.bytes_to_read)
1108 self.msg_in += chunk_message
1109 self.bytes_to_read -= len(chunk_message)
1110 # TODO: bytes_to_read < 0 is not possible, right?
1111 if not self.bytes_to_read:
1112 # Finished reading a logical block.
1113 if self.reading_header:
1114 # The logical block was a BGP header.
1115 # Now we know the size of the message.
1116 self.reading_header = False
1117 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1119 else: # We have finished reading the body of the message.
1120 # Peer has just proven it is still alive.
1121 self.timer.reset_peer_hold_time()
1122 # TODO: Do we want to count received messages?
1123 # This version ignores the received message.
1124 # TODO: Should we do validation and exit on anything
1125 # besides update or keepalive?
1126 # Prepare state for reading another message.
1127 message_type_hex = self.msg_in[self.header_length]
1128 if message_type_hex == "\x01":
1129 logger.info("OPEN message received: 0x%s",
1130 binascii.b2a_hex(self.msg_in))
1131 elif message_type_hex == "\x02":
1132 logger.debug("UPDATE message received: 0x%s",
1133 binascii.b2a_hex(self.msg_in))
1134 self.decode_update_message(self.msg_in)
1135 elif message_type_hex == "\x03":
1136 logger.info("NOTIFICATION message received: 0x%s",
1137 binascii.b2a_hex(self.msg_in))
1138 elif message_type_hex == "\x04":
1139 logger.info("KEEP ALIVE message received: 0x%s",
1140 binascii.b2a_hex(self.msg_in))
1142 logger.warning("Unexpected message received: 0x%s",
1143 binascii.b2a_hex(self.msg_in))
1145 self.reading_header = True
1146 self.bytes_to_read = self.header_length
1147 # We should not act upon peer_hold_time if we are reading
1148 # something right now.
1151 def decode_path_attributes(self, path_attributes_hex):
1152 """Decode the Path Attributes field (rfc4271#section-4.3)
1155 :path_attributes: path_attributes field to be decoded in hex
1159 hex_to_decode = path_attributes_hex
1161 while len(hex_to_decode):
1162 attr_flags_hex = hex_to_decode[0]
1163 attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1164 # attr_optional_bit = attr_flags & 128
1165 # attr_transitive_bit = attr_flags & 64
1166 # attr_partial_bit = attr_flags & 32
1167 attr_extended_length_bit = attr_flags & 16
1169 attr_type_code_hex = hex_to_decode[1]
1170 attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1172 if attr_extended_length_bit:
1173 attr_length_hex = hex_to_decode[2:4]
1174 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1175 attr_value_hex = hex_to_decode[4:4 + attr_length]
1176 hex_to_decode = hex_to_decode[4 + attr_length:]
1178 attr_length_hex = hex_to_decode[2]
1179 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1180 attr_value_hex = hex_to_decode[3:3 + attr_length]
1181 hex_to_decode = hex_to_decode[3 + attr_length:]
1183 if attr_type_code == 1:
1184 logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)",
1185 binascii.b2a_hex(attr_flags_hex))
1186 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1187 elif attr_type_code == 2:
1188 logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)",
1189 binascii.b2a_hex(attr_flags_hex))
1190 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1191 elif attr_type_code == 3:
1192 logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)",
1193 binascii.b2a_hex(attr_flags_hex))
1194 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1195 elif attr_type_code == 4:
1196 logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)",
1197 binascii.b2a_hex(attr_flags_hex))
1198 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1199 elif attr_type_code == 5:
1200 logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)",
1201 binascii.b2a_hex(attr_flags_hex))
1202 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1203 elif attr_type_code == 6:
1204 logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)",
1205 binascii.b2a_hex(attr_flags_hex))
1206 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1207 elif attr_type_code == 7:
1208 logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)",
1209 binascii.b2a_hex(attr_flags_hex))
1210 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1211 elif attr_type_code == 14: # rfc4760#section-3
1212 logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)",
1213 binascii.b2a_hex(attr_flags_hex))
1214 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1215 address_family_identifier_hex = attr_value_hex[0:2]
1216 logger.debug(" Address Family Identifier = 0x%s",
1217 binascii.b2a_hex(address_family_identifier_hex))
1218 subsequent_address_family_identifier_hex = attr_value_hex[2]
1219 logger.debug(" Subsequent Address Family Identifier = 0x%s",
1220 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1221 next_hop_netaddr_len_hex = attr_value_hex[3]
1222 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1223 logger.debug(" Length of Next Hop Network Address = 0x%s (%s)",
1224 binascii.b2a_hex(next_hop_netaddr_len_hex),
1225 next_hop_netaddr_len)
1226 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1227 logger.debug(" Network Address of Next Hop = 0x%s",
1228 binascii.b2a_hex(next_hop_netaddr_hex))
1229 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1230 logger.debug(" Reserved = 0x%s",
1231 binascii.b2a_hex(reserved_hex))
1232 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1233 logger.debug(" Network Layer Reachability Information = 0x%s",
1234 binascii.b2a_hex(nlri_hex))
1235 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1236 logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
1237 for prefix in nlri_prefix_list:
1238 logger.debug(" nlri_prefix_received: %s", prefix)
1239 self.prefixes_introduced += len(nlri_prefix_list) # update counter
1240 elif attr_type_code == 15: # rfc4760#section-4
1241 logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)",
1242 binascii.b2a_hex(attr_flags_hex))
1243 logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1244 address_family_identifier_hex = attr_value_hex[0:2]
1245 logger.debug(" Address Family Identifier = 0x%s",
1246 binascii.b2a_hex(address_family_identifier_hex))
1247 subsequent_address_family_identifier_hex = attr_value_hex[2]
1248 logger.debug(" Subsequent Address Family Identifier = 0x%s",
1249 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1250 wd_hex = attr_value_hex[3:]
1251 logger.debug(" Withdrawn Routes = 0x%s",
1252 binascii.b2a_hex(wd_hex))
1253 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1254 logger.debug(" Withdrawn routes prefix list: %s",
1256 for prefix in wdr_prefix_list:
1257 logger.debug(" withdrawn_prefix_received: %s", prefix)
1258 self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
1260 logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code,
1261 binascii.b2a_hex(attr_flags_hex))
1262 logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
1265 def decode_update_message(self, msg):
1266 """Decode an UPDATE message (rfc4271#section-4.3)
1269 :msg: message to be decoded in hex
1273 logger.debug("Decoding update message:")
1274 # message header - marker
1275 marker_hex = msg[:16]
1276 logger.debug("Message header marker: 0x%s",
1277 binascii.b2a_hex(marker_hex))
1278 # message header - message length
1279 msg_length_hex = msg[16:18]
1280 msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1281 logger.debug("Message lenght: 0x%s (%s)",
1282 binascii.b2a_hex(msg_length_hex), msg_length)
1283 # message header - message type
1284 msg_type_hex = msg[18:19]
1285 msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1287 logger.debug("Message type: 0x%s (update)",
1288 binascii.b2a_hex(msg_type_hex))
1289 # withdrawn routes length
1290 wdr_length_hex = msg[19:21]
1291 wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1292 logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1293 binascii.b2a_hex(wdr_length_hex), wdr_length)
1295 wdr_hex = msg[21:21 + wdr_length]
1296 logger.debug("Withdrawn routes: 0x%s",
1297 binascii.b2a_hex(wdr_hex))
1298 wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1299 logger.debug("Withdrawn routes prefix list: %s",
1301 for prefix in wdr_prefix_list:
1302 logger.debug("withdrawn_prefix_received: %s", prefix)
1303 # total path attribute length
1304 total_pa_length_offset = 21 + wdr_length
1305 total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
1306 total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1307 logger.debug("Total path attribute lenght: 0x%s (%s)",
1308 binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1310 pa_offset = total_pa_length_offset + 2
1311 pa_hex = msg[pa_offset:pa_offset+total_pa_length]
1312 logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1313 self.decode_path_attributes(pa_hex)
1314 # network layer reachability information length
1315 nlri_length = msg_length - 23 - total_pa_length - wdr_length
1316 logger.debug("Calculated NLRI length: %s", nlri_length)
1317 # network layer reachability information
1318 nlri_offset = pa_offset + total_pa_length
1319 nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
1320 logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1321 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1322 logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1323 for prefix in nlri_prefix_list:
1324 logger.debug("nlri_prefix_received: %s", prefix)
1326 self.updates_received += 1
1327 self.prefixes_introduced += len(nlri_prefix_list)
1328 self.prefixes_withdrawn += len(wdr_prefix_list)
1330 logger.error("Unexpeced message type 0x%s in 0x%s",
1331 binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1333 def wait_for_read(self):
1334 """Read message until timeout (next expected event).
1337 Used when no more updates has to be sent to avoid busy-wait.
1338 Currently it does not return anything.
1340 # Compute time to the first predictable state change
1341 event_time = self.timer.get_next_event_time()
1342 # snapshot_time would be imprecise
1343 wait_timedelta = min(event_time - time.time(), 10)
1344 if wait_timedelta < 0:
1345 # The program got around to waiting to an event in "very near
1346 # future" so late that it became a "past" event, thus tell
1347 # "select" to not wait at all. Passing negative timedelta to
1348 # select() would lead to either waiting forever (for -1) or
1349 # select.error("Invalid parameter") (for everything else).
1351 # And wait for event or something to read.
1353 if not self.rx_activity_detected or not (self.updates_received % 100):
1354 # right time to write statistics to the log (not for every update and
1355 # not too frequently to avoid having large log files)
1356 logger.info("total_received_update_message_counter: %s",
1357 self.updates_received)
1358 logger.info("total_received_nlri_prefix_counter: %s",
1359 self.prefixes_introduced)
1360 logger.info("total_received_withdrawn_prefix_counter: %s",
1361 self.prefixes_withdrawn)
1363 start_time = time.time()
1364 select.select([self.socket], [], [self.socket], wait_timedelta)
1365 timedelta = time.time() - start_time
1366 self.rx_idle_time += timedelta
1367 self.rx_activity_detected = timedelta < 1
1369 if not self.rx_activity_detected or not (self.updates_received % 100):
1370 # right time to write statistics to the log (not for every update and
1371 # not too frequently to avoid having large log files)
1372 logger.info("... idle for %.3fs", timedelta)
1373 logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1377 class WriteTracker(object):
1378 """Class tracking enqueueing messages and sending chunks of them."""
1380 def __init__(self, bgp_socket, generator, timer):
1381 """The writter initialisation.
1384 bgp_socket: socket to be used for sending
1385 generator: generator to be used for message generation
1386 timer: timer to be used for scheduling
1388 # References to outside objects,
1389 self.socket = bgp_socket
1390 self.generator = generator
1392 # Really new fields.
1393 # TODO: Would attribute docstrings add anything substantial?
1394 self.sending_message = False
1395 self.bytes_to_send = 0
1398 def enqueue_message_for_sending(self, message):
1399 """Enqueue message and change state.
1402 message: message to be enqueued into the msg_out buffer
1404 self.msg_out += message
1405 self.bytes_to_send += len(message)
1406 self.sending_message = True
1408 def send_message_chunk_is_whole(self):
1409 """Send enqueued data from msg_out buffer
1412 :return: true if no remaining data to send
1414 # We assume there is a msg_out to send and socket is writable.
1415 # print "going to send", repr(self.msg_out)
1416 self.timer.snapshot()
1417 bytes_sent = self.socket.send(self.msg_out)
1418 # Forget the part of message that was sent.
1419 self.msg_out = self.msg_out[bytes_sent:]
1420 self.bytes_to_send -= bytes_sent
1421 if not self.bytes_to_send:
1422 # TODO: Is it possible to hit negative bytes_to_send?
1423 self.sending_message = False
1424 # We should have reset hold timer on peer side.
1425 self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1426 # The possible reason for not prioritizing reads is gone.
1431 class StateTracker(object):
1432 """Main loop has state so complex it warrants this separate class."""
1434 def __init__(self, bgp_socket, generator, timer):
1435 """The state tracker initialisation.
1438 bgp_socket: socket to be used for sending / receiving
1439 generator: generator to be used for message generation
1440 timer: timer to be used for scheduling
1442 # References to outside objects.
1443 self.socket = bgp_socket
1444 self.generator = generator
1447 self.reader = ReadTracker(bgp_socket, timer)
1448 self.writer = WriteTracker(bgp_socket, generator, timer)
1449 # Prioritization state.
1450 self.prioritize_writing = False
1451 # In general, we prioritize reading over writing. But in order
1452 # not to get blocked by neverending reads, we should
1453 # check whether we are not risking running out of holdtime.
1454 # So in some situations, this field is set to True to attempt
1455 # finishing sending a message, after which this field resets
1457 # TODO: Alternative is to switch fairly between reading and
1458 # writing (called round robin from now on).
1459 # Message counting is done in generator.
1461 def perform_one_loop_iteration(self):
1462 """ The main loop iteration
1465 Calculates priority, resolves all conditions, calls
1466 appropriate method and returns to caller to repeat.
1468 self.timer.snapshot()
1469 if not self.prioritize_writing:
1470 if self.timer.is_time_for_my_keepalive():
1471 if not self.writer.sending_message:
1472 # We need to schedule a keepalive ASAP.
1473 self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1474 logger.info("KEEP ALIVE is sent.")
1475 # We are sending a message now, so let's prioritize it.
1476 self.prioritize_writing = True
1477 # Now we know what our priorities are, we have to check
1478 # which actions are available.
1479 # socket.socket() returns three lists,
1480 # we store them to list of lists.
1481 list_list = select.select([self.socket], [self.socket], [self.socket],
1482 self.timer.report_timedelta)
1483 read_list, write_list, except_list = list_list
1484 # Lists are unpacked, each is either [] or [self.socket],
1485 # so we will test them as boolean.
1487 logger.error("Exceptional state on the socket.")
1488 raise RuntimeError("Exceptional state on socket", self.socket)
1489 # We will do either read or write.
1490 if not (self.prioritize_writing and write_list):
1491 # Either we have no reason to rush writes,
1492 # or the socket is not writable.
1493 # We are focusing on reading here.
1494 if read_list: # there is something to read indeed
1495 # In this case we want to read chunk of message
1496 # and repeat the select,
1497 self.reader.read_message_chunk()
1499 # We were focusing on reading, but nothing to read was there.
1500 # Good time to check peer for hold timer.
1501 self.timer.check_peer_hold_time(self.timer.snapshot_time)
1502 # Quiet on the read front, we can have attempt to write.
1504 # Either we really want to reset peer's view of our hold
1505 # timer, or there was nothing to read.
1506 # Were we in the middle of sending a message?
1507 if self.writer.sending_message:
1508 # Was it the end of a message?
1509 whole = self.writer.send_message_chunk_is_whole()
1510 # We were pressed to send something and we did it.
1511 if self.prioritize_writing and whole:
1512 # We prioritize reading again.
1513 self.prioritize_writing = False
1515 # Finally to check if still update messages to be generated.
1516 if self.generator.remaining_prefixes:
1517 msg_out = self.generator.compose_update_message()
1518 if not self.generator.remaining_prefixes:
1519 # We have just finished update generation,
1520 # end-of-rib is due.
1521 logger.info("All update messages generated.")
1522 logger.info("Storing performance results.")
1523 self.generator.store_results()
1524 logger.info("Finally an END-OF-RIB is sent.")
1525 msg_out += self.generator.update_message(wr_prefixes=[],
1527 self.writer.enqueue_message_for_sending(msg_out)
1528 # Attempt for real sending to be done in next iteration.
1530 # Nothing to write anymore.
1531 # To avoid busy loop, we do idle waiting here.
1532 self.reader.wait_for_read()
1534 # We can neither read nor write.
1535 logger.warning("Input and output both blocked for " +
1536 str(self.timer.report_timedelta) + " seconds.")
1537 # FIXME: Are we sure select has been really waiting
1542 def create_logger(loglevel, logfile):
1543 """Create logger object
1546 :loglevel: log level
1547 :logfile: log file name
1549 :return: logger object
1551 logger = logging.getLogger("logger")
1552 log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
1553 console_handler = logging.StreamHandler()
1554 file_handler = logging.FileHandler(logfile, mode="w")
1555 console_handler.setFormatter(log_formatter)
1556 file_handler.setFormatter(log_formatter)
1557 logger.addHandler(console_handler)
1558 logger.addHandler(file_handler)
1559 logger.setLevel(loglevel)
1564 """One time initialisation and iterations looping.
1566 Establish BGP connection and run iterations.
1569 :arguments: Command line arguments
1573 bgp_socket = establish_connection(arguments)
1574 # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1575 # Receive open message before sending anything.
1576 # FIXME: Add parameter to send default open message first,
1577 # to work with "you first" peers.
1578 msg_in = read_open_message(bgp_socket)
1579 timer = TimeTracker(msg_in)
1580 generator = MessageGenerator(arguments)
1581 msg_out = generator.open_message()
1582 logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1583 # Send our open message to the peer.
1584 bgp_socket.send(msg_out)
1585 # Wait for confirming keepalive.
1586 # TODO: Surely in just one packet?
1587 # Using exact keepalive length to not to see possible updates.
1588 msg_in = bgp_socket.recv(19)
1589 if msg_in != generator.keepalive_message():
1590 logger.error("Open not confirmed by keepalive, instead got " +
1591 binascii.hexlify(msg_in))
1592 raise MessageError("Open not confirmed by keepalive, instead got",
1594 timer.reset_peer_hold_time()
1595 # Send the keepalive to indicate the connection is accepted.
1596 timer.snapshot() # Remember this time.
1597 msg_out = generator.keepalive_message()
1598 logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1599 bgp_socket.send(msg_out)
1600 # Use the remembered time.
1601 timer.reset_my_keepalive_time(timer.snapshot_time)
1602 # End of initial handshake phase.
1603 state = StateTracker(bgp_socket, generator, timer)
1604 while True: # main reactor loop
1605 state.perform_one_loop_iteration()
1608 def threaded_job(arguments):
1609 """Run the job threaded
1612 :arguments: Command line arguments
1616 amount_left = arguments.amount
1617 utils_left = arguments.multiplicity
1618 prefix_current = arguments.firstprefix
1619 myip_current = arguments.myip
1623 amount_per_util = (amount_left - 1) / utils_left + 1 # round up
1624 amount_left -= amount_per_util
1627 args = deepcopy(arguments)
1628 args.amount = amount_per_util
1629 args.firstprefix = prefix_current
1630 args.myip = myip_current
1631 thread_args.append(args)
1635 prefix_current += amount_per_util * 16
1640 for t in thread_args:
1641 thread.start_new_thread(job, (t,))
1643 print "Error: unable to start thread."
1646 # Work remains forever
1651 if __name__ == "__main__":
1652 arguments = parse_arguments()
1653 logger = create_logger(arguments.loglevel, arguments.logfile)
1654 if arguments.multiplicity > 1:
1655 threaded_job(arguments)