1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
24 from copy import deepcopy
27 __author__ = "Vratko Polak"
28 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
29 __license__ = "Eclipse Public License v1.0"
30 __email__ = "vrpolak@cisco.com"
33 def parse_arguments():
34 """Use argparse to get arguments,
39 parser = argparse.ArgumentParser()
40 # TODO: Should we use --argument-names-with-spaces?
41 str_help = "Autonomous System number use in the stream."
42 parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
43 # FIXME: We are acting as iBGP peer,
44 # we should mirror AS number from peer's open message.
45 str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
46 parser.add_argument("--amount", default="1", type=int, help=str_help)
47 str_help = "Maximum number of IP prefixes to be announced in one iteration"
48 parser.add_argument("--insert", default="1", type=int, help=str_help)
49 str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
50 parser.add_argument("--withdraw", default="0", type=int, help=str_help)
51 str_help = "The number of prefixes to process without withdrawals"
52 parser.add_argument("--prefill", default="0", type=int, help=str_help)
53 str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
54 parser.add_argument("--updates", choices=["single", "separate"],
55 default=["separate"], help=str_help)
56 str_help = "Base prefix IP address for prefix generation"
57 parser.add_argument("--firstprefix", default="8.0.1.0",
58 type=ipaddr.IPv4Address, help=str_help)
59 str_help = "The prefix length."
60 parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
61 str_help = "Listen for connection, instead of initiating it."
62 parser.add_argument("--listen", action="store_true", help=str_help)
63 str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
64 "Default value only suitable for listening.")
65 parser.add_argument("--myip", default="0.0.0.0",
66 type=ipaddr.IPv4Address, help=str_help)
67 str_help = ("TCP port to bind to when listening or initiating connection." +
68 "Default only suitable for initiating.")
69 parser.add_argument("--myport", default="0", type=int, help=str_help)
70 str_help = "The IP of the next hop to be placed into the update messages."
71 parser.add_argument("--nexthop", default="192.0.2.1",
72 type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
73 str_help = "Identifier of the route originator."
74 parser.add_argument("--originator", default=None,
75 type=ipaddr.IPv4Address, dest="originator", help=str_help)
76 str_help = "Cluster list item identifier."
77 parser.add_argument("--cluster", default=None,
78 type=ipaddr.IPv4Address, dest="cluster", help=str_help)
79 str_help = ("Numeric IP Address to try to connect to." +
80 "Currently no effect in listening mode.")
81 parser.add_argument("--peerip", default="127.0.0.2",
82 type=ipaddr.IPv4Address, help=str_help)
83 str_help = "TCP port to try to connect to. No effect in listening mode."
84 parser.add_argument("--peerport", default="179", type=int, help=str_help)
85 str_help = "Local hold time."
86 parser.add_argument("--holdtime", default="180", type=int, help=str_help)
87 str_help = "Log level (--error, --warning, --info, --debug)"
88 parser.add_argument("--error", dest="loglevel", action="store_const",
89 const=logging.ERROR, default=logging.INFO,
91 parser.add_argument("--warning", dest="loglevel", action="store_const",
92 const=logging.WARNING, default=logging.INFO,
94 parser.add_argument("--info", dest="loglevel", action="store_const",
95 const=logging.INFO, default=logging.INFO,
97 parser.add_argument("--debug", dest="loglevel", action="store_const",
98 const=logging.DEBUG, default=logging.INFO,
100 str_help = "Log file name"
101 parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
102 str_help = "Trailing part of the csv result files for plotting purposes"
103 parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
104 str_help = "Minimum number of updates to reach to include result into csv."
105 parser.add_argument("--threshold", default="1000", type=int, help=str_help)
106 str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
107 parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
108 str_help = "How many play utilities are to be started."
109 parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
110 arguments = parser.parse_args()
111 if arguments.multiplicity < 1:
112 print "Multiplicity", arguments.multiplicity, "is not positive."
114 # TODO: Are sanity checks (such as asnumber>=0) required?
118 def establish_connection(arguments):
119 """Establish connection to BGP peer.
122 :arguments: following command-line argumets are used
123 - arguments.myip: local IP address
124 - arguments.myport: local port
125 - arguments.peerip: remote IP address
126 - arguments.peerport: remote port
131 logger.info("Connecting in the listening mode.")
132 logger.debug("Local IP address: " + str(arguments.myip))
133 logger.debug("Local port: " + str(arguments.myport))
134 listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
135 listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
136 # bind need single tuple as argument
137 listening_socket.bind((str(arguments.myip), arguments.myport))
138 listening_socket.listen(1)
139 bgp_socket, _ = listening_socket.accept()
140 # TODO: Verify client IP is cotroller IP.
141 listening_socket.close()
143 logger.info("Connecting in the talking mode.")
144 logger.debug("Local IP address: " + str(arguments.myip))
145 logger.debug("Local port: " + str(arguments.myport))
146 logger.debug("Remote IP address: " + str(arguments.peerip))
147 logger.debug("Remote port: " + str(arguments.peerport))
148 talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149 talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
150 # bind to force specified address and port
151 talking_socket.bind((str(arguments.myip), arguments.myport))
152 # socket does not spead ipaddr, hence str()
153 talking_socket.connect((str(arguments.peerip), arguments.peerport))
154 bgp_socket = talking_socket
155 logger.info("Connected to ODL.")
159 def get_short_int_from_message(message, offset=16):
160 """Extract 2-bytes number from provided message.
163 :message: given message
164 :offset: offset of the short_int inside the message
166 :return: required short_inf value.
168 default offset value is the BGP message size offset.
170 high_byte_int = ord(message[offset])
171 low_byte_int = ord(message[offset + 1])
172 short_int = high_byte_int * 256 + low_byte_int
176 def get_prefix_list_from_hex(prefixes_hex):
177 """Get decoded list of prefixes (rfc4271#section-4.3)
180 :prefixes_hex: list of prefixes to be decoded in hex
182 :return: list of prefixes in the form of ip address (X.X.X.X/X)
186 while offset < len(prefixes_hex):
187 prefix_bit_len_hex = prefixes_hex[offset]
188 prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
189 prefix_len = ((prefix_bit_len - 1) / 8) + 1
190 prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
191 prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
192 offset += 1 + prefix_len
193 prefix_list.append(prefix + "/" + str(prefix_bit_len))
197 class MessageError(ValueError):
198 """Value error with logging optimized for hexlified messages."""
200 def __init__(self, text, message, *args):
203 Store and call super init for textual comment,
204 store raw message which caused it.
208 super(MessageError, self).__init__(text, message, *args)
211 """Generate human readable error message.
214 :return: human readable message as string
216 Use a placeholder string if the message is to be empty.
218 message = binascii.hexlify(self.msg)
220 message = "(empty message)"
221 return self.text + ": " + message
224 def read_open_message(bgp_socket):
225 """Receive peer's OPEN message
228 :bgp_socket: the socket to be read
230 :return: received OPEN message.
232 Performs just basic incomming message checks
234 msg_in = bgp_socket.recv(65535) # TODO: Is smaller buffer size safe?
235 # TODO: Can the incoming open message be split in more than one packet?
238 # 37 is minimal length of open message with 4-byte AS number.
239 logger.error("Got something else than open with 4-byte AS number: " +
240 binascii.hexlify(msg_in))
241 raise MessageError("Got something else than open with 4-byte AS number", msg_in)
242 # TODO: We could check BGP marker, but it is defined only later;
244 reported_length = get_short_int_from_message(msg_in)
245 if len(msg_in) != reported_length:
246 logger.error("Message length is not " + str(reported_length) +
247 " as stated in " + binascii.hexlify(msg_in))
248 raise MessageError("Message length is not " + reported_length +
249 " as stated in ", msg_in)
250 logger.info("Open message received.")
254 class MessageGenerator(object):
255 """Class which generates messages, holds states and configuration values."""
257 # TODO: Define bgp marker as a class (constant) variable.
258 def __init__(self, args):
259 """Initialisation according to command-line args.
262 :args: argsparser's Namespace object which contains command-line
263 options for MesageGenerator initialisation
265 Calculates and stores default values used later on for
268 self.total_prefix_amount = args.amount
269 # Number of update messages left to be sent.
270 self.remaining_prefixes = self.total_prefix_amount
272 # New parameters initialisation
274 self.prefix_base_default = args.firstprefix
275 self.prefix_length_default = args.prefixlen
276 self.wr_prefixes_default = []
277 self.nlri_prefixes_default = []
278 self.version_default = 4
279 self.my_autonomous_system_default = args.asnumber
280 self.hold_time_default = args.holdtime # Local hold time.
281 self.bgp_identifier_default = int(args.myip)
282 self.next_hop_default = args.nexthop
283 self.originator_id_default = args.originator
284 self.cluster_list_item_default = args.cluster
285 self.single_update_default = args.updates == "single"
286 self.randomize_updates_default = args.updates == "random"
287 self.prefix_count_to_add_default = args.insert
288 self.prefix_count_to_del_default = args.withdraw
289 if self.prefix_count_to_del_default < 0:
290 self.prefix_count_to_del_default = 0
291 if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
292 # total number of prefixes must grow to avoid infinite test loop
293 self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
294 self.slot_size_default = self.prefix_count_to_add_default
295 self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
296 self.results_file_name_default = args.results
297 self.performance_threshold_default = args.threshold
298 self.rfc4760 = args.rfc4760 == "yes"
299 # Default values used for randomized part
300 s1_slots = ((self.total_prefix_amount -
301 self.remaining_prefixes_threshold - 1) /
302 self.prefix_count_to_add_default + 1)
303 s2_slots = ((self.remaining_prefixes_threshold - 1) /
304 (self.prefix_count_to_add_default -
305 self.prefix_count_to_del_default) + 1)
307 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
308 s2_first_index = s1_slots * self.prefix_count_to_add_default
309 s2_last_index = (s2_first_index +
310 s2_slots * (self.prefix_count_to_add_default -
311 self.prefix_count_to_del_default) - 1)
312 self.slot_gap_default = ((self.total_prefix_amount -
313 self.remaining_prefixes_threshold - 1) /
314 self.prefix_count_to_add_default + 1)
315 self.randomize_lowest_default = s2_first_index
316 self.randomize_highest_default = s2_last_index
318 # Initialising counters
319 self.phase1_start_time = 0
320 self.phase1_stop_time = 0
321 self.phase2_start_time = 0
322 self.phase2_stop_time = 0
323 self.phase1_updates_sent = 0
324 self.phase2_updates_sent = 0
325 self.updates_sent = 0
327 self.log_info = args.loglevel <= logging.INFO
328 self.log_debug = args.loglevel <= logging.DEBUG
330 Flags needed for the MessageGenerator performance optimization.
331 Calling logger methods each iteration even with proper log level set
332 slows down significantly the MessageGenerator performance.
333 Measured total generation time (1M updates, dry run, error log level):
334 - logging based on basic logger features: 36,2s
335 - logging based on advanced logger features (lazy logging): 21,2s
336 - conditional calling of logger methods enclosed inside condition: 8,6s
339 logger.info("Generator initialisation")
340 logger.info(" Target total number of prefixes to be introduced: " +
341 str(self.total_prefix_amount))
342 logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
343 str(self.prefix_length_default))
344 logger.info(" My Autonomous System number: " +
345 str(self.my_autonomous_system_default))
346 logger.info(" My Hold Time: " + str(self.hold_time_default))
347 logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
348 logger.info(" Next Hop: " + str(self.next_hop_default))
349 logger.info(" Originator ID: " + str(self.originator_id_default))
350 logger.info(" Cluster list: " + str(self.cluster_list_item_default))
351 logger.info(" Prefix count to be inserted at once: " +
352 str(self.prefix_count_to_add_default))
353 logger.info(" Prefix count to be withdrawn at once: " +
354 str(self.prefix_count_to_del_default))
355 logger.info(" Fast pre-fill up to " +
356 str(self.total_prefix_amount -
357 self.remaining_prefixes_threshold) + " prefixes")
358 logger.info(" Remaining number of prefixes to be processed " +
359 "in parallel with withdrawals: " +
360 str(self.remaining_prefixes_threshold))
361 logger.debug(" Prefix index range used after pre-fill procedure [" +
362 str(self.randomize_lowest_default) + ", " +
363 str(self.randomize_highest_default) + "]")
364 if self.single_update_default:
365 logger.info(" Common single UPDATE will be generated " +
366 "for both NLRI & WITHDRAWN lists")
368 logger.info(" Two separate UPDATEs will be generated " +
369 "for each NLRI & WITHDRAWN lists")
370 if self.randomize_updates_default:
371 logger.info(" Generation of UPDATE messages will be randomized")
372 logger.info(" Let\'s go ...\n")
374 # TODO: Notification for hold timer expiration can be handy.
376 def store_results(self, file_name=None, threshold=None):
377 """ Stores specified results into files based on file_name value.
380 :param file_name: Trailing (common) part of result file names
381 :param threshold: Minimum number of sent updates needed for each
382 result to be included into result csv file
383 (mainly needed because of the result accuracy)
387 # default values handling
388 # TODO optimize default values handling (use e.g. dicionary.update() approach)
389 if file_name is None:
390 file_name = self.results_file_name_default
391 if threshold is None:
392 threshold = self.performance_threshold_default
393 # performance calculation
394 if self.phase1_updates_sent >= threshold:
395 totals1 = self.phase1_updates_sent
396 performance1 = int(self.phase1_updates_sent /
397 (self.phase1_stop_time - self.phase1_start_time))
401 if self.phase2_updates_sent >= threshold:
402 totals2 = self.phase2_updates_sent
403 performance2 = int(self.phase2_updates_sent /
404 (self.phase2_stop_time - self.phase2_start_time))
409 logger.info("#" * 10 + " Final results " + "#" * 10)
410 logger.info("Number of iterations: " + str(self.iteration))
411 logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
412 str(self.phase1_updates_sent))
413 logger.info("The pre-fill phase duration: " +
414 str(self.phase1_stop_time - self.phase1_start_time) + "s")
415 logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
416 str(self.phase2_updates_sent))
417 logger.info("The 2nd test phase duration: " +
418 str(self.phase2_stop_time - self.phase2_start_time) + "s")
419 logger.info("Threshold for performance reporting: " + str(threshold))
422 phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
423 " route(s) per UPDATE")
424 if self.single_update_default:
425 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
426 "/-" + str(self.prefix_count_to_del_default) +
427 " routes per UPDATE")
429 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
430 "/-" + str(self.prefix_count_to_del_default) +
431 " routes in two UPDATEs")
432 # collecting capacity and performance results
435 if totals1 is not None:
436 totals[phase1_label] = totals1
437 performance[phase1_label] = performance1
438 if totals2 is not None:
439 totals[phase2_label] = totals2
440 performance[phase2_label] = performance2
441 self.write_results_to_file(totals, "totals-" + file_name)
442 self.write_results_to_file(performance, "performance-" + file_name)
444 def write_results_to_file(self, results, file_name):
445 """Writes results to the csv plot file consumable by Jenkins.
448 :param file_name: Name of the (csv) file to be created
454 f = open(file_name, "wt")
456 for key in sorted(results):
457 first_line += key + ", "
458 second_line += str(results[key]) + ", "
459 first_line = first_line[:-2]
460 second_line = second_line[:-2]
461 f.write(first_line + "\n")
462 f.write(second_line + "\n")
463 logger.info("Message generator performance results stored in " +
465 logger.info(" " + first_line)
466 logger.info(" " + second_line)
470 # Return pseudo-randomized (reproducible) index for selected range
471 def randomize_index(self, index, lowest=None, highest=None):
472 """Calculates pseudo-randomized index from selected range.
475 :param index: input index
476 :param lowest: the lowes index from the randomized area
477 :param highest: the highest index from the randomized area
479 :return: the (pseudo)randomized index
481 Created just as a fame for future generator enhancement.
483 # default values handling
484 # TODO optimize default values handling (use e.g. dicionary.update() approach)
486 lowest = self.randomize_lowest_default
488 highest = self.randomize_highest_default
490 if (index >= lowest) and (index <= highest):
491 # we are in the randomized range -> shuffle it inside
492 # the range (now just reverse the order)
493 new_index = highest - (index - lowest)
495 # we are out of the randomized range -> nothing to do
499 # Get list of prefixes
500 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
501 prefix_len=None, prefix_count=None, randomize=None):
502 """Generates list of IP address prefixes.
505 :param slot_index: index of group of prefix addresses
506 :param slot_size: size of group of prefix addresses
507 in [number of included prefixes]
508 :param prefix_base: IP address of the first prefix
509 (slot_index = 0, prefix_index = 0)
510 :param prefix_len: length of the prefix in bites
511 (the same as size of netmask)
512 :param prefix_count: number of prefixes to be returned
513 from the specified slot
515 :return: list of generated IP address prefixes
517 # default values handling
518 # TODO optimize default values handling (use e.g. dicionary.update() approach)
519 if slot_size is None:
520 slot_size = self.slot_size_default
521 if prefix_base is None:
522 prefix_base = self.prefix_base_default
523 if prefix_len is None:
524 prefix_len = self.prefix_length_default
525 if prefix_count is None:
526 prefix_count = slot_size
527 if randomize is None:
528 randomize = self.randomize_updates_default
529 # generating list of prefixes
532 prefix_gap = 2 ** (32 - prefix_len)
533 for i in range(prefix_count):
534 prefix_index = slot_index * slot_size + i
536 prefix_index = self.randomize_index(prefix_index)
537 indexes.append(prefix_index)
538 prefixes.append(prefix_base + prefix_index * prefix_gap)
540 logger.debug(" Prefix slot index: " + str(slot_index))
541 logger.debug(" Prefix slot size: " + str(slot_size))
542 logger.debug(" Prefix count: " + str(prefix_count))
543 logger.debug(" Prefix indexes: " + str(indexes))
544 logger.debug(" Prefix list: " + str(prefixes))
547 def compose_update_message(self, prefix_count_to_add=None,
548 prefix_count_to_del=None):
549 """Composes an UPDATE message
552 :param prefix_count_to_add: # of prefixes to put into NLRI list
553 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
555 :return: encoded UPDATE message in HEX
557 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
558 lists or common message wich includes both prefix lists.
559 Updates global counters.
561 # default values handling
562 # TODO optimize default values handling (use e.g. dicionary.update() approach)
563 if prefix_count_to_add is None:
564 prefix_count_to_add = self.prefix_count_to_add_default
565 if prefix_count_to_del is None:
566 prefix_count_to_del = self.prefix_count_to_del_default
568 if self.log_info and not (self.iteration % 1000):
569 logger.info("Iteration: " + str(self.iteration) +
570 " - total remaining prefixes: " +
571 str(self.remaining_prefixes))
573 logger.debug("#" * 10 + " Iteration: " +
574 str(self.iteration) + " " + "#" * 10)
575 logger.debug("Remaining prefixes: " +
576 str(self.remaining_prefixes))
577 # scenario type & one-shot counter
578 straightforward_scenario = (self.remaining_prefixes >
579 self.remaining_prefixes_threshold)
580 if straightforward_scenario:
581 prefix_count_to_del = 0
583 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
584 if not self.phase1_start_time:
585 self.phase1_start_time = time.time()
588 logger.debug("--- COMBINED SCENARIO ---")
589 if not self.phase2_start_time:
590 self.phase2_start_time = time.time()
591 # tailor the number of prefixes if needed
592 prefix_count_to_add = (prefix_count_to_del +
593 min(prefix_count_to_add - prefix_count_to_del,
594 self.remaining_prefixes))
595 # prefix slots selection for insertion and withdrawal
596 slot_index_to_add = self.iteration
597 slot_index_to_del = slot_index_to_add - self.slot_gap_default
598 # getting lists of prefixes for insertion in this iteration
600 logger.debug("Prefixes to be inserted in this iteration:")
601 prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
602 prefix_count=prefix_count_to_add)
603 # getting lists of prefixes for withdrawal in this iteration
605 logger.debug("Prefixes to be withdrawn in this iteration:")
606 prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
607 prefix_count=prefix_count_to_del)
608 # generating the mesage
609 if self.single_update_default:
610 # Send prefixes to be introduced and withdrawn
611 # in one UPDATE message
612 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
613 nlri_prefixes=prefix_list_to_add)
615 # Send prefixes to be introduced and withdrawn
616 # in separate UPDATE messages (if needed)
617 msg_out = self.update_message(wr_prefixes=[],
618 nlri_prefixes=prefix_list_to_add)
619 if prefix_count_to_del:
620 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
622 # updating counters - who knows ... maybe I am last time here ;)
623 if straightforward_scenario:
624 self.phase1_stop_time = time.time()
625 self.phase1_updates_sent = self.updates_sent
627 self.phase2_stop_time = time.time()
628 self.phase2_updates_sent = (self.updates_sent -
629 self.phase1_updates_sent)
630 # updating totals for the next iteration
632 self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
633 # returning the encoded message
636 # Section of message encoders
638 def open_message(self, version=None, my_autonomous_system=None,
639 hold_time=None, bgp_identifier=None):
640 """Generates an OPEN Message (rfc4271#section-4.2)
643 :param version: see the rfc4271#section-4.2
644 :param my_autonomous_system: see the rfc4271#section-4.2
645 :param hold_time: see the rfc4271#section-4.2
646 :param bgp_identifier: see the rfc4271#section-4.2
648 :return: encoded OPEN message in HEX
651 # default values handling
652 # TODO optimize default values handling (use e.g. dicionary.update() approach)
654 version = self.version_default
655 if my_autonomous_system is None:
656 my_autonomous_system = self.my_autonomous_system_default
657 if hold_time is None:
658 hold_time = self.hold_time_default
659 if bgp_identifier is None:
660 bgp_identifier = self.bgp_identifier_default
663 marker_hex = "\xFF" * 16
667 type_hex = struct.pack("B", type)
670 version_hex = struct.pack("B", version)
672 # my_autonomous_system
673 # AS_TRANS value, 23456 decadic.
674 my_autonomous_system_2_bytes = 23456
675 # AS number is mappable to 2 bytes
676 if my_autonomous_system < 65536:
677 my_autonomous_system_2_bytes = my_autonomous_system
678 my_autonomous_system_hex_2_bytes = struct.pack(">H",
679 my_autonomous_system)
682 hold_time_hex = struct.pack(">H", hold_time)
685 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
687 # Optional Parameters
688 optional_parameters_hex = ""
690 optional_parameter_hex = (
691 "\x02" # Param type ("Capability Ad")
692 "\x06" # Length (6 bytes)
693 "\x01" # Capability type (NLRI Unicast),
694 # see RFC 4760, secton 8
695 "\x04" # Capability value length
696 "\x00\x01" # AFI (Ipv4)
698 "\x01" # SAFI (Unicast)
700 optional_parameters_hex += optional_parameter_hex
702 optional_parameter_hex = (
703 "\x02" # Param type ("Capability Ad")
704 "\x06" # Length (6 bytes)
705 "\x41" # "32 bit AS Numbers Support"
706 # (see RFC 6793, section 3)
707 "\x04" # Capability value length
709 optional_parameter_hex += (
710 struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
712 optional_parameters_hex += optional_parameter_hex
714 # Optional Parameters Length
715 optional_parameters_length = len(optional_parameters_hex)
716 optional_parameters_length_hex = struct.pack("B",
717 optional_parameters_length)
719 # Length (big-endian)
721 len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
722 len(my_autonomous_system_hex_2_bytes) +
723 len(hold_time_hex) + len(bgp_identifier_hex) +
724 len(optional_parameters_length_hex) +
725 len(optional_parameters_hex)
727 length_hex = struct.pack(">H", length)
735 my_autonomous_system_hex_2_bytes +
738 optional_parameters_length_hex +
739 optional_parameters_hex
743 logger.debug("OPEN message encoding")
744 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
745 logger.debug(" Length=" + str(length) + " (0x" +
746 binascii.hexlify(length_hex) + ")")
747 logger.debug(" Type=" + str(type) + " (0x" +
748 binascii.hexlify(type_hex) + ")")
749 logger.debug(" Version=" + str(version) + " (0x" +
750 binascii.hexlify(version_hex) + ")")
751 logger.debug(" My Autonomous System=" +
752 str(my_autonomous_system_2_bytes) + " (0x" +
753 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
755 logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
756 binascii.hexlify(hold_time_hex) + ")")
757 logger.debug(" BGP Identifier=" + str(bgp_identifier) +
758 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
759 logger.debug(" Optional Parameters Length=" +
760 str(optional_parameters_length) + " (0x" +
761 binascii.hexlify(optional_parameters_length_hex) +
763 logger.debug(" Optional Parameters=0x" +
764 binascii.hexlify(optional_parameters_hex))
765 logger.debug("OPEN message encoded: 0x%s",
766 binascii.b2a_hex(message_hex))
770 def update_message(self, wr_prefixes=None, nlri_prefixes=None,
771 wr_prefix_length=None, nlri_prefix_length=None,
772 my_autonomous_system=None, next_hop=None,
773 originator_id=None, cluster_list_item=None):
774 """Generates an UPDATE Message (rfc4271#section-4.3)
777 :param wr_prefixes: see the rfc4271#section-4.3
778 :param nlri_prefixes: see the rfc4271#section-4.3
779 :param wr_prefix_length: see the rfc4271#section-4.3
780 :param nlri_prefix_length: see the rfc4271#section-4.3
781 :param my_autonomous_system: see the rfc4271#section-4.3
782 :param next_hop: see the rfc4271#section-4.3
784 :return: encoded UPDATE message in HEX
787 # default values handling
788 # TODO optimize default values handling (use e.g. dicionary.update() approach)
789 if wr_prefixes is None:
790 wr_prefixes = self.wr_prefixes_default
791 if nlri_prefixes is None:
792 nlri_prefixes = self.nlri_prefixes_default
793 if wr_prefix_length is None:
794 wr_prefix_length = self.prefix_length_default
795 if nlri_prefix_length is None:
796 nlri_prefix_length = self.prefix_length_default
797 if my_autonomous_system is None:
798 my_autonomous_system = self.my_autonomous_system_default
800 next_hop = self.next_hop_default
801 if originator_id is None:
802 originator_id = self.originator_id_default
803 if cluster_list_item is None:
804 cluster_list_item = self.cluster_list_item_default
807 marker_hex = "\xFF" * 16
811 type_hex = struct.pack("B", type)
814 bytes = ((wr_prefix_length - 1) / 8) + 1
815 withdrawn_routes_hex = ""
816 for prefix in wr_prefixes:
817 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
818 struct.pack(">I", int(prefix))[:bytes])
819 withdrawn_routes_hex += withdrawn_route_hex
821 # Withdrawn Routes Length
822 withdrawn_routes_length = len(withdrawn_routes_hex)
823 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
825 # TODO: to replace hardcoded string by encoding?
827 path_attributes_hex = ""
828 if nlri_prefixes != []:
829 path_attributes_hex += (
830 "\x40" # Flags ("Well-Known")
831 "\x01" # Type (ORIGIN)
835 path_attributes_hex += (
836 "\x40" # Flags ("Well-Known")
837 "\x02" # Type (AS_PATH)
839 "\x02" # AS segment type (AS_SEQUENCE)
840 "\x01" # AS segment length (1)
842 my_as_hex = struct.pack(">I", my_autonomous_system)
843 path_attributes_hex += my_as_hex # AS segment (4 bytes)
844 path_attributes_hex += (
845 "\x40" # Flags ("Well-Known")
846 "\x03" # Type (NEXT_HOP)
849 next_hop_hex = struct.pack(">I", int(next_hop))
850 path_attributes_hex += (
851 next_hop_hex # IP address of the next hop (4 bytes)
853 if originator_id is not None:
854 path_attributes_hex += (
855 "\x80" # Flags ("Optional, non-transitive")
856 "\x09" # Type (ORIGINATOR_ID)
858 # ORIGINATOR_ID (4 bytes)
859 + struct.pack(">I", int(originator_id))
861 if cluster_list_item is not None:
862 path_attributes_hex += (
863 "\x80" # Flags ("Optional, non-transitive")
864 "\x09" # Type (CLUSTER_LIST)
866 # one CLUSTER_LIST item (4 bytes)
867 + struct.pack(">I", int(cluster_list_item))
870 # Total Path Attributes Length
871 total_path_attributes_length = len(path_attributes_hex)
872 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
874 # Network Layer Reachability Information
875 bytes = ((nlri_prefix_length - 1) / 8) + 1
877 for prefix in nlri_prefixes:
878 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
879 struct.pack(">I", int(prefix))[:bytes])
880 nlri_hex += nlri_prefix_hex
882 # Length (big-endian)
884 len(marker_hex) + 2 + len(type_hex) +
885 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
886 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
888 length_hex = struct.pack(">H", length)
895 withdrawn_routes_length_hex +
896 withdrawn_routes_hex +
897 total_path_attributes_length_hex +
898 path_attributes_hex +
903 logger.debug("UPDATE message encoding")
904 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
905 logger.debug(" Length=" + str(length) + " (0x" +
906 binascii.hexlify(length_hex) + ")")
907 logger.debug(" Type=" + str(type) + " (0x" +
908 binascii.hexlify(type_hex) + ")")
909 logger.debug(" withdrawn_routes_length=" +
910 str(withdrawn_routes_length) + " (0x" +
911 binascii.hexlify(withdrawn_routes_length_hex) + ")")
912 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
913 str(wr_prefix_length) + " (0x" +
914 binascii.hexlify(withdrawn_routes_hex) + ")")
915 if total_path_attributes_length:
916 logger.debug(" Total Path Attributes Length=" +
917 str(total_path_attributes_length) + " (0x" +
918 binascii.hexlify(total_path_attributes_length_hex) + ")")
919 logger.debug(" Path Attributes=" + "(0x" +
920 binascii.hexlify(path_attributes_hex) + ")")
921 logger.debug(" Origin=IGP")
922 logger.debug(" AS path=" + str(my_autonomous_system))
923 logger.debug(" Next hop=" + str(next_hop))
924 if originator_id is not None:
925 logger.debug(" Originator id=" + str(originator_id))
926 if cluster_list_item is not None:
927 logger.debug(" Cluster list=" + str(cluster_list_item))
928 logger.debug(" Network Layer Reachability Information=" +
929 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
930 " (0x" + binascii.hexlify(nlri_hex) + ")")
931 logger.debug("UPDATE message encoded: 0x" +
932 binascii.b2a_hex(message_hex))
935 self.updates_sent += 1
936 # returning encoded message
939 def notification_message(self, error_code, error_subcode, data_hex=""):
940 """Generates a NOTIFICATION Message (rfc4271#section-4.5)
943 :param error_code: see the rfc4271#section-4.5
944 :param error_subcode: see the rfc4271#section-4.5
945 :param data_hex: see the rfc4271#section-4.5
947 :return: encoded NOTIFICATION message in HEX
951 marker_hex = "\xFF" * 16
955 type_hex = struct.pack("B", type)
958 error_code_hex = struct.pack("B", error_code)
961 error_subcode_hex = struct.pack("B", error_subcode)
963 # Length (big-endian)
964 length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
965 len(error_subcode_hex) + len(data_hex))
966 length_hex = struct.pack(">H", length)
968 # NOTIFICATION Message
979 logger.debug("NOTIFICATION message encoding")
980 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
981 logger.debug(" Length=" + str(length) + " (0x" +
982 binascii.hexlify(length_hex) + ")")
983 logger.debug(" Type=" + str(type) + " (0x" +
984 binascii.hexlify(type_hex) + ")")
985 logger.debug(" Error Code=" + str(error_code) + " (0x" +
986 binascii.hexlify(error_code_hex) + ")")
987 logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
988 binascii.hexlify(error_subcode_hex) + ")")
989 logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
990 logger.debug("NOTIFICATION message encoded: 0x%s",
991 binascii.b2a_hex(message_hex))
995 def keepalive_message(self):
996 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
999 :return: encoded KEEP ALIVE message in HEX
1003 marker_hex = "\xFF" * 16
1007 type_hex = struct.pack("B", type)
1009 # Length (big-endian)
1010 length = len(marker_hex) + 2 + len(type_hex)
1011 length_hex = struct.pack(">H", length)
1013 # KEEP ALIVE Message
1021 logger.debug("KEEP ALIVE message encoding")
1022 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1023 logger.debug(" Length=" + str(length) + " (0x" +
1024 binascii.hexlify(length_hex) + ")")
1025 logger.debug(" Type=" + str(type) + " (0x" +
1026 binascii.hexlify(type_hex) + ")")
1027 logger.debug("KEEP ALIVE message encoded: 0x%s",
1028 binascii.b2a_hex(message_hex))
1033 class TimeTracker(object):
1034 """Class for tracking timers, both for my keepalives and
1038 def __init__(self, msg_in):
1039 """Initialisation. based on defaults and OPEN message from peer.
1042 msg_in: the OPEN message received from peer.
1044 # Note: Relative time is always named timedelta, to stress that
1045 # the (non-delta) time is absolute.
1046 self.report_timedelta = 1.0 # In seconds. TODO: Configurable?
1047 # Upper bound for being stuck in the same state, we should
1048 # at least report something before continuing.
1049 # Negotiate the hold timer by taking the smaller
1050 # of the 2 values (mine and the peer's).
1051 hold_timedelta = 180 # Not an attribute of self yet.
1052 # TODO: Make the default value configurable,
1053 # default value could mirror what peer said.
1054 peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1055 if hold_timedelta > peer_hold_timedelta:
1056 hold_timedelta = peer_hold_timedelta
1057 if hold_timedelta != 0 and hold_timedelta < 3:
1058 logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1059 raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1060 self.hold_timedelta = hold_timedelta
1061 # If we do not hear from peer this long, we assume it has died.
1062 self.keepalive_timedelta = int(hold_timedelta / 3.0)
1063 # Upper limit for duration between messages, to avoid being
1064 # declared to be dead.
1065 # The same as calling snapshot(), but also declares a field.
1066 self.snapshot_time = time.time()
1067 # Sometimes we need to store time. This is where to get
1068 # the value from afterwards. Time_keepalive may be too strict.
1069 self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1070 # At this time point, peer will be declared dead.
1071 self.my_keepalive_time = None # to be set later
1072 # At this point, we should be sending keepalive message.
1075 """Store current time in instance data to use later."""
1076 # Read as time before something interesting was called.
1077 self.snapshot_time = time.time()
1079 def reset_peer_hold_time(self):
1080 """Move hold time to future as peer has just proven it still lives."""
1081 self.peer_hold_time = time.time() + self.hold_timedelta
1083 # Some methods could rely on self.snapshot_time, but it is better
1084 # to require user to provide it explicitly.
1085 def reset_my_keepalive_time(self, keepalive_time):
1086 """Calculate and set the next my KEEP ALIVE timeout time
1089 :keepalive_time: the initial value of the KEEP ALIVE timer
1091 self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1093 def is_time_for_my_keepalive(self):
1094 """Check for my KEEP ALIVE timeout occurence"""
1095 if self.hold_timedelta == 0:
1097 return self.snapshot_time >= self.my_keepalive_time
1099 def get_next_event_time(self):
1100 """Set the time of the next expected or to be sent KEEP ALIVE"""
1101 if self.hold_timedelta == 0:
1102 return self.snapshot_time + 86400
1103 return min(self.my_keepalive_time, self.peer_hold_time)
1105 def check_peer_hold_time(self, snapshot_time):
1106 """Raise error if nothing was read from peer until specified time."""
1107 # Hold time = 0 means keepalive checking off.
1108 if self.hold_timedelta != 0:
1109 # time.time() may be too strict
1110 if snapshot_time > self.peer_hold_time:
1111 logger.error("Peer has overstepped the hold timer.")
1112 raise RuntimeError("Peer has overstepped the hold timer.")
1113 # TODO: Include hold_timedelta?
1114 # TODO: Add notification sending (attempt). That means
1115 # move to write tracker.
1118 class ReadTracker(object):
1119 """Class for tracking read of mesages chunk by chunk and
1123 def __init__(self, bgp_socket, timer):
1124 """The reader initialisation.
1127 bgp_socket: socket to be used for sending
1128 timer: timer to be used for scheduling
1130 # References to outside objects.
1131 self.socket = bgp_socket
1133 # BGP marker length plus length field length.
1134 self.header_length = 18
1135 # TODO: make it class (constant) attribute
1136 # Computation of where next chunk ends depends on whether
1137 # we are beyond length field.
1138 self.reading_header = True
1139 # Countdown towards next size computation.
1140 self.bytes_to_read = self.header_length
1141 # Incremental buffer for message under read.
1143 # Initialising counters
1144 self.updates_received = 0
1145 self.prefixes_introduced = 0
1146 self.prefixes_withdrawn = 0
1147 self.rx_idle_time = 0
1148 self.rx_activity_detected = True
1150 def read_message_chunk(self):
1151 """Read up to one message
1154 Currently it does not return anything.
1156 # TODO: We could return the whole message, currently not needed.
1157 # We assume the socket is readable.
1158 chunk_message = self.socket.recv(self.bytes_to_read)
1159 self.msg_in += chunk_message
1160 self.bytes_to_read -= len(chunk_message)
1161 # TODO: bytes_to_read < 0 is not possible, right?
1162 if not self.bytes_to_read:
1163 # Finished reading a logical block.
1164 if self.reading_header:
1165 # The logical block was a BGP header.
1166 # Now we know the size of the message.
1167 self.reading_header = False
1168 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1170 else: # We have finished reading the body of the message.
1171 # Peer has just proven it is still alive.
1172 self.timer.reset_peer_hold_time()
1173 # TODO: Do we want to count received messages?
1174 # This version ignores the received message.
1175 # TODO: Should we do validation and exit on anything
1176 # besides update or keepalive?
1177 # Prepare state for reading another message.
1178 message_type_hex = self.msg_in[self.header_length]
1179 if message_type_hex == "\x01":
1180 logger.info("OPEN message received: 0x%s",
1181 binascii.b2a_hex(self.msg_in))
1182 elif message_type_hex == "\x02":
1183 logger.debug("UPDATE message received: 0x%s",
1184 binascii.b2a_hex(self.msg_in))
1185 self.decode_update_message(self.msg_in)
1186 elif message_type_hex == "\x03":
1187 logger.info("NOTIFICATION message received: 0x%s",
1188 binascii.b2a_hex(self.msg_in))
1189 elif message_type_hex == "\x04":
1190 logger.info("KEEP ALIVE message received: 0x%s",
1191 binascii.b2a_hex(self.msg_in))
1193 logger.warning("Unexpected message received: 0x%s",
1194 binascii.b2a_hex(self.msg_in))
1196 self.reading_header = True
1197 self.bytes_to_read = self.header_length
1198 # We should not act upon peer_hold_time if we are reading
1199 # something right now.
1202 def decode_path_attributes(self, path_attributes_hex):
1203 """Decode the Path Attributes field (rfc4271#section-4.3)
1206 :path_attributes: path_attributes field to be decoded in hex
1210 hex_to_decode = path_attributes_hex
1212 while len(hex_to_decode):
1213 attr_flags_hex = hex_to_decode[0]
1214 attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1215 # attr_optional_bit = attr_flags & 128
1216 # attr_transitive_bit = attr_flags & 64
1217 # attr_partial_bit = attr_flags & 32
1218 attr_extended_length_bit = attr_flags & 16
1220 attr_type_code_hex = hex_to_decode[1]
1221 attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1223 if attr_extended_length_bit:
1224 attr_length_hex = hex_to_decode[2:4]
1225 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1226 attr_value_hex = hex_to_decode[4:4 + attr_length]
1227 hex_to_decode = hex_to_decode[4 + attr_length:]
1229 attr_length_hex = hex_to_decode[2]
1230 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1231 attr_value_hex = hex_to_decode[3:3 + attr_length]
1232 hex_to_decode = hex_to_decode[3 + attr_length:]
1234 if attr_type_code == 1:
1235 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1236 binascii.b2a_hex(attr_flags_hex))
1237 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1238 elif attr_type_code == 2:
1239 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1240 binascii.b2a_hex(attr_flags_hex))
1241 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1242 elif attr_type_code == 3:
1243 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1244 binascii.b2a_hex(attr_flags_hex))
1245 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1246 elif attr_type_code == 4:
1247 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1248 binascii.b2a_hex(attr_flags_hex))
1249 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1250 elif attr_type_code == 5:
1251 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1252 binascii.b2a_hex(attr_flags_hex))
1253 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1254 elif attr_type_code == 6:
1255 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1256 binascii.b2a_hex(attr_flags_hex))
1257 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1258 elif attr_type_code == 7:
1259 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1260 binascii.b2a_hex(attr_flags_hex))
1261 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1262 elif attr_type_code == 9: # rfc4456#section-8
1263 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1264 binascii.b2a_hex(attr_flags_hex))
1265 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1266 elif attr_type_code == 10: # rfc4456#section-8
1267 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1268 binascii.b2a_hex(attr_flags_hex))
1269 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1270 elif attr_type_code == 14: # rfc4760#section-3
1271 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1272 binascii.b2a_hex(attr_flags_hex))
1273 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1274 address_family_identifier_hex = attr_value_hex[0:2]
1275 logger.debug(" Address Family Identifier=0x%s",
1276 binascii.b2a_hex(address_family_identifier_hex))
1277 subsequent_address_family_identifier_hex = attr_value_hex[2]
1278 logger.debug(" Subsequent Address Family Identifier=0x%s",
1279 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1280 next_hop_netaddr_len_hex = attr_value_hex[3]
1281 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1282 logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
1283 next_hop_netaddr_len,
1284 binascii.b2a_hex(next_hop_netaddr_len_hex))
1285 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1286 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1287 logger.debug(" Network Address of Next Hop=%s (0x%s)",
1288 next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1289 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1290 logger.debug(" Reserved=0x%s",
1291 binascii.b2a_hex(reserved_hex))
1292 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1293 logger.debug(" Network Layer Reachability Information=0x%s",
1294 binascii.b2a_hex(nlri_hex))
1295 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1296 logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
1297 for prefix in nlri_prefix_list:
1298 logger.debug(" nlri_prefix_received: %s", prefix)
1299 self.prefixes_introduced += len(nlri_prefix_list) # update counter
1300 elif attr_type_code == 15: # rfc4760#section-4
1301 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1302 binascii.b2a_hex(attr_flags_hex))
1303 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1304 address_family_identifier_hex = attr_value_hex[0:2]
1305 logger.debug(" Address Family Identifier=0x%s",
1306 binascii.b2a_hex(address_family_identifier_hex))
1307 subsequent_address_family_identifier_hex = attr_value_hex[2]
1308 logger.debug(" Subsequent Address Family Identifier=0x%s",
1309 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1310 wd_hex = attr_value_hex[3:]
1311 logger.debug(" Withdrawn Routes=0x%s",
1312 binascii.b2a_hex(wd_hex))
1313 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1314 logger.debug(" Withdrawn routes prefix list: %s",
1316 for prefix in wdr_prefix_list:
1317 logger.debug(" withdrawn_prefix_received: %s", prefix)
1318 self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
1320 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1321 binascii.b2a_hex(attr_flags_hex))
1322 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1325 def decode_update_message(self, msg):
1326 """Decode an UPDATE message (rfc4271#section-4.3)
1329 :msg: message to be decoded in hex
1333 logger.debug("Decoding update message:")
1334 # message header - marker
1335 marker_hex = msg[:16]
1336 logger.debug("Message header marker: 0x%s",
1337 binascii.b2a_hex(marker_hex))
1338 # message header - message length
1339 msg_length_hex = msg[16:18]
1340 msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1341 logger.debug("Message lenght: 0x%s (%s)",
1342 binascii.b2a_hex(msg_length_hex), msg_length)
1343 # message header - message type
1344 msg_type_hex = msg[18:19]
1345 msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1347 logger.debug("Message type: 0x%s (update)",
1348 binascii.b2a_hex(msg_type_hex))
1349 # withdrawn routes length
1350 wdr_length_hex = msg[19:21]
1351 wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1352 logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1353 binascii.b2a_hex(wdr_length_hex), wdr_length)
1355 wdr_hex = msg[21:21 + wdr_length]
1356 logger.debug("Withdrawn routes: 0x%s",
1357 binascii.b2a_hex(wdr_hex))
1358 wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1359 logger.debug("Withdrawn routes prefix list: %s",
1361 for prefix in wdr_prefix_list:
1362 logger.debug("withdrawn_prefix_received: %s", prefix)
1363 # total path attribute length
1364 total_pa_length_offset = 21 + wdr_length
1365 total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset+2]
1366 total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1367 logger.debug("Total path attribute lenght: 0x%s (%s)",
1368 binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1370 pa_offset = total_pa_length_offset + 2
1371 pa_hex = msg[pa_offset:pa_offset+total_pa_length]
1372 logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1373 self.decode_path_attributes(pa_hex)
1374 # network layer reachability information length
1375 nlri_length = msg_length - 23 - total_pa_length - wdr_length
1376 logger.debug("Calculated NLRI length: %s", nlri_length)
1377 # network layer reachability information
1378 nlri_offset = pa_offset + total_pa_length
1379 nlri_hex = msg[nlri_offset:nlri_offset+nlri_length]
1380 logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1381 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1382 logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1383 for prefix in nlri_prefix_list:
1384 logger.debug("nlri_prefix_received: %s", prefix)
1386 self.updates_received += 1
1387 self.prefixes_introduced += len(nlri_prefix_list)
1388 self.prefixes_withdrawn += len(wdr_prefix_list)
1390 logger.error("Unexpeced message type 0x%s in 0x%s",
1391 binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1393 def wait_for_read(self):
1394 """Read message until timeout (next expected event).
1397 Used when no more updates has to be sent to avoid busy-wait.
1398 Currently it does not return anything.
1400 # Compute time to the first predictable state change
1401 event_time = self.timer.get_next_event_time()
1402 # snapshot_time would be imprecise
1403 wait_timedelta = min(event_time - time.time(), 10)
1404 if wait_timedelta < 0:
1405 # The program got around to waiting to an event in "very near
1406 # future" so late that it became a "past" event, thus tell
1407 # "select" to not wait at all. Passing negative timedelta to
1408 # select() would lead to either waiting forever (for -1) or
1409 # select.error("Invalid parameter") (for everything else).
1411 # And wait for event or something to read.
1413 if not self.rx_activity_detected or not (self.updates_received % 100):
1414 # right time to write statistics to the log (not for every update and
1415 # not too frequently to avoid having large log files)
1416 logger.info("total_received_update_message_counter: %s",
1417 self.updates_received)
1418 logger.info("total_received_nlri_prefix_counter: %s",
1419 self.prefixes_introduced)
1420 logger.info("total_received_withdrawn_prefix_counter: %s",
1421 self.prefixes_withdrawn)
1423 start_time = time.time()
1424 select.select([self.socket], [], [self.socket], wait_timedelta)
1425 timedelta = time.time() - start_time
1426 self.rx_idle_time += timedelta
1427 self.rx_activity_detected = timedelta < 1
1429 if not self.rx_activity_detected or not (self.updates_received % 100):
1430 # right time to write statistics to the log (not for every update and
1431 # not too frequently to avoid having large log files)
1432 logger.info("... idle for %.3fs", timedelta)
1433 logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1437 class WriteTracker(object):
1438 """Class tracking enqueueing messages and sending chunks of them."""
1440 def __init__(self, bgp_socket, generator, timer):
1441 """The writter initialisation.
1444 bgp_socket: socket to be used for sending
1445 generator: generator to be used for message generation
1446 timer: timer to be used for scheduling
1448 # References to outside objects,
1449 self.socket = bgp_socket
1450 self.generator = generator
1452 # Really new fields.
1453 # TODO: Would attribute docstrings add anything substantial?
1454 self.sending_message = False
1455 self.bytes_to_send = 0
1458 def enqueue_message_for_sending(self, message):
1459 """Enqueue message and change state.
1462 message: message to be enqueued into the msg_out buffer
1464 self.msg_out += message
1465 self.bytes_to_send += len(message)
1466 self.sending_message = True
1468 def send_message_chunk_is_whole(self):
1469 """Send enqueued data from msg_out buffer
1472 :return: true if no remaining data to send
1474 # We assume there is a msg_out to send and socket is writable.
1475 # print "going to send", repr(self.msg_out)
1476 self.timer.snapshot()
1477 bytes_sent = self.socket.send(self.msg_out)
1478 # Forget the part of message that was sent.
1479 self.msg_out = self.msg_out[bytes_sent:]
1480 self.bytes_to_send -= bytes_sent
1481 if not self.bytes_to_send:
1482 # TODO: Is it possible to hit negative bytes_to_send?
1483 self.sending_message = False
1484 # We should have reset hold timer on peer side.
1485 self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1486 # The possible reason for not prioritizing reads is gone.
1491 class StateTracker(object):
1492 """Main loop has state so complex it warrants this separate class."""
1494 def __init__(self, bgp_socket, generator, timer):
1495 """The state tracker initialisation.
1498 bgp_socket: socket to be used for sending / receiving
1499 generator: generator to be used for message generation
1500 timer: timer to be used for scheduling
1502 # References to outside objects.
1503 self.socket = bgp_socket
1504 self.generator = generator
1507 self.reader = ReadTracker(bgp_socket, timer)
1508 self.writer = WriteTracker(bgp_socket, generator, timer)
1509 # Prioritization state.
1510 self.prioritize_writing = False
1511 # In general, we prioritize reading over writing. But in order
1512 # not to get blocked by neverending reads, we should
1513 # check whether we are not risking running out of holdtime.
1514 # So in some situations, this field is set to True to attempt
1515 # finishing sending a message, after which this field resets
1517 # TODO: Alternative is to switch fairly between reading and
1518 # writing (called round robin from now on).
1519 # Message counting is done in generator.
1521 def perform_one_loop_iteration(self):
1522 """ The main loop iteration
1525 Calculates priority, resolves all conditions, calls
1526 appropriate method and returns to caller to repeat.
1528 self.timer.snapshot()
1529 if not self.prioritize_writing:
1530 if self.timer.is_time_for_my_keepalive():
1531 if not self.writer.sending_message:
1532 # We need to schedule a keepalive ASAP.
1533 self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1534 logger.info("KEEP ALIVE is sent.")
1535 # We are sending a message now, so let's prioritize it.
1536 self.prioritize_writing = True
1537 # Now we know what our priorities are, we have to check
1538 # which actions are available.
1539 # socket.socket() returns three lists,
1540 # we store them to list of lists.
1541 list_list = select.select([self.socket], [self.socket], [self.socket],
1542 self.timer.report_timedelta)
1543 read_list, write_list, except_list = list_list
1544 # Lists are unpacked, each is either [] or [self.socket],
1545 # so we will test them as boolean.
1547 logger.error("Exceptional state on the socket.")
1548 raise RuntimeError("Exceptional state on socket", self.socket)
1549 # We will do either read or write.
1550 if not (self.prioritize_writing and write_list):
1551 # Either we have no reason to rush writes,
1552 # or the socket is not writable.
1553 # We are focusing on reading here.
1554 if read_list: # there is something to read indeed
1555 # In this case we want to read chunk of message
1556 # and repeat the select,
1557 self.reader.read_message_chunk()
1559 # We were focusing on reading, but nothing to read was there.
1560 # Good time to check peer for hold timer.
1561 self.timer.check_peer_hold_time(self.timer.snapshot_time)
1562 # Quiet on the read front, we can have attempt to write.
1564 # Either we really want to reset peer's view of our hold
1565 # timer, or there was nothing to read.
1566 # Were we in the middle of sending a message?
1567 if self.writer.sending_message:
1568 # Was it the end of a message?
1569 whole = self.writer.send_message_chunk_is_whole()
1570 # We were pressed to send something and we did it.
1571 if self.prioritize_writing and whole:
1572 # We prioritize reading again.
1573 self.prioritize_writing = False
1575 # Finally to check if still update messages to be generated.
1576 if self.generator.remaining_prefixes:
1577 msg_out = self.generator.compose_update_message()
1578 if not self.generator.remaining_prefixes:
1579 # We have just finished update generation,
1580 # end-of-rib is due.
1581 logger.info("All update messages generated.")
1582 logger.info("Storing performance results.")
1583 self.generator.store_results()
1584 logger.info("Finally an END-OF-RIB is sent.")
1585 msg_out += self.generator.update_message(wr_prefixes=[],
1587 self.writer.enqueue_message_for_sending(msg_out)
1588 # Attempt for real sending to be done in next iteration.
1590 # Nothing to write anymore.
1591 # To avoid busy loop, we do idle waiting here.
1592 self.reader.wait_for_read()
1594 # We can neither read nor write.
1595 logger.warning("Input and output both blocked for " +
1596 str(self.timer.report_timedelta) + " seconds.")
1597 # FIXME: Are we sure select has been really waiting
1602 def create_logger(loglevel, logfile):
1603 """Create logger object
1606 :loglevel: log level
1607 :logfile: log file name
1609 :return: logger object
1611 logger = logging.getLogger("logger")
1612 log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
1613 console_handler = logging.StreamHandler()
1614 file_handler = logging.FileHandler(logfile, mode="w")
1615 console_handler.setFormatter(log_formatter)
1616 file_handler.setFormatter(log_formatter)
1617 logger.addHandler(console_handler)
1618 logger.addHandler(file_handler)
1619 logger.setLevel(loglevel)
1624 """One time initialisation and iterations looping.
1626 Establish BGP connection and run iterations.
1629 :arguments: Command line arguments
1633 bgp_socket = establish_connection(arguments)
1634 # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1635 # Receive open message before sending anything.
1636 # FIXME: Add parameter to send default open message first,
1637 # to work with "you first" peers.
1638 msg_in = read_open_message(bgp_socket)
1639 timer = TimeTracker(msg_in)
1640 generator = MessageGenerator(arguments)
1641 msg_out = generator.open_message()
1642 logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1643 # Send our open message to the peer.
1644 bgp_socket.send(msg_out)
1645 # Wait for confirming keepalive.
1646 # TODO: Surely in just one packet?
1647 # Using exact keepalive length to not to see possible updates.
1648 msg_in = bgp_socket.recv(19)
1649 if msg_in != generator.keepalive_message():
1650 logger.error("Open not confirmed by keepalive, instead got " +
1651 binascii.hexlify(msg_in))
1652 raise MessageError("Open not confirmed by keepalive, instead got",
1654 timer.reset_peer_hold_time()
1655 # Send the keepalive to indicate the connection is accepted.
1656 timer.snapshot() # Remember this time.
1657 msg_out = generator.keepalive_message()
1658 logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1659 bgp_socket.send(msg_out)
1660 # Use the remembered time.
1661 timer.reset_my_keepalive_time(timer.snapshot_time)
1662 # End of initial handshake phase.
1663 state = StateTracker(bgp_socket, generator, timer)
1664 while True: # main reactor loop
1665 state.perform_one_loop_iteration()
1668 def threaded_job(arguments):
1669 """Run the job threaded
1672 :arguments: Command line arguments
1676 amount_left = arguments.amount
1677 utils_left = arguments.multiplicity
1678 prefix_current = arguments.firstprefix
1679 myip_current = arguments.myip
1683 amount_per_util = (amount_left - 1) / utils_left + 1 # round up
1684 amount_left -= amount_per_util
1687 args = deepcopy(arguments)
1688 args.amount = amount_per_util
1689 args.firstprefix = prefix_current
1690 args.myip = myip_current
1691 thread_args.append(args)
1695 prefix_current += amount_per_util * 16
1700 for t in thread_args:
1701 thread.start_new_thread(job, (t,))
1703 print "Error: unable to start thread."
1706 # Work remains forever
1711 if __name__ == "__main__":
1712 arguments = parse_arguments()
1713 logger = create_logger(arguments.loglevel, arguments.logfile)
1714 if arguments.multiplicity > 1:
1715 threaded_job(arguments)