"""Utility for playing generated BGP data to ODL.

It needs to be run with sudo-able user when you want to use ports below 1024
as --myport. This utility is used to avoid excessive waiting times which EXABGP
exhibits when used with huge router tables and also avoid the memory cost of
EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
24 from copy import deepcopy
27 __author__ = "Vratko Polak"
28 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
29 __license__ = "Eclipse Public License v1.0"
30 __email__ = "vrpolak@cisco.com"
def parse_arguments():
    """Use argparse to get arguments.

    Returns:
        :return: argparse Namespace holding all the options defined below.
    Notes:
        Prints a message and exits with status 1 when --multiplicity
        is not positive.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = 'Amount of IP prefixes to generate. (negative means "infinite").'
    parser.add_argument("--amount", default=1, type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default=1, type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default=0, type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default=0, type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # The default must be one of the choice strings (not a list), because
    # the value is later compared against plain strings.
    parser.add_argument("--updates", choices=["single", "separate"],
                        default="separate", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from. "
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection. "
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default=0, type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = "Identifier of the route originator."
    parser.add_argument("--originator", default=None,
                        type=ipaddr.IPv4Address, dest="originator", help=str_help)
    str_help = "Cluster list item identifier."
    parser.add_argument("--cluster", default=None,
                        type=ipaddr.IPv4Address, dest="cluster", help=str_help)
    str_help = ("Numeric IP Address to try to connect to. "
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default=179, type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default=180, type=int, help=str_help)
    str_help = "Log level (--error, --warning, --info, --debug)"
    # All four flags share one destination; the last one given wins.
    for flag, level in (("--error", logging.ERROR),
                        ("--warning", logging.WARNING),
                        ("--info", logging.INFO),
                        ("--debug", logging.DEBUG)):
        parser.add_argument(flag, dest="loglevel", action="store_const",
                            const=level, default=logging.INFO,
                            help=str_help)
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default=1000, type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
    str_help = "How many play utilities are to be started."
    parser.add_argument("--multiplicity", default=1, type=int, help=str_help)
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        # Single parenthesised string: prints the same text under
        # both the print statement and the print function.
        print("Multiplicity %s is not positive." % arguments.multiplicity)
        raise SystemExit(1)
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line argumets are used
            - arguments.listen: accept a connection instead of initiating it
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port
    Returns:
        :return: connected socket.
    """
    # socket does not speak ipaddr, hence str()
    local_address = (str(arguments.myip), arguments.myport)
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listening_socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            # bind need single tuple as argument
            listening_socket.bind(local_address)
            listening_socket.listen(1)
            bgp_socket, _ = listening_socket.accept()
            # TODO: Verify client IP is controller IP.
        finally:
            # Only one peer is served; release the listener even when
            # bind/listen/accept fails.
            listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            talking_socket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_REUSEADDR, 1)
            # bind to force specified address and port
            talking_socket.bind(local_address)
            talking_socket.connect((str(arguments.peerip),
                                    arguments.peerport))
        except Exception:
            # Do not leak the descriptor when the connection attempt fails.
            talking_socket.close()
            raise
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract 2-bytes number from provided message.

    Arguments:
        :message: given message (byte string)
        :offset: offset of the short_int inside the message
    Returns:
        :return: required short_int value (big-endian).
    Notes:
        default offset value is the BGP message size offset.
        Raises IndexError when the message is too short.
    """
    high_byte = message[offset]
    low_byte = message[offset + 1]
    if not isinstance(high_byte, int):
        # Indexing a Python 2 str yields 1-character strings, not integers.
        high_byte = ord(high_byte)
        low_byte = ord(low_byte)
    return high_byte * 256 + low_byte
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Arguments:
        :prefixes_hex: byte string of <length, prefix> tuples to be decoded
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Notes:
        Each prefix occupies only as many octets as its bit length needs,
        so shorter prefixes are padded with zero octets before formatting.
        Raises struct.error on a bit length above 32 or truncated data.
    """
    octets = bytearray(prefixes_hex)
    prefix_list = []
    offset = 0
    while offset < len(octets):
        prefix_bit_len = octets[offset]
        prefix_len = (prefix_bit_len + 7) // 8
        prefix_octets = octets[offset + 1: offset + 1 + prefix_len]
        if prefix_bit_len > 32 or len(prefix_octets) != prefix_len:
            raise struct.error("malformed prefix at offset " + str(offset))
        padded = list(prefix_octets) + [0] * (4 - prefix_len)
        prefix = ".".join(str(octet) for octet in padded)
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: human readable message as string
        Notes:
            Use a placeholder string if the message is to be empty.
        """
        message = binascii.hexlify(self.msg)
        if not isinstance(message, str):
            # hexlify returns bytes on Python 3; keep the result printable.
            message = message.decode("ascii")
        if not message:
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Notes:
        Performs just basic incoming message checks; raises MessageError
        when the message is too short or its length field does not match
        the number of bytes received.
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # Some validation.
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        logger.error("Got something else than open with 4-byte AS number: " +
                     binascii.hexlify(msg_in))
        raise MessageError("Got something else than open with 4-byte AS number", msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        logger.error("Message length is not " + str(reported_length) +
                     " as stated in " + binascii.hexlify(msg_in))
        # str() is required: concatenating the int directly raised TypeError
        # and masked the intended MessageError.
        raise MessageError("Message length is not " + str(reported_length) +
                           " as stated in ", msg_in)
    logger.info("Open message received.")
    return msg_in
254 class MessageGenerator(object):
255 """Class which generates messages, holds states and configuration values."""
257 # TODO: Define bgp marker as a class (constant) variable.
def __init__(self, args):
    """Initialisation according to command-line args.

    Arguments:
        :args: argsparser's Namespace object which contains command-line
            options for MesageGenerator initialisation
    Notes:
        Calculates and stores default values used later on for
        message generation.
    """
    self.total_prefix_amount = args.amount
    # Number of prefixes left to be introduced.
    self.remaining_prefixes = self.total_prefix_amount

    # New parameters initialisation
    self.iteration = 0
    self.prefix_base_default = args.firstprefix
    self.prefix_length_default = args.prefixlen
    self.wr_prefixes_default = []
    self.nlri_prefixes_default = []
    self.version_default = 4
    self.my_autonomous_system_default = args.asnumber
    self.hold_time_default = args.holdtime  # Local hold time.
    self.bgp_identifier_default = int(args.myip)
    self.next_hop_default = args.nexthop
    self.originator_id_default = args.originator
    self.cluster_list_item_default = args.cluster
    self.single_update_default = args.updates == "single"
    self.randomize_updates_default = args.updates == "random"
    self.prefix_count_to_add_default = args.insert
    self.prefix_count_to_del_default = args.withdraw
    if self.prefix_count_to_del_default < 0:
        self.prefix_count_to_del_default = 0
    if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
        # total number of prefixes must grow to avoid infinite test loop
        self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
    self.slot_size_default = self.prefix_count_to_add_default
    self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
    self.results_file_name_default = args.results
    self.performance_threshold_default = args.threshold
    self.rfc4760 = args.rfc4760 == "yes"
    # Default values used for randomized part
    # (floor division keeps the slot counts integral)
    net_growth = (self.prefix_count_to_add_default -
                  self.prefix_count_to_del_default)
    s1_slots = ((self.total_prefix_amount -
                 self.remaining_prefixes_threshold - 1) //
                self.prefix_count_to_add_default + 1)
    s2_slots = ((self.remaining_prefixes_threshold - 1) //
                net_growth + 1)
    # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
    s2_first_index = s1_slots * self.prefix_count_to_add_default
    s2_last_index = s2_first_index + s2_slots * net_growth - 1
    # Same expression as s1_slots: the withdrawn slot lags behind the
    # inserted one by the number of pre-fill slots.
    self.slot_gap_default = s1_slots
    self.randomize_lowest_default = s2_first_index
    self.randomize_highest_default = s2_last_index

    # Initialising counters
    self.phase1_start_time = 0
    self.phase1_stop_time = 0
    self.phase2_start_time = 0
    self.phase2_stop_time = 0
    self.phase1_updates_sent = 0
    self.phase2_updates_sent = 0
    self.updates_sent = 0

    # Flags needed for the MessageGenerator performance optimization.
    # Calling logger methods each iteration even with proper log level set
    # slows down significantly the MessageGenerator performance.
    # Measured total generation time (1M updates, dry run, error log level):
    # - logging based on basic logger features: 36,2s
    # - logging based on advanced logger features (lazy logging): 21,2s
    # - conditional calling of logger methods enclosed inside condition: 8,6s
    self.log_info = args.loglevel <= logging.INFO
    self.log_debug = args.loglevel <= logging.DEBUG

    logger.info("Generator initialisation")
    logger.info(" Target total number of prefixes to be introduced: " +
                str(self.total_prefix_amount))
    logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
                str(self.prefix_length_default))
    logger.info(" My Autonomous System number: " +
                str(self.my_autonomous_system_default))
    logger.info(" My Hold Time: " + str(self.hold_time_default))
    logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
    logger.info(" Next Hop: " + str(self.next_hop_default))
    logger.info(" Originator ID: " + str(self.originator_id_default))
    logger.info(" Cluster list: " + str(self.cluster_list_item_default))
    logger.info(" Prefix count to be inserted at once: " +
                str(self.prefix_count_to_add_default))
    logger.info(" Prefix count to be withdrawn at once: " +
                str(self.prefix_count_to_del_default))
    logger.info(" Fast pre-fill up to " +
                str(self.total_prefix_amount -
                    self.remaining_prefixes_threshold) + " prefixes")
    logger.info(" Remaining number of prefixes to be processed " +
                "in parallel with withdrawals: " +
                str(self.remaining_prefixes_threshold))
    logger.debug(" Prefix index range used after pre-fill procedure [" +
                 str(self.randomize_lowest_default) + ", " +
                 str(self.randomize_highest_default) + "]")
    if self.single_update_default:
        logger.info(" Common single UPDATE will be generated " +
                    "for both NLRI & WITHDRAWN lists")
    else:
        logger.info(" Two separate UPDATEs will be generated " +
                    "for each NLRI & WITHDRAWN lists")
    if self.randomize_updates_default:
        logger.info(" Generation of UPDATE messages will be randomized")
    logger.info(" Let\'s go ...\n")
374 # TODO: Notification for hold timer expiration can be handy.
def store_results(self, file_name=None, threshold=None):
    """ Stores specified results into files based on file_name value.

    Arguments:
        :param file_name: Trailing (common) part of result file names
        :param threshold: Minimum number of sent updates needed for each
                          result to be included into result csv file
                          (mainly needed because of the result accuracy)
    Returns:
        :return: n/a
    Notes:
        A phase with a non-positive measured duration contributes its
        total but no performance value (the rate cannot be computed).
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if file_name is None:
        file_name = self.results_file_name_default
    if threshold is None:
        threshold = self.performance_threshold_default

    phase1_duration = self.phase1_stop_time - self.phase1_start_time
    phase2_duration = self.phase2_stop_time - self.phase2_start_time

    logger.info("#" * 10 + " Final results " + "#" * 10)
    logger.info("Number of iterations: " + str(self.iteration))
    logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
                str(self.phase1_updates_sent))
    logger.info("The pre-fill phase duration: " +
                str(phase1_duration) + "s")
    logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
                str(self.phase2_updates_sent))
    logger.info("The 2nd test phase duration: " +
                str(phase2_duration) + "s")
    logger.info("Threshold for performance reporting: " + str(threshold))

    # making labels
    phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
                    " route(s) per UPDATE")
    if self.single_update_default:
        phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                              "/-" + str(self.prefix_count_to_del_default) +
                              " routes per UPDATE")
    else:
        phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                              "/-" + str(self.prefix_count_to_del_default) +
                              " routes in two UPDATEs")

    # collecting capacity and performance results
    totals = {}
    performance = {}
    phases = (
        (phase1_label, self.phase1_updates_sent, phase1_duration),
        (phase2_label, self.phase2_updates_sent, phase2_duration),
    )
    for label, updates_sent, duration in phases:
        if updates_sent < threshold:
            continue
        totals[label] = updates_sent
        if duration > 0:
            # Guarded: a zero-length phase used to raise ZeroDivisionError.
            performance[label] = int(updates_sent / duration)
    self.write_results_to_file(totals, "totals-" + file_name)
    self.write_results_to_file(performance, "performance-" + file_name)
def write_results_to_file(self, results, file_name):
    """Writes results to the csv plot file consumable by Jenkins.

    Arguments:
        :param results: mapping of column label to value
        :param file_name: Name of the (csv) file to be created
    Returns:
        :return: none
    Notes:
        The file holds two lines: the sorted labels and the matching
        values, both separated by ", ".
    """
    labels = sorted(results)
    first_line = ", ".join(labels)
    second_line = ", ".join(str(results[label]) for label in labels)
    # The context manager closes the file even when a write fails.
    with open(file_name, "wt") as f:
        f.write(first_line + "\n")
        f.write(second_line + "\n")
    logger.info("Message generator performance results stored in " +
                file_name + ":")
    logger.info(" " + first_line)
    logger.info(" " + second_line)
470 # Return pseudo-randomized (reproducible) index for selected range
def randomize_index(self, index, lowest=None, highest=None):
    """Calculates pseudo-randomized index from selected range.

    Arguments:
        :param index: input index
        :param lowest: the lowest index from the randomized area
        :param highest: the highest index from the randomized area
    Returns:
        :return: the (pseudo)randomized index
    Notes:
        Created just as a frame for future generator enhancement.
        Currently the order inside [lowest, highest] is simply reversed,
        indexes outside of the range are returned unchanged.
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if lowest is None:
        lowest = self.randomize_lowest_default
    if highest is None:
        highest = self.randomize_highest_default
    if lowest <= index <= highest:
        # we are in the randomized range -> shuffle it inside
        # the range (now just reverse the order)
        return highest - (index - lowest)
    # we are out of the randomized range -> nothing to do
    return index
499 # Get list of prefixes
def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
                    prefix_len=None, prefix_count=None, randomize=None):
    """Generates list of IP address prefixes.

    Arguments:
        :param slot_index: index of group of prefix addresses
        :param slot_size: size of group of prefix addresses
            in [number of included prefixes]
        :param prefix_base: IP address of the first prefix
            (slot_index = 0, prefix_index = 0)
        :param prefix_len: length of the prefix in bits
            (the same as size of netmask)
        :param prefix_count: number of prefixes to be returned
            from the specified slot
        :param randomize: pass each prefix index through randomize_index
    Returns:
        :return: list of generated IP address prefixes
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if slot_size is None:
        slot_size = self.slot_size_default
    if prefix_base is None:
        prefix_base = self.prefix_base_default
    if prefix_len is None:
        prefix_len = self.prefix_length_default
    if prefix_count is None:
        prefix_count = slot_size
    if randomize is None:
        randomize = self.randomize_updates_default
    # generating list of prefixes
    indexes = []
    prefixes = []
    # Distance between two consecutive prefixes of the given length.
    prefix_gap = 2 ** (32 - prefix_len)
    first_index = slot_index * slot_size
    for prefix_index in range(first_index, first_index + prefix_count):
        if randomize:
            prefix_index = self.randomize_index(prefix_index)
        indexes.append(prefix_index)
        prefixes.append(prefix_base + prefix_index * prefix_gap)
    if self.log_debug:
        logger.debug(" Prefix slot index: " + str(slot_index))
        logger.debug(" Prefix slot size: " + str(slot_size))
        logger.debug(" Prefix count: " + str(prefix_count))
        logger.debug(" Prefix indexes: " + str(indexes))
        logger.debug(" Prefix list: " + str(prefixes))
    return prefixes
def compose_update_message(self, prefix_count_to_add=None,
                           prefix_count_to_del=None):
    """Composes an UPDATE message

    Arguments:
        :param prefix_count_to_add: # of prefixes to put into NLRI list
        :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
    Returns:
        :return: encoded UPDATE message in HEX
    Notes:
        Optionally generates separate UPDATEs for NLRI and WITHDRAWN
        lists or common message which includes both prefix lists.
        Updates global counters.
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if prefix_count_to_add is None:
        prefix_count_to_add = self.prefix_count_to_add_default
    if prefix_count_to_del is None:
        prefix_count_to_del = self.prefix_count_to_del_default
    # logging
    if self.log_info and not (self.iteration % 1000):
        logger.info("Iteration: " + str(self.iteration) +
                    " - total remaining prefixes: " +
                    str(self.remaining_prefixes))
    if self.log_debug:
        logger.debug("#" * 10 + " Iteration: " +
                     str(self.iteration) + " " + "#" * 10)
        logger.debug("Remaining prefixes: " +
                     str(self.remaining_prefixes))
    # scenario type & one-shot counter
    straightforward_scenario = (self.remaining_prefixes >
                                self.remaining_prefixes_threshold)
    if straightforward_scenario:
        # pre-fill phase: nothing is withdrawn
        prefix_count_to_del = 0
        if self.log_debug:
            logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
        if not self.phase1_start_time:
            self.phase1_start_time = time.time()
    else:
        if self.log_debug:
            logger.debug("--- COMBINED SCENARIO ---")
        if not self.phase2_start_time:
            self.phase2_start_time = time.time()
    # tailor the number of prefixes if needed
    prefix_count_to_add = (prefix_count_to_del +
                           min(prefix_count_to_add - prefix_count_to_del,
                               self.remaining_prefixes))
    # prefix slots selection for insertion and withdrawal
    slot_index_to_add = self.iteration
    slot_index_to_del = slot_index_to_add - self.slot_gap_default
    # getting lists of prefixes for insertion in this iteration
    if self.log_debug:
        logger.debug("Prefixes to be inserted in this iteration:")
    prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
                                              prefix_count=prefix_count_to_add)
    # getting lists of prefixes for withdrawal in this iteration
    if self.log_debug:
        logger.debug("Prefixes to be withdrawn in this iteration:")
    prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
                                              prefix_count=prefix_count_to_del)
    # generating the message
    if self.single_update_default:
        # Send prefixes to be introduced and withdrawn
        # in one UPDATE message
        msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
                                      nlri_prefixes=prefix_list_to_add)
    else:
        # Send prefixes to be introduced and withdrawn
        # in separate UPDATE messages (if needed)
        msg_out = self.update_message(wr_prefixes=[],
                                      nlri_prefixes=prefix_list_to_add)
        if prefix_count_to_del:
            msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
                                           nlri_prefixes=[])
    # updating counters - who knows ... maybe I am last time here ;)
    if straightforward_scenario:
        self.phase1_stop_time = time.time()
        self.phase1_updates_sent = self.updates_sent
    else:
        self.phase2_stop_time = time.time()
        self.phase2_updates_sent = (self.updates_sent -
                                    self.phase1_updates_sent)
    # updating totals for the next iteration
    self.iteration += 1
    self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
    # returning the encoded message
    return msg_out
