1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
from copy import deepcopy
from SimpleXMLRPCServer import SimpleXMLRPCServer
import argparse
import binascii
import logging
import socket
import struct
import threading
import time

import ipaddr
# Module metadata: authorship, copyright, licence and contact information.
__author__ = "Vratko Polak"
__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
__license__ = "Eclipse Public License v1.0"
__email__ = "vrpolak@cisco.com"
class SafeDict(dict):
    """Thread safe dictionary.

    The object serves as thread safe data storage.
    It should be used with the "with" statement: the internal lock is
    acquired on entry and released on exit of the "with" block.
    """

    def __init__(self, *p_arg, **n_arg):
        """Create the dictionary together with the lock guarding it.

        Arguments:
            :p_arg: positional arguments forwarded to dict()
            :n_arg: keyword arguments forwarded to dict()
        """
        # Forward the initial content to dict(); previously the arguments
        # were accepted but silently dropped, giving an empty dictionary.
        super(SafeDict, self).__init__(*p_arg, **n_arg)
        self._lock = threading.Lock()

    def __enter__(self):
        """Acquire the lock and hand out the dictionary itself."""
        self._lock.acquire()
        return self

    def __exit__(self, type, value, traceback):
        """Release the lock; exceptions from the block are not suppressed."""
        self._lock.release()
def _str_to_bool(value):
    """Convert a command-line token into a boolean.

    Used instead of type=bool, which treats every non-empty string
    (including "False" and "0") as True.

    Arguments:
        :value: the token as given on the command line
    Returns:
        :return: True or False
    Raises:
        argparse.ArgumentTypeError: when the token is not a known boolean.
    """
    if isinstance(value, bool):
        return value
    token = str(value).strip().lower()
    if token in ("1", "true", "t", "yes", "y", "on"):
        return True
    if token in ("0", "false", "f", "no", "n", "off", ""):
        return False
    raise argparse.ArgumentTypeError(
        "Boolean value expected, got %r" % (value,))


