1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
24 from copy import deepcopy
27 __author__ = "Vratko Polak"
28 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
29 __license__ = "Eclipse Public License v1.0"
30 __email__ = "vrpolak@cisco.com"
def parse_arguments():
    """Parse command-line arguments of the utility.

    Returns:
        :return: argparse Namespace object holding all option values.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
    parser.add_argument("--amount", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default="0", type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default="0", type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # BUGFIX: default used to be the list ["separate"], which can never compare
    # equal to the "single"/"separate" strings this option is tested against.
    parser.add_argument("--updates", choices=["single", "separate"],
                        default="separate", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection." +
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default="0", type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = "Identifier of the route originator."
    parser.add_argument("--originator", default=None,
                        type=ipaddr.IPv4Address, dest="originator", help=str_help)
    str_help = "Cluster list item identifier."
    parser.add_argument("--cluster", default=None,
                        type=ipaddr.IPv4Address, dest="cluster", help=str_help)
    str_help = ("Numeric IP Address to try to connect to." +
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default="179", type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default="180", type=int, help=str_help)
    # The four log-level options share one destination; the last one given wins.
    str_help = "Log level (--error, --warning, --info, --debug)"
    parser.add_argument("--error", dest="loglevel", action="store_const",
                        const=logging.ERROR, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--warning", dest="loglevel", action="store_const",
                        const=logging.WARNING, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--info", dest="loglevel", action="store_const",
                        const=logging.INFO, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--debug", dest="loglevel", action="store_const",
                        const=logging.DEBUG, default=logging.INFO,
                        help=str_help)
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default="1000", type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
    str_help = "Link-State NLRI supported"
    parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
    str_help = "Link-State NLRI: Identifier"
    parser.add_argument("-lsid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID"
    parser.add_argument("-lstid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID"
    parser.add_argument("-lspid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
    parser.add_argument("--lstsaddr", default="1.2.3.4",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
    parser.add_argument("--lsteaddr", default="5.6.7.8",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "Link-State NLRI: Identifier Step"
    parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID Step"
    parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID Step"
    parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
    parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
    parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
    str_help = "How many play utilities are to be started."
    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        # Zero or negative multiplicity would start no player at all.
        print("Multiplicity " + str(arguments.multiplicity) + " is not positive.")
        raise SystemExit(1)
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line argumets are used
            - arguments.listen: local IP address
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port
    Returns:
        :return: socket connected to the BGP peer.
    """
    if arguments.listen:
        # Passive mode: wait for the peer (controller) to connect to us.
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind needs a single tuple as argument
        listening_socket.bind((str(arguments.myip), arguments.myport))
        listening_socket.listen(1)
        bgp_socket, _ = listening_socket.accept()
        # TODO: Verify client IP is controller IP.
        listening_socket.close()
    else:
        # Active mode: initiate the TCP connection ourselves.
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind to force specified address and port
        talking_socket.bind((str(arguments.myip), arguments.myport))
        # socket does not speak ipaddr, hence str()
        talking_socket.connect((str(arguments.peerip), arguments.peerport))
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract 2-byte big-endian unsigned number from provided message.

    Arguments:
        :message: given message (string of raw bytes)
        :offset: offset of the short_int inside the message
    Returns:
        :return: required short_int value.
    Notes:
        Default offset value is the BGP message size offset.
    """
    high_byte_int = ord(message[offset])
    low_byte_int = ord(message[offset + 1])
    # Big-endian: first byte is the most significant one.
    short_int = high_byte_int * 256 + low_byte_int
    return short_int
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3).

    Arguments:
        :prefixes_hex: string of encoded prefixes to be decoded
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Notes:
        The fixed "BBBB" unpack assumes 4-byte prefix bodies, i.e. prefix
        lengths of 25-32 bits; shorter bodies would make unpack raise
        (preserved from the original implementation).
    """
    prefix_list = []
    offset = 0
    while offset < len(prefixes_hex):
        # First byte carries the prefix length in bits.
        prefix_bit_len = struct.unpack_from("B", prefixes_hex, offset)[0]
        # Number of bytes needed to hold prefix_bit_len bits.
        prefix_len = ((prefix_bit_len - 1) // 8) + 1
        prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
        prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: human readable message as string
        Notes:
            Use a placeholder string if the message is to be empty.
        """
        message = binascii.hexlify(self.msg)
        if not message:
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message.

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Notes:
        Performs just basic incomming message checks.
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # 37 is minimal length of open message with 4-byte AS number.
    if len(msg_in) < 37:
        error_msg = (
            "Message length (" + str(len(msg_in)) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)"
        )
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do about it.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # BUGFIX: reported_length is an int; concatenating it to a string
        # without str() raised TypeError instead of the intended MessageError.
        error_msg = (
            "Expected message length (" + str(reported_length) +
            ") does not match actual length (" + str(len(msg_in)) + ")"
        )
        logger.error(error_msg + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    logger.info("Open message received.")
    return msg_in
283 class MessageGenerator(object):
284 """Class which generates messages, holds states and configuration values."""
286 # TODO: Define bgp marker as a class (constant) variable.
def __init__(self, args):
    """Initialisation according to command-line args.

    Arguments:
        :args: argsparser's Namespace object which contains command-line
            options for MesageGenerator initialisation
    Notes:
        Calculates and stores default values used later on for
        message generation.
    """
    self.total_prefix_amount = args.amount
    # Number of update messages left to be sent.
    self.remaining_prefixes = self.total_prefix_amount

    # New parameters initialisation
    self.prefix_base_default = args.firstprefix
    self.prefix_length_default = args.prefixlen
    self.wr_prefixes_default = []
    self.nlri_prefixes_default = []
    self.version_default = 4
    self.my_autonomous_system_default = args.asnumber
    self.hold_time_default = args.holdtime  # Local hold time.
    self.bgp_identifier_default = int(args.myip)
    self.next_hop_default = args.nexthop
    self.originator_id_default = args.originator
    self.cluster_list_item_default = args.cluster
    self.single_update_default = args.updates == "single"
    # NOTE(review): "random" is not among the argparse choices for --updates,
    # so this flag is currently always False — confirm intended behaviour.
    self.randomize_updates_default = args.updates == "random"
    self.prefix_count_to_add_default = args.insert
    self.prefix_count_to_del_default = args.withdraw
    if self.prefix_count_to_del_default < 0:
        self.prefix_count_to_del_default = 0
    if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
        # total number of prefixes must grow to avoid infinite test loop
        self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
    self.slot_size_default = self.prefix_count_to_add_default
    self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
    self.results_file_name_default = args.results
    self.performance_threshold_default = args.threshold
    self.rfc4760 = args.rfc4760
    self.bgpls = args.bgpls
    # Default values when BGP-LS Attributes are used
    if self.bgpls:
        self.prefix_count_to_add_default = 1
        self.prefix_count_to_del_default = 0
    self.ls_nlri_default = {"Identifier": args.lsid,
                            "TunnelID": args.lstid,
                            "LSPID": args.lspid,
                            "IPv4TunnelSenderAddress": args.lstsaddr,
                            "IPv4TunnelEndPointAddress": args.lsteaddr}
    self.lsid_step = args.lsidstep
    self.lstid_step = args.lstidstep
    self.lspid_step = args.lspidstep
    self.lstsaddr_step = args.lstsaddrstep
    self.lsteaddr_step = args.lsteaddrstep
    # Default values used for randomized part
    s1_slots = ((self.total_prefix_amount -
                 self.remaining_prefixes_threshold - 1) /
                self.prefix_count_to_add_default + 1)
    s2_slots = ((self.remaining_prefixes_threshold - 1) /
                (self.prefix_count_to_add_default -
                 self.prefix_count_to_del_default) + 1)
    # S1_First_Index = 0
    # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
    s2_first_index = s1_slots * self.prefix_count_to_add_default
    s2_last_index = (s2_first_index +
                     s2_slots * (self.prefix_count_to_add_default -
                                 self.prefix_count_to_del_default) - 1)
    self.slot_gap_default = ((self.total_prefix_amount -
                              self.remaining_prefixes_threshold - 1) /
                             self.prefix_count_to_add_default + 1)
    self.randomize_lowest_default = s2_first_index
    self.randomize_highest_default = s2_last_index
    # Initialising counters
    self.phase1_start_time = 0
    self.phase1_stop_time = 0
    self.phase2_start_time = 0
    self.phase2_stop_time = 0
    self.phase1_updates_sent = 0
    self.phase2_updates_sent = 0
    self.updates_sent = 0
    self.iteration = 0

    # Flags needed for the MessageGenerator performance optimization.
    # Calling logger methods each iteration even with proper log level set
    # slows down significantly the MessageGenerator performance.
    # Measured total generation time (1M updates, dry run, error log level):
    #   - logging based on basic logger features: 36,2s
    #   - logging based on advanced logger features (lazy logging): 21,2s
    #   - conditional calling of logger methods enclosed inside condition: 8,6s
    self.log_info = args.loglevel <= logging.INFO
    self.log_debug = args.loglevel <= logging.DEBUG

    logger.info("Generator initialisation")
    logger.info("  Target total number of prefixes to be introduced: " +
                str(self.total_prefix_amount))
    logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
                str(self.prefix_length_default))
    logger.info("  My Autonomous System number: " +
                str(self.my_autonomous_system_default))
    logger.info("  My Hold Time: " + str(self.hold_time_default))
    logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
    logger.info("  Next Hop: " + str(self.next_hop_default))
    logger.info("  Originator ID: " + str(self.originator_id_default))
    logger.info("  Cluster list: " + str(self.cluster_list_item_default))
    logger.info("  Prefix count to be inserted at once: " +
                str(self.prefix_count_to_add_default))
    logger.info("  Prefix count to be withdrawn at once: " +
                str(self.prefix_count_to_del_default))
    logger.info("  Fast pre-fill up to " +
                str(self.total_prefix_amount -
                    self.remaining_prefixes_threshold) + " prefixes")
    logger.info("  Remaining number of prefixes to be processed " +
                "in parallel with withdrawals: " +
                str(self.remaining_prefixes_threshold))
    logger.debug("  Prefix index range used after pre-fill procedure [" +
                 str(self.randomize_lowest_default) + ", " +
                 str(self.randomize_highest_default) + "]")
    if self.single_update_default:
        logger.info("  Common single UPDATE will be generated " +
                    "for both NLRI & WITHDRAWN lists")
    else:
        logger.info("  Two separate UPDATEs will be generated " +
                    "for each NLRI & WITHDRAWN lists")
    if self.randomize_updates_default:
        logger.info("  Generation of UPDATE messages will be randomized")
    logger.info("  Let\'s go ...\n")
417 # TODO: Notification for hold timer expiration can be handy.
def store_results(self, file_name=None, threshold=None):
    """Store specified results into files based on file_name value.

    Arguments:
        :param file_name: Trailing (common) part of result file names
        :param threshold: Minimum number of sent updates needed for each
            result to be included into result csv file
            (mainly needed because of the result accuracy)
    Returns:
        :returns: none
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if file_name is None:
        file_name = self.results_file_name_default
    if threshold is None:
        threshold = self.performance_threshold_default
    # performance calculation; phases with too few updates give inaccurate
    # rates, so they are excluded (None) from the csv output
    if self.phase1_updates_sent >= threshold:
        totals1 = self.phase1_updates_sent
        performance1 = int(self.phase1_updates_sent /
                           (self.phase1_stop_time - self.phase1_start_time))
    else:
        totals1 = None
        performance1 = None
    if self.phase2_updates_sent >= threshold:
        totals2 = self.phase2_updates_sent
        performance2 = int(self.phase2_updates_sent /
                           (self.phase2_stop_time - self.phase2_start_time))
    else:
        totals2 = None
        performance2 = None

    logger.info("#" * 10 + " Final results " + "#" * 10)
    logger.info("Number of iterations: " + str(self.iteration))
    logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
                str(self.phase1_updates_sent))
    logger.info("The pre-fill phase duration: " +
                str(self.phase1_stop_time - self.phase1_start_time) + "s")
    logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
                str(self.phase2_updates_sent))
    logger.info("The 2nd test phase duration: " +
                str(self.phase2_stop_time - self.phase2_start_time) + "s")
    logger.info("Threshold for performance reporting: " + str(threshold))

    # labels for the csv columns
    phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
                    " route(s) per UPDATE")
    if self.single_update_default:
        phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                              "/-" + str(self.prefix_count_to_del_default) +
                              " routes per UPDATE")
    else:
        phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                              "/-" + str(self.prefix_count_to_del_default) +
                              " routes in two UPDATEs")
    # collecting capacity and performance results
    totals = {}
    performance = {}
    if totals1 is not None:
        totals[phase1_label] = totals1
        performance[phase1_label] = performance1
    if totals2 is not None:
        totals[phase2_label] = totals2
        performance[phase2_label] = performance2
    self.write_results_to_file(totals, "totals-" + file_name)
    self.write_results_to_file(performance, "performance-" + file_name)
