1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
# Module metadata consumed by packaging and documentation tools.
__author__ = "Vratko Polak"
__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
__license__ = "Eclipse Public License v1.0"
__email__ = "vrpolak@cisco.com"
36 '''Thread safe dictionary
38 The object will serve as thread safe data storage.
39 It should be used with "with" statement.
    def __init__(self, * p_arg, ** n_arg):
        """Create the underlying dict and the lock guarding access to it."""
        super(SafeDict, self).__init__()
        # Acquired/released by the context-manager protocol so that
        # "with" blocks serialize access to the shared dictionary.
        self._lock = threading.Lock()
50 def __exit__(self, type, value, traceback):
54 def parse_arguments():
55 """Use argparse to get arguments,
60 parser = argparse.ArgumentParser()
61 # TODO: Should we use --argument-names-with-spaces?
62 str_help = "Autonomous System number use in the stream."
63 parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64 # FIXME: We are acting as iBGP peer,
65 # we should mirror AS number from peer's open message.
66 str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67 parser.add_argument("--amount", default="1", type=int, help=str_help)
68 str_help = "Maximum number of IP prefixes to be announced in one iteration"
69 parser.add_argument("--insert", default="1", type=int, help=str_help)
70 str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
71 parser.add_argument("--withdraw", default="0", type=int, help=str_help)
72 str_help = "The number of prefixes to process without withdrawals"
73 parser.add_argument("--prefill", default="0", type=int, help=str_help)
74 str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
75 parser.add_argument("--updates", choices=["single", "separate"],
76 default=["separate"], help=str_help)
77 str_help = "Base prefix IP address for prefix generation"
78 parser.add_argument("--firstprefix", default="8.0.1.0",
79 type=ipaddr.IPv4Address, help=str_help)
80 str_help = "The prefix length."
81 parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
82 str_help = "Listen for connection, instead of initiating it."
83 parser.add_argument("--listen", action="store_true", help=str_help)
84 str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
85 "Default value only suitable for listening.")
86 parser.add_argument("--myip", default="0.0.0.0",
87 type=ipaddr.IPv4Address, help=str_help)
88 str_help = ("TCP port to bind to when listening or initiating connection." +
89 "Default only suitable for initiating.")
90 parser.add_argument("--myport", default="0", type=int, help=str_help)
91 str_help = "The IP of the next hop to be placed into the update messages."
92 parser.add_argument("--nexthop", default="192.0.2.1",
93 type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
94 str_help = "Identifier of the route originator."
95 parser.add_argument("--originator", default=None,
96 type=ipaddr.IPv4Address, dest="originator", help=str_help)
97 str_help = "Cluster list item identifier."
98 parser.add_argument("--cluster", default=None,
99 type=ipaddr.IPv4Address, dest="cluster", help=str_help)
100 str_help = ("Numeric IP Address to try to connect to." +
101 "Currently no effect in listening mode.")
102 parser.add_argument("--peerip", default="127.0.0.2",
103 type=ipaddr.IPv4Address, help=str_help)
104 str_help = "TCP port to try to connect to. No effect in listening mode."
105 parser.add_argument("--peerport", default="179", type=int, help=str_help)
106 str_help = "Local hold time."
107 parser.add_argument("--holdtime", default="180", type=int, help=str_help)
108 str_help = "Log level (--error, --warning, --info, --debug)"
109 parser.add_argument("--error", dest="loglevel", action="store_const",
110 const=logging.ERROR, default=logging.INFO,
112 parser.add_argument("--warning", dest="loglevel", action="store_const",
113 const=logging.WARNING, default=logging.INFO,
115 parser.add_argument("--info", dest="loglevel", action="store_const",
116 const=logging.INFO, default=logging.INFO,
118 parser.add_argument("--debug", dest="loglevel", action="store_const",
119 const=logging.DEBUG, default=logging.INFO,
121 str_help = "Log file name"
122 parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
123 str_help = "Trailing part of the csv result files for plotting purposes"
124 parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
125 str_help = "Minimum number of updates to reach to include result into csv."
126 parser.add_argument("--threshold", default="1000", type=int, help=str_help)
127 str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
128 parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
129 str_help = "Link-State NLRI supported"
130 parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
131 str_help = "Link-State NLRI: Identifier"
132 parser.add_argument("-lsid", default="1", type=int, help=str_help)
133 str_help = "Link-State NLRI: Tunnel ID"
134 parser.add_argument("-lstid", default="1", type=int, help=str_help)
135 str_help = "Link-State NLRI: LSP ID"
136 parser.add_argument("-lspid", default="1", type=int, help=str_help)
137 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
138 parser.add_argument("--lstsaddr", default="1.2.3.4",
139 type=ipaddr.IPv4Address, help=str_help)
140 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
141 parser.add_argument("--lsteaddr", default="5.6.7.8",
142 type=ipaddr.IPv4Address, help=str_help)
143 str_help = "Link-State NLRI: Identifier Step"
144 parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
145 str_help = "Link-State NLRI: Tunnel ID Step"
146 parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
147 str_help = "Link-State NLRI: LSP ID Step"
148 parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
149 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
150 parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
151 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
152 parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
153 str_help = "How many play utilities are to be started."
154 parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
155 str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
156 Enabling this flag makes the script not decoding the update mesage, because of not\
157 supported decoding for these elements."
158 parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
159 str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
160 Enabling this flag makes the script not decoding the update mesage, because of not\
161 supported decoding for these elements."
162 parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
163 str_help = "Open message includes L3VPN-MULTICAST arguments.\
164 Enabling this flag makes the script not decoding the update mesage, because of not\
165 supported decoding for these elements."
166 parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
167 str_help = "Open message includes L3VPN-UNICAST arguments, without message decoding."
168 parser.add_argument("--l3vpn", default=False, action="store_true", help=str_help)
169 str_help = "Open message includes ROUTE-TARGET-CONSTRAIN arguments, without message decoding."
170 parser.add_argument("--rt_constrain", default=False, action="store_true", help=str_help)
171 str_help = "Add all supported families without message decoding."
172 parser.add_argument("--allf", default=False, action="store_true", help=str_help)
173 parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
174 str_help = "Skipping well known attributes for update message"
175 parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
176 arguments = parser.parse_args()
177 if arguments.multiplicity < 1:
178 print "Multiplicity", arguments.multiplicity, "is not positive."
180 # TODO: Are sanity checks (such as asnumber>=0) required?
def establish_connection(arguments):
    """Establish a TCP connection to the BGP peer.

    :arguments: following command-line arguments are used
        - arguments.listen: True selects the listening (passive) mode
        - arguments.myip: local IP address
        - arguments.myport: local port
        - arguments.peerip: remote IP address
        - arguments.peerport: remote port

    :return: established socket connected to the peer
    """
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind needs a single tuple as argument
        listening_socket.bind((str(arguments.myip), arguments.myport))
        listening_socket.listen(1)
        bgp_socket, _ = listening_socket.accept()
        # TODO: Verify client IP is controller IP.
        listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind to force specified address and port
        talking_socket.bind((str(arguments.myip), arguments.myport))
        # socket does not accept ipaddr objects, hence str()
        talking_socket.connect((str(arguments.peerip), arguments.peerport))
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract a big-endian 2-byte unsigned integer from a message.

    :message: given message (byte string)
    :offset: offset of the short_int inside the message;
        the default (16) is the BGP message length field offset.

    :return: the decoded short_int value
    """
    high_byte_int = ord(message[offset])
    low_byte_int = ord(message[offset + 1])
    short_int = high_byte_int * 256 + low_byte_int
    return short_int
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3).

    :prefixes_hex: raw NLRI byte string to be decoded

    :return: list of prefixes in the form of ip address (X.X.X.X/X)

    Note: each encoded prefix is assumed to carry 4 address bytes
    (prefix lengths 25-32), which matches the generator's defaults.
    """
    prefix_list = []
    offset = 0
    while offset < len(prefixes_hex):
        # ord() of a 1-byte slice works for both str (py2) and bytes (py3).
        prefix_bit_len = ord(prefixes_hex[offset:offset + 1])
        # number of address bytes actually encoded for this prefix
        prefix_len = ((prefix_bit_len - 1) // 8) + 1
        prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
        prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        :return: human readable message as string

        Use a placeholder string if the message is to be empty.
        """
        message = binascii.hexlify(self.msg)
        if message == "":
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message.

    :bgp_socket: the socket to be read

    :return: received OPEN message (raw bytes)
    :raises MessageError: when the message is too short or its length
        field disagrees with the number of bytes actually received

    Performs just basic incoming message checks.
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # 37 is minimal length of open message with 4-byte AS number.
    if len(msg_in) < 37:
        error_msg = (
            "Message length (" + str(len(msg_in)) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)"
        )
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # str() is required: reported_length is an int and cannot be
        # concatenated to a string directly.
        error_msg = (
            "Expected message length (" + str(reported_length) +
            ") does not match actual length (" + str(len(msg_in)) + ")"
        )
        logger.error(error_msg + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    logger.info("Open message received.")
    return msg_in
325 class MessageGenerator(object):
326 """Class which generates messages, holds states and configuration values."""
328 # TODO: Define bgp marker as a class (constant) variable.
    def __init__(self, args):
        """Initialisation according to command-line args.

        :args: argsparser's Namespace object which contains command-line
            options for MesageGenerator initialisation

        Calculates and stores default values used later on for
        message generation, phase counters and logging flags.
        """
        # Total number of prefixes the whole run should introduce.
        self.total_prefix_amount = args.amount
        # Number of update messages left to be sent.
        self.remaining_prefixes = self.total_prefix_amount

        # New parameters initialisation
        self.prefix_base_default = args.firstprefix
        self.prefix_length_default = args.prefixlen
        self.wr_prefixes_default = []
        self.nlri_prefixes_default = []
        self.version_default = 4
        self.my_autonomous_system_default = args.asnumber
        self.hold_time_default = args.holdtime  # Local hold time.
        # BGP identifier is the numeric value of the local IP address.
        self.bgp_identifier_default = int(args.myip)
        self.next_hop_default = args.nexthop
        self.originator_id_default = args.originator
        self.cluster_list_item_default = args.cluster
        # "--updates single" packs NLRI and WITHDRAWN into one UPDATE.
        self.single_update_default = args.updates == "single"
        # NOTE(review): "random" is not among the --updates choices, so this
        # flag stays False unless choices are extended -- confirm intent.
        self.randomize_updates_default = args.updates == "random"
        self.prefix_count_to_add_default = args.insert
        self.prefix_count_to_del_default = args.withdraw
        if self.prefix_count_to_del_default < 0:
            self.prefix_count_to_del_default = 0
        if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
            # total number of prefixes must grow to avoid infinite test loop
            self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
        self.slot_size_default = self.prefix_count_to_add_default
        self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
        self.results_file_name_default = args.results
        self.performance_threshold_default = args.threshold
        # Address-family / capability flags copied from the command line.
        self.rfc4760 = args.rfc4760
        self.bgpls = args.bgpls
        self.evpn = args.evpn
        self.mvpn = args.mvpn
        self.l3vpn_mcast = args.l3vpn_mcast
        self.l3vpn = args.l3vpn
        self.rt_constrain = args.rt_constrain
        self.allf = args.allf
        self.skipattr = args.skipattr
        # Default values when BGP-LS Attributes are used
        # NOTE(review): upstream guards the following assignments with
        # "if self.bgpls:"; that guard is not visible in this view -- confirm.
        self.prefix_count_to_add_default = 1
        self.prefix_count_to_del_default = 0
        # NOTE(review): an "LSPID": args.lspid entry appears to be missing
        # from this dict (get_ls_nlri_values reads that key) -- confirm.
        self.ls_nlri_default = {"Identifier": args.lsid,
                                "TunnelID": args.lstid,
                                "IPv4TunnelSenderAddress": args.lstsaddr,
                                "IPv4TunnelEndPointAddress": args.lsteaddr}
        self.lsid_step = args.lsidstep
        self.lstid_step = args.lstidstep
        self.lspid_step = args.lspidstep
        self.lstsaddr_step = args.lstsaddrstep
        self.lsteaddr_step = args.lsteaddrstep
        # Default values used for randomized part
        s1_slots = ((self.total_prefix_amount -
                     self.remaining_prefixes_threshold - 1) /
                    self.prefix_count_to_add_default + 1)
        s2_slots = ((self.remaining_prefixes_threshold - 1) /
                    (self.prefix_count_to_add_default -
                     self.prefix_count_to_del_default) + 1)
        # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
        s2_first_index = s1_slots * self.prefix_count_to_add_default
        s2_last_index = (s2_first_index +
                         s2_slots * (self.prefix_count_to_add_default -
                                     self.prefix_count_to_del_default) - 1)
        self.slot_gap_default = ((self.total_prefix_amount -
                                  self.remaining_prefixes_threshold - 1) /
                                 self.prefix_count_to_add_default + 1)
        self.randomize_lowest_default = s2_first_index
        self.randomize_highest_default = s2_last_index
        # Initialising counters
        self.phase1_start_time = 0
        self.phase1_stop_time = 0
        self.phase2_start_time = 0
        self.phase2_stop_time = 0
        self.phase1_updates_sent = 0
        self.phase2_updates_sent = 0
        self.updates_sent = 0
        # NOTE(review): "self.iteration = 0" is not visible in this view,
        # although other methods read self.iteration -- confirm.

        self.log_info = args.loglevel <= logging.INFO
        self.log_debug = args.loglevel <= logging.DEBUG
        # Flags needed for the MessageGenerator performance optimization.
        # Calling logger methods each iteration even with proper log level set
        # slows down significantly the MessageGenerator performance.
        # Measured total generation time (1M updates, dry run, error log level):
        # - logging based on basic logger features: 36,2s
        # - logging based on advanced logger features (lazy logging): 21,2s
        # - conditional calling of logger methods enclosed inside condition: 8,6s

        logger.info("Generator initialisation")
        logger.info(" Target total number of prefixes to be introduced: " +
                    str(self.total_prefix_amount))
        logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
                    str(self.prefix_length_default))
        logger.info(" My Autonomous System number: " +
                    str(self.my_autonomous_system_default))
        logger.info(" My Hold Time: " + str(self.hold_time_default))
        logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
        logger.info(" Next Hop: " + str(self.next_hop_default))
        logger.info(" Originator ID: " + str(self.originator_id_default))
        logger.info(" Cluster list: " + str(self.cluster_list_item_default))
        logger.info(" Prefix count to be inserted at once: " +
                    str(self.prefix_count_to_add_default))
        logger.info(" Prefix count to be withdrawn at once: " +
                    str(self.prefix_count_to_del_default))
        logger.info(" Fast pre-fill up to " +
                    str(self.total_prefix_amount -
                        self.remaining_prefixes_threshold) + " prefixes")
        logger.info(" Remaining number of prefixes to be processed " +
                    "in parallel with withdrawals: " +
                    str(self.remaining_prefixes_threshold))
        logger.debug(" Prefix index range used after pre-fill procedure [" +
                     str(self.randomize_lowest_default) + ", " +
                     str(self.randomize_highest_default) + "]")
        if self.single_update_default:
            logger.info(" Common single UPDATE will be generated " +
                        "for both NLRI & WITHDRAWN lists")
        # NOTE(review): an "else:" branch precedes the next statement
        # upstream; it is not visible in this view -- confirm.
            logger.info(" Two separate UPDATEs will be generated " +
                        "for each NLRI & WITHDRAWN lists")
        if self.randomize_updates_default:
            logger.info(" Generation of UPDATE messages will be randomized")
        logger.info(" Let\'s go ...\n")
466 # TODO: Notification for hold timer expiration can be handy.
468 def store_results(self, file_name=None, threshold=None):
469 """ Stores specified results into files based on file_name value.
472 :param file_name: Trailing (common) part of result file names
473 :param threshold: Minimum number of sent updates needed for each
474 result to be included into result csv file
475 (mainly needed because of the result accuracy)
479 # default values handling
480 # TODO optimize default values handling (use e.g. dicionary.update() approach)
481 if file_name is None:
482 file_name = self.results_file_name_default
483 if threshold is None:
484 threshold = self.performance_threshold_default
485 # performance calculation
486 if self.phase1_updates_sent >= threshold:
487 totals1 = self.phase1_updates_sent
488 performance1 = int(self.phase1_updates_sent /
489 (self.phase1_stop_time - self.phase1_start_time))
493 if self.phase2_updates_sent >= threshold:
494 totals2 = self.phase2_updates_sent
495 performance2 = int(self.phase2_updates_sent /
496 (self.phase2_stop_time - self.phase2_start_time))
501 logger.info("#" * 10 + " Final results " + "#" * 10)
502 logger.info("Number of iterations: " + str(self.iteration))
503 logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
504 str(self.phase1_updates_sent))
505 logger.info("The pre-fill phase duration: " +
506 str(self.phase1_stop_time - self.phase1_start_time) + "s")
507 logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
508 str(self.phase2_updates_sent))
509 logger.info("The 2nd test phase duration: " +
510 str(self.phase2_stop_time - self.phase2_start_time) + "s")
511 logger.info("Threshold for performance reporting: " + str(threshold))
514 phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
515 " route(s) per UPDATE")
516 if self.single_update_default:
517 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
518 "/-" + str(self.prefix_count_to_del_default) +
519 " routes per UPDATE")
521 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
522 "/-" + str(self.prefix_count_to_del_default) +
523 " routes in two UPDATEs")
524 # collecting capacity and performance results
527 if totals1 is not None:
528 totals[phase1_label] = totals1
529 performance[phase1_label] = performance1
530 if totals2 is not None:
531 totals[phase2_label] = totals2
532 performance[phase2_label] = performance2
533 self.write_results_to_file(totals, "totals-" + file_name)
534 self.write_results_to_file(performance, "performance-" + file_name)
536 def write_results_to_file(self, results, file_name):
537 """Writes results to the csv plot file consumable by Jenkins.
540 :param file_name: Name of the (csv) file to be created
546 f = open(file_name, "wt")
548 for key in sorted(results):
549 first_line += key + ", "
550 second_line += str(results[key]) + ", "
551 first_line = first_line[:-2]
552 second_line = second_line[:-2]
553 f.write(first_line + "\n")
554 f.write(second_line + "\n")
555 logger.info("Message generator performance results stored in " +
557 logger.info(" " + first_line)
558 logger.info(" " + second_line)
562 # Return pseudo-randomized (reproducible) index for selected range
563 def randomize_index(self, index, lowest=None, highest=None):
564 """Calculates pseudo-randomized index from selected range.
567 :param index: input index
568 :param lowest: the lowes index from the randomized area
569 :param highest: the highest index from the randomized area
571 :return: the (pseudo)randomized index
573 Created just as a fame for future generator enhancement.
575 # default values handling
576 # TODO optimize default values handling (use e.g. dicionary.update() approach)
578 lowest = self.randomize_lowest_default
580 highest = self.randomize_highest_default
582 if (index >= lowest) and (index <= highest):
583 # we are in the randomized range -> shuffle it inside
584 # the range (now just reverse the order)
585 new_index = highest - (index - lowest)
587 # we are out of the randomized range -> nothing to do
591 def get_ls_nlri_values(self, index):
592 """Generates LS-NLRI parameters.
593 http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
596 :param index: index (iteration)
598 :return: dictionary of LS NLRI parameters and values
600 # generating list of LS NLRI parameters
601 identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
602 ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
603 tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
604 lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
605 ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
606 ls_nlri_values = {"Identifier": identifier,
607 "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
608 "TunnelID": tunnel_id, "LSPID": lsp_id,
609 "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
610 return ls_nlri_values
612 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
613 prefix_len=None, prefix_count=None, randomize=None):
614 """Generates list of IP address prefixes.
617 :param slot_index: index of group of prefix addresses
618 :param slot_size: size of group of prefix addresses
619 in [number of included prefixes]
620 :param prefix_base: IP address of the first prefix
621 (slot_index = 0, prefix_index = 0)
622 :param prefix_len: length of the prefix in bites
623 (the same as size of netmask)
624 :param prefix_count: number of prefixes to be returned
625 from the specified slot
627 :return: list of generated IP address prefixes
629 # default values handling
630 # TODO optimize default values handling (use e.g. dicionary.update() approach)
631 if slot_size is None:
632 slot_size = self.slot_size_default
633 if prefix_base is None:
634 prefix_base = self.prefix_base_default
635 if prefix_len is None:
636 prefix_len = self.prefix_length_default
637 if prefix_count is None:
638 prefix_count = slot_size
639 if randomize is None:
640 randomize = self.randomize_updates_default
641 # generating list of prefixes
644 prefix_gap = 2 ** (32 - prefix_len)
645 for i in range(prefix_count):
646 prefix_index = slot_index * slot_size + i
648 prefix_index = self.randomize_index(prefix_index)
649 indexes.append(prefix_index)
650 prefixes.append(prefix_base + prefix_index * prefix_gap)
652 logger.debug(" Prefix slot index: " + str(slot_index))
653 logger.debug(" Prefix slot size: " + str(slot_size))
654 logger.debug(" Prefix count: " + str(prefix_count))
655 logger.debug(" Prefix indexes: " + str(indexes))
656 logger.debug(" Prefix list: " + str(prefixes))
659 def compose_update_message(self, prefix_count_to_add=None,
660 prefix_count_to_del=None):
661 """Composes an UPDATE message
664 :param prefix_count_to_add: # of prefixes to put into NLRI list
665 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
667 :return: encoded UPDATE message in HEX
669 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
670 lists or common message wich includes both prefix lists.
671 Updates global counters.
673 # default values handling
674 # TODO optimize default values handling (use e.g. dicionary.update() approach)
675 if prefix_count_to_add is None:
676 prefix_count_to_add = self.prefix_count_to_add_default
677 if prefix_count_to_del is None:
678 prefix_count_to_del = self.prefix_count_to_del_default
680 if self.log_info and not (self.iteration % 1000):
681 logger.info("Iteration: " + str(self.iteration) +
682 " - total remaining prefixes: " +
683 str(self.remaining_prefixes))
685 logger.debug("#" * 10 + " Iteration: " +
686 str(self.iteration) + " " + "#" * 10)
687 logger.debug("Remaining prefixes: " +
688 str(self.remaining_prefixes))
689 # scenario type & one-shot counter
690 straightforward_scenario = (self.remaining_prefixes >
691 self.remaining_prefixes_threshold)
692 if straightforward_scenario:
693 prefix_count_to_del = 0
695 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
696 if not self.phase1_start_time:
697 self.phase1_start_time = time.time()
700 logger.debug("--- COMBINED SCENARIO ---")
701 if not self.phase2_start_time:
702 self.phase2_start_time = time.time()
703 # tailor the number of prefixes if needed
704 prefix_count_to_add = (prefix_count_to_del +
705 min(prefix_count_to_add - prefix_count_to_del,
706 self.remaining_prefixes))
707 # prefix slots selection for insertion and withdrawal
708 slot_index_to_add = self.iteration
709 slot_index_to_del = slot_index_to_add - self.slot_gap_default
710 # getting lists of prefixes for insertion in this iteration
712 logger.debug("Prefixes to be inserted in this iteration:")
713 prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
714 prefix_count=prefix_count_to_add)
715 # getting lists of prefixes for withdrawal in this iteration
717 logger.debug("Prefixes to be withdrawn in this iteration:")
718 prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
719 prefix_count=prefix_count_to_del)
720 # generating the UPDATE mesage with LS-NLRI only
722 ls_nlri = self.get_ls_nlri_values(self.iteration)
723 msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
726 # generating the UPDATE message with prefix lists
727 if self.single_update_default:
728 # Send prefixes to be introduced and withdrawn
729 # in one UPDATE message
730 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
731 nlri_prefixes=prefix_list_to_add)
733 # Send prefixes to be introduced and withdrawn
734 # in separate UPDATE messages (if needed)
735 msg_out = self.update_message(wr_prefixes=[],
736 nlri_prefixes=prefix_list_to_add)
737 if prefix_count_to_del:
738 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
740 # updating counters - who knows ... maybe I am last time here ;)
741 if straightforward_scenario:
742 self.phase1_stop_time = time.time()
743 self.phase1_updates_sent = self.updates_sent
745 self.phase2_stop_time = time.time()
746 self.phase2_updates_sent = (self.updates_sent -
747 self.phase1_updates_sent)
748 # updating totals for the next iteration
750 self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
751 # returning the encoded message
754 # Section of message encoders
756 def open_message(self, version=None, my_autonomous_system=None,
757 hold_time=None, bgp_identifier=None):
758 """Generates an OPEN Message (rfc4271#section-4.2)
761 :param version: see the rfc4271#section-4.2
762 :param my_autonomous_system: see the rfc4271#section-4.2
763 :param hold_time: see the rfc4271#section-4.2
764 :param bgp_identifier: see the rfc4271#section-4.2
766 :return: encoded OPEN message in HEX
769 # default values handling
770 # TODO optimize default values handling (use e.g. dicionary.update() approach)
772 version = self.version_default
773 if my_autonomous_system is None:
774 my_autonomous_system = self.my_autonomous_system_default
775 if hold_time is None:
776 hold_time = self.hold_time_default
777 if bgp_identifier is None:
778 bgp_identifier = self.bgp_identifier_default
781 marker_hex = "\xFF" * 16
785 type_hex = struct.pack("B", type)
788 version_hex = struct.pack("B", version)
790 # my_autonomous_system
791 # AS_TRANS value, 23456 decadic.
792 my_autonomous_system_2_bytes = 23456
793 # AS number is mappable to 2 bytes
794 if my_autonomous_system < 65536:
795 my_autonomous_system_2_bytes = my_autonomous_system
796 my_autonomous_system_hex_2_bytes = struct.pack(">H",
797 my_autonomous_system)
800 hold_time_hex = struct.pack(">H", hold_time)
803 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
805 # Optional Parameters
806 optional_parameters_hex = ""
807 if self.rfc4760 or self.allf:
808 optional_parameter_hex = (
809 "\x02" # Param type ("Capability Ad")
810 "\x06" # Length (6 bytes)
811 "\x01" # Capability type (NLRI Unicast),
812 # see RFC 4760, secton 8
813 "\x04" # Capability value length
814 "\x00\x01" # AFI (Ipv4)
816 "\x01" # SAFI (Unicast)
818 optional_parameters_hex += optional_parameter_hex
820 if self.bgpls or self.allf:
821 optional_parameter_hex = (
822 "\x02" # Param type ("Capability Ad")
823 "\x06" # Length (6 bytes)
824 "\x01" # Capability type (NLRI Unicast),
825 # see RFC 4760, secton 8
826 "\x04" # Capability value length
827 "\x40\x04" # AFI (BGP-LS)
829 "\x47" # SAFI (BGP-LS)
831 optional_parameters_hex += optional_parameter_hex
833 if self.evpn or self.allf:
834 optional_parameter_hex = (
835 "\x02" # Param type ("Capability Ad")
836 "\x06" # Length (6 bytes)
837 "\x01" # Multiprotocol extetension capability,
838 "\x04" # Capability value length
839 "\x00\x19" # AFI (L2-VPN)
843 optional_parameters_hex += optional_parameter_hex
845 if self.mvpn or self.allf:
846 optional_parameter_hex = (
847 "\x02" # Param type ("Capability Ad")
848 "\x06" # Length (6 bytes)
849 "\x01" # Multiprotocol extetension capability,
850 "\x04" # Capability value length
851 "\x00\x01" # AFI (IPV4)
853 "\x05" # SAFI (MCAST-VPN)
855 optional_parameters_hex += optional_parameter_hex
856 optional_parameter_hex = (
857 "\x02" # Param type ("Capability Ad")
858 "\x06" # Length (6 bytes)
859 "\x01" # Multiprotocol extetension capability,
860 "\x04" # Capability value length
861 "\x00\x02" # AFI (IPV6)
863 "\x05" # SAFI (MCAST-VPN)
865 optional_parameters_hex += optional_parameter_hex
867 if self.l3vpn_mcast or self.allf:
868 optional_parameter_hex = (
869 "\x02" # Param type ("Capability Ad")
870 "\x06" # Length (6 bytes)
871 "\x01" # Multiprotocol extetension capability,
872 "\x04" # Capability value length
873 "\x00\x01" # AFI (IPV4)
875 "\x81" # SAFI (L3VPN-MCAST)
877 optional_parameters_hex += optional_parameter_hex
878 optional_parameter_hex = (
879 "\x02" # Param type ("Capability Ad")
880 "\x06" # Length (6 bytes)
881 "\x01" # Multiprotocol extetension capability,
882 "\x04" # Capability value length
883 "\x00\x02" # AFI (IPV6)
885 "\x81" # SAFI (L3VPN-MCAST)
887 optional_parameters_hex += optional_parameter_hex
889 if self.l3vpn or self.allf:
890 optional_parameter_hex = (
891 "\x02" # Param type ("Capability Ad")
892 "\x06" # Length (6 bytes)
893 "\x01" # Multiprotocol extetension capability,
894 "\x04" # Capability value length
895 "\x00\x01" # AFI (IPV4)
897 "\x80" # SAFI (L3VPN-UNICAST)
899 optional_parameters_hex += optional_parameter_hex
900 optional_parameter_hex = (
901 "\x02" # Param type ("Capability Ad")
902 "\x06" # Length (6 bytes)
903 "\x01" # Multiprotocol extetension capability,
904 "\x04" # Capability value length
905 "\x00\x02" # AFI (IPV6)
907 "\x80" # SAFI (L3VPN-UNICAST)
909 optional_parameters_hex += optional_parameter_hex
911 if self.rt_constrain or self.allf:
912 optional_parameter_hex = (
913 "\x02" # Param type ("Capability Ad")
914 "\x06" # Length (6 bytes)
915 "\x01" # Multiprotocol extetension capability,
916 "\x04" # Capability value length
917 "\x00\x01" # AFI (IPV4)
919 "\x84" # SAFI (ROUTE-TARGET-CONSTRAIN)
921 optional_parameters_hex += optional_parameter_hex
923 optional_parameter_hex = (
924 "\x02" # Param type ("Capability Ad")
925 "\x06" # Length (6 bytes)
926 "\x41" # "32 bit AS Numbers Support"
927 # (see RFC 6793, section 3)
928 "\x04" # Capability value length
930 optional_parameter_hex += (
931 struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
933 optional_parameters_hex += optional_parameter_hex
935 # Optional Parameters Length
936 optional_parameters_length = len(optional_parameters_hex)
937 optional_parameters_length_hex = struct.pack("B",
938 optional_parameters_length)
940 # Length (big-endian)
942 len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
943 len(my_autonomous_system_hex_2_bytes) +
944 len(hold_time_hex) + len(bgp_identifier_hex) +
945 len(optional_parameters_length_hex) +
946 len(optional_parameters_hex)
948 length_hex = struct.pack(">H", length)
956 my_autonomous_system_hex_2_bytes +
959 optional_parameters_length_hex +
960 optional_parameters_hex
964 logger.debug("OPEN message encoding")
965 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
966 logger.debug(" Length=" + str(length) + " (0x" +
967 binascii.hexlify(length_hex) + ")")
968 logger.debug(" Type=" + str(type) + " (0x" +
969 binascii.hexlify(type_hex) + ")")
970 logger.debug(" Version=" + str(version) + " (0x" +
971 binascii.hexlify(version_hex) + ")")
972 logger.debug(" My Autonomous System=" +
973 str(my_autonomous_system_2_bytes) + " (0x" +
974 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
976 logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
977 binascii.hexlify(hold_time_hex) + ")")
978 logger.debug(" BGP Identifier=" + str(bgp_identifier) +
979 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
980 logger.debug(" Optional Parameters Length=" +
981 str(optional_parameters_length) + " (0x" +
982 binascii.hexlify(optional_parameters_length_hex) +
984 logger.debug(" Optional Parameters=0x" +
985 binascii.hexlify(optional_parameters_hex))
986 logger.debug("OPEN message encoded: 0x%s",
987 binascii.b2a_hex(message_hex))
    def update_message(self, wr_prefixes=None, nlri_prefixes=None,
                       wr_prefix_length=None, nlri_prefix_length=None,
                       my_autonomous_system=None, next_hop=None,
                       originator_id=None, cluster_list_item=None,
                       end_of_rib=False, **ls_nlri_params):
        """Generates an UPDATE Message (rfc4271#section-4.3)

        Every argument left as None is replaced by the corresponding
        *_default attribute configured on this generator instance.

        :param wr_prefixes: prefixes to withdraw, see the rfc4271#section-4.3
        :param nlri_prefixes: prefixes to announce, see the rfc4271#section-4.3
        :param wr_prefix_length: prefix length (bits) for withdrawn routes,
            see the rfc4271#section-4.3
        :param nlri_prefix_length: prefix length (bits) for announced routes,
            see the rfc4271#section-4.3
        :param my_autonomous_system: AS number for the AS_PATH attribute,
            see the rfc4271#section-4.3
        :param next_hop: IP for the NEXT_HOP attribute, see the rfc4271#section-4.3
        :param originator_id: value for the optional ORIGINATOR_ID attribute
            (rfc4456); attribute skipped when None
        :param cluster_list_item: value for the optional CLUSTER_LIST
            attribute (rfc4456); attribute skipped when None
        :param end_of_rib: when True, the BGP-LS MP_REACH_NLRI attribute is
            suppressed even if self.bgpls is set
        :param ls_nlri_params: per-call overrides of self.ls_nlri_default
        :return: encoded UPDATE message in HEX
        """
        # default values handling
        # TODO optimize default values handling (use e.g. dictionary.update() approach)
        if wr_prefixes is None:
            wr_prefixes = self.wr_prefixes_default
        if nlri_prefixes is None:
            nlri_prefixes = self.nlri_prefixes_default
        if wr_prefix_length is None:
            wr_prefix_length = self.prefix_length_default
        if nlri_prefix_length is None:
            nlri_prefix_length = self.prefix_length_default
        if my_autonomous_system is None:
            my_autonomous_system = self.my_autonomous_system_default
        if next_hop is None:
            next_hop = self.next_hop_default
        if originator_id is None:
            originator_id = self.originator_id_default
        if cluster_list_item is None:
            cluster_list_item = self.cluster_list_item_default
        # BGP-LS NLRI fields: start from instance defaults, apply per-call overrides.
        ls_nlri = self.ls_nlri_default.copy()
        ls_nlri.update(ls_nlri_params)

        # Marker: 16 bytes of 0xFF as required by rfc4271#section-4.1.
        marker_hex = "\xFF" * 16

        # NOTE(review): "type" (shadowing the builtin) is assigned on a line
        # not visible in this excerpt; UPDATE is message type 2.
        type_hex = struct.pack("B", type)

        # Withdrawn Routes: one (length, truncated prefix) entry per prefix.
        withdrawn_routes_hex = ""
        # Octets needed to carry wr_prefix_length bits.
        # NOTE(review): "bytes" shadows the builtin and relies on Python 2
        # integer division.
        bytes = ((wr_prefix_length - 1) / 8) + 1
        for prefix in wr_prefixes:
            withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
                                   struct.pack(">I", int(prefix))[:bytes])
            withdrawn_routes_hex += withdrawn_route_hex

        # Withdrawn Routes Length
        withdrawn_routes_length = len(withdrawn_routes_hex)
        withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)

        # Path Attributes, built as raw byte strings.
        # TODO: to replace hardcoded string by encoding?
        # NOTE(review): several closing parentheses of the concatenation
        # groups below are not visible in this excerpt — confirm against
        # the full source.
        path_attributes_hex = ""
        if not self.skipattr:
            # ORIGIN + AS_PATH are mandatory well-known attributes.
            path_attributes_hex += (
                "\x40"  # Flags ("Well-Known")
                "\x01"  # Type (ORIGIN)
                "\x00"  # Origin: IGP
            path_attributes_hex += (
                "\x40"  # Flags ("Well-Known")
                "\x02"  # Type (AS_PATH)
                "\x02"  # AS segment type (AS_SEQUENCE)
                "\x01"  # AS segment length (1)
            my_as_hex = struct.pack(">I", my_autonomous_system)
            path_attributes_hex += my_as_hex  # AS segment (4 bytes)
        path_attributes_hex += (
            "\x40"  # Flags ("Well-Known")
            "\x05"  # Type (LOCAL_PREF)
            "\x00\x00\x00\x64"  # (100)
        # NEXT_HOP only makes sense when something is actually announced.
        if nlri_prefixes != []:
            path_attributes_hex += (
                "\x40"  # Flags ("Well-Known")
                "\x03"  # Type (NEXT_HOP)
            next_hop_hex = struct.pack(">I", int(next_hop))
            path_attributes_hex += (
                next_hop_hex  # IP address of the next hop (4 bytes)
        if originator_id is not None:
            path_attributes_hex += (
                "\x80"  # Flags ("Optional, non-transitive")
                "\x09"  # Type (ORIGINATOR_ID)
            )  # ORIGINATOR_ID (4 bytes)
            path_attributes_hex += struct.pack(">I", int(originator_id))
        if cluster_list_item is not None:
            path_attributes_hex += (
                "\x80"  # Flags ("Optional, non-transitive")
                "\x0a"  # Type (CLUSTER_LIST)
            )  # one CLUSTER_LIST item (4 bytes)
            path_attributes_hex += struct.pack(">I", int(cluster_list_item))
        # BGP-LS: advertise one IPv4 TE LSP NLRI via MP_REACH_NLRI (rfc4760/bgp-ls).
        if self.bgpls and not end_of_rib:
            path_attributes_hex += (
                "\x80"  # Flags ("Optional, non-transitive")
                "\x0e"  # Type (MP_REACH_NLRI)
                "\x22"  # Length (34)
                "\x40\x04"  # AFI (BGP-LS)
                "\x47"  # SAFI (BGP-LS)
                "\x04"  # Next Hop Length (4)
            path_attributes_hex += struct.pack(">I", int(next_hop))
            path_attributes_hex += "\x00"  # Reserved
            path_attributes_hex += (
                "\x00\x05"  # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
                "\x00\x15"  # LS-NLRI.TotalNLRILength (21)
                "\x07"  # LS-NLRI.Variable.ProtocolID (RSVP-TE)
            path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
            path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
            path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
            path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
            path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))

        # Total Path Attributes Length
        total_path_attributes_length = len(path_attributes_hex)
        total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)

        # Network Layer Reachability Information
        # NOTE(review): the "nlri_hex = \"\"" initialisation is not visible
        # in this excerpt — confirm against the full source.
        bytes = ((nlri_prefix_length - 1) / 8) + 1
        for prefix in nlri_prefixes:
            nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
                               struct.pack(">I", int(prefix))[:bytes])
            nlri_hex += nlri_prefix_hex

        # Length (big-endian)
        # NOTE(review): the "length = (" opener and the final
        # "len(nlri_hex))" term are not visible in this excerpt.
            len(marker_hex) + 2 + len(type_hex) +
            len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
            len(total_path_attributes_length_hex) + len(path_attributes_hex) +
        length_hex = struct.pack(">H", length)

        # UPDATE Message assembly
        # NOTE(review): the "message_hex = (" opener with the
        # marker/length/type terms and the trailing "nlri_hex)" are not
        # visible in this excerpt.
            withdrawn_routes_length_hex +
            withdrawn_routes_hex +
            total_path_attributes_length_hex +
            path_attributes_hex +

        # Trace the full encoding, field by field.
        logger.debug("UPDATE message encoding")
        logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
        logger.debug(" Length=" + str(length) + " (0x" +
                     binascii.hexlify(length_hex) + ")")
        logger.debug(" Type=" + str(type) + " (0x" +
                     binascii.hexlify(type_hex) + ")")
        logger.debug(" withdrawn_routes_length=" +
                     str(withdrawn_routes_length) + " (0x" +
                     binascii.hexlify(withdrawn_routes_length_hex) + ")")
        logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
                     str(wr_prefix_length) + " (0x" +
                     binascii.hexlify(withdrawn_routes_hex) + ")")
        if total_path_attributes_length:
            logger.debug(" Total Path Attributes Length=" +
                         str(total_path_attributes_length) + " (0x" +
                         binascii.hexlify(total_path_attributes_length_hex) + ")")
            logger.debug(" Path Attributes=" + "(0x" +
                         binascii.hexlify(path_attributes_hex) + ")")
            logger.debug(" Origin=IGP")
            logger.debug(" AS path=" + str(my_autonomous_system))
            logger.debug(" Next hop=" + str(next_hop))
            if originator_id is not None:
                logger.debug(" Originator id=" + str(originator_id))
            if cluster_list_item is not None:
                logger.debug(" Cluster list=" + str(cluster_list_item))
            # NOTE(review): an "if self.bgpls:" guard around the next line
            # appears to be elided in this excerpt — confirm.
            logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
        logger.debug(" Network Layer Reachability Information=" +
                     str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
                     " (0x" + binascii.hexlify(nlri_hex) + ")")
        logger.debug("UPDATE message encoded: 0x" +
                     binascii.b2a_hex(message_hex))
        # Book-keeping: count every UPDATE produced by this generator.
        self.updates_sent += 1
        # returning encoded message
    def notification_message(self, error_code, error_subcode, data_hex=""):
        """Generates a NOTIFICATION Message (rfc4271#section-4.5)

        :param error_code: see the rfc4271#section-4.5
        :param error_subcode: see the rfc4271#section-4.5
        :param data_hex: see the rfc4271#section-4.5 (raw bytes, may be empty)
        :return: encoded NOTIFICATION message in HEX
        """
        # Marker: 16 bytes of 0xFF (rfc4271#section-4.1).
        marker_hex = "\xFF" * 16

        # NOTE(review): "type" (shadowing the builtin) is assigned on a line
        # not visible in this excerpt; NOTIFICATION is message type 3.
        type_hex = struct.pack("B", type)

        # Error Code (1 byte)
        error_code_hex = struct.pack("B", error_code)

        # Error Subcode (1 byte)
        error_subcode_hex = struct.pack("B", error_subcode)

        # Length (big-endian); the constant 2 accounts for the Length field itself.
        length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
                  len(error_subcode_hex) + len(data_hex))
        length_hex = struct.pack(">H", length)

        # NOTIFICATION Message
        # NOTE(review): the concatenation building message_hex from the
        # fields above is not visible in this excerpt — confirm against
        # the full source.

        # Trace the full encoding, field by field.
        logger.debug("NOTIFICATION message encoding")
        logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
        logger.debug(" Length=" + str(length) + " (0x" +
                     binascii.hexlify(length_hex) + ")")
        logger.debug(" Type=" + str(type) + " (0x" +
                     binascii.hexlify(type_hex) + ")")
        logger.debug(" Error Code=" + str(error_code) + " (0x" +
                     binascii.hexlify(error_code_hex) + ")")
        logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
                     binascii.hexlify(error_subcode_hex) + ")")
        logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
        logger.debug("NOTIFICATION message encoded: 0x%s",
                     binascii.b2a_hex(message_hex))
    def keepalive_message(self):
        """Generates a KEEP ALIVE Message (rfc4271#section-4.4)

        KEEP ALIVE carries only the common header (marker, length, type).

        :return: encoded KEEP ALIVE message in HEX
        """
        # Marker: 16 bytes of 0xFF (rfc4271#section-4.1).
        marker_hex = "\xFF" * 16

        # NOTE(review): "type" (shadowing the builtin) is assigned on a line
        # not visible in this excerpt; KEEP ALIVE is message type 4.
        type_hex = struct.pack("B", type)

        # Length (big-endian); the constant 2 accounts for the Length field itself.
        length = len(marker_hex) + 2 + len(type_hex)
        length_hex = struct.pack(">H", length)

        # KEEP ALIVE Message
        # NOTE(review): the concatenation building message_hex is not
        # visible in this excerpt — confirm against the full source.

        # Trace the full encoding, field by field.
        logger.debug("KEEP ALIVE message encoding")
        logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
        logger.debug(" Length=" + str(length) + " (0x" +
                     binascii.hexlify(length_hex) + ")")
        logger.debug(" Type=" + str(type) + " (0x" +
                     binascii.hexlify(type_hex) + ")")
        logger.debug("KEEP ALIVE message encoded: 0x%s",
                     binascii.b2a_hex(message_hex))
class TimeTracker(object):
    """Class for tracking timers, both for my keepalives and
    for detecting expiration of the peer's hold time.
    """

    def __init__(self, msg_in):
        """Initialisation. based on defaults and OPEN message from peer.

        Arguments:
            msg_in: the OPEN message received from peer.
        """
        # Note: Relative time is always named timedelta, to stress that
        # the (non-delta) time is absolute.
        self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
        # Upper bound for being stuck in the same state, we should
        # at least report something before continuing.
        # Negotiate the hold timer by taking the smaller
        # of the 2 values (mine and the peer's).
        hold_timedelta = 180  # Not an attribute of self yet.
        # TODO: Make the default value configurable,
        # default value could mirror what peer said.
        # Peer's proposed hold time sits at fixed offset 22 of its OPEN message.
        peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
        if hold_timedelta > peer_hold_timedelta:
            hold_timedelta = peer_hold_timedelta
        # Hold time must be either zero (disabled) or at least 3 seconds.
        if hold_timedelta != 0 and hold_timedelta < 3:
            logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
            raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
        self.hold_timedelta = hold_timedelta
        # If we do not hear from peer this long, we assume it has died.
        self.keepalive_timedelta = int(hold_timedelta / 3.0)
        # Upper limit for duration between messages, to avoid being
        # declared to be dead.
        # The same as calling snapshot(), but also declares a field.
        self.snapshot_time = time.time()
        # Sometimes we need to store time. This is where to get
        # the value from afterwards. Time_keepalive may be too strict.
        self.peer_hold_time = self.snapshot_time + self.hold_timedelta
        # At this time point, peer will be declared dead.
        self.my_keepalive_time = None  # to be set later
        # At this point, we should be sending keepalive message.
        # NOTE(review): a "def snapshot(self):" header appears to be elided
        # from this excerpt right before the following docstring/lines —
        # confirm against the full source. They are kept at this indent only
        # so the excerpt remains parseable.
        """Store current time in instance data to use later."""
        # Read as time before something interesting was called.
        self.snapshot_time = time.time()

    def reset_peer_hold_time(self):
        """Move hold time to future as peer has just proven it still lives."""
        self.peer_hold_time = time.time() + self.hold_timedelta

    # Some methods could rely on self.snapshot_time, but it is better
    # to require user to provide it explicitly.
    def reset_my_keepalive_time(self, keepalive_time):
        """Calculate and set the next my KEEP ALIVE timeout time

        Arguments:
            :keepalive_time: the initial value of the KEEP ALIVE timer
        """
        self.my_keepalive_time = keepalive_time + self.keepalive_timedelta

    def is_time_for_my_keepalive(self):
        """Check for my KEEP ALIVE timeout occurence"""
        if self.hold_timedelta == 0:
            # NOTE(review): the original body of this branch (presumably
            # "return True" for the keepalive-disabled case) is not visible
            # in this excerpt; the following line is nested under the
            # condition only to keep the excerpt parseable — confirm.
            return self.snapshot_time >= self.my_keepalive_time

    def get_next_event_time(self):
        """Set the time of the next expected or to be sent KEEP ALIVE"""
        # With hold timer disabled, report a far-future time (one day ahead).
        if self.hold_timedelta == 0:
            return self.snapshot_time + 86400
        return min(self.my_keepalive_time, self.peer_hold_time)

    def check_peer_hold_time(self, snapshot_time):
        """Raise error if nothing was read from peer until specified time."""
        # Hold time = 0 means keepalive checking off.
        if self.hold_timedelta != 0:
            # time.time() may be too strict
            if snapshot_time > self.peer_hold_time:
                logger.error("Peer has overstepped the hold timer.")
                raise RuntimeError("Peer has overstepped the hold timer.")
                # TODO: Include hold_timedelta?
                # TODO: Add notification sending (attempt). That means
                # move to write tracker.
class ReadTracker(object):
    """Class for tracking read of messages chunk by chunk and
    for idle waiting between them.

    Also counts received updates and introduced/withdrawn prefixes.
    """

    def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
                 l3vpn_mcast=False, allf=False, l3vpn=False, rt_constrain=False,
                 wait_for_read=10):
        """The reader initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            timer: timer to be used for scheduling
            storage: thread safe dict
            evpn: flag that evpn functionality is tested
            mvpn: flag that mvpn functionality is tested
            l3vpn_mcast: flag that l3vpn_mcast functionality is tested
            l3vpn: flag that l3vpn unicast functionality is tested
            rt_constrain: flag that rt-constrain functionality is tested
            allf: flag for all family testing.
            wait_for_read: upper bound (seconds) for one idle select() wait
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.timer = timer
        # BGP marker length plus length field length.
        self.header_length = 18
        # TODO: make it class (constant) attribute
        # Computation of where next chunk ends depends on whether
        # we are beyond length field.
        self.reading_header = True
        # Countdown towards next size computation.
        self.bytes_to_read = self.header_length
        # Incremental buffer for message under read.
        self.msg_in = ""
        # Initialising counters
        self.updates_received = 0
        self.prefixes_introduced = 0
        self.prefixes_withdrawn = 0
        self.rx_idle_time = 0
        self.rx_activity_detected = True
        self.storage = storage
        # Address-family flags: when set, received updates are stored raw
        # and the IPv4-unicast decoding is skipped.
        self.evpn = evpn
        self.mvpn = mvpn
        self.l3vpn_mcast = l3vpn_mcast
        self.l3vpn = l3vpn
        self.rt_constrain = rt_constrain
        self.allf = allf
        self.wfr = wait_for_read

    def read_message_chunk(self):
        """Read up to one message

        Note:
            Currently it does not return anything.
        """
        # TODO: We could return the whole message, currently not needed.
        # We assume the socket is readable.
        chunk_message = self.socket.recv(self.bytes_to_read)
        self.msg_in += chunk_message
        self.bytes_to_read -= len(chunk_message)
        # TODO: bytes_to_read < 0 is not possible, right?
        if not self.bytes_to_read:
            # Finished reading a logical block.
            if self.reading_header:
                # The logical block was a BGP header.
                # Now we know the size of the message.
                self.reading_header = False
                self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
                                      self.header_length)
            else:  # We have finished reading the body of the message.
                # Peer has just proven it is still alive.
                self.timer.reset_peer_hold_time()
                # TODO: Do we want to count received messages?
                # This version ignores the received message.
                # TODO: Should we do validation and exit on anything
                # besides update or keepalive?
                # Prepare state for reading another message.
                # Dispatch on the message type byte right after the header.
                message_type_hex = self.msg_in[self.header_length]
                if message_type_hex == "\x01":
                    logger.info("OPEN message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x02":
                    logger.debug("UPDATE message received: 0x%s",
                                 binascii.b2a_hex(self.msg_in))
                    self.decode_update_message(self.msg_in)
                elif message_type_hex == "\x03":
                    logger.info("NOTIFICATION message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                elif message_type_hex == "\x04":
                    logger.info("KEEP ALIVE message received: 0x%s",
                                binascii.b2a_hex(self.msg_in))
                else:
                    logger.warning("Unexpected message received: 0x%s",
                                   binascii.b2a_hex(self.msg_in))
                # Reset state so the next chunk starts a fresh header.
                self.msg_in = ""
                self.reading_header = True
                self.bytes_to_read = self.header_length
        # We should not act upon peer_hold_time if we are reading
        # something right now.
        return

    def decode_path_attributes(self, path_attributes_hex):
        """Decode the Path Attributes field (rfc4271#section-4.3)

        Each recognised attribute is logged; MP_REACH_NLRI and
        MP_UNREACH_NLRI additionally update the prefix counters.

        :path_attributes: path_attributes field to be decoded in hex
        :return: None
        """
        hex_to_decode = path_attributes_hex

        while len(hex_to_decode):
            attr_flags_hex = hex_to_decode[0]
            attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
            # attr_optional_bit = attr_flags & 128
            # attr_transitive_bit = attr_flags & 64
            # attr_partial_bit = attr_flags & 32
            attr_extended_length_bit = attr_flags & 16

            attr_type_code_hex = hex_to_decode[1]
            attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)

            # Extended-length attributes carry a 2-byte length field.
            if attr_extended_length_bit:
                attr_length_hex = hex_to_decode[2:4]
                attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
                attr_value_hex = hex_to_decode[4:4 + attr_length]
                hex_to_decode = hex_to_decode[4 + attr_length:]
            else:
                attr_length_hex = hex_to_decode[2]
                attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
                attr_value_hex = hex_to_decode[3:3 + attr_length]
                hex_to_decode = hex_to_decode[3 + attr_length:]

            if attr_type_code == 1:
                logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 2:
                logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 3:
                logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 4:
                logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 5:
                logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 6:
                logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 7:
                logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 9:  # rfc4456#section-8
                logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 10:  # rfc4456#section-8
                logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
            elif attr_type_code == 14:  # rfc4760#section-3
                logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
                address_family_identifier_hex = attr_value_hex[0:2]
                logger.debug(" Address Family Identifier=0x%s",
                             binascii.b2a_hex(address_family_identifier_hex))
                subsequent_address_family_identifier_hex = attr_value_hex[2]
                logger.debug(" Subsequent Address Family Identifier=0x%s",
                             binascii.b2a_hex(subsequent_address_family_identifier_hex))
                next_hop_netaddr_len_hex = attr_value_hex[3]
                next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
                logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
                             next_hop_netaddr_len,
                             binascii.b2a_hex(next_hop_netaddr_len_hex))
                next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
                next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
                logger.debug(" Network Address of Next Hop=%s (0x%s)",
                             next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
                reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
                logger.debug(" Reserved=0x%s",
                             binascii.b2a_hex(reserved_hex))
                nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
                logger.debug(" Network Layer Reachability Information=0x%s",
                             binascii.b2a_hex(nlri_hex))
                nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
                logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
                for prefix in nlri_prefix_list:
                    logger.debug(" nlri_prefix_received: %s", prefix)
                self.prefixes_introduced += len(nlri_prefix_list)  # update counter
            elif attr_type_code == 15:  # rfc4760#section-4
                logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
                address_family_identifier_hex = attr_value_hex[0:2]
                logger.debug(" Address Family Identifier=0x%s",
                             binascii.b2a_hex(address_family_identifier_hex))
                subsequent_address_family_identifier_hex = attr_value_hex[2]
                logger.debug(" Subsequent Address Family Identifier=0x%s",
                             binascii.b2a_hex(subsequent_address_family_identifier_hex))
                wd_hex = attr_value_hex[3:]
                logger.debug(" Withdrawn Routes=0x%s",
                             binascii.b2a_hex(wd_hex))
                wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
                logger.debug(" Withdrawn routes prefix list: %s",
                             wdr_prefix_list)
                for prefix in wdr_prefix_list:
                    logger.debug(" withdrawn_prefix_received: %s", prefix)
                self.prefixes_withdrawn += len(wdr_prefix_list)  # update counter
            else:
                logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
                             binascii.b2a_hex(attr_flags_hex))
                logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))

    def decode_update_message(self, msg):
        """Decode an UPDATE message (rfc4271#section-4.3)

        The raw message is always stored for the rpc server; when any
        non-IPv4-unicast address family is being tested, the per-field
        decoding below is skipped.

        :msg: message to be decoded in hex
        :return: None
        """
        logger.debug("Decoding update message:")
        # message header - marker
        marker_hex = msg[:16]
        logger.debug("Message header marker: 0x%s",
                     binascii.b2a_hex(marker_hex))
        # message header - message length
        msg_length_hex = msg[16:18]
        msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
        logger.debug("Message lenght: 0x%s (%s)",
                     binascii.b2a_hex(msg_length_hex), msg_length)
        # message header - message type
        msg_type_hex = msg[18:19]
        msg_type = int(binascii.b2a_hex(msg_type_hex), 16)

        with self.storage as stor:
            # this will replace the previously stored message
            stor['update'] = binascii.hexlify(msg)

        logger.debug("Evpn {}".format(self.evpn))
        if self.evpn:
            logger.debug("Skipping update decoding due to evpn data expected")
            return

        logger.debug("Mvpn {}".format(self.mvpn))
        if self.mvpn:
            logger.debug("Skipping update decoding due to mvpn data expected")
            return

        logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
        if self.l3vpn_mcast:
            logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
            return

        logger.debug("L3vpn-unicast {}".format(self.l3vpn))
        # FIX: this guard used to test self.l3vpn_mcast (copy-paste bug),
        # which made the l3vpn-unicast flag logged above ineffective here.
        if self.l3vpn:
            logger.debug("Skipping update decoding due to l3vpn-unicast data expected")
            return

        logger.debug("Route-Target-Constrain {}".format(self.rt_constrain))
        if self.rt_constrain:
            logger.debug("Skipping update decoding due to Route-Target-Constrain data expected")
            return

        logger.debug("Allf {}".format(self.allf))
        if self.allf:
            logger.debug("Skipping update decoding")
            return

        if msg_type == 2:
            logger.debug("Message type: 0x%s (update)",
                         binascii.b2a_hex(msg_type_hex))
            # withdrawn routes length
            wdr_length_hex = msg[19:21]
            wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
            logger.debug("Withdrawn routes lenght: 0x%s (%s)",
                         binascii.b2a_hex(wdr_length_hex), wdr_length)
            # withdrawn routes
            wdr_hex = msg[21:21 + wdr_length]
            logger.debug("Withdrawn routes: 0x%s",
                         binascii.b2a_hex(wdr_hex))
            wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
            logger.debug("Withdrawn routes prefix list: %s",
                         wdr_prefix_list)
            for prefix in wdr_prefix_list:
                logger.debug("withdrawn_prefix_received: %s", prefix)
            # total path attribute length
            total_pa_length_offset = 21 + wdr_length
            total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
            total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
            logger.debug("Total path attribute lenght: 0x%s (%s)",
                         binascii.b2a_hex(total_pa_length_hex), total_pa_length)
            # path attributes
            pa_offset = total_pa_length_offset + 2
            pa_hex = msg[pa_offset:pa_offset + total_pa_length]
            logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
            self.decode_path_attributes(pa_hex)
            # network layer reachability information length
            # 23 = 19 (header with type) + 2 + 2 (the two length fields).
            nlri_length = msg_length - 23 - total_pa_length - wdr_length
            logger.debug("Calculated NLRI length: %s", nlri_length)
            # network layer reachability information
            nlri_offset = pa_offset + total_pa_length
            nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
            logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
            nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
            logger.debug("NLRI prefix list: %s", nlri_prefix_list)
            for prefix in nlri_prefix_list:
                logger.debug("nlri_prefix_received: %s", prefix)
            # updating counters
            self.updates_received += 1
            self.prefixes_introduced += len(nlri_prefix_list)
            self.prefixes_withdrawn += len(wdr_prefix_list)
        else:
            logger.error("Unexpeced message type 0x%s in 0x%s",
                         binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))

    def wait_for_read(self):
        """Read message until timeout (next expected event).

        Note:
            Used when no more updates has to be sent to avoid busy-wait.
            Currently it does not return anything.
        """
        # Compute time to the first predictable state change
        event_time = self.timer.get_next_event_time()
        # snapshot_time would be imprecise
        wait_timedelta = min(event_time - time.time(), self.wfr)
        if wait_timedelta < 0:
            # The program got around to waiting to an event in "very near
            # future" so late that it became a "past" event, thus tell
            # "select" to not wait at all. Passing negative timedelta to
            # select() would lead to either waiting forever (for -1) or
            # select.error("Invalid parameter") (for everything else).
            wait_timedelta = 0
        # And wait for event or something to read.

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every update and
            # not too frequently to avoid having large log files)
            logger.info("total_received_update_message_counter: %s",
                        self.updates_received)
            logger.info("total_received_nlri_prefix_counter: %s",
                        self.prefixes_introduced)
            logger.info("total_received_withdrawn_prefix_counter: %s",
                        self.prefixes_withdrawn)

        start_time = time.time()
        select.select([self.socket], [], [self.socket], wait_timedelta)
        timedelta = time.time() - start_time
        self.rx_idle_time += timedelta
        # A wait shorter than a second means the peer is actively sending.
        self.rx_activity_detected = timedelta < 1

        if not self.rx_activity_detected or not (self.updates_received % 100):
            # right time to write statistics to the log (not for every update and
            # not too frequently to avoid having large log files)
            logger.info("... idle for %.3fs", timedelta)
            logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
class WriteTracker(object):
    """Class tracking enqueueing messages and sending chunks of them."""

    def __init__(self, bgp_socket, generator, timer):
        """The writer initialisation.

        Arguments:
            bgp_socket: socket to be used for sending
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
        """
        # References to outside objects,
        self.socket = bgp_socket
        self.generator = generator
        # NOTE(review): the assignments of self.timer (used below in
        # send_message_chunk_is_whole) and of the initial empty self.msg_out
        # buffer are not visible in this excerpt — confirm against the
        # full source.
        # Really new fields.
        # TODO: Would attribute docstrings add anything substantial?
        self.sending_message = False
        self.bytes_to_send = 0

    def enqueue_message_for_sending(self, message):
        """Enqueue message and change state.

        Arguments:
            message: message to be enqueued into the msg_out buffer
        """
        self.msg_out += message
        self.bytes_to_send += len(message)
        self.sending_message = True

    def send_message_chunk_is_whole(self):
        """Send enqueued data from msg_out buffer

        :return: true if no remaining data to send
        """
        # We assume there is a msg_out to send and socket is writable.
        # print "going to send", repr(self.msg_out)
        self.timer.snapshot()
        bytes_sent = self.socket.send(self.msg_out)
        # Forget the part of message that was sent.
        self.msg_out = self.msg_out[bytes_sent:]
        self.bytes_to_send -= bytes_sent
        if not self.bytes_to_send:
            # TODO: Is it possible to hit negative bytes_to_send?
            self.sending_message = False
            # We should have reset hold timer on peer side.
            self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
            # The possible reason for not prioritizing reads is gone.
            # NOTE(review): the return statements of this method (true when
            # the buffer was fully flushed) are not visible in this
            # excerpt — confirm against the full source.
class StateTracker(object):
    """Main loop has state so complex it warrants this separate class."""

    def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
        """The state tracker initialisation.

        Arguments:
            bgp_socket: socket to be used for sending / receiving
            generator: generator to be used for message generation
            timer: timer to be used for scheduling
            inqueue: user initiated messages queue
            storage: thread safe dict to store data for the rpc server
            cliargs: cli args from the user
        """
        # References to outside objects.
        self.socket = bgp_socket
        self.generator = generator
        self.timer = timer  # was missing; perform_one_loop_iteration reads it
        # Sub-trackers for the two directions of the session.
        self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, mvpn=cliargs.mvpn,
                                  l3vpn_mcast=cliargs.l3vpn_mcast, l3vpn=cliargs.l3vpn, allf=cliargs.allf,
                                  rt_constrain=cliargs.rt_constrain, wait_for_read=cliargs.wfr)
        self.writer = WriteTracker(bgp_socket, generator, timer)
        # Prioritization state.
        self.prioritize_writing = False
        # In general, we prioritize reading over writing. But in order
        # not to get blocked by neverending reads, we should
        # check whether we are not risking running out of holdtime.
        # So in some situations, this field is set to True to attempt
        # finishing sending a message, after which this field resets
        # back to False.
        # TODO: Alternative is to switch fairly between reading and
        # writing (called round robin from now on).
        # Message counting is done in generator.
        self.inqueue = inqueue

    def perform_one_loop_iteration(self):
        """ The main loop iteration

        Notes:
            Calculates priority, resolves all conditions, calls
            appropriate method and returns to caller to repeat.
        """
        self.timer.snapshot()
        if not self.prioritize_writing:
            if self.timer.is_time_for_my_keepalive():
                if not self.writer.sending_message:
                    # We need to schedule a keepalive ASAP.
                    self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
                    logger.info("KEEP ALIVE is sent.")
                # We are sending a message now, so let's prioritize it.
                self.prioritize_writing = True
        # User initiated messages (hex strings from the rpc queue) are
        # forwarded to the peer as-is; an empty queue is the normal case.
        try:
            msg = self.inqueue.get_nowait()
            logger.info("Received message: {}".format(msg))
            msgbin = binascii.unhexlify(msg)
            self.writer.enqueue_message_for_sending(msgbin)
        except Queue.Empty:
            pass
        # Now we know what our priorities are, we have to check
        # which actions are available.
        # socket.socket() returns three lists,
        # we store them to list of lists.
        list_list = select.select([self.socket], [self.socket], [self.socket],
                                  self.timer.report_timedelta)
        read_list, write_list, except_list = list_list
        # Lists are unpacked, each is either [] or [self.socket],
        # so we will test them as boolean.
        if except_list:
            logger.error("Exceptional state on the socket.")
            raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
                return
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
            # Quiet on the read front, we can have attempt to write.
        if write_list:
            # Either we really want to reset peer's view of our hold
            # timer, or there was nothing to read.
            # Were we in the middle of sending a message?
            if self.writer.sending_message:
                # Was it the end of a message?
                whole = self.writer.send_message_chunk_is_whole()
                # We were pressed to send something and we did it.
                if self.prioritize_writing and whole:
                    # We prioritize reading again.
                    self.prioritize_writing = False
                return
            # Finally to check if still update messages to be generated.
            if self.generator.remaining_prefixes:
                msg_out = self.generator.compose_update_message()
                if not self.generator.remaining_prefixes:
                    # We have just finished update generation,
                    # end-of-rib is due.
                    logger.info("All update messages generated.")
                    logger.info("Storing performance results.")
                    self.generator.store_results()
                    logger.info("Finally an END-OF-RIB is sent.")
                    # NOTE(review): keyword set reconstructed from the visible
                    # truncated call — confirm against MessageGenerator.
                    msg_out += self.generator.update_message(wr_prefixes=[],
                                                             nlri_prefixes=[],
                                                             end_of_rib=True)
                self.writer.enqueue_message_for_sending(msg_out)
                # Attempt for real sending to be done in next iteration.
                return
            # Nothing to write anymore.
            # To avoid busy loop, we do idle waiting here.
            self.reader.wait_for_read()
            return
        # We can neither read nor write.
        logger.warning("Input and output both blocked for " +
                       str(self.timer.report_timedelta) + " seconds.")
        # FIXME: Are we sure select has been really waiting
        # the whole period?
        return
def create_logger(loglevel, logfile):
    """Create logger object writing both to console and to logfile.

    Arguments:
        :loglevel: log level
        :logfile: log file name

    Returns:
        :return: logger object
    """
    logger = logging.getLogger("logger")
    log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler(logfile, mode="w")
    console_handler.setFormatter(log_formatter)
    file_handler.setFormatter(log_formatter)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.setLevel(loglevel)
    # The return was missing: the caller binds the result to the module-level
    # "logger" global, which would otherwise be None and crash on first use.
    return logger
def job(arguments, inqueue, storage):
    """One time initialisation and iterations looping.

    Notes:
        Establish BGP connection and run iterations.

    Arguments:
        :arguments: Command line arguments
        :inqueue: Data to be sent from play.py
        :storage: Shared dict for rpc server

    Returns: None
    """
    peer_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    peer_open = read_open_message(peer_socket)
    timer = TimeTracker(peer_open)
    generator = MessageGenerator(arguments)
    our_open = generator.open_message()
    logger.debug("Sending the OPEN message: " + binascii.hexlify(our_open))
    # Send our open message to the peer.
    peer_socket.send(our_open)
    # Wait for confirming keepalive.
    # TODO: Surely in just one packet?
    # Using exact keepalive length to not to see possible updates.
    confirmation = peer_socket.recv(19)
    if confirmation != generator.keepalive_message():
        error_msg = "Open not confirmed by keepalive, instead got"
        logger.error(error_msg + ": " + binascii.hexlify(confirmation))
        raise MessageError(error_msg, confirmation)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    keepalive = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(keepalive))
    peer_socket.send(keepalive)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase; hand everything to the reactor.
    state = StateTracker(peer_socket, generator, timer, inqueue, storage, arguments)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
1992 '''Handler for SimpleXMLRPCServer'''
1994 def __init__(self, sendqueue, storage):
1998 :sendqueue: queue for data to be sent towards odl
1999 :storage: thread safe dict
2001 self.queue = sendqueue
2002 self.storage = storage
2004 def send(self, text):
2008 :text: hes string of the data to be sent
2010 self.queue.put(text)
2012 def get(self, text=''):
2013 '''Reads data form the storage
2015 - returns stored data or an empty string, at the moment only
2019 :text: a key to the storage to get the data
2023 with self.storage as stor:
2024 return stor.get(text, '')
2026 def clean(self, text=''):
2027 '''Cleans data form the storage
2030 :text: a key to the storage to clean the data
2032 with self.storage as stor:
2037 def threaded_job(arguments):
2038 """Run the job threaded
2041 :arguments: Command line arguments
2045 amount_left = arguments.amount
2046 utils_left = arguments.multiplicity
2047 prefix_current = arguments.firstprefix
2048 myip_current = arguments.myip
2050 rpcqueue = Queue.Queue()
2051 storage = SafeDict()
2054 amount_per_util = (amount_left - 1) / utils_left + 1 # round up
2055 amount_left -= amount_per_util
2058 args = deepcopy(arguments)
2059 args.amount = amount_per_util
2060 args.firstprefix = prefix_current
2061 args.myip = myip_current
2062 thread_args.append(args)
2066 prefix_current += amount_per_util * 16
2071 for t in thread_args:
2072 thread.start_new_thread(job, (t, rpcqueue, storage))
2074 print "Error: unable to start thread."
2077 rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
2078 rpcserver.register_instance(Rpcs(rpcqueue, storage))
2079 rpcserver.serve_forever()
if __name__ == "__main__":
    # Script entry point: parse CLI options, create the module-level
    # "logger" global (read by the classes and functions above), then
    # split the work across threads and serve RPC requests forever.
    arguments = parse_arguments()
    logger = create_logger(arguments.loglevel, arguments.logfile)
    threaded_job(arguments)