636 # Section of message encoders
def open_message(self, version=None, my_autonomous_system=None,
                 hold_time=None, bgp_identifier=None):
    """Generates an OPEN Message (rfc4271#section-4.2)

    Arguments:
        :param version: see the rfc4271#section-4.2
        :param my_autonomous_system: see the rfc4271#section-4.2
        :param hold_time: see the rfc4271#section-4.2
        :param bgp_identifier: see the rfc4271#section-4.2
    Returns:
        :return: encoded OPEN message in HEX
    Notes:
        AS numbers which do not fit into 2 bytes are replaced by AS_TRANS
        (23456) in the fixed header; the real value is carried in the
        "32 bit AS Numbers Support" capability.
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if version is None:
        version = self.version_default
    if my_autonomous_system is None:
        my_autonomous_system = self.my_autonomous_system_default
    if hold_time is None:
        hold_time = self.hold_time_default
    if bgp_identifier is None:
        bgp_identifier = self.bgp_identifier_default

    # Marker
    marker_hex = b"\xFF" * 16

    # Type
    msg_type = 1
    type_hex = struct.pack("B", msg_type)

    # version
    version_hex = struct.pack("B", version)

    # my_autonomous_system
    # AS_TRANS value, 23456 decadic.
    my_autonomous_system_2_bytes = 23456
    # AS number is mappable to 2 bytes
    if my_autonomous_system < 65536:
        my_autonomous_system_2_bytes = my_autonomous_system
    # Pack the 2-byte value: packing the full AS number overflowed ">H"
    # for 4-byte AS numbers.
    my_autonomous_system_hex_2_bytes = struct.pack(
        ">H", my_autonomous_system_2_bytes)

    # Hold Time
    hold_time_hex = struct.pack(">H", hold_time)

    # BGP Identifier
    bgp_identifier_hex = struct.pack(">I", bgp_identifier)

    # Optional Parameters
    optional_parameters_hex = b""
    if self.rfc4760:
        optional_parameter_hex = (
            b"\x02"  # Param type ("Capability Ad")
            b"\x06"  # Length (6 bytes)
            b"\x01"  # Capability type (NLRI Unicast),
                     # see RFC 4760, section 8
            b"\x04"  # Capability value length
            b"\x00\x01"  # AFI (Ipv4)
            b"\x00"  # (reserved)
            b"\x01"  # SAFI (Unicast)
        )
        optional_parameters_hex += optional_parameter_hex

    optional_parameter_hex = (
        b"\x02"  # Param type ("Capability Ad")
        b"\x06"  # Length (6 bytes)
        b"\x41"  # "32 bit AS Numbers Support"
                 # (see RFC 6793, section 3)
        b"\x04"  # Capability value length
    )
    optional_parameter_hex += (
        struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
    )
    optional_parameters_hex += optional_parameter_hex

    # Optional Parameters Length
    optional_parameters_length = len(optional_parameters_hex)
    optional_parameters_length_hex = struct.pack("B",
                                                 optional_parameters_length)

    # Length (big-endian)
    length = (
        len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
        len(my_autonomous_system_hex_2_bytes) +
        len(hold_time_hex) + len(bgp_identifier_hex) +
        len(optional_parameters_length_hex) +
        len(optional_parameters_hex)
    )
    length_hex = struct.pack(">H", length)

    # OPEN Message
    message_hex = (
        marker_hex +
        length_hex +
        type_hex +
        version_hex +
        my_autonomous_system_hex_2_bytes +
        hold_time_hex +
        bgp_identifier_hex +
        optional_parameters_length_hex +
        optional_parameters_hex
    )

    if self.log_debug:
        logger.debug("OPEN message encoding")
        logger.debug(" Marker=0x%s", binascii.hexlify(marker_hex))
        logger.debug(" Length=%s (0x%s)", length,
                     binascii.hexlify(length_hex))
        logger.debug(" Type=%s (0x%s)", msg_type,
                     binascii.hexlify(type_hex))
        logger.debug(" Version=%s (0x%s)", version,
                     binascii.hexlify(version_hex))
        logger.debug(" My Autonomous System=%s (0x%s)",
                     my_autonomous_system_2_bytes,
                     binascii.hexlify(my_autonomous_system_hex_2_bytes))
        logger.debug(" Hold Time=%s (0x%s)", hold_time,
                     binascii.hexlify(hold_time_hex))
        logger.debug(" BGP Identifier=%s (0x%s)", bgp_identifier,
                     binascii.hexlify(bgp_identifier_hex))
        logger.debug(" Optional Parameters Length=%s (0x%s)",
                     optional_parameters_length,
                     binascii.hexlify(optional_parameters_length_hex))
        logger.debug(" Optional Parameters=0x%s",
                     binascii.hexlify(optional_parameters_hex))
        logger.debug("OPEN message encoded: 0x%s",
                     binascii.b2a_hex(message_hex))

    return message_hex
def update_message(self, wr_prefixes=None, nlri_prefixes=None,
                   wr_prefix_length=None, nlri_prefix_length=None,
                   my_autonomous_system=None, next_hop=None,
                   originator_id=None, cluster_list_item=None):
    """Generates an UPDATE Message (rfc4271#section-4.3)

    Arguments:
        :param wr_prefixes: see the rfc4271#section-4.3
        :param nlri_prefixes: see the rfc4271#section-4.3
        :param wr_prefix_length: see the rfc4271#section-4.3
        :param nlri_prefix_length: see the rfc4271#section-4.3
        :param my_autonomous_system: see the rfc4271#section-4.3
        :param next_hop: see the rfc4271#section-4.3
        :param originator_id: value of the ORIGINATOR_ID attribute
            (attribute omitted when None)
        :param cluster_list_item: single CLUSTER_LIST item
            (attribute omitted when None)
    Returns:
        :return: encoded UPDATE message in HEX
    Notes:
        Path attributes are emitted only when there is something to
        announce. Increments self.updates_sent.
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if wr_prefixes is None:
        wr_prefixes = self.wr_prefixes_default
    if nlri_prefixes is None:
        nlri_prefixes = self.nlri_prefixes_default
    if wr_prefix_length is None:
        wr_prefix_length = self.prefix_length_default
    if nlri_prefix_length is None:
        nlri_prefix_length = self.prefix_length_default
    if my_autonomous_system is None:
        my_autonomous_system = self.my_autonomous_system_default
    if next_hop is None:
        next_hop = self.next_hop_default
    if originator_id is None:
        originator_id = self.originator_id_default
    if cluster_list_item is None:
        cluster_list_item = self.cluster_list_item_default

    # Marker
    marker_hex = b"\xFF" * 16

    # Type
    msg_type = 2
    type_hex = struct.pack("B", msg_type)

    # Withdrawn Routes
    # Only the octets covered by the prefix length are transmitted.
    wr_octets = (wr_prefix_length + 7) // 8
    wr_length_hex = struct.pack("B", wr_prefix_length)
    withdrawn_routes_hex = b"".join(
        wr_length_hex + struct.pack(">I", int(prefix))[:wr_octets]
        for prefix in wr_prefixes)

    # Withdrawn Routes Length
    withdrawn_routes_length = len(withdrawn_routes_hex)
    withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)

    # TODO: to replace hardcoded string by encoding?
    # Path Attributes
    path_attributes_hex = b""
    if nlri_prefixes != []:
        path_attributes_hex += (
            b"\x40"  # Flags ("Well-Known")
            b"\x01"  # Type (ORIGIN)
            b"\x01"  # Length (1)
            b"\x00"  # Origin: IGP
        )
        path_attributes_hex += (
            b"\x40"  # Flags ("Well-Known")
            b"\x02"  # Type (AS_PATH)
            b"\x06"  # Length (6)
            b"\x02"  # AS segment type (AS_SEQUENCE)
            b"\x01"  # AS segment length (1)
        )
        my_as_hex = struct.pack(">I", my_autonomous_system)
        path_attributes_hex += my_as_hex  # AS segment (4 bytes)
        path_attributes_hex += (
            b"\x40"  # Flags ("Well-Known")
            b"\x03"  # Type (NEXT_HOP)
            b"\x04"  # Length (4)
        )
        next_hop_hex = struct.pack(">I", int(next_hop))
        path_attributes_hex += (
            next_hop_hex  # IP address of the next hop (4 bytes)
        )
        if originator_id is not None:
            path_attributes_hex += (
                b"\x80"  # Flags ("Optional, non-transitive")
                b"\x09"  # Type (ORIGINATOR_ID)
                b"\x04"  # Length (4)
            )            # ORIGINATOR_ID (4 bytes)
            path_attributes_hex += struct.pack(">I", int(originator_id))
        if cluster_list_item is not None:
            path_attributes_hex += (
                b"\x80"  # Flags ("Optional, non-transitive")
                b"\x0a"  # Type (CLUSTER_LIST), code 10 per RFC 4456;
                         # 0x09 would duplicate ORIGINATOR_ID
                b"\x04"  # Length (4)
            )            # one CLUSTER_LIST item (4 bytes)
            path_attributes_hex += struct.pack(">I", int(cluster_list_item))

    # Total Path Attributes Length
    total_path_attributes_length = len(path_attributes_hex)
    total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)

    # Network Layer Reachability Information
    nlri_octets = (nlri_prefix_length + 7) // 8
    nlri_length_hex = struct.pack("B", nlri_prefix_length)
    nlri_hex = b"".join(
        nlri_length_hex + struct.pack(">I", int(prefix))[:nlri_octets]
        for prefix in nlri_prefixes)

    # Length (big-endian)
    length = (
        len(marker_hex) + 2 + len(type_hex) +
        len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
        len(total_path_attributes_length_hex) + len(path_attributes_hex) +
        len(nlri_hex))
    length_hex = struct.pack(">H", length)

    # UPDATE Message
    message_hex = (
        marker_hex +
        length_hex +
        type_hex +
        withdrawn_routes_length_hex +
        withdrawn_routes_hex +
        total_path_attributes_length_hex +
        path_attributes_hex +
        nlri_hex
    )

    if self.log_debug:
        logger.debug("UPDATE message encoding")
        logger.debug(" Marker=0x%s", binascii.hexlify(marker_hex))
        logger.debug(" Length=%s (0x%s)", length,
                     binascii.hexlify(length_hex))
        logger.debug(" Type=%s (0x%s)", msg_type,
                     binascii.hexlify(type_hex))
        logger.debug(" withdrawn_routes_length=%s (0x%s)",
                     withdrawn_routes_length,
                     binascii.hexlify(withdrawn_routes_length_hex))
        logger.debug(" Withdrawn_Routes=%s/%s (0x%s)", wr_prefixes,
                     wr_prefix_length,
                     binascii.hexlify(withdrawn_routes_hex))
        if total_path_attributes_length:
            logger.debug(" Total Path Attributes Length=%s (0x%s)",
                         total_path_attributes_length,
                         binascii.hexlify(total_path_attributes_length_hex))
            logger.debug(" Path Attributes=(0x%s)",
                         binascii.hexlify(path_attributes_hex))
            logger.debug(" Origin=IGP")
            logger.debug(" AS path=" + str(my_autonomous_system))
            logger.debug(" Next hop=" + str(next_hop))
            if originator_id is not None:
                logger.debug(" Originator id=" + str(originator_id))
            if cluster_list_item is not None:
                logger.debug(" Cluster list=" + str(cluster_list_item))
        logger.debug(" Network Layer Reachability Information=%s/%s (0x%s)",
                     nlri_prefixes, nlri_prefix_length,
                     binascii.hexlify(nlri_hex))
        logger.debug("UPDATE message encoded: 0x%s",
                     binascii.b2a_hex(message_hex))

    # updating counter
    self.updates_sent += 1
    # returning encoded message
    return message_hex