def parse_arguments():
    """Use argparse to get arguments,

    Returns:
        :return: args object.
    Raises:
        SystemExit: when --multiplicity is not positive
            (and, via argparse, on any malformed command line).
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = "Amount of IP prefixes to generate. (negative means infinite)."
    parser.add_argument("--amount", default="1", type=int, help=str_help)
    str_help = "Rpc server port."
    parser.add_argument("--port", default="8000", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default="0", type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default="0", type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # The default is a plain string (it used to be a one-item list), so it
    # has the same type as a value picked from "choices".
    parser.add_argument("--updates", choices=["single", "separate"],
                        default="separate", help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from. " +
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection. " +
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default="0", type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = "Identifier of the route originator."
    parser.add_argument("--originator", default=None,
                        type=ipaddr.IPv4Address, dest="originator", help=str_help)
    str_help = "Cluster list item identifier."
    parser.add_argument("--cluster", default=None,
                        type=ipaddr.IPv4Address, dest="cluster", help=str_help)
    str_help = ("Numeric IP Address to try to connect to. " +
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default="179", type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default="180", type=int, help=str_help)
    # All four switches write into the same "loglevel" destination.
    str_help = "Log level (--error, --warning, --info, --debug)"
    parser.add_argument("--error", dest="loglevel", action="store_const",
                        const=logging.ERROR, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--warning", dest="loglevel", action="store_const",
                        const=logging.WARNING, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--info", dest="loglevel", action="store_const",
                        const=logging.INFO, default=logging.INFO,
                        help=str_help)
    parser.add_argument("--debug", dest="loglevel", action="store_const",
                        const=logging.DEBUG, default=logging.INFO,
                        help=str_help)
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default="1000", type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    # type=bool would turn "--rfc4760 False" into True, hence the converter.
    parser.add_argument("--rfc4760", default=True, type=_str_to_bool,
                        help=str_help)
    str_help = "Using peerip instead of myip for xmlrpc server"
    parser.add_argument("--usepeerip", default=False, action="store_true", help=str_help)
    str_help = "Link-State NLRI supported"
    parser.add_argument("--bgpls", default=False, type=_str_to_bool,
                        help=str_help)
    str_help = "Link-State NLRI: Identifier"
    parser.add_argument("-lsid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID"
    parser.add_argument("-lstid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID"
    parser.add_argument("-lspid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
    parser.add_argument("--lstsaddr", default="1.2.3.4",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
    parser.add_argument("--lsteaddr", default="5.6.7.8",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "Link-State NLRI: Identifier Step"
    parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID Step"
    parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID Step"
    parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
    parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
    parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
    str_help = "How many play utilities are to be started."
    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
    # Sentence shared by the help texts of options which switch off decoding.
    no_decoding_note = (
        "Enabling this flag makes the script not decoding the update mesage, "
        "because of not supported decoding for these elements.")
    str_help = ("Open message includes multiprotocol extension capability "
                "l2vpn-evpn. " + no_decoding_note)
    parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
    # The help texts of --grace and --mvpn used to be attached to each other's
    # option; each option now carries its own description.
    str_help = ("Open message includes Graceful-restart capability, "
                "containing AFI/SAFIS: IPV4-Unicast, IPV6-Unicast, BGP-LS "
                + no_decoding_note)
    parser.add_argument("--grace", default="8", type=int, help=str_help)
    str_help = ("Open message includes Multicast in MPLS/BGP IP VPNs "
                "arguments. " + no_decoding_note)
    parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
    str_help = ("Open message includes L3VPN-MULTICAST arguments. "
                + no_decoding_note)
    parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
    str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding."
    parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
    str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
    parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help)
    str_help = "Open message includes ipv6-unicast family, without message decoding."
    parser.add_argument("--ipv6", default=False, action="store_true", help=str_help)
    str_help = "Add all supported families without message decoding."
    parser.add_argument("--allf", default=False, action="store_true", help=str_help)
    parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
    str_help = "Skipping well known attributes for update message"
    parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        # Single-argument form prints the same text on Python 2 and 3.
        print("Multiplicity %s is not positive." % arguments.multiplicity)
        raise SystemExit(1)
    # TODO: Are sanity checks (such as asnumber>=0) required?
    return arguments
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.listen: accept a connection instead of initiating it
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port
    Returns:
        :return: socket.
    """
    # bind() needs a single tuple; socket does not speak ipaddr, hence str().
    local_address = (str(arguments.myip), arguments.myport)
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listening_socket.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
            listening_socket.bind(local_address)
            listening_socket.listen(1)
            bgp_socket, _ = listening_socket.accept()
            # TODO: Verify client IP is controller IP.
        finally:
            # Only one peer is served; close the listener also on failure
            # so the descriptor is not leaked.
            listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            talking_socket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_REUSEADDR, 1)
            # bind to force specified address and port
            talking_socket.bind(local_address)
            talking_socket.connect((str(arguments.peerip),
                                    arguments.peerport))
        except Exception:
            # Do not leak the socket when bind/connect fails.
            talking_socket.close()
            raise
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract 2-bytes number from provided message.

    Arguments:
        :message: given message (byte string)
        :offset: offset of the short_int inside the message
    Returns:
        :return: required short_int value.
    Raises:
        IndexError: when the message is too short for the offset.
    Notes:
        default offset value is the BGP message size offset.
    """
    # bytearray yields integers on both Python 2 and 3, unlike indexing
    # a byte string, so no ord() call is needed.
    octets = bytearray(message)
    high_byte_int = octets[offset]
    low_byte_int = octets[offset + 1]
    # Network byte order: the most significant byte comes first.
    return high_byte_int * 256 + low_byte_int
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Arguments:
        :prefixes_hex: list of prefixes to be decoded (byte string of
            <length in bits, prefix octets> tuples)
    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    Raises:
        ValueError: when a prefix length exceeds 32 bits or the data ends
            in the middle of a prefix.
    """
    octets = bytearray(prefixes_hex)
    prefix_list = []
    offset = 0
    while offset < len(octets):
        prefix_bit_len = octets[offset]
        if prefix_bit_len > 32:
            raise ValueError("IPv4 prefix length %d at offset %d exceeds 32"
                             % (prefix_bit_len, offset))
        # Only the minimal number of octets holding the prefix is on the
        # wire (e.g. 3 octets for a /24), not always 4.
        prefix_len = (prefix_bit_len + 7) // 8
        prefix_octets = octets[offset + 1: offset + 1 + prefix_len]
        if len(prefix_octets) < prefix_len:
            raise ValueError("Truncated prefix at offset %d" % offset)
        # Pad with zero octets up to a full dotted-quad address.
        prefix_octets.extend(bytearray(4 - prefix_len))
        prefix = ".".join(str(octet) for octet in prefix_octets)
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
        offset += 1 + prefix_len
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.

        Arguments:
            :text: human readable description of the problem
            :message: raw (binary) message which caused the error
            :args: any further arguments passed on to ValueError
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        Returns:
            :return: human readable message as string
        Notes:
            Use a placeholder string if the message is to be empty.
        """
        if not self.msg:
            return self.text + ": (empty message)"
        # hexlify() returns a byte string; decode it so the concatenation
        # with text works regardless of the Python version.
        return self.text + ": " + binascii.hexlify(self.msg).decode("ascii")
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read
    Returns:
        :return: received OPEN message.
    Raises:
        MessageError: when the message is shorter than 37 bytes or its
            length field does not match the number of bytes received.
    Notes:
        Performs just basic incoming message checks
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # Some validation.
    if len(msg_in) < 37:
        # 37 is minimal length of open message with 4-byte AS number.
        error_msg = (
            "Message length (" + str(len(msg_in)) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)"
        )
        logger.error(error_msg + ": " +
                     binascii.hexlify(msg_in).decode("ascii"))
        raise MessageError(error_msg, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decide what to do.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # reported_length is an integer; without str() building this text
        # raised TypeError and hid the real problem.
        error_msg = (
            "Expected message length (" + str(reported_length) +
            ") does not match actual length (" + str(len(msg_in)) + ")"
        )
        logger.error(error_msg + ": " +
                     binascii.hexlify(msg_in).decode("ascii"))
        raise MessageError(error_msg, msg_in)
    logger.info("Open message received.")
    return msg_in
336 class MessageGenerator(object):
337 """Class which generates messages, holds states and configuration values."""
339 # TODO: Define bgp marker as a class (constant) variable.
def __init__(self, args):
    """Initialisation according to command-line args.

    Arguments:
        :args: argparse Namespace object which contains command-line
            options for MessageGenerator initialisation
    Notes:
        Calculates and stores default values used later on for
        message generation.
    """
    self.total_prefix_amount = args.amount
    # Number of update messages left to be sent.
    self.remaining_prefixes = self.total_prefix_amount

    # New parameters initialisation
    self.port = args.port
    self.iteration = 0
    self.prefix_base_default = args.firstprefix
    self.prefix_length_default = args.prefixlen
    self.wr_prefixes_default = []
    self.nlri_prefixes_default = []
    self.version_default = 4
    self.my_autonomous_system_default = args.asnumber
    self.hold_time_default = args.holdtime  # Local hold time.
    self.bgp_identifier_default = int(args.myip)
    self.next_hop_default = args.nexthop
    self.originator_id_default = args.originator
    self.cluster_list_item_default = args.cluster
    self.single_update_default = args.updates == "single"
    self.randomize_updates_default = args.updates == "random"
    self.prefix_count_to_add_default = args.insert
    self.prefix_count_to_del_default = args.withdraw
    if self.prefix_count_to_del_default < 0:
        self.prefix_count_to_del_default = 0
    if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
        # total number of prefixes must grow to avoid infinite test loop
        self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
    self.slot_size_default = self.prefix_count_to_add_default
    self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
    self.results_file_name_default = args.results
    self.performance_threshold_default = args.threshold
    self.rfc4760 = args.rfc4760
    self.bgpls = args.bgpls
    self.evpn = args.evpn
    self.mvpn = args.mvpn
    self.l3vpn_mcast = args.l3vpn_mcast
    self.l3vpn = args.l3vpn
    self.rt_constrain = args.rt_constrain
    self.ipv6 = args.ipv6
    self.allf = args.allf
    self.skipattr = args.skipattr
    self.grace = args.grace
    # Default values when BGP-LS Attributes are used
    if self.bgpls:
        self.prefix_count_to_add_default = 1
        self.prefix_count_to_del_default = 0
    self.ls_nlri_default = {"Identifier": args.lsid,
                            "TunnelID": args.lstid,
                            "LSPID": args.lspid,
                            "IPv4TunnelSenderAddress": args.lstsaddr,
                            "IPv4TunnelEndPointAddress": args.lsteaddr}
    self.lsid_step = args.lsidstep
    self.lstid_step = args.lstidstep
    self.lspid_step = args.lspidstep
    self.lstsaddr_step = args.lstsaddrstep
    self.lsteaddr_step = args.lsteaddrstep
    # Default values used for randomized part.
    # Floor division keeps the slot arithmetic integral on Python 3 as well.
    prefill_amount = (self.total_prefix_amount -
                      self.remaining_prefixes_threshold)
    # Number of slots needed for the pre-fill phase (rounded up).
    s1_slots = (prefill_amount - 1) // self.prefix_count_to_add_default + 1
    # Number of slots needed for the combined phase (rounded up);
    # each slot grows the table by (added - withdrawn) prefixes.
    s2_slots = ((self.remaining_prefixes_threshold - 1) //
                (self.prefix_count_to_add_default -
                 self.prefix_count_to_del_default) + 1)
    # S1_First_Index = 0
    # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
    s2_first_index = s1_slots * self.prefix_count_to_add_default
    s2_last_index = (s2_first_index +
                     s2_slots * (self.prefix_count_to_add_default -
                                 self.prefix_count_to_del_default) - 1)
    # The gap between inserted and withdrawn slot equals the number of
    # pre-fill slots (same formula as s1_slots).
    self.slot_gap_default = s1_slots
    self.randomize_lowest_default = s2_first_index
    self.randomize_highest_default = s2_last_index
    # Initialising counters
    self.phase1_start_time = 0
    self.phase1_stop_time = 0
    self.phase2_start_time = 0
    self.phase2_stop_time = 0
    self.phase1_updates_sent = 0
    self.phase2_updates_sent = 0
    self.updates_sent = 0

    self.log_info = args.loglevel <= logging.INFO
    self.log_debug = args.loglevel <= logging.DEBUG
    # Flags needed for the MessageGenerator performance optimization.
    # Calling logger methods each iteration even with proper log level set
    # slows down significantly the MessageGenerator performance.
    # Measured total generation time (1M updates, dry run, error log level):
    # - logging based on basic logger features: 36,2s
    # - logging based on advanced logger features (lazy logging): 21,2s
    # - conditional calling of logger methods enclosed inside condition: 8,6s

    if self.log_info:
        logger.info("Generator initialisation")
        logger.info("  Target total number of prefixes to be introduced: " +
                    str(self.total_prefix_amount))
        logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
                    str(self.prefix_length_default))
        logger.info("  My Autonomous System number: " +
                    str(self.my_autonomous_system_default))
        logger.info("  My Hold Time: " + str(self.hold_time_default))
        logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
        logger.info("  Next Hop: " + str(self.next_hop_default))
        logger.info("  Originator ID: " + str(self.originator_id_default))
        logger.info("  Cluster list: " + str(self.cluster_list_item_default))
        logger.info("  Prefix count to be inserted at once: " +
                    str(self.prefix_count_to_add_default))
        logger.info("  Prefix count to be withdrawn at once: " +
                    str(self.prefix_count_to_del_default))
        logger.info("  Fast pre-fill up to " +
                    str(self.total_prefix_amount -
                        self.remaining_prefixes_threshold) + " prefixes")
        logger.info("  Remaining number of prefixes to be processed " +
                    "in parallel with withdrawals: " +
                    str(self.remaining_prefixes_threshold))
        logger.debug("  Prefix index range used after pre-fill procedure [" +
                     str(self.randomize_lowest_default) + ", " +
                     str(self.randomize_highest_default) + "]")
        if self.single_update_default:
            logger.info("  Common single UPDATE will be generated " +
                        "for both NLRI & WITHDRAWN lists")
        else:
            logger.info("  Two separate UPDATEs will be generated " +
                        "for each NLRI & WITHDRAWN lists")
        if self.randomize_updates_default:
            logger.info("  Generation of UPDATE messages will be randomized")
        logger.info("  Let\'s go ...\n")

    # TODO: Notification for hold timer expiration can be handy.
def store_results(self, file_name=None, threshold=None):
    """ Stores specified results into files based on file_name value.

    Arguments:
        :param file_name: Trailing (common) part of result file names
        :param threshold: Minimum number of sent updates needed for each
                          result to be included into result csv file
                          (mainly needed because of the result accuracy)
    Returns:
        :return: n/a
    Notes:
        Writes "totals-<file_name>" and "performance-<file_name>" through
        write_results_to_file(). A phase whose measured duration is not
        positive gets no performance entry (the rate is undefined).
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if file_name is None:
        file_name = self.results_file_name_default
    if threshold is None:
        threshold = self.performance_threshold_default
    phase1_duration = self.phase1_stop_time - self.phase1_start_time
    phase2_duration = self.phase2_stop_time - self.phase2_start_time
    # performance calculation
    totals1 = performance1 = None
    if self.phase1_updates_sent >= threshold:
        totals1 = self.phase1_updates_sent
        # Guard against ZeroDivisionError when start and stop coincide.
        if phase1_duration > 0:
            performance1 = int(self.phase1_updates_sent / phase1_duration)
    totals2 = performance2 = None
    if self.phase2_updates_sent >= threshold:
        totals2 = self.phase2_updates_sent
        if phase2_duration > 0:
            performance2 = int(self.phase2_updates_sent / phase2_duration)

    logger.info("#" * 10 + " Final results " + "#" * 10)
    logger.info("Number of iterations: " + str(self.iteration))
    logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
                str(self.phase1_updates_sent))
    logger.info("The pre-fill phase duration: " +
                str(phase1_duration) + "s")
    logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
                str(self.phase2_updates_sent))
    logger.info("The 2nd test phase duration: " +
                str(phase2_duration) + "s")
    logger.info("Threshold for performance reporting: " + str(threshold))

    # making labels
    phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
                    " route(s) per UPDATE")
    if self.single_update_default:
        phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                              "/-" + str(self.prefix_count_to_del_default) +
                              " routes per UPDATE")
    else:
        phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                              "/-" + str(self.prefix_count_to_del_default) +
                              " routes in two UPDATEs")
    # collecting capacity and performance results
    totals = {}
    performance = {}
    if totals1 is not None:
        totals[phase1_label] = totals1
    if performance1 is not None:
        performance[phase1_label] = performance1
    if totals2 is not None:
        totals[phase2_label] = totals2
    if performance2 is not None:
        performance[phase2_label] = performance2
    self.write_results_to_file(totals, "totals-" + file_name)
    self.write_results_to_file(performance, "performance-" + file_name)
