1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
36 '''Thread safe dictionary
38 The object will serve as thread safe data storage.
39 It should be used with "with" statement.
42 def __init__(self, * p_arg, ** n_arg):
43 super(SafeDict, self).__init__()
44 self._lock = threading.Lock()
50 def __exit__(self, type, value, traceback):
54 def parse_arguments():
55 """Use argparse to get arguments,
60 parser = argparse.ArgumentParser()
61 # TODO: Should we use --argument-names-with-spaces?
62 str_help = "Autonomous System number use in the stream."
63 parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64 # FIXME: We are acting as iBGP peer,
65 # we should mirror AS number from peer's open message.
66 str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67 parser.add_argument("--amount", default="1", type=int, help=str_help)
68 str_help = "Maximum number of IP prefixes to be announced in one iteration"
69 parser.add_argument("--insert", default="1", type=int, help=str_help)
70 str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
71 parser.add_argument("--withdraw", default="0", type=int, help=str_help)
72 str_help = "The number of prefixes to process without withdrawals"
73 parser.add_argument("--prefill", default="0", type=int, help=str_help)
74 str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
75 parser.add_argument("--updates", choices=["single", "separate"],
76 default=["separate"], help=str_help)
77 str_help = "Base prefix IP address for prefix generation"
78 parser.add_argument("--firstprefix", default="8.0.1.0",
79 type=ipaddr.IPv4Address, help=str_help)
80 str_help = "The prefix length."
81 parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
82 str_help = "Listen for connection, instead of initiating it."
83 parser.add_argument("--listen", action="store_true", help=str_help)
84 str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
85 "Default value only suitable for listening.")
86 parser.add_argument("--myip", default="0.0.0.0",
87 type=ipaddr.IPv4Address, help=str_help)
88 str_help = ("TCP port to bind to when listening or initiating connection." +
89 "Default only suitable for initiating.")
90 parser.add_argument("--myport", default="0", type=int, help=str_help)
91 str_help = "The IP of the next hop to be placed into the update messages."
92 parser.add_argument("--nexthop", default="192.0.2.1",
93 type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
94 str_help = "Identifier of the route originator."
95 parser.add_argument("--originator", default=None,
96 type=ipaddr.IPv4Address, dest="originator", help=str_help)
97 str_help = "Cluster list item identifier."
98 parser.add_argument("--cluster", default=None,
99 type=ipaddr.IPv4Address, dest="cluster", help=str_help)
100 str_help = ("Numeric IP Address to try to connect to." +
101 "Currently no effect in listening mode.")
102 parser.add_argument("--peerip", default="127.0.0.2",
103 type=ipaddr.IPv4Address, help=str_help)
104 str_help = "TCP port to try to connect to. No effect in listening mode."
105 parser.add_argument("--peerport", default="179", type=int, help=str_help)
106 str_help = "Local hold time."
107 parser.add_argument("--holdtime", default="180", type=int, help=str_help)
108 str_help = "Log level (--error, --warning, --info, --debug)"
109 parser.add_argument("--error", dest="loglevel", action="store_const",
110 const=logging.ERROR, default=logging.INFO,
112 parser.add_argument("--warning", dest="loglevel", action="store_const",
113 const=logging.WARNING, default=logging.INFO,
115 parser.add_argument("--info", dest="loglevel", action="store_const",
116 const=logging.INFO, default=logging.INFO,
118 parser.add_argument("--debug", dest="loglevel", action="store_const",
119 const=logging.DEBUG, default=logging.INFO,
121 str_help = "Log file name"
122 parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
123 str_help = "Trailing part of the csv result files for plotting purposes"
124 parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
125 str_help = "Minimum number of updates to reach to include result into csv."
126 parser.add_argument("--threshold", default="1000", type=int, help=str_help)
127 str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
128 parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
129 str_help = "Link-State NLRI supported"
130 parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
131 str_help = "Link-State NLRI: Identifier"
132 parser.add_argument("-lsid", default="1", type=int, help=str_help)
133 str_help = "Link-State NLRI: Tunnel ID"
134 parser.add_argument("-lstid", default="1", type=int, help=str_help)
135 str_help = "Link-State NLRI: LSP ID"
136 parser.add_argument("-lspid", default="1", type=int, help=str_help)
137 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
138 parser.add_argument("--lstsaddr", default="1.2.3.4",
139 type=ipaddr.IPv4Address, help=str_help)
140 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
141 parser.add_argument("--lsteaddr", default="5.6.7.8",
142 type=ipaddr.IPv4Address, help=str_help)
143 str_help = "Link-State NLRI: Identifier Step"
144 parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
145 str_help = "Link-State NLRI: Tunnel ID Step"
146 parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
147 str_help = "Link-State NLRI: LSP ID Step"
148 parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
149 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
150 parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
151 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
152 parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
153 str_help = "How many play utilities are to be started."
154 parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
155 str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
156 Enabling this flag makes the script not decoding the update mesage, because of not\
157 supported decoding for these elements."
158 parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
159 parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
160 arguments = parser.parse_args()
161 if arguments.multiplicity < 1:
162 print "Multiplicity", arguments.multiplicity, "is not positive."
164 # TODO: Are sanity checks (such as asnumber>=0) required?
168 def establish_connection(arguments):
169 """Establish connection to BGP peer.
172 :arguments: following command-line argumets are used
173 - arguments.myip: local IP address
174 - arguments.myport: local port
175 - arguments.peerip: remote IP address
176 - arguments.peerport: remote port
181 logger.info("Connecting in the listening mode.")
182 logger.debug("Local IP address: " + str(arguments.myip))
183 logger.debug("Local port: " + str(arguments.myport))
184 listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
185 listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
186 # bind need single tuple as argument
187 listening_socket.bind((str(arguments.myip), arguments.myport))
188 listening_socket.listen(1)
189 bgp_socket, _ = listening_socket.accept()
190 # TODO: Verify client IP is cotroller IP.
191 listening_socket.close()
193 logger.info("Connecting in the talking mode.")
194 logger.debug("Local IP address: " + str(arguments.myip))
195 logger.debug("Local port: " + str(arguments.myport))
196 logger.debug("Remote IP address: " + str(arguments.peerip))
197 logger.debug("Remote port: " + str(arguments.peerport))
198 talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
199 talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
200 # bind to force specified address and port
201 talking_socket.bind((str(arguments.myip), arguments.myport))
202 # socket does not spead ipaddr, hence str()
203 talking_socket.connect((str(arguments.peerip), arguments.peerport))
204 bgp_socket = talking_socket
205 logger.info("Connected to ODL.")
209 def get_short_int_from_message(message, offset=16):
210 """Extract 2-bytes number from provided message.
213 :message: given message
214 :offset: offset of the short_int inside the message
216 :return: required short_inf value.
218 default offset value is the BGP message size offset.
220 high_byte_int = ord(message[offset])
221 low_byte_int = ord(message[offset + 1])
222 short_int = high_byte_int * 256 + low_byte_int
226 def get_prefix_list_from_hex(prefixes_hex):
227 """Get decoded list of prefixes (rfc4271#section-4.3)
230 :prefixes_hex: list of prefixes to be decoded in hex
232 :return: list of prefixes in the form of ip address (X.X.X.X/X)
236 while offset < len(prefixes_hex):
237 prefix_bit_len_hex = prefixes_hex[offset]
238 prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
239 prefix_len = ((prefix_bit_len - 1) / 8) + 1
240 prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
241 prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
242 offset += 1 + prefix_len
243 prefix_list.append(prefix + "/" + str(prefix_bit_len))
247 class MessageError(ValueError):
248 """Value error with logging optimized for hexlified messages."""
250 def __init__(self, text, message, *args):
253 Store and call super init for textual comment,
254 store raw message which caused it.
258 super(MessageError, self).__init__(text, message, *args)
261 """Generate human readable error message.
264 :return: human readable message as string
266 Use a placeholder string if the message is to be empty.
268 message = binascii.hexlify(self.msg)
270 message = "(empty message)"
271 return self.text + ": " + message
274 def read_open_message(bgp_socket):
275 """Receive peer's OPEN message
278 :bgp_socket: the socket to be read
280 :return: received OPEN message.
282 Performs just basic incomming message checks
284 msg_in = bgp_socket.recv(65535) # TODO: Is smaller buffer size safe?
285 # TODO: Can the incoming open message be split in more than one packet?
288 # 37 is minimal length of open message with 4-byte AS number.
290 "Message length (" + str(len(msg_in)) + ") is smaller than "
291 "minimal length of OPEN message with 4-byte AS number (37)"
293 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
294 raise MessageError(error_msg, msg_in)
295 # TODO: We could check BGP marker, but it is defined only later;
297 reported_length = get_short_int_from_message(msg_in)
298 if len(msg_in) != reported_length:
300 "Expected message length (" + reported_length +
301 ") does not match actual length (" + str(len(msg_in)) + ")"
303 logger.error(error_msg + binascii.hexlify(msg_in))
304 raise MessageError(error_msg, msg_in)
305 logger.info("Open message received.")
309 class MessageGenerator(object):
310 """Class which generates messages, holds states and configuration values."""
312 # TODO: Define bgp marker as a class (constant) variable.
313 def __init__(self, args):
314 """Initialisation according to command-line args.
317 :args: argsparser's Namespace object which contains command-line
318 options for MesageGenerator initialisation
320 Calculates and stores default values used later on for
323 self.total_prefix_amount = args.amount
324 # Number of update messages left to be sent.
325 self.remaining_prefixes = self.total_prefix_amount
327 # New parameters initialisation
329 self.prefix_base_default = args.firstprefix
330 self.prefix_length_default = args.prefixlen
331 self.wr_prefixes_default = []
332 self.nlri_prefixes_default = []
333 self.version_default = 4
334 self.my_autonomous_system_default = args.asnumber
335 self.hold_time_default = args.holdtime # Local hold time.
336 self.bgp_identifier_default = int(args.myip)
337 self.next_hop_default = args.nexthop
338 self.originator_id_default = args.originator
339 self.cluster_list_item_default = args.cluster
340 self.single_update_default = args.updates == "single"
341 self.randomize_updates_default = args.updates == "random"
342 self.prefix_count_to_add_default = args.insert
343 self.prefix_count_to_del_default = args.withdraw
344 if self.prefix_count_to_del_default < 0:
345 self.prefix_count_to_del_default = 0
346 if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
347 # total number of prefixes must grow to avoid infinite test loop
348 self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
349 self.slot_size_default = self.prefix_count_to_add_default
350 self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
351 self.results_file_name_default = args.results
352 self.performance_threshold_default = args.threshold
353 self.rfc4760 = args.rfc4760
354 self.bgpls = args.bgpls
355 self.evpn = args.evpn
356 # Default values when BGP-LS Attributes are used
358 self.prefix_count_to_add_default = 1
359 self.prefix_count_to_del_default = 0
360 self.ls_nlri_default = {"Identifier": args.lsid,
361 "TunnelID": args.lstid,
363 "IPv4TunnelSenderAddress": args.lstsaddr,
364 "IPv4TunnelEndPointAddress": args.lsteaddr}
365 self.lsid_step = args.lsidstep
366 self.lstid_step = args.lstidstep
367 self.lspid_step = args.lspidstep
368 self.lstsaddr_step = args.lstsaddrstep
369 self.lsteaddr_step = args.lsteaddrstep
370 # Default values used for randomized part
371 s1_slots = ((self.total_prefix_amount -
372 self.remaining_prefixes_threshold - 1) /
373 self.prefix_count_to_add_default + 1)
374 s2_slots = ((self.remaining_prefixes_threshold - 1) /
375 (self.prefix_count_to_add_default -
376 self.prefix_count_to_del_default) + 1)
378 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
379 s2_first_index = s1_slots * self.prefix_count_to_add_default
380 s2_last_index = (s2_first_index +
381 s2_slots * (self.prefix_count_to_add_default -
382 self.prefix_count_to_del_default) - 1)
383 self.slot_gap_default = ((self.total_prefix_amount -
384 self.remaining_prefixes_threshold - 1) /
385 self.prefix_count_to_add_default + 1)
386 self.randomize_lowest_default = s2_first_index
387 self.randomize_highest_default = s2_last_index
388 # Initialising counters
389 self.phase1_start_time = 0
390 self.phase1_stop_time = 0
391 self.phase2_start_time = 0
392 self.phase2_stop_time = 0
393 self.phase1_updates_sent = 0
394 self.phase2_updates_sent = 0
395 self.updates_sent = 0
397 self.log_info = args.loglevel <= logging.INFO
398 self.log_debug = args.loglevel <= logging.DEBUG
400 Flags needed for the MessageGenerator performance optimization.
401 Calling logger methods each iteration even with proper log level set
402 slows down significantly the MessageGenerator performance.
403 Measured total generation time (1M updates, dry run, error log level):
404 - logging based on basic logger features: 36,2s
405 - logging based on advanced logger features (lazy logging): 21,2s
406 - conditional calling of logger methods enclosed inside condition: 8,6s
409 logger.info("Generator initialisation")
410 logger.info(" Target total number of prefixes to be introduced: " +
411 str(self.total_prefix_amount))
412 logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
413 str(self.prefix_length_default))
414 logger.info(" My Autonomous System number: " +
415 str(self.my_autonomous_system_default))
416 logger.info(" My Hold Time: " + str(self.hold_time_default))
417 logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
418 logger.info(" Next Hop: " + str(self.next_hop_default))
419 logger.info(" Originator ID: " + str(self.originator_id_default))
420 logger.info(" Cluster list: " + str(self.cluster_list_item_default))
421 logger.info(" Prefix count to be inserted at once: " +
422 str(self.prefix_count_to_add_default))
423 logger.info(" Prefix count to be withdrawn at once: " +
424 str(self.prefix_count_to_del_default))
425 logger.info(" Fast pre-fill up to " +
426 str(self.total_prefix_amount -
427 self.remaining_prefixes_threshold) + " prefixes")
428 logger.info(" Remaining number of prefixes to be processed " +
429 "in parallel with withdrawals: " +
430 str(self.remaining_prefixes_threshold))
431 logger.debug(" Prefix index range used after pre-fill procedure [" +
432 str(self.randomize_lowest_default) + ", " +
433 str(self.randomize_highest_default) + "]")
434 if self.single_update_default:
435 logger.info(" Common single UPDATE will be generated " +
436 "for both NLRI & WITHDRAWN lists")
438 logger.info(" Two separate UPDATEs will be generated " +
439 "for each NLRI & WITHDRAWN lists")
440 if self.randomize_updates_default:
441 logger.info(" Generation of UPDATE messages will be randomized")
442 logger.info(" Let\'s go ...\n")
444 # TODO: Notification for hold timer expiration can be handy.
446 def store_results(self, file_name=None, threshold=None):
447 """ Stores specified results into files based on file_name value.
450 :param file_name: Trailing (common) part of result file names
451 :param threshold: Minimum number of sent updates needed for each
452 result to be included into result csv file
453 (mainly needed because of the result accuracy)
457 # default values handling
458 # TODO optimize default values handling (use e.g. dicionary.update() approach)
459 if file_name is None:
460 file_name = self.results_file_name_default
461 if threshold is None:
462 threshold = self.performance_threshold_default
463 # performance calculation
464 if self.phase1_updates_sent >= threshold:
465 totals1 = self.phase1_updates_sent
466 performance1 = int(self.phase1_updates_sent /
467 (self.phase1_stop_time - self.phase1_start_time))
471 if self.phase2_updates_sent >= threshold:
472 totals2 = self.phase2_updates_sent
473 performance2 = int(self.phase2_updates_sent /
474 (self.phase2_stop_time - self.phase2_start_time))
479 logger.info("#" * 10 + " Final results " + "#" * 10)
480 logger.info("Number of iterations: " + str(self.iteration))
481 logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
482 str(self.phase1_updates_sent))
483 logger.info("The pre-fill phase duration: " +
484 str(self.phase1_stop_time - self.phase1_start_time) + "s")
485 logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
486 str(self.phase2_updates_sent))
487 logger.info("The 2nd test phase duration: " +
488 str(self.phase2_stop_time - self.phase2_start_time) + "s")
489 logger.info("Threshold for performance reporting: " + str(threshold))
492 phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
493 " route(s) per UPDATE")
494 if self.single_update_default:
495 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
496 "/-" + str(self.prefix_count_to_del_default) +
497 " routes per UPDATE")
499 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
500 "/-" + str(self.prefix_count_to_del_default) +
501 " routes in two UPDATEs")
502 # collecting capacity and performance results
505 if totals1 is not None:
506 totals[phase1_label] = totals1
507 performance[phase1_label] = performance1
508 if totals2 is not None:
509 totals[phase2_label] = totals2
510 performance[phase2_label] = performance2
511 self.write_results_to_file(totals, "totals-" + file_name)
512 self.write_results_to_file(performance, "performance-" + file_name)
514 def write_results_to_file(self, results, file_name):
515 """Writes results to the csv plot file consumable by Jenkins.
518 :param file_name: Name of the (csv) file to be created
524 f = open(file_name, "wt")
526 for key in sorted(results):
527 first_line += key + ", "
528 second_line += str(results[key]) + ", "
529 first_line = first_line[:-2]
530 second_line = second_line[:-2]
531 f.write(first_line + "\n")
532 f.write(second_line + "\n")
533 logger.info("Message generator performance results stored in " +
535 logger.info(" " + first_line)
536 logger.info(" " + second_line)
540 # Return pseudo-randomized (reproducible) index for selected range
541 def randomize_index(self, index, lowest=None, highest=None):
542 """Calculates pseudo-randomized index from selected range.
545 :param index: input index
546 :param lowest: the lowes index from the randomized area
547 :param highest: the highest index from the randomized area
549 :return: the (pseudo)randomized index
551 Created just as a fame for future generator enhancement.
553 # default values handling
554 # TODO optimize default values handling (use e.g. dicionary.update() approach)
556 lowest = self.randomize_lowest_default
558 highest = self.randomize_highest_default
560 if (index >= lowest) and (index <= highest):
561 # we are in the randomized range -> shuffle it inside
562 # the range (now just reverse the order)
563 new_index = highest - (index - lowest)
565 # we are out of the randomized range -> nothing to do
569 def get_ls_nlri_values(self, index):
570 """Generates LS-NLRI parameters.
571 http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
574 :param index: index (iteration)
576 :return: dictionary of LS NLRI parameters and values
578 # generating list of LS NLRI parameters
579 identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
580 ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
581 tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
582 lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
583 ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
584 ls_nlri_values = {"Identifier": identifier,
585 "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
586 "TunnelID": tunnel_id, "LSPID": lsp_id,
587 "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
588 return ls_nlri_values
590 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
591 prefix_len=None, prefix_count=None, randomize=None):
592 """Generates list of IP address prefixes.
595 :param slot_index: index of group of prefix addresses
596 :param slot_size: size of group of prefix addresses
597 in [number of included prefixes]
598 :param prefix_base: IP address of the first prefix
599 (slot_index = 0, prefix_index = 0)
600 :param prefix_len: length of the prefix in bites
601 (the same as size of netmask)
602 :param prefix_count: number of prefixes to be returned
603 from the specified slot
605 :return: list of generated IP address prefixes
607 # default values handling
608 # TODO optimize default values handling (use e.g. dicionary.update() approach)
609 if slot_size is None:
610 slot_size = self.slot_size_default
611 if prefix_base is None:
612 prefix_base = self.prefix_base_default
613 if prefix_len is None:
614 prefix_len = self.prefix_length_default
615 if prefix_count is None:
616 prefix_count = slot_size
617 if randomize is None:
618 randomize = self.randomize_updates_default
619 # generating list of prefixes
622 prefix_gap = 2 ** (32 - prefix_len)
623 for i in range(prefix_count):
624 prefix_index = slot_index * slot_size + i
626 prefix_index = self.randomize_index(prefix_index)
627 indexes.append(prefix_index)
628 prefixes.append(prefix_base + prefix_index * prefix_gap)
630 logger.debug(" Prefix slot index: " + str(slot_index))
631 logger.debug(" Prefix slot size: " + str(slot_size))
632 logger.debug(" Prefix count: " + str(prefix_count))
633 logger.debug(" Prefix indexes: " + str(indexes))
634 logger.debug(" Prefix list: " + str(prefixes))
637 def compose_update_message(self, prefix_count_to_add=None,
638 prefix_count_to_del=None):
639 """Composes an UPDATE message
642 :param prefix_count_to_add: # of prefixes to put into NLRI list
643 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
645 :return: encoded UPDATE message in HEX
647 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
648 lists or common message wich includes both prefix lists.
649 Updates global counters.
651 # default values handling
652 # TODO optimize default values handling (use e.g. dicionary.update() approach)
653 if prefix_count_to_add is None:
654 prefix_count_to_add = self.prefix_count_to_add_default
655 if prefix_count_to_del is None:
656 prefix_count_to_del = self.prefix_count_to_del_default
658 if self.log_info and not (self.iteration % 1000):
659 logger.info("Iteration: " + str(self.iteration) +
660 " - total remaining prefixes: " +
661 str(self.remaining_prefixes))
663 logger.debug("#" * 10 + " Iteration: " +
664 str(self.iteration) + " " + "#" * 10)
665 logger.debug("Remaining prefixes: " +
666 str(self.remaining_prefixes))
667 # scenario type & one-shot counter
668 straightforward_scenario = (self.remaining_prefixes >
669 self.remaining_prefixes_threshold)
670 if straightforward_scenario:
671 prefix_count_to_del = 0
673 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
674 if not self.phase1_start_time:
675 self.phase1_start_time = time.time()
678 logger.debug("--- COMBINED SCENARIO ---")
679 if not self.phase2_start_time:
680 self.phase2_start_time = time.time()
681 # tailor the number of prefixes if needed
682 prefix_count_to_add = (prefix_count_to_del +
683 min(prefix_count_to_add - prefix_count_to_del,
684 self.remaining_prefixes))
685 # prefix slots selection for insertion and withdrawal
686 slot_index_to_add = self.iteration
687 slot_index_to_del = slot_index_to_add - self.slot_gap_default
688 # getting lists of prefixes for insertion in this iteration
690 logger.debug("Prefixes to be inserted in this iteration:")
691 prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
692 prefix_count=prefix_count_to_add)
693 # getting lists of prefixes for withdrawal in this iteration
695 logger.debug("Prefixes to be withdrawn in this iteration:")
696 prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
697 prefix_count=prefix_count_to_del)
698 # generating the UPDATE mesage with LS-NLRI only
700 ls_nlri = self.get_ls_nlri_values(self.iteration)
701 msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
704 # generating the UPDATE message with prefix lists
705 if self.single_update_default:
706 # Send prefixes to be introduced and withdrawn
707 # in one UPDATE message
708 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
709 nlri_prefixes=prefix_list_to_add)
711 # Send prefixes to be introduced and withdrawn
712 # in separate UPDATE messages (if needed)
713 msg_out = self.update_message(wr_prefixes=[],
714 nlri_prefixes=prefix_list_to_add)
715 if prefix_count_to_del:
716 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
718 # updating counters - who knows ... maybe I am last time here ;)
719 if straightforward_scenario:
720 self.phase1_stop_time = time.time()
721 self.phase1_updates_sent = self.updates_sent
723 self.phase2_stop_time = time.time()
724 self.phase2_updates_sent = (self.updates_sent -
725 self.phase1_updates_sent)
726 # updating totals for the next iteration
728 self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
729 # returning the encoded message
732 # Section of message encoders
734 def open_message(self, version=None, my_autonomous_system=None,
735 hold_time=None, bgp_identifier=None):
736 """Generates an OPEN Message (rfc4271#section-4.2)
739 :param version: see the rfc4271#section-4.2
740 :param my_autonomous_system: see the rfc4271#section-4.2
741 :param hold_time: see the rfc4271#section-4.2
742 :param bgp_identifier: see the rfc4271#section-4.2
744 :return: encoded OPEN message in HEX
747 # default values handling
748 # TODO optimize default values handling (use e.g. dicionary.update() approach)
750 version = self.version_default
751 if my_autonomous_system is None:
752 my_autonomous_system = self.my_autonomous_system_default
753 if hold_time is None:
754 hold_time = self.hold_time_default
755 if bgp_identifier is None:
756 bgp_identifier = self.bgp_identifier_default
759 marker_hex = "\xFF" * 16
763 type_hex = struct.pack("B", type)
766 version_hex = struct.pack("B", version)
768 # my_autonomous_system
769 # AS_TRANS value, 23456 decadic.
770 my_autonomous_system_2_bytes = 23456
771 # AS number is mappable to 2 bytes
772 if my_autonomous_system < 65536:
773 my_autonomous_system_2_bytes = my_autonomous_system
774 my_autonomous_system_hex_2_bytes = struct.pack(">H",
775 my_autonomous_system)
778 hold_time_hex = struct.pack(">H", hold_time)
781 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
783 # Optional Parameters
784 optional_parameters_hex = ""
786 optional_parameter_hex = (
787 "\x02" # Param type ("Capability Ad")
788 "\x06" # Length (6 bytes)
789 "\x01" # Capability type (NLRI Unicast),
790 # see RFC 4760, secton 8
791 "\x04" # Capability value length
792 "\x00\x01" # AFI (Ipv4)
794 "\x01" # SAFI (Unicast)
796 optional_parameters_hex += optional_parameter_hex
799 optional_parameter_hex = (
800 "\x02" # Param type ("Capability Ad")
801 "\x06" # Length (6 bytes)
802 "\x01" # Capability type (NLRI Unicast),
803 # see RFC 4760, secton 8
804 "\x04" # Capability value length
805 "\x40\x04" # AFI (BGP-LS)
807 "\x47" # SAFI (BGP-LS)
809 optional_parameters_hex += optional_parameter_hex
812 optional_parameter_hex = (
813 "\x02" # Param type ("Capability Ad")
814 "\x06" # Length (6 bytes)
815 "\x01" # Multiprotocol extetension capability,
816 "\x04" # Capability value length
817 "\x00\x19" # AFI (L2-VPN)
821 optional_parameters_hex += optional_parameter_hex
823 optional_parameter_hex = (
824 "\x02" # Param type ("Capability Ad")
825 "\x06" # Length (6 bytes)
826 "\x41" # "32 bit AS Numbers Support"
827 # (see RFC 6793, section 3)
828 "\x04" # Capability value length
830 optional_parameter_hex += (
831 struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
833 optional_parameters_hex += optional_parameter_hex
835 # Optional Parameters Length
836 optional_parameters_length = len(optional_parameters_hex)
837 optional_parameters_length_hex = struct.pack("B",
838 optional_parameters_length)
840 # Length (big-endian)
842 len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
843 len(my_autonomous_system_hex_2_bytes) +
844 len(hold_time_hex) + len(bgp_identifier_hex) +
845 len(optional_parameters_length_hex) +
846 len(optional_parameters_hex)
848 length_hex = struct.pack(">H", length)
856 my_autonomous_system_hex_2_bytes +
859 optional_parameters_length_hex +
860 optional_parameters_hex
864 logger.debug("OPEN message encoding")
865 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
866 logger.debug(" Length=" + str(length) + " (0x" +
867 binascii.hexlify(length_hex) + ")")
868 logger.debug(" Type=" + str(type) + " (0x" +
869 binascii.hexlify(type_hex) + ")")
870 logger.debug(" Version=" + str(version) + " (0x" +
871 binascii.hexlify(version_hex) + ")")
872 logger.debug(" My Autonomous System=" +
873 str(my_autonomous_system_2_bytes) + " (0x" +
874 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
876 logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
877 binascii.hexlify(hold_time_hex) + ")")
878 logger.debug(" BGP Identifier=" + str(bgp_identifier) +
879 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
880 logger.debug(" Optional Parameters Length=" +
881 str(optional_parameters_length) + " (0x" +
882 binascii.hexlify(optional_parameters_length_hex) +
884 logger.debug(" Optional Parameters=0x" +
885 binascii.hexlify(optional_parameters_hex))
886 logger.debug("OPEN message encoded: 0x%s",
887 binascii.b2a_hex(message_hex))
891 def update_message(self, wr_prefixes=None, nlri_prefixes=None,
892 wr_prefix_length=None, nlri_prefix_length=None,
893 my_autonomous_system=None, next_hop=None,
894 originator_id=None, cluster_list_item=None,
895 end_of_rib=False, **ls_nlri_params):
896 """Generates an UPDATE Message (rfc4271#section-4.3)
899 :param wr_prefixes: see the rfc4271#section-4.3
900 :param nlri_prefixes: see the rfc4271#section-4.3
901 :param wr_prefix_length: see the rfc4271#section-4.3
902 :param nlri_prefix_length: see the rfc4271#section-4.3
903 :param my_autonomous_system: see the rfc4271#section-4.3
904 :param next_hop: see the rfc4271#section-4.3
906 :return: encoded UPDATE message in HEX
909 # default values handling
910 # TODO optimize default values handling (use e.g. dicionary.update() approach)
911 if wr_prefixes is None:
912 wr_prefixes = self.wr_prefixes_default
913 if nlri_prefixes is None:
914 nlri_prefixes = self.nlri_prefixes_default
915 if wr_prefix_length is None:
916 wr_prefix_length = self.prefix_length_default
917 if nlri_prefix_length is None:
918 nlri_prefix_length = self.prefix_length_default
919 if my_autonomous_system is None:
920 my_autonomous_system = self.my_autonomous_system_default
922 next_hop = self.next_hop_default
923 if originator_id is None:
924 originator_id = self.originator_id_default
925 if cluster_list_item is None:
926 cluster_list_item = self.cluster_list_item_default
927 ls_nlri = self.ls_nlri_default.copy()
928 ls_nlri.update(ls_nlri_params)
931 marker_hex = "\xFF" * 16
935 type_hex = struct.pack("B", type)
938 withdrawn_routes_hex = ""
940 bytes = ((wr_prefix_length - 1) / 8) + 1
941 for prefix in wr_prefixes:
942 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
943 struct.pack(">I", int(prefix))[:bytes])
944 withdrawn_routes_hex += withdrawn_route_hex
946 # Withdrawn Routes Length
947 withdrawn_routes_length = len(withdrawn_routes_hex)
948 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
950 # TODO: to replace hardcoded string by encoding?
952 path_attributes_hex = ""
953 if nlri_prefixes != []:
954 path_attributes_hex += (
955 "\x40" # Flags ("Well-Known")
956 "\x01" # Type (ORIGIN)
960 path_attributes_hex += (
961 "\x40" # Flags ("Well-Known")
962 "\x02" # Type (AS_PATH)
964 "\x02" # AS segment type (AS_SEQUENCE)
965 "\x01" # AS segment length (1)
967 my_as_hex = struct.pack(">I", my_autonomous_system)
968 path_attributes_hex += my_as_hex # AS segment (4 bytes)
969 path_attributes_hex += (
970 "\x40" # Flags ("Well-Known")
971 "\x03" # Type (NEXT_HOP)
974 next_hop_hex = struct.pack(">I", int(next_hop))
975 path_attributes_hex += (
976 next_hop_hex # IP address of the next hop (4 bytes)
978 path_attributes_hex += (
979 "\x40" # Flags ("Well-Known")
980 "\x05" # Type (LOCAL_PREF)
982 "\x00\x00\x00\x64" # (100)
984 if originator_id is not None:
985 path_attributes_hex += (
986 "\x80" # Flags ("Optional, non-transitive")
987 "\x09" # Type (ORIGINATOR_ID)
989 ) # ORIGINATOR_ID (4 bytes)
990 path_attributes_hex += struct.pack(">I", int(originator_id))
991 if cluster_list_item is not None:
992 path_attributes_hex += (
993 "\x80" # Flags ("Optional, non-transitive")
994 "\x09" # Type (CLUSTER_LIST)
996 ) # one CLUSTER_LIST item (4 bytes)
997 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
999 if self.bgpls and not end_of_rib:
1000 path_attributes_hex += (
1001 "\x80" # Flags ("Optional, non-transitive")
1002 "\x0e" # Type (MP_REACH_NLRI)
1003 "\x22" # Length (34)
1004 "\x40\x04" # AFI (BGP-LS)
1005 "\x47" # SAFI (BGP-LS)
1006 "\x04" # Next Hop Length (4)
1008 path_attributes_hex += struct.pack(">I", int(next_hop))
1009 path_attributes_hex += "\x00" # Reserved
1010 path_attributes_hex += (
1011 "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1012 "\x00\x15" # LS-NLRI.TotalNLRILength (21)
1013 "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1015 path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1016 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1017 path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1018 path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1019 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1021 # Total Path Attributes Length
1022 total_path_attributes_length = len(path_attributes_hex)
1023 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1025 # Network Layer Reachability Information
1028 bytes = ((nlri_prefix_length - 1) / 8) + 1
1029 for prefix in nlri_prefixes:
1030 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1031 struct.pack(">I", int(prefix))[:bytes])
1032 nlri_hex += nlri_prefix_hex
1034 # Length (big-endian)
1036 len(marker_hex) + 2 + len(type_hex) +
1037 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1038 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1040 length_hex = struct.pack(">H", length)
1047 withdrawn_routes_length_hex +
1048 withdrawn_routes_hex +
1049 total_path_attributes_length_hex +
1050 path_attributes_hex +
1055 logger.debug("UPDATE message encoding")
1056 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1057 logger.debug(" Length=" + str(length) + " (0x" +
1058 binascii.hexlify(length_hex) + ")")
1059 logger.debug(" Type=" + str(type) + " (0x" +
1060 binascii.hexlify(type_hex) + ")")
1061 logger.debug(" withdrawn_routes_length=" +
1062 str(withdrawn_routes_length) + " (0x" +
1063 binascii.hexlify(withdrawn_routes_length_hex) + ")")
1064 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1065 str(wr_prefix_length) + " (0x" +
1066 binascii.hexlify(withdrawn_routes_hex) + ")")
1067 if total_path_attributes_length:
1068 logger.debug(" Total Path Attributes Length=" +
1069 str(total_path_attributes_length) + " (0x" +
1070 binascii.hexlify(total_path_attributes_length_hex) + ")")
1071 logger.debug(" Path Attributes=" + "(0x" +
1072 binascii.hexlify(path_attributes_hex) + ")")
1073 logger.debug(" Origin=IGP")
1074 logger.debug(" AS path=" + str(my_autonomous_system))
1075 logger.debug(" Next hop=" + str(next_hop))
1076 if originator_id is not None:
1077 logger.debug(" Originator id=" + str(originator_id))
1078 if cluster_list_item is not None:
1079 logger.debug(" Cluster list=" + str(cluster_list_item))
1081 logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
1082 logger.debug(" Network Layer Reachability Information=" +
1083 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1084 " (0x" + binascii.hexlify(nlri_hex) + ")")
1085 logger.debug("UPDATE message encoded: 0x" +
1086 binascii.b2a_hex(message_hex))
1089 self.updates_sent += 1
1090 # returning encoded message
1093 def notification_message(self, error_code, error_subcode, data_hex=""):
1094 """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1097 :param error_code: see the rfc4271#section-4.5
1098 :param error_subcode: see the rfc4271#section-4.5
1099 :param data_hex: see the rfc4271#section-4.5
1101 :return: encoded NOTIFICATION message in HEX
1105 marker_hex = "\xFF" * 16
1109 type_hex = struct.pack("B", type)
1112 error_code_hex = struct.pack("B", error_code)
1115 error_subcode_hex = struct.pack("B", error_subcode)
1117 # Length (big-endian)
1118 length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1119 len(error_subcode_hex) + len(data_hex))
1120 length_hex = struct.pack(">H", length)
1122 # NOTIFICATION Message
1133 logger.debug("NOTIFICATION message encoding")
1134 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1135 logger.debug(" Length=" + str(length) + " (0x" +
1136 binascii.hexlify(length_hex) + ")")
1137 logger.debug(" Type=" + str(type) + " (0x" +
1138 binascii.hexlify(type_hex) + ")")
1139 logger.debug(" Error Code=" + str(error_code) + " (0x" +
1140 binascii.hexlify(error_code_hex) + ")")
1141 logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
1142 binascii.hexlify(error_subcode_hex) + ")")
1143 logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1144 logger.debug("NOTIFICATION message encoded: 0x%s",
1145 binascii.b2a_hex(message_hex))
1149 def keepalive_message(self):
1150 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1153 :return: encoded KEEP ALIVE message in HEX
1157 marker_hex = "\xFF" * 16
1161 type_hex = struct.pack("B", type)
1163 # Length (big-endian)
1164 length = len(marker_hex) + 2 + len(type_hex)
1165 length_hex = struct.pack(">H", length)
1167 # KEEP ALIVE Message
1175 logger.debug("KEEP ALIVE message encoding")
1176 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1177 logger.debug(" Length=" + str(length) + " (0x" +
1178 binascii.hexlify(length_hex) + ")")
1179 logger.debug(" Type=" + str(type) + " (0x" +
1180 binascii.hexlify(type_hex) + ")")
1181 logger.debug("KEEP ALIVE message encoded: 0x%s",
1182 binascii.b2a_hex(message_hex))
1187 class TimeTracker(object):
1188 """Class for tracking timers, both for my keepalives and
1192 def __init__(self, msg_in):
1193 """Initialisation. based on defaults and OPEN message from peer.
1196 msg_in: the OPEN message received from peer.
1198 # Note: Relative time is always named timedelta, to stress that
1199 # the (non-delta) time is absolute.
1200 self.report_timedelta = 1.0 # In seconds. TODO: Configurable?
1201 # Upper bound for being stuck in the same state, we should
1202 # at least report something before continuing.
1203 # Negotiate the hold timer by taking the smaller
1204 # of the 2 values (mine and the peer's).
1205 hold_timedelta = 180 # Not an attribute of self yet.
1206 # TODO: Make the default value configurable,
1207 # default value could mirror what peer said.
1208 peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1209 if hold_timedelta > peer_hold_timedelta:
1210 hold_timedelta = peer_hold_timedelta
1211 if hold_timedelta != 0 and hold_timedelta < 3:
1212 logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1213 raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1214 self.hold_timedelta = hold_timedelta
1215 # If we do not hear from peer this long, we assume it has died.
1216 self.keepalive_timedelta = int(hold_timedelta / 3.0)
1217 # Upper limit for duration between messages, to avoid being
1218 # declared to be dead.
1219 # The same as calling snapshot(), but also declares a field.
1220 self.snapshot_time = time.time()
1221 # Sometimes we need to store time. This is where to get
1222 # the value from afterwards. Time_keepalive may be too strict.
1223 self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1224 # At this time point, peer will be declared dead.
1225 self.my_keepalive_time = None # to be set later
1226 # At this point, we should be sending keepalive message.
1229 """Store current time in instance data to use later."""
1230 # Read as time before something interesting was called.
1231 self.snapshot_time = time.time()
1233 def reset_peer_hold_time(self):
1234 """Move hold time to future as peer has just proven it still lives."""
1235 self.peer_hold_time = time.time() + self.hold_timedelta
1237 # Some methods could rely on self.snapshot_time, but it is better
1238 # to require user to provide it explicitly.
1239 def reset_my_keepalive_time(self, keepalive_time):
1240 """Calculate and set the next my KEEP ALIVE timeout time
1243 :keepalive_time: the initial value of the KEEP ALIVE timer
1245 self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1247 def is_time_for_my_keepalive(self):
1248 """Check for my KEEP ALIVE timeout occurence"""
1249 if self.hold_timedelta == 0:
1251 return self.snapshot_time >= self.my_keepalive_time
1253 def get_next_event_time(self):
1254 """Set the time of the next expected or to be sent KEEP ALIVE"""
1255 if self.hold_timedelta == 0:
1256 return self.snapshot_time + 86400
1257 return min(self.my_keepalive_time, self.peer_hold_time)
1259 def check_peer_hold_time(self, snapshot_time):
1260 """Raise error if nothing was read from peer until specified time."""
1261 # Hold time = 0 means keepalive checking off.
1262 if self.hold_timedelta != 0:
1263 # time.time() may be too strict
1264 if snapshot_time > self.peer_hold_time:
1265 logger.error("Peer has overstepped the hold timer.")
1266 raise RuntimeError("Peer has overstepped the hold timer.")
1267 # TODO: Include hold_timedelta?
1268 # TODO: Add notification sending (attempt). That means
1269 # move to write tracker.
1272 class ReadTracker(object):
1273 """Class for tracking read of mesages chunk by chunk and
1277 def __init__(self, bgp_socket, timer, storage, evpn=False, wait_for_read=10):
1278 """The reader initialisation.
1281 bgp_socket: socket to be used for sending
1282 timer: timer to be used for scheduling
1283 storage: thread safe dict
1284 evpn: flag that evpn functionality is tested
1286 # References to outside objects.
1287 self.socket = bgp_socket
1289 # BGP marker length plus length field length.
1290 self.header_length = 18
1291 # TODO: make it class (constant) attribute
1292 # Computation of where next chunk ends depends on whether
1293 # we are beyond length field.
1294 self.reading_header = True
1295 # Countdown towards next size computation.
1296 self.bytes_to_read = self.header_length
1297 # Incremental buffer for message under read.
1299 # Initialising counters
1300 self.updates_received = 0
1301 self.prefixes_introduced = 0
1302 self.prefixes_withdrawn = 0
1303 self.rx_idle_time = 0
1304 self.rx_activity_detected = True
1305 self.storage = storage
1307 self.wfr = wait_for_read
1309 def read_message_chunk(self):
1310 """Read up to one message
1313 Currently it does not return anything.
1315 # TODO: We could return the whole message, currently not needed.
1316 # We assume the socket is readable.
1317 chunk_message = self.socket.recv(self.bytes_to_read)
1318 self.msg_in += chunk_message
1319 self.bytes_to_read -= len(chunk_message)
1320 # TODO: bytes_to_read < 0 is not possible, right?
1321 if not self.bytes_to_read:
1322 # Finished reading a logical block.
1323 if self.reading_header:
1324 # The logical block was a BGP header.
1325 # Now we know the size of the message.
1326 self.reading_header = False
1327 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1329 else: # We have finished reading the body of the message.
1330 # Peer has just proven it is still alive.
1331 self.timer.reset_peer_hold_time()
1332 # TODO: Do we want to count received messages?
1333 # This version ignores the received message.
1334 # TODO: Should we do validation and exit on anything
1335 # besides update or keepalive?
1336 # Prepare state for reading another message.
1337 message_type_hex = self.msg_in[self.header_length]
1338 if message_type_hex == "\x01":
1339 logger.info("OPEN message received: 0x%s",
1340 binascii.b2a_hex(self.msg_in))
1341 elif message_type_hex == "\x02":
1342 logger.debug("UPDATE message received: 0x%s",
1343 binascii.b2a_hex(self.msg_in))
1344 self.decode_update_message(self.msg_in)
1345 elif message_type_hex == "\x03":
1346 logger.info("NOTIFICATION message received: 0x%s",
1347 binascii.b2a_hex(self.msg_in))
1348 elif message_type_hex == "\x04":
1349 logger.info("KEEP ALIVE message received: 0x%s",
1350 binascii.b2a_hex(self.msg_in))
1352 logger.warning("Unexpected message received: 0x%s",
1353 binascii.b2a_hex(self.msg_in))
1355 self.reading_header = True
1356 self.bytes_to_read = self.header_length
1357 # We should not act upon peer_hold_time if we are reading
1358 # something right now.
1361 def decode_path_attributes(self, path_attributes_hex):
1362 """Decode the Path Attributes field (rfc4271#section-4.3)
1365 :path_attributes: path_attributes field to be decoded in hex
1369 hex_to_decode = path_attributes_hex
1371 while len(hex_to_decode):
1372 attr_flags_hex = hex_to_decode[0]
1373 attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1374 # attr_optional_bit = attr_flags & 128
1375 # attr_transitive_bit = attr_flags & 64
1376 # attr_partial_bit = attr_flags & 32
1377 attr_extended_length_bit = attr_flags & 16
1379 attr_type_code_hex = hex_to_decode[1]
1380 attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1382 if attr_extended_length_bit:
1383 attr_length_hex = hex_to_decode[2:4]
1384 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1385 attr_value_hex = hex_to_decode[4:4 + attr_length]
1386 hex_to_decode = hex_to_decode[4 + attr_length:]
1388 attr_length_hex = hex_to_decode[2]
1389 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1390 attr_value_hex = hex_to_decode[3:3 + attr_length]
1391 hex_to_decode = hex_to_decode[3 + attr_length:]
1393 if attr_type_code == 1:
1394 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1395 binascii.b2a_hex(attr_flags_hex))
1396 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1397 elif attr_type_code == 2:
1398 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1399 binascii.b2a_hex(attr_flags_hex))
1400 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1401 elif attr_type_code == 3:
1402 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1403 binascii.b2a_hex(attr_flags_hex))
1404 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1405 elif attr_type_code == 4:
1406 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1407 binascii.b2a_hex(attr_flags_hex))
1408 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1409 elif attr_type_code == 5:
1410 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1411 binascii.b2a_hex(attr_flags_hex))
1412 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1413 elif attr_type_code == 6:
1414 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1415 binascii.b2a_hex(attr_flags_hex))
1416 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1417 elif attr_type_code == 7:
1418 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1419 binascii.b2a_hex(attr_flags_hex))
1420 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1421 elif attr_type_code == 9: # rfc4456#section-8
1422 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1423 binascii.b2a_hex(attr_flags_hex))
1424 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1425 elif attr_type_code == 10: # rfc4456#section-8
1426 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1427 binascii.b2a_hex(attr_flags_hex))
1428 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1429 elif attr_type_code == 14: # rfc4760#section-3
1430 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1431 binascii.b2a_hex(attr_flags_hex))
1432 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1433 address_family_identifier_hex = attr_value_hex[0:2]
1434 logger.debug(" Address Family Identifier=0x%s",
1435 binascii.b2a_hex(address_family_identifier_hex))
1436 subsequent_address_family_identifier_hex = attr_value_hex[2]
1437 logger.debug(" Subsequent Address Family Identifier=0x%s",
1438 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1439 next_hop_netaddr_len_hex = attr_value_hex[3]
1440 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1441 logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
1442 next_hop_netaddr_len,
1443 binascii.b2a_hex(next_hop_netaddr_len_hex))
1444 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1445 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1446 logger.debug(" Network Address of Next Hop=%s (0x%s)",
1447 next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1448 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1449 logger.debug(" Reserved=0x%s",
1450 binascii.b2a_hex(reserved_hex))
1451 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1452 logger.debug(" Network Layer Reachability Information=0x%s",
1453 binascii.b2a_hex(nlri_hex))
1454 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1455 logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
1456 for prefix in nlri_prefix_list:
1457 logger.debug(" nlri_prefix_received: %s", prefix)
1458 self.prefixes_introduced += len(nlri_prefix_list) # update counter
1459 elif attr_type_code == 15: # rfc4760#section-4
1460 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1461 binascii.b2a_hex(attr_flags_hex))
1462 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1463 address_family_identifier_hex = attr_value_hex[0:2]
1464 logger.debug(" Address Family Identifier=0x%s",
1465 binascii.b2a_hex(address_family_identifier_hex))
1466 subsequent_address_family_identifier_hex = attr_value_hex[2]
1467 logger.debug(" Subsequent Address Family Identifier=0x%s",
1468 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1469 wd_hex = attr_value_hex[3:]
1470 logger.debug(" Withdrawn Routes=0x%s",
1471 binascii.b2a_hex(wd_hex))
1472 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1473 logger.debug(" Withdrawn routes prefix list: %s",
1475 for prefix in wdr_prefix_list:
1476 logger.debug(" withdrawn_prefix_received: %s", prefix)
1477 self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
1479 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1480 binascii.b2a_hex(attr_flags_hex))
1481 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1484 def decode_update_message(self, msg):
1485 """Decode an UPDATE message (rfc4271#section-4.3)
1488 :msg: message to be decoded in hex
1492 logger.debug("Decoding update message:")
1493 # message header - marker
1494 marker_hex = msg[:16]
1495 logger.debug("Message header marker: 0x%s",
1496 binascii.b2a_hex(marker_hex))
1497 # message header - message length
1498 msg_length_hex = msg[16:18]
1499 msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1500 logger.debug("Message lenght: 0x%s (%s)",
1501 binascii.b2a_hex(msg_length_hex), msg_length)
1502 # message header - message type
1503 msg_type_hex = msg[18:19]
1504 msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1506 with self.storage as stor:
1507 # this will replace the previously stored message
1508 stor['update'] = binascii.hexlify(msg)
1510 logger.debug("Evpn {}".format(self.evpn))
1512 logger.debug("Skipping update decoding due to evpn data expected")
1516 logger.debug("Message type: 0x%s (update)",
1517 binascii.b2a_hex(msg_type_hex))
1518 # withdrawn routes length
1519 wdr_length_hex = msg[19:21]
1520 wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1521 logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1522 binascii.b2a_hex(wdr_length_hex), wdr_length)
1524 wdr_hex = msg[21:21 + wdr_length]
1525 logger.debug("Withdrawn routes: 0x%s",
1526 binascii.b2a_hex(wdr_hex))
1527 wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1528 logger.debug("Withdrawn routes prefix list: %s",
1530 for prefix in wdr_prefix_list:
1531 logger.debug("withdrawn_prefix_received: %s", prefix)
1532 # total path attribute length
1533 total_pa_length_offset = 21 + wdr_length
1534 total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1535 total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1536 logger.debug("Total path attribute lenght: 0x%s (%s)",
1537 binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1539 pa_offset = total_pa_length_offset + 2
1540 pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1541 logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1542 self.decode_path_attributes(pa_hex)
1543 # network layer reachability information length
1544 nlri_length = msg_length - 23 - total_pa_length - wdr_length
1545 logger.debug("Calculated NLRI length: %s", nlri_length)
1546 # network layer reachability information
1547 nlri_offset = pa_offset + total_pa_length
1548 nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1549 logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1550 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1551 logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1552 for prefix in nlri_prefix_list:
1553 logger.debug("nlri_prefix_received: %s", prefix)
1555 self.updates_received += 1
1556 self.prefixes_introduced += len(nlri_prefix_list)
1557 self.prefixes_withdrawn += len(wdr_prefix_list)
1559 logger.error("Unexpeced message type 0x%s in 0x%s",
1560 binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1562 def wait_for_read(self):
1563 """Read message until timeout (next expected event).
1566 Used when no more updates has to be sent to avoid busy-wait.
1567 Currently it does not return anything.
1569 # Compute time to the first predictable state change
1570 event_time = self.timer.get_next_event_time()
1571 # snapshot_time would be imprecise
1572 wait_timedelta = min(event_time - time.time(), self.wfr)
1573 if wait_timedelta < 0:
1574 # The program got around to waiting to an event in "very near
1575 # future" so late that it became a "past" event, thus tell
1576 # "select" to not wait at all. Passing negative timedelta to
1577 # select() would lead to either waiting forever (for -1) or
1578 # select.error("Invalid parameter") (for everything else).
1580 # And wait for event or something to read.
1582 if not self.rx_activity_detected or not (self.updates_received % 100):
1583 # right time to write statistics to the log (not for every update and
1584 # not too frequently to avoid having large log files)
1585 logger.info("total_received_update_message_counter: %s",
1586 self.updates_received)
1587 logger.info("total_received_nlri_prefix_counter: %s",
1588 self.prefixes_introduced)
1589 logger.info("total_received_withdrawn_prefix_counter: %s",
1590 self.prefixes_withdrawn)
1592 start_time = time.time()
1593 select.select([self.socket], [], [self.socket], wait_timedelta)
1594 timedelta = time.time() - start_time
1595 self.rx_idle_time += timedelta
1596 self.rx_activity_detected = timedelta < 1
1598 if not self.rx_activity_detected or not (self.updates_received % 100):
1599 # right time to write statistics to the log (not for every update and
1600 # not too frequently to avoid having large log files)
1601 logger.info("... idle for %.3fs", timedelta)
1602 logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1606 class WriteTracker(object):
1607 """Class tracking enqueueing messages and sending chunks of them."""
1609 def __init__(self, bgp_socket, generator, timer):
1610 """The writter initialisation.
1613 bgp_socket: socket to be used for sending
1614 generator: generator to be used for message generation
1615 timer: timer to be used for scheduling
1617 # References to outside objects,
1618 self.socket = bgp_socket
1619 self.generator = generator
1621 # Really new fields.
1622 # TODO: Would attribute docstrings add anything substantial?
1623 self.sending_message = False
1624 self.bytes_to_send = 0
1627 def enqueue_message_for_sending(self, message):
1628 """Enqueue message and change state.
1631 message: message to be enqueued into the msg_out buffer
1633 self.msg_out += message
1634 self.bytes_to_send += len(message)
1635 self.sending_message = True
1637 def send_message_chunk_is_whole(self):
1638 """Send enqueued data from msg_out buffer
1641 :return: true if no remaining data to send
1643 # We assume there is a msg_out to send and socket is writable.
1644 # print "going to send", repr(self.msg_out)
1645 self.timer.snapshot()
1646 bytes_sent = self.socket.send(self.msg_out)
1647 # Forget the part of message that was sent.
1648 self.msg_out = self.msg_out[bytes_sent:]
1649 self.bytes_to_send -= bytes_sent
1650 if not self.bytes_to_send:
1651 # TODO: Is it possible to hit negative bytes_to_send?
1652 self.sending_message = False
1653 # We should have reset hold timer on peer side.
1654 self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1655 # The possible reason for not prioritizing reads is gone.
1660 class StateTracker(object):
1661 """Main loop has state so complex it warrants this separate class."""
1663 def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1664 """The state tracker initialisation.
1667 bgp_socket: socket to be used for sending / receiving
1668 generator: generator to be used for message generation
1669 timer: timer to be used for scheduling
1670 inqueue: user initiated messages queue
1671 storage: thread safe dict to store data for the rpc server
1672 cliargs: cli args from the user
1674 # References to outside objects.
1675 self.socket = bgp_socket
1676 self.generator = generator
1679 self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, wait_for_read=cliargs.wfr)
1680 self.writer = WriteTracker(bgp_socket, generator, timer)
1681 # Prioritization state.
1682 self.prioritize_writing = False
1683 # In general, we prioritize reading over writing. But in order
1684 # not to get blocked by neverending reads, we should
1685 # check whether we are not risking running out of holdtime.
1686 # So in some situations, this field is set to True to attempt
1687 # finishing sending a message, after which this field resets
1689 # TODO: Alternative is to switch fairly between reading and
1690 # writing (called round robin from now on).
1691 # Message counting is done in generator.
1692 self.inqueue = inqueue
1694 def perform_one_loop_iteration(self):
1695 """ The main loop iteration
1698 Calculates priority, resolves all conditions, calls
1699 appropriate method and returns to caller to repeat.
1701 self.timer.snapshot()
1702 if not self.prioritize_writing:
1703 if self.timer.is_time_for_my_keepalive():
1704 if not self.writer.sending_message:
1705 # We need to schedule a keepalive ASAP.
1706 self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1707 logger.info("KEEP ALIVE is sent.")
1708 # We are sending a message now, so let's prioritize it.
1709 self.prioritize_writing = True
1712 msg = self.inqueue.get_nowait()
1713 logger.info("Received message: {}".format(msg))
1714 msgbin = binascii.unhexlify(msg)
1715 self.writer.enqueue_message_for_sending(msgbin)
1718 # Now we know what our priorities are, we have to check
1719 # which actions are available.
1720 # socket.socket() returns three lists,
1721 # we store them to list of lists.
1722 list_list = select.select([self.socket], [self.socket], [self.socket],
1723 self.timer.report_timedelta)
1724 read_list, write_list, except_list = list_list
1725 # Lists are unpacked, each is either [] or [self.socket],
1726 # so we will test them as boolean.
1728 logger.error("Exceptional state on the socket.")
1729 raise RuntimeError("Exceptional state on socket", self.socket)
1730 # We will do either read or write.
1731 if not (self.prioritize_writing and write_list):
1732 # Either we have no reason to rush writes,
1733 # or the socket is not writable.
1734 # We are focusing on reading here.
1735 if read_list: # there is something to read indeed
1736 # In this case we want to read chunk of message
1737 # and repeat the select,
1738 self.reader.read_message_chunk()
1740 # We were focusing on reading, but nothing to read was there.
1741 # Good time to check peer for hold timer.
1742 self.timer.check_peer_hold_time(self.timer.snapshot_time)
1743 # Quiet on the read front, we can have attempt to write.
1745 # Either we really want to reset peer's view of our hold
1746 # timer, or there was nothing to read.
1747 # Were we in the middle of sending a message?
1748 if self.writer.sending_message:
1749 # Was it the end of a message?
1750 whole = self.writer.send_message_chunk_is_whole()
1751 # We were pressed to send something and we did it.
1752 if self.prioritize_writing and whole:
1753 # We prioritize reading again.
1754 self.prioritize_writing = False
1756 # Finally to check if still update messages to be generated.
1757 if self.generator.remaining_prefixes:
1758 msg_out = self.generator.compose_update_message()
1759 if not self.generator.remaining_prefixes:
1760 # We have just finished update generation,
1761 # end-of-rib is due.
1762 logger.info("All update messages generated.")
1763 logger.info("Storing performance results.")
1764 self.generator.store_results()
1765 logger.info("Finally an END-OF-RIB is sent.")
1766 msg_out += self.generator.update_message(wr_prefixes=[],
1769 self.writer.enqueue_message_for_sending(msg_out)
1770 # Attempt for real sending to be done in next iteration.
1772 # Nothing to write anymore.
1773 # To avoid busy loop, we do idle waiting here.
1774 self.reader.wait_for_read()
1776 # We can neither read nor write.
1777 logger.warning("Input and output both blocked for " +
1778 str(self.timer.report_timedelta) + " seconds.")
1779 # FIXME: Are we sure select has been really waiting
1784 def create_logger(loglevel, logfile):
1785 """Create logger object
1788 :loglevel: log level
1789 :logfile: log file name
1791 :return: logger object
1793 logger = logging.getLogger("logger")
1794 log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1795 console_handler = logging.StreamHandler()
1796 file_handler = logging.FileHandler(logfile, mode="w")
1797 console_handler.setFormatter(log_formatter)
1798 file_handler.setFormatter(log_formatter)
1799 logger.addHandler(console_handler)
1800 logger.addHandler(file_handler)
1801 logger.setLevel(loglevel)
1805 def job(arguments, inqueue, storage):
1806 """One time initialisation and iterations looping.
1808 Establish BGP connection and run iterations.
1811 :arguments: Command line arguments
1812 :inqueue: Data to be sent from play.py
1813 :storage: Shared dict for rpc server
1817 bgp_socket = establish_connection(arguments)
1818 # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1819 # Receive open message before sending anything.
1820 # FIXME: Add parameter to send default open message first,
1821 # to work with "you first" peers.
1822 msg_in = read_open_message(bgp_socket)
1823 timer = TimeTracker(msg_in)
1824 generator = MessageGenerator(arguments)
1825 msg_out = generator.open_message()
1826 logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1827 # Send our open message to the peer.
1828 bgp_socket.send(msg_out)
1829 # Wait for confirming keepalive.
1830 # TODO: Surely in just one packet?
1831 # Using exact keepalive length to not to see possible updates.
1832 msg_in = bgp_socket.recv(19)
1833 if msg_in != generator.keepalive_message():
1834 error_msg = "Open not confirmed by keepalive, instead got"
1835 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1836 raise MessageError(error_msg, msg_in)
1837 timer.reset_peer_hold_time()
1838 # Send the keepalive to indicate the connection is accepted.
1839 timer.snapshot() # Remember this time.
1840 msg_out = generator.keepalive_message()
1841 logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1842 bgp_socket.send(msg_out)
1843 # Use the remembered time.
1844 timer.reset_my_keepalive_time(timer.snapshot_time)
1845 # End of initial handshake phase.
1846 state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
1847 while True: # main reactor loop
1848 state.perform_one_loop_iteration()
1852 '''Handler for SimpleXMLRPCServer'''
1853 def __init__(self, sendqueue, storage):
1857 :sendqueue: queue for data to be sent towards odl
1858 :storage: thread safe dict
1860 self.queue = sendqueue
1861 self.storage = storage
1863 def send(self, text):
1867 :text: hes string of the data to be sent
1869 self.queue.put(text)
1871 def get(self, text=''):
1872 '''Reads data form the storage
1874 - returns stored data or an empty string, at the moment only
1878 :text: a key to the storage to get the data
1882 with self.storage as stor:
1883 return stor.get(text, '')
1885 def clean(self, text=''):
1886 '''Cleans data form the storage
1889 :text: a key to the storage to clean the data
1891 with self.storage as stor:
1896 def threaded_job(arguments):
1897 """Run the job threaded
1900 :arguments: Command line arguments
1904 amount_left = arguments.amount
1905 utils_left = arguments.multiplicity
1906 prefix_current = arguments.firstprefix
1907 myip_current = arguments.myip
1909 rpcqueue = Queue.Queue()
1910 storage = SafeDict()
1913 amount_per_util = (amount_left - 1) / utils_left + 1 # round up
1914 amount_left -= amount_per_util
1917 args = deepcopy(arguments)
1918 args.amount = amount_per_util
1919 args.firstprefix = prefix_current
1920 args.myip = myip_current
1921 thread_args.append(args)
1925 prefix_current += amount_per_util * 16
1930 for t in thread_args:
1931 thread.start_new_thread(job, (t, rpcqueue, storage))
1933 print "Error: unable to start thread."
1936 rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
1937 rpcserver.register_instance(Rpcs(rpcqueue, storage))
1938 rpcserver.serve_forever()
1941 if __name__ == "__main__":
1942 arguments = parse_arguments()
1943 logger = create_logger(arguments.loglevel, arguments.logfile)
1944 threaded_job(arguments)