def notification_message(self, error_code, error_subcode, data_hex=b""):
    """Generate a NOTIFICATION Message (rfc4271#section-4.5).

    Arguments:
        :param error_code: see the rfc4271#section-4.5 (one octet)
        :param error_subcode: see the rfc4271#section-4.5 (one octet)
        :param data_hex: see the rfc4271#section-4.5 (raw octets, may be
            empty)
    Returns:
        :return: encoded NOTIFICATION message (raw octet string)
    """
    # Marker
    marker_hex = b"\xFF" * 16

    # Type (3 stands for NOTIFICATION); not named "type" to keep
    # the builtin usable.
    msg_type = 3
    type_hex = struct.pack("B", msg_type)

    # Error Code
    error_code_hex = struct.pack("B", error_code)

    # Error Subcode
    error_subcode_hex = struct.pack("B", error_subcode)

    # Length (big-endian); the literal 2 is the size of the length
    # field itself.
    length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
              len(error_subcode_hex) + len(data_hex))
    length_hex = struct.pack(">H", length)

    # NOTIFICATION Message
    message_hex = (
        marker_hex +
        length_hex +
        type_hex +
        error_code_hex +
        error_subcode_hex +
        data_hex
    )

    # The hexlify calls are pure overhead unless debug output is enabled.
    if logger.isEnabledFor(logging.DEBUG):
        hexlify = binascii.hexlify
        logger.debug("NOTIFICATION message encoding")
        logger.debug(" Marker=0x%s", hexlify(marker_hex))
        logger.debug(" Length=%s (0x%s)", length, hexlify(length_hex))
        logger.debug(" Type=%s (0x%s)", msg_type, hexlify(type_hex))
        logger.debug(" Error Code=%s (0x%s)", error_code,
                     hexlify(error_code_hex))
        logger.debug(" Error Subode=%s (0x%s)", error_subcode,
                     hexlify(error_subcode_hex))
        logger.debug(" Data= (0x%s)", hexlify(data_hex))
        logger.debug("NOTIFICATION message encoded: 0x%s",
                     binascii.b2a_hex(message_hex))

    return message_hex