def write_results_to_file(self, results, file_name):
    """Writes results to the csv plot file consumable by Jenkins.

    Arguments:
        :param results: dictionary mapping column label (string) to value
        :param file_name: Name of the (csv) file to be created
    Returns:
        :return: none
    Notes:
        The file has two lines: the sorted labels and the matching values,
        both separated by ", ".
    """
    labels = sorted(results)
    first_line = ", ".join(labels)
    second_line = ", ".join(str(results[label]) for label in labels)
    # "with" closes the file even if a write fails.
    with open(file_name, "wt") as results_file:
        results_file.write(first_line + "\n")
        results_file.write(second_line + "\n")
    logger.info("Message generator performance results stored in " +
                file_name + ":")
    logger.info("  " + first_line)
    logger.info("  " + second_line)
576 # Return pseudo-randomized (reproducible) index for selected range
def randomize_index(self, index, lowest=None, highest=None):
    """Calculates pseudo-randomized index from selected range.

    Arguments:
        :param index: input index
        :param lowest: the lowest index from the randomized area
        :param highest: the highest index from the randomized area
    Returns:
        :return: the (pseudo)randomized index
    Notes:
        Created just as a frame for future generator enhancement.
        The result is reproducible: the same input gives the same output.
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if lowest is None:
        lowest = self.randomize_lowest_default
    if highest is None:
        highest = self.randomize_highest_default
    # randomize
    if lowest <= index <= highest:
        # we are in the randomized range -> shuffle it inside
        # the range (now just reverse the order)
        return highest - (index - lowest)
    # we are out of the randomized range -> nothing to do
    return index
def get_ls_nlri_values(self, index):
    """Generates LS-NLRI parameters.
    http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03

    Arguments:
        :param index: index (iteration)
    Returns:
        :return: dictionary of LS NLRI parameters and values
    Notes:
        Each value is its configured default increased by one for every
        full "step" contained in index (floor division).
    """
    defaults = self.ls_nlri_default
    # generating list of LS NLRI parameters
    # "//" keeps the increments integral on Python 3 too; with "/" the
    # values turned into floats there.
    return {
        "Identifier": defaults["Identifier"] + index // self.lsid_step,
        "IPv4TunnelSenderAddress": (defaults["IPv4TunnelSenderAddress"] +
                                    index // self.lstsaddr_step),
        "TunnelID": defaults["TunnelID"] + index // self.lstid_step,
        "LSPID": defaults["LSPID"] + index // self.lspid_step,
        "IPv4TunnelEndPointAddress": (defaults["IPv4TunnelEndPointAddress"] +
                                      index // self.lsteaddr_step),
    }
def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
                    prefix_len=None, prefix_count=None, randomize=None):
    """Generates list of IP address prefixes.

    Arguments:
        :param slot_index: index of group of prefix addresses
        :param slot_size: size of group of prefix addresses
            in [number of included prefixes]
        :param prefix_base: IP address of the first prefix
            (slot_index = 0, prefix_index = 0)
        :param prefix_len: length of the prefix in bits
            (the same as size of netmask)
        :param prefix_count: number of prefixes to be returned
            from the specified slot
        :param randomize: whether prefix indexes are passed through
            randomize_index()
    Returns:
        :return: list of generated IP address prefixes
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if slot_size is None:
        slot_size = self.slot_size_default
    if prefix_base is None:
        prefix_base = self.prefix_base_default
    if prefix_len is None:
        prefix_len = self.prefix_length_default
    if prefix_count is None:
        prefix_count = slot_size
    if randomize is None:
        randomize = self.randomize_updates_default
    # generating list of prefixes
    # Distance between two consecutive prefixes of the given length.
    prefix_gap = 2 ** (32 - prefix_len)
    first_index = slot_index * slot_size
    indexes = [first_index + i for i in range(prefix_count)]
    if randomize:
        indexes = [self.randomize_index(index) for index in indexes]
    prefixes = [prefix_base + index * prefix_gap for index in indexes]
    if self.log_debug:
        logger.debug("  Prefix slot index: " + str(slot_index))
        logger.debug("  Prefix slot size: " + str(slot_size))
        logger.debug("  Prefix count: " + str(prefix_count))
        logger.debug("  Prefix indexes: " + str(indexes))
        logger.debug("  Prefix list: " + str(prefixes))
    return prefixes
def compose_update_message(self, prefix_count_to_add=None,
                           prefix_count_to_del=None):
    """Composes an UPDATE message

    Arguments:
        :param prefix_count_to_add: # of prefixes to put into NLRI list
        :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
    Returns:
        :return: encoded UPDATE message in HEX
    Notes:
        Optionally generates separate UPDATEs for NLRI and WITHDRAWN
        lists or common message which includes both prefix lists.
        Updates global counters.
    """
    # default values handling
    # TODO optimize default values handling (use e.g. dicionary.update() approach)
    if prefix_count_to_add is None:
        prefix_count_to_add = self.prefix_count_to_add_default
    if prefix_count_to_del is None:
        prefix_count_to_del = self.prefix_count_to_del_default
    # logging (guarded by flags, logger calls are expensive in this loop)
    if self.log_info and not (self.iteration % 1000):
        logger.info("Iteration: " + str(self.iteration) +
                    " - total remaining prefixes: " +
                    str(self.remaining_prefixes))
    if self.log_debug:
        logger.debug("#" * 10 + " Iteration: " +
                     str(self.iteration) + " " + "#" * 10)
        logger.debug("Remaining prefixes: " +
                     str(self.remaining_prefixes))
    # scenario type & one-shot counter
    straightforward_scenario = (self.remaining_prefixes >
                                self.remaining_prefixes_threshold)
    if straightforward_scenario:
        # pre-fill phase: nothing is withdrawn
        prefix_count_to_del = 0
        if self.log_debug:
            logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
        if not self.phase1_start_time:
            self.phase1_start_time = time.time()
    else:
        if self.log_debug:
            logger.debug("--- COMBINED SCENARIO ---")
        if not self.phase2_start_time:
            self.phase2_start_time = time.time()
    # tailor the number of prefixes if needed
    # (net growth must not exceed the number of remaining prefixes)
    prefix_count_to_add = (prefix_count_to_del +
                           min(prefix_count_to_add - prefix_count_to_del,
                               self.remaining_prefixes))
    # prefix slots selection for insertion and withdrawal
    slot_index_to_add = self.iteration
    slot_index_to_del = slot_index_to_add - self.slot_gap_default
    # getting lists of prefixes for insertion in this iteration
    if self.log_debug:
        logger.debug("Prefixes to be inserted in this iteration:")
    prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
                                              prefix_count=prefix_count_to_add)
    # getting lists of prefixes for withdrawal in this iteration
    if self.log_debug:
        logger.debug("Prefixes to be withdrawn in this iteration:")
    prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
                                              prefix_count=prefix_count_to_del)
    if self.bgpls:
        # generating the UPDATE message with LS-NLRI only
        ls_nlri = self.get_ls_nlri_values(self.iteration)
        msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
                                      **ls_nlri)
    elif self.single_update_default:
        # generating the UPDATE message with prefix lists:
        # send prefixes to be introduced and withdrawn
        # in one UPDATE message
        msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
                                      nlri_prefixes=prefix_list_to_add)
    else:
        # Send prefixes to be introduced and withdrawn
        # in separate UPDATE messages (if needed)
        msg_out = self.update_message(wr_prefixes=[],
                                      nlri_prefixes=prefix_list_to_add)
        if prefix_count_to_del:
            msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
                                           nlri_prefixes=[])
    # updating counters - who knows ... maybe I am last time here ;)
    if straightforward_scenario:
        self.phase1_stop_time = time.time()
        self.phase1_updates_sent = self.updates_sent
    else:
        self.phase2_stop_time = time.time()
        self.phase2_updates_sent = (self.updates_sent -
                                    self.phase1_updates_sent)
    # updating totals for the next iteration
    self.iteration += 1
    self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
    # returning the encoded message
    return msg_out
