1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
    '''Thread safe dictionary

    The object will serve as thread safe data storage.
    It should be used with "with" statement.
    '''

    def __init__(self, * p_arg, ** n_arg):
        """Create the underlying dict and the lock guarding it."""
        super(SafeDict, self).__init__()
        self._lock = threading.Lock()

    # Releases the guarding lock when leaving the "with" block.
    def __exit__(self, type, value, traceback):
def parse_arguments():
    """Use argparse to get arguments,

    Returns:
        :return: args object.
    """
    parser = argparse.ArgumentParser()
    # TODO: Should we use --argument-names-with-spaces?
    str_help = "Autonomous System number use in the stream."
    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
    # FIXME: We are acting as iBGP peer,
    # we should mirror AS number from peer's open message.
    str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
    parser.add_argument("--amount", default="1", type=int, help=str_help)
    str_help = "Rpc server port."
    parser.add_argument("--port", default="8000", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be announced in one iteration"
    parser.add_argument("--insert", default="1", type=int, help=str_help)
    str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
    parser.add_argument("--withdraw", default="0", type=int, help=str_help)
    str_help = "The number of prefixes to process without withdrawals"
    parser.add_argument("--prefill", default="0", type=int, help=str_help)
    str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
    # NOTE(review): the default is a list (["separate"]) while an explicit
    # choice yields a plain string; comparisons like args.updates == "single"
    # are therefore always False for the default — confirm intended behaviour.
    parser.add_argument("--updates", choices=["single", "separate"],
                        default=["separate"], help=str_help)
    str_help = "Base prefix IP address for prefix generation"
    parser.add_argument("--firstprefix", default="8.0.1.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "The prefix length."
    parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
    str_help = "Listen for connection, instead of initiating it."
    parser.add_argument("--listen", action="store_true", help=str_help)
    str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
                "Default value only suitable for listening.")
    parser.add_argument("--myip", default="0.0.0.0",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = ("TCP port to bind to when listening or initiating connection." +
                "Default only suitable for initiating.")
    parser.add_argument("--myport", default="0", type=int, help=str_help)
    str_help = "The IP of the next hop to be placed into the update messages."
    parser.add_argument("--nexthop", default="192.0.2.1",
                        type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
    str_help = "Identifier of the route originator."
    parser.add_argument("--originator", default=None,
                        type=ipaddr.IPv4Address, dest="originator", help=str_help)
    str_help = "Cluster list item identifier."
    parser.add_argument("--cluster", default=None,
                        type=ipaddr.IPv4Address, dest="cluster", help=str_help)
    str_help = ("Numeric IP Address to try to connect to." +
                "Currently no effect in listening mode.")
    parser.add_argument("--peerip", default="127.0.0.2",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "TCP port to try to connect to. No effect in listening mode."
    parser.add_argument("--peerport", default="179", type=int, help=str_help)
    str_help = "Local hold time."
    parser.add_argument("--holdtime", default="180", type=int, help=str_help)
    str_help = "Log level (--error, --warning, --info, --debug)"
    # Four mutually-overriding flags share dest="loglevel"; last one wins.
    parser.add_argument("--error", dest="loglevel", action="store_const",
                        const=logging.ERROR, default=logging.INFO,
    parser.add_argument("--warning", dest="loglevel", action="store_const",
                        const=logging.WARNING, default=logging.INFO,
    parser.add_argument("--info", dest="loglevel", action="store_const",
                        const=logging.INFO, default=logging.INFO,
    parser.add_argument("--debug", dest="loglevel", action="store_const",
                        const=logging.DEBUG, default=logging.INFO,
    str_help = "Log file name"
    parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
    str_help = "Trailing part of the csv result files for plotting purposes"
    parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
    str_help = "Minimum number of updates to reach to include result into csv."
    parser.add_argument("--threshold", default="1000", type=int, help=str_help)
    str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
    parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
    str_help = "Using peerip instead of myip for xmlrpc server"
    parser.add_argument("--usepeerip", default=False, action="store_true", help=str_help)
    str_help = "Link-State NLRI supported"
    parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
    str_help = "Link-State NLRI: Identifier"
    parser.add_argument("-lsid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID"
    parser.add_argument("-lstid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID"
    parser.add_argument("-lspid", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
    parser.add_argument("--lstsaddr", default="1.2.3.4",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
    parser.add_argument("--lsteaddr", default="5.6.7.8",
                        type=ipaddr.IPv4Address, help=str_help)
    str_help = "Link-State NLRI: Identifier Step"
    parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
    str_help = "Link-State NLRI: Tunnel ID Step"
    parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
    str_help = "Link-State NLRI: LSP ID Step"
    parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
    parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
    str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
    parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
    str_help = "How many play utilities are to be started."
    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
    str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
Enabling this flag makes the script not decoding the update mesage, because of not\
supported decoding for these elements."
    parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
    str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
Enabling this flag makes the script not decoding the update mesage, because of not\
supported decoding for these elements."
    parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
    str_help = "Open message includes L3VPN-MULTICAST arguments.\
Enabling this flag makes the script not decoding the update mesage, because of not\
supported decoding for these elements."
    parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
    str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding."
    parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
    str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
    parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help)
    str_help = "Add all supported families without message decoding."
    parser.add_argument("--allf", default=False, action="store_true", help=str_help)
    parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
    str_help = "Skipping well known attributes for update message"
    parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
    arguments = parser.parse_args()
    if arguments.multiplicity < 1:
        print "Multiplicity", arguments.multiplicity, "is not positive."
    # TODO: Are sanity checks (such as asnumber>=0) required?
def establish_connection(arguments):
    """Establish connection to BGP peer.

    Either listens for an incoming connection (--listen) or actively
    connects to the peer, binding the local endpoint in both cases.

    Arguments:
        :arguments: following command-line arguments are used
            - arguments.myip: local IP address
            - arguments.myport: local port
            - arguments.peerip: remote IP address
            - arguments.peerport: remote port

    Returns:
        :return: established socket.
    """
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind needs a single tuple as argument
        listening_socket.bind((str(arguments.myip), arguments.myport))
        listening_socket.listen(1)
        bgp_socket, _ = listening_socket.accept()
        # TODO: Verify client IP is controller IP.
        listening_socket.close()
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind to force specified address and port
        talking_socket.bind((str(arguments.myip), arguments.myport))
        # socket does not speak ipaddr, hence str()
        talking_socket.connect((str(arguments.peerip), arguments.peerport))
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
def get_short_int_from_message(message, offset=16):
    """Extract a 2-byte big-endian number from the provided message.

    Arguments:
        :message: given message (byte string)
        :offset: offset of the short_int inside the message

    Returns:
        :return: required short_int value.

    Notes:
        default offset value is the BGP message size offset
        (bytes 16-17, right after the 16-byte marker, rfc4271#section-4.1).
    """
    high_byte_int = ord(message[offset])
    low_byte_int = ord(message[offset + 1])
    short_int = high_byte_int * 256 + low_byte_int
    # Restored: the visible block was truncated before returning the value.
    return short_int
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Arguments:
        :prefixes_hex: list of prefixes to be decoded in hex
            (concatenated <length, truncated prefix> tuples)

    Returns:
        :return: list of prefixes in the form of ip address (X.X.X.X/X)
    """
    prefix_list = []
    offset = 0
    while offset < len(prefixes_hex):
        # One-octet prefix bit length; slicing (not indexing) keeps this
        # working for both str (py2) and bytes (py3) inputs.
        prefix_bit_len_hex = prefixes_hex[offset: offset + 1]
        prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
        # Number of octets actually carried for this prefix (ceil(bits/8));
        # floor division keeps the result an int on both py2 and py3.
        prefix_len = ((prefix_bit_len - 1) // 8) + 1
        prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
        # Pad to 4 octets: prefixes shorter than /25 are encoded truncated,
        # and struct.unpack("BBBB", ...) requires exactly 4 bytes.
        prefix_hex = prefix_hex + b"\x00" * (4 - prefix_len)
        prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialise the error.

        Store and call super init for textual comment,
        store raw message which caused it.
        """
        super(MessageError, self).__init__(text, message, *args)

        """Generate human readable error message.

        Returns:
            :return: human readable message as string

        Notes:
            Use a placeholder string if the message is to be empty.
        """
        message = binascii.hexlify(self.msg)
        message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read

    Returns:
        :return: received OPEN message.

    Notes:
        Performs just basic incomming message checks
        (minimal length and the length field matching the received size).
    """
    msg_in = bgp_socket.recv(65535) # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # 37 is minimal length of open message with 4-byte AS number.
            "Message length (" + str(len(msg_in)) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)"
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # NOTE(review): reported_length is an int here; concatenating it
        # without str() would raise TypeError — confirm against full source.
            "Expected message length (" + reported_length +
            ") does not match actual length (" + str(len(msg_in)) + ")"
        logger.error(error_msg + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    logger.info("Open message received.")
class MessageGenerator(object):
    """Class which generates messages, holds states and configuration values."""

    # TODO: Define bgp marker as a class (constant) variable.
    def __init__(self, args):
        """Initialisation according to command-line args.

        Arguments:
            :args: argsparser's Namespace object which contains command-line
                options for MesageGenerator initialisation

        Notes:
            Calculates and stores default values used later on for
            generating messages.
        """
        self.total_prefix_amount = args.amount
        # Number of update messages left to be sent.
        self.remaining_prefixes = self.total_prefix_amount

        # New parameters initialisation
        self.port = args.port
        self.prefix_base_default = args.firstprefix
        self.prefix_length_default = args.prefixlen
        self.wr_prefixes_default = []
        self.nlri_prefixes_default = []
        self.version_default = 4
        self.my_autonomous_system_default = args.asnumber
        self.hold_time_default = args.holdtime # Local hold time.
        self.bgp_identifier_default = int(args.myip)
        self.next_hop_default = args.nexthop
        self.originator_id_default = args.originator
        self.cluster_list_item_default = args.cluster
        self.single_update_default = args.updates == "single"
        # NOTE(review): "random" is not among argparse choices for --updates,
        # so this can only be True if args is constructed programmatically.
        self.randomize_updates_default = args.updates == "random"
        self.prefix_count_to_add_default = args.insert
        self.prefix_count_to_del_default = args.withdraw
        if self.prefix_count_to_del_default < 0:
            self.prefix_count_to_del_default = 0
        if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
            # total number of prefixes must grow to avoid infinite test loop
            self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
        self.slot_size_default = self.prefix_count_to_add_default
        self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
        self.results_file_name_default = args.results
        self.performance_threshold_default = args.threshold
        # Address-family / capability flags taken straight from CLI options.
        self.rfc4760 = args.rfc4760
        self.bgpls = args.bgpls
        self.evpn = args.evpn
        self.mvpn = args.mvpn
        self.l3vpn_mcast = args.l3vpn_mcast
        self.l3vpn = args.l3vpn
        self.rt_constrain = args.rt_constrain
        self.allf = args.allf
        self.skipattr = args.skipattr
        # Default values when BGP-LS Attributes are used
            self.prefix_count_to_add_default = 1
            self.prefix_count_to_del_default = 0
        self.ls_nlri_default = {"Identifier": args.lsid,
                                "TunnelID": args.lstid,
                                "IPv4TunnelSenderAddress": args.lstsaddr,
                                "IPv4TunnelEndPointAddress": args.lsteaddr}
        self.lsid_step = args.lsidstep
        self.lstid_step = args.lstidstep
        self.lspid_step = args.lspidstep
        self.lstsaddr_step = args.lstsaddrstep
        self.lsteaddr_step = args.lsteaddrstep
        # Default values used for randomized part
        s1_slots = ((self.total_prefix_amount -
                     self.remaining_prefixes_threshold - 1) /
                    self.prefix_count_to_add_default + 1)
        s2_slots = ((self.remaining_prefixes_threshold - 1) /
                    (self.prefix_count_to_add_default -
                     self.prefix_count_to_del_default) + 1)
        # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
        s2_first_index = s1_slots * self.prefix_count_to_add_default
        s2_last_index = (s2_first_index +
                         s2_slots * (self.prefix_count_to_add_default -
                                     self.prefix_count_to_del_default) - 1)
        self.slot_gap_default = ((self.total_prefix_amount -
                                  self.remaining_prefixes_threshold - 1) /
                                 self.prefix_count_to_add_default + 1)
        self.randomize_lowest_default = s2_first_index
        self.randomize_highest_default = s2_last_index
        # Initialising counters
        self.phase1_start_time = 0
        self.phase1_stop_time = 0
        self.phase2_start_time = 0
        self.phase2_stop_time = 0
        self.phase1_updates_sent = 0
        self.phase2_updates_sent = 0
        self.updates_sent = 0

        self.log_info = args.loglevel <= logging.INFO
        self.log_debug = args.loglevel <= logging.DEBUG
        # Flags needed for the MessageGenerator performance optimization.
        # Calling logger methods each iteration even with proper log level set
        # slows down significantly the MessageGenerator performance.
        # Measured total generation time (1M updates, dry run, error log level):
        # - logging based on basic logger features: 36,2s
        # - logging based on advanced logger features (lazy logging): 21,2s
        # - conditional calling of logger methods enclosed inside condition: 8,6s

        logger.info("Generator initialisation")
        logger.info(" Target total number of prefixes to be introduced: " +
                    str(self.total_prefix_amount))
        logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
                    str(self.prefix_length_default))
        logger.info(" My Autonomous System number: " +
                    str(self.my_autonomous_system_default))
        logger.info(" My Hold Time: " + str(self.hold_time_default))
        logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
        logger.info(" Next Hop: " + str(self.next_hop_default))
        logger.info(" Originator ID: " + str(self.originator_id_default))
        logger.info(" Cluster list: " + str(self.cluster_list_item_default))
        logger.info(" Prefix count to be inserted at once: " +
                    str(self.prefix_count_to_add_default))
        logger.info(" Prefix count to be withdrawn at once: " +
                    str(self.prefix_count_to_del_default))
        logger.info(" Fast pre-fill up to " +
                    str(self.total_prefix_amount -
                        self.remaining_prefixes_threshold) + " prefixes")
        logger.info(" Remaining number of prefixes to be processed " +
                    "in parallel with withdrawals: " +
                    str(self.remaining_prefixes_threshold))
        logger.debug(" Prefix index range used after pre-fill procedure [" +
                     str(self.randomize_lowest_default) + ", " +
                     str(self.randomize_highest_default) + "]")
        if self.single_update_default:
            logger.info(" Common single UPDATE will be generated " +
                        "for both NLRI & WITHDRAWN lists")
            logger.info(" Two separate UPDATEs will be generated " +
                        "for each NLRI & WITHDRAWN lists")
        if self.randomize_updates_default:
            logger.info(" Generation of UPDATE messages will be randomized")
        logger.info(" Let\'s go ...\n")

    # TODO: Notification for hold timer expiration can be handy.
    def store_results(self, file_name=None, threshold=None):
        """ Stores specified results into files based on file_name value.

        Arguments:
            :param file_name: Trailing (common) part of result file names
            :param threshold: Minimum number of sent updates needed for each
                result to be included into result csv file
                (mainly needed because of the result accuracy)

        Returns:
            :return: none
        """
        # default values handling
        # TODO optimize default values handling (use e.g. dicionary.update() approach)
        if file_name is None:
            file_name = self.results_file_name_default
        if threshold is None:
            threshold = self.performance_threshold_default
        # performance calculation
        if self.phase1_updates_sent >= threshold:
            totals1 = self.phase1_updates_sent
            performance1 = int(self.phase1_updates_sent /
                               (self.phase1_stop_time - self.phase1_start_time))
        if self.phase2_updates_sent >= threshold:
            totals2 = self.phase2_updates_sent
            performance2 = int(self.phase2_updates_sent /
                               (self.phase2_stop_time - self.phase2_start_time))

        logger.info("#" * 10 + " Final results " + "#" * 10)
        logger.info("Number of iterations: " + str(self.iteration))
        logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
                    str(self.phase1_updates_sent))
        logger.info("The pre-fill phase duration: " +
                    str(self.phase1_stop_time - self.phase1_start_time) + "s")
        logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
                    str(self.phase2_updates_sent))
        logger.info("The 2nd test phase duration: " +
                    str(self.phase2_stop_time - self.phase2_start_time) + "s")
        logger.info("Threshold for performance reporting: " + str(threshold))

        # plot labels describing the shape of each phase
        phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
                        " route(s) per UPDATE")
        if self.single_update_default:
            phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                                  "/-" + str(self.prefix_count_to_del_default) +
                                  " routes per UPDATE")
            phase2_label = "+" + (str(self.prefix_count_to_add_default) +
                                  "/-" + str(self.prefix_count_to_del_default) +
                                  " routes in two UPDATEs")
        # collecting capacity and performance results
        if totals1 is not None:
            totals[phase1_label] = totals1
            performance[phase1_label] = performance1
        if totals2 is not None:
            totals[phase2_label] = totals2
            performance[phase2_label] = performance2
        self.write_results_to_file(totals, "totals-" + file_name)
        self.write_results_to_file(performance, "performance-" + file_name)
    def write_results_to_file(self, results, file_name):
        """Writes results to the csv plot file consumable by Jenkins.

        Arguments:
            :param results: dictionary of the results to be stored
            :param file_name: Name of the (csv) file to be created

        Returns:
            :return: none
        """
        f = open(file_name, "wt")
        # NOTE(review): f is not closed in the visible code — confirm
        # f.close() exists downstream in the full source.
        for key in sorted(results):
            first_line += key + ", "
            second_line += str(results[key]) + ", "
        # strip the trailing ", " separator from both csv lines
        first_line = first_line[:-2]
        second_line = second_line[:-2]
        f.write(first_line + "\n")
        f.write(second_line + "\n")
        logger.info("Message generator performance results stored in " +
        logger.info(" " + first_line)
        logger.info(" " + second_line)
    # Return pseudo-randomized (reproducible) index for selected range
    def randomize_index(self, index, lowest=None, highest=None):
        """Calculates pseudo-randomized index from selected range.

        Arguments:
            :param index: input index
            :param lowest: the lowest index from the randomized area
            :param highest: the highest index from the randomized area

        Returns:
            :return: the (pseudo)randomized index

        Notes:
            Created just as a frame for future generator enhancement.
        """
        # default values handling
        # TODO optimize default values handling (use e.g. dicionary.update() approach)
            lowest = self.randomize_lowest_default
            highest = self.randomize_highest_default
        # randomization itself
        if (index >= lowest) and (index <= highest):
            # we are in the randomized range -> shuffle it inside
            # the range (now just reverse the order)
            new_index = highest - (index - lowest)
            # we are out of the randomized range -> nothing to do
    def get_ls_nlri_values(self, index):
        """Generates LS-NLRI parameters.
        http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03

        Arguments:
            :param index: index (iteration)

        Returns:
            :return: dictionary of LS NLRI parameters and values
        """
        # generating list of LS NLRI parameters; each value advances from its
        # configured default once per "step" iterations (integer division)
        identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
        ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
        tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
        lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
        ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
        ls_nlri_values = {"Identifier": identifier,
                          "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
                          "TunnelID": tunnel_id, "LSPID": lsp_id,
                          "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
        return ls_nlri_values
    def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
                        prefix_len=None, prefix_count=None, randomize=None):
        """Generates list of IP address prefixes.

        Arguments:
            :param slot_index: index of group of prefix addresses
            :param slot_size: size of group of prefix addresses
                in [number of included prefixes]
            :param prefix_base: IP address of the first prefix
                (slot_index = 0, prefix_index = 0)
            :param prefix_len: length of the prefix in bits
                (the same as size of netmask)
            :param prefix_count: number of prefixes to be returned
                from the specified slot

        Returns:
            :return: list of generated IP address prefixes
        """
        # default values handling
        # TODO optimize default values handling (use e.g. dicionary.update() approach)
        if slot_size is None:
            slot_size = self.slot_size_default
        if prefix_base is None:
            prefix_base = self.prefix_base_default
        if prefix_len is None:
            prefix_len = self.prefix_length_default
        if prefix_count is None:
            prefix_count = slot_size
        if randomize is None:
            randomize = self.randomize_updates_default
        # generating list of prefixes
        # address distance between two consecutive prefixes of this length
        prefix_gap = 2 ** (32 - prefix_len)
        for i in range(prefix_count):
            prefix_index = slot_index * slot_size + i
                prefix_index = self.randomize_index(prefix_index)
            indexes.append(prefix_index)
            prefixes.append(prefix_base + prefix_index * prefix_gap)
            logger.debug(" Prefix slot index: " + str(slot_index))
            logger.debug(" Prefix slot size: " + str(slot_size))
            logger.debug(" Prefix count: " + str(prefix_count))
            logger.debug(" Prefix indexes: " + str(indexes))
            logger.debug(" Prefix list: " + str(prefixes))
    def compose_update_message(self, prefix_count_to_add=None,
                               prefix_count_to_del=None):
        """Composes an UPDATE message

        Arguments:
            :param prefix_count_to_add: # of prefixes to put into NLRI list
            :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list

        Returns:
            :return: encoded UPDATE message in HEX

        Notes:
            Optionally generates separate UPDATEs for NLRI and WITHDRAWN
            lists or common message which includes both prefix lists.
            Updates global counters.
        """
        # default values handling
        # TODO optimize default values handling (use e.g. dicionary.update() approach)
        if prefix_count_to_add is None:
            prefix_count_to_add = self.prefix_count_to_add_default
        if prefix_count_to_del is None:
            prefix_count_to_del = self.prefix_count_to_del_default
        # progress logging, guarded by pre-computed flags for performance
        if self.log_info and not (self.iteration % 1000):
            logger.info("Iteration: " + str(self.iteration) +
                        " - total remaining prefixes: " +
                        str(self.remaining_prefixes))
            logger.debug("#" * 10 + " Iteration: " +
                         str(self.iteration) + " " + "#" * 10)
            logger.debug("Remaining prefixes: " +
                         str(self.remaining_prefixes))
        # scenario type & one-shot counter
        straightforward_scenario = (self.remaining_prefixes >
                                    self.remaining_prefixes_threshold)
        if straightforward_scenario:
            # pre-fill phase: announcements only, no withdrawals
            prefix_count_to_del = 0
                logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
            if not self.phase1_start_time:
                self.phase1_start_time = time.time()
                logger.debug("--- COMBINED SCENARIO ---")
            if not self.phase2_start_time:
                self.phase2_start_time = time.time()
            # tailor the number of prefixes if needed
            prefix_count_to_add = (prefix_count_to_del +
                                   min(prefix_count_to_add - prefix_count_to_del,
                                       self.remaining_prefixes))
        # prefix slots selection for insertion and withdrawal
        slot_index_to_add = self.iteration
        slot_index_to_del = slot_index_to_add - self.slot_gap_default
        # getting lists of prefixes for insertion in this iteration
            logger.debug("Prefixes to be inserted in this iteration:")
        prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
                                                  prefix_count=prefix_count_to_add)
        # getting lists of prefixes for withdrawal in this iteration
            logger.debug("Prefixes to be withdrawn in this iteration:")
        prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
                                                  prefix_count=prefix_count_to_del)
        # generating the UPDATE message with LS-NLRI only
            ls_nlri = self.get_ls_nlri_values(self.iteration)
            msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
            # generating the UPDATE message with prefix lists
            if self.single_update_default:
                # Send prefixes to be introduced and withdrawn
                # in one UPDATE message
                msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
                                              nlri_prefixes=prefix_list_to_add)
                # Send prefixes to be introduced and withdrawn
                # in separate UPDATE messages (if needed)
                msg_out = self.update_message(wr_prefixes=[],
                                              nlri_prefixes=prefix_list_to_add)
                if prefix_count_to_del:
                    msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
        # updating counters - who knows ... maybe this is the last time here ;)
        if straightforward_scenario:
            self.phase1_stop_time = time.time()
            self.phase1_updates_sent = self.updates_sent
            self.phase2_stop_time = time.time()
            self.phase2_updates_sent = (self.updates_sent -
                                        self.phase1_updates_sent)
        # updating totals for the next iteration
        self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
        # returning the encoded message
759 # Section of message encoders
761 def open_message(self, version=None, my_autonomous_system=None,
762 hold_time=None, bgp_identifier=None):
763 """Generates an OPEN Message (rfc4271#section-4.2)
766 :param version: see the rfc4271#section-4.2
767 :param my_autonomous_system: see the rfc4271#section-4.2
768 :param hold_time: see the rfc4271#section-4.2
769 :param bgp_identifier: see the rfc4271#section-4.2
771 :return: encoded OPEN message in HEX
774 # default values handling
775 # TODO optimize default values handling (use e.g. dicionary.update() approach)
777 version = self.version_default
778 if my_autonomous_system is None:
779 my_autonomous_system = self.my_autonomous_system_default
780 if hold_time is None:
781 hold_time = self.hold_time_default
782 if bgp_identifier is None:
783 bgp_identifier = self.bgp_identifier_default
786 marker_hex = "\xFF" * 16
790 type_hex = struct.pack("B", type)
793 version_hex = struct.pack("B", version)
795 # my_autonomous_system
796 # AS_TRANS value, 23456 decadic.
797 my_autonomous_system_2_bytes = 23456
798 # AS number is mappable to 2 bytes
799 if my_autonomous_system < 65536:
800 my_autonomous_system_2_bytes = my_autonomous_system
801 my_autonomous_system_hex_2_bytes = struct.pack(">H",
802 my_autonomous_system)
805 hold_time_hex = struct.pack(">H", hold_time)
808 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
810 # Optional Parameters
811 optional_parameters_hex = ""
812 if self.rfc4760 or self.allf:
813 optional_parameter_hex = (
814 "\x02" # Param type ("Capability Ad")
815 "\x06" # Length (6 bytes)
816 "\x01" # Capability type (NLRI Unicast),
817 # see RFC 4760, secton 8
818 "\x04" # Capability value length
819 "\x00\x01" # AFI (Ipv4)
821 "\x01" # SAFI (Unicast)
823 optional_parameters_hex += optional_parameter_hex
825 if self.bgpls or self.allf:
826 optional_parameter_hex = (
827 "\x02" # Param type ("Capability Ad")
828 "\x06" # Length (6 bytes)
829 "\x01" # Capability type (NLRI Unicast),
830 # see RFC 4760, secton 8
831 "\x04" # Capability value length
832 "\x40\x04" # AFI (BGP-LS)
834 "\x47" # SAFI (BGP-LS)
836 optional_parameters_hex += optional_parameter_hex
838 if self.evpn or self.allf:
839 optional_parameter_hex = (
840 "\x02" # Param type ("Capability Ad")
841 "\x06" # Length (6 bytes)
842 "\x01" # Multiprotocol extetension capability,
843 "\x04" # Capability value length
844 "\x00\x19" # AFI (L2-VPN)
848 optional_parameters_hex += optional_parameter_hex
850 if self.mvpn or self.allf:
851 optional_parameter_hex = (
852 "\x02" # Param type ("Capability Ad")
853 "\x06" # Length (6 bytes)
854 "\x01" # Multiprotocol extetension capability,
855 "\x04" # Capability value length
856 "\x00\x01" # AFI (IPV4)
858 "\x05" # SAFI (MCAST-VPN)
860 optional_parameters_hex += optional_parameter_hex
861 optional_parameter_hex = (
862 "\x02" # Param type ("Capability Ad")
863 "\x06" # Length (6 bytes)
864 "\x01" # Multiprotocol extetension capability,
865 "\x04" # Capability value length
866 "\x00\x02" # AFI (IPV6)
868 "\x05" # SAFI (MCAST-VPN)
870 optional_parameters_hex += optional_parameter_hex
872 if self.l3vpn_mcast or self.allf:
873 optional_parameter_hex = (
874 "\x02" # Param type ("Capability Ad")
875 "\x06" # Length (6 bytes)
876 "\x01" # Multiprotocol extetension capability,
877 "\x04" # Capability value length
878 "\x00\x01" # AFI (IPV4)
880 "\x81" # SAFI (L3VPN-MCAST)
882 optional_parameters_hex += optional_parameter_hex
883 optional_parameter_hex = (
884 "\x02" # Param type ("Capability Ad")
885 "\x06" # Length (6 bytes)
886 "\x01" # Multiprotocol extetension capability,
887 "\x04" # Capability value length
888 "\x00\x02" # AFI (IPV6)
890 "\x81" # SAFI (L3VPN-MCAST)
892 optional_parameters_hex += optional_parameter_hex
894 if self.l3vpn or self.allf:
895 optional_parameter_hex = (
896 "\x02" # Param type ("Capability Ad")
897 "\x06" # Length (6 bytes)
898 "\x01" # Multiprotocol extetension capability,
899 "\x04" # Capability value length
900 "\x00\x01" # AFI (IPV4)
902 "\x80" # SAFI (L3VPN-UNICAST)
904 optional_parameters_hex += optional_parameter_hex
905 optional_parameter_hex = (
906 "\x02" # Param type ("Capability Ad")
907 "\x06" # Length (6 bytes)
908 "\x01" # Multiprotocol extetension capability,
909 "\x04" # Capability value length
910 "\x00\x02" # AFI (IPV6)
912 "\x80" # SAFI (L3VPN-UNICAST)
914 optional_parameters_hex += optional_parameter_hex
916 if self.rt_constrain or self.allf:
917 optional_parameter_hex = (
918 "\x02" # Param type ("Capability Ad")
919 "\x06" # Length (6 bytes)
920 "\x01" # Multiprotocol extetension capability,
921 "\x04" # Capability value length
922 "\x00\x01" # AFI (IPV4)
924 "\x84" # SAFI (ROUTE-TARGET-CONSTRAIN)
926 optional_parameters_hex += optional_parameter_hex
928 optional_parameter_hex = (
929 "\x02" # Param type ("Capability Ad")
930 "\x06" # Length (6 bytes)
931 "\x41" # "32 bit AS Numbers Support"
932 # (see RFC 6793, section 3)
933 "\x04" # Capability value length
935 optional_parameter_hex += (
936 struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
938 optional_parameters_hex += optional_parameter_hex
940 # Optional Parameters Length
941 optional_parameters_length = len(optional_parameters_hex)
942 optional_parameters_length_hex = struct.pack("B",
943 optional_parameters_length)
945 # Length (big-endian)
947 len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
948 len(my_autonomous_system_hex_2_bytes) +
949 len(hold_time_hex) + len(bgp_identifier_hex) +
950 len(optional_parameters_length_hex) +
951 len(optional_parameters_hex)
953 length_hex = struct.pack(">H", length)
961 my_autonomous_system_hex_2_bytes +
964 optional_parameters_length_hex +
965 optional_parameters_hex
969 logger.debug("OPEN message encoding")
970 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
971 logger.debug(" Length=" + str(length) + " (0x" +
972 binascii.hexlify(length_hex) + ")")
973 logger.debug(" Type=" + str(type) + " (0x" +
974 binascii.hexlify(type_hex) + ")")
975 logger.debug(" Version=" + str(version) + " (0x" +
976 binascii.hexlify(version_hex) + ")")
977 logger.debug(" My Autonomous System=" +
978 str(my_autonomous_system_2_bytes) + " (0x" +
979 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
981 logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
982 binascii.hexlify(hold_time_hex) + ")")
983 logger.debug(" BGP Identifier=" + str(bgp_identifier) +
984 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
985 logger.debug(" Optional Parameters Length=" +
986 str(optional_parameters_length) + " (0x" +
987 binascii.hexlify(optional_parameters_length_hex) +
989 logger.debug(" Optional Parameters=0x" +
990 binascii.hexlify(optional_parameters_hex))
991 logger.debug("OPEN message encoded: 0x%s",
992 binascii.b2a_hex(message_hex))
996 def update_message(self, wr_prefixes=None, nlri_prefixes=None,
997 wr_prefix_length=None, nlri_prefix_length=None,
998 my_autonomous_system=None, next_hop=None,
999 originator_id=None, cluster_list_item=None,
1000 end_of_rib=False, **ls_nlri_params):
1001 """Generates an UPDATE Message (rfc4271#section-4.3)
1004 :param wr_prefixes: see the rfc4271#section-4.3
1005 :param nlri_prefixes: see the rfc4271#section-4.3
1006 :param wr_prefix_length: see the rfc4271#section-4.3
1007 :param nlri_prefix_length: see the rfc4271#section-4.3
1008 :param my_autonomous_system: see the rfc4271#section-4.3
1009 :param next_hop: see the rfc4271#section-4.3
1011 :return: encoded UPDATE message in HEX
1014 # default values handling
1015 # TODO optimize default values handling (use e.g. dicionary.update() approach)
1016 if wr_prefixes is None:
1017 wr_prefixes = self.wr_prefixes_default
1018 if nlri_prefixes is None:
1019 nlri_prefixes = self.nlri_prefixes_default
1020 if wr_prefix_length is None:
1021 wr_prefix_length = self.prefix_length_default
1022 if nlri_prefix_length is None:
1023 nlri_prefix_length = self.prefix_length_default
1024 if my_autonomous_system is None:
1025 my_autonomous_system = self.my_autonomous_system_default
1026 if next_hop is None:
1027 next_hop = self.next_hop_default
1028 if originator_id is None:
1029 originator_id = self.originator_id_default
1030 if cluster_list_item is None:
1031 cluster_list_item = self.cluster_list_item_default
1032 ls_nlri = self.ls_nlri_default.copy()
1033 ls_nlri.update(ls_nlri_params)
1036 marker_hex = "\xFF" * 16
1040 type_hex = struct.pack("B", type)
1043 withdrawn_routes_hex = ""
1045 bytes = ((wr_prefix_length - 1) / 8) + 1
1046 for prefix in wr_prefixes:
1047 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
1048 struct.pack(">I", int(prefix))[:bytes])
1049 withdrawn_routes_hex += withdrawn_route_hex
1051 # Withdrawn Routes Length
1052 withdrawn_routes_length = len(withdrawn_routes_hex)
1053 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1055 # TODO: to replace hardcoded string by encoding?
1057 path_attributes_hex = ""
1058 if not self.skipattr:
1059 path_attributes_hex += (
1060 "\x40" # Flags ("Well-Known")
1061 "\x01" # Type (ORIGIN)
1063 "\x00" # Origin: IGP
1065 path_attributes_hex += (
1066 "\x40" # Flags ("Well-Known")
1067 "\x02" # Type (AS_PATH)
1069 "\x02" # AS segment type (AS_SEQUENCE)
1070 "\x01" # AS segment length (1)
1072 my_as_hex = struct.pack(">I", my_autonomous_system)
1073 path_attributes_hex += my_as_hex # AS segment (4 bytes)
1074 path_attributes_hex += (
1075 "\x40" # Flags ("Well-Known")
1076 "\x05" # Type (LOCAL_PREF)
1078 "\x00\x00\x00\x64" # (100)
1080 if nlri_prefixes != []:
1081 path_attributes_hex += (
1082 "\x40" # Flags ("Well-Known")
1083 "\x03" # Type (NEXT_HOP)
1086 next_hop_hex = struct.pack(">I", int(next_hop))
1087 path_attributes_hex += (
1088 next_hop_hex # IP address of the next hop (4 bytes)
1090 if originator_id is not None:
1091 path_attributes_hex += (
1092 "\x80" # Flags ("Optional, non-transitive")
1093 "\x09" # Type (ORIGINATOR_ID)
1095 ) # ORIGINATOR_ID (4 bytes)
1096 path_attributes_hex += struct.pack(">I", int(originator_id))
1097 if cluster_list_item is not None:
1098 path_attributes_hex += (
1099 "\x80" # Flags ("Optional, non-transitive")
1100 "\x0a" # Type (CLUSTER_LIST)
1102 ) # one CLUSTER_LIST item (4 bytes)
1103 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1105 if self.bgpls and not end_of_rib:
1106 path_attributes_hex += (
1107 "\x80" # Flags ("Optional, non-transitive")
1108 "\x0e" # Type (MP_REACH_NLRI)
1109 "\x22" # Length (34)
1110 "\x40\x04" # AFI (BGP-LS)
1111 "\x47" # SAFI (BGP-LS)
1112 "\x04" # Next Hop Length (4)
1114 path_attributes_hex += struct.pack(">I", int(next_hop))
1115 path_attributes_hex += "\x00" # Reserved
1116 path_attributes_hex += (
1117 "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1118 "\x00\x15" # LS-NLRI.TotalNLRILength (21)
1119 "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1121 path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1122 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1123 path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1124 path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1125 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1127 # Total Path Attributes Length
1128 total_path_attributes_length = len(path_attributes_hex)
1129 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1131 # Network Layer Reachability Information
1134 bytes = ((nlri_prefix_length - 1) / 8) + 1
1135 for prefix in nlri_prefixes:
1136 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1137 struct.pack(">I", int(prefix))[:bytes])
1138 nlri_hex += nlri_prefix_hex
1140 # Length (big-endian)
1142 len(marker_hex) + 2 + len(type_hex) +
1143 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1144 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1146 length_hex = struct.pack(">H", length)
1153 withdrawn_routes_length_hex +
1154 withdrawn_routes_hex +
1155 total_path_attributes_length_hex +
1156 path_attributes_hex +
1161 logger.debug("UPDATE message encoding")
1162 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1163 logger.debug(" Length=" + str(length) + " (0x" +
1164 binascii.hexlify(length_hex) + ")")
1165 logger.debug(" Type=" + str(type) + " (0x" +
1166 binascii.hexlify(type_hex) + ")")
1167 logger.debug(" withdrawn_routes_length=" +
1168 str(withdrawn_routes_length) + " (0x" +
1169 binascii.hexlify(withdrawn_routes_length_hex) + ")")
1170 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1171 str(wr_prefix_length) + " (0x" +
1172 binascii.hexlify(withdrawn_routes_hex) + ")")
1173 if total_path_attributes_length:
1174 logger.debug(" Total Path Attributes Length=" +
1175 str(total_path_attributes_length) + " (0x" +
1176 binascii.hexlify(total_path_attributes_length_hex) + ")")
1177 logger.debug(" Path Attributes=" + "(0x" +
1178 binascii.hexlify(path_attributes_hex) + ")")
1179 logger.debug(" Origin=IGP")
1180 logger.debug(" AS path=" + str(my_autonomous_system))
1181 logger.debug(" Next hop=" + str(next_hop))
1182 if originator_id is not None:
1183 logger.debug(" Originator id=" + str(originator_id))
1184 if cluster_list_item is not None:
1185 logger.debug(" Cluster list=" + str(cluster_list_item))
1187 logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
1188 logger.debug(" Network Layer Reachability Information=" +
1189 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1190 " (0x" + binascii.hexlify(nlri_hex) + ")")
1191 logger.debug("UPDATE message encoded: 0x" +
1192 binascii.b2a_hex(message_hex))
1195 self.updates_sent += 1
1196 # returning encoded message
    def notification_message(self, error_code, error_subcode, data_hex=""):
        """Generates a NOTIFICATION Message (rfc4271#section-4.5)

        Arguments:
            :param error_code: see the rfc4271#section-4.5
            :param error_subcode: see the rfc4271#section-4.5
            :param data_hex: see the rfc4271#section-4.5
        Returns:
            :return: encoded NOTIFICATION message in HEX
        """
        # Marker: 16 octets of 0xFF (rfc4271#section-4.1).
        marker_hex = "\xFF" * 16
        # Type
        # NOTE(review): the assignment of 'type' (NOTIFICATION message type
        # constant, presumably 3) is not visible in this chunk — confirm.
        type_hex = struct.pack("B", type)
        # Error Code (1 octet)
        error_code_hex = struct.pack("B", error_code)
        # Error Subcode (1 octet)
        error_subcode_hex = struct.pack("B", error_subcode)
        # Length (big-endian)
        # The "+ 2" accounts for the 2-octet Length field itself.
        length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
                  len(error_subcode_hex) + len(data_hex))
        length_hex = struct.pack(">H", length)

        # NOTIFICATION Message
        logger.debug("NOTIFICATION message encoding")
        logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
        logger.debug(" Length=" + str(length) + " (0x" +
                     binascii.hexlify(length_hex) + ")")
        logger.debug(" Type=" + str(type) + " (0x" +
                     binascii.hexlify(type_hex) + ")")
        logger.debug(" Error Code=" + str(error_code) + " (0x" +
                     binascii.hexlify(error_code_hex) + ")")
        logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
                     binascii.hexlify(error_subcode_hex) + ")")
        logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
        logger.debug("NOTIFICATION message encoded: 0x%s",
                     binascii.b2a_hex(message_hex))
    def keepalive_message(self):
        """Generates a KEEP ALIVE Message (rfc4271#section-4.4)

        Returns:
            :return: encoded KEEP ALIVE message in HEX
        """
        # Marker: 16 octets of 0xFF (rfc4271#section-4.1).
        marker_hex = "\xFF" * 16
        # Type
        # NOTE(review): the assignment of 'type' (KEEPALIVE message type
        # constant, presumably 4) is not visible in this chunk — confirm.
        type_hex = struct.pack("B", type)

        # Length (big-endian); "+ 2" covers the Length field itself.
        length = len(marker_hex) + 2 + len(type_hex)
        length_hex = struct.pack(">H", length)

        # KEEP ALIVE Message
        logger.debug("KEEP ALIVE message encoding")
        logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
        logger.debug(" Length=" + str(length) + " (0x" +
                     binascii.hexlify(length_hex) + ")")
        logger.debug(" Type=" + str(type) + " (0x" +
                     binascii.hexlify(type_hex) + ")")
        logger.debug("KEEP ALIVE message encoded: 0x%s",
                     binascii.b2a_hex(message_hex))
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and
    the peer's hold time (rfc4271#section-10).
    """

    def __init__(self, msg_in):
        """Initialisation. based on defaults and OPEN message from peer.

        Arguments:
            msg_in: the OPEN message received from peer.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        hold_timedelta = 180  # Not an attribute of self yet.
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        # Hold Time is the 2-octet field at offset 22 of the peer's OPEN.
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        if hold_timedelta > peer_hold_timedelta:
            hold_timedelta = peer_hold_timedelta
        # rfc4271#section-4.2: Hold Time MUST be either zero or >= 3 seconds.
        if hold_timedelta != 0 and hold_timedelta < 3:
            logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
            raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
        self.hold_timedelta = hold_timedelta
        # If we do not hear from peer this long, we assume it has died.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        # The same as calling snapshot(), but also declares a field.
        self.snapshot_time = time.time()
        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this time point, peer will be declared dead.
        self.my_keepalive_time = None  # to be set later
        # At this point, we should be sending keepalive message.

        # NOTE(review): a 'def snapshot(self):' header appears to be missing
        # immediately above the following docstring — confirm against VCS.
        """Store current time in instance data to use later."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time

        Arguments:
            :keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurence"""
        # NOTE(review): the branch body for hold_timedelta == 0 (presumably
        # 'return False' — keepalives disabled) is not visible here — confirm.
        if self.hold_timedelta == 0:
        return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Set the time of the next expected or to be sent KEEP ALIVE"""
        if self.hold_timedelta == 0:
            # Keepalive mechanism is off; report a far-future event (1 day).
            return self.snapshot_time + 86400
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time."""
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta != 0:
            # time.time() may be too strict
            if snapshot_time > self.peer_hold_time:
                logger.error("Peer has overstepped the hold timer.")
                raise RuntimeError("Peer has overstepped the hold timer.")
                # TODO: Include hold_timedelta?
                # TODO: Add notification sending (attempt). That means
                # move to write tracker.
class ReadTracker(object):
    """Class for tracking read of mesages chunk by chunk and
    for idle waiting.
    """

    def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
                 l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
                 wait_for_read=10):
        """The reader initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            timer: timer to be used for scheduling
            storage: thread safe dict
            evpn: flag that evpn functionality is tested
            mvpn: flag that mvpn functionality is tested
            l3vpn_mcast: flag that l3vpn_mcast functionality is tested
            l3vpn: flag that l3vpn unicast functionality is tested
            rt_constrain: flag that rt-constrain functionality is tested
            allf: flag for all family testing.
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.timer = timer
        # BGP marker length plus length field length.
        self.header_length = 18
        # TODO: make it class (constant) attribute
        # Computation of where next chunk ends depends on whether
        # we are beyond length field.
        self.reading_header = True
        # Countdown towards next size computation.
        self.bytes_to_read = self.header_length
        # Incremental buffer for message under read.
        self.msg_in = ""
        # Initialising counters
        self.updates_received = 0
        self.prefixes_introduced = 0
        self.prefixes_withdrawn = 0
        self.rx_idle_time = 0
        self.rx_activity_detected = True
        self.storage = storage
        self.evpn = evpn
        self.mvpn = mvpn
        self.l3vpn_mcast = l3vpn_mcast
        self.l3vpn = l3vpn
        self.rt_constrain = rt_constrain
        self.allf = allf
        self.wfr = wait_for_read

    def read_message_chunk(self):
        """Read up to one message

        Note:
            Currently it does not return anything.
        """
        # TODO: We could return the whole message, currently not needed.
        # We assume the socket is readable.
        chunk_message = self.socket.recv(self.bytes_to_read)
        self.msg_in += chunk_message
        self.bytes_to_read -= len(chunk_message)
        # TODO: bytes_to_read < 0 is not possible, right?
        if not self.bytes_to_read:
            # Finished reading a logical block.
            if self.reading_header:
                # The logical block was a BGP header.
                # Now we know the size of the message.
                self.reading_header = False
                self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
                                      self.header_length)
            else:  # We have finished reading the body of the message.
                # Peer has just proven it is still alive.
                self.timer.reset_peer_hold_time()
                # TODO: Do we want to count received messages?
                # This version ignores the received message.
                # TODO: Should we do validation and exit on anything
                # besides update or keepalive?
                # Prepare state for reading another message.
                # The message type is the octet right after the header.
                message_type_hex = self.msg_in[self.header_length]
                if message_type_hex == "\x01":
                    logger.info("OPEN message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x02":
                    logger.debug("UPDATE message received: 0x%s",
                                 binascii.b2a_hex(self.msg_in))
                    self.decode_update_message(self.msg_in)
                elif message_type_hex == "\x03":
                    logger.info("NOTIFICATION message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x04":
                    logger.info("KEEP ALIVE message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                else:
                    logger.warning("Unexpected message received: 0x%s",
                                   binascii.b2a_hex(self.msg_in))
                self.msg_in = ""
                self.reading_header = True
                self.bytes_to_read = self.header_length
        # We should not act upon peer_hold_time if we are reading
        # something right now.
        return

    def decode_path_attributes(self, path_attributes_hex):
        """Decode the Path Attributes field (rfc4271#section-4.3)

        Arguments:
            :path_attributes: path_attributes field to be decoded in hex
        Returns:
            :return: None
        """
        hex_to_decode = path_attributes_hex

        while len(hex_to_decode):
            attr_flags_hex = hex_to_decode[0]
            attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
            # attr_optional_bit = attr_flags & 128
            # attr_transitive_bit = attr_flags & 64
            # attr_partial_bit = attr_flags & 32
            attr_extended_length_bit = attr_flags & 16

            attr_type_code_hex = hex_to_decode[1]
            attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)

            # Extended Length bit selects a 2-octet vs 1-octet length field.
            if attr_extended_length_bit:
                attr_length_hex = hex_to_decode[2:4]
                attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
                attr_value_hex = hex_to_decode[4:4 + attr_length]
                hex_to_decode = hex_to_decode[4 + attr_length:]
            else:
                attr_length_hex = hex_to_decode[2]
                attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
                attr_value_hex = hex_to_decode[3:3 + attr_length]
                hex_to_decode = hex_to_decode[3 + attr_length:]

            if attr_type_code == 1:
                logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 2:
                logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 3:
                logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 4:
                logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 5:
                logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 6:
                logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 7:
                logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 9:  # rfc4456#section-8
                logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 10:  # rfc4456#section-8
                logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 14:  # rfc4760#section-3
                logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
                address_family_identifier_hex = attr_value_hex[0:2]
                logger.debug("  Address Family Identifier=0x%s",
                             binascii.b2a_hex(address_family_identifier_hex))
                subsequent_address_family_identifier_hex = attr_value_hex[2]
                logger.debug("  Subsequent Address Family Identifier=0x%s",
                             binascii.b2a_hex(subsequent_address_family_identifier_hex))
                next_hop_netaddr_len_hex = attr_value_hex[3]
                next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
                logger.debug("  Length of Next Hop Network Address=%s (0x%s)",
                             next_hop_netaddr_len,
                             binascii.b2a_hex(next_hop_netaddr_len_hex))
                next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
                next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
                logger.debug("  Network Address of Next Hop=%s (0x%s)",
                             next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
                reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
                logger.debug("  Reserved=0x%s",
                             binascii.b2a_hex(reserved_hex))
                nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
                logger.debug("  Network Layer Reachability Information=0x%s",
                             binascii.b2a_hex(nlri_hex))
                nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
                logger.debug("  NLRI prefix list: %s", nlri_prefix_list)
                for prefix in nlri_prefix_list:
                    logger.debug("  nlri_prefix_received: %s", prefix)
                self.prefixes_introduced += len(nlri_prefix_list)  # update counter
            elif attr_type_code == 15:  # rfc4760#section-4
                logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
                address_family_identifier_hex = attr_value_hex[0:2]
                logger.debug("  Address Family Identifier=0x%s",
                             binascii.b2a_hex(address_family_identifier_hex))
                subsequent_address_family_identifier_hex = attr_value_hex[2]
                logger.debug("  Subsequent Address Family Identifier=0x%s",
                             binascii.b2a_hex(subsequent_address_family_identifier_hex))
                wd_hex = attr_value_hex[3:]
                logger.debug("  Withdrawn Routes=0x%s",
                             binascii.b2a_hex(wd_hex))
                wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
                logger.debug("  Withdrawn routes prefix list: %s",
                             wdr_prefix_list)
                for prefix in wdr_prefix_list:
                    logger.debug("  withdrawn_prefix_received: %s", prefix)
                self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
            else:
                logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
        return None

    def decode_update_message(self, msg):
        """Decode an UPDATE message (rfc4271#section-4.3)

        Arguments:
            :msg: message to be decoded in hex
        Returns:
            :return: None
        """
        logger.debug("Decoding update message:")
        # message header - marker
        marker_hex = msg[:16]
        logger.debug("Message header marker: 0x%s",
                     binascii.b2a_hex(marker_hex))
        # message header - message length
        msg_length_hex = msg[16:18]
        msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
        logger.debug("Message lenght: 0x%s (%s)",
                     binascii.b2a_hex(msg_length_hex), msg_length)
        # message header - message type
        msg_type_hex = msg[18:19]
        msg_type = int(binascii.b2a_hex(msg_type_hex), 16)

        with self.storage as stor:
            # this will replace the previously stored message
            stor['update'] = binascii.hexlify(msg)

        logger.debug("Evpn {}".format(self.evpn))
        if self.evpn:
            logger.debug("Skipping update decoding due to evpn data expected")
            return

        logger.debug("Mvpn {}".format(self.mvpn))
        if self.mvpn:
            logger.debug("Skipping update decoding due to mvpn data expected")
            return

        logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
        if self.l3vpn_mcast:
            logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
            return

        logger.debug("L3vpn-unicast {}".format(self.l3vpn))
        # FIX: this guard previously tested self.l3vpn_mcast (copy/paste
        # defect), so l3vpn-unicast updates were never skipped while
        # l3vpn_mcast was tested twice.
        if self.l3vpn:
            logger.debug("Skipping update decoding due to l3vpn-unicast data expected")
            return

        logger.debug("Route-Target-Constrain {}".format(self.rt_constrain))
        if self.rt_constrain:
            logger.debug("Skipping update decoding due to Route-Target-Constrain data expected")
            return

        logger.debug("Allf {}".format(self.allf))
        if self.allf:
            logger.debug("Skipping update decoding")
            return

        if msg_type == 2:
            logger.debug("Message type: 0x%s (update)",
                         binascii.b2a_hex(msg_type_hex))
            # withdrawn routes length
            wdr_length_hex = msg[19:21]
            wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
            logger.debug("Withdrawn routes lenght: 0x%s (%s)",
                         binascii.b2a_hex(wdr_length_hex), wdr_length)
            # withdrawn routes
            wdr_hex = msg[21:21 + wdr_length]
            logger.debug("Withdrawn routes: 0x%s",
                         binascii.b2a_hex(wdr_hex))
            wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
            logger.debug("Withdrawn routes prefix list: %s",
                         wdr_prefix_list)
            for prefix in wdr_prefix_list:
                logger.debug("withdrawn_prefix_received: %s", prefix)
            # total path attribute length
            total_pa_length_offset = 21 + wdr_length
            total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
            total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
            logger.debug("Total path attribute lenght: 0x%s (%s)",
                         binascii.b2a_hex(total_pa_length_hex), total_pa_length)
            # path attributes
            pa_offset = total_pa_length_offset + 2
            pa_hex = msg[pa_offset:pa_offset + total_pa_length]
            logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
            self.decode_path_attributes(pa_hex)
            # network layer reachability information length
            # 23 = 19 octets of header + 2 octets of each length field.
            nlri_length = msg_length - 23 - total_pa_length - wdr_length
            logger.debug("Calculated NLRI length: %s", nlri_length)
            # network layer reachability information
            nlri_offset = pa_offset + total_pa_length
            nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
            logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
            nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
            logger.debug("NLRI prefix list: %s", nlri_prefix_list)
            for prefix in nlri_prefix_list:
                logger.debug("nlri_prefix_received: %s", prefix)
            # updating counters
            self.updates_received += 1
            self.prefixes_introduced += len(nlri_prefix_list)
            self.prefixes_withdrawn += len(wdr_prefix_list)
        else:
            logger.error("Unexpeced message type 0x%s in 0x%s",
                         binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))

    def wait_for_read(self):
        """Read message until timeout (next expected event).

        Note:
            Used when no more updates has to be sent to avoid busy-wait.
            Currently it does not return anything.
        """
        # Compute time to the first predictable state change
        event_time = self.timer.get_next_event_time()
        # snapshot_time would be imprecise
        wait_timedelta = min(event_time - time.time(), self.wfr)
        if wait_timedelta < 0:
            # The program got around to waiting to an event in "very near
            # future" so late that it became a "past" event, thus tell
            # "select" to not wait at all. Passing negative timedelta to
            # select() would lead to either waiting forever (for -1) or
            # select.error("Invalid parameter") (for everything else).
            wait_timedelta = 0
        # And wait for event or something to read.

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every update and
            # not too frequently to avoid having large log files)
            logger.info("total_received_update_message_counter: %s",
                        self.updates_received)
            logger.info("total_received_nlri_prefix_counter: %s",
                        self.prefixes_introduced)
            logger.info("total_received_withdrawn_prefix_counter: %s",
                        self.prefixes_withdrawn)

        start_time = time.time()
        select.select([self.socket], [], [self.socket], wait_timedelta)
        timedelta = time.time() - start_time
        self.rx_idle_time += timedelta
        # Anything shorter than 1 s of blocking counts as "activity seen".
        self.rx_activity_detected = timedelta < 1

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every update and
            # not too frequently to avoid having large log files)
            logger.info("... idle for %.3fs", timedelta)
            logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
        return
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them.

    Messages are appended to an outgoing buffer (msg_out) and sent in
    as many socket.send() calls as the kernel requires.
    """

    def __init__(self, bgp_socket, generator, timer):
        """The writer initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer
        # Really new fields.
        # sending_message: True while msg_out still holds unsent bytes.
        self.sending_message = False
        # bytes_to_send: number of bytes of msg_out not yet sent.
        self.bytes_to_send = 0
        # msg_out: buffer of enqueued, not-yet-sent message data.
        self.msg_out = ""

    def enqueue_message_for_sending(self, message):
        """Enqueue message and change state.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send enqueued data from msg_out buffer.

        Returns:
            :return: True if no remaining data to send, False otherwise.
        """
        # We assume there is a msg_out to send and socket is writable.
        self.timer.snapshot()
        bytes_sent = self.socket.send(self.msg_out)
        # Forget the part of message that was sent.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if not self.bytes_to_send:
            # TODO: Is it possible to hit negative bytes_to_send?
            self.sending_message = False
            # We should have reset hold timer on peer side.
            self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
            # The possible reason for not prioritizing reads is gone.
            return True
        return False
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class."""

    def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
        """The state tracker initialisation.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
            inqueue: user initiated messages queue
            storage: thread safe dict to store data for the rpc server
            cliargs: cli args from the user
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        # NOTE(review): self.timer is read in perform_one_loop_iteration()
        # below, but no assignment to it is visible in this view --
        # confirm it is set before the loop runs.
        self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
                                  l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
                                  rt_constrain=cliargs.rt_constrain, wait_for_read=cliargs.wfr)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        self.prioritize_writing = False
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.
        self.inqueue = inqueue

    def perform_one_loop_iteration(self):
        """ The main loop iteration

        Calculates priority, resolves all conditions, calls
        appropriate method and returns to caller to repeat.
        """
        self.timer.snapshot()
        if not self.prioritize_writing:
            if self.timer.is_time_for_my_keepalive():
                if not self.writer.sending_message:
                    # We need to schedule a keepalive ASAP.
                    self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
                    logger.info("KEEP ALIVE is sent.")
                # We are sending a message now, so let's prioritize it.
                self.prioritize_writing = True
        # Forward a user-initiated message from the rpc inqueue to the
        # writer. NOTE(review): get_nowait() raises Queue.Empty on an
        # empty queue; no guard is visible around this block in this
        # view -- confirm one exists.
        msg = self.inqueue.get_nowait()
        logger.info("Received message: {}".format(msg))
        msgbin = binascii.unhexlify(msg)
        self.writer.enqueue_message_for_sending(msgbin)
        # Now we know what our priorities are, we have to check
        # which actions are available.
        # socket.socket() returns three lists,
        # we store them to list of lists.
        list_list = select.select([self.socket], [self.socket], [self.socket],
                                  self.timer.report_timedelta)
        read_list, write_list, except_list = list_list
        # Lists are unpacked, each is either [] or [self.socket],
        # so we will test them as boolean.
        # NOTE(review): the error+raise below presumably run only when
        # except_list is non-empty; the guarding condition is not
        # visible in this view -- confirm.
        logger.error("Exceptional state on the socket.")
        raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        # Either we really want to reset peer's view of our hold
        # timer, or there was nothing to read.
        # Were we in the middle of sending a message?
        if self.writer.sending_message:
            # Was it the end of a message?
            whole = self.writer.send_message_chunk_is_whole()
            # We were pressed to send something and we did it.
            if self.prioritize_writing and whole:
                # We prioritize reading again.
                self.prioritize_writing = False
        # Finally to check if still update messages to be generated.
        if self.generator.remaining_prefixes:
            msg_out = self.generator.compose_update_message()
            if not self.generator.remaining_prefixes:
                # We have just finished update generation,
                # end-of-rib is due.
                logger.info("All update messages generated.")
                logger.info("Storing performance results.")
                self.generator.store_results()
                logger.info("Finally an END-OF-RIB is sent.")
                # NOTE(review): the argument list of this call continues
                # on lines not visible in this view.
                msg_out += self.generator.update_message(wr_prefixes=[],
            self.writer.enqueue_message_for_sending(msg_out)
            # Attempt for real sending to be done in next iteration.
        # Nothing to write anymore.
        # To avoid busy loop, we do idle waiting here.
        self.reader.wait_for_read()
        # We can neither read nor write.
        logger.warning("Input and output both blocked for " +
                       str(self.timer.report_timedelta) + " seconds.")
        # FIXME: Are we sure select has been really waiting
def create_logger(loglevel, logfile):
    """Create and configure the shared logger object.

    Records go both to the console and to logfile (truncated on every
    start via mode="w"), formatted with timestamp, level and thread name.

    Arguments:
        :loglevel: log level
        :logfile: log file name

    Returns:
        :return: the configured logger object
    """
    logger = logging.getLogger("logger")
    log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler(logfile, mode="w")
    console_handler.setFormatter(log_formatter)
    file_handler.setFormatter(log_formatter)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.setLevel(loglevel)
    # The caller assigns this to the module-global "logger", so the
    # configured object must be returned.
    return logger
def job(arguments, inqueue, storage):
    """One time initialisation and iterations looping.

    Establish BGP connection (open exchange, confirming keepalive)
    and then run reactor-loop iterations forever.

    Arguments:
        :arguments: Command line arguments
        :inqueue: Data to be sent from play.py
        :storage: Shared dict for rpc server
    """
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    # Peer's open message drives the timer setup (hold time etc.).
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
    # Send our open message to the peer.
    bgp_socket.send(msg_out)
    # Wait for confirming keepalive.
    # TODO: Surely in just one packet?
    # Using exact keepalive length to not to see possible updates.
    # NOTE(review): recv(19) assumes the whole keepalive arrives in a
    # single read -- confirm this is acceptable for the target peers.
    msg_in = bgp_socket.recv(19)
    if msg_in != generator.keepalive_message():
        error_msg = "Open not confirmed by keepalive, instead got"
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
    bgp_socket.send(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
1997 '''Handler for SimpleXMLRPCServer'''
def __init__(self, sendqueue, storage):
    '''Initialise the rpc handler with its shared objects.

    Arguments:
        :sendqueue: queue for data to be sent towards odl
        :storage: thread safe dict
    '''
    # Keep references; both objects are owned by the caller.
    self.queue = sendqueue
    self.storage = storage
def send(self, text):
    '''Hand data over to the worker via the outgoing queue.

    Arguments:
        :text: hex string of the data to be sent
    '''
    payload = text
    self.queue.put(payload)
def get(self, text=''):
    '''Read data from the storage.

    Returns the value stored under the given key, or an empty string
    when the key is unknown.

    Arguments:
        :text: a key to the storage to get the data
    '''
    # Take the lock only for the lookup itself.
    with self.storage as stor:
        value = stor.get(text, '')
    return value
def clean(self, text=''):
    '''Cleans data from the storage.

    Removes the entry stored under the given key, if present; unknown
    keys are ignored silently.

    Arguments:
        :text: a key to the storage to clean the data
    '''
    with self.storage as stor:
        # The with-block body is missing in the visible source; deleting
        # the key matches the documented "clean" contract.
        if text in stor:
            del stor[text]
def threaded_job(arguments):
    """Run the job threaded

    Splits the requested amount of prefixes between
    arguments.multiplicity worker threads, starts them, then serves
    XML-RPC requests forever.

    Arguments:
        :arguments: Command line arguments
    """
    amount_left = arguments.amount
    utils_left = arguments.multiplicity
    prefix_current = arguments.firstprefix
    myip_current = arguments.myip
    port = arguments.port
    rpcqueue = Queue.Queue()
    storage = SafeDict()
    # Split work between the utilities (threads).
    # NOTE(review): the loop header driving this split and the
    # initialisation of thread_args are not visible in this view; the
    # statements below appear to be the loop body.
    amount_per_util = (amount_left - 1) / utils_left + 1  # round up
    amount_left -= amount_per_util
    # Each thread gets its own copy of the arguments with its share of
    # the amount, its starting prefix and its local ip.
    args = deepcopy(arguments)
    args.amount = amount_per_util
    args.firstprefix = prefix_current
    args.myip = myip_current
    thread_args.append(args)
    # Advance the prefix window for the next thread.
    # NOTE(review): step of 16 addresses per generated prefix --
    # confirm against the generator's prefix length.
    prefix_current += amount_per_util * 16
    # Start one worker thread per prepared argument set.
    for t in thread_args:
        thread.start_new_thread(job, (t, rpcqueue, storage))
    # NOTE(review): this message presumably belongs to an except-branch
    # guarding the thread start; the guard is not visible in this view.
    print "Error: unable to start thread."
    # Choose the address the rpc server binds to.
    if arguments.usepeerip:
        ip = arguments.peerip
    # Serve rpc requests (send/get/clean) until the process is killed.
    rpcserver = SimpleXMLRPCServer((ip.compressed, port), allow_none=True)
    rpcserver.register_instance(Rpcs(rpcqueue, storage))
    rpcserver.serve_forever()
if __name__ == "__main__":
    # Script entry point: parse cli arguments, configure the
    # module-global "logger" used throughout this file, then start the
    # worker threads and the rpc server (blocks forever).
    arguments = parse_arguments()
    logger = create_logger(arguments.loglevel, arguments.logfile)
    threaded_job(arguments)