def keepalive_message(self):
    """Generate a KEEP ALIVE Message (rfc4271#section-4.4).

    Returns:
        :return: encoded KEEP ALIVE message (raw octet string, 19 octets)
    """
    # Marker
    marker_hex = b"\xFF" * 16

    # Type (4 stands for KEEPALIVE); not named "type" to keep
    # the builtin usable.
    msg_type = 4
    type_hex = struct.pack("B", msg_type)

    # Length (big-endian); the literal 2 is the size of the length
    # field itself.
    length = len(marker_hex) + 2 + len(type_hex)
    length_hex = struct.pack(">H", length)

    # KEEP ALIVE Message
    message_hex = (
        marker_hex +
        length_hex +
        type_hex
    )

    # The hexlify calls are pure overhead unless debug output is enabled.
    if logger.isEnabledFor(logging.DEBUG):
        hexlify = binascii.hexlify
        logger.debug("KEEP ALIVE message encoding")
        logger.debug(" Marker=0x%s", hexlify(marker_hex))
        logger.debug(" Length=%s (0x%s)", length, hexlify(length_hex))
        logger.debug(" Type=%s (0x%s)", msg_type, hexlify(type_hex))
        logger.debug("KEEP ALIVE message encoded: 0x%s",
                     binascii.b2a_hex(message_hex))

    return message_hex
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and
    peer's hold time.
    """

    def __init__(self, msg_in):
        """Initialisation based on defaults and OPEN message from peer.

        Arguments:
            msg_in: the OPEN message received from peer.
        Raises:
            ValueError: when the negotiated hold time is non-zero
                and lower than 3 seconds.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.

        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?

        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        hold_timedelta = 180  # Not an attribute of self yet.
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        if hold_timedelta > peer_hold_timedelta:
            hold_timedelta = peer_hold_timedelta
        if hold_timedelta != 0 and hold_timedelta < 3:
            logger.error("Invalid hold timedelta value: %s", hold_timedelta)
            raise ValueError(
                "Invalid hold timedelta value: %s" % (hold_timedelta,))
        # If we do not hear from peer this long, we assume it has died.
        self.hold_timedelta = hold_timedelta

        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)

        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        # The same as calling snapshot(), but also declares a field.
        self.snapshot_time = time.time()

        # At this time point, peer will be declared dead.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta

        # At this point, we should be sending keepalive message.
        self.my_keepalive_time = None  # to be set later

    def snapshot(self):
        """Store current time in instance data to use later."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time

        Arguments:
            :keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurrence.

        Returns:
            :return: False when the hold time is 0 (keepalives off),
                otherwise whether the last snapshot reached the
                scheduled keepalive time.
        """
        if self.hold_timedelta == 0:
            return False
        return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Get the time of the next expected or to be sent KEEP ALIVE.

        Returns:
            :return: the earlier of my keepalive time and the peer hold
                time; a day after the last snapshot when hold time is 0.
        """
        if self.hold_timedelta == 0:
            return self.snapshot_time + 86400
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time.

        Arguments:
            :snapshot_time: the time to compare the peer hold time with
        Raises:
            RuntimeError: when snapshot_time is past the peer hold time.
        """
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta == 0:
            return
        # time.time() may be too strict
        if snapshot_time > self.peer_hold_time:
            logger.error("Peer has overstepped the hold timer.")
            raise RuntimeError("Peer has overstepped the hold timer.")
            # TODO: Include hold_timedelta?
            # TODO: Add notification sending (attempt). That means
            # move to write tracker.