def write_results_to_file(self, results, file_name):
    """Write results to the csv plot file consumable by Jenkins.

    Arguments:
        :param results: dictionary of result labels and values
        :param file_name: Name of the (csv) file to be created
    Returns:
        :returns: none
    """
    first_line = ""
    second_line = ""
    # "with" guarantees the file handle is closed even on write errors
    with open(file_name, "wt") as f:
        for key in sorted(results):
            first_line += key + ", "
            second_line += str(results[key]) + ", "
        # strip the trailing ", " separator from both lines
        first_line = first_line[:-2]
        second_line = second_line[:-2]
        f.write(first_line + "\n")
        f.write(second_line + "\n")
    logger.info("Message generator performance results stored in " +
                file_name + ":")
    logger.info("  " + first_line)
    logger.info("  " + second_line)
# Return pseudo-randomized (reproducible) index for selected range
def randomize_index(self, index, lowest=None, highest=None):
    """Calculates pseudo-randomized index from selected range.

    Arguments:
        :param index: input index
        :param lowest: the lowes index from the randomized area
        :param highest: the highest index from the randomized area
    Returns:
        :return: the (pseudo)randomized index
    Notes:
        Created just as a fame for future generator enhancement.
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if lowest is None:
        lowest = self.randomize_lowest_default
    if highest is None:
        highest = self.randomize_highest_default
    # randomize
    if (index >= lowest) and (index <= highest):
        # we are in the randomized range -> shuffle it inside
        # the range (now just reverse the order)
        new_index = highest - (index - lowest)
    else:
        # we are out of the randomized range -> nothing to do
        new_index = index
    return new_index
542 def get_ls_nlri_values(self, index):
543 """Generates LS-NLRI parameters.
544 http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
547 :param index: index (iteration)
549 :return: dictionary of LS NLRI parameters and values
551 # generating list of LS NLRI parameters
552 identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
553 ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
554 tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
555 lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
556 ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
557 ls_nlri_values = {"Identifier": identifier,
558 "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
559 "TunnelID": tunnel_id, "LSPID": lsp_id,
560 "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
561 return ls_nlri_values
563 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
564 prefix_len=None, prefix_count=None, randomize=None):
565 """Generates list of IP address prefixes.
568 :param slot_index: index of group of prefix addresses
569 :param slot_size: size of group of prefix addresses
570 in [number of included prefixes]
571 :param prefix_base: IP address of the first prefix
572 (slot_index = 0, prefix_index = 0)
573 :param prefix_len: length of the prefix in bites
574 (the same as size of netmask)
575 :param prefix_count: number of prefixes to be returned
576 from the specified slot
578 :return: list of generated IP address prefixes
580 # default values handling
581 # TODO optimize default values handling (use e.g. dicionary.update() approach)
582 if slot_size is None:
583 slot_size = self.slot_size_default
584 if prefix_base is None:
585 prefix_base = self.prefix_base_default
586 if prefix_len is None:
587 prefix_len = self.prefix_length_default
588 if prefix_count is None:
589 prefix_count = slot_size
590 if randomize is None:
591 randomize = self.randomize_updates_default
592 # generating list of prefixes
595 prefix_gap = 2 ** (32 - prefix_len)
596 for i in range(prefix_count):
597 prefix_index = slot_index * slot_size + i
599 prefix_index = self.randomize_index(prefix_index)
600 indexes.append(prefix_index)
601 prefixes.append(prefix_base + prefix_index * prefix_gap)
603 logger.debug(" Prefix slot index: " + str(slot_index))
604 logger.debug(" Prefix slot size: " + str(slot_size))
605 logger.debug(" Prefix count: " + str(prefix_count))
606 logger.debug(" Prefix indexes: " + str(indexes))
607 logger.debug(" Prefix list: " + str(prefixes))
def compose_update_message(self, prefix_count_to_add=None,
                           prefix_count_to_del=None):
    """Composes an UPDATE message.

    Arguments:
        :param prefix_count_to_add: # of prefixes to put into NLRI list
        :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
    Returns:
        :return: encoded UPDATE message in HEX
    Notes:
        Optionally generates separate UPDATEs for NLRI and WITHDRAWN
        lists or common message wich includes both prefix lists.
        Updates global counters.
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if prefix_count_to_add is None:
        prefix_count_to_add = self.prefix_count_to_add_default
    if prefix_count_to_del is None:
        prefix_count_to_del = self.prefix_count_to_del_default
    # logging, deliberately guarded by the precomputed flags (performance)
    if self.log_info and not (self.iteration % 1000):
        logger.info("Iteration: " + str(self.iteration) +
                    " - total remaining prefixes: " +
                    str(self.remaining_prefixes))
    if self.log_debug:
        logger.debug("#" * 10 + " Iteration: " +
                     str(self.iteration) + " " + "#" * 10)
        logger.debug("Remaining prefixes: " +
                     str(self.remaining_prefixes))
    # scenario type & one-shot counter
    straightforward_scenario = (self.remaining_prefixes >
                                self.remaining_prefixes_threshold)
    if straightforward_scenario:
        # pre-fill phase: no withdrawals yet
        prefix_count_to_del = 0
        if self.log_debug:
            logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
        if not self.phase1_start_time:
            self.phase1_start_time = time.time()
    else:
        if self.log_debug:
            logger.debug("--- COMBINED SCENARIO ---")
        if not self.phase2_start_time:
            self.phase2_start_time = time.time()
    # tailor the number of prefixes if needed
    prefix_count_to_add = (prefix_count_to_del +
                           min(prefix_count_to_add - prefix_count_to_del,
                               self.remaining_prefixes))
    # prefix slots selection for insertion and withdrawal
    slot_index_to_add = self.iteration
    slot_index_to_del = slot_index_to_add - self.slot_gap_default
    # getting lists of prefixes for insertion in this iteration
    if self.log_debug:
        logger.debug("Prefixes to be inserted in this iteration:")
    prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
                                              prefix_count=prefix_count_to_add)
    # getting lists of prefixes for withdrawal in this iteration
    if self.log_debug:
        logger.debug("Prefixes to be withdrawn in this iteration:")
    prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
                                              prefix_count=prefix_count_to_del)
    if self.bgpls:
        # generating the UPDATE mesage with LS-NLRI only
        ls_nlri = self.get_ls_nlri_values(self.iteration)
        msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
                                      **ls_nlri)
    else:
        # generating the UPDATE message with prefix lists
        if self.single_update_default:
            # Send prefixes to be introduced and withdrawn
            # in one UPDATE message
            msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
                                          nlri_prefixes=prefix_list_to_add)
        else:
            # Send prefixes to be introduced and withdrawn
            # in separate UPDATE messages (if needed)
            msg_out = self.update_message(wr_prefixes=[],
                                          nlri_prefixes=prefix_list_to_add)
            if prefix_count_to_del:
                msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
                                               nlri_prefixes=[])
    # updating counters - who knows ... maybe I am last time here ;)
    if straightforward_scenario:
        self.phase1_stop_time = time.time()
        self.phase1_updates_sent = self.updates_sent
    else:
        self.phase2_stop_time = time.time()
        self.phase2_updates_sent = (self.updates_sent -
                                    self.phase1_updates_sent)
    # updating totals for the next iteration
    self.iteration += 1
    self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
    # returning the encoded message
    return msg_out
