1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
36 '''Thread safe dictionary
38 The object will serve as thread safe data storage.
39 It should be used with "with" statement.
42 def __init__(self, * p_arg, ** n_arg):
43 super(SafeDict, self).__init__()
44 self._lock = threading.Lock()
50 def __exit__(self, type, value, traceback):
54 def parse_arguments():
55 """Use argparse to get arguments,
60 parser = argparse.ArgumentParser()
61 # TODO: Should we use --argument-names-with-spaces?
62 str_help = "Autonomous System number use in the stream."
63 parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64 # FIXME: We are acting as iBGP peer,
65 # we should mirror AS number from peer's open message.
66 str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67 parser.add_argument("--amount", default="1", type=int, help=str_help)
68 str_help = "Maximum number of IP prefixes to be announced in one iteration"
69 parser.add_argument("--insert", default="1", type=int, help=str_help)
70 str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
71 parser.add_argument("--withdraw", default="0", type=int, help=str_help)
72 str_help = "The number of prefixes to process without withdrawals"
73 parser.add_argument("--prefill", default="0", type=int, help=str_help)
74 str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
75 parser.add_argument("--updates", choices=["single", "separate"],
76 default=["separate"], help=str_help)
77 str_help = "Base prefix IP address for prefix generation"
78 parser.add_argument("--firstprefix", default="8.0.1.0",
79 type=ipaddr.IPv4Address, help=str_help)
80 str_help = "The prefix length."
81 parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
82 str_help = "Listen for connection, instead of initiating it."
83 parser.add_argument("--listen", action="store_true", help=str_help)
84 str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
85 "Default value only suitable for listening.")
86 parser.add_argument("--myip", default="0.0.0.0",
87 type=ipaddr.IPv4Address, help=str_help)
88 str_help = ("TCP port to bind to when listening or initiating connection." +
89 "Default only suitable for initiating.")
90 parser.add_argument("--myport", default="0", type=int, help=str_help)
91 str_help = "The IP of the next hop to be placed into the update messages."
92 parser.add_argument("--nexthop", default="192.0.2.1",
93 type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
94 str_help = "Identifier of the route originator."
95 parser.add_argument("--originator", default=None,
96 type=ipaddr.IPv4Address, dest="originator", help=str_help)
97 str_help = "Cluster list item identifier."
98 parser.add_argument("--cluster", default=None,
99 type=ipaddr.IPv4Address, dest="cluster", help=str_help)
100 str_help = ("Numeric IP Address to try to connect to." +
101 "Currently no effect in listening mode.")
102 parser.add_argument("--peerip", default="127.0.0.2",
103 type=ipaddr.IPv4Address, help=str_help)
104 str_help = "TCP port to try to connect to. No effect in listening mode."
105 parser.add_argument("--peerport", default="179", type=int, help=str_help)
106 str_help = "Local hold time."
107 parser.add_argument("--holdtime", default="180", type=int, help=str_help)
108 str_help = "Log level (--error, --warning, --info, --debug)"
109 parser.add_argument("--error", dest="loglevel", action="store_const",
110 const=logging.ERROR, default=logging.INFO,
112 parser.add_argument("--warning", dest="loglevel", action="store_const",
113 const=logging.WARNING, default=logging.INFO,
115 parser.add_argument("--info", dest="loglevel", action="store_const",
116 const=logging.INFO, default=logging.INFO,
118 parser.add_argument("--debug", dest="loglevel", action="store_const",
119 const=logging.DEBUG, default=logging.INFO,
121 str_help = "Log file name"
122 parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
123 str_help = "Trailing part of the csv result files for plotting purposes"
124 parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
125 str_help = "Minimum number of updates to reach to include result into csv."
126 parser.add_argument("--threshold", default="1000", type=int, help=str_help)
127 str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
128 parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
129 str_help = "Link-State NLRI supported"
130 parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
131 str_help = "Link-State NLRI: Identifier"
132 parser.add_argument("-lsid", default="1", type=int, help=str_help)
133 str_help = "Link-State NLRI: Tunnel ID"
134 parser.add_argument("-lstid", default="1", type=int, help=str_help)
135 str_help = "Link-State NLRI: LSP ID"
136 parser.add_argument("-lspid", default="1", type=int, help=str_help)
137 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
138 parser.add_argument("--lstsaddr", default="1.2.3.4",
139 type=ipaddr.IPv4Address, help=str_help)
140 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
141 parser.add_argument("--lsteaddr", default="5.6.7.8",
142 type=ipaddr.IPv4Address, help=str_help)
143 str_help = "Link-State NLRI: Identifier Step"
144 parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
145 str_help = "Link-State NLRI: Tunnel ID Step"
146 parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
147 str_help = "Link-State NLRI: LSP ID Step"
148 parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
149 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
150 parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
151 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
152 parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
153 str_help = "How many play utilities are to be started."
154 parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
155 str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
156 Enabling this flag makes the script not decoding the update mesage, because of not\
157 supported decoding for these elements."
158 parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
159 str_help = "Open message includes Multicast in MPLS/BGP IP VPNs arguments.\
160 Enabling this flag makes the script not decoding the update mesage, because of not\
161 supported decoding for these elements."
162 parser.add_argument("--mvpn", default=False, action="store_true", help=str_help)
163 parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
164 str_help = "Skipping well known attributes for update message"
165 parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
166 arguments = parser.parse_args()
167 if arguments.multiplicity < 1:
168 print "Multiplicity", arguments.multiplicity, "is not positive."
170 # TODO: Are sanity checks (such as asnumber>=0) required?
def establish_connection(arguments):
    """Establish connection to BGP peer.

    :arguments: following command-line arguments are used
        - arguments.listen: True to wait for an incoming connection
        - arguments.myip: local IP address
        - arguments.myport: local port
        - arguments.peerip: remote IP address
        - arguments.peerport: remote port

    :return: socket connected to the BGP peer.
    """
    if arguments.listen:
        logger.info("Connecting in the listening mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind needs a single tuple as argument
        listening_socket.bind((str(arguments.myip), arguments.myport))
        listening_socket.listen(1)
        bgp_socket, _ = listening_socket.accept()
        # TODO: Verify client IP is controller IP.
        # The listening socket served its purpose; only the accepted
        # connection is needed from now on.
        listening_socket.close()
    else:
        logger.info("Connecting in the talking mode.")
        logger.debug("Local IP address: " + str(arguments.myip))
        logger.debug("Local port: " + str(arguments.myport))
        logger.debug("Remote IP address: " + str(arguments.peerip))
        logger.debug("Remote port: " + str(arguments.peerport))
        talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind to force specified address and port
        talking_socket.bind((str(arguments.myip), arguments.myport))
        # socket does not speak ipaddr, hence str()
        talking_socket.connect((str(arguments.peerip), arguments.peerport))
        bgp_socket = talking_socket
    logger.info("Connected to ODL.")
    return bgp_socket
def get_short_int_from_message(message, offset=16):
    """Extract a 2-byte big-endian number from the provided message.

    Arguments:
        :message: given message
        :offset: offset of the short_int inside the message

    :return: required short_int value.

    Notes:
        The default offset value is the BGP message size offset.
    """
    high_byte_int = ord(message[offset])
    low_byte_int = ord(message[offset + 1])
    short_int = high_byte_int * 256 + low_byte_int
    return short_int
def get_prefix_list_from_hex(prefixes_hex):
    """Get decoded list of prefixes (rfc4271#section-4.3)

    Arguments:
        :prefixes_hex: encoded list of prefixes (length octet followed by
            the significant prefix octets, repeated)

    :return: list of prefixes in the form of ip address (X.X.X.X/X)
    """
    prefix_list = []
    offset = 0
    while offset < len(prefixes_hex):
        # one-octet prefix length in bits (slicing keeps this a byte string
        # on both Python 2 and 3)
        prefix_bit_len_hex = prefixes_hex[offset:offset + 1]
        prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
        # number of octets needed to carry prefix_bit_len bits
        prefix_len = (prefix_bit_len - 1) // 8 + 1
        # BUG FIX: pad to 4 octets so prefixes shorter than 25 bits
        # (encoded in fewer than 4 octets) no longer crash struct.unpack.
        prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len].ljust(4, b"\x00")
        prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
        offset += 1 + prefix_len
        prefix_list.append(prefix + "/" + str(prefix_bit_len))
    return prefix_list
class MessageError(ValueError):
    """Value error with logging optimized for hexlified messages."""

    def __init__(self, text, message, *args):
        """Initialisation.

        Store and call super init for textual comment,
        store raw message which caused it.
        """
        self.text = text
        self.msg = message
        super(MessageError, self).__init__(text, message, *args)

    def __str__(self):
        """Generate human readable error message.

        :return: human readable message as string

        Notes:
            Use a placeholder string if the message is to be empty.
        """
        message = binascii.hexlify(self.msg)
        # hexlify returns bytes on Python 3; normalise to str for concatenation
        if not isinstance(message, str):
            message = message.decode("ascii")
        if message == "":
            message = "(empty message)"
        return self.text + ": " + message
def read_open_message(bgp_socket):
    """Receive peer's OPEN message

    Arguments:
        :bgp_socket: the socket to be read

    :return: received OPEN message.

    Notes:
        Performs just basic incoming message checks
    """
    msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
    # TODO: Can the incoming open message be split in more than one packet?
    # 37 is minimal length of open message with 4-byte AS number.
    if len(msg_in) < 37:
        error_msg = (
            "Message length (" + str(len(msg_in)) + ") is smaller than "
            "minimal length of OPEN message with 4-byte AS number (37)"
        )
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    # TODO: We could check BGP marker, but it is defined only later;
    # decision can be made in a later version.
    reported_length = get_short_int_from_message(msg_in)
    if len(msg_in) != reported_length:
        # BUG FIX: reported_length is an int; concatenating it directly
        # raised TypeError instead of the intended MessageError.
        error_msg = (
            "Expected message length (" + str(reported_length) +
            ") does not match actual length (" + str(len(msg_in)) + ")"
        )
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    logger.info("Open message received.")
    return msg_in
315 class MessageGenerator(object):
316 """Class which generates messages, holds states and configuration values."""
318 # TODO: Define bgp marker as a class (constant) variable.
319 def __init__(self, args):
320 """Initialisation according to command-line args.
323 :args: argsparser's Namespace object which contains command-line
324 options for MesageGenerator initialisation
326 Calculates and stores default values used later on for
329 self.total_prefix_amount = args.amount
330 # Number of update messages left to be sent.
331 self.remaining_prefixes = self.total_prefix_amount
333 # New parameters initialisation
335 self.prefix_base_default = args.firstprefix
336 self.prefix_length_default = args.prefixlen
337 self.wr_prefixes_default = []
338 self.nlri_prefixes_default = []
339 self.version_default = 4
340 self.my_autonomous_system_default = args.asnumber
341 self.hold_time_default = args.holdtime # Local hold time.
342 self.bgp_identifier_default = int(args.myip)
343 self.next_hop_default = args.nexthop
344 self.originator_id_default = args.originator
345 self.cluster_list_item_default = args.cluster
346 self.single_update_default = args.updates == "single"
347 self.randomize_updates_default = args.updates == "random"
348 self.prefix_count_to_add_default = args.insert
349 self.prefix_count_to_del_default = args.withdraw
350 if self.prefix_count_to_del_default < 0:
351 self.prefix_count_to_del_default = 0
352 if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
353 # total number of prefixes must grow to avoid infinite test loop
354 self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
355 self.slot_size_default = self.prefix_count_to_add_default
356 self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
357 self.results_file_name_default = args.results
358 self.performance_threshold_default = args.threshold
359 self.rfc4760 = args.rfc4760
360 self.bgpls = args.bgpls
361 self.evpn = args.evpn
362 self.mvpn = args.mvpn
363 self.skipattr = args.skipattr
364 # Default values when BGP-LS Attributes are used
366 self.prefix_count_to_add_default = 1
367 self.prefix_count_to_del_default = 0
368 self.ls_nlri_default = {"Identifier": args.lsid,
369 "TunnelID": args.lstid,
371 "IPv4TunnelSenderAddress": args.lstsaddr,
372 "IPv4TunnelEndPointAddress": args.lsteaddr}
373 self.lsid_step = args.lsidstep
374 self.lstid_step = args.lstidstep
375 self.lspid_step = args.lspidstep
376 self.lstsaddr_step = args.lstsaddrstep
377 self.lsteaddr_step = args.lsteaddrstep
378 # Default values used for randomized part
379 s1_slots = ((self.total_prefix_amount -
380 self.remaining_prefixes_threshold - 1) /
381 self.prefix_count_to_add_default + 1)
382 s2_slots = ((self.remaining_prefixes_threshold - 1) /
383 (self.prefix_count_to_add_default -
384 self.prefix_count_to_del_default) + 1)
386 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
387 s2_first_index = s1_slots * self.prefix_count_to_add_default
388 s2_last_index = (s2_first_index +
389 s2_slots * (self.prefix_count_to_add_default -
390 self.prefix_count_to_del_default) - 1)
391 self.slot_gap_default = ((self.total_prefix_amount -
392 self.remaining_prefixes_threshold - 1) /
393 self.prefix_count_to_add_default + 1)
394 self.randomize_lowest_default = s2_first_index
395 self.randomize_highest_default = s2_last_index
396 # Initialising counters
397 self.phase1_start_time = 0
398 self.phase1_stop_time = 0
399 self.phase2_start_time = 0
400 self.phase2_stop_time = 0
401 self.phase1_updates_sent = 0
402 self.phase2_updates_sent = 0
403 self.updates_sent = 0
405 self.log_info = args.loglevel <= logging.INFO
406 self.log_debug = args.loglevel <= logging.DEBUG
408 Flags needed for the MessageGenerator performance optimization.
409 Calling logger methods each iteration even with proper log level set
410 slows down significantly the MessageGenerator performance.
411 Measured total generation time (1M updates, dry run, error log level):
412 - logging based on basic logger features: 36,2s
413 - logging based on advanced logger features (lazy logging): 21,2s
414 - conditional calling of logger methods enclosed inside condition: 8,6s
417 logger.info("Generator initialisation")
418 logger.info(" Target total number of prefixes to be introduced: " +
419 str(self.total_prefix_amount))
420 logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
421 str(self.prefix_length_default))
422 logger.info(" My Autonomous System number: " +
423 str(self.my_autonomous_system_default))
424 logger.info(" My Hold Time: " + str(self.hold_time_default))
425 logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
426 logger.info(" Next Hop: " + str(self.next_hop_default))
427 logger.info(" Originator ID: " + str(self.originator_id_default))
428 logger.info(" Cluster list: " + str(self.cluster_list_item_default))
429 logger.info(" Prefix count to be inserted at once: " +
430 str(self.prefix_count_to_add_default))
431 logger.info(" Prefix count to be withdrawn at once: " +
432 str(self.prefix_count_to_del_default))
433 logger.info(" Fast pre-fill up to " +
434 str(self.total_prefix_amount -
435 self.remaining_prefixes_threshold) + " prefixes")
436 logger.info(" Remaining number of prefixes to be processed " +
437 "in parallel with withdrawals: " +
438 str(self.remaining_prefixes_threshold))
439 logger.debug(" Prefix index range used after pre-fill procedure [" +
440 str(self.randomize_lowest_default) + ", " +
441 str(self.randomize_highest_default) + "]")
442 if self.single_update_default:
443 logger.info(" Common single UPDATE will be generated " +
444 "for both NLRI & WITHDRAWN lists")
446 logger.info(" Two separate UPDATEs will be generated " +
447 "for each NLRI & WITHDRAWN lists")
448 if self.randomize_updates_default:
449 logger.info(" Generation of UPDATE messages will be randomized")
450 logger.info(" Let\'s go ...\n")
452 # TODO: Notification for hold timer expiration can be handy.
454 def store_results(self, file_name=None, threshold=None):
455 """ Stores specified results into files based on file_name value.
458 :param file_name: Trailing (common) part of result file names
459 :param threshold: Minimum number of sent updates needed for each
460 result to be included into result csv file
461 (mainly needed because of the result accuracy)
465 # default values handling
466 # TODO optimize default values handling (use e.g. dicionary.update() approach)
467 if file_name is None:
468 file_name = self.results_file_name_default
469 if threshold is None:
470 threshold = self.performance_threshold_default
471 # performance calculation
472 if self.phase1_updates_sent >= threshold:
473 totals1 = self.phase1_updates_sent
474 performance1 = int(self.phase1_updates_sent /
475 (self.phase1_stop_time - self.phase1_start_time))
479 if self.phase2_updates_sent >= threshold:
480 totals2 = self.phase2_updates_sent
481 performance2 = int(self.phase2_updates_sent /
482 (self.phase2_stop_time - self.phase2_start_time))
487 logger.info("#" * 10 + " Final results " + "#" * 10)
488 logger.info("Number of iterations: " + str(self.iteration))
489 logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
490 str(self.phase1_updates_sent))
491 logger.info("The pre-fill phase duration: " +
492 str(self.phase1_stop_time - self.phase1_start_time) + "s")
493 logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
494 str(self.phase2_updates_sent))
495 logger.info("The 2nd test phase duration: " +
496 str(self.phase2_stop_time - self.phase2_start_time) + "s")
497 logger.info("Threshold for performance reporting: " + str(threshold))
500 phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
501 " route(s) per UPDATE")
502 if self.single_update_default:
503 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
504 "/-" + str(self.prefix_count_to_del_default) +
505 " routes per UPDATE")
507 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
508 "/-" + str(self.prefix_count_to_del_default) +
509 " routes in two UPDATEs")
510 # collecting capacity and performance results
513 if totals1 is not None:
514 totals[phase1_label] = totals1
515 performance[phase1_label] = performance1
516 if totals2 is not None:
517 totals[phase2_label] = totals2
518 performance[phase2_label] = performance2
519 self.write_results_to_file(totals, "totals-" + file_name)
520 self.write_results_to_file(performance, "performance-" + file_name)
522 def write_results_to_file(self, results, file_name):
523 """Writes results to the csv plot file consumable by Jenkins.
526 :param file_name: Name of the (csv) file to be created
532 f = open(file_name, "wt")
534 for key in sorted(results):
535 first_line += key + ", "
536 second_line += str(results[key]) + ", "
537 first_line = first_line[:-2]
538 second_line = second_line[:-2]
539 f.write(first_line + "\n")
540 f.write(second_line + "\n")
541 logger.info("Message generator performance results stored in " +
543 logger.info(" " + first_line)
544 logger.info(" " + second_line)
548 # Return pseudo-randomized (reproducible) index for selected range
549 def randomize_index(self, index, lowest=None, highest=None):
550 """Calculates pseudo-randomized index from selected range.
553 :param index: input index
554 :param lowest: the lowes index from the randomized area
555 :param highest: the highest index from the randomized area
557 :return: the (pseudo)randomized index
559 Created just as a fame for future generator enhancement.
561 # default values handling
562 # TODO optimize default values handling (use e.g. dicionary.update() approach)
564 lowest = self.randomize_lowest_default
566 highest = self.randomize_highest_default
568 if (index >= lowest) and (index <= highest):
569 # we are in the randomized range -> shuffle it inside
570 # the range (now just reverse the order)
571 new_index = highest - (index - lowest)
573 # we are out of the randomized range -> nothing to do
577 def get_ls_nlri_values(self, index):
578 """Generates LS-NLRI parameters.
579 http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
582 :param index: index (iteration)
584 :return: dictionary of LS NLRI parameters and values
586 # generating list of LS NLRI parameters
587 identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
588 ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
589 tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
590 lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
591 ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
592 ls_nlri_values = {"Identifier": identifier,
593 "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
594 "TunnelID": tunnel_id, "LSPID": lsp_id,
595 "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
596 return ls_nlri_values
598 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
599 prefix_len=None, prefix_count=None, randomize=None):
600 """Generates list of IP address prefixes.
603 :param slot_index: index of group of prefix addresses
604 :param slot_size: size of group of prefix addresses
605 in [number of included prefixes]
606 :param prefix_base: IP address of the first prefix
607 (slot_index = 0, prefix_index = 0)
608 :param prefix_len: length of the prefix in bites
609 (the same as size of netmask)
610 :param prefix_count: number of prefixes to be returned
611 from the specified slot
613 :return: list of generated IP address prefixes
615 # default values handling
616 # TODO optimize default values handling (use e.g. dicionary.update() approach)
617 if slot_size is None:
618 slot_size = self.slot_size_default
619 if prefix_base is None:
620 prefix_base = self.prefix_base_default
621 if prefix_len is None:
622 prefix_len = self.prefix_length_default
623 if prefix_count is None:
624 prefix_count = slot_size
625 if randomize is None:
626 randomize = self.randomize_updates_default
627 # generating list of prefixes
630 prefix_gap = 2 ** (32 - prefix_len)
631 for i in range(prefix_count):
632 prefix_index = slot_index * slot_size + i
634 prefix_index = self.randomize_index(prefix_index)
635 indexes.append(prefix_index)
636 prefixes.append(prefix_base + prefix_index * prefix_gap)
638 logger.debug(" Prefix slot index: " + str(slot_index))
639 logger.debug(" Prefix slot size: " + str(slot_size))
640 logger.debug(" Prefix count: " + str(prefix_count))
641 logger.debug(" Prefix indexes: " + str(indexes))
642 logger.debug(" Prefix list: " + str(prefixes))
645 def compose_update_message(self, prefix_count_to_add=None,
646 prefix_count_to_del=None):
647 """Composes an UPDATE message
650 :param prefix_count_to_add: # of prefixes to put into NLRI list
651 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
653 :return: encoded UPDATE message in HEX
655 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
656 lists or common message wich includes both prefix lists.
657 Updates global counters.
659 # default values handling
660 # TODO optimize default values handling (use e.g. dicionary.update() approach)
661 if prefix_count_to_add is None:
662 prefix_count_to_add = self.prefix_count_to_add_default
663 if prefix_count_to_del is None:
664 prefix_count_to_del = self.prefix_count_to_del_default
666 if self.log_info and not (self.iteration % 1000):
667 logger.info("Iteration: " + str(self.iteration) +
668 " - total remaining prefixes: " +
669 str(self.remaining_prefixes))
671 logger.debug("#" * 10 + " Iteration: " +
672 str(self.iteration) + " " + "#" * 10)
673 logger.debug("Remaining prefixes: " +
674 str(self.remaining_prefixes))
675 # scenario type & one-shot counter
676 straightforward_scenario = (self.remaining_prefixes >
677 self.remaining_prefixes_threshold)
678 if straightforward_scenario:
679 prefix_count_to_del = 0
681 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
682 if not self.phase1_start_time:
683 self.phase1_start_time = time.time()
686 logger.debug("--- COMBINED SCENARIO ---")
687 if not self.phase2_start_time:
688 self.phase2_start_time = time.time()
689 # tailor the number of prefixes if needed
690 prefix_count_to_add = (prefix_count_to_del +
691 min(prefix_count_to_add - prefix_count_to_del,
692 self.remaining_prefixes))
693 # prefix slots selection for insertion and withdrawal
694 slot_index_to_add = self.iteration
695 slot_index_to_del = slot_index_to_add - self.slot_gap_default
696 # getting lists of prefixes for insertion in this iteration
698 logger.debug("Prefixes to be inserted in this iteration:")
699 prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
700 prefix_count=prefix_count_to_add)
701 # getting lists of prefixes for withdrawal in this iteration
703 logger.debug("Prefixes to be withdrawn in this iteration:")
704 prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
705 prefix_count=prefix_count_to_del)
706 # generating the UPDATE mesage with LS-NLRI only
708 ls_nlri = self.get_ls_nlri_values(self.iteration)
709 msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
712 # generating the UPDATE message with prefix lists
713 if self.single_update_default:
714 # Send prefixes to be introduced and withdrawn
715 # in one UPDATE message
716 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
717 nlri_prefixes=prefix_list_to_add)
719 # Send prefixes to be introduced and withdrawn
720 # in separate UPDATE messages (if needed)
721 msg_out = self.update_message(wr_prefixes=[],
722 nlri_prefixes=prefix_list_to_add)
723 if prefix_count_to_del:
724 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
726 # updating counters - who knows ... maybe I am last time here ;)
727 if straightforward_scenario:
728 self.phase1_stop_time = time.time()
729 self.phase1_updates_sent = self.updates_sent
731 self.phase2_stop_time = time.time()
732 self.phase2_updates_sent = (self.updates_sent -
733 self.phase1_updates_sent)
734 # updating totals for the next iteration
736 self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
737 # returning the encoded message
740 # Section of message encoders
742 def open_message(self, version=None, my_autonomous_system=None,
743 hold_time=None, bgp_identifier=None):
744 """Generates an OPEN Message (rfc4271#section-4.2)
747 :param version: see the rfc4271#section-4.2
748 :param my_autonomous_system: see the rfc4271#section-4.2
749 :param hold_time: see the rfc4271#section-4.2
750 :param bgp_identifier: see the rfc4271#section-4.2
752 :return: encoded OPEN message in HEX
755 # default values handling
756 # TODO optimize default values handling (use e.g. dicionary.update() approach)
758 version = self.version_default
759 if my_autonomous_system is None:
760 my_autonomous_system = self.my_autonomous_system_default
761 if hold_time is None:
762 hold_time = self.hold_time_default
763 if bgp_identifier is None:
764 bgp_identifier = self.bgp_identifier_default
767 marker_hex = "\xFF" * 16
771 type_hex = struct.pack("B", type)
774 version_hex = struct.pack("B", version)
776 # my_autonomous_system
777 # AS_TRANS value, 23456 decadic.
778 my_autonomous_system_2_bytes = 23456
779 # AS number is mappable to 2 bytes
780 if my_autonomous_system < 65536:
781 my_autonomous_system_2_bytes = my_autonomous_system
782 my_autonomous_system_hex_2_bytes = struct.pack(">H",
783 my_autonomous_system)
786 hold_time_hex = struct.pack(">H", hold_time)
789 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
791 # Optional Parameters
792 optional_parameters_hex = ""
794 optional_parameter_hex = (
795 "\x02" # Param type ("Capability Ad")
796 "\x06" # Length (6 bytes)
797 "\x01" # Capability type (NLRI Unicast),
798 # see RFC 4760, secton 8
799 "\x04" # Capability value length
800 "\x00\x01" # AFI (Ipv4)
802 "\x01" # SAFI (Unicast)
804 optional_parameters_hex += optional_parameter_hex
807 optional_parameter_hex = (
808 "\x02" # Param type ("Capability Ad")
809 "\x06" # Length (6 bytes)
810 "\x01" # Capability type (NLRI Unicast),
811 # see RFC 4760, secton 8
812 "\x04" # Capability value length
813 "\x40\x04" # AFI (BGP-LS)
815 "\x47" # SAFI (BGP-LS)
817 optional_parameters_hex += optional_parameter_hex
820 optional_parameter_hex = (
821 "\x02" # Param type ("Capability Ad")
822 "\x06" # Length (6 bytes)
823 "\x01" # Multiprotocol extetension capability,
824 "\x04" # Capability value length
825 "\x00\x19" # AFI (L2-VPN)
829 optional_parameters_hex += optional_parameter_hex
832 optional_parameter_hex = (
833 "\x02" # Param type ("Capability Ad")
834 "\x06" # Length (6 bytes)
835 "\x01" # Multiprotocol extetension capability,
836 "\x04" # Capability value length
837 "\x00\x01" # AFI (IPV4)
839 "\x05" # SAFI (MCAST-VPN)
841 optional_parameters_hex += optional_parameter_hex
842 optional_parameter_hex = (
843 "\x02" # Param type ("Capability Ad")
844 "\x06" # Length (6 bytes)
845 "\x01" # Multiprotocol extetension capability,
846 "\x04" # Capability value length
847 "\x00\x02" # AFI (IPV6)
849 "\x05" # SAFI (MCAST-VPN)
851 optional_parameters_hex += optional_parameter_hex
853 optional_parameter_hex = (
854 "\x02" # Param type ("Capability Ad")
855 "\x06" # Length (6 bytes)
856 "\x41" # "32 bit AS Numbers Support"
857 # (see RFC 6793, section 3)
858 "\x04" # Capability value length
860 optional_parameter_hex += (
861 struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
863 optional_parameters_hex += optional_parameter_hex
865 # Optional Parameters Length
866 optional_parameters_length = len(optional_parameters_hex)
867 optional_parameters_length_hex = struct.pack("B",
868 optional_parameters_length)
870 # Length (big-endian)
872 len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
873 len(my_autonomous_system_hex_2_bytes) +
874 len(hold_time_hex) + len(bgp_identifier_hex) +
875 len(optional_parameters_length_hex) +
876 len(optional_parameters_hex)
878 length_hex = struct.pack(">H", length)
886 my_autonomous_system_hex_2_bytes +
889 optional_parameters_length_hex +
890 optional_parameters_hex
894 logger.debug("OPEN message encoding")
895 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
896 logger.debug(" Length=" + str(length) + " (0x" +
897 binascii.hexlify(length_hex) + ")")
898 logger.debug(" Type=" + str(type) + " (0x" +
899 binascii.hexlify(type_hex) + ")")
900 logger.debug(" Version=" + str(version) + " (0x" +
901 binascii.hexlify(version_hex) + ")")
902 logger.debug(" My Autonomous System=" +
903 str(my_autonomous_system_2_bytes) + " (0x" +
904 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
906 logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
907 binascii.hexlify(hold_time_hex) + ")")
908 logger.debug(" BGP Identifier=" + str(bgp_identifier) +
909 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
910 logger.debug(" Optional Parameters Length=" +
911 str(optional_parameters_length) + " (0x" +
912 binascii.hexlify(optional_parameters_length_hex) +
914 logger.debug(" Optional Parameters=0x" +
915 binascii.hexlify(optional_parameters_hex))
916 logger.debug("OPEN message encoded: 0x%s",
917 binascii.b2a_hex(message_hex))
921 def update_message(self, wr_prefixes=None, nlri_prefixes=None,
922 wr_prefix_length=None, nlri_prefix_length=None,
923 my_autonomous_system=None, next_hop=None,
924 originator_id=None, cluster_list_item=None,
925 end_of_rib=False, **ls_nlri_params):
926 """Generates an UPDATE Message (rfc4271#section-4.3)
929 :param wr_prefixes: see the rfc4271#section-4.3
930 :param nlri_prefixes: see the rfc4271#section-4.3
931 :param wr_prefix_length: see the rfc4271#section-4.3
932 :param nlri_prefix_length: see the rfc4271#section-4.3
933 :param my_autonomous_system: see the rfc4271#section-4.3
934 :param next_hop: see the rfc4271#section-4.3
936 :return: encoded UPDATE message in HEX
# Only None triggers the instance defaults; falsy-but-not-None values
# (e.g. [] or 0) are deliberately kept as passed in.
939 # default values handling
940 # TODO optimize default values handling (use e.g. dictionary.update() approach)
941 if wr_prefixes is None:
942 wr_prefixes = self.wr_prefixes_default
943 if nlri_prefixes is None:
944 nlri_prefixes = self.nlri_prefixes_default
945 if wr_prefix_length is None:
946 wr_prefix_length = self.prefix_length_default
947 if nlri_prefix_length is None:
948 nlri_prefix_length = self.prefix_length_default
949 if my_autonomous_system is None:
950 my_autonomous_system = self.my_autonomous_system_default
952 next_hop = self.next_hop_default
953 if originator_id is None:
954 originator_id = self.originator_id_default
955 if cluster_list_item is None:
956 cluster_list_item = self.cluster_list_item_default
# Per-call copy so **ls_nlri_params never mutates the shared default dict.
957 ls_nlri = self.ls_nlri_default.copy()
958 ls_nlri.update(ls_nlri_params)
# Marker: 16 bytes of 0xFF (rfc4271#section-4.1).
961 marker_hex = "\xFF" * 16
# NOTE(review): "type" (the UPDATE message type code, presumably 2) is
# assigned on a line not visible in this excerpt; it shadows the builtin.
965 type_hex = struct.pack("B", type)
968 withdrawn_routes_hex = ""
# Octets needed to carry wr_prefix_length bits (Python 2 integer
# division); the local name "bytes" shadows the builtin.
970 bytes = ((wr_prefix_length - 1) / 8) + 1
971 for prefix in wr_prefixes:
972 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
973 struct.pack(">I", int(prefix))[:bytes])
974 withdrawn_routes_hex += withdrawn_route_hex
976 # Withdrawn Routes Length
977 withdrawn_routes_length = len(withdrawn_routes_hex)
978 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
980 # TODO: to replace hardcoded string by encoding?
982 path_attributes_hex = ""
983 if not self.skipattr:
984 path_attributes_hex += (
985 "\x40" # Flags ("Well-Known")
986 "\x01" # Type (ORIGIN)
990 path_attributes_hex += (
991 "\x40" # Flags ("Well-Known")
992 "\x02" # Type (AS_PATH)
994 "\x02" # AS segment type (AS_SEQUENCE)
995 "\x01" # AS segment length (1)
997 my_as_hex = struct.pack(">I", my_autonomous_system)
998 path_attributes_hex += my_as_hex # AS segment (4 bytes)
999 path_attributes_hex += (
1000 "\x40" # Flags ("Well-Known")
1001 "\x05" # Type (LOCAL_PREF)
1003 "\x00\x00\x00\x64" # (100)
# NEXT_HOP is only emitted when NLRI is actually advertised.
1005 if nlri_prefixes != []:
1006 path_attributes_hex += (
1007 "\x40" # Flags ("Well-Known")
1008 "\x03" # Type (NEXT_HOP)
1011 next_hop_hex = struct.pack(">I", int(next_hop))
1012 path_attributes_hex += (
1013 next_hop_hex # IP address of the next hop (4 bytes)
1015 if originator_id is not None:
1016 path_attributes_hex += (
1017 "\x80" # Flags ("Optional, non-transitive")
1018 "\x09" # Type (ORIGINATOR_ID)
1020 ) # ORIGINATOR_ID (4 bytes)
1021 path_attributes_hex += struct.pack(">I", int(originator_id))
1022 if cluster_list_item is not None:
1023 path_attributes_hex += (
1024 "\x80" # Flags ("Optional, non-transitive")
1025 "\x0a" # Type (CLUSTER_LIST)
1027 ) # one CLUSTER_LIST item (4 bytes)
1028 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
# BGP-LS mode: advertise one IPv4 TE LSP NLRI inside MP_REACH_NLRI,
# except for the final END-OF-RIB update.
1030 if self.bgpls and not end_of_rib:
1031 path_attributes_hex += (
1032 "\x80" # Flags ("Optional, non-transitive")
1033 "\x0e" # Type (MP_REACH_NLRI)
1034 "\x22" # Length (34)
1035 "\x40\x04" # AFI (BGP-LS)
1036 "\x47" # SAFI (BGP-LS)
1037 "\x04" # Next Hop Length (4)
1039 path_attributes_hex += struct.pack(">I", int(next_hop))
1040 path_attributes_hex += "\x00" # Reserved
1041 path_attributes_hex += (
1042 "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1043 "\x00\x15" # LS-NLRI.TotalNLRILength (21)
1044 "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1046 path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1047 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1048 path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1049 path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1050 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1052 # Total Path Attributes Length
1053 total_path_attributes_length = len(path_attributes_hex)
1054 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1056 # Network Layer Reachability Information
# Same octet-count computation as for the withdrawn routes above.
1059 bytes = ((nlri_prefix_length - 1) / 8) + 1
1060 for prefix in nlri_prefixes:
1061 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1062 struct.pack(">I", int(prefix))[:bytes])
1063 nlri_hex += nlri_prefix_hex
1065 # Length (big-endian)
1067 len(marker_hex) + 2 + len(type_hex) +
1068 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1069 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1071 length_hex = struct.pack(">H", length)
1078 withdrawn_routes_length_hex +
1079 withdrawn_routes_hex +
1080 total_path_attributes_length_hex +
1081 path_attributes_hex +
1086 logger.debug("UPDATE message encoding")
1087 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1088 logger.debug(" Length=" + str(length) + " (0x" +
1089 binascii.hexlify(length_hex) + ")")
1090 logger.debug(" Type=" + str(type) + " (0x" +
1091 binascii.hexlify(type_hex) + ")")
1092 logger.debug(" withdrawn_routes_length=" +
1093 str(withdrawn_routes_length) + " (0x" +
1094 binascii.hexlify(withdrawn_routes_length_hex) + ")")
1095 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1096 str(wr_prefix_length) + " (0x" +
1097 binascii.hexlify(withdrawn_routes_hex) + ")")
# Attribute details are only logged when attributes are present.
1098 if total_path_attributes_length:
1099 logger.debug(" Total Path Attributes Length=" +
1100 str(total_path_attributes_length) + " (0x" +
1101 binascii.hexlify(total_path_attributes_length_hex) + ")")
1102 logger.debug(" Path Attributes=" + "(0x" +
1103 binascii.hexlify(path_attributes_hex) + ")")
1104 logger.debug(" Origin=IGP")
1105 logger.debug(" AS path=" + str(my_autonomous_system))
1106 logger.debug(" Next hop=" + str(next_hop))
1107 if originator_id is not None:
1108 logger.debug(" Originator id=" + str(originator_id))
1109 if cluster_list_item is not None:
1110 logger.debug(" Cluster list=" + str(cluster_list_item))
1112 logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
1113 logger.debug(" Network Layer Reachability Information=" +
1114 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1115 " (0x" + binascii.hexlify(nlri_hex) + ")")
1116 logger.debug("UPDATE message encoded: 0x" +
1117 binascii.b2a_hex(message_hex))
# Counter consumed by the statistics reporting elsewhere in the file.
1120 self.updates_sent += 1
1121 # returning encoded message
1124 def notification_message(self, error_code, error_subcode, data_hex=""):
1125 """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1128 :param error_code: see the rfc4271#section-4.5
1129 :param error_subcode: see the rfc4271#section-4.5
1130 :param data_hex: see the rfc4271#section-4.5
1132 :return: encoded NOTIFICATION message in HEX
# Marker: 16 bytes of 0xFF (rfc4271#section-4.1).
1136 marker_hex = "\xFF" * 16
# NOTE(review): "type" (message type code, presumably 3 for
# NOTIFICATION) is assigned on a line not visible in this excerpt.
1140 type_hex = struct.pack("B", type)
1143 error_code_hex = struct.pack("B", error_code)
1146 error_subcode_hex = struct.pack("B", error_subcode)
1148 # Length (big-endian)
# Header (marker + 2-byte length + 1-byte type) plus the two error
# octets and any diagnostic data.
1149 length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1150 len(error_subcode_hex) + len(data_hex))
1151 length_hex = struct.pack(">H", length)
1153 # NOTIFICATION Message
1164 logger.debug("NOTIFICATION message encoding")
1165 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1166 logger.debug(" Length=" + str(length) + " (0x" +
1167 binascii.hexlify(length_hex) + ")")
1168 logger.debug(" Type=" + str(type) + " (0x" +
1169 binascii.hexlify(type_hex) + ")")
1170 logger.debug(" Error Code=" + str(error_code) + " (0x" +
1171 binascii.hexlify(error_code_hex) + ")")
1172 logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
1173 binascii.hexlify(error_subcode_hex) + ")")
1174 logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1175 logger.debug("NOTIFICATION message encoded: 0x%s",
1176 binascii.b2a_hex(message_hex))
1180 def keepalive_message(self):
1181 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1184 :return: encoded KEEP ALIVE message in HEX
# Marker: 16 bytes of 0xFF (rfc4271#section-4.1).
1188 marker_hex = "\xFF" * 16
# NOTE(review): "type" (message type code, presumably 4 for KEEPALIVE)
# is assigned on a line not visible in this excerpt.
1192 type_hex = struct.pack("B", type)
1194 # Length (big-endian)
# KEEPALIVE is header-only: marker + 2-byte length + 1-byte type.
1195 length = len(marker_hex) + 2 + len(type_hex)
1196 length_hex = struct.pack(">H", length)
1198 # KEEP ALIVE Message
1206 logger.debug("KEEP ALIVE message encoding")
1207 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1208 logger.debug(" Length=" + str(length) + " (0x" +
1209 binascii.hexlify(length_hex) + ")")
1210 logger.debug(" Type=" + str(type) + " (0x" +
1211 binascii.hexlify(type_hex) + ")")
1212 logger.debug("KEEP ALIVE message encoded: 0x%s",
1213 binascii.b2a_hex(message_hex))
1218 class TimeTracker(object):
1219 """Class for tracking timers, both for my keepalives and
1223 def __init__(self, msg_in):
1224 """Initialisation based on defaults and OPEN message from peer.
1227 msg_in: the OPEN message received from peer.
1229 # Note: Relative time is always named timedelta, to stress that
1230 # the (non-delta) time is absolute.
1231 self.report_timedelta = 1.0 # In seconds. TODO: Configurable?
1232 # Upper bound for being stuck in the same state, we should
1233 # at least report something before continuing.
1234 # Negotiate the hold timer by taking the smaller
1235 # of the 2 values (mine and the peer's).
1236 hold_timedelta = 180 # Not an attribute of self yet.
1237 # TODO: Make the default value configurable,
1238 # default value could mirror what peer said.
# The peer's proposed hold time sits at fixed offset 22 of its OPEN.
1239 peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1240 if hold_timedelta > peer_hold_timedelta:
1241 hold_timedelta = peer_hold_timedelta
# Hold time must be zero (disabled) or at least 3 seconds.
1242 if hold_timedelta != 0 and hold_timedelta < 3:
1243 logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1244 raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1245 self.hold_timedelta = hold_timedelta
1246 # If we do not hear from peer this long, we assume it has died.
1247 self.keepalive_timedelta = int(hold_timedelta / 3.0)
1248 # Upper limit for duration between messages, to avoid being
1249 # declared to be dead.
1250 # The same as calling snapshot(), but also declares a field.
1251 self.snapshot_time = time.time()
1252 # Sometimes we need to store time. This is where to get
1253 # the value from afterwards. Time_keepalive may be too strict.
1254 self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1255 # At this time point, peer will be declared dead.
1256 self.my_keepalive_time = None # to be set later
1257 # At this point, we should be sending keepalive message.
1260 """Store current time in instance data to use later."""
1261 # Read as time before something interesting was called.
1262 self.snapshot_time = time.time()
1264 def reset_peer_hold_time(self):
1265 """Move hold time to future as peer has just proven it still lives."""
1266 self.peer_hold_time = time.time() + self.hold_timedelta
1268 # Some methods could rely on self.snapshot_time, but it is better
1269 # to require user to provide it explicitly.
1270 def reset_my_keepalive_time(self, keepalive_time):
1271 """Calculate and set the next my KEEP ALIVE timeout time
1274 :keepalive_time: the initial value of the KEEP ALIVE timer
1276 self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1278 def is_time_for_my_keepalive(self):
1279 """Check for my KEEP ALIVE timeout occurrence"""
# hold_timedelta == 0 means keepalive/hold processing is disabled.
1280 if self.hold_timedelta == 0:
1282 return self.snapshot_time >= self.my_keepalive_time
1284 def get_next_event_time(self):
1285 """Set the time of the next expected or to be sent KEEP ALIVE"""
1286 if self.hold_timedelta == 0:
# Timers disabled: report a far-future event (one day ahead).
1287 return self.snapshot_time + 86400
1288 return min(self.my_keepalive_time, self.peer_hold_time)
1290 def check_peer_hold_time(self, snapshot_time):
1291 """Raise error if nothing was read from peer until specified time."""
1292 # Hold time = 0 means keepalive checking off.
1293 if self.hold_timedelta != 0:
1294 # time.time() may be too strict
1295 if snapshot_time > self.peer_hold_time:
1296 logger.error("Peer has overstepped the hold timer.")
1297 raise RuntimeError("Peer has overstepped the hold timer.")
1298 # TODO: Include hold_timedelta?
1299 # TODO: Add notification sending (attempt). That means
1300 # move to write tracker.
1303 class ReadTracker(object):
1304 """Class for tracking read of messages chunk by chunk and
1308 def __init__(self, bgp_socket, timer, storage, evpn=False, mvpn=False, wait_for_read=10):
1309 """The reader initialisation.
1312 bgp_socket: socket to be used for sending
1313 timer: timer to be used for scheduling
1314 storage: thread safe dict
1315 evpn: flag that evpn functionality is tested
1316 mvpn: flag that mvpn functionality is tested
1318 # References to outside objects.
1319 self.socket = bgp_socket
1321 # BGP marker length plus length field length.
1322 self.header_length = 18
1323 # TODO: make it class (constant) attribute
1324 # Computation of where next chunk ends depends on whether
1325 # we are beyond length field.
1326 self.reading_header = True
1327 # Countdown towards next size computation.
1328 self.bytes_to_read = self.header_length
1329 # Incremental buffer for message under read.
1331 # Initialising counters
1332 self.updates_received = 0
1333 self.prefixes_introduced = 0
1334 self.prefixes_withdrawn = 0
1335 self.rx_idle_time = 0
1336 self.rx_activity_detected = True
1337 self.storage = storage
# Maximum seconds a single wait_for_read() select may block.
1340 self.wfr = wait_for_read
1342 def read_message_chunk(self):
1343 """Read up to one message
1346 Currently it does not return anything.
1348 # TODO: We could return the whole message, currently not needed.
1349 # We assume the socket is readable.
1350 logger.info("READING MESSAGE")
1351 chunk_message = self.socket.recv(self.bytes_to_read)
1352 self.msg_in += chunk_message
1353 self.bytes_to_read -= len(chunk_message)
1354 # TODO: bytes_to_read < 0 is not possible, right?
1355 if not self.bytes_to_read:
1356 # Finished reading a logical block.
1357 if self.reading_header:
1358 # The logical block was a BGP header.
1359 # Now we know the size of the message.
1360 self.reading_header = False
1361 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1363 else: # We have finished reading the body of the message.
1364 # Peer has just proven it is still alive.
1365 self.timer.reset_peer_hold_time()
1366 # TODO: Do we want to count received messages?
1367 # This version ignores the received message.
1368 # TODO: Should we do validation and exit on anything
1369 # besides update or keepalive?
1370 # Prepare state for reading another message.
# Message type is the single octet right after marker + length.
1371 message_type_hex = self.msg_in[self.header_length]
1372 if message_type_hex == "\x01":
1373 logger.info("OPEN message received: 0x%s",
1374 binascii.b2a_hex(self.msg_in))
1375 elif message_type_hex == "\x02":
1376 logger.debug("UPDATE message received: 0x%s",
1377 binascii.b2a_hex(self.msg_in))
1378 self.decode_update_message(self.msg_in)
1379 elif message_type_hex == "\x03":
1380 logger.info("NOTIFICATION message received: 0x%s",
1381 binascii.b2a_hex(self.msg_in))
1382 elif message_type_hex == "\x04":
1383 logger.info("KEEP ALIVE message received: 0x%s",
1384 binascii.b2a_hex(self.msg_in))
1386 logger.warning("Unexpected message received: 0x%s",
1387 binascii.b2a_hex(self.msg_in))
# Reset the state machine for the next message header.
1389 self.reading_header = True
1390 self.bytes_to_read = self.header_length
1391 # We should not act upon peer_hold_time if we are reading
1392 # something right now.
1395 def decode_path_attributes(self, path_attributes_hex):
1396 """Decode the Path Attributes field (rfc4271#section-4.3)
1399 :path_attributes: path_attributes field to be decoded in hex
1403 hex_to_decode = path_attributes_hex
# Consume one attribute per iteration until the field is exhausted.
1405 while len(hex_to_decode):
1406 attr_flags_hex = hex_to_decode[0]
1407 attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1408 # attr_optional_bit = attr_flags & 128
1409 # attr_transitive_bit = attr_flags & 64
1410 # attr_partial_bit = attr_flags & 32
# Extended-length bit selects a 2-byte vs 1-byte attribute length field.
1411 attr_extended_length_bit = attr_flags & 16
1413 attr_type_code_hex = hex_to_decode[1]
1414 attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1416 if attr_extended_length_bit:
1417 attr_length_hex = hex_to_decode[2:4]
1418 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1419 attr_value_hex = hex_to_decode[4:4 + attr_length]
1420 hex_to_decode = hex_to_decode[4 + attr_length:]
1422 attr_length_hex = hex_to_decode[2]
1423 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1424 attr_value_hex = hex_to_decode[3:3 + attr_length]
1425 hex_to_decode = hex_to_decode[3 + attr_length:]
1427 if attr_type_code == 1:
1428 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1429 binascii.b2a_hex(attr_flags_hex))
1430 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1431 elif attr_type_code == 2:
1432 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1433 binascii.b2a_hex(attr_flags_hex))
1434 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1435 elif attr_type_code == 3:
1436 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1437 binascii.b2a_hex(attr_flags_hex))
1438 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1439 elif attr_type_code == 4:
1440 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1441 binascii.b2a_hex(attr_flags_hex))
1442 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1443 elif attr_type_code == 5:
1444 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1445 binascii.b2a_hex(attr_flags_hex))
1446 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1447 elif attr_type_code == 6:
1448 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1449 binascii.b2a_hex(attr_flags_hex))
1450 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1451 elif attr_type_code == 7:
1452 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1453 binascii.b2a_hex(attr_flags_hex))
1454 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1455 elif attr_type_code == 9: # rfc4456#section-8
1456 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1457 binascii.b2a_hex(attr_flags_hex))
1458 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1459 elif attr_type_code == 10: # rfc4456#section-8
1460 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1461 binascii.b2a_hex(attr_flags_hex))
1462 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1463 elif attr_type_code == 14: # rfc4760#section-3
1464 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1465 binascii.b2a_hex(attr_flags_hex))
1466 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
# MP_REACH_NLRI layout: AFI(2) SAFI(1) NHLen(1) NH(var) Reserved(1) NLRI.
1467 address_family_identifier_hex = attr_value_hex[0:2]
1468 logger.debug(" Address Family Identifier=0x%s",
1469 binascii.b2a_hex(address_family_identifier_hex))
1470 subsequent_address_family_identifier_hex = attr_value_hex[2]
1471 logger.debug(" Subsequent Address Family Identifier=0x%s",
1472 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1473 next_hop_netaddr_len_hex = attr_value_hex[3]
1474 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1475 logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
1476 next_hop_netaddr_len,
1477 binascii.b2a_hex(next_hop_netaddr_len_hex))
1478 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
# NOTE(review): "BBBB" assumes a 4-byte (IPv4) next hop — confirm
# next_hop_netaddr_len is always 4 in the tested scenarios.
1479 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1480 logger.debug(" Network Address of Next Hop=%s (0x%s)",
1481 next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1482 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1483 logger.debug(" Reserved=0x%s",
1484 binascii.b2a_hex(reserved_hex))
1485 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1486 logger.debug(" Network Layer Reachability Information=0x%s",
1487 binascii.b2a_hex(nlri_hex))
1488 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1489 logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
1490 for prefix in nlri_prefix_list:
1491 logger.debug(" nlri_prefix_received: %s", prefix)
1492 self.prefixes_introduced += len(nlri_prefix_list) # update counter
1493 elif attr_type_code == 15: # rfc4760#section-4
1494 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1495 binascii.b2a_hex(attr_flags_hex))
1496 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
# MP_UNREACH_NLRI layout: AFI(2) SAFI(1) Withdrawn Routes(var).
1497 address_family_identifier_hex = attr_value_hex[0:2]
1498 logger.debug(" Address Family Identifier=0x%s",
1499 binascii.b2a_hex(address_family_identifier_hex))
1500 subsequent_address_family_identifier_hex = attr_value_hex[2]
1501 logger.debug(" Subsequent Address Family Identifier=0x%s",
1502 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1503 wd_hex = attr_value_hex[3:]
1504 logger.debug(" Withdrawn Routes=0x%s",
1505 binascii.b2a_hex(wd_hex))
1506 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1507 logger.debug(" Withdrawn routes prefix list: %s",
1509 for prefix in wdr_prefix_list:
1510 logger.debug(" withdrawn_prefix_received: %s", prefix)
1511 self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
1513 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1514 binascii.b2a_hex(attr_flags_hex))
1515 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1518 def decode_update_message(self, msg):
1519 """Decode an UPDATE message (rfc4271#section-4.3)
1522 :msg: message to be decoded in hex
1526 logger.debug("Decoding update message:")
1527 # message header - marker
1528 marker_hex = msg[:16]
1529 logger.debug("Message header marker: 0x%s",
1530 binascii.b2a_hex(marker_hex))
1531 # message header - message length
1532 msg_length_hex = msg[16:18]
1533 msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1534 logger.debug("Message lenght: 0x%s (%s)",
1535 binascii.b2a_hex(msg_length_hex), msg_length)
1536 # message header - message type
1537 msg_type_hex = msg[18:19]
1538 msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
# Publish the raw update for the XML-RPC consumer; the thread-safe
# dict takes its lock via the "with" protocol.
1540 with self.storage as stor:
1541 # this will replace the previously stored message
1542 stor['update'] = binascii.hexlify(msg)
1544 logger.debug("Evpn {}".format(self.evpn))
1546 logger.debug("Skipping update decoding due to evpn data expected")
1549 logger.debug("Mvpn {}".format(self.mvpn))
1551 logger.debug("Skipping update decoding due to mvpn data expected")
1555 logger.debug("Message type: 0x%s (update)",
1556 binascii.b2a_hex(msg_type_hex))
1557 # withdrawn routes length
1558 wdr_length_hex = msg[19:21]
1559 wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1560 logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1561 binascii.b2a_hex(wdr_length_hex), wdr_length)
1563 wdr_hex = msg[21:21 + wdr_length]
1564 logger.debug("Withdrawn routes: 0x%s",
1565 binascii.b2a_hex(wdr_hex))
1566 wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1567 logger.debug("Withdrawn routes prefix list: %s",
1569 for prefix in wdr_prefix_list:
1570 logger.debug("withdrawn_prefix_received: %s", prefix)
1571 # total path attribute length
1572 total_pa_length_offset = 21 + wdr_length
1573 total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1574 total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1575 logger.debug("Total path attribute lenght: 0x%s (%s)",
1576 binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1578 pa_offset = total_pa_length_offset + 2
1579 pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1580 logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1581 self.decode_path_attributes(pa_hex)
1582 # network layer reachability information length
# 23 = 19-byte header + 2-byte withdrawn length + 2-byte PA length.
1583 nlri_length = msg_length - 23 - total_pa_length - wdr_length
1584 logger.debug("Calculated NLRI length: %s", nlri_length)
1585 # network layer reachability information
1586 nlri_offset = pa_offset + total_pa_length
1587 nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1588 logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1589 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1590 logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1591 for prefix in nlri_prefix_list:
1592 logger.debug("nlri_prefix_received: %s", prefix)
# Statistics counters consumed by the periodic reporting below.
1594 self.updates_received += 1
1595 self.prefixes_introduced += len(nlri_prefix_list)
1596 self.prefixes_withdrawn += len(wdr_prefix_list)
1598 logger.error("Unexpeced message type 0x%s in 0x%s",
1599 binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1601 def wait_for_read(self):
1602 """Read message until timeout (next expected event).
1605 Used when no more updates has to be sent to avoid busy-wait.
1606 Currently it does not return anything.
1608 # Compute time to the first predictable state change
1609 event_time = self.timer.get_next_event_time()
1610 # snapshot_time would be imprecise
# Never block longer than self.wfr seconds, even if no event is near.
1611 wait_timedelta = min(event_time - time.time(), self.wfr)
1612 if wait_timedelta < 0:
1613 # The program got around to waiting to an event in "very near
1614 # future" so late that it became a "past" event, thus tell
1615 # "select" to not wait at all. Passing negative timedelta to
1616 # select() would lead to either waiting forever (for -1) or
1617 # select.error("Invalid parameter") (for everything else).
1619 # And wait for event or something to read.
1621 if not self.rx_activity_detected or not (self.updates_received % 100):
1622 # right time to write statistics to the log (not for every update and
1623 # not too frequently to avoid having large log files)
1624 logger.info("total_received_update_message_counter: %s",
1625 self.updates_received)
1626 logger.info("total_received_nlri_prefix_counter: %s",
1627 self.prefixes_introduced)
1628 logger.info("total_received_withdrawn_prefix_counter: %s",
1629 self.prefixes_withdrawn)
1631 start_time = time.time()
1632 select.select([self.socket], [], [self.socket], wait_timedelta)
1633 timedelta = time.time() - start_time
1634 self.rx_idle_time += timedelta
# Heuristic: a wait shorter than 1 second means data arrived quickly.
1635 self.rx_activity_detected = timedelta < 1
1637 if not self.rx_activity_detected or not (self.updates_received % 100):
1638 # right time to write statistics to the log (not for every update and
1639 # not too frequently to avoid having large log files)
1640 logger.info("... idle for %.3fs", timedelta)
1641 logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1645 class WriteTracker(object):
1646 """Class tracking enqueueing messages and sending chunks of them."""
1648 def __init__(self, bgp_socket, generator, timer):
1649 """The writer initialisation.
1652 bgp_socket: socket to be used for sending
1653 generator: generator to be used for message generation
1654 timer: timer to be used for scheduling
1656 # References to outside objects,
1657 self.socket = bgp_socket
1658 self.generator = generator
1660 # Really new fields.
1661 # TODO: Would attribute docstrings add anything substantial?
1662 self.sending_message = False
1663 self.bytes_to_send = 0
1666 def enqueue_message_for_sending(self, message):
1667 """Enqueue message and change state.
1670 message: message to be enqueued into the msg_out buffer
1672 self.msg_out += message
1673 self.bytes_to_send += len(message)
1674 self.sending_message = True
1676 def send_message_chunk_is_whole(self):
1677 """Send enqueued data from msg_out buffer
1680 :return: true if no remaining data to send
1682 # We assume there is a msg_out to send and socket is writable.
1683 # print "going to send", repr(self.msg_out)
1684 self.timer.snapshot()
# socket.send() may transmit fewer bytes than requested; keep the rest
# buffered for the next call.
1685 bytes_sent = self.socket.send(self.msg_out)
1686 # Forget the part of message that was sent.
1687 self.msg_out = self.msg_out[bytes_sent:]
1688 self.bytes_to_send -= bytes_sent
1689 if not self.bytes_to_send:
1690 # TODO: Is it possible to hit negative bytes_to_send?
1691 self.sending_message = False
1692 # We should have reset hold timer on peer side.
1693 self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1694 # The possible reason for not prioritizing reads is gone.
1699 class StateTracker(object):
1700 """Main loop has state so complex it warrants this separate class."""
1702 def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1703 """The state tracker initialisation.
1706 bgp_socket: socket to be used for sending / receiving
1707 generator: generator to be used for message generation
1708 timer: timer to be used for scheduling
1709 inqueue: user initiated messages queue
1710 storage: thread safe dict to store data for the rpc server
1711 cliargs: cli args from the user
1713 # References to outside objects.
1714 self.socket = bgp_socket
1715 self.generator = generator
1718 self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn,
1719 mvpn=cliargs.mvpn, wait_for_read=cliargs.wfr)
1720 self.writer = WriteTracker(bgp_socket, generator, timer)
1721 # Prioritization state.
1722 self.prioritize_writing = False
1723 # In general, we prioritize reading over writing. But in order
1724 # not to get blocked by neverending reads, we should
1725 # check whether we are not risking running out of holdtime.
1726 # So in some situations, this field is set to True to attempt
1727 # finishing sending a message, after which this field resets
1729 # TODO: Alternative is to switch fairly between reading and
1730 # writing (called round robin from now on).
1731 # Message counting is done in generator.
1732 self.inqueue = inqueue
    def perform_one_loop_iteration(self):
        """ The main loop iteration

        Calculates priority, resolves all conditions, calls
        appropriate method and returns to caller to repeat.
        """
        self.timer.snapshot()
        if not self.prioritize_writing:
            if self.timer.is_time_for_my_keepalive():
                if not self.writer.sending_message:
                    # We need to schedule a keepalive ASAP.
                    self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
                    logger.info("KEEP ALIVE is sent.")
                # We are sending a message now, so let's prioritize it.
                self.prioritize_writing = True
            # NOTE(review): the guard handling an empty inqueue (the
            # try/except around this non-blocking dequeue) is not visible
            # in this chunk - confirm against the full file.
            msg = self.inqueue.get_nowait()
            logger.info("Received message: {}".format(msg))
            # User-initiated messages arrive as hex strings; convert
            # to raw bytes before queueing them for the wire.
            msgbin = binascii.unhexlify(msg)
            self.writer.enqueue_message_for_sending(msgbin)
        # Now we know what our priorities are, we have to check
        # which actions are available.
        # select.select() returns three lists,
        # we store them to list of lists.
        list_list = select.select([self.socket], [self.socket], [self.socket],
                                  self.timer.report_timedelta)
        read_list, write_list, except_list = list_list
        # Lists are unpacked, each is either [] or [self.socket],
        # so we will test them as boolean.
        # NOTE(review): the "if except_list:" test guarding the next two
        # lines is not visible in this chunk.
        logger.error("Exceptional state on the socket.")
        raise RuntimeError("Exceptional state on socket", self.socket)
        # We will do either read or write.
        if not (self.prioritize_writing and write_list):
            # Either we have no reason to rush writes,
            # or the socket is not writable.
            # We are focusing on reading here.
            if read_list:  # there is something to read indeed
                # In this case we want to read chunk of message
                # and repeat the select,
                self.reader.read_message_chunk()
            # We were focusing on reading, but nothing to read was there.
            # Good time to check peer for hold timer.
            self.timer.check_peer_hold_time(self.timer.snapshot_time)
        # Quiet on the read front, we can have attempt to write.
        # Either we really want to reset peer's view of our hold
        # timer, or there was nothing to read.
        # Were we in the middle of sending a message?
        if self.writer.sending_message:
            # Was it the end of a message?
            whole = self.writer.send_message_chunk_is_whole()
            # We were pressed to send something and we did it.
            if self.prioritize_writing and whole:
                # We prioritize reading again.
                self.prioritize_writing = False
        # Finally to check if still update messages to be generated.
        if self.generator.remaining_prefixes:
            msg_out = self.generator.compose_update_message()
            if not self.generator.remaining_prefixes:
                # We have just finished update generation,
                # end-of-rib is due.
                logger.info("All update messages generated.")
                logger.info("Storing performance results.")
                self.generator.store_results()
                logger.info("Finally an END-OF-RIB is sent.")
                # NOTE(review): the continuation of this call (its
                # remaining keyword arguments) is not visible in this chunk.
                msg_out += self.generator.update_message(wr_prefixes=[],
        self.writer.enqueue_message_for_sending(msg_out)
        # Attempt for real sending to be done in next iteration.
        # Nothing to write anymore.
        # To avoid busy loop, we do idle waiting here.
        self.reader.wait_for_read()
        # We can neither read nor write.
        logger.warning("Input and output both blocked for " +
                       str(self.timer.report_timedelta) + " seconds.")
        # FIXME: Are we sure select has been really waiting
def create_logger(loglevel, logfile):
    """Create and configure the shared logger object.

    Log records go both to the console and to *logfile*; the file is
    truncated on every run because of mode="w".

    :loglevel: log level (an int, or a level name accepted by setLevel)
    :logfile: log file name

    :return: configured logger object
    """
    logger = logging.getLogger("logger")
    log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
    console_handler = logging.StreamHandler()
    file_handler = logging.FileHandler(logfile, mode="w")
    console_handler.setFormatter(log_formatter)
    file_handler.setFormatter(log_formatter)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.setLevel(loglevel)
    # BUG FIX: the logger was configured but never returned, so the
    # module-level assignment ``logger = create_logger(...)`` would have
    # rebound the global logger to None.
    return logger
def job(arguments, inqueue, storage):
    """One time initialisation and iterations looping.

    Establish BGP connection and run iterations.

    :arguments: Command line arguments
    :inqueue: Data to be sent from play.py
    :storage: Shared dict for rpc server
    """
    bgp_socket = establish_connection(arguments)
    # Initial handshake phase. TODO: Can it be also moved to StateTracker?
    # Receive open message before sending anything.
    # FIXME: Add parameter to send default open message first,
    # to work with "you first" peers.
    msg_in = read_open_message(bgp_socket)
    # The peer's open message parameterizes our timers.
    timer = TimeTracker(msg_in)
    generator = MessageGenerator(arguments)
    msg_out = generator.open_message()
    logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
    # Send our open message to the peer.
    bgp_socket.send(msg_out)
    # Wait for confirming keepalive.
    # TODO: Surely in just one packet?
    # Using exact keepalive length to not to see possible updates.
    # NOTE(review): 19 octets looks like the size of a bare BGP header,
    # i.e. a KEEPALIVE message - confirm against generator.keepalive_message().
    msg_in = bgp_socket.recv(19)
    if msg_in != generator.keepalive_message():
        error_msg = "Open not confirmed by keepalive, instead got"
        logger.error(error_msg + ": " + binascii.hexlify(msg_in))
        raise MessageError(error_msg, msg_in)
    timer.reset_peer_hold_time()
    # Send the keepalive to indicate the connection is accepted.
    timer.snapshot()  # Remember this time.
    msg_out = generator.keepalive_message()
    logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
    bgp_socket.send(msg_out)
    # Use the remembered time.
    timer.reset_my_keepalive_time(timer.snapshot_time)
    # End of initial handshake phase.
    state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
    while True:  # main reactor loop
        state.perform_one_loop_iteration()
1892 '''Handler for SimpleXMLRPCServer'''
    def __init__(self, sendqueue, storage):
        '''Init method

        :sendqueue: queue for data to be sent towards odl
        :storage: thread safe dict
        '''
        # References only; both objects are owned by the caller.
        self.queue = sendqueue
        self.storage = storage
    def send(self, text):
        '''Queue data to be sent towards odl.

        :text: hex string of the data to be sent
        '''
        self.queue.put(text)
    def get(self, text=''):
        '''Reads data from the storage

        - returns stored data or an empty string, at the moment only
          a single key lookup is supported

        :text: a key to the storage to get the data

        :return: the stored value, or '' when the key is unknown
        '''
        with self.storage as stor:
            return stor.get(text, '')
    def clean(self, text=''):
        '''Cleans data from the storage

        :text: a key to the storage to clean the data
        '''
        with self.storage as stor:
            # NOTE(review): the statement that actually removes the entry
            # for *text* from the dict is not visible in this chunk.
def threaded_job(arguments):
    """Run the job threaded

    Split the work into per-thread argument sets, start a worker thread
    for each, then serve XML-RPC requests in the main thread.

    :arguments: Command line arguments
    """
    # Remaining work to be distributed over the worker utilities.
    amount_left = arguments.amount
    utils_left = arguments.multiplicity
    prefix_current = arguments.firstprefix
    myip_current = arguments.myip
    # Queue of user-initiated messages for the workers, plus a thread
    # safe dict shared between workers and the RPC handler.
    rpcqueue = Queue.Queue()
    storage = SafeDict()
    # NOTE(review): the loop header that distributes the work (and the
    # initialisation of thread_args) is not visible in this chunk; the
    # indented lines below are its body.
        amount_per_util = (amount_left - 1) / utils_left + 1  # round up
        amount_left -= amount_per_util
        # Each worker gets its own deep copy of the arguments with its
        # own slice of the work, so threads never share mutable state.
        args = deepcopy(arguments)
        args.amount = amount_per_util
        args.firstprefix = prefix_current
        args.myip = myip_current
        thread_args.append(args)
        # Advance the starting prefix for the next worker.
        # NOTE(review): the stride of 16 addresses per announced prefix
        # presumably matches the generator's prefix length - confirm.
        prefix_current += amount_per_util * 16
    for t in thread_args:
        thread.start_new_thread(job, (t, rpcqueue, storage))
        # NOTE(review): the try/except wrapping the thread start is not
        # visible in this chunk; this message belongs to its except branch.
        print "Error: unable to start thread."
    # Serve RPC requests in the main thread; this call blocks forever.
    rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
    rpcserver.register_instance(Rpcs(rpcqueue, storage))
    rpcserver.serve_forever()
if __name__ == "__main__":
    # Script entry point: parse CLI options, install the module-wide
    # logger (other functions read the global "logger" name), then hand
    # control to the threaded job runner.
    cli_args = parse_arguments()
    logger = create_logger(cli_args.loglevel, cli_args.logfile)
    threaded_job(cli_args)