1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 __author__ = "Vratko Polak"
15 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
16 __license__ = "Eclipse Public License v1.0"
17 __email__ = "vrpolak@cisco.com"
def parse_arguments():
    """Use argparse to get command-line arguments.

    :return: argparse Namespace object with all parsed options.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = "Amount of IP prefixes to generate. (negative means \"infinite\")."
    parser.add_argument("--amount", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default="0", type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default="0", type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # Bug fix: the default must be the bare string "mixed", not the list
    # ["mixed"]; argparse stores the default unchanged, so a list default
    # would never compare equal to an explicitly passed choice string.
    parser.add_argument("--updates", choices=["single", "mixed"],
                        default="mixed", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection." +
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default="0", type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = ("Numeric IP Address to try to connect to." +
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default="179", type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default="180", type=int, help=str_help)
    # The four log-level options all store into the same "loglevel"
    # destination; the last one given on the command line wins.
    str_help = "Log level (--error, --warning, --info, --debug)"
    parser.add_argument("--error", dest="loglevel", action="store_const",
                        const=logging.ERROR, default=logging.ERROR,
                        help=str_help)
    parser.add_argument("--warning", dest="loglevel", action="store_const",
                        const=logging.WARNING, default=logging.ERROR,
                        help=str_help)
    parser.add_argument("--info", dest="loglevel", action="store_const",
                        const=logging.INFO, default=logging.ERROR,
                        help=str_help)
    parser.add_argument("--debug", dest="loglevel", action="store_const",
                        const=logging.DEBUG, default=logging.ERROR,
                        help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default="1000", type=int, help=str_help)
    arguments = parser.parse_args()
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
def establish_connection(arguments):
    """Establish TCP connection to the BGP peer.

    :arguments: following command-line arguments are used
        - arguments.listen: connect in listening mode when True
        - arguments.myip: local IP address
        - arguments.myport: local port
        - arguments.peerip: remote IP address
        - arguments.peerport: remote port

    :return: socket connected to the BGP peer.
    """
    if arguments.listen:
        logging.info("Connecting in the listening mode.")
        logging.debug("Local IP address: " + str(arguments.myip))
        logging.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind needs a single tuple as argument
        listening_socket.bind((str(arguments.myip), arguments.myport))
        listening_socket.listen(1)
        bgp_socket, _ = listening_socket.accept()
        # TODO: Verify client IP is controller IP.
        # The listening socket served its purpose; only the accepted
        # connection is needed from now on.
        listening_socket.close()
    else:
        logging.info("Connecting in the talking mode.")
        logging.debug("Local IP address: " + str(arguments.myip))
        logging.debug("Local port: " + str(arguments.myport))
        logging.debug("Remote IP address: " + str(arguments.peerip))
        logging.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind to force specified address and port
        talking_socket.bind((str(arguments.myip), arguments.myport))
        # socket does not speak ipaddr, hence str()
        talking_socket.connect((str(arguments.peerip), arguments.peerport))
        bgp_socket = talking_socket
    logging.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract a 2-byte big-endian number from the provided message.

    :message: given message (byte string)
    :offset: offset of the short_int inside the message

    :return: the extracted short_int value.

    The default offset value is the BGP message size offset.
    """
    high_byte_int = ord(message[offset])
    low_byte_int = ord(message[offset + 1])
    short_int = high_byte_int * 256 + low_byte_int
    # Bug fix: the computed value was never returned (function fell
    # through and returned None).
    return short_int
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.

        :param text: textual description of the error
        :param message: the raw message which caused the error
        """
        # Bug fix: the attributes used by __str__ were never assigned.
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        :return: human readable message as string

        Use a placeholder string if the message is empty.
        """
        message = binascii.hexlify(self.msg)
        if message == "":
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message.

    :bgp_socket: the socket to be read

    :return: received OPEN message.

    :raises MessageError: when the received data does not look like a
        well-formed OPEN message with a 4-byte AS number.

    Performs just basic incoming message checks.
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # 37 is minimal length of open message with 4-byte AS number.
    if len(msg_in) < 37:
        logging.error("Got something else than open with 4-byte AS number: " +
                      binascii.hexlify(msg_in))
        raise MessageError("Got something else than open with 4-byte AS number",
                           msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do with it.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        logging.error("Message length is not " + str(reported_length) +
                      " as stated in " + binascii.hexlify(msg_in))
        # Bug fix: reported_length is an int; concatenating it to a str
        # raised TypeError instead of the intended MessageError.
        raise MessageError("Message length is not " + str(reported_length) +
                           " as stated in ", msg_in)
    logging.info("Open message received.")
    return msg_in
class MessageGenerator(object):
    """Class which generates messages, holds states and configuration values."""

    # TODO: Define bgp marker as a class (constant) variable.
    def __init__(self, args):
        """Initialisation according to command-line args.

        :args: argsparser's Namespace object which contains command-line
            options for MessageGenerator initialisation

        Calculates and stores default values used later on for
        message generation.
        """
        self.total_prefix_amount = args.amount
        # Number of update messages left to be sent.
        self.remaining_prefixes = self.total_prefix_amount

        # New parameters initialisation
        self.prefix_base_default = args.firstprefix
        self.prefix_length_default = args.prefixlen
        self.wr_prefixes_default = []
        self.nlri_prefixes_default = []
        self.version_default = 4
        self.my_autonomous_system_default = args.asnumber
        self.hold_time_default = args.holdtime  # Local hold time.
        self.bgp_identifier_default = int(args.myip)
        self.next_hop_default = args.nexthop
        self.single_update_default = args.updates == "single"
        # NOTE(review): "random" is not among the --updates choices, so this
        # flag can currently only be False — confirm intended behavior.
        self.randomize_updates_default = args.updates == "random"
        self.prefix_count_to_add_default = args.insert
        self.prefix_count_to_del_default = args.withdraw
        if self.prefix_count_to_del_default < 0:
            self.prefix_count_to_del_default = 0
        if not self.single_update_default and not self.prefix_count_to_del_default:
            self.prefix_count_to_del_default = 1
            # we need some content for the 2nd UPDATE in the iteration
        if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
            # total number of prefixes must grow to avoid infinite test loop
            self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
        self.slot_size_default = self.prefix_count_to_add_default
        self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
        self.results_file_name_default = args.results
        self.performance_threshold_default = args.threshold
        # Default values used for randomized part
        # ("//" keeps integer semantics on both Python 2 and Python 3)
        s1_slots = ((self.total_prefix_amount -
                     self.remaining_prefixes_threshold - 1) //
                    self.prefix_count_to_add_default + 1)
        s2_slots = ((self.remaining_prefixes_threshold - 1) //
                    (self.prefix_count_to_add_default -
                     self.prefix_count_to_del_default) + 1)
        # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
        s2_first_index = s1_slots * self.prefix_count_to_add_default
        s2_last_index = (s2_first_index +
                         s2_slots * (self.prefix_count_to_add_default -
                                     self.prefix_count_to_del_default) - 1)
        self.slot_gap_default = ((self.total_prefix_amount -
                                  self.remaining_prefixes_threshold - 1) //
                                 self.prefix_count_to_add_default + 1)
        self.randomize_lowest_default = s2_first_index
        self.randomize_highest_default = s2_last_index

        # Initialising counters
        self.phase1_start_time = 0
        self.phase1_stop_time = 0
        self.phase2_start_time = 0
        self.phase2_stop_time = 0
        self.phase1_updates_sent = 0
        self.phase2_updates_sent = 0
        self.updates_sent = 0
        self.iteration = 0

        # Needed for the MessageGenerator performance optimization
        self.log_info = args.loglevel <= logging.INFO
        self.log_debug = args.loglevel <= logging.DEBUG

        logging.info("Generator initialisation")
        logging.info(" Target total number of prefixes to be introduced: " +
                     str(self.total_prefix_amount))
        logging.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
                     str(self.prefix_length_default))
        logging.info(" My Autonomous System number: " +
                     str(self.my_autonomous_system_default))
        logging.info(" My Hold Time: " + str(self.hold_time_default))
        logging.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
        logging.info(" Next Hop: " + str(self.next_hop_default))
        logging.info(" Prefix count to be inserted at once: " +
                     str(self.prefix_count_to_add_default))
        logging.info(" Prefix count to be withdrawn at once: " +
                     str(self.prefix_count_to_del_default))
        logging.info(" Fast pre-fill up to " +
                     str(self.total_prefix_amount -
                         self.remaining_prefixes_threshold) + " prefixes")
        logging.info(" Remaining number of prefixes to be processed " +
                     "in parallel with withdrawals: " +
                     str(self.remaining_prefixes_threshold))
        logging.debug(" Prefix index range used after pre-fill procedure [" +
                      str(self.randomize_lowest_default) + ", " +
                      str(self.randomize_highest_default) + "]")
        if self.single_update_default:
            logging.info(" Common single UPDATE will be generated " +
                         "for both NLRI & WITHDRAWN lists")
        else:
            logging.info(" Two separate UPDATEs will be generated " +
                         "for each NLRI & WITHDRAWN lists")
        if self.randomize_updates_default:
            logging.info(" Generation of UPDATE messages will be randomized")
        logging.info(" Let\"s go ...\n")

        # TODO: Notification for hold timer expiration can be handy.

    def store_results(self, file_name=None, threshold=None):
        """Store specified results into files based on file_name value.

        :param file_name: Trailing (common) part of result file names
        :param threshold: Minimum number of sent updates needed for each
            result to be included into result csv file
            (mainly needed because of the result accuracy)
        """
        # default values handling
        if file_name is None:
            file_name = self.results_file_name_default
        if threshold is None:
            threshold = self.performance_threshold_default
        # performance calculation
        if self.phase1_updates_sent >= threshold:
            totals1 = self.phase1_updates_sent
            performance1 = int(self.phase1_updates_sent /
                               (self.phase1_stop_time - self.phase1_start_time))
        else:
            totals1 = None
            performance1 = None
        if self.phase2_updates_sent >= threshold:
            totals2 = self.phase2_updates_sent
            performance2 = int(self.phase2_updates_sent /
                               (self.phase2_stop_time - self.phase2_start_time))
        else:
            totals2 = None
            performance2 = None

        logging.info("#" * 10 + " Final results " + "#" * 10)
        logging.info("Number of iterations: " + str(self.iteration))
        logging.info("Number of UPDATE messages sent in the pre-fill phase: " +
                     str(self.phase1_updates_sent))
        logging.info("The pre-fill phase duration: " +
                     str(self.phase1_stop_time - self.phase1_start_time) + "s")
        logging.info("Number of UPDATE messages sent in the 2nd test phase: " +
                     str(self.phase2_updates_sent))
        logging.info("The 2nd test phase duration: " +
                     str(self.phase2_stop_time - self.phase2_start_time) + "s")
        logging.info("Threshold for performance reporting: " + str(threshold))

        # making labels for the csv columns
        phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
                        " route(s) per UPDATE")
        if self.single_update_default:
            phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                                  "/-" + str(self.prefix_count_to_del_default) +
                                  " routes per UPDATE")
        else:
            phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                                  "/-" + str(self.prefix_count_to_del_default) +
                                  " routes in two UPDATEs")
        # collecting capacity and performance results
        totals = {}
        performance = {}
        if totals1 is not None:
            totals[phase1_label] = totals1
            performance[phase1_label] = performance1
        if totals2 is not None:
            totals[phase2_label] = totals2
            performance[phase2_label] = performance2
        self.write_results_to_file(totals, "totals-" + file_name)
        self.write_results_to_file(performance, "performance-" + file_name)

    def write_results_to_file(self, results, file_name):
        """Write results to the csv plot file consumable by Jenkins.

        :param results: dict mapping result label to result value
        :param file_name: Name of the (csv) file to be created
        """
        first_line = ""
        second_line = ""
        f = open(file_name, "wt")
        # try/finally so the file handle is not leaked on a write error
        try:
            for key in sorted(results):
                first_line += key + ", "
                second_line += str(results[key]) + ", "
            # drop the trailing ", " separator
            first_line = first_line[:-2]
            second_line = second_line[:-2]
            f.write(first_line + "\n")
            f.write(second_line + "\n")
        finally:
            f.close()
        logging.info("Performance results of message generator stored in " +
                     file_name + ":")
        logging.info(" " + first_line)
        logging.info(" " + second_line)

    # Return pseudo-randomized (reproducible) index for selected range
    def randomize_index(self, index, lowest=None, highest=None):
        """Calculate pseudo-randomized index from selected range.

        :param index: input index
        :param lowest: the lowest index from the randomized area
        :param highest: the highest index from the randomized area

        :return: the (pseudo)randomized index

        Created just as a frame for future generator enhancement.
        """
        # default values handling
        if lowest is None:
            lowest = self.randomize_lowest_default
        if highest is None:
            highest = self.randomize_highest_default
        # randomize
        if (index >= lowest) and (index <= highest):
            # we are in the randomized range -> shuffle it inside
            # the range (now just reverse the order)
            new_index = highest - (index - lowest)
        else:
            # we are out of the randomized range -> nothing to do
            new_index = index
        return new_index

    # Get list of prefixes
    def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
                        prefix_len=None, prefix_count=None, randomize=None):
        """Generate list of IP address prefixes.

        :param slot_index: index of group of prefix addresses
        :param slot_size: size of group of prefix addresses
            in [number of included prefixes]
        :param prefix_base: IP address of the first prefix
            (slot_index = 0, prefix_index = 0)
        :param prefix_len: length of the prefix in bites
            (the same as size of netmask)
        :param prefix_count: number of prefixes to be returned
            from the specified slot
        :param randomize: when True, indexes inside the randomized range
            are shuffled via randomize_index()

        :return: list of generated IP address prefixes
        """
        # default values handling
        if slot_size is None:
            slot_size = self.slot_size_default
        if prefix_base is None:
            prefix_base = self.prefix_base_default
        if prefix_len is None:
            prefix_len = self.prefix_length_default
        if prefix_count is None:
            prefix_count = slot_size
        if randomize is None:
            randomize = self.randomize_updates_default
        # generating list of prefixes
        indexes = []
        prefixes = []
        prefix_gap = 2 ** (32 - prefix_len)
        for i in range(prefix_count):
            prefix_index = slot_index * slot_size + i
            if randomize:
                prefix_index = self.randomize_index(prefix_index)
            indexes.append(prefix_index)
            prefixes.append(prefix_base + prefix_index * prefix_gap)
        if self.log_debug:
            logging.debug(" Prefix slot index: " + str(slot_index))
            logging.debug(" Prefix slot size: " + str(slot_size))
            logging.debug(" Prefix count: " + str(prefix_count))
            logging.debug(" Prefix indexes: " + str(indexes))
            logging.debug(" Prefix list: " + str(prefixes))
        return prefixes

    def compose_update_message(self, prefix_count_to_add=None,
                               prefix_count_to_del=None):
        """Compose an UPDATE message.

        :param prefix_count_to_add: # of prefixes to put into NLRI list
        :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list

        :return: encoded UPDATE message in HEX

        Optionally generates separate UPDATEs for NLRI and WITHDRAWN
        lists or common message which includes both prefix lists.
        Updates global counters.
        """
        # default values handling
        if prefix_count_to_add is None:
            prefix_count_to_add = self.prefix_count_to_add_default
        if prefix_count_to_del is None:
            prefix_count_to_del = self.prefix_count_to_del_default

        if self.log_info and not (self.iteration % 1000):
            logging.info("Iteration: " + str(self.iteration) +
                         " - total remaining prefixes: " +
                         str(self.remaining_prefixes))
        if self.log_debug:
            logging.debug("#" * 10 + " Iteration: " +
                          str(self.iteration) + " " + "#" * 10)
            logging.debug("Remaining prefixes: " +
                          str(self.remaining_prefixes))
        # scenario type & one-shot counter
        straightforward_scenario = (self.remaining_prefixes >
                                    self.remaining_prefixes_threshold)
        if straightforward_scenario:
            prefix_count_to_del = 0
            if self.log_debug:
                logging.debug("--- STARAIGHTFORWARD SCENARIO ---")
            if not self.phase1_start_time:
                self.phase1_start_time = time.time()
        else:
            if self.log_debug:
                logging.debug("--- COMBINED SCENARIO ---")
            if not self.phase2_start_time:
                self.phase2_start_time = time.time()
        # tailor the number of prefixes if needed
        prefix_count_to_add = (prefix_count_to_del +
                               min(prefix_count_to_add - prefix_count_to_del,
                                   self.remaining_prefixes))
        # prefix slots selection for insertion and withdrawal
        slot_index_to_add = self.iteration
        slot_index_to_del = slot_index_to_add - self.slot_gap_default
        # getting lists of prefixes for insertion in this iteration
        if self.log_debug:
            logging.debug("Prefixes to be inserted in this iteration:")
        prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
                                                  prefix_count=prefix_count_to_add)
        # getting lists of prefixes for withdrawal in this iteration
        if self.log_debug:
            logging.debug("Prefixes to be withdrawn in this iteration:")
        prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
                                                  prefix_count=prefix_count_to_del)
        # generating the message
        if self.single_update_default:
            # Send prefixes to be introduced and withdrawn
            # in one UPDATE message
            msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
                                          nlri_prefixes=prefix_list_to_add)
        else:
            # Send prefixes to be introduced and withdrawn
            # in separate UPDATE messages (if needed)
            msg_out = self.update_message(wr_prefixes=[],
                                          nlri_prefixes=prefix_list_to_add)
            if prefix_count_to_del:
                msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
                                               nlri_prefixes=[])
        # updating counters - who knows ... maybe I am last time here ;)
        if straightforward_scenario:
            self.phase1_stop_time = time.time()
            self.phase1_updates_sent = self.updates_sent
        else:
            self.phase2_stop_time = time.time()
            self.phase2_updates_sent = (self.updates_sent -
                                        self.phase1_updates_sent)
        # updating totals for the next iteration
        self.iteration += 1
        self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
        # returning the encoded message
        return msg_out

    # Section of message encoders

    def open_message(self, version=None, my_autonomous_system=None,
                     hold_time=None, bgp_identifier=None):
        """Generate an OPEN Message (rfc4271#section-4.2).

        :param version: see the rfc4271#section-4.2
        :param my_autonomous_system: see the rfc4271#section-4.2
        :param hold_time: see the rfc4271#section-4.2
        :param bgp_identifier: see the rfc4271#section-4.2

        :return: encoded OPEN message in HEX
        """
        # Default values handling
        if version is None:
            version = self.version_default
        if my_autonomous_system is None:
            my_autonomous_system = self.my_autonomous_system_default
        if hold_time is None:
            hold_time = self.hold_time_default
        if bgp_identifier is None:
            bgp_identifier = self.bgp_identifier_default

        # Marker
        marker_hex = "\xFF" * 16

        # Type (OPEN = 1); msg_type avoids shadowing the builtin type()
        msg_type = 1
        msg_type_hex = struct.pack("B", msg_type)

        # Version
        version_hex = struct.pack("B", version)

        # my_autonomous_system
        # AS_TRANS value, 23456 decadic.
        my_autonomous_system_2_bytes = 23456
        # AS number is mappable to 2 bytes
        if my_autonomous_system < 65536:
            my_autonomous_system_2_bytes = my_autonomous_system
        # Bug fix: the 2-byte field must encode the AS_TRANS fallback value,
        # not the raw AS number (struct.pack(">H") raises for AS >= 65536).
        my_autonomous_system_hex_2_bytes = struct.pack(">H",
                                                       my_autonomous_system_2_bytes)

        # Hold Time
        hold_time_hex = struct.pack(">H", hold_time)

        # BGP Identifier
        bgp_identifier_hex = struct.pack(">I", bgp_identifier)

        # Optional Parameters
        optional_parameters_hex = (
            "\x02"  # Param type ("Capability Ad")
            "\x06"  # Length (6 bytes)
            "\x01"  # Capability type (NLRI Unicast),
                    # see RFC 4760, secton 8
            "\x04"  # Capability value length
            "\x00\x01"  # AFI (Ipv4)
            "\x00"  # (reserved)
            "\x01"  # SAFI (Unicast)
            "\x02"  # Param type ("Capability Ad")
            "\x06"  # Length (6 bytes)
            "\x41"  # "32 bit AS Numbers Support"
                    # (see RFC 6793, section 3)
            "\x04"  # Capability value length
            # My AS in 32 bit format
            + struct.pack(">I", my_autonomous_system)
        )

        # Optional Parameters Length
        optional_parameters_length = len(optional_parameters_hex)
        optional_parameters_length_hex = struct.pack("B",
                                                     optional_parameters_length)

        # Length (big-endian); "2" accounts for the length field itself
        length = (
            len(marker_hex) + 2 + len(msg_type_hex) + len(version_hex) +
            len(my_autonomous_system_hex_2_bytes) +
            len(hold_time_hex) + len(bgp_identifier_hex) +
            len(optional_parameters_length_hex) +
            len(optional_parameters_hex)
        )
        length_hex = struct.pack(">H", length)

        # OPEN Message
        message_hex = (
            marker_hex +
            length_hex +
            msg_type_hex +
            version_hex +
            my_autonomous_system_hex_2_bytes +
            hold_time_hex +
            bgp_identifier_hex +
            optional_parameters_length_hex +
            optional_parameters_hex
        )

        if self.log_debug:
            logging.debug("OPEN Message encoding")
            logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
            logging.debug(" Length=" + str(length) + " (0x" +
                          binascii.hexlify(length_hex) + ")")
            logging.debug(" Type=" + str(msg_type) + " (0x" +
                          binascii.hexlify(msg_type_hex) + ")")
            logging.debug(" Version=" + str(version) + " (0x" +
                          binascii.hexlify(version_hex) + ")")
            logging.debug(" My Autonomous System=" +
                          str(my_autonomous_system_2_bytes) + " (0x" +
                          binascii.hexlify(my_autonomous_system_hex_2_bytes) +
                          ")")
            logging.debug(" Hold Time=" + str(hold_time) + " (0x" +
                          binascii.hexlify(hold_time_hex) + ")")
            logging.debug(" BGP Identifier=" + str(bgp_identifier) +
                          " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
            logging.debug(" Optional Parameters Length=" +
                          str(optional_parameters_length) + " (0x" +
                          binascii.hexlify(optional_parameters_length_hex) +
                          ")")
            logging.debug(" Optional Parameters=0x" +
                          binascii.hexlify(optional_parameters_hex))
            logging.debug(" OPEN Message encoded: 0x" +
                          binascii.b2a_hex(message_hex))

        return message_hex

    def update_message(self, wr_prefixes=None, nlri_prefixes=None,
                       wr_prefix_length=None, nlri_prefix_length=None,
                       my_autonomous_system=None, next_hop=None):
        """Generate an UPDATE Message (rfc4271#section-4.3).

        :param wr_prefixes: see the rfc4271#section-4.3
        :param nlri_prefixes: see the rfc4271#section-4.3
        :param wr_prefix_length: see the rfc4271#section-4.3
        :param nlri_prefix_length: see the rfc4271#section-4.3
        :param my_autonomous_system: see the rfc4271#section-4.3
        :param next_hop: see the rfc4271#section-4.3

        :return: encoded UPDATE message in HEX
        """
        # Default values handling
        if wr_prefixes is None:
            wr_prefixes = self.wr_prefixes_default
        if nlri_prefixes is None:
            nlri_prefixes = self.nlri_prefixes_default
        if wr_prefix_length is None:
            wr_prefix_length = self.prefix_length_default
        if nlri_prefix_length is None:
            nlri_prefix_length = self.prefix_length_default
        if my_autonomous_system is None:
            my_autonomous_system = self.my_autonomous_system_default
        if next_hop is None:
            next_hop = self.next_hop_default

        # Marker
        marker_hex = "\xFF" * 16

        # Type (UPDATE = 2); msg_type avoids shadowing the builtin type()
        msg_type = 2
        msg_type_hex = struct.pack("B", msg_type)

        # Withdrawn Routes
        # octets needed to encode one prefix of the given bit length;
        # octet_count avoids shadowing the builtin bytes, and "//" keeps
        # integer semantics on both Python 2 and Python 3
        octet_count = ((wr_prefix_length - 1) // 8) + 1
        withdrawn_routes_hex = ""
        for prefix in wr_prefixes:
            withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
                                   struct.pack(">I", int(prefix))[:octet_count])
            withdrawn_routes_hex += withdrawn_route_hex

        # Withdrawn Routes Length
        withdrawn_routes_length = len(withdrawn_routes_hex)
        withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)

        # Path Attributes
        # TODO: to replace hardcoded string by encoding?
        if nlri_prefixes != []:
            path_attributes_hex = (
                "\x40"  # Flags ("Well-Known")
                "\x01"  # Type (ORIGIN)
                "\x01"  # Length (1)
                "\x00"  # Origin: IGP
                "\x40"  # Flags ("Well-Known")
                "\x02"  # Type (AS_PATH)
                "\x06"  # Length (6)
                "\x02"  # AS segment type (AS_SEQUENCE)
                "\x01"  # AS segment length (1)
                # AS segment (4 bytes)
                + struct.pack(">I", my_autonomous_system) +
                "\x40"  # Flags ("Well-Known")
                "\x03"  # Type (NEXT_HOP)
                "\x04"  # Length (4)
                # IP address of the next hop (4 bytes)
                + struct.pack(">I", int(next_hop))
            )
        else:
            path_attributes_hex = ""

        # Total Path Attributes Length
        total_path_attributes_length = len(path_attributes_hex)
        total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)

        # Network Layer Reachability Information
        octet_count = ((nlri_prefix_length - 1) // 8) + 1
        nlri_hex = ""
        for prefix in nlri_prefixes:
            nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
                               struct.pack(">I", int(prefix))[:octet_count])
            nlri_hex += nlri_prefix_hex

        # Length (big-endian); "2" accounts for the length field itself
        length = (
            len(marker_hex) + 2 + len(msg_type_hex) +
            len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
            len(total_path_attributes_length_hex) + len(path_attributes_hex) +
            len(nlri_hex)
        )
        length_hex = struct.pack(">H", length)

        # UPDATE Message
        message_hex = (
            marker_hex +
            length_hex +
            msg_type_hex +
            withdrawn_routes_length_hex +
            withdrawn_routes_hex +
            total_path_attributes_length_hex +
            path_attributes_hex +
            nlri_hex
        )

        if self.log_debug:
            logging.debug("UPDATE Message encoding")
            logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
            logging.debug(" Length=" + str(length) + " (0x" +
                          binascii.hexlify(length_hex) + ")")
            logging.debug(" Type=" + str(msg_type) + " (0x" +
                          binascii.hexlify(msg_type_hex) + ")")
            logging.debug(" withdrawn_routes_length=" +
                          str(withdrawn_routes_length) + " (0x" +
                          binascii.hexlify(withdrawn_routes_length_hex) + ")")
            logging.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
                          str(wr_prefix_length) + " (0x" +
                          binascii.hexlify(withdrawn_routes_hex) + ")")
            logging.debug(" Total Path Attributes Length=" +
                          str(total_path_attributes_length) + " (0x" +
                          binascii.hexlify(total_path_attributes_length_hex) +
                          ")")
            logging.debug(" Path Attributes=" + "(0x" +
                          binascii.hexlify(path_attributes_hex) + ")")
            logging.debug(" Network Layer Reachability Information=" +
                          str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
                          " (0x" + binascii.hexlify(nlri_hex) + ")")
            logging.debug(" UPDATE Message encoded: 0x" +
                          binascii.b2a_hex(message_hex))

        # Updating counter
        self.updates_sent += 1
        # returning encoded message
        return message_hex

    def notification_message(self, error_code, error_subcode, data_hex=""):
        """Generate a NOTIFICATION Message (rfc4271#section-4.5).

        :param error_code: see the rfc4271#section-4.5
        :param error_subcode: see the rfc4271#section-4.5
        :param data_hex: see the rfc4271#section-4.5

        :return: encoded NOTIFICATION message in HEX
        """
        # Marker
        marker_hex = "\xFF" * 16

        # Type (NOTIFICATION = 3); msg_type avoids shadowing builtin type()
        msg_type = 3
        msg_type_hex = struct.pack("B", msg_type)

        # Error Code
        error_code_hex = struct.pack("B", error_code)

        # Error Subcode
        error_subcode_hex = struct.pack("B", error_subcode)

        # Length (big-endian); "2" accounts for the length field itself
        length = (len(marker_hex) + 2 + len(msg_type_hex) + len(error_code_hex) +
                  len(error_subcode_hex) + len(data_hex))
        length_hex = struct.pack(">H", length)

        # NOTIFICATION Message
        message_hex = (
            marker_hex +
            length_hex +
            msg_type_hex +
            error_code_hex +
            error_subcode_hex +
            data_hex
        )

        if self.log_debug:
            logging.debug("NOTIFICATION Message encoding")
            logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
            logging.debug(" Length=" + str(length) + " (0x" +
                          binascii.hexlify(length_hex) + ")")
            logging.debug(" Type=" + str(msg_type) + " (0x" +
                          binascii.hexlify(msg_type_hex) + ")")
            logging.debug(" Error Code=" + str(error_code) + " (0x" +
                          binascii.hexlify(error_code_hex) + ")")
            logging.debug(" Error Subode=" + str(error_subcode) + " (0x" +
                          binascii.hexlify(error_subcode_hex) + ")")
            logging.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
            logging.debug(" NOTIFICATION Message encoded: 0x" +
                          binascii.b2a_hex(message_hex))

        # returning the encoded message
        return message_hex

    def keepalive_message(self):
        """Generate a KEEP ALIVE Message (rfc4271#section-4.4).

        :return: encoded KEEP ALIVE message in HEX
        """
        # Marker
        marker_hex = "\xFF" * 16

        # Type (KEEPALIVE = 4); msg_type avoids shadowing builtin type()
        msg_type = 4
        msg_type_hex = struct.pack("B", msg_type)

        # Length (big-endian); "2" accounts for the length field itself
        length = len(marker_hex) + 2 + len(msg_type_hex)
        length_hex = struct.pack(">H", length)

        # KEEP ALIVE Message
        message_hex = (
            marker_hex +
            length_hex +
            msg_type_hex
        )

        if self.log_debug:
            logging.debug("KEEP ALIVE Message encoding")
            logging.debug(" Marker=0x" + binascii.hexlify(marker_hex))
            logging.debug(" Length=" + str(length) + " (0x" +
                          binascii.hexlify(length_hex) + ")")
            logging.debug(" Type=" + str(msg_type) + " (0x" +
                          binascii.hexlify(msg_type_hex) + ")")
            logging.debug(" KEEP ALIVE Message encoded: 0x" +
                          binascii.b2a_hex(message_hex))

        # returning the encoded message
        return message_hex
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and peer's hold time."""

    def __init__(self, msg_in):
        """Initialisation based on defaults and OPEN message from peer.

        Arguments:
            :msg_in: the OPEN message received from peer.

        Raises:
            ValueError: if the negotiated hold time is nonzero but below
                the RFC 4271 minimum of 3 seconds.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        hold_timedelta = 180  # Not an attribute of self yet.
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        if hold_timedelta > peer_hold_timedelta:
            hold_timedelta = peer_hold_timedelta
        # RFC 4271: hold time is either zero or at least three seconds.
        if hold_timedelta != 0 and hold_timedelta < 3:
            logging.error("Invalid hold timedelta value: %s", hold_timedelta)
            raise ValueError("Invalid hold timedelta value: %s" % hold_timedelta)
        self.hold_timedelta = hold_timedelta
        # If we do not hear from peer this long, we assume it has died.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        # The same as calling snapshot(), but also declares a field.
        self.snapshot_time = time.time()
        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this time point, peer will be declared dead.
        self.my_keepalive_time = None  # to be set later
        # At this point, we should be sending keepalive message.

    def snapshot(self):
        """Store current time in instance data to use later."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time.

        Arguments:
            :keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurrence.

        Returns:
            :return: True if a KEEPALIVE should be sent now, False otherwise.
        """
        # Hold time of zero turns keepalive generation off entirely.
        if self.hold_timedelta == 0:
            return False
        return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Return the time of the next expected or to be sent KEEP ALIVE."""
        if self.hold_timedelta == 0:
            # Keepalives are off, so report a point far in the future
            # (one day ahead) to keep callers' arithmetic simple.
            return self.snapshot_time + 86400
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time.

        Arguments:
            :snapshot_time: the time to compare the peer hold deadline against

        Raises:
            RuntimeError: when the peer's hold timer has expired.
        """
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta != 0:
            # time.time() may be too strict
            if snapshot_time > self.peer_hold_time:
                logging.error("Peer has overstepped the hold timer.")
                raise RuntimeError("Peer has overstepped the hold timer.")
                # TODO: Include hold_timedelta?
                # TODO: Add notification sending (attempt). That means
                # move to write tracker.
class ReadTracker(object):
    """Class for tracking reads of messages chunk by chunk and
    for idle waiting.
    """

    def __init__(self, bgp_socket, timer):
        """The reader initialisation.

        Arguments:
            :bgp_socket: socket to be used for receiving
            :timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.timer = timer
        # BGP marker length plus length field length.
        self.header_length = 18
        # TODO: make it class (constant) attribute
        # Computation of where next chunk ends depends on whether
        # we are beyond length field.
        self.reading_header = True
        # Countdown towards next size computation.
        self.bytes_to_read = self.header_length
        # Incremental buffer for message under read.
        self.msg_in = b""

    def read_message_chunk(self):
        """Read up to one message.

        Currently it does not return anything.
        """
        # TODO: We could return the whole message, currently not needed.
        # We assume the socket is readable.
        chunk_message = self.socket.recv(self.bytes_to_read)
        self.msg_in += chunk_message
        self.bytes_to_read -= len(chunk_message)
        # TODO: bytes_to_read < 0 is not possible, right?
        if not self.bytes_to_read:
            # Finished reading a logical block.
            if self.reading_header:
                # The logical block was a BGP header.
                # Now we know the size of the message.
                self.reading_header = False
                # The length field counts the whole message including the
                # header octets already consumed, so subtract them.
                self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
                                      self.header_length)
            else:  # We have finished reading the body of the message.
                # Peer has just proven it is still alive.
                self.timer.reset_peer_hold_time()
                # TODO: Do we want to count received messages?
                # This version ignores the received message.
                # TODO: Should we do validation and exit on anything
                # besides update or keepalive?
                # Prepare state for reading another message.
                self.msg_in = b""
                self.reading_header = True
                self.bytes_to_read = self.header_length
        # We should not act upon peer_hold_time if we are reading
        # something right now.

    def wait_for_read(self):
        """Wait until timeout (next expected event) or socket readability.

        Used when no more updates have to be sent, to avoid busy-wait.
        Currently it does not return anything.
        """
        # Compute time to the first predictable state change.
        event_time = self.timer.get_next_event_time()
        # snapshot_time would be imprecise
        wait_timedelta = event_time - time.time()
        if wait_timedelta < 0:
            # The program got around to waiting to an event in "very near
            # future" so late that it became a "past" event, thus tell
            # "select" to not wait at all. Passing negative timedelta to
            # select() would lead to either waiting forever (for -1) or
            # select.error("Invalid parameter") (for everything else).
            wait_timedelta = 0
        # And wait for event or something to read.
        select.select([self.socket], [], [self.socket], wait_timedelta)
        # Not checking anything, that will be done in next iteration.
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them."""

    def __init__(self, bgp_socket, generator, timer):
        """The writer initialisation.

        Arguments:
            :bgp_socket: socket to be used for sending
            :generator: generator to be used for message generation
            :timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Really new fields.
        # TODO: Would attribute docstrings add anything substantial?
        self.sending_message = False
        self.bytes_to_send = 0
        self.msg_out = b""

    def enqueue_message_for_sending(self, message):
        """Enqueue message and change state.

        Arguments:
            :message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send enqueued data from msg_out buffer.

        Returns:
            :return: True if no remaining data to send, False otherwise.
        """
        # We assume there is a msg_out to send and socket is writable.
        self.timer.snapshot()
        bytes_sent = self.socket.send(self.msg_out)
        # Forget the part of message that was sent.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if not self.bytes_to_send:
            # TODO: Is it possible to hit negative bytes_to_send?
            self.sending_message = False
            # We should have reset hold timer on peer side.
            self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
            # The possible reason for not prioritizing reads is gone.
            return True
        return False
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class."""

    def __init__(self, bgp_socket, generator, timer):
        """The state tracker initialisation.

        Arguments:
            :bgp_socket: socket to be used for sending / receiving
            :generator: generator to be used for message generation
            :timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Sub-trackers for the two directions of traffic.
        self.reader = ReadTracker(bgp_socket, timer)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        self.prioritize_writing = False
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # back to False.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.

    def perform_one_loop_iteration(self):
        """The main loop iteration.

        Calculates priority, resolves all conditions, calls
        appropriate method and returns to caller to repeat.

        Raises:
            RuntimeError: when the socket reports an exceptional state.
        """
        self.timer.snapshot()
        if not self.prioritize_writing:
            if self.timer.is_time_for_my_keepalive():
                if not self.writer.sending_message:
                    # We need to schedule a keepalive ASAP.
                    self.writer.enqueue_message_for_sending(
                        self.generator.keepalive_message())
                # We are sending a message now, so let's prioritize it.
                self.prioritize_writing = True
        # Now we know what our priorities are, we have to check
        # which actions are available.
        # select.select() returns three lists,
        # we store them to list of lists.
        list_list = select.select([self.socket], [self.socket], [self.socket],
                                  self.timer.report_timedelta)
        read_list, write_list, except_list = list_list
        # Lists are unpacked, each is either [] or [self.socket],
        # so we will test them as boolean.
        if except_list:
            logging.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
                return
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        if write_list:
            # Either we really want to reset peer's view of our hold
            # timer, or there was nothing to read.
            # Were we in the middle of sending a message?
            if self.writer.sending_message:
                # Was it the end of a message?
                whole = self.writer.send_message_chunk_is_whole()
                # We were pressed to send something and we did it.
                if self.prioritize_writing and whole:
                    # We prioritize reading again.
                    self.prioritize_writing = False
                return
            # Finally to check if still update messages to be generated.
            if self.generator.remaining_prefixes:
                msg_out = self.generator.compose_update_message()
                if not self.generator.remaining_prefixes:
                    # We have just finished update generation,
                    # end-of-rib is due.
                    logging.info("All update messages generated.")
                    logging.info("Storing performance results.")
                    self.generator.store_results()
                    logging.info("Finally an END-OF-RIB is going to be sent.")
                    msg_out += self.generator.update_message(wr_prefixes=[],
                                                             nlri_prefixes=[])
                self.writer.enqueue_message_for_sending(msg_out)
                # Attempt for real sending to be done in next iteration.
                return
            # Nothing to write anymore, except occasional keepalives.
            logging.info("Everything has been done. "
                         "Now just waiting for possible incoming message.")
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # We can neither read nor write.
        logging.warning("Input and output both blocked for %s seconds.",
                        self.timer.report_timedelta)
        # FIXME: Are we sure select has been really waiting
        # that long before reporting nothing?
def main():
    """One time initialisation and iterations looping.

    Establish BGP connection and run iterations.

    Raises:
        MessageError: when the peer does not confirm our OPEN
            with a KEEPALIVE.
    """
    arguments = parse_arguments()
    logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s",
                        level=arguments.loglevel)
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logging.debug("Sending the OPEN message: %s", binascii.hexlify(msg_out))
    # Send our open message to the peer.
    bgp_socket.send(msg_out)
    # Wait for confirming keepalive.
    # TODO: Surely in just one packet?
    # Using exact keepalive length to not to see possible updates.
    msg_in = bgp_socket.recv(19)
    if msg_in != generator.keepalive_message():
        logging.error("Open not confirmed by keepalive, instead got %s",
                      binascii.hexlify(msg_in))
        raise MessageError("Open not confirmed by keepalive, instead got",
                           msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logging.debug("Sending a KEEP ALIVE message: %s", binascii.hexlify(msg_out))
    bgp_socket.send(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
1319 if __name__ == "__main__":