705 # Section of message encoders
def open_message(self, version=None, my_autonomous_system=None,
                 hold_time=None, bgp_identifier=None):
    """Generates an OPEN Message (rfc4271#section-4.2).

    Keyword Arguments:
        :param version: see the rfc4271#section-4.2
        :param my_autonomous_system: see the rfc4271#section-4.2
        :param hold_time: see the rfc4271#section-4.2
        :param bgp_identifier: see the rfc4271#section-4.2
    Returns:
        :return: encoded OPEN message in HEX
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if version is None:
        version = self.version_default
    if my_autonomous_system is None:
        my_autonomous_system = self.my_autonomous_system_default
    if hold_time is None:
        hold_time = self.hold_time_default
    if bgp_identifier is None:
        bgp_identifier = self.bgp_identifier_default

    # Marker
    marker_hex = "\xFF" * 16

    # Type
    type = 1
    type_hex = struct.pack("B", type)

    # Version
    version_hex = struct.pack("B", version)

    # my_autonomous_system
    # AS_TRANS value, 23456 decadic.
    my_autonomous_system_2_bytes = 23456
    # AS number is mappable to 2 bytes
    if my_autonomous_system < 65536:
        my_autonomous_system_2_bytes = my_autonomous_system
    # BUGFIX: pack the 2-byte (possibly AS_TRANS) value, not the raw AS
    # number, which overflows ">H" for 4-byte AS numbers.
    my_autonomous_system_hex_2_bytes = struct.pack(">H",
                                                   my_autonomous_system_2_bytes)

    # Hold Time
    hold_time_hex = struct.pack(">H", hold_time)

    # BGP Identifier
    bgp_identifier_hex = struct.pack(">I", bgp_identifier)

    # Optional Parameters
    optional_parameters_hex = ""
    if self.rfc4760:
        optional_parameter_hex = (
            "\x02"  # Param type ("Capability Ad")
            "\x06"  # Length (6 bytes)
            "\x01"  # Capability type (NLRI Unicast),
                    # see RFC 4760, secton 8
            "\x04"  # Capability value length
            "\x00\x01"  # AFI (Ipv4)
            "\x00"  # (reserved)
            "\x01"  # SAFI (Unicast)
        )
        optional_parameters_hex += optional_parameter_hex

    if self.bgpls:
        optional_parameter_hex = (
            "\x02"  # Param type ("Capability Ad")
            "\x06"  # Length (6 bytes)
            "\x01"  # Capability type (NLRI Unicast),
                    # see RFC 4760, secton 8
            "\x04"  # Capability value length
            "\x40\x04"  # AFI (BGP-LS)
            "\x00"  # (reserved)
            "\x47"  # SAFI (BGP-LS)
        )
        optional_parameters_hex += optional_parameter_hex

    optional_parameter_hex = (
        "\x02"  # Param type ("Capability Ad")
        "\x06"  # Length (6 bytes)
        "\x41"  # "32 bit AS Numbers Support"
                # (see RFC 6793, section 3)
        "\x04"  # Capability value length
    )
    optional_parameter_hex += (
        struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
    )
    optional_parameters_hex += optional_parameter_hex

    # Optional Parameters Length
    optional_parameters_length = len(optional_parameters_hex)
    optional_parameters_length_hex = struct.pack("B",
                                                 optional_parameters_length)

    # Length (big-endian); the constant 2 accounts for the Length field itself
    length = (
        len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
        len(my_autonomous_system_hex_2_bytes) +
        len(hold_time_hex) + len(bgp_identifier_hex) +
        len(optional_parameters_length_hex) +
        len(optional_parameters_hex)
    )
    length_hex = struct.pack(">H", length)

    # Message
    message_hex = (
        marker_hex +
        length_hex +
        type_hex +
        version_hex +
        my_autonomous_system_hex_2_bytes +
        hold_time_hex +
        bgp_identifier_hex +
        optional_parameters_length_hex +
        optional_parameters_hex
    )

    if self.log_debug:
        logger.debug("OPEN message encoding")
        logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
        logger.debug("  Length=" + str(length) + " (0x" +
                     binascii.hexlify(length_hex) + ")")
        logger.debug("  Type=" + str(type) + " (0x" +
                     binascii.hexlify(type_hex) + ")")
        logger.debug("  Version=" + str(version) + " (0x" +
                     binascii.hexlify(version_hex) + ")")
        logger.debug("  My Autonomous System=" +
                     str(my_autonomous_system_2_bytes) + " (0x" +
                     binascii.hexlify(my_autonomous_system_hex_2_bytes) +
                     ")")
        logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
                     binascii.hexlify(hold_time_hex) + ")")
        logger.debug("  BGP Identifier=" + str(bgp_identifier) +
                     " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
        logger.debug("  Optional Parameters Length=" +
                     str(optional_parameters_length) + " (0x" +
                     binascii.hexlify(optional_parameters_length_hex) +
                     ")")
        logger.debug("  Optional Parameters=0x" +
                     binascii.hexlify(optional_parameters_hex))
        logger.debug("OPEN message encoded: 0x%s",
                     binascii.b2a_hex(message_hex))
    return message_hex
852 def update_message(self, wr_prefixes=None, nlri_prefixes=None,
853 wr_prefix_length=None, nlri_prefix_length=None,
854 my_autonomous_system=None, next_hop=None,
855 originator_id=None, cluster_list_item=None,
856 end_of_rib=False, **ls_nlri_params):
857 """Generates an UPDATE Message (rfc4271#section-4.3)
860 :param wr_prefixes: see the rfc4271#section-4.3
861 :param nlri_prefixes: see the rfc4271#section-4.3
862 :param wr_prefix_length: see the rfc4271#section-4.3
863 :param nlri_prefix_length: see the rfc4271#section-4.3
864 :param my_autonomous_system: see the rfc4271#section-4.3
865 :param next_hop: see the rfc4271#section-4.3
867 :return: encoded UPDATE message in HEX
870 # default values handling
871 # TODO optimize default values handling (use e.g. dicionary.update() approach)
872 if wr_prefixes is None:
873 wr_prefixes = self.wr_prefixes_default
874 if nlri_prefixes is None:
875 nlri_prefixes = self.nlri_prefixes_default
876 if wr_prefix_length is None:
877 wr_prefix_length = self.prefix_length_default
878 if nlri_prefix_length is None:
879 nlri_prefix_length = self.prefix_length_default
880 if my_autonomous_system is None:
881 my_autonomous_system = self.my_autonomous_system_default
883 next_hop = self.next_hop_default
884 if originator_id is None:
885 originator_id = self.originator_id_default
886 if cluster_list_item is None:
887 cluster_list_item = self.cluster_list_item_default
888 ls_nlri = self.ls_nlri_default.copy()
889 ls_nlri.update(ls_nlri_params)
892 marker_hex = "\xFF" * 16
896 type_hex = struct.pack("B", type)
899 withdrawn_routes_hex = ""
901 bytes = ((wr_prefix_length - 1) / 8) + 1
902 for prefix in wr_prefixes:
903 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
904 struct.pack(">I", int(prefix))[:bytes])
905 withdrawn_routes_hex += withdrawn_route_hex
907 # Withdrawn Routes Length
908 withdrawn_routes_length = len(withdrawn_routes_hex)
909 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
911 # TODO: to replace hardcoded string by encoding?
913 path_attributes_hex = ""
914 if nlri_prefixes != []:
915 path_attributes_hex += (
916 "\x40" # Flags ("Well-Known")
917 "\x01" # Type (ORIGIN)
921 path_attributes_hex += (
922 "\x40" # Flags ("Well-Known")
923 "\x02" # Type (AS_PATH)
925 "\x02" # AS segment type (AS_SEQUENCE)
926 "\x01" # AS segment length (1)
928 my_as_hex = struct.pack(">I", my_autonomous_system)
929 path_attributes_hex += my_as_hex # AS segment (4 bytes)
930 path_attributes_hex += (
931 "\x40" # Flags ("Well-Known")
932 "\x03" # Type (NEXT_HOP)
935 next_hop_hex = struct.pack(">I", int(next_hop))
936 path_attributes_hex += (
937 next_hop_hex # IP address of the next hop (4 bytes)
939 path_attributes_hex += (
940 "\x40" # Flags ("Well-Known")
941 "\x05" # Type (LOCAL_PREF)
943 "\x00\x00\x00\x64" # (100)
945 if originator_id is not None:
946 path_attributes_hex += (
947 "\x80" # Flags ("Optional, non-transitive")
948 "\x09" # Type (ORIGINATOR_ID)
950 ) # ORIGINATOR_ID (4 bytes)
951 path_attributes_hex += struct.pack(">I", int(originator_id))
952 if cluster_list_item is not None:
953 path_attributes_hex += (
954 "\x80" # Flags ("Optional, non-transitive")
955 "\x09" # Type (CLUSTER_LIST)
957 ) # one CLUSTER_LIST item (4 bytes)
958 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
960 if self.bgpls and not end_of_rib:
961 path_attributes_hex += (
962 "\x80" # Flags ("Optional, non-transitive")
963 "\x0e" # Type (MP_REACH_NLRI)
965 "\x40\x04" # AFI (BGP-LS)
966 "\x47" # SAFI (BGP-LS)
967 "\x04" # Next Hop Length (4)
969 path_attributes_hex += struct.pack(">I", int(next_hop))
970 path_attributes_hex += "\x00" # Reserved
971 path_attributes_hex += (
972 "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
973 "\x00\x15" # LS-NLRI.TotalNLRILength (21)
974 "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
976 path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
977 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
978 path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
979 path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
980 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
982 # Total Path Attributes Length
983 total_path_attributes_length = len(path_attributes_hex)
984 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
986 # Network Layer Reachability Information
989 bytes = ((nlri_prefix_length - 1) / 8) + 1
990 for prefix in nlri_prefixes:
991 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
992 struct.pack(">I", int(prefix))[:bytes])
993 nlri_hex += nlri_prefix_hex
995 # Length (big-endian)
997 len(marker_hex) + 2 + len(type_hex) +
998 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
999 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1001 length_hex = struct.pack(">H", length)
1008 withdrawn_routes_length_hex +
1009 withdrawn_routes_hex +
1010 total_path_attributes_length_hex +
1011 path_attributes_hex +
1016 logger.debug("UPDATE message encoding")
1017 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1018 logger.debug(" Length=" + str(length) + " (0x" +
1019 binascii.hexlify(length_hex) + ")")
1020 logger.debug(" Type=" + str(type) + " (0x" +
1021 binascii.hexlify(type_hex) + ")")
1022 logger.debug(" withdrawn_routes_length=" +
1023 str(withdrawn_routes_length) + " (0x" +
1024 binascii.hexlify(withdrawn_routes_length_hex) + ")")
1025 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1026 str(wr_prefix_length) + " (0x" +
1027 binascii.hexlify(withdrawn_routes_hex) + ")")
1028 if total_path_attributes_length:
1029 logger.debug(" Total Path Attributes Length=" +
1030 str(total_path_attributes_length) + " (0x" +
1031 binascii.hexlify(total_path_attributes_length_hex) + ")")
1032 logger.debug(" Path Attributes=" + "(0x" +
1033 binascii.hexlify(path_attributes_hex) + ")")
1034 logger.debug(" Origin=IGP")
1035 logger.debug(" AS path=" + str(my_autonomous_system))
1036 logger.debug(" Next hop=" + str(next_hop))
1037 if originator_id is not None:
1038 logger.debug(" Originator id=" + str(originator_id))
1039 if cluster_list_item is not None:
1040 logger.debug(" Cluster list=" + str(cluster_list_item))
1042 logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
1043 logger.debug(" Network Layer Reachability Information=" +
1044 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1045 " (0x" + binascii.hexlify(nlri_hex) + ")")
1046 logger.debug("UPDATE message encoded: 0x" +
1047 binascii.b2a_hex(message_hex))
1050 self.updates_sent += 1
1051 # returning encoded message
# NOTE(review): interior lines are elided in this capture (e.g. the actual
# message assembly between the "# NOTIFICATION Message" comment and the
# debug trace); visible lines kept byte-identical, comments only added.
1054 def notification_message(self, error_code, error_subcode, data_hex=""):
1055 """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1058 :param error_code: see the rfc4271#section-4.5
1059 :param error_subcode: see the rfc4271#section-4.5
1060 :param data_hex: see the rfc4271#section-4.5
1062 :return: encoded NOTIFICATION message in HEX
# Fixed 16-byte all-ones BGP marker (rfc4271#section-4.1).
1066 marker_hex = "\xFF" * 16
1070 type_hex = struct.pack("B", type)
# Error code and subcode are each a single unsigned byte on the wire.
1073 error_code_hex = struct.pack("B", error_code)
1076 error_subcode_hex = struct.pack("B", error_subcode)
1078 # Length (big-endian)
# The +2 accounts for the 2-byte length field itself.
1079 length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1080 len(error_subcode_hex) + len(data_hex))
1081 length_hex = struct.pack(">H", length)
1083 # NOTIFICATION Message
1094 logger.debug("NOTIFICATION message encoding")
1095 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1096 logger.debug(" Length=" + str(length) + " (0x" +
1097 binascii.hexlify(length_hex) + ")")
1098 logger.debug(" Type=" + str(type) + " (0x" +
1099 binascii.hexlify(type_hex) + ")")
1100 logger.debug(" Error Code=" + str(error_code) + " (0x" +
1101 binascii.hexlify(error_code_hex) + ")")
# NOTE(review): "Subode" below is a typo for "Subcode", but it is a runtime
# log string — left unchanged here; fix separately if desired.
1102 logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
1103 binascii.hexlify(error_subcode_hex) + ")")
1104 logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1105 logger.debug("NOTIFICATION message encoded: 0x%s",
1106 binascii.b2a_hex(message_hex))
# NOTE(review): the message assembly and the trailing `return` line are
# elided in this capture; visible lines kept byte-identical.
1110 def keepalive_message(self):
1111 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1114 :return: encoded KEEP ALIVE message in HEX
# A KEEPALIVE is just the 19-byte header: marker + length + type.
1118 marker_hex = "\xFF" * 16
1122 type_hex = struct.pack("B", type)
1124 # Length (big-endian)
# marker (16) + length field (2) + type (1) = 19.
1125 length = len(marker_hex) + 2 + len(type_hex)
1126 length_hex = struct.pack(">H", length)
1128 # KEEP ALIVE Message
1136 logger.debug("KEEP ALIVE message encoding")
1137 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1138 logger.debug(" Length=" + str(length) + " (0x" +
1139 binascii.hexlify(length_hex) + ")")
1140 logger.debug(" Type=" + str(type) + " (0x" +
1141 binascii.hexlify(type_hex) + ")")
1142 logger.debug("KEEP ALIVE message encoded: 0x%s",
1143 binascii.b2a_hex(message_hex))
# Tracks the BGP hold/keepalive timers negotiated with the peer.
# NOTE(review): some lines are elided in this capture — notably the
# `def snapshot(self):` header before the docstring at 1190, and a
# `return True` branch inside is_time_for_my_keepalive (after 1210).
1148 class TimeTracker(object):
1149 """Class for tracking timers, both for my keepalives and
1153 def __init__(self, msg_in):
1154 """Initialisation. based on defaults and OPEN message from peer.
1157 msg_in: the OPEN message received from peer.
1159 # Note: Relative time is always named timedelta, to stress that
1160 # the (non-delta) time is absolute.
1161 self.report_timedelta = 1.0 # In seconds. TODO: Configurable?
1162 # Upper bound for being stuck in the same state, we should
1163 # at least report something before continuing.
1164 # Negotiate the hold timer by taking the smaller
1165 # of the 2 values (mine and the peer's).
1166 hold_timedelta = 180 # Not an attribute of self yet.
1167 # TODO: Make the default value configurable,
1168 # default value could mirror what peer said.
# Hold Time lives at byte offset 22 of the peer's OPEN message.
1169 peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1170 if hold_timedelta > peer_hold_timedelta:
1171 hold_timedelta = peer_hold_timedelta
# rfc4271#section-4.2: Hold Time must be zero or at least 3 seconds.
1172 if hold_timedelta != 0 and hold_timedelta < 3:
1173 logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1174 raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1175 self.hold_timedelta = hold_timedelta
1176 # If we do not hear from peer this long, we assume it has died.
# Keepalive interval is one third of the hold time (rfc4271 suggestion).
1177 self.keepalive_timedelta = int(hold_timedelta / 3.0)
1178 # Upper limit for duration between messages, to avoid being
1179 # declared to be dead.
1180 # The same as calling snapshot(), but also declares a field.
1181 self.snapshot_time = time.time()
1182 # Sometimes we need to store time. This is where to get
1183 # the value from afterwards. Time_keepalive may be too strict.
1184 self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1185 # At this time point, peer will be declared dead.
1186 self.my_keepalive_time = None # to be set later
1187 # At this point, we should be sending keepalive message.
1190 """Store current time in instance data to use later."""
1191 # Read as time before something interesting was called.
1192 self.snapshot_time = time.time()
1194 def reset_peer_hold_time(self):
1195 """Move hold time to future as peer has just proven it still lives."""
1196 self.peer_hold_time = time.time() + self.hold_timedelta
1198 # Some methods could rely on self.snapshot_time, but it is better
1199 # to require user to provide it explicitly.
1200 def reset_my_keepalive_time(self, keepalive_time):
1201 """Calculate and set the next my KEEP ALIVE timeout time
1204 :keepalive_time: the initial value of the KEEP ALIVE timer
1206 self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1208 def is_time_for_my_keepalive(self):
1209 """Check for my KEEP ALIVE timeout occurence"""
# Hold time 0 disables keepalive tracking; the (elided) branch body
# presumably returns True here — TODO confirm against full source.
1210 if self.hold_timedelta == 0:
1212 return self.snapshot_time >= self.my_keepalive_time
1214 def get_next_event_time(self):
1215 """Set the time of the next expected or to be sent KEEP ALIVE"""
# With timers disabled, report an event a day away to keep select() happy.
1216 if self.hold_timedelta == 0:
1217 return self.snapshot_time + 86400
1218 return min(self.my_keepalive_time, self.peer_hold_time)
1220 def check_peer_hold_time(self, snapshot_time):
1221 """Raise error if nothing was read from peer until specified time."""
1222 # Hold time = 0 means keepalive checking off.
1223 if self.hold_timedelta != 0:
1224 # time.time() may be too strict
1225 if snapshot_time > self.peer_hold_time:
1226 logger.error("Peer has overstepped the hold timer.")
1227 raise RuntimeError("Peer has overstepped the hold timer.")
1228 # TODO: Include hold_timedelta?
1229 # TODO: Add notification sending (attempt). That means
1230 # move to write tracker.
# Reads BGP messages from the socket incrementally (header first, then the
# body sized by the header's length field), decodes UPDATEs, and keeps
# receive-side statistics. NOTE(review): numerous lines are elided in this
# capture (e.g. self.timer / self.msg_in initialisation, several else
# branches); visible lines kept byte-identical, comments only added.
1233 class ReadTracker(object):
1234 """Class for tracking read of mesages chunk by chunk and
1238 def __init__(self, bgp_socket, timer):
1239 """The reader initialisation.
1242 bgp_socket: socket to be used for sending
1243 timer: timer to be used for scheduling
1245 # References to outside objects.
1246 self.socket = bgp_socket
1248 # BGP marker length plus length field length.
1249 self.header_length = 18
1250 # TODO: make it class (constant) attribute
1251 # Computation of where next chunk ends depends on whether
1252 # we are beyond length field.
1253 self.reading_header = True
1254 # Countdown towards next size computation.
1255 self.bytes_to_read = self.header_length
1256 # Incremental buffer for message under read.
1258 # Initialising counters
1259 self.updates_received = 0
1260 self.prefixes_introduced = 0
1261 self.prefixes_withdrawn = 0
1262 self.rx_idle_time = 0
1263 self.rx_activity_detected = True
1265 def read_message_chunk(self):
1266 """Read up to one message
1269 Currently it does not return anything.
1271 # TODO: We could return the whole message, currently not needed.
1272 # We assume the socket is readable.
1273 chunk_message = self.socket.recv(self.bytes_to_read)
1274 self.msg_in += chunk_message
1275 self.bytes_to_read -= len(chunk_message)
1276 # TODO: bytes_to_read < 0 is not possible, right?
1277 if not self.bytes_to_read:
1278 # Finished reading a logical block.
1279 if self.reading_header:
1280 # The logical block was a BGP header.
1281 # Now we know the size of the message.
1282 self.reading_header = False
# Remaining body size = declared total length minus header already read.
1283 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1285 else: # We have finished reading the body of the message.
1286 # Peer has just proven it is still alive.
1287 self.timer.reset_peer_hold_time()
1288 # TODO: Do we want to count received messages?
1289 # This version ignores the received message.
1290 # TODO: Should we do validation and exit on anything
1291 # besides update or keepalive?
1292 # Prepare state for reading another message.
# Dispatch on the one-byte message type right after the 18-byte header.
1293 message_type_hex = self.msg_in[self.header_length]
1294 if message_type_hex == "\x01":
1295 logger.info("OPEN message received: 0x%s",
1296 binascii.b2a_hex(self.msg_in))
1297 elif message_type_hex == "\x02":
1298 logger.debug("UPDATE message received: 0x%s",
1299 binascii.b2a_hex(self.msg_in))
1300 self.decode_update_message(self.msg_in)
1301 elif message_type_hex == "\x03":
1302 logger.info("NOTIFICATION message received: 0x%s",
1303 binascii.b2a_hex(self.msg_in))
1304 elif message_type_hex == "\x04":
1305 logger.info("KEEP ALIVE message received: 0x%s",
1306 binascii.b2a_hex(self.msg_in))
# (The `else:` line for this warning branch is elided in this capture.)
1308 logger.warning("Unexpected message received: 0x%s",
1309 binascii.b2a_hex(self.msg_in))
# Reset the state machine to expect the next message's header.
1311 self.reading_header = True
1312 self.bytes_to_read = self.header_length
1313 # We should not act upon peer_hold_time if we are reading
1314 # something right now.
1317 def decode_path_attributes(self, path_attributes_hex):
1318 """Decode the Path Attributes field (rfc4271#section-4.3)
1321 :path_attributes: path_attributes field to be decoded in hex
# Walk the TLV list; each attribute is flags, type, length (1 or 2 bytes
# depending on the extended-length flag bit), then the value.
1325 hex_to_decode = path_attributes_hex
1327 while len(hex_to_decode):
1328 attr_flags_hex = hex_to_decode[0]
1329 attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1330 # attr_optional_bit = attr_flags & 128
1331 # attr_transitive_bit = attr_flags & 64
1332 # attr_partial_bit = attr_flags & 32
1333 attr_extended_length_bit = attr_flags & 16
1335 attr_type_code_hex = hex_to_decode[1]
1336 attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1338 if attr_extended_length_bit:
# Extended length: 2-byte length field, value starts at offset 4.
1339 attr_length_hex = hex_to_decode[2:4]
1340 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1341 attr_value_hex = hex_to_decode[4:4 + attr_length]
1342 hex_to_decode = hex_to_decode[4 + attr_length:]
# (The `else:` line is elided; this is the 1-byte-length branch.)
1344 attr_length_hex = hex_to_decode[2]
1345 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1346 attr_value_hex = hex_to_decode[3:3 + attr_length]
1347 hex_to_decode = hex_to_decode[3 + attr_length:]
1349 if attr_type_code == 1:
1350 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1351 binascii.b2a_hex(attr_flags_hex))
1352 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1353 elif attr_type_code == 2:
1354 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1355 binascii.b2a_hex(attr_flags_hex))
1356 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1357 elif attr_type_code == 3:
1358 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1359 binascii.b2a_hex(attr_flags_hex))
1360 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1361 elif attr_type_code == 4:
1362 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1363 binascii.b2a_hex(attr_flags_hex))
1364 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1365 elif attr_type_code == 5:
1366 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1367 binascii.b2a_hex(attr_flags_hex))
1368 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1369 elif attr_type_code == 6:
1370 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1371 binascii.b2a_hex(attr_flags_hex))
1372 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1373 elif attr_type_code == 7:
1374 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1375 binascii.b2a_hex(attr_flags_hex))
1376 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1377 elif attr_type_code == 9: # rfc4456#section-8
1378 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1379 binascii.b2a_hex(attr_flags_hex))
1380 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1381 elif attr_type_code == 10: # rfc4456#section-8
1382 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1383 binascii.b2a_hex(attr_flags_hex))
1384 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1385 elif attr_type_code == 14: # rfc4760#section-3
# MP_REACH_NLRI: AFI(2) + SAFI(1) + next-hop-len(1) + next hop +
# reserved(1) + NLRI; prefixes found here count as introduced.
1386 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1387 binascii.b2a_hex(attr_flags_hex))
1388 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1389 address_family_identifier_hex = attr_value_hex[0:2]
1390 logger.debug(" Address Family Identifier=0x%s",
1391 binascii.b2a_hex(address_family_identifier_hex))
1392 subsequent_address_family_identifier_hex = attr_value_hex[2]
1393 logger.debug(" Subsequent Address Family Identifier=0x%s",
1394 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1395 next_hop_netaddr_len_hex = attr_value_hex[3]
1396 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1397 logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
1398 next_hop_netaddr_len,
1399 binascii.b2a_hex(next_hop_netaddr_len_hex))
1400 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
# NOTE(review): "BBBB" assumes a 4-byte (IPv4) next hop — would raise
# for other next-hop lengths; confirm against expected peers.
1401 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1402 logger.debug(" Network Address of Next Hop=%s (0x%s)",
1403 next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1404 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1405 logger.debug(" Reserved=0x%s",
1406 binascii.b2a_hex(reserved_hex))
1407 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1408 logger.debug(" Network Layer Reachability Information=0x%s",
1409 binascii.b2a_hex(nlri_hex))
1410 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1411 logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
1412 for prefix in nlri_prefix_list:
1413 logger.debug(" nlri_prefix_received: %s", prefix)
1414 self.prefixes_introduced += len(nlri_prefix_list) # update counter
1415 elif attr_type_code == 15: # rfc4760#section-4
# MP_UNREACH_NLRI: AFI(2) + SAFI(1) + withdrawn routes; prefixes
# found here count as withdrawn.
1416 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1417 binascii.b2a_hex(attr_flags_hex))
1418 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1419 address_family_identifier_hex = attr_value_hex[0:2]
1420 logger.debug(" Address Family Identifier=0x%s",
1421 binascii.b2a_hex(address_family_identifier_hex))
1422 subsequent_address_family_identifier_hex = attr_value_hex[2]
1423 logger.debug(" Subsequent Address Family Identifier=0x%s",
1424 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1425 wd_hex = attr_value_hex[3:]
1426 logger.debug(" Withdrawn Routes=0x%s",
1427 binascii.b2a_hex(wd_hex))
1428 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1429 logger.debug(" Withdrawn routes prefix list: %s",
1431 for prefix in wdr_prefix_list:
1432 logger.debug(" withdrawn_prefix_received: %s", prefix)
1433 self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
# (The `else:` line for the unknown-attribute branch is elided.)
1435 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1436 binascii.b2a_hex(attr_flags_hex))
1437 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1440 def decode_update_message(self, msg):
1441 """Decode an UPDATE message (rfc4271#section-4.3)
1444 :msg: message to be decoded in hex
1448 logger.debug("Decoding update message:")
1449 # message header - marker
1450 marker_hex = msg[:16]
1451 logger.debug("Message header marker: 0x%s",
1452 binascii.b2a_hex(marker_hex))
1453 # message header - message length
1454 msg_length_hex = msg[16:18]
1455 msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1456 logger.debug("Message lenght: 0x%s (%s)",
1457 binascii.b2a_hex(msg_length_hex), msg_length)
1458 # message header - message type
1459 msg_type_hex = msg[18:19]
1460 msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
# (An `if msg_type == 2:` guard line appears elided here; the error
# branch at the bottom handles non-UPDATE types.)
1462 logger.debug("Message type: 0x%s (update)",
1463 binascii.b2a_hex(msg_type_hex))
1464 # withdrawn routes length
1465 wdr_length_hex = msg[19:21]
1466 wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1467 logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1468 binascii.b2a_hex(wdr_length_hex), wdr_length)
1470 wdr_hex = msg[21:21 + wdr_length]
1471 logger.debug("Withdrawn routes: 0x%s",
1472 binascii.b2a_hex(wdr_hex))
1473 wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1474 logger.debug("Withdrawn routes prefix list: %s",
1476 for prefix in wdr_prefix_list:
1477 logger.debug("withdrawn_prefix_received: %s", prefix)
1478 # total path attribute length
1479 total_pa_length_offset = 21 + wdr_length
1480 total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1481 total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1482 logger.debug("Total path attribute lenght: 0x%s (%s)",
1483 binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1485 pa_offset = total_pa_length_offset + 2
1486 pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1487 logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1488 self.decode_path_attributes(pa_hex)
1489 # network layer reachability information length
# 23 = 19-byte header + 2-byte WDR length + 2-byte PA length fields.
1490 nlri_length = msg_length - 23 - total_pa_length - wdr_length
1491 logger.debug("Calculated NLRI length: %s", nlri_length)
1492 # network layer reachability information
1493 nlri_offset = pa_offset + total_pa_length
1494 nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1495 logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1496 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1497 logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1498 for prefix in nlri_prefix_list:
1499 logger.debug("nlri_prefix_received: %s", prefix)
# Update receive-side statistics counters.
1501 self.updates_received += 1
1502 self.prefixes_introduced += len(nlri_prefix_list)
1503 self.prefixes_withdrawn += len(wdr_prefix_list)
# (The `else:` line for the non-UPDATE error branch is elided.)
1505 logger.error("Unexpeced message type 0x%s in 0x%s",
1506 binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1508 def wait_for_read(self):
1509 """Read message until timeout (next expected event).
1512 Used when no more updates has to be sent to avoid busy-wait.
1513 Currently it does not return anything.
1515 # Compute time to the first predictable state change
1516 event_time = self.timer.get_next_event_time()
1517 # snapshot_time would be imprecise
# Cap the wait at 10 s so statistics still get logged periodically.
1518 wait_timedelta = min(event_time - time.time(), 10)
1519 if wait_timedelta < 0:
1520 # The program got around to waiting to an event in "very near
1521 # future" so late that it became a "past" event, thus tell
1522 # "select" to not wait at all. Passing negative timedelta to
1523 # select() would lead to either waiting forever (for -1) or
1524 # select.error("Invalid parameter") (for everything else).
1526 # And wait for event or something to read.
# Log statistics only when idle or every 100th update, to bound log size.
1528 if not self.rx_activity_detected or not (self.updates_received % 100):
1529 # right time to write statistics to the log (not for every update and
1530 # not too frequently to avoid having large log files)
1531 logger.info("total_received_update_message_counter: %s",
1532 self.updates_received)
1533 logger.info("total_received_nlri_prefix_counter: %s",
1534 self.prefixes_introduced)
1535 logger.info("total_received_withdrawn_prefix_counter: %s",
1536 self.prefixes_withdrawn)
1538 start_time = time.time()
1539 select.select([self.socket], [], [self.socket], wait_timedelta)
1540 timedelta = time.time() - start_time
1541 self.rx_idle_time += timedelta
# Treat a sub-second select() return as "peer is actively sending".
1542 self.rx_activity_detected = timedelta < 1
1544 if not self.rx_activity_detected or not (self.updates_received % 100):
1545 # right time to write statistics to the log (not for every update and
1546 # not too frequently to avoid having large log files)
1547 logger.info("... idle for %.3fs", timedelta)
1548 logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
# Buffers outgoing messages and drains them through the socket in chunks.
# NOTE(review): some lines are elided in this capture (e.g. self.timer and
# self.msg_out initialisation, and the trailing return True / return False
# of send_message_chunk_is_whole); visible lines kept byte-identical.
1552 class WriteTracker(object):
1553 """Class tracking enqueueing messages and sending chunks of them."""
1555 def __init__(self, bgp_socket, generator, timer):
1556 """The writter initialisation.
1559 bgp_socket: socket to be used for sending
1560 generator: generator to be used for message generation
1561 timer: timer to be used for scheduling
1563 # References to outside objects,
1564 self.socket = bgp_socket
1565 self.generator = generator
1567 # Really new fields.
1568 # TODO: Would attribute docstrings add anything substantial?
# sending_message: True while msg_out still holds unsent bytes.
1569 self.sending_message = False
1570 self.bytes_to_send = 0
1573 def enqueue_message_for_sending(self, message):
1574 """Enqueue message and change state.
1577 message: message to be enqueued into the msg_out buffer
1579 self.msg_out += message
1580 self.bytes_to_send += len(message)
1581 self.sending_message = True
1583 def send_message_chunk_is_whole(self):
1584 """Send enqueued data from msg_out buffer
1587 :return: true if no remaining data to send
1589 # We assume there is a msg_out to send and socket is writable.
1590 # print "going to send", repr(self.msg_out)
1591 self.timer.snapshot()
# send() may write fewer bytes than requested; keep the unsent tail.
1592 bytes_sent = self.socket.send(self.msg_out)
1593 # Forget the part of message that was sent.
1594 self.msg_out = self.msg_out[bytes_sent:]
1595 self.bytes_to_send -= bytes_sent
1596 if not self.bytes_to_send:
1597 # TODO: Is it possible to hit negative bytes_to_send?
1598 self.sending_message = False
1599 # We should have reset hold timer on peer side.
1600 self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1601 # The possible reason for not prioritizing reads is gone.
# The main reactor: one select()-driven iteration at a time, preferring
# reads over writes unless a keepalive must go out. NOTE(review): several
# lines are elided in this capture (e.g. self.timer assignment, a few
# if/elif/else headers in the loop body); visible lines kept byte-identical.
1606 class StateTracker(object):
1607 """Main loop has state so complex it warrants this separate class."""
1609 def __init__(self, bgp_socket, generator, timer):
1610 """The state tracker initialisation.
1613 bgp_socket: socket to be used for sending / receiving
1614 generator: generator to be used for message generation
1615 timer: timer to be used for scheduling
1617 # References to outside objects.
1618 self.socket = bgp_socket
1619 self.generator = generator
1622 self.reader = ReadTracker(bgp_socket, timer)
1623 self.writer = WriteTracker(bgp_socket, generator, timer)
1624 # Prioritization state.
1625 self.prioritize_writing = False
1626 # In general, we prioritize reading over writing. But in order
1627 # not to get blocked by neverending reads, we should
1628 # check whether we are not risking running out of holdtime.
1629 # So in some situations, this field is set to True to attempt
1630 # finishing sending a message, after which this field resets
1632 # TODO: Alternative is to switch fairly between reading and
1633 # writing (called round robin from now on).
1634 # Message counting is done in generator.
1636 def perform_one_loop_iteration(self):
1637 """ The main loop iteration
1640 Calculates priority, resolves all conditions, calls
1641 appropriate method and returns to caller to repeat.
1643 self.timer.snapshot()
1644 if not self.prioritize_writing:
1645 if self.timer.is_time_for_my_keepalive():
1646 if not self.writer.sending_message:
1647 # We need to schedule a keepalive ASAP.
1648 self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1649 logger.info("KEEP ALIVE is sent.")
1650 # We are sending a message now, so let's prioritize it.
1651 self.prioritize_writing = True
1652 # Now we know what our priorities are, we have to check
1653 # which actions are available.
1654 # socket.socket() returns three lists,
1655 # we store them to list of lists.
# Block for at most report_timedelta so the loop reports progress even
# when the socket is completely quiet.
1656 list_list = select.select([self.socket], [self.socket], [self.socket],
1657 self.timer.report_timedelta)
1658 read_list, write_list, except_list = list_list
1659 # Lists are unpacked, each is either [] or [self.socket],
1660 # so we will test them as boolean.
# (The `if except_list:` guard line is elided in this capture.)
1662 logger.error("Exceptional state on the socket.")
1663 raise RuntimeError("Exceptional state on socket", self.socket)
1664 # We will do either read or write.
1665 if not (self.prioritize_writing and write_list):
1666 # Either we have no reason to rush writes,
1667 # or the socket is not writable.
1668 # We are focusing on reading here.
1669 if read_list: # there is something to read indeed
1670 # In this case we want to read chunk of message
1671 # and repeat the select,
1672 self.reader.read_message_chunk()
# (A `return` line appears elided between the branches here.)
1674 # We were focusing on reading, but nothing to read was there.
1675 # Good time to check peer for hold timer.
1676 self.timer.check_peer_hold_time(self.timer.snapshot_time)
1677 # Quiet on the read front, we can have attempt to write.
# (The `if write_list:` header for this branch is elided.)
1679 # Either we really want to reset peer's view of our hold
1680 # timer, or there was nothing to read.
1681 # Were we in the middle of sending a message?
1682 if self.writer.sending_message:
1683 # Was it the end of a message?
1684 whole = self.writer.send_message_chunk_is_whole()
1685 # We were pressed to send something and we did it.
1686 if self.prioritize_writing and whole:
1687 # We prioritize reading again.
1688 self.prioritize_writing = False
1690 # Finally to check if still update messages to be generated.
1691 if self.generator.remaining_prefixes:
1692 msg_out = self.generator.compose_update_message()
1693 if not self.generator.remaining_prefixes:
1694 # We have just finished update generation,
1695 # end-of-rib is due.
1696 logger.info("All update messages generated.")
1697 logger.info("Storing performance results.")
1698 self.generator.store_results()
1699 logger.info("Finally an END-OF-RIB is sent.")
# END-OF-RIB = UPDATE with empty withdrawn and NLRI lists (elided
# continuation lines presumably pass nlri_prefixes=[] too).
1700 msg_out += self.generator.update_message(wr_prefixes=[],
1703 self.writer.enqueue_message_for_sending(msg_out)
1704 # Attempt for real sending to be done in next iteration.
1706 # Nothing to write anymore.
1707 # To avoid busy loop, we do idle waiting here.
1708 self.reader.wait_for_read()
1710 # We can neither read nor write.
1711 logger.warning("Input and output both blocked for " +
1712 str(self.timer.report_timedelta) + " seconds.")
1713 # FIXME: Are we sure select has been really waiting
# NOTE(review): docstring continuation lines and the trailing
# `return logger` are elided in this capture; visible lines kept
# byte-identical, comments only added.
1718 def create_logger(loglevel, logfile):
1719 """Create logger object
1722 :loglevel: log level
1723 :logfile: log file name
1725 :return: logger object
1727 logger = logging.getLogger("logger")
1728 log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1729 console_handler = logging.StreamHandler()
# mode="w" truncates any previous log file on each run.
1730 file_handler = logging.FileHandler(logfile, mode="w")
1731 console_handler.setFormatter(log_formatter)
1732 file_handler.setFormatter(log_formatter)
# Both handlers attached: messages go to the console AND the file.
1733 logger.addHandler(console_handler)
1734 logger.addHandler(file_handler)
1735 logger.setLevel(loglevel)
# NOTE(review): the `def` header of this function (presumably
# `def job(arguments):` given the call in threaded_job below) is elided
# in this capture, along with a few docstring lines; visible lines kept
# byte-identical, comments only added.
1740 """One time initialisation and iterations looping.
1742 Establish BGP connection and run iterations.
1745 :arguments: Command line arguments
1749 bgp_socket = establish_connection(arguments)
1750 # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1751 # Receive open message before sending anything.
1752 # FIXME: Add parameter to send default open message first,
1753 # to work with "you first" peers.
1754 msg_in = read_open_message(bgp_socket)
# Hold/keepalive timers are negotiated from the peer's OPEN message.
1755 timer = TimeTracker(msg_in)
1756 generator = MessageGenerator(arguments)
1757 msg_out = generator.open_message()
1758 logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1759 # Send our open message to the peer.
1760 bgp_socket.send(msg_out)
1761 # Wait for confirming keepalive.
1762 # TODO: Surely in just one packet?
1763 # Using exact keepalive length to not to see possible updates.
# 19 = exact size of a KEEPALIVE message (header only).
1764 msg_in = bgp_socket.recv(19)
1765 if msg_in != generator.keepalive_message():
1766 error_msg = "Open not confirmed by keepalive, instead got"
1767 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1768 raise MessageError(error_msg, msg_in)
1769 timer.reset_peer_hold_time()
1770 # Send the keepalive to indicate the connection is accepted.
1771 timer.snapshot() # Remember this time.
1772 msg_out = generator.keepalive_message()
1773 logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1774 bgp_socket.send(msg_out)
1775 # Use the remembered time.
1776 timer.reset_my_keepalive_time(timer.snapshot_time)
1777 # End of initial handshake phase.
1778 state = StateTracker(bgp_socket, generator, timer)
# Runs forever; termination is by exception (hold timer, socket error).
1779 while True: # main reactor loop
1780 state.perform_one_loop_iteration()
def threaded_job(arguments):
    """Run the job threaded, splitting the prefix space between threads.

    The overall amount of prefixes is divided among "multiplicity"
    utility threads; each thread gets its own first prefix and its own
    local IP to bind/connect from.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None (blocks forever once worker threads are started)
    """
    amount_left = arguments.amount
    utils_left = arguments.multiplicity
    prefix_current = arguments.firstprefix
    myip_current = arguments.myip
    thread_args = []
    while True:
        # Distribute prefixes as evenly as possible.
        amount_per_util = (amount_left - 1) / utils_left + 1  # round up
        amount_left -= amount_per_util
        utils_left -= 1
        # Each thread gets its own private copy of the arguments,
        # so mutating one thread's view cannot affect the others.
        args = deepcopy(arguments)
        args.amount = amount_per_util
        args.firstprefix = prefix_current
        args.myip = myip_current
        thread_args.append(args)
        if not utils_left:
            break
        # Advance the prefix window; each prefix covers 16 addresses
        # (stride of a /28), and the next thread binds the next local IP.
        prefix_current += amount_per_util * 16
        myip_current += 1
    try:
        for t in thread_args:
            thread.start_new_thread(job, (t,))
    except Exception:
        print("Error: unable to start thread.")
        raise SystemExit(2)
    # Work remains forever in the spawned threads;
    # keep the main thread alive so the process does not exit.
    while True:
        time.sleep(5)
1826 if __name__ == "__main__":
1827 arguments = parse_arguments()
1828 logger = create_logger(arguments.loglevel, arguments.logfile)
1829 threaded_job(arguments)