768 # Section of message encoders
def open_message(self, version=None, my_autonomous_system=None,
                 hold_time=None, bgp_identifier=None):
    """Generates an OPEN Message (rfc4271#section-4.2)

    Arguments:
        :param version: see the rfc4271#section-4.2
        :param my_autonomous_system: see the rfc4271#section-4.2
        :param hold_time: see the rfc4271#section-4.2
        :param bgp_identifier: see the rfc4271#section-4.2
    Returns:
        :return: encoded OPEN message (packed bytes, despite the
            historical *_hex variable names)
    Notes:
        Every argument left as None is replaced by the matching
        self.*_default attribute.
    """

    def hexed(data):
        """Hexadecimal text of packed data (used only for logging)."""
        return binascii.hexlify(data).decode("ascii")

    # default values handling
    # TODO optimize default values handling (use e.g. dictionary.update() approach)
    if version is None:
        version = self.version_default
    if my_autonomous_system is None:
        my_autonomous_system = self.my_autonomous_system_default
    if hold_time is None:
        hold_time = self.hold_time_default
    if bgp_identifier is None:
        bgp_identifier = self.bgp_identifier_default

    # Marker
    marker_hex = b"\xFF" * 16

    # Type
    msg_type = 1
    type_hex = struct.pack("B", msg_type)

    # Version
    version_hex = struct.pack("B", version)

    # My Autonomous System (2-byte field of the fixed header).
    # AS_TRANS value, 23456 decadic, is sent when the real AS number
    # is not mappable to 2 bytes; the real number then travels only
    # in the "32 bit AS Numbers Support" capability below.
    my_autonomous_system_2_bytes = 23456
    if my_autonomous_system < 65536:
        my_autonomous_system_2_bytes = my_autonomous_system
    my_autonomous_system_hex_2_bytes = struct.pack(
        ">H", my_autonomous_system_2_bytes)

    # Hold Time
    hold_time_hex = struct.pack(">H", hold_time)

    # BGP Identifier
    bgp_identifier_hex = struct.pack(">I", bgp_identifier)

    # Optional Parameters
    # One Multiprotocol Extensions capability (see RFC 4760, section 8)
    # per (AFI, SAFI) pair, emitted in this order when the matching
    # flag or self.allf is set.
    address_families = (
        (self.rfc4760, b"\x00\x01", b"\x01"),  # IPv4, Unicast
        (self.ipv6, b"\x00\x02", b"\x01"),  # IPv6, Unicast
        (self.bgpls, b"\x40\x04", b"\x47"),  # BGP-LS, BGP-LS
        (self.evpn, b"\x00\x19", b"\x46"),  # L2-VPN, EVPN
        (self.mvpn, b"\x00\x01", b"\x05"),  # IPv4, MCAST-VPN
        (self.mvpn, b"\x00\x02", b"\x05"),  # IPv6, MCAST-VPN
        (self.l3vpn_mcast, b"\x00\x01", b"\x81"),  # IPv4, L3VPN-MCAST
        (self.l3vpn_mcast, b"\x00\x02", b"\x81"),  # IPv6, L3VPN-MCAST
        (self.l3vpn, b"\x00\x01", b"\x80"),  # IPv4, L3VPN-UNICAST
        (self.l3vpn, b"\x00\x02", b"\x80"),  # IPv6, L3VPN-UNICAST
        (self.rt_constrain, b"\x00\x01", b"\x84"),  # IPv4, ROUTE-TARGET-CONSTRAIN
    )
    optional_parameters_hex = b""
    for requested, afi, safi in address_families:
        if requested or self.allf:
            optional_parameters_hex += (
                b"\x02"  # Param type ("Capability Ad")
                b"\x06"  # Length (6 bytes)
                b"\x01"  # Multiprotocol extension capability
                b"\x04"  # Capability value length
                + afi + b"\x00" + safi  # AFI, (reserved), SAFI
            )

    # Always advertised, carries the real AS number.
    optional_parameters_hex += (
        b"\x02"  # Param type ("Capability Ad")
        b"\x06"  # Length (6 bytes)
        b"\x41"  # "32 bit AS Numbers Support"
                 # (see RFC 6793, section 3)
        b"\x04"  # Capability value length
        + struct.pack(">I", my_autonomous_system)  # My AS in 32 bit format
    )

    if self.grace != 8:
        # self.grace is read as three bits, most significant first:
        #   b[0] (value 4): long-lived graceful-restart capability
        #   b[1] (value 2): Restart Flag
        #   b[2] (value 1): IPv4 unicast flag
        # Zero-padding on the left keeps that mapping for grace 0-3.
        b = list(format(self.grace, "03b"))
        gr_param_length = b"\x08"
        restart_flag = b"\x80\x05" if b[1] == "1" else b"\x00\x05"
        ipv4_flag = b"\x80" if b[2] == "1" else b"\x00"
        if b[0] == "1":
            ll_gr = b"\x47\x07\x00\x01\x01\x00\x00\x00\x1e"
            gr_param_length = b"\x11"
        else:
            ll_gr = b""
        logger.debug("Grace parameters list: {}".format(b))
        # "\x02" Param type ("Capability Ad")
        # gr_param_length: length of the whole parameter value
        # "\x40" Graceful-restart capability
        # "\x06" Length (6 bytes)
        # "\x00" Restart Flag (customizable - turned on when grace == 2,3,6,7)
        # "\x05" Restart timer (5sec)
        # "\x00\x01" AFI (IPV4)
        # "\x01" SAFI (Unicast)
        # "\x00" Ipv4 Flag (customizable - turned on when grace == 1,3,5,7)
        # "\x47\x07\x00\x01\x01\x00\x00\x00\x1e" ipv4 ll-graceful-restart
        # capability, timer 30sec; ll-gr turned on when grace is between 4-7
        optional_parameters_hex += (
            b"\x02" + gr_param_length + b"\x40\x06" + restart_flag +
            b"\x00\x01\x01" + ipv4_flag + ll_gr
        )

    # Optional Parameters Length
    optional_parameters_length = len(optional_parameters_hex)
    optional_parameters_length_hex = struct.pack("B",
                                                 optional_parameters_length)

    # Length (big-endian); the "+ 2" is the length field itself.
    length = (
        len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
        len(my_autonomous_system_hex_2_bytes) +
        len(hold_time_hex) + len(bgp_identifier_hex) +
        len(optional_parameters_length_hex) +
        len(optional_parameters_hex)
    )
    length_hex = struct.pack(">H", length)

    # OPEN Message
    message_hex = (
        marker_hex +
        length_hex +
        type_hex +
        version_hex +
        my_autonomous_system_hex_2_bytes +
        hold_time_hex +
        bgp_identifier_hex +
        optional_parameters_length_hex +
        optional_parameters_hex
    )

    if self.log_debug:
        logger.debug("OPEN message encoding")
        logger.debug(" Marker=0x%s", hexed(marker_hex))
        logger.debug(" Length=%s (0x%s)", length, hexed(length_hex))
        logger.debug(" Type=%s (0x%s)", msg_type, hexed(type_hex))
        logger.debug(" Version=%s (0x%s)", version, hexed(version_hex))
        logger.debug(" My Autonomous System=%s (0x%s)",
                     my_autonomous_system_2_bytes,
                     hexed(my_autonomous_system_hex_2_bytes))
        logger.debug(" Hold Time=%s (0x%s)", hold_time, hexed(hold_time_hex))
        logger.debug(" BGP Identifier=%s (0x%s)", bgp_identifier,
                     hexed(bgp_identifier_hex))
        logger.debug(" Optional Parameters Length=%s (0x%s)",
                     optional_parameters_length,
                     hexed(optional_parameters_length_hex))
        logger.debug(" Optional Parameters=0x%s",
                     hexed(optional_parameters_hex))
        logger.debug("OPEN message encoded: 0x%s", hexed(message_hex))

    return message_hex
