1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
24 from copy import deepcopy
27 __author__ = "Vratko Polak"
28 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
29 __license__ = "Eclipse Public License v1.0"
30 __email__ = "vrpolak@cisco.com"
33 def parse_arguments():
34 """Use argparse to get arguments,
39 parser = argparse.ArgumentParser()
40 # TODO: Should we use --argument-names-with-spaces?
41 str_help = "Autonomous System number use in the stream."
42 parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
43 # FIXME: We are acting as iBGP peer,
44 # we should mirror AS number from peer's open message.
45 str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
46 parser.add_argument("--amount", default="1", type=int, help=str_help)
47 str_help = "Maximum number of IP prefixes to be announced in one iteration"
48 parser.add_argument("--insert", default="1", type=int, help=str_help)
49 str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
50 parser.add_argument("--withdraw", default="0", type=int, help=str_help)
51 str_help = "The number of prefixes to process without withdrawals"
52 parser.add_argument("--prefill", default="0", type=int, help=str_help)
53 str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
54 parser.add_argument("--updates", choices=["single", "separate"],
55 default=["separate"], help=str_help)
56 str_help = "Base prefix IP address for prefix generation"
57 parser.add_argument("--firstprefix", default="8.0.1.0",
58 type=ipaddr.IPv4Address, help=str_help)
59 str_help = "The prefix length."
60 parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
61 str_help = "Listen for connection, instead of initiating it."
62 parser.add_argument("--listen", action="store_true", help=str_help)
63 str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
64 "Default value only suitable for listening.")
65 parser.add_argument("--myip", default="0.0.0.0",
66 type=ipaddr.IPv4Address, help=str_help)
67 str_help = ("TCP port to bind to when listening or initiating connection." +
68 "Default only suitable for initiating.")
69 parser.add_argument("--myport", default="0", type=int, help=str_help)
70 str_help = "The IP of the next hop to be placed into the update messages."
71 parser.add_argument("--nexthop", default="192.0.2.1",
72 type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
73 str_help = "Identifier of the route originator."
74 parser.add_argument("--originator", default=None,
75 type=ipaddr.IPv4Address, dest="originator", help=str_help)
76 str_help = "Cluster list item identifier."
77 parser.add_argument("--cluster", default=None,
78 type=ipaddr.IPv4Address, dest="cluster", help=str_help)
79 str_help = ("Numeric IP Address to try to connect to." +
80 "Currently no effect in listening mode.")
81 parser.add_argument("--peerip", default="127.0.0.2",
82 type=ipaddr.IPv4Address, help=str_help)
83 str_help = "TCP port to try to connect to. No effect in listening mode."
84 parser.add_argument("--peerport", default="179", type=int, help=str_help)
85 str_help = "Local hold time."
86 parser.add_argument("--holdtime", default="180", type=int, help=str_help)
87 str_help = "Log level (--error, --warning, --info, --debug)"
88 parser.add_argument("--error", dest="loglevel", action="store_const",
89 const=logging.ERROR, default=logging.INFO,
91 parser.add_argument("--warning", dest="loglevel", action="store_const",
92 const=logging.WARNING, default=logging.INFO,
94 parser.add_argument("--info", dest="loglevel", action="store_const",
95 const=logging.INFO, default=logging.INFO,
97 parser.add_argument("--debug", dest="loglevel", action="store_const",
98 const=logging.DEBUG, default=logging.INFO,
100 str_help = "Log file name"
101 parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
102 str_help = "Trailing part of the csv result files for plotting purposes"
103 parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
104 str_help = "Minimum number of updates to reach to include result into csv."
105 parser.add_argument("--threshold", default="1000", type=int, help=str_help)
106 str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
107 parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
108 str_help = "How many play utilities are to be started."
109 parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
110 arguments = parser.parse_args()
111 if arguments.multiplicity < 1:
112 print "Multiplicity", arguments.multiplicity, "is not positive."
114 # TODO: Are sanity checks (such as asnumber>=0) required?
118 def establish_connection(arguments):
119 """Establish connection to BGP peer.
122 :arguments: following command-line argumets are used
123 - arguments.myip: local IP address
124 - arguments.myport: local port
125 - arguments.peerip: remote IP address
126 - arguments.peerport: remote port
131 logger.info("Connecting in the listening mode.")
132 logger.debug("Local IP address: " + str(arguments.myip))
133 logger.debug("Local port: " + str(arguments.myport))
134 listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
135 listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
136 # bind need single tuple as argument
137 listening_socket.bind((str(arguments.myip), arguments.myport))
138 listening_socket.listen(1)
139 bgp_socket, _ = listening_socket.accept()
140 # TODO: Verify client IP is cotroller IP.
141 listening_socket.close()
143 logger.info("Connecting in the talking mode.")
144 logger.debug("Local IP address: " + str(arguments.myip))
145 logger.debug("Local port: " + str(arguments.myport))
146 logger.debug("Remote IP address: " + str(arguments.peerip))
147 logger.debug("Remote port: " + str(arguments.peerport))
148 talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
149 talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
150 # bind to force specified address and port
151 talking_socket.bind((str(arguments.myip), arguments.myport))
152 # socket does not spead ipaddr, hence str()
153 talking_socket.connect((str(arguments.peerip), arguments.peerport))
154 bgp_socket = talking_socket
155 logger.info("Connected to ODL.")
159 def get_short_int_from_message(message, offset=16):
160 """Extract 2-bytes number from provided message.
163 :message: given message
164 :offset: offset of the short_int inside the message
166 :return: required short_inf value.
168 default offset value is the BGP message size offset.
170 high_byte_int = ord(message[offset])
171 low_byte_int = ord(message[offset + 1])
172 short_int = high_byte_int * 256 + low_byte_int
176 def get_prefix_list_from_hex(prefixes_hex):
177 """Get decoded list of prefixes (rfc4271#section-4.3)
180 :prefixes_hex: list of prefixes to be decoded in hex
182 :return: list of prefixes in the form of ip address (X.X.X.X/X)
186 while offset < len(prefixes_hex):
187 prefix_bit_len_hex = prefixes_hex[offset]
188 prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
189 prefix_len = ((prefix_bit_len - 1) / 8) + 1
190 prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
191 prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
192 offset += 1 + prefix_len
193 prefix_list.append(prefix + "/" + str(prefix_bit_len))
197 class MessageError(ValueError):
198 """Value error with logging optimized for hexlified messages."""
200 def __init__(self, text, message, *args):
203 Store and call super init for textual comment,
204 store raw message which caused it.
208 super(MessageError, self).__init__(text, message, *args)
211 """Generate human readable error message.
214 :return: human readable message as string
216 Use a placeholder string if the message is to be empty.
218 message = binascii.hexlify(self.msg)
220 message = "(empty message)"
221 return self.text + ": " + message
224 def read_open_message(bgp_socket):
225 """Receive peer's OPEN message
228 :bgp_socket: the socket to be read
230 :return: received OPEN message.
232 Performs just basic incomming message checks
234 msg_in = bgp_socket.recv(65535) # TODO: Is smaller buffer size safe?
235 # TODO: Can the incoming open message be split in more than one packet?
238 # 37 is minimal length of open message with 4-byte AS number.
240 "Message length (" + str(len(msg_in)) + ") is smaller than "
241 "minimal length of OPEN message with 4-byte AS number (37)"
243 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
244 raise MessageError(error_msg, msg_in)
245 # TODO: We could check BGP marker, but it is defined only later;
247 reported_length = get_short_int_from_message(msg_in)
248 if len(msg_in) != reported_length:
250 "Expected message length (" + reported_length +
251 ") does not match actual length (" + str(len(msg_in)) + ")"
253 logger.error(error_msg + binascii.hexlify(msg_in))
254 raise MessageError(error_msg, msg_in)
255 logger.info("Open message received.")
259 class MessageGenerator(object):
260 """Class which generates messages, holds states and configuration values."""
262 # TODO: Define bgp marker as a class (constant) variable.
263 def __init__(self, args):
264 """Initialisation according to command-line args.
267 :args: argsparser's Namespace object which contains command-line
268 options for MesageGenerator initialisation
270 Calculates and stores default values used later on for
273 self.total_prefix_amount = args.amount
274 # Number of update messages left to be sent.
275 self.remaining_prefixes = self.total_prefix_amount
277 # New parameters initialisation
279 self.prefix_base_default = args.firstprefix
280 self.prefix_length_default = args.prefixlen
281 self.wr_prefixes_default = []
282 self.nlri_prefixes_default = []
283 self.version_default = 4
284 self.my_autonomous_system_default = args.asnumber
285 self.hold_time_default = args.holdtime # Local hold time.
286 self.bgp_identifier_default = int(args.myip)
287 self.next_hop_default = args.nexthop
288 self.originator_id_default = args.originator
289 self.cluster_list_item_default = args.cluster
290 self.single_update_default = args.updates == "single"
291 self.randomize_updates_default = args.updates == "random"
292 self.prefix_count_to_add_default = args.insert
293 self.prefix_count_to_del_default = args.withdraw
294 if self.prefix_count_to_del_default < 0:
295 self.prefix_count_to_del_default = 0
296 if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
297 # total number of prefixes must grow to avoid infinite test loop
298 self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
299 self.slot_size_default = self.prefix_count_to_add_default
300 self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
301 self.results_file_name_default = args.results
302 self.performance_threshold_default = args.threshold
303 self.rfc4760 = args.rfc4760 == "yes"
304 # Default values used for randomized part
305 s1_slots = ((self.total_prefix_amount -
306 self.remaining_prefixes_threshold - 1) /
307 self.prefix_count_to_add_default + 1)
308 s2_slots = ((self.remaining_prefixes_threshold - 1) /
309 (self.prefix_count_to_add_default -
310 self.prefix_count_to_del_default) + 1)
312 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
313 s2_first_index = s1_slots * self.prefix_count_to_add_default
314 s2_last_index = (s2_first_index +
315 s2_slots * (self.prefix_count_to_add_default -
316 self.prefix_count_to_del_default) - 1)
317 self.slot_gap_default = ((self.total_prefix_amount -
318 self.remaining_prefixes_threshold - 1) /
319 self.prefix_count_to_add_default + 1)
320 self.randomize_lowest_default = s2_first_index
321 self.randomize_highest_default = s2_last_index
323 # Initialising counters
324 self.phase1_start_time = 0
325 self.phase1_stop_time = 0
326 self.phase2_start_time = 0
327 self.phase2_stop_time = 0
328 self.phase1_updates_sent = 0
329 self.phase2_updates_sent = 0
330 self.updates_sent = 0
332 self.log_info = args.loglevel <= logging.INFO
333 self.log_debug = args.loglevel <= logging.DEBUG
335 Flags needed for the MessageGenerator performance optimization.
336 Calling logger methods each iteration even with proper log level set
337 slows down significantly the MessageGenerator performance.
338 Measured total generation time (1M updates, dry run, error log level):
339 - logging based on basic logger features: 36,2s
340 - logging based on advanced logger features (lazy logging): 21,2s
341 - conditional calling of logger methods enclosed inside condition: 8,6s
344 logger.info("Generator initialisation")
345 logger.info(" Target total number of prefixes to be introduced: " +
346 str(self.total_prefix_amount))
347 logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
348 str(self.prefix_length_default))
349 logger.info(" My Autonomous System number: " +
350 str(self.my_autonomous_system_default))
351 logger.info(" My Hold Time: " + str(self.hold_time_default))
352 logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
353 logger.info(" Next Hop: " + str(self.next_hop_default))
354 logger.info(" Originator ID: " + str(self.originator_id_default))
355 logger.info(" Cluster list: " + str(self.cluster_list_item_default))
356 logger.info(" Prefix count to be inserted at once: " +
357 str(self.prefix_count_to_add_default))
358 logger.info(" Prefix count to be withdrawn at once: " +
359 str(self.prefix_count_to_del_default))
360 logger.info(" Fast pre-fill up to " +
361 str(self.total_prefix_amount -
362 self.remaining_prefixes_threshold) + " prefixes")
363 logger.info(" Remaining number of prefixes to be processed " +
364 "in parallel with withdrawals: " +
365 str(self.remaining_prefixes_threshold))
366 logger.debug(" Prefix index range used after pre-fill procedure [" +
367 str(self.randomize_lowest_default) + ", " +
368 str(self.randomize_highest_default) + "]")
369 if self.single_update_default:
370 logger.info(" Common single UPDATE will be generated " +
371 "for both NLRI & WITHDRAWN lists")
373 logger.info(" Two separate UPDATEs will be generated " +
374 "for each NLRI & WITHDRAWN lists")
375 if self.randomize_updates_default:
376 logger.info(" Generation of UPDATE messages will be randomized")
377 logger.info(" Let\'s go ...\n")
379 # TODO: Notification for hold timer expiration can be handy.
381 def store_results(self, file_name=None, threshold=None):
382 """ Stores specified results into files based on file_name value.
385 :param file_name: Trailing (common) part of result file names
386 :param threshold: Minimum number of sent updates needed for each
387 result to be included into result csv file
388 (mainly needed because of the result accuracy)
392 # default values handling
393 # TODO optimize default values handling (use e.g. dicionary.update() approach)
394 if file_name is None:
395 file_name = self.results_file_name_default
396 if threshold is None:
397 threshold = self.performance_threshold_default
398 # performance calculation
399 if self.phase1_updates_sent >= threshold:
400 totals1 = self.phase1_updates_sent
401 performance1 = int(self.phase1_updates_sent /
402 (self.phase1_stop_time - self.phase1_start_time))
406 if self.phase2_updates_sent >= threshold:
407 totals2 = self.phase2_updates_sent
408 performance2 = int(self.phase2_updates_sent /
409 (self.phase2_stop_time - self.phase2_start_time))
414 logger.info("#" * 10 + " Final results " + "#" * 10)
415 logger.info("Number of iterations: " + str(self.iteration))
416 logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
417 str(self.phase1_updates_sent))
418 logger.info("The pre-fill phase duration: " +
419 str(self.phase1_stop_time - self.phase1_start_time) + "s")
420 logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
421 str(self.phase2_updates_sent))
422 logger.info("The 2nd test phase duration: " +
423 str(self.phase2_stop_time - self.phase2_start_time) + "s")
424 logger.info("Threshold for performance reporting: " + str(threshold))
427 phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
428 " route(s) per UPDATE")
429 if self.single_update_default:
430 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
431 "/-" + str(self.prefix_count_to_del_default) +
432 " routes per UPDATE")
434 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
435 "/-" + str(self.prefix_count_to_del_default) +
436 " routes in two UPDATEs")
437 # collecting capacity and performance results
440 if totals1 is not None:
441 totals[phase1_label] = totals1
442 performance[phase1_label] = performance1
443 if totals2 is not None:
444 totals[phase2_label] = totals2
445 performance[phase2_label] = performance2
446 self.write_results_to_file(totals, "totals-" + file_name)
447 self.write_results_to_file(performance, "performance-" + file_name)
449 def write_results_to_file(self, results, file_name):
450 """Writes results to the csv plot file consumable by Jenkins.
453 :param file_name: Name of the (csv) file to be created
459 f = open(file_name, "wt")
461 for key in sorted(results):
462 first_line += key + ", "
463 second_line += str(results[key]) + ", "
464 first_line = first_line[:-2]
465 second_line = second_line[:-2]
466 f.write(first_line + "\n")
467 f.write(second_line + "\n")
468 logger.info("Message generator performance results stored in " +
470 logger.info(" " + first_line)
471 logger.info(" " + second_line)
475 # Return pseudo-randomized (reproducible) index for selected range
476 def randomize_index(self, index, lowest=None, highest=None):
477 """Calculates pseudo-randomized index from selected range.
480 :param index: input index
481 :param lowest: the lowes index from the randomized area
482 :param highest: the highest index from the randomized area
484 :return: the (pseudo)randomized index
486 Created just as a fame for future generator enhancement.
488 # default values handling
489 # TODO optimize default values handling (use e.g. dicionary.update() approach)
491 lowest = self.randomize_lowest_default
493 highest = self.randomize_highest_default
495 if (index >= lowest) and (index <= highest):
496 # we are in the randomized range -> shuffle it inside
497 # the range (now just reverse the order)
498 new_index = highest - (index - lowest)
500 # we are out of the randomized range -> nothing to do
504 # Get list of prefixes
505 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
506 prefix_len=None, prefix_count=None, randomize=None):
507 """Generates list of IP address prefixes.
510 :param slot_index: index of group of prefix addresses
511 :param slot_size: size of group of prefix addresses
512 in [number of included prefixes]
513 :param prefix_base: IP address of the first prefix
514 (slot_index = 0, prefix_index = 0)
515 :param prefix_len: length of the prefix in bites
516 (the same as size of netmask)
517 :param prefix_count: number of prefixes to be returned
518 from the specified slot
520 :return: list of generated IP address prefixes
522 # default values handling
523 # TODO optimize default values handling (use e.g. dicionary.update() approach)
524 if slot_size is None:
525 slot_size = self.slot_size_default
526 if prefix_base is None:
527 prefix_base = self.prefix_base_default
528 if prefix_len is None:
529 prefix_len = self.prefix_length_default
530 if prefix_count is None:
531 prefix_count = slot_size
532 if randomize is None:
533 randomize = self.randomize_updates_default
534 # generating list of prefixes
537 prefix_gap = 2 ** (32 - prefix_len)
538 for i in range(prefix_count):
539 prefix_index = slot_index * slot_size + i
541 prefix_index = self.randomize_index(prefix_index)
542 indexes.append(prefix_index)
543 prefixes.append(prefix_base + prefix_index * prefix_gap)
545 logger.debug(" Prefix slot index: " + str(slot_index))
546 logger.debug(" Prefix slot size: " + str(slot_size))
547 logger.debug(" Prefix count: " + str(prefix_count))
548 logger.debug(" Prefix indexes: " + str(indexes))
549 logger.debug(" Prefix list: " + str(prefixes))
552 def compose_update_message(self, prefix_count_to_add=None,
553 prefix_count_to_del=None):
554 """Composes an UPDATE message
557 :param prefix_count_to_add: # of prefixes to put into NLRI list
558 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
560 :return: encoded UPDATE message in HEX
562 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
563 lists or common message wich includes both prefix lists.
564 Updates global counters.
566 # default values handling
567 # TODO optimize default values handling (use e.g. dicionary.update() approach)
568 if prefix_count_to_add is None:
569 prefix_count_to_add = self.prefix_count_to_add_default
570 if prefix_count_to_del is None:
571 prefix_count_to_del = self.prefix_count_to_del_default
573 if self.log_info and not (self.iteration % 1000):
574 logger.info("Iteration: " + str(self.iteration) +
575 " - total remaining prefixes: " +
576 str(self.remaining_prefixes))
578 logger.debug("#" * 10 + " Iteration: " +
579 str(self.iteration) + " " + "#" * 10)
580 logger.debug("Remaining prefixes: " +
581 str(self.remaining_prefixes))
582 # scenario type & one-shot counter
583 straightforward_scenario = (self.remaining_prefixes >
584 self.remaining_prefixes_threshold)
585 if straightforward_scenario:
586 prefix_count_to_del = 0
588 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
589 if not self.phase1_start_time:
590 self.phase1_start_time = time.time()
593 logger.debug("--- COMBINED SCENARIO ---")
594 if not self.phase2_start_time:
595 self.phase2_start_time = time.time()
596 # tailor the number of prefixes if needed
597 prefix_count_to_add = (prefix_count_to_del +
598 min(prefix_count_to_add - prefix_count_to_del,
599 self.remaining_prefixes))
600 # prefix slots selection for insertion and withdrawal
601 slot_index_to_add = self.iteration
602 slot_index_to_del = slot_index_to_add - self.slot_gap_default
603 # getting lists of prefixes for insertion in this iteration
605 logger.debug("Prefixes to be inserted in this iteration:")
606 prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
607 prefix_count=prefix_count_to_add)
608 # getting lists of prefixes for withdrawal in this iteration
610 logger.debug("Prefixes to be withdrawn in this iteration:")
611 prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
612 prefix_count=prefix_count_to_del)
613 # generating the mesage
614 if self.single_update_default:
615 # Send prefixes to be introduced and withdrawn
616 # in one UPDATE message
617 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
618 nlri_prefixes=prefix_list_to_add)
620 # Send prefixes to be introduced and withdrawn
621 # in separate UPDATE messages (if needed)
622 msg_out = self.update_message(wr_prefixes=[],
623 nlri_prefixes=prefix_list_to_add)
624 if prefix_count_to_del:
625 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
627 # updating counters - who knows ... maybe I am last time here ;)
628 if straightforward_scenario:
629 self.phase1_stop_time = time.time()
630 self.phase1_updates_sent = self.updates_sent
632 self.phase2_stop_time = time.time()
633 self.phase2_updates_sent = (self.updates_sent -
634 self.phase1_updates_sent)
635 # updating totals for the next iteration
637 self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
638 # returning the encoded message
641 # Section of message encoders
643 def open_message(self, version=None, my_autonomous_system=None,
644 hold_time=None, bgp_identifier=None):
645 """Generates an OPEN Message (rfc4271#section-4.2)
648 :param version: see the rfc4271#section-4.2
649 :param my_autonomous_system: see the rfc4271#section-4.2
650 :param hold_time: see the rfc4271#section-4.2
651 :param bgp_identifier: see the rfc4271#section-4.2
653 :return: encoded OPEN message in HEX
656 # default values handling
657 # TODO optimize default values handling (use e.g. dicionary.update() approach)
659 version = self.version_default
660 if my_autonomous_system is None:
661 my_autonomous_system = self.my_autonomous_system_default
662 if hold_time is None:
663 hold_time = self.hold_time_default
664 if bgp_identifier is None:
665 bgp_identifier = self.bgp_identifier_default
668 marker_hex = "\xFF" * 16
672 type_hex = struct.pack("B", type)
675 version_hex = struct.pack("B", version)
677 # my_autonomous_system
678 # AS_TRANS value, 23456 decadic.
679 my_autonomous_system_2_bytes = 23456
680 # AS number is mappable to 2 bytes
681 if my_autonomous_system < 65536:
682 my_autonomous_system_2_bytes = my_autonomous_system
683 my_autonomous_system_hex_2_bytes = struct.pack(">H",
684 my_autonomous_system)
687 hold_time_hex = struct.pack(">H", hold_time)
690 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
692 # Optional Parameters
693 optional_parameters_hex = ""
695 optional_parameter_hex = (
696 "\x02" # Param type ("Capability Ad")
697 "\x06" # Length (6 bytes)
698 "\x01" # Capability type (NLRI Unicast),
699 # see RFC 4760, secton 8
700 "\x04" # Capability value length
701 "\x00\x01" # AFI (Ipv4)
703 "\x01" # SAFI (Unicast)
705 optional_parameters_hex += optional_parameter_hex
707 optional_parameter_hex = (
708 "\x02" # Param type ("Capability Ad")
709 "\x06" # Length (6 bytes)
710 "\x41" # "32 bit AS Numbers Support"
711 # (see RFC 6793, section 3)
712 "\x04" # Capability value length
714 optional_parameter_hex += (
715 struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
717 optional_parameters_hex += optional_parameter_hex
719 # Optional Parameters Length
720 optional_parameters_length = len(optional_parameters_hex)
721 optional_parameters_length_hex = struct.pack("B",
722 optional_parameters_length)
724 # Length (big-endian)
726 len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
727 len(my_autonomous_system_hex_2_bytes) +
728 len(hold_time_hex) + len(bgp_identifier_hex) +
729 len(optional_parameters_length_hex) +
730 len(optional_parameters_hex)
732 length_hex = struct.pack(">H", length)
740 my_autonomous_system_hex_2_bytes +
743 optional_parameters_length_hex +
744 optional_parameters_hex
748 logger.debug("OPEN message encoding")
749 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
750 logger.debug(" Length=" + str(length) + " (0x" +
751 binascii.hexlify(length_hex) + ")")
752 logger.debug(" Type=" + str(type) + " (0x" +
753 binascii.hexlify(type_hex) + ")")
754 logger.debug(" Version=" + str(version) + " (0x" +
755 binascii.hexlify(version_hex) + ")")
756 logger.debug(" My Autonomous System=" +
757 str(my_autonomous_system_2_bytes) + " (0x" +
758 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
760 logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
761 binascii.hexlify(hold_time_hex) + ")")
762 logger.debug(" BGP Identifier=" + str(bgp_identifier) +
763 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
764 logger.debug(" Optional Parameters Length=" +
765 str(optional_parameters_length) + " (0x" +
766 binascii.hexlify(optional_parameters_length_hex) +
768 logger.debug(" Optional Parameters=0x" +
769 binascii.hexlify(optional_parameters_hex))
770 logger.debug("OPEN message encoded: 0x%s",
771 binascii.b2a_hex(message_hex))
775 def update_message(self, wr_prefixes=None, nlri_prefixes=None,
776 wr_prefix_length=None, nlri_prefix_length=None,
777 my_autonomous_system=None, next_hop=None,
778 originator_id=None, cluster_list_item=None):
779 """Generates an UPDATE Message (rfc4271#section-4.3)
782 :param wr_prefixes: see the rfc4271#section-4.3
783 :param nlri_prefixes: see the rfc4271#section-4.3
784 :param wr_prefix_length: see the rfc4271#section-4.3
785 :param nlri_prefix_length: see the rfc4271#section-4.3
786 :param my_autonomous_system: see the rfc4271#section-4.3
787 :param next_hop: see the rfc4271#section-4.3
789 :return: encoded UPDATE message in HEX
792 # default values handling
793 # TODO optimize default values handling (use e.g. dicionary.update() approach)
794 if wr_prefixes is None:
795 wr_prefixes = self.wr_prefixes_default
796 if nlri_prefixes is None:
797 nlri_prefixes = self.nlri_prefixes_default
798 if wr_prefix_length is None:
799 wr_prefix_length = self.prefix_length_default
800 if nlri_prefix_length is None:
801 nlri_prefix_length = self.prefix_length_default
802 if my_autonomous_system is None:
803 my_autonomous_system = self.my_autonomous_system_default
805 next_hop = self.next_hop_default
806 if originator_id is None:
807 originator_id = self.originator_id_default
808 if cluster_list_item is None:
809 cluster_list_item = self.cluster_list_item_default
812 marker_hex = "\xFF" * 16
816 type_hex = struct.pack("B", type)
819 bytes = ((wr_prefix_length - 1) / 8) + 1
820 withdrawn_routes_hex = ""
821 for prefix in wr_prefixes:
822 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
823 struct.pack(">I", int(prefix))[:bytes])
824 withdrawn_routes_hex += withdrawn_route_hex
826 # Withdrawn Routes Length
827 withdrawn_routes_length = len(withdrawn_routes_hex)
828 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
830 # TODO: to replace hardcoded string by encoding?
832 path_attributes_hex = ""
833 if nlri_prefixes != []:
834 path_attributes_hex += (
835 "\x40" # Flags ("Well-Known")
836 "\x01" # Type (ORIGIN)
840 path_attributes_hex += (
841 "\x40" # Flags ("Well-Known")
842 "\x02" # Type (AS_PATH)
844 "\x02" # AS segment type (AS_SEQUENCE)
845 "\x01" # AS segment length (1)
847 my_as_hex = struct.pack(">I", my_autonomous_system)
848 path_attributes_hex += my_as_hex # AS segment (4 bytes)
849 path_attributes_hex += (
850 "\x40" # Flags ("Well-Known")
851 "\x03" # Type (NEXT_HOP)
854 next_hop_hex = struct.pack(">I", int(next_hop))
855 path_attributes_hex += (
856 next_hop_hex # IP address of the next hop (4 bytes)
858 if originator_id is not None:
859 path_attributes_hex += (
860 "\x80" # Flags ("Optional, non-transitive")
861 "\x09" # Type (ORIGINATOR_ID)
863 ) # ORIGINATOR_ID (4 bytes)
864 path_attributes_hex += struct.pack(">I", int(originator_id))
865 if cluster_list_item is not None:
866 path_attributes_hex += (
867 "\x80" # Flags ("Optional, non-transitive")
868 "\x09" # Type (CLUSTER_LIST)
870 ) # one CLUSTER_LIST item (4 bytes)
871 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
873 # Total Path Attributes Length
874 total_path_attributes_length = len(path_attributes_hex)
875 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
877 # Network Layer Reachability Information
878 bytes = ((nlri_prefix_length - 1) / 8) + 1
880 for prefix in nlri_prefixes:
881 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
882 struct.pack(">I", int(prefix))[:bytes])
883 nlri_hex += nlri_prefix_hex
885 # Length (big-endian)
887 len(marker_hex) + 2 + len(type_hex) +
888 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
889 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
891 length_hex = struct.pack(">H", length)
898 withdrawn_routes_length_hex +
899 withdrawn_routes_hex +
900 total_path_attributes_length_hex +
901 path_attributes_hex +
906 logger.debug("UPDATE message encoding")
907 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
908 logger.debug(" Length=" + str(length) + " (0x" +
909 binascii.hexlify(length_hex) + ")")
910 logger.debug(" Type=" + str(type) + " (0x" +
911 binascii.hexlify(type_hex) + ")")
912 logger.debug(" withdrawn_routes_length=" +
913 str(withdrawn_routes_length) + " (0x" +
914 binascii.hexlify(withdrawn_routes_length_hex) + ")")
915 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
916 str(wr_prefix_length) + " (0x" +
917 binascii.hexlify(withdrawn_routes_hex) + ")")
918 if total_path_attributes_length:
919 logger.debug(" Total Path Attributes Length=" +
920 str(total_path_attributes_length) + " (0x" +
921 binascii.hexlify(total_path_attributes_length_hex) + ")")
922 logger.debug(" Path Attributes=" + "(0x" +
923 binascii.hexlify(path_attributes_hex) + ")")
924 logger.debug(" Origin=IGP")
925 logger.debug(" AS path=" + str(my_autonomous_system))
926 logger.debug(" Next hop=" + str(next_hop))
927 if originator_id is not None:
928 logger.debug(" Originator id=" + str(originator_id))
929 if cluster_list_item is not None:
930 logger.debug(" Cluster list=" + str(cluster_list_item))
931 logger.debug(" Network Layer Reachability Information=" +
932 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
933 " (0x" + binascii.hexlify(nlri_hex) + ")")
934 logger.debug("UPDATE message encoded: 0x" +
935 binascii.b2a_hex(message_hex))
938 self.updates_sent += 1
939 # returning encoded message
942 def notification_message(self, error_code, error_subcode, data_hex=""):
943 """Generates a NOTIFICATION Message (rfc4271#section-4.5)
946 :param error_code: see the rfc4271#section-4.5
947 :param error_subcode: see the rfc4271#section-4.5
948 :param data_hex: see the rfc4271#section-4.5
950 :return: encoded NOTIFICATION message in HEX
954 marker_hex = "\xFF" * 16
958 type_hex = struct.pack("B", type)
961 error_code_hex = struct.pack("B", error_code)
964 error_subcode_hex = struct.pack("B", error_subcode)
966 # Length (big-endian)
967 length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
968 len(error_subcode_hex) + len(data_hex))
969 length_hex = struct.pack(">H", length)
971 # NOTIFICATION Message
982 logger.debug("NOTIFICATION message encoding")
983 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
984 logger.debug(" Length=" + str(length) + " (0x" +
985 binascii.hexlify(length_hex) + ")")
986 logger.debug(" Type=" + str(type) + " (0x" +
987 binascii.hexlify(type_hex) + ")")
988 logger.debug(" Error Code=" + str(error_code) + " (0x" +
989 binascii.hexlify(error_code_hex) + ")")
990 logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
991 binascii.hexlify(error_subcode_hex) + ")")
992 logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
993 logger.debug("NOTIFICATION message encoded: 0x%s",
994 binascii.b2a_hex(message_hex))
998 def keepalive_message(self):
999 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1002 :return: encoded KEEP ALIVE message in HEX
1006 marker_hex = "\xFF" * 16
1010 type_hex = struct.pack("B", type)
1012 # Length (big-endian)
1013 length = len(marker_hex) + 2 + len(type_hex)
1014 length_hex = struct.pack(">H", length)
1016 # KEEP ALIVE Message
1024 logger.debug("KEEP ALIVE message encoding")
1025 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1026 logger.debug(" Length=" + str(length) + " (0x" +
1027 binascii.hexlify(length_hex) + ")")
1028 logger.debug(" Type=" + str(type) + " (0x" +
1029 binascii.hexlify(type_hex) + ")")
1030 logger.debug("KEEP ALIVE message encoded: 0x%s",
1031 binascii.b2a_hex(message_hex))
1036 class TimeTracker(object):
1037 """Class for tracking timers, both for my keepalives and
1041 def __init__(self, msg_in):
1042 """Initialisation. based on defaults and OPEN message from peer.
1045 msg_in: the OPEN message received from peer.
1047 # Note: Relative time is always named timedelta, to stress that
1048 # the (non-delta) time is absolute.
1049 self.report_timedelta = 1.0 # In seconds. TODO: Configurable?
1050 # Upper bound for being stuck in the same state, we should
1051 # at least report something before continuing.
1052 # Negotiate the hold timer by taking the smaller
1053 # of the 2 values (mine and the peer's).
1054 hold_timedelta = 180 # Not an attribute of self yet.
1055 # TODO: Make the default value configurable,
1056 # default value could mirror what peer said.
1057 peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1058 if hold_timedelta > peer_hold_timedelta:
1059 hold_timedelta = peer_hold_timedelta
1060 if hold_timedelta != 0 and hold_timedelta < 3:
1061 logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1062 raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1063 self.hold_timedelta = hold_timedelta
1064 # If we do not hear from peer this long, we assume it has died.
1065 self.keepalive_timedelta = int(hold_timedelta / 3.0)
1066 # Upper limit for duration between messages, to avoid being
1067 # declared to be dead.
1068 # The same as calling snapshot(), but also declares a field.
1069 self.snapshot_time = time.time()
1070 # Sometimes we need to store time. This is where to get
1071 # the value from afterwards. Time_keepalive may be too strict.
1072 self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1073 # At this time point, peer will be declared dead.
1074 self.my_keepalive_time = None # to be set later
1075 # At this point, we should be sending keepalive message.
1078 """Store current time in instance data to use later."""
1079 # Read as time before something interesting was called.
1080 self.snapshot_time = time.time()
1082 def reset_peer_hold_time(self):
1083 """Move hold time to future as peer has just proven it still lives."""
1084 self.peer_hold_time = time.time() + self.hold_timedelta
1086 # Some methods could rely on self.snapshot_time, but it is better
1087 # to require user to provide it explicitly.
1088 def reset_my_keepalive_time(self, keepalive_time):
1089 """Calculate and set the next my KEEP ALIVE timeout time
1092 :keepalive_time: the initial value of the KEEP ALIVE timer
1094 self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1096 def is_time_for_my_keepalive(self):
1097 """Check for my KEEP ALIVE timeout occurence"""
1098 if self.hold_timedelta == 0:
1100 return self.snapshot_time >= self.my_keepalive_time
1102 def get_next_event_time(self):
1103 """Set the time of the next expected or to be sent KEEP ALIVE"""
1104 if self.hold_timedelta == 0:
1105 return self.snapshot_time + 86400
1106 return min(self.my_keepalive_time, self.peer_hold_time)
1108 def check_peer_hold_time(self, snapshot_time):
1109 """Raise error if nothing was read from peer until specified time."""
1110 # Hold time = 0 means keepalive checking off.
1111 if self.hold_timedelta != 0:
1112 # time.time() may be too strict
1113 if snapshot_time > self.peer_hold_time:
1114 logger.error("Peer has overstepped the hold timer.")
1115 raise RuntimeError("Peer has overstepped the hold timer.")
1116 # TODO: Include hold_timedelta?
1117 # TODO: Add notification sending (attempt). That means
1118 # move to write tracker.
1121 class ReadTracker(object):
1122 """Class for tracking read of mesages chunk by chunk and
1126 def __init__(self, bgp_socket, timer):
1127 """The reader initialisation.
1130 bgp_socket: socket to be used for sending
1131 timer: timer to be used for scheduling
1133 # References to outside objects.
1134 self.socket = bgp_socket
1136 # BGP marker length plus length field length.
1137 self.header_length = 18
1138 # TODO: make it class (constant) attribute
1139 # Computation of where next chunk ends depends on whether
1140 # we are beyond length field.
1141 self.reading_header = True
1142 # Countdown towards next size computation.
1143 self.bytes_to_read = self.header_length
1144 # Incremental buffer for message under read.
1146 # Initialising counters
1147 self.updates_received = 0
1148 self.prefixes_introduced = 0
1149 self.prefixes_withdrawn = 0
1150 self.rx_idle_time = 0
1151 self.rx_activity_detected = True
1153 def read_message_chunk(self):
1154 """Read up to one message
1157 Currently it does not return anything.
1159 # TODO: We could return the whole message, currently not needed.
1160 # We assume the socket is readable.
1161 chunk_message = self.socket.recv(self.bytes_to_read)
1162 self.msg_in += chunk_message
1163 self.bytes_to_read -= len(chunk_message)
1164 # TODO: bytes_to_read < 0 is not possible, right?
1165 if not self.bytes_to_read:
1166 # Finished reading a logical block.
1167 if self.reading_header:
1168 # The logical block was a BGP header.
1169 # Now we know the size of the message.
1170 self.reading_header = False
1171 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1173 else: # We have finished reading the body of the message.
1174 # Peer has just proven it is still alive.
1175 self.timer.reset_peer_hold_time()
1176 # TODO: Do we want to count received messages?
1177 # This version ignores the received message.
1178 # TODO: Should we do validation and exit on anything
1179 # besides update or keepalive?
1180 # Prepare state for reading another message.
1181 message_type_hex = self.msg_in[self.header_length]
1182 if message_type_hex == "\x01":
1183 logger.info("OPEN message received: 0x%s",
1184 binascii.b2a_hex(self.msg_in))
1185 elif message_type_hex == "\x02":
1186 logger.debug("UPDATE message received: 0x%s",
1187 binascii.b2a_hex(self.msg_in))
1188 self.decode_update_message(self.msg_in)
1189 elif message_type_hex == "\x03":
1190 logger.info("NOTIFICATION message received: 0x%s",
1191 binascii.b2a_hex(self.msg_in))
1192 elif message_type_hex == "\x04":
1193 logger.info("KEEP ALIVE message received: 0x%s",
1194 binascii.b2a_hex(self.msg_in))
1196 logger.warning("Unexpected message received: 0x%s",
1197 binascii.b2a_hex(self.msg_in))
1199 self.reading_header = True
1200 self.bytes_to_read = self.header_length
1201 # We should not act upon peer_hold_time if we are reading
1202 # something right now.
1205 def decode_path_attributes(self, path_attributes_hex):
1206 """Decode the Path Attributes field (rfc4271#section-4.3)
1209 :path_attributes: path_attributes field to be decoded in hex
1213 hex_to_decode = path_attributes_hex
1215 while len(hex_to_decode):
1216 attr_flags_hex = hex_to_decode[0]
1217 attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1218 # attr_optional_bit = attr_flags & 128
1219 # attr_transitive_bit = attr_flags & 64
1220 # attr_partial_bit = attr_flags & 32
1221 attr_extended_length_bit = attr_flags & 16
1223 attr_type_code_hex = hex_to_decode[1]
1224 attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1226 if attr_extended_length_bit:
1227 attr_length_hex = hex_to_decode[2:4]
1228 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1229 attr_value_hex = hex_to_decode[4:4 + attr_length]
1230 hex_to_decode = hex_to_decode[4 + attr_length:]
1232 attr_length_hex = hex_to_decode[2]
1233 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1234 attr_value_hex = hex_to_decode[3:3 + attr_length]
1235 hex_to_decode = hex_to_decode[3 + attr_length:]
1237 if attr_type_code == 1:
1238 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1239 binascii.b2a_hex(attr_flags_hex))
1240 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1241 elif attr_type_code == 2:
1242 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1243 binascii.b2a_hex(attr_flags_hex))
1244 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1245 elif attr_type_code == 3:
1246 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1247 binascii.b2a_hex(attr_flags_hex))
1248 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1249 elif attr_type_code == 4:
1250 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1251 binascii.b2a_hex(attr_flags_hex))
1252 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1253 elif attr_type_code == 5:
1254 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1255 binascii.b2a_hex(attr_flags_hex))
1256 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1257 elif attr_type_code == 6:
1258 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1259 binascii.b2a_hex(attr_flags_hex))
1260 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1261 elif attr_type_code == 7:
1262 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1263 binascii.b2a_hex(attr_flags_hex))
1264 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1265 elif attr_type_code == 9: # rfc4456#section-8
1266 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1267 binascii.b2a_hex(attr_flags_hex))
1268 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1269 elif attr_type_code == 10: # rfc4456#section-8
1270 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1271 binascii.b2a_hex(attr_flags_hex))
1272 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1273 elif attr_type_code == 14: # rfc4760#section-3
1274 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1275 binascii.b2a_hex(attr_flags_hex))
1276 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1277 address_family_identifier_hex = attr_value_hex[0:2]
1278 logger.debug(" Address Family Identifier=0x%s",
1279 binascii.b2a_hex(address_family_identifier_hex))
1280 subsequent_address_family_identifier_hex = attr_value_hex[2]
1281 logger.debug(" Subsequent Address Family Identifier=0x%s",
1282 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1283 next_hop_netaddr_len_hex = attr_value_hex[3]
1284 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1285 logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
1286 next_hop_netaddr_len,
1287 binascii.b2a_hex(next_hop_netaddr_len_hex))
1288 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1289 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1290 logger.debug(" Network Address of Next Hop=%s (0x%s)",
1291 next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1292 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1293 logger.debug(" Reserved=0x%s",
1294 binascii.b2a_hex(reserved_hex))
1295 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1296 logger.debug(" Network Layer Reachability Information=0x%s",
1297 binascii.b2a_hex(nlri_hex))
1298 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1299 logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
1300 for prefix in nlri_prefix_list:
1301 logger.debug(" nlri_prefix_received: %s", prefix)
1302 self.prefixes_introduced += len(nlri_prefix_list) # update counter
1303 elif attr_type_code == 15: # rfc4760#section-4
1304 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1305 binascii.b2a_hex(attr_flags_hex))
1306 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1307 address_family_identifier_hex = attr_value_hex[0:2]
1308 logger.debug(" Address Family Identifier=0x%s",
1309 binascii.b2a_hex(address_family_identifier_hex))
1310 subsequent_address_family_identifier_hex = attr_value_hex[2]
1311 logger.debug(" Subsequent Address Family Identifier=0x%s",
1312 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1313 wd_hex = attr_value_hex[3:]
1314 logger.debug(" Withdrawn Routes=0x%s",
1315 binascii.b2a_hex(wd_hex))
1316 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1317 logger.debug(" Withdrawn routes prefix list: %s",
1319 for prefix in wdr_prefix_list:
1320 logger.debug(" withdrawn_prefix_received: %s", prefix)
1321 self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
1323 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1324 binascii.b2a_hex(attr_flags_hex))
1325 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1328 def decode_update_message(self, msg):
1329 """Decode an UPDATE message (rfc4271#section-4.3)
1332 :msg: message to be decoded in hex
1336 logger.debug("Decoding update message:")
1337 # message header - marker
1338 marker_hex = msg[:16]
1339 logger.debug("Message header marker: 0x%s",
1340 binascii.b2a_hex(marker_hex))
1341 # message header - message length
1342 msg_length_hex = msg[16:18]
1343 msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1344 logger.debug("Message lenght: 0x%s (%s)",
1345 binascii.b2a_hex(msg_length_hex), msg_length)
1346 # message header - message type
1347 msg_type_hex = msg[18:19]
1348 msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1350 logger.debug("Message type: 0x%s (update)",
1351 binascii.b2a_hex(msg_type_hex))
1352 # withdrawn routes length
1353 wdr_length_hex = msg[19:21]
1354 wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1355 logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1356 binascii.b2a_hex(wdr_length_hex), wdr_length)
1358 wdr_hex = msg[21:21 + wdr_length]
1359 logger.debug("Withdrawn routes: 0x%s",
1360 binascii.b2a_hex(wdr_hex))
1361 wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1362 logger.debug("Withdrawn routes prefix list: %s",
1364 for prefix in wdr_prefix_list:
1365 logger.debug("withdrawn_prefix_received: %s", prefix)
1366 # total path attribute length
1367 total_pa_length_offset = 21 + wdr_length
1368 total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1369 total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1370 logger.debug("Total path attribute lenght: 0x%s (%s)",
1371 binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1373 pa_offset = total_pa_length_offset + 2
1374 pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1375 logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1376 self.decode_path_attributes(pa_hex)
1377 # network layer reachability information length
1378 nlri_length = msg_length - 23 - total_pa_length - wdr_length
1379 logger.debug("Calculated NLRI length: %s", nlri_length)
1380 # network layer reachability information
1381 nlri_offset = pa_offset + total_pa_length
1382 nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1383 logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1384 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1385 logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1386 for prefix in nlri_prefix_list:
1387 logger.debug("nlri_prefix_received: %s", prefix)
1389 self.updates_received += 1
1390 self.prefixes_introduced += len(nlri_prefix_list)
1391 self.prefixes_withdrawn += len(wdr_prefix_list)
1393 logger.error("Unexpeced message type 0x%s in 0x%s",
1394 binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1396 def wait_for_read(self):
1397 """Read message until timeout (next expected event).
1400 Used when no more updates has to be sent to avoid busy-wait.
1401 Currently it does not return anything.
1403 # Compute time to the first predictable state change
1404 event_time = self.timer.get_next_event_time()
1405 # snapshot_time would be imprecise
1406 wait_timedelta = min(event_time - time.time(), 10)
1407 if wait_timedelta < 0:
1408 # The program got around to waiting to an event in "very near
1409 # future" so late that it became a "past" event, thus tell
1410 # "select" to not wait at all. Passing negative timedelta to
1411 # select() would lead to either waiting forever (for -1) or
1412 # select.error("Invalid parameter") (for everything else).
1414 # And wait for event or something to read.
1416 if not self.rx_activity_detected or not (self.updates_received % 100):
1417 # right time to write statistics to the log (not for every update and
1418 # not too frequently to avoid having large log files)
1419 logger.info("total_received_update_message_counter: %s",
1420 self.updates_received)
1421 logger.info("total_received_nlri_prefix_counter: %s",
1422 self.prefixes_introduced)
1423 logger.info("total_received_withdrawn_prefix_counter: %s",
1424 self.prefixes_withdrawn)
1426 start_time = time.time()
1427 select.select([self.socket], [], [self.socket], wait_timedelta)
1428 timedelta = time.time() - start_time
1429 self.rx_idle_time += timedelta
1430 self.rx_activity_detected = timedelta < 1
1432 if not self.rx_activity_detected or not (self.updates_received % 100):
1433 # right time to write statistics to the log (not for every update and
1434 # not too frequently to avoid having large log files)
1435 logger.info("... idle for %.3fs", timedelta)
1436 logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1440 class WriteTracker(object):
1441 """Class tracking enqueueing messages and sending chunks of them."""
1443 def __init__(self, bgp_socket, generator, timer):
1444 """The writter initialisation.
1447 bgp_socket: socket to be used for sending
1448 generator: generator to be used for message generation
1449 timer: timer to be used for scheduling
1451 # References to outside objects,
1452 self.socket = bgp_socket
1453 self.generator = generator
1455 # Really new fields.
1456 # TODO: Would attribute docstrings add anything substantial?
1457 self.sending_message = False
1458 self.bytes_to_send = 0
1461 def enqueue_message_for_sending(self, message):
1462 """Enqueue message and change state.
1465 message: message to be enqueued into the msg_out buffer
1467 self.msg_out += message
1468 self.bytes_to_send += len(message)
1469 self.sending_message = True
1471 def send_message_chunk_is_whole(self):
1472 """Send enqueued data from msg_out buffer
1475 :return: true if no remaining data to send
1477 # We assume there is a msg_out to send and socket is writable.
1478 # print "going to send", repr(self.msg_out)
1479 self.timer.snapshot()
1480 bytes_sent = self.socket.send(self.msg_out)
1481 # Forget the part of message that was sent.
1482 self.msg_out = self.msg_out[bytes_sent:]
1483 self.bytes_to_send -= bytes_sent
1484 if not self.bytes_to_send:
1485 # TODO: Is it possible to hit negative bytes_to_send?
1486 self.sending_message = False
1487 # We should have reset hold timer on peer side.
1488 self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1489 # The possible reason for not prioritizing reads is gone.
1494 class StateTracker(object):
1495 """Main loop has state so complex it warrants this separate class."""
1497 def __init__(self, bgp_socket, generator, timer):
1498 """The state tracker initialisation.
1501 bgp_socket: socket to be used for sending / receiving
1502 generator: generator to be used for message generation
1503 timer: timer to be used for scheduling
1505 # References to outside objects.
1506 self.socket = bgp_socket
1507 self.generator = generator
1510 self.reader = ReadTracker(bgp_socket, timer)
1511 self.writer = WriteTracker(bgp_socket, generator, timer)
1512 # Prioritization state.
1513 self.prioritize_writing = False
1514 # In general, we prioritize reading over writing. But in order
1515 # not to get blocked by neverending reads, we should
1516 # check whether we are not risking running out of holdtime.
1517 # So in some situations, this field is set to True to attempt
1518 # finishing sending a message, after which this field resets
1520 # TODO: Alternative is to switch fairly between reading and
1521 # writing (called round robin from now on).
1522 # Message counting is done in generator.
1524 def perform_one_loop_iteration(self):
1525 """ The main loop iteration
1528 Calculates priority, resolves all conditions, calls
1529 appropriate method and returns to caller to repeat.
1531 self.timer.snapshot()
1532 if not self.prioritize_writing:
1533 if self.timer.is_time_for_my_keepalive():
1534 if not self.writer.sending_message:
1535 # We need to schedule a keepalive ASAP.
1536 self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1537 logger.info("KEEP ALIVE is sent.")
1538 # We are sending a message now, so let's prioritize it.
1539 self.prioritize_writing = True
1540 # Now we know what our priorities are, we have to check
1541 # which actions are available.
1542 # socket.socket() returns three lists,
1543 # we store them to list of lists.
1544 list_list = select.select([self.socket], [self.socket], [self.socket],
1545 self.timer.report_timedelta)
1546 read_list, write_list, except_list = list_list
1547 # Lists are unpacked, each is either [] or [self.socket],
1548 # so we will test them as boolean.
1550 logger.error("Exceptional state on the socket.")
1551 raise RuntimeError("Exceptional state on socket", self.socket)
1552 # We will do either read or write.
1553 if not (self.prioritize_writing and write_list):
1554 # Either we have no reason to rush writes,
1555 # or the socket is not writable.
1556 # We are focusing on reading here.
1557 if read_list: # there is something to read indeed
1558 # In this case we want to read chunk of message
1559 # and repeat the select,
1560 self.reader.read_message_chunk()
1562 # We were focusing on reading, but nothing to read was there.
1563 # Good time to check peer for hold timer.
1564 self.timer.check_peer_hold_time(self.timer.snapshot_time)
1565 # Quiet on the read front, we can have attempt to write.
1567 # Either we really want to reset peer's view of our hold
1568 # timer, or there was nothing to read.
1569 # Were we in the middle of sending a message?
1570 if self.writer.sending_message:
1571 # Was it the end of a message?
1572 whole = self.writer.send_message_chunk_is_whole()
1573 # We were pressed to send something and we did it.
1574 if self.prioritize_writing and whole:
1575 # We prioritize reading again.
1576 self.prioritize_writing = False
1578 # Finally to check if still update messages to be generated.
1579 if self.generator.remaining_prefixes:
1580 msg_out = self.generator.compose_update_message()
1581 if not self.generator.remaining_prefixes:
1582 # We have just finished update generation,
1583 # end-of-rib is due.
1584 logger.info("All update messages generated.")
1585 logger.info("Storing performance results.")
1586 self.generator.store_results()
1587 logger.info("Finally an END-OF-RIB is sent.")
1588 msg_out += self.generator.update_message(wr_prefixes=[],
1590 self.writer.enqueue_message_for_sending(msg_out)
1591 # Attempt for real sending to be done in next iteration.
1593 # Nothing to write anymore.
1594 # To avoid busy loop, we do idle waiting here.
1595 self.reader.wait_for_read()
1597 # We can neither read nor write.
1598 logger.warning("Input and output both blocked for " +
1599 str(self.timer.report_timedelta) + " seconds.")
1600 # FIXME: Are we sure select has been really waiting
1605 def create_logger(loglevel, logfile):
1606 """Create logger object
1609 :loglevel: log level
1610 :logfile: log file name
1612 :return: logger object
1614 logger = logging.getLogger("logger")
1615 log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1616 console_handler = logging.StreamHandler()
1617 file_handler = logging.FileHandler(logfile, mode="w")
1618 console_handler.setFormatter(log_formatter)
1619 file_handler.setFormatter(log_formatter)
1620 logger.addHandler(console_handler)
1621 logger.addHandler(file_handler)
1622 logger.setLevel(loglevel)
1627 """One time initialisation and iterations looping.
1629 Establish BGP connection and run iterations.
1632 :arguments: Command line arguments
1636 bgp_socket = establish_connection(arguments)
1637 # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1638 # Receive open message before sending anything.
1639 # FIXME: Add parameter to send default open message first,
1640 # to work with "you first" peers.
1641 msg_in = read_open_message(bgp_socket)
1642 timer = TimeTracker(msg_in)
1643 generator = MessageGenerator(arguments)
1644 msg_out = generator.open_message()
1645 logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1646 # Send our open message to the peer.
1647 bgp_socket.send(msg_out)
1648 # Wait for confirming keepalive.
1649 # TODO: Surely in just one packet?
1650 # Using exact keepalive length to not to see possible updates.
1651 msg_in = bgp_socket.recv(19)
1652 if msg_in != generator.keepalive_message():
1653 error_msg = "Open not confirmed by keepalive, instead got"
1654 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1655 raise MessageError(error_msg, msg_in)
1656 timer.reset_peer_hold_time()
1657 # Send the keepalive to indicate the connection is accepted.
1658 timer.snapshot() # Remember this time.
1659 msg_out = generator.keepalive_message()
1660 logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1661 bgp_socket.send(msg_out)
1662 # Use the remembered time.
1663 timer.reset_my_keepalive_time(timer.snapshot_time)
1664 # End of initial handshake phase.
1665 state = StateTracker(bgp_socket, generator, timer)
1666 while True: # main reactor loop
1667 state.perform_one_loop_iteration()
1670 def threaded_job(arguments):
1671 """Run the job threaded
1674 :arguments: Command line arguments
1678 amount_left = arguments.amount
1679 utils_left = arguments.multiplicity
1680 prefix_current = arguments.firstprefix
1681 myip_current = arguments.myip
1685 amount_per_util = (amount_left - 1) / utils_left + 1 # round up
1686 amount_left -= amount_per_util
1689 args = deepcopy(arguments)
1690 args.amount = amount_per_util
1691 args.firstprefix = prefix_current
1692 args.myip = myip_current
1693 thread_args.append(args)
1697 prefix_current += amount_per_util * 16
1702 for t in thread_args:
1703 thread.start_new_thread(job, (t,))
1705 print "Error: unable to start thread."
1708 # Work remains forever
1713 if __name__ == "__main__":
1714 arguments = parse_arguments()
1715 logger = create_logger(arguments.loglevel, arguments.logfile)
1716 threaded_job(arguments)