1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
36 '''Thread safe dictionary
38 The object will serve as thread safe data storage.
39 It should be used with "with" statement.
42 def __init__(self, * p_arg, ** n_arg):
43 super(SafeDict, self).__init__()
44 self._lock = threading.Lock()
50 def __exit__(self, type, value, traceback):
54 def parse_arguments():
55 """Use argparse to get arguments,
60 parser = argparse.ArgumentParser()
61 # TODO: Should we use --argument-names-with-spaces?
62 str_help = "Autonomous System number use in the stream."
63 parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64 # FIXME: We are acting as iBGP peer,
65 # we should mirror AS number from peer's open message.
66 str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67 parser.add_argument("--amount", default="1", type=int, help=str_help)
68 str_help = "Maximum number of IP prefixes to be announced in one iteration"
69 parser.add_argument("--insert", default="1", type=int, help=str_help)
70 str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
71 parser.add_argument("--withdraw", default="0", type=int, help=str_help)
72 str_help = "The number of prefixes to process without withdrawals"
73 parser.add_argument("--prefill", default="0", type=int, help=str_help)
74 str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
75 parser.add_argument("--updates", choices=["single", "separate"],
76 default=["separate"], help=str_help)
77 str_help = "Base prefix IP address for prefix generation"
78 parser.add_argument("--firstprefix", default="8.0.1.0",
79 type=ipaddr.IPv4Address, help=str_help)
80 str_help = "The prefix length."
81 parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
82 str_help = "Listen for connection, instead of initiating it."
83 parser.add_argument("--listen", action="store_true", help=str_help)
84 str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
85 "Default value only suitable for listening.")
86 parser.add_argument("--myip", default="0.0.0.0",
87 type=ipaddr.IPv4Address, help=str_help)
88 str_help = ("TCP port to bind to when listening or initiating connection." +
89 "Default only suitable for initiating.")
90 parser.add_argument("--myport", default="0", type=int, help=str_help)
91 str_help = "The IP of the next hop to be placed into the update messages."
92 parser.add_argument("--nexthop", default="192.0.2.1",
93 type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
94 str_help = "Identifier of the route originator."
95 parser.add_argument("--originator", default=None,
96 type=ipaddr.IPv4Address, dest="originator", help=str_help)
97 str_help = "Cluster list item identifier."
98 parser.add_argument("--cluster", default=None,
99 type=ipaddr.IPv4Address, dest="cluster", help=str_help)
100 str_help = ("Numeric IP Address to try to connect to." +
101 "Currently no effect in listening mode.")
102 parser.add_argument("--peerip", default="127.0.0.2",
103 type=ipaddr.IPv4Address, help=str_help)
104 str_help = "TCP port to try to connect to. No effect in listening mode."
105 parser.add_argument("--peerport", default="179", type=int, help=str_help)
106 str_help = "Local hold time."
107 parser.add_argument("--holdtime", default="180", type=int, help=str_help)
108 str_help = "Log level (--error, --warning, --info, --debug)"
109 parser.add_argument("--error", dest="loglevel", action="store_const",
110 const=logging.ERROR, default=logging.INFO,
112 parser.add_argument("--warning", dest="loglevel", action="store_const",
113 const=logging.WARNING, default=logging.INFO,
115 parser.add_argument("--info", dest="loglevel", action="store_const",
116 const=logging.INFO, default=logging.INFO,
118 parser.add_argument("--debug", dest="loglevel", action="store_const",
119 const=logging.DEBUG, default=logging.INFO,
121 str_help = "Log file name"
122 parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
123 str_help = "Trailing part of the csv result files for plotting purposes"
124 parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
125 str_help = "Minimum number of updates to reach to include result into csv."
126 parser.add_argument("--threshold", default="1000", type=int, help=str_help)
127 str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
128 parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
129 str_help = "Link-State NLRI supported"
130 parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
131 str_help = "Link-State NLRI: Identifier"
132 parser.add_argument("-lsid", default="1", type=int, help=str_help)
133 str_help = "Link-State NLRI: Tunnel ID"
134 parser.add_argument("-lstid", default="1", type=int, help=str_help)
135 str_help = "Link-State NLRI: LSP ID"
136 parser.add_argument("-lspid", default="1", type=int, help=str_help)
137 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
138 parser.add_argument("--lstsaddr", default="1.2.3.4",
139 type=ipaddr.IPv4Address, help=str_help)
140 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
141 parser.add_argument("--lsteaddr", default="5.6.7.8",
142 type=ipaddr.IPv4Address, help=str_help)
143 str_help = "Link-State NLRI: Identifier Step"
144 parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
145 str_help = "Link-State NLRI: Tunnel ID Step"
146 parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
147 str_help = "Link-State NLRI: LSP ID Step"
148 parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
149 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
150 parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
151 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
152 parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
153 str_help = "How many play utilities are to be started."
154 parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
155 str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
156 Enabling this flag makes the script not decoding the update mesage, because of not\
157 supported decoding for these elements."
158 parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
159 str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
160 Enabling this flag makes the script not decoding the update mesage, because of not\
161 supported decoding for these elements."
162 parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
163 str_help = "Open message includes L3VPN-MULTICAST arguments.\
164 Enabling this flag makes the script not decoding the update mesage, because of not\
165 supported decoding for these elements."
166 parser.add_argument("--l3vpn_mcast", default=False, action="store_true", help=str_help)
167 parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
168 str_help = "Skipping well known attributes for update message"
169 parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
170 arguments = parser.parse_args()
171 if arguments.multiplicity < 1:
172 print "Multiplicity", arguments.multiplicity, "is not positive."
174 # TODO: Are sanity checks (such as asnumber>=0) required?
178 def establish_connection(arguments):
179 """Establish connection to BGP peer.
182 :arguments: following command-line argumets are used
183 - arguments.myip: local IP address
184 - arguments.myport: local port
185 - arguments.peerip: remote IP address
186 - arguments.peerport: remote port
191 logger.info("Connecting in the listening mode.")
192 logger.debug("Local IP address: " + str(arguments.myip))
193 logger.debug("Local port: " + str(arguments.myport))
194 listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
195 listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
196 # bind need single tuple as argument
197 listening_socket.bind((str(arguments.myip), arguments.myport))
198 listening_socket.listen(1)
199 bgp_socket, _ = listening_socket.accept()
200 # TODO: Verify client IP is cotroller IP.
201 listening_socket.close()
203 logger.info("Connecting in the talking mode.")
204 logger.debug("Local IP address: " + str(arguments.myip))
205 logger.debug("Local port: " + str(arguments.myport))
206 logger.debug("Remote IP address: " + str(arguments.peerip))
207 logger.debug("Remote port: " + str(arguments.peerport))
208 talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
209 talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
210 # bind to force specified address and port
211 talking_socket.bind((str(arguments.myip), arguments.myport))
212 # socket does not spead ipaddr, hence str()
213 talking_socket.connect((str(arguments.peerip), arguments.peerport))
214 bgp_socket = talking_socket
215 logger.info("Connected to ODL.")
219 def get_short_int_from_message(message, offset=16):
220 """Extract 2-bytes number from provided message.
223 :message: given message
224 :offset: offset of the short_int inside the message
226 :return: required short_inf value.
228 default offset value is the BGP message size offset.
230 high_byte_int = ord(message[offset])
231 low_byte_int = ord(message[offset + 1])
232 short_int = high_byte_int * 256 + low_byte_int
236 def get_prefix_list_from_hex(prefixes_hex):
237 """Get decoded list of prefixes (rfc4271#section-4.3)
240 :prefixes_hex: list of prefixes to be decoded in hex
242 :return: list of prefixes in the form of ip address (X.X.X.X/X)
246 while offset < len(prefixes_hex):
247 prefix_bit_len_hex = prefixes_hex[offset]
248 prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
249 prefix_len = ((prefix_bit_len - 1) / 8) + 1
250 prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
251 prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
252 offset += 1 + prefix_len
253 prefix_list.append(prefix + "/" + str(prefix_bit_len))
257 class MessageError(ValueError):
258 """Value error with logging optimized for hexlified messages."""
260 def __init__(self, text, message, *args):
263 Store and call super init for textual comment,
264 store raw message which caused it.
268 super(MessageError, self).__init__(text, message, *args)
271 """Generate human readable error message.
274 :return: human readable message as string
276 Use a placeholder string if the message is to be empty.
278 message = binascii.hexlify(self.msg)
280 message = "(empty message)"
281 return self.text + ": " + message
284 def read_open_message(bgp_socket):
285 """Receive peer's OPEN message
288 :bgp_socket: the socket to be read
290 :return: received OPEN message.
292 Performs just basic incomming message checks
294 msg_in = bgp_socket.recv(65535) # TODO: Is smaller buffer size safe?
295 # TODO: Can the incoming open message be split in more than one packet?
298 # 37 is minimal length of open message with 4-byte AS number.
300 "Message length (" + str(len(msg_in)) + ") is smaller than "
301 "minimal length of OPEN message with 4-byte AS number (37)"
303 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
304 raise MessageError(error_msg, msg_in)
305 # TODO: We could check BGP marker, but it is defined only later;
307 reported_length = get_short_int_from_message(msg_in)
308 if len(msg_in) != reported_length:
310 "Expected message length (" + reported_length +
311 ") does not match actual length (" + str(len(msg_in)) + ")"
313 logger.error(error_msg + binascii.hexlify(msg_in))
314 raise MessageError(error_msg, msg_in)
315 logger.info("Open message received.")
319 class MessageGenerator(object):
320 """Class which generates messages, holds states and configuration values."""
322 # TODO: Define bgp marker as a class (constant) variable.
323 def __init__(self, args):
324 """Initialisation according to command-line args.
327 :args: argsparser's Namespace object which contains command-line
328 options for MesageGenerator initialisation
330 Calculates and stores default values used later on for
333 self.total_prefix_amount = args.amount
334 # Number of update messages left to be sent.
335 self.remaining_prefixes = self.total_prefix_amount
337 # New parameters initialisation
339 self.prefix_base_default = args.firstprefix
340 self.prefix_length_default = args.prefixlen
341 self.wr_prefixes_default = []
342 self.nlri_prefixes_default = []
343 self.version_default = 4
344 self.my_autonomous_system_default = args.asnumber
345 self.hold_time_default = args.holdtime # Local hold time.
346 self.bgp_identifier_default = int(args.myip)
347 self.next_hop_default = args.nexthop
348 self.originator_id_default = args.originator
349 self.cluster_list_item_default = args.cluster
350 self.single_update_default = args.updates == "single"
351 self.randomize_updates_default = args.updates == "random"
352 self.prefix_count_to_add_default = args.insert
353 self.prefix_count_to_del_default = args.withdraw
354 if self.prefix_count_to_del_default < 0:
355 self.prefix_count_to_del_default = 0
356 if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
357 # total number of prefixes must grow to avoid infinite test loop
358 self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
359 self.slot_size_default = self.prefix_count_to_add_default
360 self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
361 self.results_file_name_default = args.results
362 self.performance_threshold_default = args.threshold
363 self.rfc4760 = args.rfc4760
364 self.bgpls = args.bgpls
365 self.evpn = args.evpn
366 self.mvpn = args.mvpn
367 self.l3vpn_mcast = args.l3vpn_mcast
368 self.skipattr = args.skipattr
369 # Default values when BGP-LS Attributes are used
371 self.prefix_count_to_add_default = 1
372 self.prefix_count_to_del_default = 0
373 self.ls_nlri_default = {"Identifier": args.lsid,
374 "TunnelID": args.lstid,
376 "IPv4TunnelSenderAddress": args.lstsaddr,
377 "IPv4TunnelEndPointAddress": args.lsteaddr}
378 self.lsid_step = args.lsidstep
379 self.lstid_step = args.lstidstep
380 self.lspid_step = args.lspidstep
381 self.lstsaddr_step = args.lstsaddrstep
382 self.lsteaddr_step = args.lsteaddrstep
383 # Default values used for randomized part
384 s1_slots = ((self.total_prefix_amount -
385 self.remaining_prefixes_threshold - 1) /
386 self.prefix_count_to_add_default + 1)
387 s2_slots = ((self.remaining_prefixes_threshold - 1) /
388 (self.prefix_count_to_add_default -
389 self.prefix_count_to_del_default) + 1)
391 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
392 s2_first_index = s1_slots * self.prefix_count_to_add_default
393 s2_last_index = (s2_first_index +
394 s2_slots * (self.prefix_count_to_add_default -
395 self.prefix_count_to_del_default) - 1)
396 self.slot_gap_default = ((self.total_prefix_amount -
397 self.remaining_prefixes_threshold - 1) /
398 self.prefix_count_to_add_default + 1)
399 self.randomize_lowest_default = s2_first_index
400 self.randomize_highest_default = s2_last_index
401 # Initialising counters
402 self.phase1_start_time = 0
403 self.phase1_stop_time = 0
404 self.phase2_start_time = 0
405 self.phase2_stop_time = 0
406 self.phase1_updates_sent = 0
407 self.phase2_updates_sent = 0
408 self.updates_sent = 0
410 self.log_info = args.loglevel <= logging.INFO
411 self.log_debug = args.loglevel <= logging.DEBUG
413 Flags needed for the MessageGenerator performance optimization.
414 Calling logger methods each iteration even with proper log level set
415 slows down significantly the MessageGenerator performance.
416 Measured total generation time (1M updates, dry run, error log level):
417 - logging based on basic logger features: 36,2s
418 - logging based on advanced logger features (lazy logging): 21,2s
419 - conditional calling of logger methods enclosed inside condition: 8,6s
422 logger.info("Generator initialisation")
423 logger.info(" Target total number of prefixes to be introduced: " +
424 str(self.total_prefix_amount))
425 logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
426 str(self.prefix_length_default))
427 logger.info(" My Autonomous System number: " +
428 str(self.my_autonomous_system_default))
429 logger.info(" My Hold Time: " + str(self.hold_time_default))
430 logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
431 logger.info(" Next Hop: " + str(self.next_hop_default))
432 logger.info(" Originator ID: " + str(self.originator_id_default))
433 logger.info(" Cluster list: " + str(self.cluster_list_item_default))
434 logger.info(" Prefix count to be inserted at once: " +
435 str(self.prefix_count_to_add_default))
436 logger.info(" Prefix count to be withdrawn at once: " +
437 str(self.prefix_count_to_del_default))
438 logger.info(" Fast pre-fill up to " +
439 str(self.total_prefix_amount -
440 self.remaining_prefixes_threshold) + " prefixes")
441 logger.info(" Remaining number of prefixes to be processed " +
442 "in parallel with withdrawals: " +
443 str(self.remaining_prefixes_threshold))
444 logger.debug(" Prefix index range used after pre-fill procedure [" +
445 str(self.randomize_lowest_default) + ", " +
446 str(self.randomize_highest_default) + "]")
447 if self.single_update_default:
448 logger.info(" Common single UPDATE will be generated " +
449 "for both NLRI & WITHDRAWN lists")
451 logger.info(" Two separate UPDATEs will be generated " +
452 "for each NLRI & WITHDRAWN lists")
453 if self.randomize_updates_default:
454 logger.info(" Generation of UPDATE messages will be randomized")
455 logger.info(" Let\'s go ...\n")
457 # TODO: Notification for hold timer expiration can be handy.
459 def store_results(self, file_name=None, threshold=None):
460 """ Stores specified results into files based on file_name value.
463 :param file_name: Trailing (common) part of result file names
464 :param threshold: Minimum number of sent updates needed for each
465 result to be included into result csv file
466 (mainly needed because of the result accuracy)
470 # default values handling
471 # TODO optimize default values handling (use e.g. dicionary.update() approach)
472 if file_name is None:
473 file_name = self.results_file_name_default
474 if threshold is None:
475 threshold = self.performance_threshold_default
476 # performance calculation
477 if self.phase1_updates_sent >= threshold:
478 totals1 = self.phase1_updates_sent
479 performance1 = int(self.phase1_updates_sent /
480 (self.phase1_stop_time - self.phase1_start_time))
484 if self.phase2_updates_sent >= threshold:
485 totals2 = self.phase2_updates_sent
486 performance2 = int(self.phase2_updates_sent /
487 (self.phase2_stop_time - self.phase2_start_time))
492 logger.info("#" * 10 + " Final results " + "#" * 10)
493 logger.info("Number of iterations: " + str(self.iteration))
494 logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
495 str(self.phase1_updates_sent))
496 logger.info("The pre-fill phase duration: " +
497 str(self.phase1_stop_time - self.phase1_start_time) + "s")
498 logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
499 str(self.phase2_updates_sent))
500 logger.info("The 2nd test phase duration: " +
501 str(self.phase2_stop_time - self.phase2_start_time) + "s")
502 logger.info("Threshold for performance reporting: " + str(threshold))
505 phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
506 " route(s) per UPDATE")
507 if self.single_update_default:
508 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
509 "/-" + str(self.prefix_count_to_del_default) +
510 " routes per UPDATE")
512 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
513 "/-" + str(self.prefix_count_to_del_default) +
514 " routes in two UPDATEs")
515 # collecting capacity and performance results
518 if totals1 is not None:
519 totals[phase1_label] = totals1
520 performance[phase1_label] = performance1
521 if totals2 is not None:
522 totals[phase2_label] = totals2
523 performance[phase2_label] = performance2
524 self.write_results_to_file(totals, "totals-" + file_name)
525 self.write_results_to_file(performance, "performance-" + file_name)
527 def write_results_to_file(self, results, file_name):
528 """Writes results to the csv plot file consumable by Jenkins.
531 :param file_name: Name of the (csv) file to be created
537 f = open(file_name, "wt")
539 for key in sorted(results):
540 first_line += key + ", "
541 second_line += str(results[key]) + ", "
542 first_line = first_line[:-2]
543 second_line = second_line[:-2]
544 f.write(first_line + "\n")
545 f.write(second_line + "\n")
546 logger.info("Message generator performance results stored in " +
548 logger.info(" " + first_line)
549 logger.info(" " + second_line)
553 # Return pseudo-randomized (reproducible) index for selected range
554 def randomize_index(self, index, lowest=None, highest=None):
555 """Calculates pseudo-randomized index from selected range.
558 :param index: input index
559 :param lowest: the lowes index from the randomized area
560 :param highest: the highest index from the randomized area
562 :return: the (pseudo)randomized index
564 Created just as a fame for future generator enhancement.
566 # default values handling
567 # TODO optimize default values handling (use e.g. dicionary.update() approach)
569 lowest = self.randomize_lowest_default
571 highest = self.randomize_highest_default
573 if (index >= lowest) and (index <= highest):
574 # we are in the randomized range -> shuffle it inside
575 # the range (now just reverse the order)
576 new_index = highest - (index - lowest)
578 # we are out of the randomized range -> nothing to do
582 def get_ls_nlri_values(self, index):
583 """Generates LS-NLRI parameters.
584 http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
587 :param index: index (iteration)
589 :return: dictionary of LS NLRI parameters and values
591 # generating list of LS NLRI parameters
592 identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
593 ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
594 tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
595 lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
596 ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
597 ls_nlri_values = {"Identifier": identifier,
598 "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
599 "TunnelID": tunnel_id, "LSPID": lsp_id,
600 "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
601 return ls_nlri_values
603 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
604 prefix_len=None, prefix_count=None, randomize=None):
605 """Generates list of IP address prefixes.
608 :param slot_index: index of group of prefix addresses
609 :param slot_size: size of group of prefix addresses
610 in [number of included prefixes]
611 :param prefix_base: IP address of the first prefix
612 (slot_index = 0, prefix_index = 0)
613 :param prefix_len: length of the prefix in bites
614 (the same as size of netmask)
615 :param prefix_count: number of prefixes to be returned
616 from the specified slot
618 :return: list of generated IP address prefixes
620 # default values handling
621 # TODO optimize default values handling (use e.g. dicionary.update() approach)
622 if slot_size is None:
623 slot_size = self.slot_size_default
624 if prefix_base is None:
625 prefix_base = self.prefix_base_default
626 if prefix_len is None:
627 prefix_len = self.prefix_length_default
628 if prefix_count is None:
629 prefix_count = slot_size
630 if randomize is None:
631 randomize = self.randomize_updates_default
632 # generating list of prefixes
635 prefix_gap = 2 ** (32 - prefix_len)
636 for i in range(prefix_count):
637 prefix_index = slot_index * slot_size + i
639 prefix_index = self.randomize_index(prefix_index)
640 indexes.append(prefix_index)
641 prefixes.append(prefix_base + prefix_index * prefix_gap)
643 logger.debug(" Prefix slot index: " + str(slot_index))
644 logger.debug(" Prefix slot size: " + str(slot_size))
645 logger.debug(" Prefix count: " + str(prefix_count))
646 logger.debug(" Prefix indexes: " + str(indexes))
647 logger.debug(" Prefix list: " + str(prefixes))
650 def compose_update_message(self, prefix_count_to_add=None,
651 prefix_count_to_del=None):
652 """Composes an UPDATE message
655 :param prefix_count_to_add: # of prefixes to put into NLRI list
656 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
658 :return: encoded UPDATE message in HEX
660 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
661 lists or common message wich includes both prefix lists.
662 Updates global counters.
664 # default values handling
665 # TODO optimize default values handling (use e.g. dicionary.update() approach)
666 if prefix_count_to_add is None:
667 prefix_count_to_add = self.prefix_count_to_add_default
668 if prefix_count_to_del is None:
669 prefix_count_to_del = self.prefix_count_to_del_default
671 if self.log_info and not (self.iteration % 1000):
672 logger.info("Iteration: " + str(self.iteration) +
673 " - total remaining prefixes: " +
674 str(self.remaining_prefixes))
676 logger.debug("#" * 10 + " Iteration: " +
677 str(self.iteration) + " " + "#" * 10)
678 logger.debug("Remaining prefixes: " +
679 str(self.remaining_prefixes))
680 # scenario type & one-shot counter
681 straightforward_scenario = (self.remaining_prefixes >
682 self.remaining_prefixes_threshold)
683 if straightforward_scenario:
684 prefix_count_to_del = 0
686 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
687 if not self.phase1_start_time:
688 self.phase1_start_time = time.time()
691 logger.debug("--- COMBINED SCENARIO ---")
692 if not self.phase2_start_time:
693 self.phase2_start_time = time.time()
694 # tailor the number of prefixes if needed
695 prefix_count_to_add = (prefix_count_to_del +
696 min(prefix_count_to_add - prefix_count_to_del,
697 self.remaining_prefixes))
698 # prefix slots selection for insertion and withdrawal
699 slot_index_to_add = self.iteration
700 slot_index_to_del = slot_index_to_add - self.slot_gap_default
701 # getting lists of prefixes for insertion in this iteration
703 logger.debug("Prefixes to be inserted in this iteration:")
704 prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
705 prefix_count=prefix_count_to_add)
706 # getting lists of prefixes for withdrawal in this iteration
708 logger.debug("Prefixes to be withdrawn in this iteration:")
709 prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
710 prefix_count=prefix_count_to_del)
711 # generating the UPDATE mesage with LS-NLRI only
713 ls_nlri = self.get_ls_nlri_values(self.iteration)
714 msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
717 # generating the UPDATE message with prefix lists
718 if self.single_update_default:
719 # Send prefixes to be introduced and withdrawn
720 # in one UPDATE message
721 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
722 nlri_prefixes=prefix_list_to_add)
724 # Send prefixes to be introduced and withdrawn
725 # in separate UPDATE messages (if needed)
726 msg_out = self.update_message(wr_prefixes=[],
727 nlri_prefixes=prefix_list_to_add)
728 if prefix_count_to_del:
729 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
731 # updating counters - who knows ... maybe I am last time here ;)
732 if straightforward_scenario:
733 self.phase1_stop_time = time.time()
734 self.phase1_updates_sent = self.updates_sent
736 self.phase2_stop_time = time.time()
737 self.phase2_updates_sent = (self.updates_sent -
738 self.phase1_updates_sent)
739 # updating totals for the next iteration
741 self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
742 # returning the encoded message
745 # Section of message encoders
747 def open_message(self, version=None, my_autonomous_system=None,
748 hold_time=None, bgp_identifier=None):
749 """Generates an OPEN Message (rfc4271#section-4.2)
752 :param version: see the rfc4271#section-4.2
753 :param my_autonomous_system: see the rfc4271#section-4.2
754 :param hold_time: see the rfc4271#section-4.2
755 :param bgp_identifier: see the rfc4271#section-4.2
757 :return: encoded OPEN message in HEX
760 # default values handling
761 # TODO optimize default values handling (use e.g. dicionary.update() approach)
763 version = self.version_default
764 if my_autonomous_system is None:
765 my_autonomous_system = self.my_autonomous_system_default
766 if hold_time is None:
767 hold_time = self.hold_time_default
768 if bgp_identifier is None:
769 bgp_identifier = self.bgp_identifier_default
772 marker_hex = "\xFF" * 16
776 type_hex = struct.pack("B", type)
779 version_hex = struct.pack("B", version)
781 # my_autonomous_system
782 # AS_TRANS value, 23456 decadic.
783 my_autonomous_system_2_bytes = 23456
784 # AS number is mappable to 2 bytes
785 if my_autonomous_system < 65536:
786 my_autonomous_system_2_bytes = my_autonomous_system
787 my_autonomous_system_hex_2_bytes = struct.pack(">H",
788 my_autonomous_system)
791 hold_time_hex = struct.pack(">H", hold_time)
794 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
796 # Optional Parameters
797 optional_parameters_hex = ""
799 optional_parameter_hex = (
800 "\x02" # Param type ("Capability Ad")
801 "\x06" # Length (6 bytes)
802 "\x01" # Capability type (NLRI Unicast),
803 # see RFC 4760, secton 8
804 "\x04" # Capability value length
805 "\x00\x01" # AFI (Ipv4)
807 "\x01" # SAFI (Unicast)
809 optional_parameters_hex += optional_parameter_hex
812 optional_parameter_hex = (
813 "\x02" # Param type ("Capability Ad")
814 "\x06" # Length (6 bytes)
815 "\x01" # Capability type (NLRI Unicast),
816 # see RFC 4760, secton 8
817 "\x04" # Capability value length
818 "\x40\x04" # AFI (BGP-LS)
820 "\x47" # SAFI (BGP-LS)
822 optional_parameters_hex += optional_parameter_hex
825 optional_parameter_hex = (
826 "\x02" # Param type ("Capability Ad")
827 "\x06" # Length (6 bytes)
828 "\x01" # Multiprotocol extetension capability,
829 "\x04" # Capability value length
830 "\x00\x19" # AFI (L2-VPN)
834 optional_parameters_hex += optional_parameter_hex
837 optional_parameter_hex = (
838 "\x02" # Param type ("Capability Ad")
839 "\x06" # Length (6 bytes)
840 "\x01" # Multiprotocol extetension capability,
841 "\x04" # Capability value length
842 "\x00\x01" # AFI (IPV4)
844 "\x05" # SAFI (MCAST-VPN)
846 optional_parameters_hex += optional_parameter_hex
847 optional_parameter_hex = (
848 "\x02" # Param type ("Capability Ad")
849 "\x06" # Length (6 bytes)
850 "\x01" # Multiprotocol extetension capability,
851 "\x04" # Capability value length
852 "\x00\x02" # AFI (IPV6)
854 "\x05" # SAFI (MCAST-VPN)
856 optional_parameters_hex += optional_parameter_hex
859 optional_parameter_hex = (
860 "\x02" # Param type ("Capability Ad")
861 "\x06" # Length (6 bytes)
862 "\x01" # Multiprotocol extetension capability,
863 "\x04" # Capability value length
864 "\x00\x01" # AFI (IPV4)
866 "\x81" # SAFI (L3VPN-MCAST)
868 optional_parameters_hex += optional_parameter_hex
869 optional_parameter_hex = (
870 "\x02" # Param type ("Capability Ad")
871 "\x06" # Length (6 bytes)
872 "\x01" # Multiprotocol extetension capability,
873 "\x04" # Capability value length
874 "\x00\x02" # AFI (IPV6)
876 "\x81" # SAFI (L3VPN-MCAST)
878 optional_parameters_hex += optional_parameter_hex
880 optional_parameter_hex = (
881 "\x02" # Param type ("Capability Ad")
882 "\x06" # Length (6 bytes)
883 "\x41" # "32 bit AS Numbers Support"
884 # (see RFC 6793, section 3)
885 "\x04" # Capability value length
887 optional_parameter_hex += (
888 struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
890 optional_parameters_hex += optional_parameter_hex
892 # Optional Parameters Length
893 optional_parameters_length = len(optional_parameters_hex)
894 optional_parameters_length_hex = struct.pack("B",
895 optional_parameters_length)
897 # Length (big-endian)
899 len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
900 len(my_autonomous_system_hex_2_bytes) +
901 len(hold_time_hex) + len(bgp_identifier_hex) +
902 len(optional_parameters_length_hex) +
903 len(optional_parameters_hex)
905 length_hex = struct.pack(">H", length)
913 my_autonomous_system_hex_2_bytes +
916 optional_parameters_length_hex +
917 optional_parameters_hex
921 logger.debug("OPEN message encoding")
922 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
923 logger.debug(" Length=" + str(length) + " (0x" +
924 binascii.hexlify(length_hex) + ")")
925 logger.debug(" Type=" + str(type) + " (0x" +
926 binascii.hexlify(type_hex) + ")")
927 logger.debug(" Version=" + str(version) + " (0x" +
928 binascii.hexlify(version_hex) + ")")
929 logger.debug(" My Autonomous System=" +
930 str(my_autonomous_system_2_bytes) + " (0x" +
931 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
933 logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
934 binascii.hexlify(hold_time_hex) + ")")
935 logger.debug(" BGP Identifier=" + str(bgp_identifier) +
936 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
937 logger.debug(" Optional Parameters Length=" +
938 str(optional_parameters_length) + " (0x" +
939 binascii.hexlify(optional_parameters_length_hex) +
941 logger.debug(" Optional Parameters=0x" +
942 binascii.hexlify(optional_parameters_hex))
943 logger.debug("OPEN message encoded: 0x%s",
944 binascii.b2a_hex(message_hex))
948 def update_message(self, wr_prefixes=None, nlri_prefixes=None,
949 wr_prefix_length=None, nlri_prefix_length=None,
950 my_autonomous_system=None, next_hop=None,
951 originator_id=None, cluster_list_item=None,
952 end_of_rib=False, **ls_nlri_params):
953 """Generates an UPDATE Message (rfc4271#section-4.3)
956 :param wr_prefixes: see the rfc4271#section-4.3
957 :param nlri_prefixes: see the rfc4271#section-4.3
958 :param wr_prefix_length: see the rfc4271#section-4.3
959 :param nlri_prefix_length: see the rfc4271#section-4.3
960 :param my_autonomous_system: see the rfc4271#section-4.3
961 :param next_hop: see the rfc4271#section-4.3
963 :return: encoded UPDATE message in HEX
966 # default values handling
967 # TODO optimize default values handling (use e.g. dicionary.update() approach)
968 if wr_prefixes is None:
969 wr_prefixes = self.wr_prefixes_default
970 if nlri_prefixes is None:
971 nlri_prefixes = self.nlri_prefixes_default
972 if wr_prefix_length is None:
973 wr_prefix_length = self.prefix_length_default
974 if nlri_prefix_length is None:
975 nlri_prefix_length = self.prefix_length_default
976 if my_autonomous_system is None:
977 my_autonomous_system = self.my_autonomous_system_default
979 next_hop = self.next_hop_default
980 if originator_id is None:
981 originator_id = self.originator_id_default
982 if cluster_list_item is None:
983 cluster_list_item = self.cluster_list_item_default
984 ls_nlri = self.ls_nlri_default.copy()
985 ls_nlri.update(ls_nlri_params)
988 marker_hex = "\xFF" * 16
992 type_hex = struct.pack("B", type)
995 withdrawn_routes_hex = ""
997 bytes = ((wr_prefix_length - 1) / 8) + 1
998 for prefix in wr_prefixes:
999 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
1000 struct.pack(">I", int(prefix))[:bytes])
1001 withdrawn_routes_hex += withdrawn_route_hex
1003 # Withdrawn Routes Length
1004 withdrawn_routes_length = len(withdrawn_routes_hex)
1005 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
1007 # TODO: to replace hardcoded string by encoding?
1009 path_attributes_hex = ""
1010 if not self.skipattr:
1011 path_attributes_hex += (
1012 "\x40" # Flags ("Well-Known")
1013 "\x01" # Type (ORIGIN)
1015 "\x00" # Origin: IGP
1017 path_attributes_hex += (
1018 "\x40" # Flags ("Well-Known")
1019 "\x02" # Type (AS_PATH)
1021 "\x02" # AS segment type (AS_SEQUENCE)
1022 "\x01" # AS segment length (1)
1024 my_as_hex = struct.pack(">I", my_autonomous_system)
1025 path_attributes_hex += my_as_hex # AS segment (4 bytes)
1026 path_attributes_hex += (
1027 "\x40" # Flags ("Well-Known")
1028 "\x05" # Type (LOCAL_PREF)
1030 "\x00\x00\x00\x64" # (100)
1032 if nlri_prefixes != []:
1033 path_attributes_hex += (
1034 "\x40" # Flags ("Well-Known")
1035 "\x03" # Type (NEXT_HOP)
1038 next_hop_hex = struct.pack(">I", int(next_hop))
1039 path_attributes_hex += (
1040 next_hop_hex # IP address of the next hop (4 bytes)
1042 if originator_id is not None:
1043 path_attributes_hex += (
1044 "\x80" # Flags ("Optional, non-transitive")
1045 "\x09" # Type (ORIGINATOR_ID)
1047 ) # ORIGINATOR_ID (4 bytes)
1048 path_attributes_hex += struct.pack(">I", int(originator_id))
1049 if cluster_list_item is not None:
1050 path_attributes_hex += (
1051 "\x80" # Flags ("Optional, non-transitive")
1052 "\x0a" # Type (CLUSTER_LIST)
1054 ) # one CLUSTER_LIST item (4 bytes)
1055 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1057 if self.bgpls and not end_of_rib:
1058 path_attributes_hex += (
1059 "\x80" # Flags ("Optional, non-transitive")
1060 "\x0e" # Type (MP_REACH_NLRI)
1061 "\x22" # Length (34)
1062 "\x40\x04" # AFI (BGP-LS)
1063 "\x47" # SAFI (BGP-LS)
1064 "\x04" # Next Hop Length (4)
1066 path_attributes_hex += struct.pack(">I", int(next_hop))
1067 path_attributes_hex += "\x00" # Reserved
1068 path_attributes_hex += (
1069 "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1070 "\x00\x15" # LS-NLRI.TotalNLRILength (21)
1071 "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1073 path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1074 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1075 path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1076 path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1077 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1079 # Total Path Attributes Length
1080 total_path_attributes_length = len(path_attributes_hex)
1081 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1083 # Network Layer Reachability Information
1086 bytes = ((nlri_prefix_length - 1) / 8) + 1
1087 for prefix in nlri_prefixes:
1088 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1089 struct.pack(">I", int(prefix))[:bytes])
1090 nlri_hex += nlri_prefix_hex
1092 # Length (big-endian)
1094 len(marker_hex) + 2 + len(type_hex) +
1095 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1096 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1098 length_hex = struct.pack(">H", length)
1105 withdrawn_routes_length_hex +
1106 withdrawn_routes_hex +
1107 total_path_attributes_length_hex +
1108 path_attributes_hex +
1113 logger.debug("UPDATE message encoding")
1114 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1115 logger.debug(" Length=" + str(length) + " (0x" +
1116 binascii.hexlify(length_hex) + ")")
1117 logger.debug(" Type=" + str(type) + " (0x" +
1118 binascii.hexlify(type_hex) + ")")
1119 logger.debug(" withdrawn_routes_length=" +
1120 str(withdrawn_routes_length) + " (0x" +
1121 binascii.hexlify(withdrawn_routes_length_hex) + ")")
1122 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1123 str(wr_prefix_length) + " (0x" +
1124 binascii.hexlify(withdrawn_routes_hex) + ")")
1125 if total_path_attributes_length:
1126 logger.debug(" Total Path Attributes Length=" +
1127 str(total_path_attributes_length) + " (0x" +
1128 binascii.hexlify(total_path_attributes_length_hex) + ")")
1129 logger.debug(" Path Attributes=" + "(0x" +
1130 binascii.hexlify(path_attributes_hex) + ")")
1131 logger.debug(" Origin=IGP")
1132 logger.debug(" AS path=" + str(my_autonomous_system))
1133 logger.debug(" Next hop=" + str(next_hop))
1134 if originator_id is not None:
1135 logger.debug(" Originator id=" + str(originator_id))
1136 if cluster_list_item is not None:
1137 logger.debug(" Cluster list=" + str(cluster_list_item))
1139 logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
1140 logger.debug(" Network Layer Reachability Information=" +
1141 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1142 " (0x" + binascii.hexlify(nlri_hex) + ")")
1143 logger.debug("UPDATE message encoded: 0x" +
1144 binascii.b2a_hex(message_hex))
1147 self.updates_sent += 1
1148 # returning encoded message
1151 def notification_message(self, error_code, error_subcode, data_hex=""):
1152 """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1155 :param error_code: see the rfc4271#section-4.5
1156 :param error_subcode: see the rfc4271#section-4.5
1157 :param data_hex: see the rfc4271#section-4.5
1159 :return: encoded NOTIFICATION message in HEX
1163 marker_hex = "\xFF" * 16
1167 type_hex = struct.pack("B", type)
1170 error_code_hex = struct.pack("B", error_code)
1173 error_subcode_hex = struct.pack("B", error_subcode)
1175 # Length (big-endian)
1176 length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1177 len(error_subcode_hex) + len(data_hex))
1178 length_hex = struct.pack(">H", length)
1180 # NOTIFICATION Message
1191 logger.debug("NOTIFICATION message encoding")
1192 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1193 logger.debug(" Length=" + str(length) + " (0x" +
1194 binascii.hexlify(length_hex) + ")")
1195 logger.debug(" Type=" + str(type) + " (0x" +
1196 binascii.hexlify(type_hex) + ")")
1197 logger.debug(" Error Code=" + str(error_code) + " (0x" +
1198 binascii.hexlify(error_code_hex) + ")")
1199 logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
1200 binascii.hexlify(error_subcode_hex) + ")")
1201 logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1202 logger.debug("NOTIFICATION message encoded: 0x%s",
1203 binascii.b2a_hex(message_hex))
1207 def keepalive_message(self):
1208 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1211 :return: encoded KEEP ALIVE message in HEX
1215 marker_hex = "\xFF" * 16
1219 type_hex = struct.pack("B", type)
1221 # Length (big-endian)
1222 length = len(marker_hex) + 2 + len(type_hex)
1223 length_hex = struct.pack(">H", length)
1225 # KEEP ALIVE Message
1233 logger.debug("KEEP ALIVE message encoding")
1234 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1235 logger.debug(" Length=" + str(length) + " (0x" +
1236 binascii.hexlify(length_hex) + ")")
1237 logger.debug(" Type=" + str(type) + " (0x" +
1238 binascii.hexlify(type_hex) + ")")
1239 logger.debug("KEEP ALIVE message encoded: 0x%s",
1240 binascii.b2a_hex(message_hex))
1245 class TimeTracker(object):
1246 """Class for tracking timers, both for my keepalives and
1250 def __init__(self, msg_in):
1251 """Initialisation. based on defaults and OPEN message from peer.
1254 msg_in: the OPEN message received from peer.
1256 # Note: Relative time is always named timedelta, to stress that
1257 # the (non-delta) time is absolute.
1258 self.report_timedelta = 1.0 # In seconds. TODO: Configurable?
1259 # Upper bound for being stuck in the same state, we should
1260 # at least report something before continuing.
1261 # Negotiate the hold timer by taking the smaller
1262 # of the 2 values (mine and the peer's).
1263 hold_timedelta = 180 # Not an attribute of self yet.
1264 # TODO: Make the default value configurable,
1265 # default value could mirror what peer said.
1266 peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1267 if hold_timedelta > peer_hold_timedelta:
1268 hold_timedelta = peer_hold_timedelta
1269 if hold_timedelta != 0 and hold_timedelta < 3:
1270 logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1271 raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1272 self.hold_timedelta = hold_timedelta
1273 # If we do not hear from peer this long, we assume it has died.
1274 self.keepalive_timedelta = int(hold_timedelta / 3.0)
1275 # Upper limit for duration between messages, to avoid being
1276 # declared to be dead.
1277 # The same as calling snapshot(), but also declares a field.
1278 self.snapshot_time = time.time()
1279 # Sometimes we need to store time. This is where to get
1280 # the value from afterwards. Time_keepalive may be too strict.
1281 self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1282 # At this time point, peer will be declared dead.
1283 self.my_keepalive_time = None # to be set later
1284 # At this point, we should be sending keepalive message.
1287 """Store current time in instance data to use later."""
1288 # Read as time before something interesting was called.
1289 self.snapshot_time = time.time()
1291 def reset_peer_hold_time(self):
1292 """Move hold time to future as peer has just proven it still lives."""
1293 self.peer_hold_time = time.time() + self.hold_timedelta
1295 # Some methods could rely on self.snapshot_time, but it is better
1296 # to require user to provide it explicitly.
1297 def reset_my_keepalive_time(self, keepalive_time):
1298 """Calculate and set the next my KEEP ALIVE timeout time
1301 :keepalive_time: the initial value of the KEEP ALIVE timer
1303 self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1305 def is_time_for_my_keepalive(self):
1306 """Check for my KEEP ALIVE timeout occurence"""
1307 if self.hold_timedelta == 0:
1309 return self.snapshot_time >= self.my_keepalive_time
1311 def get_next_event_time(self):
1312 """Set the time of the next expected or to be sent KEEP ALIVE"""
1313 if self.hold_timedelta == 0:
1314 return self.snapshot_time + 86400
1315 return min(self.my_keepalive_time, self.peer_hold_time)
1317 def check_peer_hold_time(self, snapshot_time):
1318 """Raise error if nothing was read from peer until specified time."""
1319 # Hold time = 0 means keepalive checking off.
1320 if self.hold_timedelta != 0:
1321 # time.time() may be too strict
1322 if snapshot_time > self.peer_hold_time:
1323 logger.error("Peer has overstepped the hold timer.")
1324 raise RuntimeError("Peer has overstepped the hold timer.")
1325 # TODO: Include hold_timedelta?
1326 # TODO: Add notification sending (attempt). That means
1327 # move to write tracker.
1330 class ReadTracker(object):
1331 """Class for tracking read of mesages chunk by chunk and
1335 def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False,
1336 l3vpn_mcast=False, wait_for_read=10):
1337 """The reader initialisation.
1340 bgp_socket: socket to be used for sending
1341 timer: timer to be used for scheduling
1342 storage: thread safe dict
1343 evpn: flag that evpn functionality is tested
1344 mvpn: flag that mvpn functionality is tested
1345 l3vpn_mcast: flag that l3vpn_mcast functionality is tested
1347 # References to outside objects.
1348 self.socket = bgp_socket
1350 # BGP marker length plus length field length.
1351 self.header_length = 18
1352 # TODO: make it class (constant) attribute
1353 # Computation of where next chunk ends depends on whether
1354 # we are beyond length field.
1355 self.reading_header = True
1356 # Countdown towards next size computation.
1357 self.bytes_to_read = self.header_length
1358 # Incremental buffer for message under read.
1360 # Initialising counters
1361 self.updates_received = 0
1362 self.prefixes_introduced = 0
1363 self.prefixes_withdrawn = 0
1364 self.rx_idle_time = 0
1365 self.rx_activity_detected = True
1366 self.storage = storage
1369 self.l3vpn_mcast = l3vpn_mcast
1370 self.wfr = wait_for_read
1372 def read_message_chunk(self):
1373 """Read up to one message
1376 Currently it does not return anything.
1378 # TODO: We could return the whole message, currently not needed.
1379 # We assume the socket is readable.
1380 chunk_message = self.socket.recv(self.bytes_to_read)
1381 self.msg_in += chunk_message
1382 self.bytes_to_read -= len(chunk_message)
1383 # TODO: bytes_to_read < 0 is not possible, right?
1384 if not self.bytes_to_read:
1385 # Finished reading a logical block.
1386 if self.reading_header:
1387 # The logical block was a BGP header.
1388 # Now we know the size of the message.
1389 self.reading_header = False
1390 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1392 else: # We have finished reading the body of the message.
1393 # Peer has just proven it is still alive.
1394 self.timer.reset_peer_hold_time()
1395 # TODO: Do we want to count received messages?
1396 # This version ignores the received message.
1397 # TODO: Should we do validation and exit on anything
1398 # besides update or keepalive?
1399 # Prepare state for reading another message.
1400 message_type_hex = self.msg_in[self.header_length]
1401 if message_type_hex == "\x01":
1402 logger.info("OPEN message received: 0x%s",
1403 binascii.b2a_hex(self.msg_in))
1404 elif message_type_hex == "\x02":
1405 logger.debug("UPDATE message received: 0x%s",
1406 binascii.b2a_hex(self.msg_in))
1407 self.decode_update_message(self.msg_in)
1408 elif message_type_hex == "\x03":
1409 logger.info("NOTIFICATION message received: 0x%s",
1410 binascii.b2a_hex(self.msg_in))
1411 elif message_type_hex == "\x04":
1412 logger.info("KEEP ALIVE message received: 0x%s",
1413 binascii.b2a_hex(self.msg_in))
1415 logger.warning("Unexpected message received: 0x%s",
1416 binascii.b2a_hex(self.msg_in))
1418 self.reading_header = True
1419 self.bytes_to_read = self.header_length
1420 # We should not act upon peer_hold_time if we are reading
1421 # something right now.
1424 def decode_path_attributes(self, path_attributes_hex):
1425 """Decode the Path Attributes field (rfc4271#section-4.3)
1428 :path_attributes: path_attributes field to be decoded in hex
1432 hex_to_decode = path_attributes_hex
1434 while len(hex_to_decode):
1435 attr_flags_hex = hex_to_decode[0]
1436 attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1437 # attr_optional_bit = attr_flags & 128
1438 # attr_transitive_bit = attr_flags & 64
1439 # attr_partial_bit = attr_flags & 32
1440 attr_extended_length_bit = attr_flags & 16
1442 attr_type_code_hex = hex_to_decode[1]
1443 attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1445 if attr_extended_length_bit:
1446 attr_length_hex = hex_to_decode[2:4]
1447 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1448 attr_value_hex = hex_to_decode[4:4 + attr_length]
1449 hex_to_decode = hex_to_decode[4 + attr_length:]
1451 attr_length_hex = hex_to_decode[2]
1452 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1453 attr_value_hex = hex_to_decode[3:3 + attr_length]
1454 hex_to_decode = hex_to_decode[3 + attr_length:]
1456 if attr_type_code == 1:
1457 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1458 binascii.b2a_hex(attr_flags_hex))
1459 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1460 elif attr_type_code == 2:
1461 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1462 binascii.b2a_hex(attr_flags_hex))
1463 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1464 elif attr_type_code == 3:
1465 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1466 binascii.b2a_hex(attr_flags_hex))
1467 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1468 elif attr_type_code == 4:
1469 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1470 binascii.b2a_hex(attr_flags_hex))
1471 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1472 elif attr_type_code == 5:
1473 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1474 binascii.b2a_hex(attr_flags_hex))
1475 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1476 elif attr_type_code == 6:
1477 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1478 binascii.b2a_hex(attr_flags_hex))
1479 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1480 elif attr_type_code == 7:
1481 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1482 binascii.b2a_hex(attr_flags_hex))
1483 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1484 elif attr_type_code == 9: # rfc4456#section-8
1485 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1486 binascii.b2a_hex(attr_flags_hex))
1487 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1488 elif attr_type_code == 10: # rfc4456#section-8
1489 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1490 binascii.b2a_hex(attr_flags_hex))
1491 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1492 elif attr_type_code == 14: # rfc4760#section-3
1493 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1494 binascii.b2a_hex(attr_flags_hex))
1495 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1496 address_family_identifier_hex = attr_value_hex[0:2]
1497 logger.debug(" Address Family Identifier=0x%s",
1498 binascii.b2a_hex(address_family_identifier_hex))
1499 subsequent_address_family_identifier_hex = attr_value_hex[2]
1500 logger.debug(" Subsequent Address Family Identifier=0x%s",
1501 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1502 next_hop_netaddr_len_hex = attr_value_hex[3]
1503 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1504 logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
1505 next_hop_netaddr_len,
1506 binascii.b2a_hex(next_hop_netaddr_len_hex))
1507 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1508 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1509 logger.debug(" Network Address of Next Hop=%s (0x%s)",
1510 next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1511 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1512 logger.debug(" Reserved=0x%s",
1513 binascii.b2a_hex(reserved_hex))
1514 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1515 logger.debug(" Network Layer Reachability Information=0x%s",
1516 binascii.b2a_hex(nlri_hex))
1517 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1518 logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
1519 for prefix in nlri_prefix_list:
1520 logger.debug(" nlri_prefix_received: %s", prefix)
1521 self.prefixes_introduced += len(nlri_prefix_list) # update counter
1522 elif attr_type_code == 15: # rfc4760#section-4
1523 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1524 binascii.b2a_hex(attr_flags_hex))
1525 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1526 address_family_identifier_hex = attr_value_hex[0:2]
1527 logger.debug(" Address Family Identifier=0x%s",
1528 binascii.b2a_hex(address_family_identifier_hex))
1529 subsequent_address_family_identifier_hex = attr_value_hex[2]
1530 logger.debug(" Subsequent Address Family Identifier=0x%s",
1531 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1532 wd_hex = attr_value_hex[3:]
1533 logger.debug(" Withdrawn Routes=0x%s",
1534 binascii.b2a_hex(wd_hex))
1535 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1536 logger.debug(" Withdrawn routes prefix list: %s",
1538 for prefix in wdr_prefix_list:
1539 logger.debug(" withdrawn_prefix_received: %s", prefix)
1540 self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
1542 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1543 binascii.b2a_hex(attr_flags_hex))
1544 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1547 def decode_update_message(self, msg):
1548 """Decode an UPDATE message (rfc4271#section-4.3)
1551 :msg: message to be decoded in hex
1555 logger.debug("Decoding update message:")
1556 # message header - marker
1557 marker_hex = msg[:16]
1558 logger.debug("Message header marker: 0x%s",
1559 binascii.b2a_hex(marker_hex))
1560 # message header - message length
1561 msg_length_hex = msg[16:18]
1562 msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1563 logger.debug("Message lenght: 0x%s (%s)",
1564 binascii.b2a_hex(msg_length_hex), msg_length)
1565 # message header - message type
1566 msg_type_hex = msg[18:19]
1567 msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1569 with self.storage as stor:
1570 # this will replace the previously stored message
1571 stor['update'] = binascii.hexlify(msg)
1573 logger.debug("Evpn {}".format(self.evpn))
1575 logger.debug("Skipping update decoding due to evpn data expected")
1578 logger.debug("Mvpn {}".format(self.mvpn))
1580 logger.debug("Skipping update decoding due to mvpn data expected")
1583 logger.debug("L3vpn-mcast {}".format(self.l3vpn_mcast))
1584 if self.l3vpn_mcast:
1585 logger.debug("Skipping update decoding due to l3vpn_mcast data expected")
1589 logger.debug("Message type: 0x%s (update)",
1590 binascii.b2a_hex(msg_type_hex))
1591 # withdrawn routes length
1592 wdr_length_hex = msg[19:21]
1593 wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1594 logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1595 binascii.b2a_hex(wdr_length_hex), wdr_length)
1597 wdr_hex = msg[21:21 + wdr_length]
1598 logger.debug("Withdrawn routes: 0x%s",
1599 binascii.b2a_hex(wdr_hex))
1600 wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1601 logger.debug("Withdrawn routes prefix list: %s",
1603 for prefix in wdr_prefix_list:
1604 logger.debug("withdrawn_prefix_received: %s", prefix)
1605 # total path attribute length
1606 total_pa_length_offset = 21 + wdr_length
1607 total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1608 total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1609 logger.debug("Total path attribute lenght: 0x%s (%s)",
1610 binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1612 pa_offset = total_pa_length_offset + 2
1613 pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1614 logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1615 self.decode_path_attributes(pa_hex)
1616 # network layer reachability information length
1617 nlri_length = msg_length - 23 - total_pa_length - wdr_length
1618 logger.debug("Calculated NLRI length: %s", nlri_length)
1619 # network layer reachability information
1620 nlri_offset = pa_offset + total_pa_length
1621 nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1622 logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1623 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1624 logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1625 for prefix in nlri_prefix_list:
1626 logger.debug("nlri_prefix_received: %s", prefix)
1628 self.updates_received += 1
1629 self.prefixes_introduced += len(nlri_prefix_list)
1630 self.prefixes_withdrawn += len(wdr_prefix_list)
1632 logger.error("Unexpeced message type 0x%s in 0x%s",
1633 binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1635 def wait_for_read(self):
1636 """Read message until timeout (next expected event).
1639 Used when no more updates has to be sent to avoid busy-wait.
1640 Currently it does not return anything.
1642 # Compute time to the first predictable state change
1643 event_time = self.timer.get_next_event_time()
1644 # snapshot_time would be imprecise
1645 wait_timedelta = min(event_time - time.time(), self.wfr)
1646 if wait_timedelta < 0:
1647 # The program got around to waiting to an event in "very near
1648 # future" so late that it became a "past" event, thus tell
1649 # "select" to not wait at all. Passing negative timedelta to
1650 # select() would lead to either waiting forever (for -1) or
1651 # select.error("Invalid parameter") (for everything else).
1653 # And wait for event or something to read.
1655 if not self.rx_activity_detected or not (self.updates_received % 100):
1656 # right time to write statistics to the log (not for every update and
1657 # not too frequently to avoid having large log files)
1658 logger.info("total_received_update_message_counter: %s",
1659 self.updates_received)
1660 logger.info("total_received_nlri_prefix_counter: %s",
1661 self.prefixes_introduced)
1662 logger.info("total_received_withdrawn_prefix_counter: %s",
1663 self.prefixes_withdrawn)
1665 start_time = time.time()
1666 select.select([self.socket], [], [self.socket], wait_timedelta)
1667 timedelta = time.time() - start_time
1668 self.rx_idle_time += timedelta
1669 self.rx_activity_detected = timedelta < 1
1671 if not self.rx_activity_detected or not (self.updates_received % 100):
1672 # right time to write statistics to the log (not for every update and
1673 # not too frequently to avoid having large log files)
1674 logger.info("... idle for %.3fs", timedelta)
1675 logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1679 class WriteTracker(object):
1680 """Class tracking enqueueing messages and sending chunks of them."""
1682 def __init__(self, bgp_socket, generator, timer):
1683 """The writter initialisation.
1686 bgp_socket: socket to be used for sending
1687 generator: generator to be used for message generation
1688 timer: timer to be used for scheduling
1690 # References to outside objects,
1691 self.socket = bgp_socket
1692 self.generator = generator
1694 # Really new fields.
1695 # TODO: Would attribute docstrings add anything substantial?
1696 self.sending_message = False
1697 self.bytes_to_send = 0
1700 def enqueue_message_for_sending(self, message):
1701 """Enqueue message and change state.
1704 message: message to be enqueued into the msg_out buffer
1706 self.msg_out += message
1707 self.bytes_to_send += len(message)
1708 self.sending_message = True
1710 def send_message_chunk_is_whole(self):
1711 """Send enqueued data from msg_out buffer
1714 :return: true if no remaining data to send
1716 # We assume there is a msg_out to send and socket is writable.
1717 # print "going to send", repr(self.msg_out)
1718 self.timer.snapshot()
1719 bytes_sent = self.socket.send(self.msg_out)
1720 # Forget the part of message that was sent.
1721 self.msg_out = self.msg_out[bytes_sent:]
1722 self.bytes_to_send -= bytes_sent
1723 if not self.bytes_to_send:
1724 # TODO: Is it possible to hit negative bytes_to_send?
1725 self.sending_message = False
1726 # We should have reset hold timer on peer side.
1727 self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1728 # The possible reason for not prioritizing reads is gone.
1733 class StateTracker(object):
1734 """Main loop has state so complex it warrants this separate class."""
1736 def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1737 """The state tracker initialisation.
1740 bgp_socket: socket to be used for sending / receiving
1741 generator: generator to be used for message generation
1742 timer: timer to be used for scheduling
1743 inqueue: user initiated messages queue
1744 storage: thread safe dict to store data for the rpc server
1745 cliargs: cli args from the user
1747 # References to outside objects.
1748 self.socket = bgp_socket
1749 self.generator = generator
1752 self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn,
1753 mvpn=cliargs.mvpn, l3vpn_mcast=cliargs.l3vpn_mcast,
1754 wait_for_read=cliargs.wfr)
1755 self.writer = WriteTracker(bgp_socket, generator, timer)
1756 # Prioritization state.
1757 self.prioritize_writing = False
1758 # In general, we prioritize reading over writing. But in order
1759 # not to get blocked by neverending reads, we should
1760 # check whether we are not risking running out of holdtime.
1761 # So in some situations, this field is set to True to attempt
1762 # finishing sending a message, after which this field resets
1764 # TODO: Alternative is to switch fairly between reading and
1765 # writing (called round robin from now on).
1766 # Message counting is done in generator.
1767 self.inqueue = inqueue
1769 def perform_one_loop_iteration(self):
1770 """ The main loop iteration
1773 Calculates priority, resolves all conditions, calls
1774 appropriate method and returns to caller to repeat.
1776 self.timer.snapshot()
1777 if not self.prioritize_writing:
1778 if self.timer.is_time_for_my_keepalive():
1779 if not self.writer.sending_message:
1780 # We need to schedule a keepalive ASAP.
1781 self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1782 logger.info("KEEP ALIVE is sent.")
1783 # We are sending a message now, so let's prioritize it.
1784 self.prioritize_writing = True
1787 msg = self.inqueue.get_nowait()
1788 logger.info("Received message: {}".format(msg))
1789 msgbin = binascii.unhexlify(msg)
1790 self.writer.enqueue_message_for_sending(msgbin)
1793 # Now we know what our priorities are, we have to check
1794 # which actions are available.
1795 # socket.socket() returns three lists,
1796 # we store them to list of lists.
1797 list_list = select.select([self.socket], [self.socket], [self.socket],
1798 self.timer.report_timedelta)
1799 read_list, write_list, except_list = list_list
1800 # Lists are unpacked, each is either [] or [self.socket],
1801 # so we will test them as boolean.
1803 logger.error("Exceptional state on the socket.")
1804 raise RuntimeError("Exceptional state on socket", self.socket)
1805 # We will do either read or write.
1806 if not (self.prioritize_writing and write_list):
1807 # Either we have no reason to rush writes,
1808 # or the socket is not writable.
1809 # We are focusing on reading here.
1810 if read_list: # there is something to read indeed
1811 # In this case we want to read chunk of message
1812 # and repeat the select,
1813 self.reader.read_message_chunk()
1815 # We were focusing on reading, but nothing to read was there.
1816 # Good time to check peer for hold timer.
1817 self.timer.check_peer_hold_time(self.timer.snapshot_time)
1818 # Quiet on the read front, we can have attempt to write.
1820 # Either we really want to reset peer's view of our hold
1821 # timer, or there was nothing to read.
1822 # Were we in the middle of sending a message?
1823 if self.writer.sending_message:
1824 # Was it the end of a message?
1825 whole = self.writer.send_message_chunk_is_whole()
1826 # We were pressed to send something and we did it.
1827 if self.prioritize_writing and whole:
1828 # We prioritize reading again.
1829 self.prioritize_writing = False
1831 # Finally to check if still update messages to be generated.
1832 if self.generator.remaining_prefixes:
1833 msg_out = self.generator.compose_update_message()
1834 if not self.generator.remaining_prefixes:
1835 # We have just finished update generation,
1836 # end-of-rib is due.
1837 logger.info("All update messages generated.")
1838 logger.info("Storing performance results.")
1839 self.generator.store_results()
1840 logger.info("Finally an END-OF-RIB is sent.")
1841 msg_out += self.generator.update_message(wr_prefixes=[],
1844 self.writer.enqueue_message_for_sending(msg_out)
1845 # Attempt for real sending to be done in next iteration.
1847 # Nothing to write anymore.
1848 # To avoid busy loop, we do idle waiting here.
1849 self.reader.wait_for_read()
1851 # We can neither read nor write.
1852 logger.warning("Input and output both blocked for " +
1853 str(self.timer.report_timedelta) + " seconds.")
1854 # FIXME: Are we sure select has been really waiting
1859 def create_logger(loglevel, logfile):
1860 """Create logger object
1863 :loglevel: log level
1864 :logfile: log file name
1866 :return: logger object
1868 logger = logging.getLogger("logger")
1869 log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1870 console_handler = logging.StreamHandler()
1871 file_handler = logging.FileHandler(logfile, mode="w")
1872 console_handler.setFormatter(log_formatter)
1873 file_handler.setFormatter(log_formatter)
1874 logger.addHandler(console_handler)
1875 logger.addHandler(file_handler)
1876 logger.setLevel(loglevel)
1880 def job(arguments, inqueue, storage):
1881 """One time initialisation and iterations looping.
1883 Establish BGP connection and run iterations.
1886 :arguments: Command line arguments
1887 :inqueue: Data to be sent from play.py
1888 :storage: Shared dict for rpc server
1892 bgp_socket = establish_connection(arguments)
1893 # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1894 # Receive open message before sending anything.
1895 # FIXME: Add parameter to send default open message first,
1896 # to work with "you first" peers.
1897 msg_in = read_open_message(bgp_socket)
1898 timer = TimeTracker(msg_in)
1899 generator = MessageGenerator(arguments)
1900 msg_out = generator.open_message()
1901 logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1902 # Send our open message to the peer.
1903 bgp_socket.send(msg_out)
1904 # Wait for confirming keepalive.
1905 # TODO: Surely in just one packet?
1906 # Using exact keepalive length to not to see possible updates.
1907 msg_in = bgp_socket.recv(19)
1908 if msg_in != generator.keepalive_message():
1909 error_msg = "Open not confirmed by keepalive, instead got"
1910 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1911 raise MessageError(error_msg, msg_in)
1912 timer.reset_peer_hold_time()
1913 # Send the keepalive to indicate the connection is accepted.
1914 timer.snapshot() # Remember this time.
1915 msg_out = generator.keepalive_message()
1916 logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1917 bgp_socket.send(msg_out)
1918 # Use the remembered time.
1919 timer.reset_my_keepalive_time(timer.snapshot_time)
1920 # End of initial handshake phase.
1921 state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
1922 while True: # main reactor loop
1923 state.perform_one_loop_iteration()
1927 '''Handler for SimpleXMLRPCServer'''
1929 def __init__(self, sendqueue, storage):
1933 :sendqueue: queue for data to be sent towards odl
1934 :storage: thread safe dict
1936 self.queue = sendqueue
1937 self.storage = storage
1939 def send(self, text):
1943 :text: hes string of the data to be sent
1945 self.queue.put(text)
1947 def get(self, text=''):
1948 '''Reads data form the storage
1950 - returns stored data or an empty string, at the moment only
1954 :text: a key to the storage to get the data
1958 with self.storage as stor:
1959 return stor.get(text, '')
1961 def clean(self, text=''):
1962 '''Cleans data form the storage
1965 :text: a key to the storage to clean the data
1967 with self.storage as stor:
1972 def threaded_job(arguments):
1973 """Run the job threaded
1976 :arguments: Command line arguments
1980 amount_left = arguments.amount
1981 utils_left = arguments.multiplicity
1982 prefix_current = arguments.firstprefix
1983 myip_current = arguments.myip
1985 rpcqueue = Queue.Queue()
1986 storage = SafeDict()
1989 amount_per_util = (amount_left - 1) / utils_left + 1 # round up
1990 amount_left -= amount_per_util
1993 args = deepcopy(arguments)
1994 args.amount = amount_per_util
1995 args.firstprefix = prefix_current
1996 args.myip = myip_current
1997 thread_args.append(args)
2001 prefix_current += amount_per_util * 16
2006 for t in thread_args:
2007 thread.start_new_thread(job, (t, rpcqueue, storage))
2009 print "Error: unable to start thread."
2012 rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
2013 rpcserver.register_instance(Rpcs(rpcqueue, storage))
2014 rpcserver.serve_forever()
2017 if __name__ == "__main__":
2018 arguments = parse_arguments()
2019 logger = create_logger(arguments.loglevel, arguments.logfile)
2020 threaded_job(arguments)