def update_message(self, wr_prefixes=None, nlri_prefixes=None,
                   wr_prefix_length=None, nlri_prefix_length=None,
                   my_autonomous_system=None, next_hop=None,
                   originator_id=None, cluster_list_item=None,
                   end_of_rib=False, **ls_nlri_params):
    """Generates an UPDATE Message (rfc4271#section-4.3)

    Arguments:
        :param wr_prefixes: see the rfc4271#section-4.3
        :param nlri_prefixes: see the rfc4271#section-4.3
        :param wr_prefix_length: see the rfc4271#section-4.3
        :param nlri_prefix_length: see the rfc4271#section-4.3
        :param my_autonomous_system: see the rfc4271#section-4.3
        :param next_hop: see the rfc4271#section-4.3
        :param originator_id: value of the ORIGINATOR_ID attribute
        :param cluster_list_item: single item of the CLUSTER_LIST attribute
        :param end_of_rib: when true, the BGP-LS MP_REACH_NLRI attribute
            is left out; if self.grace is neither 0 nor 8 the whole
            message is replaced by a fixed empty UPDATE
        :param ls_nlri_params: overrides for entries of self.ls_nlri_default
    Returns:
        :return: encoded UPDATE message (packed bytes)
    Notes:
        Arguments left as None are replaced by the matching
        self.*_default attribute. self.updates_sent is incremented
        on every call.
    """

    def hexed(data):
        """Hexadecimal text of packed data (used only for logging)."""
        return binascii.hexlify(data).decode("ascii")

    def encode_prefixes(prefixes, prefix_length):
        """Pack every prefix as length octet + minimal address octets."""
        # Floor division keeps the octet count an int on every Python.
        octets = ((prefix_length - 1) // 8) + 1
        return b"".join(
            struct.pack("B", prefix_length) +
            struct.pack(">I", int(prefix))[:octets]
            for prefix in prefixes
        )

    # default values handling
    # TODO optimize default values handling (use e.g. dictionary.update() approach)
    if wr_prefixes is None:
        wr_prefixes = self.wr_prefixes_default
    if nlri_prefixes is None:
        nlri_prefixes = self.nlri_prefixes_default
    if wr_prefix_length is None:
        wr_prefix_length = self.prefix_length_default
    if nlri_prefix_length is None:
        nlri_prefix_length = self.prefix_length_default
    if my_autonomous_system is None:
        my_autonomous_system = self.my_autonomous_system_default
    if next_hop is None:
        next_hop = self.next_hop_default
    if originator_id is None:
        originator_id = self.originator_id_default
    if cluster_list_item is None:
        cluster_list_item = self.cluster_list_item_default
    ls_nlri = self.ls_nlri_default.copy()
    ls_nlri.update(ls_nlri_params)

    # Marker
    marker_hex = b"\xFF" * 16

    # Type
    msg_type = 2
    type_hex = struct.pack("B", msg_type)

    # Withdrawn Routes (not used when BGP-LS is played)
    withdrawn_routes_hex = b""
    if not self.bgpls:
        withdrawn_routes_hex = encode_prefixes(wr_prefixes, wr_prefix_length)

    # Withdrawn Routes Length
    withdrawn_routes_length = len(withdrawn_routes_hex)
    withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)

    # TODO: to replace hardcoded string by encoding?
    # Path Attributes
    path_attributes_hex = b""
    if not self.skipattr:
        path_attributes_hex += (
            b"\x40"  # Flags ("Well-Known")
            b"\x01"  # Type (ORIGIN)
            b"\x01"  # Length (1)
            b"\x00"  # Origin: IGP
        )
        path_attributes_hex += (
            b"\x40"  # Flags ("Well-Known")
            b"\x02"  # Type (AS_PATH)
            b"\x06"  # Length (6)
            b"\x02"  # AS segment type (AS_SEQUENCE)
            b"\x01"  # AS segment length (1)
        )
        my_as_hex = struct.pack(">I", my_autonomous_system)
        path_attributes_hex += my_as_hex  # AS segment (4 bytes)
        path_attributes_hex += (
            b"\x40"  # Flags ("Well-Known")
            b"\x05"  # Type (LOCAL_PREF)
            b"\x04"  # Length (4)
            b"\x00\x00\x00\x64"  # (100)
        )
    # NOTE(review): NEXT_HOP, ORIGINATOR_ID and CLUSTER_LIST are emitted
    # only for announcements and independently of skipattr - confirm this
    # nesting against the upstream file.
    if nlri_prefixes != []:
        path_attributes_hex += (
            b"\x40"  # Flags ("Well-Known")
            b"\x03"  # Type (NEXT_HOP)
            b"\x04"  # Length (4)
        )
        next_hop_hex = struct.pack(">I", int(next_hop))
        path_attributes_hex += (
            next_hop_hex  # IP address of the next hop (4 bytes)
        )
        if originator_id is not None:
            path_attributes_hex += (
                b"\x80"  # Flags ("Optional, non-transitive")
                b"\x09"  # Type (ORIGINATOR_ID)
                b"\x04"  # Length (4)
            )  # ORIGINATOR_ID (4 bytes)
            path_attributes_hex += struct.pack(">I", int(originator_id))
        if cluster_list_item is not None:
            path_attributes_hex += (
                b"\x80"  # Flags ("Optional, non-transitive")
                b"\x0a"  # Type (CLUSTER_LIST)
                b"\x04"  # Length (4)
            )  # one CLUSTER_LIST item (4 bytes)
            path_attributes_hex += struct.pack(">I", int(cluster_list_item))

    if self.bgpls and not end_of_rib:
        path_attributes_hex += (
            b"\x80"  # Flags ("Optional, non-transitive")
            b"\x0e"  # Type (MP_REACH_NLRI)
            b"\x22"  # Length (34)
            b"\x40\x04"  # AFI (BGP-LS)
            b"\x47"  # SAFI (BGP-LS)
            b"\x04"  # Next Hop Length (4)
        )
        path_attributes_hex += struct.pack(">I", int(next_hop))
        path_attributes_hex += b"\x00"  # Reserved
        path_attributes_hex += (
            b"\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
            b"\x00\x15"  # LS-NLRI.TotalNLRILength (21)
            b"\x07"  # LS-NLRI.Variable.ProtocolID (RSVP-TE)
        )
        path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
        path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
        path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
        path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
        path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))

    # Total Path Attributes Length
    total_path_attributes_length = len(path_attributes_hex)
    total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)

    # Network Layer Reachability Information (not used when BGP-LS is played)
    nlri_hex = b""
    if not self.bgpls:
        nlri_hex = encode_prefixes(nlri_prefixes, nlri_prefix_length)

    # Length (big-endian); the "+ 2" is the length field itself.
    length = (
        len(marker_hex) + 2 + len(type_hex) +
        len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
        len(total_path_attributes_length_hex) + len(path_attributes_hex) +
        len(nlri_hex)
    )
    length_hex = struct.pack(">H", length)

    # UPDATE Message
    message_hex = (
        marker_hex +
        length_hex +
        type_hex +
        withdrawn_routes_length_hex +
        withdrawn_routes_hex +
        total_path_attributes_length_hex +
        path_attributes_hex +
        nlri_hex
    )

    if self.grace != 8 and self.grace != 0 and end_of_rib:
        # Fixed 23-byte UPDATE with neither withdrawn routes nor attributes.
        message_hex = (marker_hex + binascii.unhexlify("00170200000000"))

    if self.log_debug:
        logger.debug("UPDATE message encoding")
        logger.debug(" Marker=0x%s", hexed(marker_hex))
        logger.debug(" Length=%s (0x%s)", length, hexed(length_hex))
        logger.debug(" Type=%s (0x%s)", msg_type, hexed(type_hex))
        logger.debug(" withdrawn_routes_length=%s (0x%s)",
                     withdrawn_routes_length,
                     hexed(withdrawn_routes_length_hex))
        logger.debug(" Withdrawn_Routes=%s/%s (0x%s)", wr_prefixes,
                     wr_prefix_length, hexed(withdrawn_routes_hex))
        if total_path_attributes_length:
            logger.debug(" Total Path Attributes Length=%s (0x%s)",
                         total_path_attributes_length,
                         hexed(total_path_attributes_length_hex))
            logger.debug(" Path Attributes=(0x%s)",
                         hexed(path_attributes_hex))
            logger.debug(" Origin=IGP")
            logger.debug(" AS path=%s", my_autonomous_system)
            logger.debug(" Next hop=%s", next_hop)
            if originator_id is not None:
                logger.debug(" Originator id=%s", originator_id)
            if cluster_list_item is not None:
                logger.debug(" Cluster list=%s", cluster_list_item)
            if self.bgpls:
                logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
        logger.debug(" Network Layer Reachability Information=%s/%s (0x%s)",
                     nlri_prefixes, nlri_prefix_length, hexed(nlri_hex))
        logger.debug("UPDATE message encoded: 0x%s", hexed(message_hex))

    # updating counter
    self.updates_sent += 1
    # returning encoded message
    return message_hex
def notification_message(self, error_code, error_subcode, data_hex=b""):
    """Generates a NOTIFICATION Message (rfc4271#section-4.5)

    Arguments:
        :param error_code: see the rfc4271#section-4.5
        :param error_subcode: see the rfc4271#section-4.5
        :param data_hex: see the rfc4271#section-4.5
    Returns:
        :return: encoded NOTIFICATION message (packed bytes)
    """

    def hexed(data):
        """Hexadecimal text of packed data (used only for logging)."""
        return binascii.hexlify(data).decode("ascii")

    # Marker
    marker_hex = b"\xFF" * 16

    # Type
    msg_type = 3
    type_hex = struct.pack("B", msg_type)

    # Error Code
    error_code_hex = struct.pack("B", error_code)

    # Error Subcode
    error_subcode_hex = struct.pack("B", error_subcode)

    # Length (big-endian); the "+ 2" is the length field itself.
    length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
              len(error_subcode_hex) + len(data_hex))
    length_hex = struct.pack(">H", length)

    # NOTIFICATION Message
    message_hex = (
        marker_hex +
        length_hex +
        type_hex +
        error_code_hex +
        error_subcode_hex +
        data_hex
    )

    if self.log_debug:
        logger.debug("NOTIFICATION message encoding")
        logger.debug(" Marker=0x%s", hexed(marker_hex))
        logger.debug(" Length=%s (0x%s)", length, hexed(length_hex))
        logger.debug(" Type=%s (0x%s)", msg_type, hexed(type_hex))
        logger.debug(" Error Code=%s (0x%s)", error_code,
                     hexed(error_code_hex))
        logger.debug(" Error Subode=%s (0x%s)", error_subcode,
                     hexed(error_subcode_hex))
        logger.debug(" Data= (0x%s)", hexed(data_hex))
        logger.debug("NOTIFICATION message encoded: 0x%s",
                     hexed(message_hex))

    return message_hex