class ReadTracker(object):
    """Class for tracking read of messages chunk by chunk and
    for idle waiting, including decoding of received UPDATE messages.
    """

    # Names of the path attribute type codes the decoder recognises
    # (rfc4271#section-5, rfc4456#section-8, rfc4760).
    ATTRIBUTE_NAMES = {
        1: "ORIGIN",
        2: "AS_PATH",
        3: "NEXT_HOP",
        4: "MULTI_EXIT_DISC",
        5: "LOCAL_PREF",
        6: "ATOMIC_AGGREGATE",
        7: "AGGREGATOR",
        9: "ORIGINATOR_ID",
        10: "CLUSTER_LIST",
        14: "MP_REACH_NLRI",
        15: "MP_UNREACH_NLRI",
    }

    def __init__(self, bgp_socket, timer):
        """The reader initialisation.

        Arguments:
            bgp_socket: socket to be read from
            timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.timer = timer
        # BGP marker length plus length field length.
        self.header_length = 18
        # TODO: make it class (constant) attribute
        # Computation of where next chunk ends depends on whether
        # we are beyond length field.
        self.reading_header = True
        # Countdown towards next size computation.
        self.bytes_to_read = self.header_length
        # Incremental buffer for message under read.
        self.msg_in = b""
        # Initialising counters
        self.updates_received = 0
        self.prefixes_introduced = 0
        self.prefixes_withdrawn = 0
        self.rx_idle_time = 0
        self.rx_activity_detected = True

    def read_message_chunk(self):
        """Read up to one message

        Note:
            Currently it does not return anything.
        Raises:
            RuntimeError: when the peer closed the connection or
                announced a message shorter than a BGP header.
        """
        # TODO: We could return the whole message, currently not needed.
        # We assume the socket is readable.
        chunk_message = self.socket.recv(self.bytes_to_read)
        if not chunk_message:
            # A readable socket that yields no data has been closed by
            # the peer; carrying on would make the caller spin forever.
            logger.error("Connection closed by peer.")
            raise RuntimeError("Connection closed by peer.")
        self.msg_in += chunk_message
        self.bytes_to_read -= len(chunk_message)
        # TODO: bytes_to_read < 0 is not possible, right?
        if self.bytes_to_read:
            # We should not act upon peer_hold_time if we are reading
            # something right now.
            return
        # Finished reading a logical block.
        if self.reading_header:
            # The logical block was a BGP header.
            # Now we know the size of the message.
            self.reading_header = False
            self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
                                  self.header_length)
            if self.bytes_to_read < 1:
                # At least the type octet has to follow the length field.
                logger.error("Invalid message length in header: 0x%s",
                             binascii.b2a_hex(self.msg_in))
                raise RuntimeError("Invalid message length in header.")
            return
        # We have finished reading the body of the message.
        # Peer has just proven it is still alive.
        self.timer.reset_peer_hold_time()
        # TODO: Do we want to count received messages?
        # This version ignores the received message.
        # TODO: Should we do validation and exit on anything
        # besides update or keepalive?
        # A one-octet slice compares the same way on str and bytes.
        message_type_hex = self.msg_in[self.header_length:
                                       self.header_length + 1]
        if message_type_hex == b"\x01":
            logger.info("OPEN message received: 0x%s",
                        binascii.b2a_hex(self.msg_in))
        elif message_type_hex == b"\x02":
            logger.debug("UPDATE message received: 0x%s",
                         binascii.b2a_hex(self.msg_in))
            self.decode_update_message(self.msg_in)
        elif message_type_hex == b"\x03":
            logger.info("NOTIFICATION message received: 0x%s",
                        binascii.b2a_hex(self.msg_in))
        elif message_type_hex == b"\x04":
            logger.info("KEEP ALIVE message received: 0x%s",
                        binascii.b2a_hex(self.msg_in))
        else:
            logger.warning("Unexpected message received: 0x%s",
                           binascii.b2a_hex(self.msg_in))
        # Prepare state for reading another message.
        self.msg_in = b""
        self.reading_header = True
        self.bytes_to_read = self.header_length

    def decode_path_attributes(self, path_attributes_hex):
        """Decode the Path Attributes field (rfc4271#section-4.3)

        Every attribute is logged at debug level; MP_REACH_NLRI and
        MP_UNREACH_NLRI also update the prefix counters.

        Arguments:
            :path_attributes: path_attributes field to be decoded in hex
        Returns:
            :return: None
        """
        hex_to_decode = path_attributes_hex
        while len(hex_to_decode):
            attr_flags_hex = hex_to_decode[0:1]
            attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
            # attr_optional_bit = attr_flags & 128
            # attr_transitive_bit = attr_flags & 64
            # attr_partial_bit = attr_flags & 32
            # The extended length bit selects a 2-octet length field.
            length_size = 2 if attr_flags & 16 else 1
            value_offset = 2 + length_size
            if len(hex_to_decode) < value_offset:
                logger.warning("Truncated path attribute header: 0x%s",
                               binascii.b2a_hex(hex_to_decode))
                break
            attr_type_code = int(binascii.b2a_hex(hex_to_decode[1:2]), 16)
            attr_length = int(
                binascii.b2a_hex(hex_to_decode[2:value_offset]), 16)
            attr_value_hex = hex_to_decode[value_offset:
                                           value_offset + attr_length]
            hex_to_decode = hex_to_decode[value_offset + attr_length:]

            flags_text = binascii.b2a_hex(attr_flags_hex)
            value_text = binascii.b2a_hex(attr_value_hex)
            attr_name = self.ATTRIBUTE_NAMES.get(attr_type_code)
            if attr_name is None:
                logger.debug("Unknown attribute type=%s, flags:0x%s)",
                             attr_type_code, flags_text)
                logger.debug("Unknown attribute value=0x%s", value_text)
                continue
            logger.debug("Attribute type=%s (%s, flags:0x%s)",
                         attr_type_code, attr_name, flags_text)
            logger.debug("Attribute value=0x%s", value_text)
            if attr_type_code == 14:  # rfc4760#section-3
                self._decode_mp_reach_nlri(attr_value_hex)
            elif attr_type_code == 15:  # rfc4760#section-4
                self._decode_mp_unreach_nlri(attr_value_hex)
        return None

    def _decode_mp_reach_nlri(self, attr_value_hex):
        """Log an MP_REACH_NLRI value and count the prefixes it introduces."""
        hexlify = binascii.b2a_hex
        # AFI (2) + SAFI (1) + next hop length (1) + reserved (1).
        if len(attr_value_hex) < 5:
            logger.warning("Truncated MP_REACH_NLRI value: 0x%s",
                           hexlify(attr_value_hex))
            return
        logger.debug(" Address Family Identifier=0x%s",
                     hexlify(attr_value_hex[0:2]))
        logger.debug(" Subsequent Address Family Identifier=0x%s",
                     hexlify(attr_value_hex[2:3]))
        next_hop_netaddr_len_hex = attr_value_hex[3:4]
        next_hop_netaddr_len = int(hexlify(next_hop_netaddr_len_hex), 16)
        logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
                     next_hop_netaddr_len,
                     hexlify(next_hop_netaddr_len_hex))
        next_hop_end = 4 + next_hop_netaddr_len
        next_hop_netaddr_hex = attr_value_hex[4:next_hop_end]
        next_hop_octets = bytearray(next_hop_netaddr_hex)
        if len(next_hop_octets) == 4:
            # IPv4 next hop, shown in dotted decimal notation.
            next_hop_netaddr = ".".join(str(octet)
                                        for octet in next_hop_octets)
        else:
            # Any other next hop size (e.g. IPv6) is shown octet by
            # octet instead of failing on a fixed 4-octet unpack.
            next_hop_netaddr = ":".join("%02x" % octet
                                        for octet in next_hop_octets)
        logger.debug(" Network Address of Next Hop=%s (0x%s)",
                     next_hop_netaddr, hexlify(next_hop_netaddr_hex))
        logger.debug(" Reserved=0x%s",
                     hexlify(attr_value_hex[next_hop_end:next_hop_end + 1]))
        nlri_hex = attr_value_hex[next_hop_end + 1:]
        logger.debug(" Network Layer Reachability Information=0x%s",
                     hexlify(nlri_hex))
        # NOTE(review): the prefix decoder is shared with the plain IPv4
        # NLRI field; confirm it copes with other address families.
        nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
        logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
        for prefix in nlri_prefix_list:
            logger.debug(" nlri_prefix_received: %s", prefix)
        self.prefixes_introduced += len(nlri_prefix_list)  # update counter

    def _decode_mp_unreach_nlri(self, attr_value_hex):
        """Log an MP_UNREACH_NLRI value and count the prefixes it withdraws."""
        hexlify = binascii.b2a_hex
        logger.debug(" Address Family Identifier=0x%s",
                     hexlify(attr_value_hex[0:2]))
        logger.debug(" Subsequent Address Family Identifier=0x%s",
                     hexlify(attr_value_hex[2:3]))
        wd_hex = attr_value_hex[3:]
        logger.debug(" Withdrawn Routes=0x%s", hexlify(wd_hex))
        wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
        logger.debug(" Withdrawn routes prefix list: %s",
                     wdr_prefix_list)
        for prefix in wdr_prefix_list:
            logger.debug(" withdrawn_prefix_received: %s", prefix)
        self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter

    def decode_update_message(self, msg):
        """Decode an UPDATE message (rfc4271#section-4.3)

        Logs the decoded fields and updates the received update and
        prefix counters. A message of another type is only reported.

        Arguments:
            :msg: message to be decoded in hex
        Returns:
            :return: None
        """
        hexlify = binascii.b2a_hex
        logger.debug("Decoding update message:")
        # message header - marker
        marker_hex = msg[:16]
        logger.debug("Message header marker: 0x%s", hexlify(marker_hex))
        # message header - message length
        msg_length_hex = msg[16:18]
        msg_length = int(hexlify(msg_length_hex), 16)
        logger.debug("Message lenght: 0x%s (%s)",
                     hexlify(msg_length_hex), msg_length)
        # message header - message type
        msg_type_hex = msg[18:19]
        msg_type = int(hexlify(msg_type_hex), 16)
        if msg_type != 2:
            logger.error("Unexpeced message type 0x%s in 0x%s",
                         hexlify(msg_type_hex), hexlify(msg))
            return
        logger.debug("Message type: 0x%s (update)", hexlify(msg_type_hex))
        # withdrawn routes length
        wdr_length_hex = msg[19:21]
        wdr_length = int(hexlify(wdr_length_hex), 16)
        logger.debug("Withdrawn routes lenght: 0x%s (%s)",
                     hexlify(wdr_length_hex), wdr_length)
        # withdrawn routes
        wdr_hex = msg[21:21 + wdr_length]
        logger.debug("Withdrawn routes: 0x%s", hexlify(wdr_hex))
        wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
        logger.debug("Withdrawn routes prefix list: %s",
                     wdr_prefix_list)
        for prefix in wdr_prefix_list:
            logger.debug("withdrawn_prefix_received: %s", prefix)
        # total path attribute length
        total_pa_length_offset = 21 + wdr_length
        total_pa_length_hex = msg[total_pa_length_offset:
                                  total_pa_length_offset + 2]
        total_pa_length = int(hexlify(total_pa_length_hex), 16)
        logger.debug("Total path attribute lenght: 0x%s (%s)",
                     hexlify(total_pa_length_hex), total_pa_length)
        # path attributes
        pa_offset = total_pa_length_offset + 2
        pa_hex = msg[pa_offset:pa_offset + total_pa_length]
        logger.debug("Path attributes: 0x%s", hexlify(pa_hex))
        self.decode_path_attributes(pa_hex)
        # network layer reachability information length;
        # 23 = header (19) + the two 2-octet length fields.
        nlri_length = msg_length - 23 - total_pa_length - wdr_length
        logger.debug("Calculated NLRI length: %s", nlri_length)
        # network layer reachability information
        nlri_offset = pa_offset + total_pa_length
        nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
        logger.debug("NLRI: 0x%s", hexlify(nlri_hex))
        nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
        logger.debug("NLRI prefix list: %s", nlri_prefix_list)
        for prefix in nlri_prefix_list:
            logger.debug("nlri_prefix_received: %s", prefix)
        # updating counters
        self.updates_received += 1
        self.prefixes_introduced += len(nlri_prefix_list)
        self.prefixes_withdrawn += len(wdr_prefix_list)

    def wait_for_read(self):
        """Read message until timeout (next expected event).

        Note:
            Used when no more updates has to be sent to avoid busy-wait.
            Currently it does not return anything.
        """
        # Compute time to the first predictable state change
        event_time = self.timer.get_next_event_time()
        # snapshot_time would be imprecise
        wait_timedelta = min(event_time - time.time(), 10)
        if wait_timedelta < 0:
            # The program got around to waiting to an event in "very near
            # future" so late that it became a "past" event, thus tell
            # "select" to not wait at all. Passing negative timedelta to
            # select() would lead to either waiting forever (for -1) or
            # select.error("Invalid parameter") (for everything else).
            wait_timedelta = 0
        # And wait for event or something to read.

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every
            # update and not too frequently to avoid having large log files)
            logger.info("total_received_update_message_counter: %s",
                        self.updates_received)
            logger.info("total_received_nlri_prefix_counter: %s",
                        self.prefixes_introduced)
            logger.info("total_received_withdrawn_prefix_counter: %s",
                        self.prefixes_withdrawn)

        start_time = time.time()
        select.select([self.socket], [], [self.socket], wait_timedelta)
        timedelta = time.time() - start_time
        self.rx_idle_time += timedelta
        # A wait shorter than a second counts as receive activity.
        self.rx_activity_detected = timedelta < 1

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every
            # update and not too frequently to avoid having large log files)
            logger.info("... idle for %.3fs", timedelta)
            logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
        return
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them."""

    def __init__(self, bgp_socket, generator, timer):
        """The writer initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects,
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Really new fields.
        # TODO: Would attribute docstrings add anything substantial?
        # True while there are enqueued octets not sent yet.
        self.sending_message = False
        # Number of octets still waiting in msg_out.
        self.bytes_to_send = 0
        # Buffer of enqueued but not yet sent octets.
        self.msg_out = b""

    def enqueue_message_for_sending(self, message):
        """Enqueue message and change state.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send enqueued data from msg_out buffer

        Returns:
            :return: true if no remaining data to send
        """
        # We assume there is a msg_out to send and socket is writable.
        self.timer.snapshot()
        bytes_sent = self.socket.send(self.msg_out)
        # Forget the part of message that was sent.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if self.bytes_to_send:
            # The socket took only a part, the rest stays enqueued.
            return False
        # TODO: Is it possible to hit negative bytes_to_send?
        self.sending_message = False
        # We should have reset hold timer on peer side.
        self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
        # The possible reason for not prioritizing reads is gone.
        return True
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class."""

    def __init__(self, bgp_socket, generator, timer):
        """The state tracker initialisation.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Sub-trackers.
        self.reader = ReadTracker(bgp_socket, timer)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        self.prioritize_writing = False
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # to False.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.

    def perform_one_loop_iteration(self):
        """The main loop iteration

        Notes:
            Calculates priority, resolves all conditions, calls
            appropriate method and returns to caller to repeat.
        Raises:
            RuntimeError: when select reports an exceptional state
                on the socket.
        """
        self.timer.snapshot()
        if not self.prioritize_writing and self.timer.is_time_for_my_keepalive():
            if not self.writer.sending_message:
                # We need to schedule a keepalive ASAP.
                self.writer.enqueue_message_for_sending(
                    self.generator.keepalive_message())
                logger.info("KEEP ALIVE is sent.")
            # We are sending a message now, so let's prioritize it.
            self.prioritize_writing = True
        # Now we know what our priorities are, we have to check
        # which actions are available.
        # select.select() returns three lists; each is either []
        # or [self.socket], so we will test them as boolean.
        read_list, write_list, except_list = select.select(
            [self.socket], [self.socket], [self.socket],
            self.timer.report_timedelta)
        if except_list:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
                return
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        if write_list:
            # Either we really want to reset peer's view of our hold
            # timer, or there was nothing to read.
            # Were we in the middle of sending a message?
            if self.writer.sending_message:
                # Was it the end of a message?
                whole = self.writer.send_message_chunk_is_whole()
                # We were pressed to send something and we did it.
                if self.prioritize_writing and whole:
                    # We prioritize reading again.
                    self.prioritize_writing = False
                return
            # Finally to check if still update messages to be generated.
            if self.generator.remaining_prefixes:
                msg_out = self.generator.compose_update_message()
                if not self.generator.remaining_prefixes:
                    # We have just finished update generation,
                    # end-of-rib is due.
                    logger.info("All update messages generated.")
                    logger.info("Storing performance results.")
                    self.generator.store_results()
                    logger.info("Finally an END-OF-RIB is sent.")
                    # NOTE(review): the second argument was restored as an
                    # empty NLRI list (an UPDATE carrying no routes) --
                    # confirm against the original call.
                    msg_out += self.generator.update_message(wr_prefixes=[],
                                                             nlri_prefixes=[])
                self.writer.enqueue_message_for_sending(msg_out)
                # Attempt for real sending to be done in next iteration.
                return
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # We can neither read nor write.
        logger.warning("Input and output both blocked for %s seconds.",
                       self.timer.report_timedelta)
        # FIXME: Are we sure select has been really waiting
        # the whole period?
        return
def create_logger(loglevel, logfile):
    """Create logger object

    The logger named "logger" gets one console handler and one file
    handler; the log file is opened in "w" mode, so an existing file
    is overwritten.

    Arguments:
        :loglevel: log level
        :logfile: log file name
    Returns:
        :return: logger object
    """
    logger = logging.getLogger("logger")
    log_formatter = logging.Formatter(
        "%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
    handlers = (
        logging.StreamHandler(),
        logging.FileHandler(logfile, mode="w"),
    )
    for handler in handlers:
        handler.setFormatter(log_formatter)
        logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger
def job(arguments):
    """One time initialisation and iterations looping.

    Notes:
        Establish BGP connection and run iterations.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None (the reactor loop ends only by an exception)
    Raises:
        MessageError: when the peer does not confirm our OPEN message
            by a KEEP ALIVE message.
    """
    bgp_socket = establish_connection(arguments)
    try:
        # Initial handshake phase. TODO: Can it be also moved to StateTracker?
        # Receive open message before sending anything.
        # FIXME: Add parameter to send default open message first,
        # to work with "you first" peers.
        msg_in = read_open_message(bgp_socket)
        timer = TimeTracker(msg_in)
        generator = MessageGenerator(arguments)
        msg_out = generator.open_message()
        logger.debug("Sending the OPEN message: %s",
                     binascii.hexlify(msg_out))
        # Send our open message to the peer.
        bgp_socket.send(msg_out)
        # Wait for confirming keepalive.
        # Using exact keepalive length to not to see possible updates.
        # A single recv() may return fewer octets than asked for, so
        # keep reading until the 19 octets arrive or the peer closes.
        msg_in = b""
        while len(msg_in) < 19:
            chunk = bgp_socket.recv(19 - len(msg_in))
            if not chunk:
                break
            msg_in += chunk
        if msg_in != generator.keepalive_message():
            logger.error("Open not confirmed by keepalive, instead got %s",
                         binascii.hexlify(msg_in))
            raise MessageError("Open not confirmed by keepalive, instead got",
                               msg_in)
        timer.reset_peer_hold_time()
        # Send the keepalive to indicate the connection is accepted.
        timer.snapshot()  # Remember this time.
        msg_out = generator.keepalive_message()
        logger.debug("Sending a KEEP ALIVE message: %s",
                     binascii.hexlify(msg_out))
        bgp_socket.send(msg_out)
        # Use the remembered time.
        timer.reset_my_keepalive_time(timer.snapshot_time)
        # End of initial handshake phase.
        state = StateTracker(bgp_socket, generator, timer)
        while True:  # main reactor loop
            state.perform_one_loop_iteration()
    finally:
        # The loop above is left only by an exception; do not keep
        # the connection half-open in that case.
        bgp_socket.close()
def threaded_job(arguments):
    """Run the job threaded

    The requested amount of prefixes is split among
    arguments.multiplicity workers, each started in its own thread
    with a private copy of the arguments. This function then sleeps
    forever and never returns.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None
    Raises:
        SystemExit: with code 2 when a thread could not be started.
    """
    amount_left = arguments.amount
    utils_left = arguments.multiplicity
    prefix_current = arguments.firstprefix
    myip_current = arguments.myip
    thread_args = []

    while True:
        # Floor division written explicitly, the result does not
        # depend on the interpreter's meaning of "/".
        amount_per_util = (amount_left - 1) // utils_left + 1  # round up
        amount_left -= amount_per_util
        utils_left -= 1

        args = deepcopy(arguments)
        args.amount = amount_per_util
        args.firstprefix = prefix_current
        args.myip = myip_current
        thread_args.append(args)

        if not utils_left:
            break
        # NOTE(review): the step of 16 addresses per prefix presumably
        # matches the default prefix length -- confirm against the
        # argument parser.
        prefix_current += amount_per_util * 16
        myip_current += 1

    try:
        # Create threads
        for t in thread_args:
            thread.start_new_thread(job, (t,))
    except Exception:
        print("Error: unable to start thread.")
        raise SystemExit(2)

    # Work remains forever
    while True:
        time.sleep(5)
if __name__ == "__main__":
    arguments = parse_arguments()
    # "logger" is deliberately a module-level name here: the functions
    # and classes of this file log through it without importing it.
    logger = create_logger(arguments.loglevel, arguments.logfile)
    threaded_job(arguments)