1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
24 from copy import deepcopy
# Module metadata consumed by packaging and documentation tools.
__author__ = "Vratko Polak"
__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
__license__ = "Eclipse Public License v1.0"
__email__ = "vrpolak@cisco.com"
def parse_arguments():
    """Use argparse to get command-line arguments.

    Returns:
        :return: argparse.Namespace object with the validated options.

    Raises:
        SystemExit: if --multiplicity is not positive.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
    parser.add_argument("--amount", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default="0", type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default="0", type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # The default used to be the list ["separate"], which can never compare
    # equal to the "single"/"separate" strings checked later; use the string.
    parser.add_argument("--updates", choices=["single", "separate"],
                        default="separate", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection." +
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default="0", type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = ("Numeric IP Address to try to connect to." +
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default="179", type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default="180", type=int, help=str_help)
    str_help = "Log level (--error, --warning, --info, --debug)"
    parser.add_argument("--error", dest="loglevel", action="store_const",
                        const=logging.ERROR, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--warning", dest="loglevel", action="store_const",
                        const=logging.WARNING, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--info", dest="loglevel", action="store_const",
                        const=logging.INFO, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--debug", dest="loglevel", action="store_const",
                        const=logging.DEBUG, default=logging.INFO,
                        help=str_help)
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default="1000", type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
    str_help = "How many play utilities are to be started."
    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        # parenthesized single-expression print works on Python 2 and 3
        print("Multiplicity " + str(arguments.multiplicity) +
              " is not positive.")
        raise SystemExit(1)
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
def establish_connection(arguments):
    """Establish TCP connection to the BGP peer (listening or talking).

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.listen: True to wait for the peer to connect
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port

    Returns:
        :return: socket connected to the BGP peer.
    """
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind needs a single tuple as argument
        listening_socket.bind((str(arguments.myip), arguments.myport))
        listening_socket.listen(1)
        bgp_socket, _ = listening_socket.accept()
        # TODO: Verify client IP is controller IP.
        # accept() already gave us the peer socket, so the listener can go.
        listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind to force specified address and port
        talking_socket.bind((str(arguments.myip), arguments.myport))
        # socket does not speak ipaddr, hence str()
        talking_socket.connect((str(arguments.peerip), arguments.peerport))
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract 2-bytes number from provided message.

    Arguments:
        :message: given message
        :offset: offset of the short_int inside the message

    Returns:
        :return: required short_int value.

    Notes:
        default offset value is the BGP message size offset.
    """
    # big-endian: first octet is the high byte
    high_byte_int = ord(message[offset])
    low_byte_int = ord(message[offset + 1])
    short_int = high_byte_int * 256 + low_byte_int
    return short_int
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Arguments:
        :prefixes_hex: prefixes to be decoded, raw wire-format bytes

    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    """
    prefix_list = []
    offset = 0
    while offset < len(prefixes_hex):
        # First octet carries the prefix bit length (rfc4271#section-4.3).
        # unpack_from works on both Python 2 str and Python 3 bytes.
        prefix_bit_len = struct.unpack_from("B", prefixes_hex, offset)[0]
        # Number of octets actually present on the wire for this prefix;
        # "//" keeps integer semantics on both Python 2 and 3.
        prefix_len = ((prefix_bit_len - 1) // 8) + 1
        octets = struct.unpack_from(str(prefix_len) + "B",
                                    prefixes_hex, offset + 1)
        # The encoding omits trailing zero octets, so shorter prefixes carry
        # fewer than 4 octets; the old unpack("BBBB", ...) raised struct.error
        # for them. Pad with zeros to obtain the full IPv4 address.
        octets += (0,) * (4 - prefix_len)
        offset += 1 + prefix_len
        prefix_list.append(".".join(str(i) for i in octets) +
                           "/" + str(prefix_bit_len))
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.

        Arguments:
            :text: textual description of the error
            :message: the raw message which caused the error
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: human readable message as string

        Notes:
            Use a placeholder string if the message is to be empty.
        """
        message = binascii.hexlify(self.msg)
        # hexlify() returns bytes on Python 3; normalise to text so the
        # concatenation below works on both interpreter major versions
        if not isinstance(message, str):
            message = message.decode("ascii")
        if message == "":
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read

    Returns:
        :return: received OPEN message.

    Raises:
        MessageError: if the message is too short or its stated length
            does not match the received data.

    Notes:
        Performs just basic incoming message checks
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # Some validation.
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        logger.error("Got something else than open with 4-byte AS number: " +
                     binascii.hexlify(msg_in))
        raise MessageError("Got something else than open with 4-byte AS number", msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        logger.error("Message length is not " + str(reported_length) +
                     " as stated in " + binascii.hexlify(msg_in))
        # str() is required here; concatenating the raw int used to raise
        # TypeError instead of the intended MessageError.
        raise MessageError("Message length is not " + str(reported_length) +
                           " as stated in ", msg_in)
    logger.info("Open message received.")
    return msg_in
class MessageGenerator(object):
    """Class which generates messages, holds states and configuration values."""

    # TODO: Define bgp marker as a class (constant) variable.
252 def __init__(self, args):
253 """Initialisation according to command-line args.
256 :args: argsparser's Namespace object which contains command-line
257 options for MesageGenerator initialisation
259 Calculates and stores default values used later on for
262 self.total_prefix_amount = args.amount
263 # Number of update messages left to be sent.
264 self.remaining_prefixes = self.total_prefix_amount
266 # New parameters initialisation
268 self.prefix_base_default = args.firstprefix
269 self.prefix_length_default = args.prefixlen
270 self.wr_prefixes_default = []
271 self.nlri_prefixes_default = []
272 self.version_default = 4
273 self.my_autonomous_system_default = args.asnumber
274 self.hold_time_default = args.holdtime # Local hold time.
275 self.bgp_identifier_default = int(args.myip)
276 self.next_hop_default = args.nexthop
277 self.single_update_default = args.updates == "single"
278 self.randomize_updates_default = args.updates == "random"
279 self.prefix_count_to_add_default = args.insert
280 self.prefix_count_to_del_default = args.withdraw
281 if self.prefix_count_to_del_default < 0:
282 self.prefix_count_to_del_default = 0
283 if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
284 # total number of prefixes must grow to avoid infinite test loop
285 self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
286 self.slot_size_default = self.prefix_count_to_add_default
287 self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
288 self.results_file_name_default = args.results
289 self.performance_threshold_default = args.threshold
290 self.rfc4760 = args.rfc4760 == "yes"
291 # Default values used for randomized part
292 s1_slots = ((self.total_prefix_amount -
293 self.remaining_prefixes_threshold - 1) /
294 self.prefix_count_to_add_default + 1)
295 s2_slots = ((self.remaining_prefixes_threshold - 1) /
296 (self.prefix_count_to_add_default -
297 self.prefix_count_to_del_default) + 1)
299 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
300 s2_first_index = s1_slots * self.prefix_count_to_add_default
301 s2_last_index = (s2_first_index +
302 s2_slots * (self.prefix_count_to_add_default -
303 self.prefix_count_to_del_default) - 1)
304 self.slot_gap_default = ((self.total_prefix_amount -
305 self.remaining_prefixes_threshold - 1) /
306 self.prefix_count_to_add_default + 1)
307 self.randomize_lowest_default = s2_first_index
308 self.randomize_highest_default = s2_last_index
310 # Initialising counters
311 self.phase1_start_time = 0
312 self.phase1_stop_time = 0
313 self.phase2_start_time = 0
314 self.phase2_stop_time = 0
315 self.phase1_updates_sent = 0
316 self.phase2_updates_sent = 0
317 self.updates_sent = 0
319 self.log_info = args.loglevel <= logging.INFO
320 self.log_debug = args.loglevel <= logging.DEBUG
322 Flags needed for the MessageGenerator performance optimization.
323 Calling logger methods each iteration even with proper log level set
324 slows down significantly the MessageGenerator performance.
325 Measured total generation time (1M updates, dry run, error log level):
326 - logging based on basic logger features: 36,2s
327 - logging based on advanced logger features (lazy logging): 21,2s
328 - conditional calling of logger methods enclosed inside condition: 8,6s
331 logger.info("Generator initialisation")
332 logger.info(" Target total number of prefixes to be introduced: " +
333 str(self.total_prefix_amount))
334 logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
335 str(self.prefix_length_default))
336 logger.info(" My Autonomous System number: " +
337 str(self.my_autonomous_system_default))
338 logger.info(" My Hold Time: " + str(self.hold_time_default))
339 logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
340 logger.info(" Next Hop: " + str(self.next_hop_default))
341 logger.info(" Prefix count to be inserted at once: " +
342 str(self.prefix_count_to_add_default))
343 logger.info(" Prefix count to be withdrawn at once: " +
344 str(self.prefix_count_to_del_default))
345 logger.info(" Fast pre-fill up to " +
346 str(self.total_prefix_amount -
347 self.remaining_prefixes_threshold) + " prefixes")
348 logger.info(" Remaining number of prefixes to be processed " +
349 "in parallel with withdrawals: " +
350 str(self.remaining_prefixes_threshold))
351 logger.debug(" Prefix index range used after pre-fill procedure [" +
352 str(self.randomize_lowest_default) + ", " +
353 str(self.randomize_highest_default) + "]")
354 if self.single_update_default:
355 logger.info(" Common single UPDATE will be generated " +
356 "for both NLRI & WITHDRAWN lists")
358 logger.info(" Two separate UPDATEs will be generated " +
359 "for each NLRI & WITHDRAWN lists")
360 if self.randomize_updates_default:
361 logger.info(" Generation of UPDATE messages will be randomized")
362 logger.info(" Let\'s go ...\n")
364 # TODO: Notification for hold timer expiration can be handy.
366 def store_results(self, file_name=None, threshold=None):
367 """ Stores specified results into files based on file_name value.
370 :param file_name: Trailing (common) part of result file names
371 :param threshold: Minimum number of sent updates needed for each
372 result to be included into result csv file
373 (mainly needed because of the result accuracy)
377 # default values handling
378 if file_name is None:
379 file_name = self.results_file_name_default
380 if threshold is None:
381 threshold = self.performance_threshold_default
382 # performance calculation
383 if self.phase1_updates_sent >= threshold:
384 totals1 = self.phase1_updates_sent
385 performance1 = int(self.phase1_updates_sent /
386 (self.phase1_stop_time - self.phase1_start_time))
390 if self.phase2_updates_sent >= threshold:
391 totals2 = self.phase2_updates_sent
392 performance2 = int(self.phase2_updates_sent /
393 (self.phase2_stop_time - self.phase2_start_time))
398 logger.info("#" * 10 + " Final results " + "#" * 10)
399 logger.info("Number of iterations: " + str(self.iteration))
400 logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
401 str(self.phase1_updates_sent))
402 logger.info("The pre-fill phase duration: " +
403 str(self.phase1_stop_time - self.phase1_start_time) + "s")
404 logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
405 str(self.phase2_updates_sent))
406 logger.info("The 2nd test phase duration: " +
407 str(self.phase2_stop_time - self.phase2_start_time) + "s")
408 logger.info("Threshold for performance reporting: " + str(threshold))
411 phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
412 " route(s) per UPDATE")
413 if self.single_update_default:
414 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
415 "/-" + str(self.prefix_count_to_del_default) +
416 " routes per UPDATE")
418 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
419 "/-" + str(self.prefix_count_to_del_default) +
420 " routes in two UPDATEs")
421 # collecting capacity and performance results
424 if totals1 is not None:
425 totals[phase1_label] = totals1
426 performance[phase1_label] = performance1
427 if totals2 is not None:
428 totals[phase2_label] = totals2
429 performance[phase2_label] = performance2
430 self.write_results_to_file(totals, "totals-" + file_name)
431 self.write_results_to_file(performance, "performance-" + file_name)
433 def write_results_to_file(self, results, file_name):
434 """Writes results to the csv plot file consumable by Jenkins.
437 :param file_name: Name of the (csv) file to be created
443 f = open(file_name, "wt")
445 for key in sorted(results):
446 first_line += key + ", "
447 second_line += str(results[key]) + ", "
448 first_line = first_line[:-2]
449 second_line = second_line[:-2]
450 f.write(first_line + "\n")
451 f.write(second_line + "\n")
452 logger.info("Message generator performance results stored in " +
454 logger.info(" " + first_line)
455 logger.info(" " + second_line)
459 # Return pseudo-randomized (reproducible) index for selected range
460 def randomize_index(self, index, lowest=None, highest=None):
461 """Calculates pseudo-randomized index from selected range.
464 :param index: input index
465 :param lowest: the lowes index from the randomized area
466 :param highest: the highest index from the randomized area
468 :return: the (pseudo)randomized index
470 Created just as a fame for future generator enhancement.
472 # default values handling
474 lowest = self.randomize_lowest_default
476 highest = self.randomize_highest_default
478 if (index >= lowest) and (index <= highest):
479 # we are in the randomized range -> shuffle it inside
480 # the range (now just reverse the order)
481 new_index = highest - (index - lowest)
483 # we are out of the randomized range -> nothing to do
487 # Get list of prefixes
488 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
489 prefix_len=None, prefix_count=None, randomize=None):
490 """Generates list of IP address prefixes.
493 :param slot_index: index of group of prefix addresses
494 :param slot_size: size of group of prefix addresses
495 in [number of included prefixes]
496 :param prefix_base: IP address of the first prefix
497 (slot_index = 0, prefix_index = 0)
498 :param prefix_len: length of the prefix in bites
499 (the same as size of netmask)
500 :param prefix_count: number of prefixes to be returned
501 from the specified slot
503 :return: list of generated IP address prefixes
505 # default values handling
506 if slot_size is None:
507 slot_size = self.slot_size_default
508 if prefix_base is None:
509 prefix_base = self.prefix_base_default
510 if prefix_len is None:
511 prefix_len = self.prefix_length_default
512 if prefix_count is None:
513 prefix_count = slot_size
514 if randomize is None:
515 randomize = self.randomize_updates_default
516 # generating list of prefixes
519 prefix_gap = 2 ** (32 - prefix_len)
520 for i in range(prefix_count):
521 prefix_index = slot_index * slot_size + i
523 prefix_index = self.randomize_index(prefix_index)
524 indexes.append(prefix_index)
525 prefixes.append(prefix_base + prefix_index * prefix_gap)
527 logger.debug(" Prefix slot index: " + str(slot_index))
528 logger.debug(" Prefix slot size: " + str(slot_size))
529 logger.debug(" Prefix count: " + str(prefix_count))
530 logger.debug(" Prefix indexes: " + str(indexes))
531 logger.debug(" Prefix list: " + str(prefixes))
534 def compose_update_message(self, prefix_count_to_add=None,
535 prefix_count_to_del=None):
536 """Composes an UPDATE message
539 :param prefix_count_to_add: # of prefixes to put into NLRI list
540 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
542 :return: encoded UPDATE message in HEX
544 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
545 lists or common message wich includes both prefix lists.
546 Updates global counters.
548 # default values handling
549 if prefix_count_to_add is None:
550 prefix_count_to_add = self.prefix_count_to_add_default
551 if prefix_count_to_del is None:
552 prefix_count_to_del = self.prefix_count_to_del_default
554 if self.log_info and not (self.iteration % 1000):
555 logger.info("Iteration: " + str(self.iteration) +
556 " - total remaining prefixes: " +
557 str(self.remaining_prefixes))
559 logger.debug("#" * 10 + " Iteration: " +
560 str(self.iteration) + " " + "#" * 10)
561 logger.debug("Remaining prefixes: " +
562 str(self.remaining_prefixes))
563 # scenario type & one-shot counter
564 straightforward_scenario = (self.remaining_prefixes >
565 self.remaining_prefixes_threshold)
566 if straightforward_scenario:
567 prefix_count_to_del = 0
569 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
570 if not self.phase1_start_time:
571 self.phase1_start_time = time.time()
574 logger.debug("--- COMBINED SCENARIO ---")
575 if not self.phase2_start_time:
576 self.phase2_start_time = time.time()
577 # tailor the number of prefixes if needed
578 prefix_count_to_add = (prefix_count_to_del +
579 min(prefix_count_to_add - prefix_count_to_del,
580 self.remaining_prefixes))
581 # prefix slots selection for insertion and withdrawal
582 slot_index_to_add = self.iteration
583 slot_index_to_del = slot_index_to_add - self.slot_gap_default
584 # getting lists of prefixes for insertion in this iteration
586 logger.debug("Prefixes to be inserted in this iteration:")
587 prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
588 prefix_count=prefix_count_to_add)
589 # getting lists of prefixes for withdrawal in this iteration
591 logger.debug("Prefixes to be withdrawn in this iteration:")
592 prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
593 prefix_count=prefix_count_to_del)
594 # generating the mesage
595 if self.single_update_default:
596 # Send prefixes to be introduced and withdrawn
597 # in one UPDATE message
598 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
599 nlri_prefixes=prefix_list_to_add)
601 # Send prefixes to be introduced and withdrawn
602 # in separate UPDATE messages (if needed)
603 msg_out = self.update_message(wr_prefixes=[],
604 nlri_prefixes=prefix_list_to_add)
605 if prefix_count_to_del:
606 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
608 # updating counters - who knows ... maybe I am last time here ;)
609 if straightforward_scenario:
610 self.phase1_stop_time = time.time()
611 self.phase1_updates_sent = self.updates_sent
613 self.phase2_stop_time = time.time()
614 self.phase2_updates_sent = (self.updates_sent -
615 self.phase1_updates_sent)
616 # updating totals for the next iteration
618 self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
619 # returning the encoded message
622 # Section of message encoders
624 def open_message(self, version=None, my_autonomous_system=None,
625 hold_time=None, bgp_identifier=None):
626 """Generates an OPEN Message (rfc4271#section-4.2)
629 :param version: see the rfc4271#section-4.2
630 :param my_autonomous_system: see the rfc4271#section-4.2
631 :param hold_time: see the rfc4271#section-4.2
632 :param bgp_identifier: see the rfc4271#section-4.2
634 :return: encoded OPEN message in HEX
637 # Default values handling
639 version = self.version_default
640 if my_autonomous_system is None:
641 my_autonomous_system = self.my_autonomous_system_default
642 if hold_time is None:
643 hold_time = self.hold_time_default
644 if bgp_identifier is None:
645 bgp_identifier = self.bgp_identifier_default
648 marker_hex = "\xFF" * 16
652 type_hex = struct.pack("B", type)
655 version_hex = struct.pack("B", version)
657 # my_autonomous_system
658 # AS_TRANS value, 23456 decadic.
659 my_autonomous_system_2_bytes = 23456
660 # AS number is mappable to 2 bytes
661 if my_autonomous_system < 65536:
662 my_autonomous_system_2_bytes = my_autonomous_system
663 my_autonomous_system_hex_2_bytes = struct.pack(">H",
664 my_autonomous_system)
667 hold_time_hex = struct.pack(">H", hold_time)
670 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
672 # Optional Parameters
673 optional_parameters_hex = ""
675 optional_parameter_hex = (
676 "\x02" # Param type ("Capability Ad")
677 "\x06" # Length (6 bytes)
678 "\x01" # Capability type (NLRI Unicast),
679 # see RFC 4760, secton 8
680 "\x04" # Capability value length
681 "\x00\x01" # AFI (Ipv4)
683 "\x01" # SAFI (Unicast)
685 optional_parameters_hex += optional_parameter_hex
687 optional_parameter_hex = (
688 "\x02" # Param type ("Capability Ad")
689 "\x06" # Length (6 bytes)
690 "\x41" # "32 bit AS Numbers Support"
691 # (see RFC 6793, section 3)
692 "\x04" # Capability value length
694 optional_parameter_hex += (
695 struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
697 optional_parameters_hex += optional_parameter_hex
699 # Optional Parameters Length
700 optional_parameters_length = len(optional_parameters_hex)
701 optional_parameters_length_hex = struct.pack("B",
702 optional_parameters_length)
704 # Length (big-endian)
706 len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
707 len(my_autonomous_system_hex_2_bytes) +
708 len(hold_time_hex) + len(bgp_identifier_hex) +
709 len(optional_parameters_length_hex) +
710 len(optional_parameters_hex)
712 length_hex = struct.pack(">H", length)
720 my_autonomous_system_hex_2_bytes +
723 optional_parameters_length_hex +
724 optional_parameters_hex
728 logger.debug("OPEN message encoding")
729 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
730 logger.debug(" Length=" + str(length) + " (0x" +
731 binascii.hexlify(length_hex) + ")")
732 logger.debug(" Type=" + str(type) + " (0x" +
733 binascii.hexlify(type_hex) + ")")
734 logger.debug(" Version=" + str(version) + " (0x" +
735 binascii.hexlify(version_hex) + ")")
736 logger.debug(" My Autonomous System=" +
737 str(my_autonomous_system_2_bytes) + " (0x" +
738 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
740 logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
741 binascii.hexlify(hold_time_hex) + ")")
742 logger.debug(" BGP Identifier=" + str(bgp_identifier) +
743 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
744 logger.debug(" Optional Parameters Length=" +
745 str(optional_parameters_length) + " (0x" +
746 binascii.hexlify(optional_parameters_length_hex) +
748 logger.debug(" Optional Parameters=0x" +
749 binascii.hexlify(optional_parameters_hex))
750 logger.debug("OPEN message encoded: 0x%s",
751 binascii.b2a_hex(message_hex))
755 def update_message(self, wr_prefixes=None, nlri_prefixes=None,
756 wr_prefix_length=None, nlri_prefix_length=None,
757 my_autonomous_system=None, next_hop=None):
758 """Generates an UPDATE Message (rfc4271#section-4.3)
761 :param wr_prefixes: see the rfc4271#section-4.3
762 :param nlri_prefixes: see the rfc4271#section-4.3
763 :param wr_prefix_length: see the rfc4271#section-4.3
764 :param nlri_prefix_length: see the rfc4271#section-4.3
765 :param my_autonomous_system: see the rfc4271#section-4.3
766 :param next_hop: see the rfc4271#section-4.3
768 :return: encoded UPDATE message in HEX
771 # Default values handling
772 if wr_prefixes is None:
773 wr_prefixes = self.wr_prefixes_default
774 if nlri_prefixes is None:
775 nlri_prefixes = self.nlri_prefixes_default
776 if wr_prefix_length is None:
777 wr_prefix_length = self.prefix_length_default
778 if nlri_prefix_length is None:
779 nlri_prefix_length = self.prefix_length_default
780 if my_autonomous_system is None:
781 my_autonomous_system = self.my_autonomous_system_default
783 next_hop = self.next_hop_default
786 marker_hex = "\xFF" * 16
790 type_hex = struct.pack("B", type)
793 bytes = ((wr_prefix_length - 1) / 8) + 1
794 withdrawn_routes_hex = ""
795 for prefix in wr_prefixes:
796 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
797 struct.pack(">I", int(prefix))[:bytes])
798 withdrawn_routes_hex += withdrawn_route_hex
800 # Withdrawn Routes Length
801 withdrawn_routes_length = len(withdrawn_routes_hex)
802 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
804 # TODO: to replace hardcoded string by encoding?
806 if nlri_prefixes != []:
807 path_attributes_hex = (
808 "\x40" # Flags ("Well-Known")
809 "\x01" # Type (ORIGIN)
812 "\x40" # Flags ("Well-Known")
813 "\x02" # Type (AS_PATH)
815 "\x02" # AS segment type (AS_SEQUENCE)
816 "\x01" # AS segment length (1)
818 my_as_hex = struct.pack(">I", my_autonomous_system)
819 path_attributes_hex += my_as_hex # AS segment (4 bytes)
820 path_attributes_hex += (
821 "\x40" # Flags ("Well-Known")
822 "\x03" # Type (NEXT_HOP)
825 next_hop_hex = struct.pack(">I", int(next_hop))
826 path_attributes_hex += (
827 next_hop_hex # IP address of the next hop (4 bytes)
830 path_attributes_hex = ""
832 # Total Path Attributes Length
833 total_path_attributes_length = len(path_attributes_hex)
834 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
836 # Network Layer Reachability Information
837 bytes = ((nlri_prefix_length - 1) / 8) + 1
839 for prefix in nlri_prefixes:
840 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
841 struct.pack(">I", int(prefix))[:bytes])
842 nlri_hex += nlri_prefix_hex
844 # Length (big-endian)
846 len(marker_hex) + 2 + len(type_hex) +
847 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
848 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
850 length_hex = struct.pack(">H", length)
857 withdrawn_routes_length_hex +
858 withdrawn_routes_hex +
859 total_path_attributes_length_hex +
860 path_attributes_hex +
865 logger.debug("UPDATE message encoding")
866 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
867 logger.debug(" Length=" + str(length) + " (0x" +
868 binascii.hexlify(length_hex) + ")")
869 logger.debug(" Type=" + str(type) + " (0x" +
870 binascii.hexlify(type_hex) + ")")
871 logger.debug(" withdrawn_routes_length=" +
872 str(withdrawn_routes_length) + " (0x" +
873 binascii.hexlify(withdrawn_routes_length_hex) + ")")
874 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
875 str(wr_prefix_length) + " (0x" +
876 binascii.hexlify(withdrawn_routes_hex) + ")")
877 logger.debug(" Total Path Attributes Length=" +
878 str(total_path_attributes_length) + " (0x" +
879 binascii.hexlify(total_path_attributes_length_hex) +
881 logger.debug(" Path Attributes=" + "(0x" +
882 binascii.hexlify(path_attributes_hex) + ")")
883 logger.debug(" Network Layer Reachability Information=" +
884 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
885 " (0x" + binascii.hexlify(nlri_hex) + ")")
886 logger.debug("UPDATE message encoded: 0x" +
887 binascii.b2a_hex(message_hex))
890 self.updates_sent += 1
891 # returning encoded message
894 def notification_message(self, error_code, error_subcode, data_hex=""):
895 """Generates a NOTIFICATION Message (rfc4271#section-4.5)
898 :param error_code: see the rfc4271#section-4.5
899 :param error_subcode: see the rfc4271#section-4.5
900 :param data_hex: see the rfc4271#section-4.5
902 :return: encoded NOTIFICATION message in HEX
906 marker_hex = "\xFF" * 16
910 type_hex = struct.pack("B", type)
913 error_code_hex = struct.pack("B", error_code)
916 error_subcode_hex = struct.pack("B", error_subcode)
918 # Length (big-endian)
919 length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
920 len(error_subcode_hex) + len(data_hex))
921 length_hex = struct.pack(">H", length)
923 # NOTIFICATION Message
934 logger.debug("NOTIFICATION message encoding")
935 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
936 logger.debug(" Length=" + str(length) + " (0x" +
937 binascii.hexlify(length_hex) + ")")
938 logger.debug(" Type=" + str(type) + " (0x" +
939 binascii.hexlify(type_hex) + ")")
940 logger.debug(" Error Code=" + str(error_code) + " (0x" +
941 binascii.hexlify(error_code_hex) + ")")
942 logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
943 binascii.hexlify(error_subcode_hex) + ")")
944 logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
945 logger.debug("NOTIFICATION message encoded: 0x%s",
946 binascii.b2a_hex(message_hex))
950 def keepalive_message(self):
951 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
954 :return: encoded KEEP ALIVE message in HEX
958 marker_hex = "\xFF" * 16
962 type_hex = struct.pack("B", type)
964 # Length (big-endian)
965 length = len(marker_hex) + 2 + len(type_hex)
966 length_hex = struct.pack(">H", length)
976 logger.debug("KEEP ALIVE message encoding")
977 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
978 logger.debug(" Length=" + str(length) + " (0x" +
979 binascii.hexlify(length_hex) + ")")
980 logger.debug(" Type=" + str(type) + " (0x" +
981 binascii.hexlify(type_hex) + ")")
982 logger.debug("KEEP ALIVE message encoded: 0x%s",
983 binascii.b2a_hex(message_hex))
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and
    peer's hold time.
    """

    def __init__(self, msg_in):
        """Initialisation based on defaults and OPEN message from peer.

        Arguments:
            msg_in: the OPEN message received from peer.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        # Negotiate the hold timer: the smaller of the two proposals
        # (our default vs. the peer's offer) wins.
        negotiated_hold = 180  # not an attribute of self yet
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        if negotiated_hold > peer_hold_timedelta:
            negotiated_hold = peer_hold_timedelta
        # Hold time must be either zero (disabled) or at least 3 seconds.
        if negotiated_hold != 0 and negotiated_hold < 3:
            logger.error("Invalid hold timedelta value: " + str(negotiated_hold))
            raise ValueError("Invalid hold timedelta value: ", negotiated_hold)
        # If we do not hear from peer this long, we assume it has died.
        self.hold_timedelta = negotiated_hold
        # Upper limit for duration between our messages, to avoid being
        # declared dead by the peer.
        self.keepalive_timedelta = int(negotiated_hold / 3.0)
        # The same as calling snapshot(), but also declares a field.
        self.snapshot_time = time.time()
        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this time point, peer will be declared dead.
        self.my_keepalive_time = None  # to be set later
        # At that point, we should be sending a keepalive message.

    def snapshot(self):
        """Store current time in instance data to use later."""
        # Read as: time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time.

        Arguments:
            :keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurence."""
        # Hold time zero means keepalives are disabled entirely.
        if self.hold_timedelta == 0:
            return False
        return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Return the time of the next expected or to-be-sent KEEP ALIVE."""
        if self.hold_timedelta == 0:
            # Nothing scheduled; report a point far (one day) in the future.
            return self.snapshot_time + 86400
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time."""
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta != 0:
            # time.time() may be too strict
            if snapshot_time > self.peer_hold_time:
                logger.error("Peer has overstepped the hold timer.")
                raise RuntimeError("Peer has overstepped the hold timer.")
                # TODO: Include hold_timedelta?
                # TODO: Add notification sending (attempt). That means
                # move to write tracker.
class ReadTracker(object):
    """Class for tracking read of messages chunk by chunk and
    for idle waiting.
    """

    def __init__(self, bgp_socket, timer):
        """The reader initialisation.

        Arguments:
            bgp_socket: socket to be used for receiving
            timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.timer = timer
        # BGP marker length plus length field length.
        self.header_length = 18
        # TODO: make it class (constant) attribute
        # Computation of where next chunk ends depends on whether
        # we are beyond length field.
        self.reading_header = True
        # Countdown towards next size computation.
        self.bytes_to_read = self.header_length
        # Incremental buffer for message under read.
        self.msg_in = ""
        # Initialising counters
        self.updates_received = 0
        self.prefixes_introduced = 0
        self.prefixes_withdrawn = 0
        self.rx_idle_time = 0
        self.rx_activity_detected = True

    def read_message_chunk(self):
        """Read up to one message.

        Note:
            Currently it does not return anything.
        """
        # TODO: We could return the whole message, currently not needed.
        # We assume the socket is readable.
        chunk_message = self.socket.recv(self.bytes_to_read)
        self.msg_in += chunk_message
        self.bytes_to_read -= len(chunk_message)
        # TODO: bytes_to_read < 0 is not possible, right?
        if not self.bytes_to_read:
            # Finished reading a logical block.
            if self.reading_header:
                # The logical block was a BGP header.
                # Now we know the size of the message.
                self.reading_header = False
                self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
                                      self.header_length)
            else:  # We have finished reading the body of the message.
                # Peer has just proven it is still alive.
                self.timer.reset_peer_hold_time()
                # TODO: Do we want to count received messages?
                # This version ignores the received message.
                # TODO: Should we do validation and exit on anything
                # besides update or keepalive?
                # Prepare state for reading another message.
                message_type_hex = self.msg_in[self.header_length]
                if message_type_hex == "\x01":
                    logger.info("OPEN message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x02":
                    logger.debug("UPDATE message received: 0x%s",
                                 binascii.b2a_hex(self.msg_in))
                    self.decode_update_message(self.msg_in)
                elif message_type_hex == "\x03":
                    logger.info("NOTIFICATION message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x04":
                    logger.info("KEEP ALIVE message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                else:
                    logger.warning("Unexpected message received: 0x%s",
                                   binascii.b2a_hex(self.msg_in))
                # Reset the buffer and state for reading the next header.
                self.msg_in = ""
                self.reading_header = True
                self.bytes_to_read = self.header_length
        # We should not act upon peer_hold_time if we are reading
        # something right now.
        return

    def decode_path_attributes(self, path_attributes_hex):
        """Decode the Path Attributes field (rfc4271#section-4.3)

        Arguments:
            :path_attributes: path_attributes field to be decoded in hex
        Returns:
            :return: None
        """
        hex_to_decode = path_attributes_hex

        while len(hex_to_decode):
            # Attribute flags octet.
            attr_flags_hex = hex_to_decode[0]
            attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
            # attr_optional_bit = attr_flags & 128
            # attr_transitive_bit = attr_flags & 64
            # attr_partial_bit = attr_flags & 32
            attr_extended_length_bit = attr_flags & 16

            # Attribute type code octet.
            attr_type_code_hex = hex_to_decode[1]
            attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)

            # Extended-length flag selects a 2-byte vs 1-byte length field.
            if attr_extended_length_bit:
                attr_length_hex = hex_to_decode[2:4]
                attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
                attr_value_hex = hex_to_decode[4:4 + attr_length]
                hex_to_decode = hex_to_decode[4 + attr_length:]
            else:
                attr_length_hex = hex_to_decode[2]
                attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
                attr_value_hex = hex_to_decode[3:3 + attr_length]
                hex_to_decode = hex_to_decode[3 + attr_length:]

            if attr_type_code == 1:
                logger.debug("Attribute type = 1 (ORIGIN, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 2:
                logger.debug("Attribute type = 2 (AS_PATH, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 3:
                logger.debug("Attribute type = 3 (NEXT_HOP, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 4:
                logger.debug("Attribute type = 4 (MULTI_EXIT_DISC, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 5:
                logger.debug("Attribute type = 5 (LOCAL_PREF, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 6:
                logger.debug("Attribute type = 6 (ATOMIC_AGGREGATE, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 7:
                logger.debug("Attribute type = 7 (AGGREGATOR, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 14:  # rfc4760#section-3
                logger.debug("Attribute type = 14 (MP_REACH_NLRI, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
                address_family_identifier_hex = attr_value_hex[0:2]
                logger.debug(" Address Family Identifier = 0x%s",
                             binascii.b2a_hex(address_family_identifier_hex))
                subsequent_address_family_identifier_hex = attr_value_hex[2]
                logger.debug(" Subsequent Address Family Identifier = 0x%s",
                             binascii.b2a_hex(subsequent_address_family_identifier_hex))
                next_hop_netaddr_len_hex = attr_value_hex[3]
                next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
                logger.debug(" Length of Next Hop Network Address = 0x%s (%s)",
                             binascii.b2a_hex(next_hop_netaddr_len_hex),
                             next_hop_netaddr_len)
                next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
                logger.debug(" Network Address of Next Hop = 0x%s",
                             binascii.b2a_hex(next_hop_netaddr_hex))
                reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
                logger.debug(" Reserved = 0x%s",
                             binascii.b2a_hex(reserved_hex))
                nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
                logger.debug(" Network Layer Reachability Information = 0x%s",
                             binascii.b2a_hex(nlri_hex))
                nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
                logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
                for prefix in nlri_prefix_list:
                    logger.debug(" nlri_prefix_received: %s", prefix)
                self.prefixes_introduced += len(nlri_prefix_list)  # update counter
            elif attr_type_code == 15:  # rfc4760#section-4
                logger.debug("Attribute type = 15 (MP_UNREACH_NLRI, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
                address_family_identifier_hex = attr_value_hex[0:2]
                logger.debug(" Address Family Identifier = 0x%s",
                             binascii.b2a_hex(address_family_identifier_hex))
                subsequent_address_family_identifier_hex = attr_value_hex[2]
                logger.debug(" Subsequent Address Family Identifier = 0x%s",
                             binascii.b2a_hex(subsequent_address_family_identifier_hex))
                wd_hex = attr_value_hex[3:]
                logger.debug(" Withdrawn Routes = 0x%s",
                             binascii.b2a_hex(wd_hex))
                wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
                logger.debug(" Withdrawn routes prefix list: %s",
                             wdr_prefix_list)
                for prefix in wdr_prefix_list:
                    logger.debug(" withdrawn_prefix_received: %s", prefix)
                self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
            else:
                logger.debug("Unknown attribute type = %s, flags:0x%s)", attr_type_code,
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Unknown attribute value = 0x%s", binascii.b2a_hex(attr_value_hex))
        return None

    def decode_update_message(self, msg):
        """Decode an UPDATE message (rfc4271#section-4.3)

        Arguments:
            :msg: message to be decoded in hex
        Returns:
            :return: None
        """
        logger.debug("Decoding update message:")
        # message header - marker
        marker_hex = msg[:16]
        logger.debug("Message header marker: 0x%s",
                     binascii.b2a_hex(marker_hex))
        # message header - message length
        msg_length_hex = msg[16:18]
        msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
        # Fixed log typo: "lenght" -> "length".
        logger.debug("Message length: 0x%s (%s)",
                     binascii.b2a_hex(msg_length_hex), msg_length)
        # message header - message type
        msg_type_hex = msg[18:19]
        msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
        if msg_type == 2:
            logger.debug("Message type: 0x%s (update)",
                         binascii.b2a_hex(msg_type_hex))
            # withdrawn routes length
            wdr_length_hex = msg[19:21]
            wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
            # Fixed log typo: "lenght" -> "length".
            logger.debug("Withdrawn routes length: 0x%s (%s)",
                         binascii.b2a_hex(wdr_length_hex), wdr_length)
            # withdrawn routes
            wdr_hex = msg[21:21 + wdr_length]
            logger.debug("Withdrawn routes: 0x%s",
                         binascii.b2a_hex(wdr_hex))
            wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
            logger.debug("Withdrawn routes prefix list: %s",
                         wdr_prefix_list)
            for prefix in wdr_prefix_list:
                logger.debug("withdrawn_prefix_received: %s", prefix)
            # total path attribute length
            total_pa_length_offset = 21 + wdr_length
            total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
            total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
            # Fixed log typo: "lenght" -> "length".
            logger.debug("Total path attribute length: 0x%s (%s)",
                         binascii.b2a_hex(total_pa_length_hex), total_pa_length)
            # path attributes
            pa_offset = total_pa_length_offset + 2
            pa_hex = msg[pa_offset:pa_offset + total_pa_length]
            logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
            self.decode_path_attributes(pa_hex)
            # network layer reachability information length
            nlri_length = msg_length - 23 - total_pa_length - wdr_length
            logger.debug("Calculated NLRI length: %s", nlri_length)
            # network layer reachability information
            nlri_offset = pa_offset + total_pa_length
            nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
            logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
            nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
            logger.debug("NLRI prefix list: %s", nlri_prefix_list)
            for prefix in nlri_prefix_list:
                logger.debug("nlri_prefix_received: %s", prefix)
            # Updating counters
            self.updates_received += 1
            self.prefixes_introduced += len(nlri_prefix_list)
            self.prefixes_withdrawn += len(wdr_prefix_list)
        else:
            # Fixed log typo: "Unexpeced" -> "Unexpected".
            logger.error("Unexpected message type 0x%s in 0x%s",
                         binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))

    def wait_for_read(self):
        """Read message until timeout (next expected event).

        Note:
            Used when no more updates has to be sent to avoid busy-wait.
            Currently it does not return anything.
        """
        # Compute time to the first predictable state change
        event_time = self.timer.get_next_event_time()
        # snapshot_time would be imprecise
        wait_timedelta = min(event_time - time.time(), 10)
        if wait_timedelta < 0:
            # The program got around to waiting to an event in "very near
            # future" so late that it became a "past" event, thus tell
            # "select" to not wait at all. Passing negative timedelta to
            # select() would lead to either waiting forever (for -1) or
            # select.error("Invalid parameter") (for everything else).
            wait_timedelta = 0
        # And wait for event or something to read.

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every update and
            # not too frequently to avoid having large log files)
            logger.info("total_received_update_message_counter: %s",
                        self.updates_received)
            logger.info("total_received_nlri_prefix_counter: %s",
                        self.prefixes_introduced)
            logger.info("total_received_withdrawn_prefix_counter: %s",
                        self.prefixes_withdrawn)

        start_time = time.time()
        select.select([self.socket], [], [self.socket], wait_timedelta)
        timedelta = time.time() - start_time
        self.rx_idle_time += timedelta
        # Anything shorter than a second of blocking counts as activity.
        self.rx_activity_detected = timedelta < 1

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every update and
            # not too frequently to avoid having large log files)
            logger.info("... idle for %.3fs", timedelta)
            logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
        return
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them."""

    def __init__(self, bgp_socket, generator, timer):
        """The writer initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Really new fields.
        # TODO: Would attribute docstrings add anything substantial?
        self.sending_message = False
        self.bytes_to_send = 0
        self.msg_out = ""

    def enqueue_message_for_sending(self, message):
        """Append a message to the outgoing buffer and mark sending active.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send as much buffered data as the socket accepts in one call.

        Returns:
            :return: True if no data remains to be sent, False otherwise.
        """
        # We assume there is a msg_out to send and socket is writable.
        self.timer.snapshot()
        bytes_sent = self.socket.send(self.msg_out)
        # Drop the prefix of the buffer that has already gone out.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if self.bytes_to_send:
            # Message partially sent; more chunks will follow.
            return False
        # TODO: Is it possible to hit negative bytes_to_send?
        self.sending_message = False
        # We should have reset hold timer on peer side.
        self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
        # The possible reason for not prioritizing reads is gone.
        return True
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class."""

    def __init__(self, bgp_socket, generator, timer):
        """The state tracker initialisation.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Sub-trackers for the two directions of the session.
        self.reader = ReadTracker(bgp_socket, timer)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        self.prioritize_writing = False
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # back to False.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.

    def perform_one_loop_iteration(self):
        """The main loop iteration.

        Notes:
            Calculates priority, resolves all conditions, calls
            appropriate method and returns to caller to repeat.
        """
        self.timer.snapshot()
        if not self.prioritize_writing:
            if self.timer.is_time_for_my_keepalive():
                if not self.writer.sending_message:
                    # We need to schedule a keepalive ASAP.
                    self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
                    logger.info("KEEP ALIVE is sent.")
                # We are sending a message now, so let's prioritize it.
                self.prioritize_writing = True
        # Now we know what our priorities are, we have to check
        # which actions are available.
        # select() returns three lists, we store them to a list of lists.
        list_list = select.select([self.socket], [self.socket], [self.socket],
                                  self.timer.report_timedelta)
        read_list, write_list, except_list = list_list
        # Lists are unpacked, each is either [] or [self.socket],
        # so we will test them as boolean.
        if except_list:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
                return
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        if write_list:
            # Either we really want to reset peer's view of our hold
            # timer, or there was nothing to read.
            # Were we in the middle of sending a message?
            if self.writer.sending_message:
                # Was it the end of a message?
                whole = self.writer.send_message_chunk_is_whole()
                # We were pressed to send something and we did it.
                if self.prioritize_writing and whole:
                    # We prioritize reading again.
                    self.prioritize_writing = False
                return
            # Finally to check if still update messages to be generated.
            if self.generator.remaining_prefixes:
                msg_out = self.generator.compose_update_message()
                if not self.generator.remaining_prefixes:
                    # We have just finished update generation,
                    # end-of-rib is due.
                    logger.info("All update messages generated.")
                    logger.info("Storing performance results.")
                    self.generator.store_results()
                    logger.info("Finally an END-OF-RIB is sent.")
                    msg_out += self.generator.update_message(wr_prefixes=[],
                                                             nlri_prefixes=[])
                self.writer.enqueue_message_for_sending(msg_out)
                # Attempt for real sending to be done in next iteration.
                return
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # We can neither read nor write.
        logger.warning("Input and output both blocked for " +
                       str(self.timer.report_timedelta) + " seconds.")
        # FIXME: Are we sure select has been really waiting
        # the whole period?
        return
def create_logger(loglevel, logfile):
    """Create and configure the shared logger object.

    Arguments:
        :loglevel: log level
        :logfile: log file name
    Returns:
        :return: logger object
    """
    result_logger = logging.getLogger("logger")
    log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
    # Log both to console and to the file; console handler is added first.
    handlers = [logging.StreamHandler(), logging.FileHandler(logfile, mode="w")]
    for handler in handlers:
        handler.setFormatter(log_formatter)
        result_logger.addHandler(handler)
    result_logger.setLevel(loglevel)
    return result_logger
1570 """One time initialisation and iterations looping.
1572 Establish BGP connection and run iterations.
1575 :arguments: Command line arguments
1579 bgp_socket = establish_connection(arguments)
1580 # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1581 # Receive open message before sending anything.
1582 # FIXME: Add parameter to send default open message first,
1583 # to work with "you first" peers.
1584 msg_in = read_open_message(bgp_socket)
1585 timer = TimeTracker(msg_in)
1586 generator = MessageGenerator(arguments)
1587 msg_out = generator.open_message()
1588 logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1589 # Send our open message to the peer.
1590 bgp_socket.send(msg_out)
1591 # Wait for confirming keepalive.
1592 # TODO: Surely in just one packet?
1593 # Using exact keepalive length to not to see possible updates.
1594 msg_in = bgp_socket.recv(19)
1595 if msg_in != generator.keepalive_message():
1596 logger.error("Open not confirmed by keepalive, instead got " +
1597 binascii.hexlify(msg_in))
1598 raise MessageError("Open not confirmed by keepalive, instead got",
1600 timer.reset_peer_hold_time()
1601 # Send the keepalive to indicate the connection is accepted.
1602 timer.snapshot() # Remember this time.
1603 msg_out = generator.keepalive_message()
1604 logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1605 bgp_socket.send(msg_out)
1606 # Use the remembered time.
1607 timer.reset_my_keepalive_time(timer.snapshot_time)
1608 # End of initial handshake phase.
1609 state = StateTracker(bgp_socket, generator, timer)
1610 while True: # main reactor loop
1611 state.perform_one_loop_iteration()
def threaded_job(arguments):
    """Run the job threaded.

    Splits the total prefix amount evenly over ``arguments.multiplicity``
    per-utility argument sets, starts one thread per set, then sleeps forever
    (worker threads never finish on their own).

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None
    """
    amount_left = arguments.amount
    utils_left = arguments.multiplicity
    prefix_current = arguments.firstprefix
    myip_current = arguments.myip
    thread_args = []

    while 1:
        # "//" keeps the round-up trick in integer arithmetic on both
        # Python 2 and 3 (plain "/" would yield a float on Python 3).
        amount_per_util = (amount_left - 1) // utils_left + 1  # round up
        amount_left -= amount_per_util
        utils_left -= 1

        # Each worker gets its own private copy of the arguments.
        args = deepcopy(arguments)
        args.amount = amount_per_util
        args.firstprefix = prefix_current
        args.myip = myip_current
        thread_args.append(args)

        if not utils_left:
            break
        # Advance the prefix window and the local IP for the next worker.
        prefix_current += amount_per_util * 16
        myip_current += 1

    try:
        # Create threads
        for t in thread_args:
            thread.start_new_thread(job, (t,))
    except Exception:
        print("Error: unable to start thread.")
        raise SystemExit(2)

    # Work remains forever
    while 1:
        time.sleep(5)
1657 if __name__ == "__main__":
1658 arguments = parse_arguments()
1659 logger = create_logger(arguments.loglevel, arguments.logfile)
1660 if arguments.multiplicity > 1:
1661 threaded_job(arguments)