def keepalive_message(self):
    """Generates a KEEP ALIVE Message (rfc4271#section-4.4)

    Returns:
        :return: encoded KEEP ALIVE message (packed bytes)
    """

    def hexed(data):
        """Hexadecimal text of packed data (used only for logging)."""
        return binascii.hexlify(data).decode("ascii")

    # Marker
    marker_hex = b"\xFF" * 16

    # Type
    msg_type = 4
    type_hex = struct.pack("B", msg_type)

    # Length (big-endian); the "+ 2" is the length field itself.
    length = len(marker_hex) + 2 + len(type_hex)
    length_hex = struct.pack(">H", length)

    # KEEP ALIVE Message
    message_hex = marker_hex + length_hex + type_hex

    if self.log_debug:
        logger.debug("KEEP ALIVE message encoding")
        logger.debug(" Marker=0x%s", hexed(marker_hex))
        logger.debug(" Length=%s (0x%s)", length, hexed(length_hex))
        logger.debug(" Type=%s (0x%s)", msg_type, hexed(type_hex))
        logger.debug("KEEP ALIVE message encoded: 0x%s", hexed(message_hex))

    return message_hex
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and
    peer's hold time.
    """

    def __init__(self, msg_in):
        """Initialisation. based on defaults and OPEN message from peer.

        Arguments:
            msg_in: the OPEN message received from peer.
        Raises:
            ValueError: the negotiated hold time is 1 or 2 seconds.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        hold_timedelta = 180  # Not an attribute of self yet.
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        if hold_timedelta > peer_hold_timedelta:
            hold_timedelta = peer_hold_timedelta
        if hold_timedelta != 0 and hold_timedelta < 3:
            error_text = "Invalid hold timedelta value: " + str(hold_timedelta)
            logger.error(error_text)
            raise ValueError(error_text)
        # If we do not hear from peer this long, we assume it has died.
        self.hold_timedelta = hold_timedelta
        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # The same as calling snapshot(), but also declares a field.
        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        self.snapshot_time = time.time()
        # At this time point, peer will be declared dead.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this point, we should be sending keepalive message.
        self.my_keepalive_time = None  # to be set later

    def snapshot(self):
        """Store current time in instance data to use later."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time

        Arguments:
            :keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurence"""
        if self.hold_timedelta == 0:
            return False
        if self.my_keepalive_time is None:
            # Nothing scheduled yet: report the keepalive as due. This is
            # what the Python 2 ordering of None gave implicitly.
            return True
        return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Set the time of the next expected or to be sent KEEP ALIVE"""
        if self.hold_timedelta == 0:
            return self.snapshot_time + 86400
        if self.my_keepalive_time is None:
            # Only the peer hold timer is known so far.
            return self.peer_hold_time
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time."""
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta == 0:
            return
        # time.time() may be too strict
        if snapshot_time > self.peer_hold_time:
            logger.error("Peer has overstepped the hold timer.")
            raise RuntimeError("Peer has overstepped the hold timer.")
            # TODO: Include hold_timedelta?
            # TODO: Add notification sending (attempt). That means
            # move to write tracker.
1435 class ReadTracker(object):
1436 """Class for tracking read of mesages chunk by chunk and
def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
             l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
             ipv6=False, grace=8, wait_for_read=10):
    """The reader initialisation.

    Arguments:
        bgp_socket: socket to be used for sending
        timer: timer to be used for scheduling
        storage: thread safe dict
        evpn: flag that evpn functionality is tested
        mvpn: flag that mvpn functionality is tested
        grace: flag that grace-restart functionality is tested
        l3vpn_mcast: flag that l3vpn_mcast functionality is tested
        l3vpn: flag that l3vpn unicast functionality is tested
        rt_constrain: flag that rt-constrain functionality is tested
        ipv6: flag that ipv6 unicast functionality is tested
        allf: flag for all family testing.
        wait_for_read: stored as self.wfr, the upper bound (seconds)
            of a single wait performed by wait_for_read()
    """
    # References to outside objects.
    self.socket = bgp_socket
    self.timer = timer
    self.storage = storage
    # BGP marker length plus length field length.
    self.header_length = 18
    # TODO: make it class (constant) attribute
    # Computation of where next chunk ends depends on whether
    # we are beyond length field.
    self.reading_header = True
    # Countdown towards next size computation.
    self.bytes_to_read = self.header_length
    # Incremental buffer for message under read; bytes, the same type
    # as the chunks appended to it by read_message_chunk().
    self.msg_in = b""
    # Initialising counters
    self.updates_received = 0
    self.prefixes_introduced = 0
    self.prefixes_withdrawn = 0
    self.rx_idle_time = 0
    self.rx_activity_detected = True
    # Flags telling which kind of data is expected from the peer.
    self.evpn = evpn
    self.mvpn = mvpn
    self.l3vpn_mcast = l3vpn_mcast
    self.l3vpn = l3vpn
    self.rt_constrain = rt_constrain
    self.ipv6 = ipv6
    self.allf = allf
    self.grace = grace
    self.wfr = wait_for_read
def read_message_chunk(self):
    """Read up to one message

    Note:
        Currently it does not return anything.
        One call performs a single recv() of at most the number of
        bytes still missing from the current header or body.
    """
    # TODO: We could return the whole message, currently not needed.
    # We assume the socket is readable.
    chunk_message = self.socket.recv(self.bytes_to_read)
    self.msg_in += chunk_message
    self.bytes_to_read -= len(chunk_message)
    # TODO: bytes_to_read < 0 is not possible, right?
    if self.bytes_to_read:
        # Still in the middle of a logical block.
        # We should not act upon peer_hold_time if we are reading
        # something right now.
        return

    # Finished reading a logical block.
    if self.reading_header:
        # The logical block was a BGP header.
        # Now we know the size of the message.
        self.reading_header = False
        self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
                              self.header_length)
        return

    # We have finished reading the body of the message.
    # Peer has just proven it is still alive.
    self.timer.reset_peer_hold_time()
    # TODO: Do we want to count received messages?
    # This version ignores the received message.
    # TODO: Should we do validation and exit on anything
    # besides update or keepalive?
    received_hex = binascii.b2a_hex(self.msg_in).decode("ascii")
    # A one-byte slice compares equal to the literals below for both
    # str and bytes buffers, and is empty (not an IndexError) when the
    # message has no body.
    message_type_hex = self.msg_in[self.header_length:self.header_length + 1]
    if message_type_hex == b"\x01":
        logger.info("OPEN message received: 0x%s", received_hex)
    elif message_type_hex == b"\x02":
        logger.debug("UPDATE message received: 0x%s", received_hex)
        self.decode_update_message(self.msg_in)
    elif message_type_hex == b"\x03":
        logger.info("NOTIFICATION message received: 0x%s", received_hex)
    elif message_type_hex == b"\x04":
        logger.info("KEEP ALIVE message received: 0x%s", received_hex)
    else:
        logger.warning("Unexpected message received: 0x%s", received_hex)
    # Prepare state for reading another message.
    self.msg_in = b""
    self.reading_header = True
    self.bytes_to_read = self.header_length
    return
def decode_path_attributes(self, path_attributes_hex):
    """Decode the Path Attributes field (rfc4271#section-4.3)

    Arguments:
        :path_attributes: path_attributes field to be decoded in hex
    Returns:
        :return: None
    Notes:
        Everything is only written to the debug log, except that
        prefixes found in MP_REACH_NLRI / MP_UNREACH_NLRI are added to
        self.prefixes_introduced / self.prefixes_withdrawn.
    """

    def hexed(data):
        """Hexadecimal text of raw data (used only for logging)."""
        return binascii.b2a_hex(data).decode("ascii")

    # Attributes which are logged as an opaque value only.
    plain_attributes = {
        1: "ORIGIN",
        2: "AS_PATH",
        3: "NEXT_HOP",
        4: "MULTI_EXIT_DISC",
        5: "LOCAL_PREF",
        6: "ATOMIC_AGGREGATE",
        7: "AGGREGATOR",
        9: "ORIGINATOR_ID",  # rfc4456#section-8
        10: "CLUSTER_LIST",  # rfc4456#section-8
    }

    hex_to_decode = path_attributes_hex

    while len(hex_to_decode):
        # One-byte slices (instead of indexing) stay byte strings for
        # both str and bytes input.
        attr_flags_hex = hex_to_decode[0:1]
        attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
        # attr_optional_bit = attr_flags & 128
        # attr_transitive_bit = attr_flags & 64
        # attr_partial_bit = attr_flags & 32
        attr_extended_length_bit = attr_flags & 16

        attr_type_code_hex = hex_to_decode[1:2]
        attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)

        # Flags + type + 1 length byte, or 2 length bytes when the
        # extended length bit is set.
        header_size = 4 if attr_extended_length_bit else 3
        attr_length_hex = hex_to_decode[2:header_size]
        attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
        attr_value_hex = hex_to_decode[header_size:header_size + attr_length]
        hex_to_decode = hex_to_decode[header_size + attr_length:]

        if attr_type_code in plain_attributes:
            logger.debug("Attribute type=%s (%s, flags:0x%s)",
                         attr_type_code, plain_attributes[attr_type_code],
                         hexed(attr_flags_hex))
            logger.debug("Attribute value=0x%s", hexed(attr_value_hex))
        elif attr_type_code == 14:  # rfc4760#section-3
            logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
                         hexed(attr_flags_hex))
            logger.debug("Attribute value=0x%s", hexed(attr_value_hex))
            address_family_identifier_hex = attr_value_hex[0:2]
            logger.debug(" Address Family Identifier=0x%s",
                         hexed(address_family_identifier_hex))
            subsequent_address_family_identifier_hex = attr_value_hex[2:3]
            logger.debug(" Subsequent Address Family Identifier=0x%s",
                         hexed(subsequent_address_family_identifier_hex))
            next_hop_netaddr_len_hex = attr_value_hex[3:4]
            next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
            logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
                         next_hop_netaddr_len,
                         hexed(next_hop_netaddr_len_hex))
            next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
            # Dotted decimal of every octet; unlike a fixed "BBBB" unpack
            # this does not fail for next hops that are not 4 bytes long.
            next_hop_netaddr = ".".join(
                str(octet) for octet in bytearray(next_hop_netaddr_hex))
            logger.debug(" Network Address of Next Hop=%s (0x%s)",
                         next_hop_netaddr, hexed(next_hop_netaddr_hex))
            reserved_offset = 4 + next_hop_netaddr_len
            reserved_hex = attr_value_hex[reserved_offset:reserved_offset + 1]
            logger.debug(" Reserved=0x%s", hexed(reserved_hex))
            nlri_hex = attr_value_hex[reserved_offset + 1:]
            logger.debug(" Network Layer Reachability Information=0x%s",
                         hexed(nlri_hex))
            nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
            logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
            for prefix in nlri_prefix_list:
                logger.debug(" nlri_prefix_received: %s", prefix)
            self.prefixes_introduced += len(nlri_prefix_list)  # update counter
        elif attr_type_code == 15:  # rfc4760#section-4
            logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
                         hexed(attr_flags_hex))
            logger.debug("Attribute value=0x%s", hexed(attr_value_hex))
            address_family_identifier_hex = attr_value_hex[0:2]
            logger.debug(" Address Family Identifier=0x%s",
                         hexed(address_family_identifier_hex))
            subsequent_address_family_identifier_hex = attr_value_hex[2:3]
            logger.debug(" Subsequent Address Family Identifier=0x%s",
                         hexed(subsequent_address_family_identifier_hex))
            wd_hex = attr_value_hex[3:]
            logger.debug(" Withdrawn Routes=0x%s", hexed(wd_hex))
            wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
            logger.debug(" Withdrawn routes prefix list: %s",
                         wdr_prefix_list)
            for prefix in wdr_prefix_list:
                logger.debug(" withdrawn_prefix_received: %s", prefix)
            self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
        else:
            logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
                         hexed(attr_flags_hex))
            logger.debug("Unknown attribute value=0x%s", hexed(attr_value_hex))
    return None
def decode_update_message(self, msg):
    """Decode an UPDATE message (rfc4271#section-4.3)

    Arguments:
        :msg: message to be decoded in hex
    Returns:
        :return: None
    Notes:
        The hexlified message is always stored under the 'update' key
        of self.storage. Decoding itself is skipped as soon as one of
        the address family / graceful-restart flags says that other
        than plain IPv4 unicast data is expected.
    """

    def hexed(data):
        """Hexadecimal text of raw data (used only for logging)."""
        return binascii.b2a_hex(data).decode("ascii")

    logger.debug("Decoding update message:")
    # message header - marker
    marker_hex = msg[:16]
    logger.debug("Message header marker: 0x%s", hexed(marker_hex))
    # message header - message length
    msg_length_hex = msg[16:18]
    msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
    logger.debug("Message lenght: 0x%s (%s)",
                 hexed(msg_length_hex), msg_length)
    # message header - message type
    msg_type_hex = msg[18:19]
    msg_type = int(binascii.b2a_hex(msg_type_hex), 16)

    with self.storage as stor:
        # this will replace the previously stored message
        stor['update'] = binascii.hexlify(msg)

    # (logged label, logged value, skip decoding?, logged reason),
    # evaluated in this order; the first hit ends the method.
    skip_checks = (
        ("Evpn {}", self.evpn, self.evpn,
         "Skipping update decoding due to evpn data expected"),
        ("Graceful-restart {}", self.grace, self.grace != 8,
         "Skipping update decoding due to graceful-restart data expected"),
        ("Mvpn {}", self.mvpn, self.mvpn,
         "Skipping update decoding due to mvpn data expected"),
        ("L3vpn-mcast {}", self.l3vpn_mcast, self.l3vpn_mcast,
         "Skipping update decoding due to l3vpn_mcast data expected"),
        # The l3vpn unicast check has to look at self.l3vpn,
        # not at the multicast flag.
        ("L3vpn-unicast {}", self.l3vpn, self.l3vpn,
         "Skipping update decoding due to l3vpn-unicast data expected"),
        ("Route-Target-Constrain {}", self.rt_constrain, self.rt_constrain,
         "Skipping update decoding due to Route-Target-Constrain data expected"),
        ("Ipv6-Unicast {}", self.ipv6, self.ipv6,
         "Skipping update decoding due to Ipv6 data expected"),
        ("Allf {}", self.allf, self.allf,
         "Skipping update decoding"),
    )
    for label, value, skip, reason in skip_checks:
        logger.debug(label.format(value))
        if skip:
            logger.debug(reason)
            return

    if msg_type != 2:
        logger.error("Unexpeced message type 0x%s in 0x%s",
                     hexed(msg_type_hex), hexed(msg))
        return

    logger.debug("Message type: 0x%s (update)", hexed(msg_type_hex))
    # withdrawn routes length
    wdr_length_hex = msg[19:21]
    wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
    logger.debug("Withdrawn routes lenght: 0x%s (%s)",
                 hexed(wdr_length_hex), wdr_length)
    # withdrawn routes
    wdr_hex = msg[21:21 + wdr_length]
    logger.debug("Withdrawn routes: 0x%s", hexed(wdr_hex))
    wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
    logger.debug("Withdrawn routes prefix list: %s", wdr_prefix_list)
    for prefix in wdr_prefix_list:
        logger.debug("withdrawn_prefix_received: %s", prefix)
    # total path attribute length
    total_pa_length_offset = 21 + wdr_length
    total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
    total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
    logger.debug("Total path attribute lenght: 0x%s (%s)",
                 hexed(total_pa_length_hex), total_pa_length)
    # path attributes
    pa_offset = total_pa_length_offset + 2
    pa_hex = msg[pa_offset:pa_offset + total_pa_length]
    logger.debug("Path attributes: 0x%s", hexed(pa_hex))
    self.decode_path_attributes(pa_hex)
    # network layer reachability information length
    # (23 = 19 bytes of header + the two 2-byte length fields)
    nlri_length = msg_length - 23 - total_pa_length - wdr_length
    logger.debug("Calculated NLRI length: %s", nlri_length)
    # network layer reachability information
    nlri_offset = pa_offset + total_pa_length
    nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
    logger.debug("NLRI: 0x%s", hexed(nlri_hex))
    nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
    logger.debug("NLRI prefix list: %s", nlri_prefix_list)
    for prefix in nlri_prefix_list:
        logger.debug("nlri_prefix_received: %s", prefix)
    # Updating counters
    self.updates_received += 1
    self.prefixes_introduced += len(nlri_prefix_list)
    self.prefixes_withdrawn += len(wdr_prefix_list)
def wait_for_read(self):
    """Idle until the socket is readable or the next timed event is due.

    Notes:
        Used when no more updates have to be sent, to avoid busy-wait.
        The wait is bounded by the timer's next event time and by
        self.wfr, whichever comes first, and is never negative.
        Reception counters are logged when no recent rx activity was
        detected or on every 100th received update, so the log file
        does not grow with every single update.
        Currently it does not return anything.
    """
    # Compute time to the first predictable state change.
    event_time = self.timer.get_next_event_time()
    # snapshot_time would be imprecise, so a fresh time.time() is used.
    # The program may get around to waiting for an event in the "very near
    # future" so late that it became a "past" event. Passing a negative
    # timedelta to select() would lead to either waiting forever (for -1)
    # or select.error("Invalid parameter") (for everything else), so the
    # value is clamped to zero, which tells select() not to wait at all.
    wait_timedelta = max(0, min(event_time - time.time(), self.wfr))

    if not self.rx_activity_detected or not (self.updates_received % 100):
        # Right time to write statistics to the log (not for every update
        # and not too frequently, to avoid having large log files).
        logger.info("total_received_update_message_counter: %s",
                    self.updates_received)
        logger.info("total_received_nlri_prefix_counter: %s",
                    self.prefixes_introduced)
        logger.info("total_received_withdrawn_prefix_counter: %s",
                    self.prefixes_withdrawn)

    # And wait for the event or for something to read.
    start_time = time.time()
    select.select([self.socket], [], [self.socket], wait_timedelta)
    timedelta = time.time() - start_time
    self.rx_idle_time += timedelta
    # A wait shorter than one second is treated as "peer is still sending".
    self.rx_activity_detected = timedelta < 1

    if not self.rx_activity_detected or not (self.updates_received % 100):
        # Same throttling rule as above, now with the refreshed activity flag.
        logger.info("... idle for %.3fs", timedelta)
        logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them."""

    def __init__(self, bgp_socket, generator, timer):
        """The writer initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        # The timer is needed by send_message_chunk_is_whole().
        self.timer = timer
        # Really new fields.
        # TODO: Would attribute docstrings add anything substantial?
        self.sending_message = False
        self.bytes_to_send = 0
        # Outgoing byte buffer; enqueue_message_for_sending() appends to it.
        # b"" is the plain empty str on Python 2.
        self.msg_out = b""

    def enqueue_message_for_sending(self, message):
        """Enqueue message and change state.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send enqueued data from msg_out buffer.

        Returns:
            :return: True if no remaining data to send, False otherwise
        """
        # We assume there is a msg_out to send and socket is writable.
        self.timer.snapshot()
        bytes_sent = self.socket.send(self.msg_out)
        # Forget the part of message that was sent.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if self.bytes_to_send:
            # Only a part of the buffer went out; the caller has to retry.
            return False
        # TODO: Is it possible to hit negative bytes_to_send?
        self.sending_message = False
        # We should have reset hold timer on peer side.
        self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
        # The possible reason for not prioritizing reads is gone.
        return True
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class."""

    def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
        """The state tracker initialisation.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
            inqueue: user initiated messages queue
            storage: thread safe dict to store data for the rpc server
            cliargs: cli args from the user
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        # perform_one_loop_iteration() reads self.timer on every pass.
        self.timer = timer
        # Sub-trackers.
        self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
                                  l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
                                  rt_constrain=cliargs.rt_constrain, ipv6=cliargs.ipv6, grace=cliargs.grace,
                                  wait_for_read=cliargs.wfr)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        self.prioritize_writing = False
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # back to False.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.
        self.inqueue = inqueue

    def perform_one_loop_iteration(self):
        """The main loop iteration.

        Notes:
            Calculates priority, resolves all conditions, calls
            appropriate method and returns to caller to repeat.
            Exactly one action (read a chunk, write a chunk, enqueue
            a generated update, or idle wait) is performed per call.

        Raises:
            RuntimeError: when select() reports an exceptional state
                on the socket.
        """
        self.timer.snapshot()
        if not self.prioritize_writing:
            if self.timer.is_time_for_my_keepalive():
                if not self.writer.sending_message:
                    # We need to schedule a keepalive ASAP.
                    self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
                    logger.info("KEEP ALIVE is sent.")
                # We are sending a message now, so let's prioritize it.
                self.prioritize_writing = True

        # Pick up at most one user initiated (rpc) message per iteration.
        try:
            msg = self.inqueue.get_nowait()
        except Queue.Empty:
            # Nothing was requested by the user; this is the common case.
            pass
        else:
            logger.info("Received message: {}".format(msg))
            msgbin = binascii.unhexlify(msg)
            self.writer.enqueue_message_for_sending(msgbin)

        # Now we know what our priorities are, we have to check
        # which actions are available.
        # select.select() returns three lists,
        # we store them to list of lists.
        list_list = select.select([self.socket], [self.socket], [self.socket],
                                  self.timer.report_timedelta)
        read_list, write_list, except_list = list_list
        # Lists are unpacked, each is either [] or [self.socket],
        # so we will test them as boolean.
        if except_list:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
                return
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        if write_list:
            # Either we really want to reset peer's view of our hold
            # timer, or there was nothing to read.
            # Were we in the middle of sending a message?
            if self.writer.sending_message:
                # Was it the end of a message?
                whole = self.writer.send_message_chunk_is_whole()
                # We were pressed to send something and we did it.
                if self.prioritize_writing and whole:
                    # We prioritize reading again.
                    self.prioritize_writing = False
                return
            # Finally to check if still update messages to be generated.
            if self.generator.remaining_prefixes:
                msg_out = self.generator.compose_update_message()
                if not self.generator.remaining_prefixes:
                    # We have just finished update generation,
                    # end-of-rib is due.
                    logger.info("All update messages generated.")
                    logger.info("Storing performance results.")
                    self.generator.store_results()
                    logger.info("Finally an END-OF-RIB is sent.")
                    # NOTE(review): only "wr_prefixes=[]" of this call was
                    # visible during review; the remaining keyword arguments
                    # are reconstructed - confirm them against
                    # MessageGenerator.update_message().
                    msg_out += self.generator.update_message(wr_prefixes=[],
                                                             nlri_prefixes=[],
                                                             end_of_rib=True)
                self.writer.enqueue_message_for_sending(msg_out)
                # Attempt for real sending to be done in next iteration.
                return
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # We can neither read nor write.
        logger.warning("Input and output both blocked for " +
                       str(self.timer.report_timedelta) + " seconds.")
        # FIXME: Are we sure select has been really waiting
        # the whole period?
def create_logger(loglevel, logfile):
    """Create logger object.

    The logger named "logger" gets one console handler and one file
    handler (the file is truncated on open), both using the same
    timestamp / level / thread-name format.

    Arguments:
        :loglevel: log level
        :logfile: log file name
    Returns:
        :return: logger object
    """
    bgp_logger = logging.getLogger("logger")
    log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler(logfile, mode="w")
    for handler in (console_handler, file_handler):
        handler.setFormatter(log_formatter)
        bgp_logger.addHandler(handler)
    bgp_logger.setLevel(loglevel)
    # The caller binds the result to the module-level "logger" name.
    return bgp_logger
def job(arguments, inqueue, storage):
    """Perform the one-time BGP handshake, then loop over iterations.

    Notes:
        Establishes the BGP connection, stores the hexlified OPEN message
        of the peer under the 'open' key of the storage, answers with our
        own OPEN and KEEPALIVE, and finally drives a StateTracker in an
        endless loop.

    Arguments:
        :arguments: Command line arguments
        :inqueue: Data to be sent from play.py
        :storage: Shared dict for rpc server
    Returns:
        :return: does not return; the reactor loop has no exit condition
    Raises:
        MessageError: when the peer does not confirm our OPEN message
            with a KEEPALIVE.
    """
    to_hex = binascii.hexlify
    peer_socket = establish_connection(arguments)

    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    peer_open = read_open_message(peer_socket)
    peer_open_hex = to_hex(peer_open)
    logger.info(peer_open_hex)
    storage['open'] = peer_open_hex
    timer = TimeTracker(peer_open)
    generator = MessageGenerator(arguments)

    # Send our open message to the peer.
    my_open = generator.open_message()
    logger.debug("Sending the OPEN message: " + to_hex(my_open))
    peer_socket.send(my_open)

    # Wait for confirming keepalive.
    # TODO: Surely in just one packet?
    # Using exact keepalive length to not to see possible updates.
    confirmation = peer_socket.recv(19)
    if confirmation != generator.keepalive_message():
        error_msg = "Open not confirmed by keepalive, instead got"
        logger.error(error_msg + ": " + to_hex(confirmation))
        raise MessageError(error_msg, confirmation)
    timer.reset_peer_hold_time()

    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    my_keepalive = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: " + to_hex(my_keepalive))
    peer_socket.send(my_keepalive)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.

    state = StateTracker(peer_socket, generator, timer, inqueue, storage, arguments)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
class Rpcs(object):
    '''Handler for SimpleXMLRPCServer'''

    def __init__(self, sendqueue, storage):
        '''Init method

        Arguments:
            :sendqueue: queue for data to be sent towards odl
            :storage: thread safe dict
        '''
        self.queue = sendqueue
        self.storage = storage

    def send(self, text):
        '''Data to be sent

        Arguments:
            :text: hex string of the data to be sent
        '''
        self.queue.put(text)

    def get(self, text=''):
        '''Reads data from the storage

        - returns stored data or an empty string when nothing is stored
          under the given key

        Arguments:
            :text: a key to the storage to get the data
        Returns:
            :data: stored data
        '''
        # The storage is used via "with", which yields the guarded mapping.
        with self.storage as stor:
            return stor.get(text, '')

    def clean(self, text=''):
        '''Cleans data from the storage

        Removing a key which is not stored is not an error.

        Arguments:
            :text: a key to the storage to clean the data
        '''
        with self.storage as stor:
            if text in stor:
                del stor[text]
def threaded_job(arguments):
    """Run the job threaded.

    The requested amount of prefixes is split among
    arguments.multiplicity worker threads, each running job() on its own
    copy of the arguments; afterwards the XML-RPC server is served
    from the calling thread.

    Arguments:
        :arguments: Command line arguments
    Returns:
        :return: None (serve_forever() is not expected to return)
    """
    amount_left = arguments.amount
    utils_left = arguments.multiplicity
    prefix_current = arguments.firstprefix
    myip_current = arguments.myip
    port = arguments.port
    thread_args = []
    rpcqueue = Queue.Queue()
    storage = SafeDict()

    # NOTE(review): the loop header, the utils_left decrement, the exit
    # condition and the myip increment were not visible during review and
    # are reconstructed from the surrounding statements - confirm.
    while True:
        # Integer division made explicit; "+ 1" rounds the share up.
        amount_per_util = (amount_left - 1) // utils_left + 1  # round up
        amount_left -= amount_per_util
        utils_left -= 1

        args = deepcopy(arguments)
        args.amount = amount_per_util
        args.firstprefix = prefix_current
        args.myip = myip_current
        thread_args.append(args)

        if not utils_left:
            break
        # The next worker starts right after the prefixes of this one.
        prefix_current += amount_per_util * 16
        myip_current += 1

    try:
        # Create threads
        for t in thread_args:
            thread.start_new_thread(job, (t, rpcqueue, storage))
    except Exception:
        print("Error: unable to start thread.")
        # NOTE(review): exit status reconstructed - confirm.
        raise SystemExit(2)

    # NOTE(review): the fallback to arguments.myip is reconstructed; only
    # the usepeerip branch was visible during review.
    ip = arguments.peerip if arguments.usepeerip else arguments.myip
    rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
    rpcserver.register_instance(Rpcs(rpcqueue, storage))
    rpcserver.serve_forever()
if __name__ == "__main__":
    # The command line is parsed first, as the logging setup depends on it.
    arguments = parse_arguments()
    # "logger" is bound at module level here; functions in this file
    # (e.g. job) use it as a global rather than receiving it as a parameter.
    logger = create_logger(arguments.loglevel, arguments.logfile)
    # Starts the worker thread(s) and then serves the XML-RPC interface.
    threaded_job(arguments)