1 """Utility for playing generated BGP data to ODL.
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
8 # Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
14 from copy import deepcopy
15 from SimpleXMLRPCServer import SimpleXMLRPCServer
29 __author__ = "Vratko Polak"
30 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
31 __license__ = "Eclipse Public License v1.0"
32 __email__ = "vrpolak@cisco.com"
36 '''Thread safe dictionary
38 The object will serve as thread safe data storage.
39 It should be used with "with" statement.
42 def __init__(self, * p_arg, ** n_arg):
43 super(SafeDict, self).__init__()
44 self._lock = threading.Lock()
50 def __exit__(self, type, value, traceback):
54 def parse_arguments():
55 """Use argparse to get arguments,
60 parser = argparse.ArgumentParser()
61 # TODO: Should we use --argument-names-with-spaces?
62 str_help = "Autonomous System number use in the stream."
63 parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
64 # FIXME: We are acting as iBGP peer,
65 # we should mirror AS number from peer's open message.
66 str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
67 parser.add_argument("--amount", default="1", type=int, help=str_help)
68 str_help = "Maximum number of IP prefixes to be announced in one iteration"
69 parser.add_argument("--insert", default="1", type=int, help=str_help)
70 str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
71 parser.add_argument("--withdraw", default="0", type=int, help=str_help)
72 str_help = "The number of prefixes to process without withdrawals"
73 parser.add_argument("--prefill", default="0", type=int, help=str_help)
74 str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
75 parser.add_argument("--updates", choices=["single", "separate"],
76 default=["separate"], help=str_help)
77 str_help = "Base prefix IP address for prefix generation"
78 parser.add_argument("--firstprefix", default="8.0.1.0",
79 type=ipaddr.IPv4Address, help=str_help)
80 str_help = "The prefix length."
81 parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
82 str_help = "Listen for connection, instead of initiating it."
83 parser.add_argument("--listen", action="store_true", help=str_help)
84 str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
85 "Default value only suitable for listening.")
86 parser.add_argument("--myip", default="0.0.0.0",
87 type=ipaddr.IPv4Address, help=str_help)
88 str_help = ("TCP port to bind to when listening or initiating connection." +
89 "Default only suitable for initiating.")
90 parser.add_argument("--myport", default="0", type=int, help=str_help)
91 str_help = "The IP of the next hop to be placed into the update messages."
92 parser.add_argument("--nexthop", default="192.0.2.1",
93 type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
94 str_help = "Identifier of the route originator."
95 parser.add_argument("--originator", default=None,
96 type=ipaddr.IPv4Address, dest="originator", help=str_help)
97 str_help = "Cluster list item identifier."
98 parser.add_argument("--cluster", default=None,
99 type=ipaddr.IPv4Address, dest="cluster", help=str_help)
100 str_help = ("Numeric IP Address to try to connect to." +
101 "Currently no effect in listening mode.")
102 parser.add_argument("--peerip", default="127.0.0.2",
103 type=ipaddr.IPv4Address, help=str_help)
104 str_help = "TCP port to try to connect to. No effect in listening mode."
105 parser.add_argument("--peerport", default="179", type=int, help=str_help)
106 str_help = "Local hold time."
107 parser.add_argument("--holdtime", default="180", type=int, help=str_help)
108 str_help = "Log level (--error, --warning, --info, --debug)"
109 parser.add_argument("--error", dest="loglevel", action="store_const",
110 const=logging.ERROR, default=logging.INFO,
112 parser.add_argument("--warning", dest="loglevel", action="store_const",
113 const=logging.WARNING, default=logging.INFO,
115 parser.add_argument("--info", dest="loglevel", action="store_const",
116 const=logging.INFO, default=logging.INFO,
118 parser.add_argument("--debug", dest="loglevel", action="store_const",
119 const=logging.DEBUG, default=logging.INFO,
121 str_help = "Log file name"
122 parser.add_argument("--logfile", default="bgp_peer.log", help=str_help)
123 str_help = "Trailing part of the csv result files for plotting purposes"
124 parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
125 str_help = "Minimum number of updates to reach to include result into csv."
126 parser.add_argument("--threshold", default="1000", type=int, help=str_help)
127 str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
128 parser.add_argument("--rfc4760", default=True, type=bool, help=str_help)
129 str_help = "Link-State NLRI supported"
130 parser.add_argument("--bgpls", default=False, type=bool, help=str_help)
131 str_help = "Link-State NLRI: Identifier"
132 parser.add_argument("-lsid", default="1", type=int, help=str_help)
133 str_help = "Link-State NLRI: Tunnel ID"
134 parser.add_argument("-lstid", default="1", type=int, help=str_help)
135 str_help = "Link-State NLRI: LSP ID"
136 parser.add_argument("-lspid", default="1", type=int, help=str_help)
137 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address"
138 parser.add_argument("--lstsaddr", default="1.2.3.4",
139 type=ipaddr.IPv4Address, help=str_help)
140 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address"
141 parser.add_argument("--lsteaddr", default="5.6.7.8",
142 type=ipaddr.IPv4Address, help=str_help)
143 str_help = "Link-State NLRI: Identifier Step"
144 parser.add_argument("-lsidstep", default="1", type=int, help=str_help)
145 str_help = "Link-State NLRI: Tunnel ID Step"
146 parser.add_argument("-lstidstep", default="2", type=int, help=str_help)
147 str_help = "Link-State NLRI: LSP ID Step"
148 parser.add_argument("-lspidstep", default="4", type=int, help=str_help)
149 str_help = "Link-State NLRI: IPv4 Tunnel Sender Address Step"
150 parser.add_argument("-lstsaddrstep", default="16", type=int, help=str_help)
151 str_help = "Link-State NLRI: IPv4 Tunnel End Point Address Step"
152 parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
153 str_help = "How many play utilities are to be started."
154 parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
155 str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
156 Enabling this flag makes the script not decoding the update mesage, because of not\
157 supported decoding for these elements."
158 parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
159 parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
160 str_help = "Skipping well known attributes for update message"
161 parser.add_argument("--skipattr", default=False, action="store_true", help=str_help)
162 arguments = parser.parse_args()
163 if arguments.multiplicity < 1:
164 print "Multiplicity", arguments.multiplicity, "is not positive."
166 # TODO: Are sanity checks (such as asnumber>=0) required?
170 def establish_connection(arguments):
171 """Establish connection to BGP peer.
174 :arguments: following command-line argumets are used
175 - arguments.myip: local IP address
176 - arguments.myport: local port
177 - arguments.peerip: remote IP address
178 - arguments.peerport: remote port
183 logger.info("Connecting in the listening mode.")
184 logger.debug("Local IP address: " + str(arguments.myip))
185 logger.debug("Local port: " + str(arguments.myport))
186 listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
187 listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
188 # bind need single tuple as argument
189 listening_socket.bind((str(arguments.myip), arguments.myport))
190 listening_socket.listen(1)
191 bgp_socket, _ = listening_socket.accept()
192 # TODO: Verify client IP is cotroller IP.
193 listening_socket.close()
195 logger.info("Connecting in the talking mode.")
196 logger.debug("Local IP address: " + str(arguments.myip))
197 logger.debug("Local port: " + str(arguments.myport))
198 logger.debug("Remote IP address: " + str(arguments.peerip))
199 logger.debug("Remote port: " + str(arguments.peerport))
200 talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
201 talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
202 # bind to force specified address and port
203 talking_socket.bind((str(arguments.myip), arguments.myport))
204 # socket does not spead ipaddr, hence str()
205 talking_socket.connect((str(arguments.peerip), arguments.peerport))
206 bgp_socket = talking_socket
207 logger.info("Connected to ODL.")
211 def get_short_int_from_message(message, offset=16):
212 """Extract 2-bytes number from provided message.
215 :message: given message
216 :offset: offset of the short_int inside the message
218 :return: required short_inf value.
220 default offset value is the BGP message size offset.
222 high_byte_int = ord(message[offset])
223 low_byte_int = ord(message[offset + 1])
224 short_int = high_byte_int * 256 + low_byte_int
228 def get_prefix_list_from_hex(prefixes_hex):
229 """Get decoded list of prefixes (rfc4271#section-4.3)
232 :prefixes_hex: list of prefixes to be decoded in hex
234 :return: list of prefixes in the form of ip address (X.X.X.X/X)
238 while offset < len(prefixes_hex):
239 prefix_bit_len_hex = prefixes_hex[offset]
240 prefix_bit_len = int(binascii.b2a_hex(prefix_bit_len_hex), 16)
241 prefix_len = ((prefix_bit_len - 1) / 8) + 1
242 prefix_hex = prefixes_hex[offset + 1: offset + 1 + prefix_len]
243 prefix = ".".join(str(i) for i in struct.unpack("BBBB", prefix_hex))
244 offset += 1 + prefix_len
245 prefix_list.append(prefix + "/" + str(prefix_bit_len))
249 class MessageError(ValueError):
250 """Value error with logging optimized for hexlified messages."""
252 def __init__(self, text, message, *args):
255 Store and call super init for textual comment,
256 store raw message which caused it.
260 super(MessageError, self).__init__(text, message, *args)
263 """Generate human readable error message.
266 :return: human readable message as string
268 Use a placeholder string if the message is to be empty.
270 message = binascii.hexlify(self.msg)
272 message = "(empty message)"
273 return self.text + ": " + message
276 def read_open_message(bgp_socket):
277 """Receive peer's OPEN message
280 :bgp_socket: the socket to be read
282 :return: received OPEN message.
284 Performs just basic incomming message checks
286 msg_in = bgp_socket.recv(65535) # TODO: Is smaller buffer size safe?
287 # TODO: Can the incoming open message be split in more than one packet?
290 # 37 is minimal length of open message with 4-byte AS number.
292 "Message length (" + str(len(msg_in)) + ") is smaller than "
293 "minimal length of OPEN message with 4-byte AS number (37)"
295 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
296 raise MessageError(error_msg, msg_in)
297 # TODO: We could check BGP marker, but it is defined only later;
299 reported_length = get_short_int_from_message(msg_in)
300 if len(msg_in) != reported_length:
302 "Expected message length (" + reported_length +
303 ") does not match actual length (" + str(len(msg_in)) + ")"
305 logger.error(error_msg + binascii.hexlify(msg_in))
306 raise MessageError(error_msg, msg_in)
307 logger.info("Open message received.")
311 class MessageGenerator(object):
312 """Class which generates messages, holds states and configuration values."""
314 # TODO: Define bgp marker as a class (constant) variable.
315 def __init__(self, args):
316 """Initialisation according to command-line args.
319 :args: argsparser's Namespace object which contains command-line
320 options for MesageGenerator initialisation
322 Calculates and stores default values used later on for
325 self.total_prefix_amount = args.amount
326 # Number of update messages left to be sent.
327 self.remaining_prefixes = self.total_prefix_amount
329 # New parameters initialisation
331 self.prefix_base_default = args.firstprefix
332 self.prefix_length_default = args.prefixlen
333 self.wr_prefixes_default = []
334 self.nlri_prefixes_default = []
335 self.version_default = 4
336 self.my_autonomous_system_default = args.asnumber
337 self.hold_time_default = args.holdtime # Local hold time.
338 self.bgp_identifier_default = int(args.myip)
339 self.next_hop_default = args.nexthop
340 self.originator_id_default = args.originator
341 self.cluster_list_item_default = args.cluster
342 self.single_update_default = args.updates == "single"
343 self.randomize_updates_default = args.updates == "random"
344 self.prefix_count_to_add_default = args.insert
345 self.prefix_count_to_del_default = args.withdraw
346 if self.prefix_count_to_del_default < 0:
347 self.prefix_count_to_del_default = 0
348 if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
349 # total number of prefixes must grow to avoid infinite test loop
350 self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
351 self.slot_size_default = self.prefix_count_to_add_default
352 self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
353 self.results_file_name_default = args.results
354 self.performance_threshold_default = args.threshold
355 self.rfc4760 = args.rfc4760
356 self.bgpls = args.bgpls
357 self.evpn = args.evpn
358 self.skipattr = args.skipattr
359 # Default values when BGP-LS Attributes are used
361 self.prefix_count_to_add_default = 1
362 self.prefix_count_to_del_default = 0
363 self.ls_nlri_default = {"Identifier": args.lsid,
364 "TunnelID": args.lstid,
366 "IPv4TunnelSenderAddress": args.lstsaddr,
367 "IPv4TunnelEndPointAddress": args.lsteaddr}
368 self.lsid_step = args.lsidstep
369 self.lstid_step = args.lstidstep
370 self.lspid_step = args.lspidstep
371 self.lstsaddr_step = args.lstsaddrstep
372 self.lsteaddr_step = args.lsteaddrstep
373 # Default values used for randomized part
374 s1_slots = ((self.total_prefix_amount -
375 self.remaining_prefixes_threshold - 1) /
376 self.prefix_count_to_add_default + 1)
377 s2_slots = ((self.remaining_prefixes_threshold - 1) /
378 (self.prefix_count_to_add_default -
379 self.prefix_count_to_del_default) + 1)
381 # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
382 s2_first_index = s1_slots * self.prefix_count_to_add_default
383 s2_last_index = (s2_first_index +
384 s2_slots * (self.prefix_count_to_add_default -
385 self.prefix_count_to_del_default) - 1)
386 self.slot_gap_default = ((self.total_prefix_amount -
387 self.remaining_prefixes_threshold - 1) /
388 self.prefix_count_to_add_default + 1)
389 self.randomize_lowest_default = s2_first_index
390 self.randomize_highest_default = s2_last_index
391 # Initialising counters
392 self.phase1_start_time = 0
393 self.phase1_stop_time = 0
394 self.phase2_start_time = 0
395 self.phase2_stop_time = 0
396 self.phase1_updates_sent = 0
397 self.phase2_updates_sent = 0
398 self.updates_sent = 0
400 self.log_info = args.loglevel <= logging.INFO
401 self.log_debug = args.loglevel <= logging.DEBUG
403 Flags needed for the MessageGenerator performance optimization.
404 Calling logger methods each iteration even with proper log level set
405 slows down significantly the MessageGenerator performance.
406 Measured total generation time (1M updates, dry run, error log level):
407 - logging based on basic logger features: 36,2s
408 - logging based on advanced logger features (lazy logging): 21,2s
409 - conditional calling of logger methods enclosed inside condition: 8,6s
412 logger.info("Generator initialisation")
413 logger.info(" Target total number of prefixes to be introduced: " +
414 str(self.total_prefix_amount))
415 logger.info(" Prefix base: " + str(self.prefix_base_default) + "/" +
416 str(self.prefix_length_default))
417 logger.info(" My Autonomous System number: " +
418 str(self.my_autonomous_system_default))
419 logger.info(" My Hold Time: " + str(self.hold_time_default))
420 logger.info(" My BGP Identifier: " + str(self.bgp_identifier_default))
421 logger.info(" Next Hop: " + str(self.next_hop_default))
422 logger.info(" Originator ID: " + str(self.originator_id_default))
423 logger.info(" Cluster list: " + str(self.cluster_list_item_default))
424 logger.info(" Prefix count to be inserted at once: " +
425 str(self.prefix_count_to_add_default))
426 logger.info(" Prefix count to be withdrawn at once: " +
427 str(self.prefix_count_to_del_default))
428 logger.info(" Fast pre-fill up to " +
429 str(self.total_prefix_amount -
430 self.remaining_prefixes_threshold) + " prefixes")
431 logger.info(" Remaining number of prefixes to be processed " +
432 "in parallel with withdrawals: " +
433 str(self.remaining_prefixes_threshold))
434 logger.debug(" Prefix index range used after pre-fill procedure [" +
435 str(self.randomize_lowest_default) + ", " +
436 str(self.randomize_highest_default) + "]")
437 if self.single_update_default:
438 logger.info(" Common single UPDATE will be generated " +
439 "for both NLRI & WITHDRAWN lists")
441 logger.info(" Two separate UPDATEs will be generated " +
442 "for each NLRI & WITHDRAWN lists")
443 if self.randomize_updates_default:
444 logger.info(" Generation of UPDATE messages will be randomized")
445 logger.info(" Let\'s go ...\n")
447 # TODO: Notification for hold timer expiration can be handy.
449 def store_results(self, file_name=None, threshold=None):
450 """ Stores specified results into files based on file_name value.
453 :param file_name: Trailing (common) part of result file names
454 :param threshold: Minimum number of sent updates needed for each
455 result to be included into result csv file
456 (mainly needed because of the result accuracy)
460 # default values handling
461 # TODO optimize default values handling (use e.g. dicionary.update() approach)
462 if file_name is None:
463 file_name = self.results_file_name_default
464 if threshold is None:
465 threshold = self.performance_threshold_default
466 # performance calculation
467 if self.phase1_updates_sent >= threshold:
468 totals1 = self.phase1_updates_sent
469 performance1 = int(self.phase1_updates_sent /
470 (self.phase1_stop_time - self.phase1_start_time))
474 if self.phase2_updates_sent >= threshold:
475 totals2 = self.phase2_updates_sent
476 performance2 = int(self.phase2_updates_sent /
477 (self.phase2_stop_time - self.phase2_start_time))
482 logger.info("#" * 10 + " Final results " + "#" * 10)
483 logger.info("Number of iterations: " + str(self.iteration))
484 logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
485 str(self.phase1_updates_sent))
486 logger.info("The pre-fill phase duration: " +
487 str(self.phase1_stop_time - self.phase1_start_time) + "s")
488 logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
489 str(self.phase2_updates_sent))
490 logger.info("The 2nd test phase duration: " +
491 str(self.phase2_stop_time - self.phase2_start_time) + "s")
492 logger.info("Threshold for performance reporting: " + str(threshold))
495 phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
496 " route(s) per UPDATE")
497 if self.single_update_default:
498 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
499 "/-" + str(self.prefix_count_to_del_default) +
500 " routes per UPDATE")
502 phase2_label = "+" + (str(self.prefix_count_to_add_default) +
503 "/-" + str(self.prefix_count_to_del_default) +
504 " routes in two UPDATEs")
505 # collecting capacity and performance results
508 if totals1 is not None:
509 totals[phase1_label] = totals1
510 performance[phase1_label] = performance1
511 if totals2 is not None:
512 totals[phase2_label] = totals2
513 performance[phase2_label] = performance2
514 self.write_results_to_file(totals, "totals-" + file_name)
515 self.write_results_to_file(performance, "performance-" + file_name)
517 def write_results_to_file(self, results, file_name):
518 """Writes results to the csv plot file consumable by Jenkins.
521 :param file_name: Name of the (csv) file to be created
527 f = open(file_name, "wt")
529 for key in sorted(results):
530 first_line += key + ", "
531 second_line += str(results[key]) + ", "
532 first_line = first_line[:-2]
533 second_line = second_line[:-2]
534 f.write(first_line + "\n")
535 f.write(second_line + "\n")
536 logger.info("Message generator performance results stored in " +
538 logger.info(" " + first_line)
539 logger.info(" " + second_line)
543 # Return pseudo-randomized (reproducible) index for selected range
544 def randomize_index(self, index, lowest=None, highest=None):
545 """Calculates pseudo-randomized index from selected range.
548 :param index: input index
549 :param lowest: the lowes index from the randomized area
550 :param highest: the highest index from the randomized area
552 :return: the (pseudo)randomized index
554 Created just as a fame for future generator enhancement.
556 # default values handling
557 # TODO optimize default values handling (use e.g. dicionary.update() approach)
559 lowest = self.randomize_lowest_default
561 highest = self.randomize_highest_default
563 if (index >= lowest) and (index <= highest):
564 # we are in the randomized range -> shuffle it inside
565 # the range (now just reverse the order)
566 new_index = highest - (index - lowest)
568 # we are out of the randomized range -> nothing to do
572 def get_ls_nlri_values(self, index):
573 """Generates LS-NLRI parameters.
574 http://tools.ietf.org/html/draft-ietf-idr-te-lsp-distribution-03
577 :param index: index (iteration)
579 :return: dictionary of LS NLRI parameters and values
581 # generating list of LS NLRI parameters
582 identifier = self.ls_nlri_default["Identifier"] + index / self.lsid_step
583 ipv4_tunnel_sender_address = self.ls_nlri_default["IPv4TunnelSenderAddress"] + index / self.lstsaddr_step
584 tunnel_id = self.ls_nlri_default["TunnelID"] + index / self.lstid_step
585 lsp_id = self.ls_nlri_default["LSPID"] + index / self.lspid_step
586 ipv4_tunnel_endpoint_address = self.ls_nlri_default["IPv4TunnelEndPointAddress"] + index / self.lsteaddr_step
587 ls_nlri_values = {"Identifier": identifier,
588 "IPv4TunnelSenderAddress": ipv4_tunnel_sender_address,
589 "TunnelID": tunnel_id, "LSPID": lsp_id,
590 "IPv4TunnelEndPointAddress": ipv4_tunnel_endpoint_address}
591 return ls_nlri_values
593 def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
594 prefix_len=None, prefix_count=None, randomize=None):
595 """Generates list of IP address prefixes.
598 :param slot_index: index of group of prefix addresses
599 :param slot_size: size of group of prefix addresses
600 in [number of included prefixes]
601 :param prefix_base: IP address of the first prefix
602 (slot_index = 0, prefix_index = 0)
603 :param prefix_len: length of the prefix in bites
604 (the same as size of netmask)
605 :param prefix_count: number of prefixes to be returned
606 from the specified slot
608 :return: list of generated IP address prefixes
610 # default values handling
611 # TODO optimize default values handling (use e.g. dicionary.update() approach)
612 if slot_size is None:
613 slot_size = self.slot_size_default
614 if prefix_base is None:
615 prefix_base = self.prefix_base_default
616 if prefix_len is None:
617 prefix_len = self.prefix_length_default
618 if prefix_count is None:
619 prefix_count = slot_size
620 if randomize is None:
621 randomize = self.randomize_updates_default
622 # generating list of prefixes
625 prefix_gap = 2 ** (32 - prefix_len)
626 for i in range(prefix_count):
627 prefix_index = slot_index * slot_size + i
629 prefix_index = self.randomize_index(prefix_index)
630 indexes.append(prefix_index)
631 prefixes.append(prefix_base + prefix_index * prefix_gap)
633 logger.debug(" Prefix slot index: " + str(slot_index))
634 logger.debug(" Prefix slot size: " + str(slot_size))
635 logger.debug(" Prefix count: " + str(prefix_count))
636 logger.debug(" Prefix indexes: " + str(indexes))
637 logger.debug(" Prefix list: " + str(prefixes))
640 def compose_update_message(self, prefix_count_to_add=None,
641 prefix_count_to_del=None):
642 """Composes an UPDATE message
645 :param prefix_count_to_add: # of prefixes to put into NLRI list
646 :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
648 :return: encoded UPDATE message in HEX
650 Optionally generates separate UPDATEs for NLRI and WITHDRAWN
651 lists or common message wich includes both prefix lists.
652 Updates global counters.
654 # default values handling
655 # TODO optimize default values handling (use e.g. dicionary.update() approach)
656 if prefix_count_to_add is None:
657 prefix_count_to_add = self.prefix_count_to_add_default
658 if prefix_count_to_del is None:
659 prefix_count_to_del = self.prefix_count_to_del_default
661 if self.log_info and not (self.iteration % 1000):
662 logger.info("Iteration: " + str(self.iteration) +
663 " - total remaining prefixes: " +
664 str(self.remaining_prefixes))
666 logger.debug("#" * 10 + " Iteration: " +
667 str(self.iteration) + " " + "#" * 10)
668 logger.debug("Remaining prefixes: " +
669 str(self.remaining_prefixes))
670 # scenario type & one-shot counter
671 straightforward_scenario = (self.remaining_prefixes >
672 self.remaining_prefixes_threshold)
673 if straightforward_scenario:
674 prefix_count_to_del = 0
676 logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
677 if not self.phase1_start_time:
678 self.phase1_start_time = time.time()
681 logger.debug("--- COMBINED SCENARIO ---")
682 if not self.phase2_start_time:
683 self.phase2_start_time = time.time()
684 # tailor the number of prefixes if needed
685 prefix_count_to_add = (prefix_count_to_del +
686 min(prefix_count_to_add - prefix_count_to_del,
687 self.remaining_prefixes))
688 # prefix slots selection for insertion and withdrawal
689 slot_index_to_add = self.iteration
690 slot_index_to_del = slot_index_to_add - self.slot_gap_default
691 # getting lists of prefixes for insertion in this iteration
693 logger.debug("Prefixes to be inserted in this iteration:")
694 prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
695 prefix_count=prefix_count_to_add)
696 # getting lists of prefixes for withdrawal in this iteration
698 logger.debug("Prefixes to be withdrawn in this iteration:")
699 prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
700 prefix_count=prefix_count_to_del)
701 # generating the UPDATE mesage with LS-NLRI only
703 ls_nlri = self.get_ls_nlri_values(self.iteration)
704 msg_out = self.update_message(wr_prefixes=[], nlri_prefixes=[],
707 # generating the UPDATE message with prefix lists
708 if self.single_update_default:
709 # Send prefixes to be introduced and withdrawn
710 # in one UPDATE message
711 msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
712 nlri_prefixes=prefix_list_to_add)
714 # Send prefixes to be introduced and withdrawn
715 # in separate UPDATE messages (if needed)
716 msg_out = self.update_message(wr_prefixes=[],
717 nlri_prefixes=prefix_list_to_add)
718 if prefix_count_to_del:
719 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
721 # updating counters - who knows ... maybe I am last time here ;)
722 if straightforward_scenario:
723 self.phase1_stop_time = time.time()
724 self.phase1_updates_sent = self.updates_sent
726 self.phase2_stop_time = time.time()
727 self.phase2_updates_sent = (self.updates_sent -
728 self.phase1_updates_sent)
729 # updating totals for the next iteration
731 self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
732 # returning the encoded message
735 # Section of message encoders
737 def open_message(self, version=None, my_autonomous_system=None,
738 hold_time=None, bgp_identifier=None):
739 """Generates an OPEN Message (rfc4271#section-4.2)
742 :param version: see the rfc4271#section-4.2
743 :param my_autonomous_system: see the rfc4271#section-4.2
744 :param hold_time: see the rfc4271#section-4.2
745 :param bgp_identifier: see the rfc4271#section-4.2
747 :return: encoded OPEN message in HEX
750 # default values handling
751 # TODO optimize default values handling (use e.g. dicionary.update() approach)
753 version = self.version_default
754 if my_autonomous_system is None:
755 my_autonomous_system = self.my_autonomous_system_default
756 if hold_time is None:
757 hold_time = self.hold_time_default
758 if bgp_identifier is None:
759 bgp_identifier = self.bgp_identifier_default
762 marker_hex = "\xFF" * 16
766 type_hex = struct.pack("B", type)
769 version_hex = struct.pack("B", version)
771 # my_autonomous_system
772 # AS_TRANS value, 23456 decadic.
773 my_autonomous_system_2_bytes = 23456
774 # AS number is mappable to 2 bytes
775 if my_autonomous_system < 65536:
776 my_autonomous_system_2_bytes = my_autonomous_system
777 my_autonomous_system_hex_2_bytes = struct.pack(">H",
778 my_autonomous_system)
781 hold_time_hex = struct.pack(">H", hold_time)
784 bgp_identifier_hex = struct.pack(">I", bgp_identifier)
786 # Optional Parameters
787 optional_parameters_hex = ""
789 optional_parameter_hex = (
790 "\x02" # Param type ("Capability Ad")
791 "\x06" # Length (6 bytes)
792 "\x01" # Capability type (NLRI Unicast),
793 # see RFC 4760, secton 8
794 "\x04" # Capability value length
795 "\x00\x01" # AFI (Ipv4)
797 "\x01" # SAFI (Unicast)
799 optional_parameters_hex += optional_parameter_hex
802 optional_parameter_hex = (
803 "\x02" # Param type ("Capability Ad")
804 "\x06" # Length (6 bytes)
805 "\x01" # Capability type (NLRI Unicast),
806 # see RFC 4760, secton 8
807 "\x04" # Capability value length
808 "\x40\x04" # AFI (BGP-LS)
810 "\x47" # SAFI (BGP-LS)
812 optional_parameters_hex += optional_parameter_hex
815 optional_parameter_hex = (
816 "\x02" # Param type ("Capability Ad")
817 "\x06" # Length (6 bytes)
818 "\x01" # Multiprotocol extetension capability,
819 "\x04" # Capability value length
820 "\x00\x19" # AFI (L2-VPN)
824 optional_parameters_hex += optional_parameter_hex
826 optional_parameter_hex = (
827 "\x02" # Param type ("Capability Ad")
828 "\x06" # Length (6 bytes)
829 "\x41" # "32 bit AS Numbers Support"
830 # (see RFC 6793, section 3)
831 "\x04" # Capability value length
833 optional_parameter_hex += (
834 struct.pack(">I", my_autonomous_system) # My AS in 32 bit format
836 optional_parameters_hex += optional_parameter_hex
838 # Optional Parameters Length
839 optional_parameters_length = len(optional_parameters_hex)
840 optional_parameters_length_hex = struct.pack("B",
841 optional_parameters_length)
843 # Length (big-endian)
845 len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
846 len(my_autonomous_system_hex_2_bytes) +
847 len(hold_time_hex) + len(bgp_identifier_hex) +
848 len(optional_parameters_length_hex) +
849 len(optional_parameters_hex)
851 length_hex = struct.pack(">H", length)
859 my_autonomous_system_hex_2_bytes +
862 optional_parameters_length_hex +
863 optional_parameters_hex
867 logger.debug("OPEN message encoding")
868 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
869 logger.debug(" Length=" + str(length) + " (0x" +
870 binascii.hexlify(length_hex) + ")")
871 logger.debug(" Type=" + str(type) + " (0x" +
872 binascii.hexlify(type_hex) + ")")
873 logger.debug(" Version=" + str(version) + " (0x" +
874 binascii.hexlify(version_hex) + ")")
875 logger.debug(" My Autonomous System=" +
876 str(my_autonomous_system_2_bytes) + " (0x" +
877 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
879 logger.debug(" Hold Time=" + str(hold_time) + " (0x" +
880 binascii.hexlify(hold_time_hex) + ")")
881 logger.debug(" BGP Identifier=" + str(bgp_identifier) +
882 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
883 logger.debug(" Optional Parameters Length=" +
884 str(optional_parameters_length) + " (0x" +
885 binascii.hexlify(optional_parameters_length_hex) +
887 logger.debug(" Optional Parameters=0x" +
888 binascii.hexlify(optional_parameters_hex))
889 logger.debug("OPEN message encoded: 0x%s",
890 binascii.b2a_hex(message_hex))
894 def update_message(self, wr_prefixes=None, nlri_prefixes=None,
895 wr_prefix_length=None, nlri_prefix_length=None,
896 my_autonomous_system=None, next_hop=None,
897 originator_id=None, cluster_list_item=None,
898 end_of_rib=False, **ls_nlri_params):
899 """Generates an UPDATE Message (rfc4271#section-4.3)
902 :param wr_prefixes: see the rfc4271#section-4.3
903 :param nlri_prefixes: see the rfc4271#section-4.3
904 :param wr_prefix_length: see the rfc4271#section-4.3
905 :param nlri_prefix_length: see the rfc4271#section-4.3
906 :param my_autonomous_system: see the rfc4271#section-4.3
907 :param next_hop: see the rfc4271#section-4.3
909 :return: encoded UPDATE message in HEX
912 # default values handling
913 # TODO optimize default values handling (use e.g. dicionary.update() approach)
914 if wr_prefixes is None:
915 wr_prefixes = self.wr_prefixes_default
916 if nlri_prefixes is None:
917 nlri_prefixes = self.nlri_prefixes_default
918 if wr_prefix_length is None:
919 wr_prefix_length = self.prefix_length_default
920 if nlri_prefix_length is None:
921 nlri_prefix_length = self.prefix_length_default
922 if my_autonomous_system is None:
923 my_autonomous_system = self.my_autonomous_system_default
925 next_hop = self.next_hop_default
926 if originator_id is None:
927 originator_id = self.originator_id_default
928 if cluster_list_item is None:
929 cluster_list_item = self.cluster_list_item_default
930 ls_nlri = self.ls_nlri_default.copy()
931 ls_nlri.update(ls_nlri_params)
934 marker_hex = "\xFF" * 16
938 type_hex = struct.pack("B", type)
941 withdrawn_routes_hex = ""
943 bytes = ((wr_prefix_length - 1) / 8) + 1
944 for prefix in wr_prefixes:
945 withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
946 struct.pack(">I", int(prefix))[:bytes])
947 withdrawn_routes_hex += withdrawn_route_hex
949 # Withdrawn Routes Length
950 withdrawn_routes_length = len(withdrawn_routes_hex)
951 withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
953 # TODO: to replace hardcoded string by encoding?
955 path_attributes_hex = ""
956 if not self.skipattr:
957 path_attributes_hex += (
958 "\x40" # Flags ("Well-Known")
959 "\x01" # Type (ORIGIN)
963 path_attributes_hex += (
964 "\x40" # Flags ("Well-Known")
965 "\x02" # Type (AS_PATH)
967 "\x02" # AS segment type (AS_SEQUENCE)
968 "\x01" # AS segment length (1)
970 my_as_hex = struct.pack(">I", my_autonomous_system)
971 path_attributes_hex += my_as_hex # AS segment (4 bytes)
972 path_attributes_hex += (
973 "\x40" # Flags ("Well-Known")
974 "\x05" # Type (LOCAL_PREF)
976 "\x00\x00\x00\x64" # (100)
978 if nlri_prefixes != []:
979 path_attributes_hex += (
980 "\x40" # Flags ("Well-Known")
981 "\x03" # Type (NEXT_HOP)
984 next_hop_hex = struct.pack(">I", int(next_hop))
985 path_attributes_hex += (
986 next_hop_hex # IP address of the next hop (4 bytes)
988 if originator_id is not None:
989 path_attributes_hex += (
990 "\x80" # Flags ("Optional, non-transitive")
991 "\x09" # Type (ORIGINATOR_ID)
993 ) # ORIGINATOR_ID (4 bytes)
994 path_attributes_hex += struct.pack(">I", int(originator_id))
995 if cluster_list_item is not None:
996 path_attributes_hex += (
997 "\x80" # Flags ("Optional, non-transitive")
998 "\x09" # Type (CLUSTER_LIST)
1000 ) # one CLUSTER_LIST item (4 bytes)
1001 path_attributes_hex += struct.pack(">I", int(cluster_list_item))
1003 if self.bgpls and not end_of_rib:
1004 path_attributes_hex += (
1005 "\x80" # Flags ("Optional, non-transitive")
1006 "\x0e" # Type (MP_REACH_NLRI)
1007 "\x22" # Length (34)
1008 "\x40\x04" # AFI (BGP-LS)
1009 "\x47" # SAFI (BGP-LS)
1010 "\x04" # Next Hop Length (4)
1012 path_attributes_hex += struct.pack(">I", int(next_hop))
1013 path_attributes_hex += "\x00" # Reserved
1014 path_attributes_hex += (
1015 "\x00\x05" # LS-NLRI.NLRIType (IPv4 TE LSP NLRI)
1016 "\x00\x15" # LS-NLRI.TotalNLRILength (21)
1017 "\x07" # LS-NLRI.Variable.ProtocolID (RSVP-TE)
1019 path_attributes_hex += struct.pack(">Q", int(ls_nlri["Identifier"]))
1020 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelSenderAddress"]))
1021 path_attributes_hex += struct.pack(">H", int(ls_nlri["TunnelID"]))
1022 path_attributes_hex += struct.pack(">H", int(ls_nlri["LSPID"]))
1023 path_attributes_hex += struct.pack(">I", int(ls_nlri["IPv4TunnelEndPointAddress"]))
1025 # Total Path Attributes Length
1026 total_path_attributes_length = len(path_attributes_hex)
1027 total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
1029 # Network Layer Reachability Information
1032 bytes = ((nlri_prefix_length - 1) / 8) + 1
1033 for prefix in nlri_prefixes:
1034 nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
1035 struct.pack(">I", int(prefix))[:bytes])
1036 nlri_hex += nlri_prefix_hex
1038 # Length (big-endian)
1040 len(marker_hex) + 2 + len(type_hex) +
1041 len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
1042 len(total_path_attributes_length_hex) + len(path_attributes_hex) +
1044 length_hex = struct.pack(">H", length)
1051 withdrawn_routes_length_hex +
1052 withdrawn_routes_hex +
1053 total_path_attributes_length_hex +
1054 path_attributes_hex +
1059 logger.debug("UPDATE message encoding")
1060 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1061 logger.debug(" Length=" + str(length) + " (0x" +
1062 binascii.hexlify(length_hex) + ")")
1063 logger.debug(" Type=" + str(type) + " (0x" +
1064 binascii.hexlify(type_hex) + ")")
1065 logger.debug(" withdrawn_routes_length=" +
1066 str(withdrawn_routes_length) + " (0x" +
1067 binascii.hexlify(withdrawn_routes_length_hex) + ")")
1068 logger.debug(" Withdrawn_Routes=" + str(wr_prefixes) + "/" +
1069 str(wr_prefix_length) + " (0x" +
1070 binascii.hexlify(withdrawn_routes_hex) + ")")
1071 if total_path_attributes_length:
1072 logger.debug(" Total Path Attributes Length=" +
1073 str(total_path_attributes_length) + " (0x" +
1074 binascii.hexlify(total_path_attributes_length_hex) + ")")
1075 logger.debug(" Path Attributes=" + "(0x" +
1076 binascii.hexlify(path_attributes_hex) + ")")
1077 logger.debug(" Origin=IGP")
1078 logger.debug(" AS path=" + str(my_autonomous_system))
1079 logger.debug(" Next hop=" + str(next_hop))
1080 if originator_id is not None:
1081 logger.debug(" Originator id=" + str(originator_id))
1082 if cluster_list_item is not None:
1083 logger.debug(" Cluster list=" + str(cluster_list_item))
1085 logger.debug(" MP_REACH_NLRI: %s", ls_nlri)
1086 logger.debug(" Network Layer Reachability Information=" +
1087 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
1088 " (0x" + binascii.hexlify(nlri_hex) + ")")
1089 logger.debug("UPDATE message encoded: 0x" +
1090 binascii.b2a_hex(message_hex))
1093 self.updates_sent += 1
1094 # returning encoded message
1097 def notification_message(self, error_code, error_subcode, data_hex=""):
1098 """Generates a NOTIFICATION Message (rfc4271#section-4.5)
1101 :param error_code: see the rfc4271#section-4.5
1102 :param error_subcode: see the rfc4271#section-4.5
1103 :param data_hex: see the rfc4271#section-4.5
1105 :return: encoded NOTIFICATION message in HEX
1109 marker_hex = "\xFF" * 16
1113 type_hex = struct.pack("B", type)
1116 error_code_hex = struct.pack("B", error_code)
1119 error_subcode_hex = struct.pack("B", error_subcode)
1121 # Length (big-endian)
1122 length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
1123 len(error_subcode_hex) + len(data_hex))
1124 length_hex = struct.pack(">H", length)
1126 # NOTIFICATION Message
1137 logger.debug("NOTIFICATION message encoding")
1138 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1139 logger.debug(" Length=" + str(length) + " (0x" +
1140 binascii.hexlify(length_hex) + ")")
1141 logger.debug(" Type=" + str(type) + " (0x" +
1142 binascii.hexlify(type_hex) + ")")
1143 logger.debug(" Error Code=" + str(error_code) + " (0x" +
1144 binascii.hexlify(error_code_hex) + ")")
1145 logger.debug(" Error Subode=" + str(error_subcode) + " (0x" +
1146 binascii.hexlify(error_subcode_hex) + ")")
1147 logger.debug(" Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
1148 logger.debug("NOTIFICATION message encoded: 0x%s",
1149 binascii.b2a_hex(message_hex))
1153 def keepalive_message(self):
1154 """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
1157 :return: encoded KEEP ALIVE message in HEX
1161 marker_hex = "\xFF" * 16
1165 type_hex = struct.pack("B", type)
1167 # Length (big-endian)
1168 length = len(marker_hex) + 2 + len(type_hex)
1169 length_hex = struct.pack(">H", length)
1171 # KEEP ALIVE Message
1179 logger.debug("KEEP ALIVE message encoding")
1180 logger.debug(" Marker=0x" + binascii.hexlify(marker_hex))
1181 logger.debug(" Length=" + str(length) + " (0x" +
1182 binascii.hexlify(length_hex) + ")")
1183 logger.debug(" Type=" + str(type) + " (0x" +
1184 binascii.hexlify(type_hex) + ")")
1185 logger.debug("KEEP ALIVE message encoded: 0x%s",
1186 binascii.b2a_hex(message_hex))
1191 class TimeTracker(object):
1192 """Class for tracking timers, both for my keepalives and
1196 def __init__(self, msg_in):
1197 """Initialisation. based on defaults and OPEN message from peer.
1200 msg_in: the OPEN message received from peer.
1202 # Note: Relative time is always named timedelta, to stress that
1203 # the (non-delta) time is absolute.
1204 self.report_timedelta = 1.0 # In seconds. TODO: Configurable?
1205 # Upper bound for being stuck in the same state, we should
1206 # at least report something before continuing.
1207 # Negotiate the hold timer by taking the smaller
1208 # of the 2 values (mine and the peer's).
1209 hold_timedelta = 180 # Not an attribute of self yet.
1210 # TODO: Make the default value configurable,
1211 # default value could mirror what peer said.
1212 peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
1213 if hold_timedelta > peer_hold_timedelta:
1214 hold_timedelta = peer_hold_timedelta
1215 if hold_timedelta != 0 and hold_timedelta < 3:
1216 logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
1217 raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
1218 self.hold_timedelta = hold_timedelta
1219 # If we do not hear from peer this long, we assume it has died.
1220 self.keepalive_timedelta = int(hold_timedelta / 3.0)
1221 # Upper limit for duration between messages, to avoid being
1222 # declared to be dead.
1223 # The same as calling snapshot(), but also declares a field.
1224 self.snapshot_time = time.time()
1225 # Sometimes we need to store time. This is where to get
1226 # the value from afterwards. Time_keepalive may be too strict.
1227 self.peer_hold_time = self.snapshot_time + self.hold_timedelta
1228 # At this time point, peer will be declared dead.
1229 self.my_keepalive_time = None # to be set later
1230 # At this point, we should be sending keepalive message.
1233 """Store current time in instance data to use later."""
1234 # Read as time before something interesting was called.
1235 self.snapshot_time = time.time()
1237 def reset_peer_hold_time(self):
1238 """Move hold time to future as peer has just proven it still lives."""
1239 self.peer_hold_time = time.time() + self.hold_timedelta
1241 # Some methods could rely on self.snapshot_time, but it is better
1242 # to require user to provide it explicitly.
1243 def reset_my_keepalive_time(self, keepalive_time):
1244 """Calculate and set the next my KEEP ALIVE timeout time
1247 :keepalive_time: the initial value of the KEEP ALIVE timer
1249 self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1251 def is_time_for_my_keepalive(self):
1252 """Check for my KEEP ALIVE timeout occurence"""
1253 if self.hold_timedelta == 0:
1255 return self.snapshot_time >= self.my_keepalive_time
1257 def get_next_event_time(self):
1258 """Set the time of the next expected or to be sent KEEP ALIVE"""
1259 if self.hold_timedelta == 0:
1260 return self.snapshot_time + 86400
1261 return min(self.my_keepalive_time, self.peer_hold_time)
1263 def check_peer_hold_time(self, snapshot_time):
1264 """Raise error if nothing was read from peer until specified time."""
1265 # Hold time = 0 means keepalive checking off.
1266 if self.hold_timedelta != 0:
1267 # time.time() may be too strict
1268 if snapshot_time > self.peer_hold_time:
1269 logger.error("Peer has overstepped the hold timer.")
1270 raise RuntimeError("Peer has overstepped the hold timer.")
1271 # TODO: Include hold_timedelta?
1272 # TODO: Add notification sending (attempt). That means
1273 # move to write tracker.
1276 class ReadTracker(object):
1277 """Class for tracking read of mesages chunk by chunk and
1281 def __init__(self, bgp_socket, timer, storage, evpn=False, wait_for_read=10):
1282 """The reader initialisation.
1285 bgp_socket: socket to be used for sending
1286 timer: timer to be used for scheduling
1287 storage: thread safe dict
1288 evpn: flag that evpn functionality is tested
1290 # References to outside objects.
1291 self.socket = bgp_socket
1293 # BGP marker length plus length field length.
1294 self.header_length = 18
1295 # TODO: make it class (constant) attribute
1296 # Computation of where next chunk ends depends on whether
1297 # we are beyond length field.
1298 self.reading_header = True
1299 # Countdown towards next size computation.
1300 self.bytes_to_read = self.header_length
1301 # Incremental buffer for message under read.
1303 # Initialising counters
1304 self.updates_received = 0
1305 self.prefixes_introduced = 0
1306 self.prefixes_withdrawn = 0
1307 self.rx_idle_time = 0
1308 self.rx_activity_detected = True
1309 self.storage = storage
1311 self.wfr = wait_for_read
1313 def read_message_chunk(self):
1314 """Read up to one message
1317 Currently it does not return anything.
1319 # TODO: We could return the whole message, currently not needed.
1320 # We assume the socket is readable.
1321 chunk_message = self.socket.recv(self.bytes_to_read)
1322 self.msg_in += chunk_message
1323 self.bytes_to_read -= len(chunk_message)
1324 # TODO: bytes_to_read < 0 is not possible, right?
1325 if not self.bytes_to_read:
1326 # Finished reading a logical block.
1327 if self.reading_header:
1328 # The logical block was a BGP header.
1329 # Now we know the size of the message.
1330 self.reading_header = False
1331 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1333 else: # We have finished reading the body of the message.
1334 # Peer has just proven it is still alive.
1335 self.timer.reset_peer_hold_time()
1336 # TODO: Do we want to count received messages?
1337 # This version ignores the received message.
1338 # TODO: Should we do validation and exit on anything
1339 # besides update or keepalive?
1340 # Prepare state for reading another message.
1341 message_type_hex = self.msg_in[self.header_length]
1342 if message_type_hex == "\x01":
1343 logger.info("OPEN message received: 0x%s",
1344 binascii.b2a_hex(self.msg_in))
1345 elif message_type_hex == "\x02":
1346 logger.debug("UPDATE message received: 0x%s",
1347 binascii.b2a_hex(self.msg_in))
1348 self.decode_update_message(self.msg_in)
1349 elif message_type_hex == "\x03":
1350 logger.info("NOTIFICATION message received: 0x%s",
1351 binascii.b2a_hex(self.msg_in))
1352 elif message_type_hex == "\x04":
1353 logger.info("KEEP ALIVE message received: 0x%s",
1354 binascii.b2a_hex(self.msg_in))
1356 logger.warning("Unexpected message received: 0x%s",
1357 binascii.b2a_hex(self.msg_in))
1359 self.reading_header = True
1360 self.bytes_to_read = self.header_length
1361 # We should not act upon peer_hold_time if we are reading
1362 # something right now.
1365 def decode_path_attributes(self, path_attributes_hex):
1366 """Decode the Path Attributes field (rfc4271#section-4.3)
1369 :path_attributes: path_attributes field to be decoded in hex
1373 hex_to_decode = path_attributes_hex
1375 while len(hex_to_decode):
1376 attr_flags_hex = hex_to_decode[0]
1377 attr_flags = int(binascii.b2a_hex(attr_flags_hex), 16)
1378 # attr_optional_bit = attr_flags & 128
1379 # attr_transitive_bit = attr_flags & 64
1380 # attr_partial_bit = attr_flags & 32
1381 attr_extended_length_bit = attr_flags & 16
1383 attr_type_code_hex = hex_to_decode[1]
1384 attr_type_code = int(binascii.b2a_hex(attr_type_code_hex), 16)
1386 if attr_extended_length_bit:
1387 attr_length_hex = hex_to_decode[2:4]
1388 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1389 attr_value_hex = hex_to_decode[4:4 + attr_length]
1390 hex_to_decode = hex_to_decode[4 + attr_length:]
1392 attr_length_hex = hex_to_decode[2]
1393 attr_length = int(binascii.b2a_hex(attr_length_hex), 16)
1394 attr_value_hex = hex_to_decode[3:3 + attr_length]
1395 hex_to_decode = hex_to_decode[3 + attr_length:]
1397 if attr_type_code == 1:
1398 logger.debug("Attribute type=1 (ORIGIN, flags:0x%s)",
1399 binascii.b2a_hex(attr_flags_hex))
1400 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1401 elif attr_type_code == 2:
1402 logger.debug("Attribute type=2 (AS_PATH, flags:0x%s)",
1403 binascii.b2a_hex(attr_flags_hex))
1404 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1405 elif attr_type_code == 3:
1406 logger.debug("Attribute type=3 (NEXT_HOP, flags:0x%s)",
1407 binascii.b2a_hex(attr_flags_hex))
1408 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1409 elif attr_type_code == 4:
1410 logger.debug("Attribute type=4 (MULTI_EXIT_DISC, flags:0x%s)",
1411 binascii.b2a_hex(attr_flags_hex))
1412 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1413 elif attr_type_code == 5:
1414 logger.debug("Attribute type=5 (LOCAL_PREF, flags:0x%s)",
1415 binascii.b2a_hex(attr_flags_hex))
1416 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1417 elif attr_type_code == 6:
1418 logger.debug("Attribute type=6 (ATOMIC_AGGREGATE, flags:0x%s)",
1419 binascii.b2a_hex(attr_flags_hex))
1420 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1421 elif attr_type_code == 7:
1422 logger.debug("Attribute type=7 (AGGREGATOR, flags:0x%s)",
1423 binascii.b2a_hex(attr_flags_hex))
1424 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1425 elif attr_type_code == 9: # rfc4456#section-8
1426 logger.debug("Attribute type=9 (ORIGINATOR_ID, flags:0x%s)",
1427 binascii.b2a_hex(attr_flags_hex))
1428 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1429 elif attr_type_code == 10: # rfc4456#section-8
1430 logger.debug("Attribute type=10 (CLUSTER_LIST, flags:0x%s)",
1431 binascii.b2a_hex(attr_flags_hex))
1432 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1433 elif attr_type_code == 14: # rfc4760#section-3
1434 logger.debug("Attribute type=14 (MP_REACH_NLRI, flags:0x%s)",
1435 binascii.b2a_hex(attr_flags_hex))
1436 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1437 address_family_identifier_hex = attr_value_hex[0:2]
1438 logger.debug(" Address Family Identifier=0x%s",
1439 binascii.b2a_hex(address_family_identifier_hex))
1440 subsequent_address_family_identifier_hex = attr_value_hex[2]
1441 logger.debug(" Subsequent Address Family Identifier=0x%s",
1442 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1443 next_hop_netaddr_len_hex = attr_value_hex[3]
1444 next_hop_netaddr_len = int(binascii.b2a_hex(next_hop_netaddr_len_hex), 16)
1445 logger.debug(" Length of Next Hop Network Address=%s (0x%s)",
1446 next_hop_netaddr_len,
1447 binascii.b2a_hex(next_hop_netaddr_len_hex))
1448 next_hop_netaddr_hex = attr_value_hex[4:4 + next_hop_netaddr_len]
1449 next_hop_netaddr = ".".join(str(i) for i in struct.unpack("BBBB", next_hop_netaddr_hex))
1450 logger.debug(" Network Address of Next Hop=%s (0x%s)",
1451 next_hop_netaddr, binascii.b2a_hex(next_hop_netaddr_hex))
1452 reserved_hex = attr_value_hex[4 + next_hop_netaddr_len]
1453 logger.debug(" Reserved=0x%s",
1454 binascii.b2a_hex(reserved_hex))
1455 nlri_hex = attr_value_hex[4 + next_hop_netaddr_len + 1:]
1456 logger.debug(" Network Layer Reachability Information=0x%s",
1457 binascii.b2a_hex(nlri_hex))
1458 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1459 logger.debug(" NLRI prefix list: %s", nlri_prefix_list)
1460 for prefix in nlri_prefix_list:
1461 logger.debug(" nlri_prefix_received: %s", prefix)
1462 self.prefixes_introduced += len(nlri_prefix_list) # update counter
1463 elif attr_type_code == 15: # rfc4760#section-4
1464 logger.debug("Attribute type=15 (MP_UNREACH_NLRI, flags:0x%s)",
1465 binascii.b2a_hex(attr_flags_hex))
1466 logger.debug("Attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1467 address_family_identifier_hex = attr_value_hex[0:2]
1468 logger.debug(" Address Family Identifier=0x%s",
1469 binascii.b2a_hex(address_family_identifier_hex))
1470 subsequent_address_family_identifier_hex = attr_value_hex[2]
1471 logger.debug(" Subsequent Address Family Identifier=0x%s",
1472 binascii.b2a_hex(subsequent_address_family_identifier_hex))
1473 wd_hex = attr_value_hex[3:]
1474 logger.debug(" Withdrawn Routes=0x%s",
1475 binascii.b2a_hex(wd_hex))
1476 wdr_prefix_list = get_prefix_list_from_hex(wd_hex)
1477 logger.debug(" Withdrawn routes prefix list: %s",
1479 for prefix in wdr_prefix_list:
1480 logger.debug(" withdrawn_prefix_received: %s", prefix)
1481 self.prefixes_withdrawn += len(wdr_prefix_list) # update counter
1483 logger.debug("Unknown attribute type=%s, flags:0x%s)", attr_type_code,
1484 binascii.b2a_hex(attr_flags_hex))
1485 logger.debug("Unknown attribute value=0x%s", binascii.b2a_hex(attr_value_hex))
1488 def decode_update_message(self, msg):
1489 """Decode an UPDATE message (rfc4271#section-4.3)
1492 :msg: message to be decoded in hex
1496 logger.debug("Decoding update message:")
1497 # message header - marker
1498 marker_hex = msg[:16]
1499 logger.debug("Message header marker: 0x%s",
1500 binascii.b2a_hex(marker_hex))
1501 # message header - message length
1502 msg_length_hex = msg[16:18]
1503 msg_length = int(binascii.b2a_hex(msg_length_hex), 16)
1504 logger.debug("Message lenght: 0x%s (%s)",
1505 binascii.b2a_hex(msg_length_hex), msg_length)
1506 # message header - message type
1507 msg_type_hex = msg[18:19]
1508 msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
1510 with self.storage as stor:
1511 # this will replace the previously stored message
1512 stor['update'] = binascii.hexlify(msg)
1514 logger.debug("Evpn {}".format(self.evpn))
1516 logger.debug("Skipping update decoding due to evpn data expected")
1520 logger.debug("Message type: 0x%s (update)",
1521 binascii.b2a_hex(msg_type_hex))
1522 # withdrawn routes length
1523 wdr_length_hex = msg[19:21]
1524 wdr_length = int(binascii.b2a_hex(wdr_length_hex), 16)
1525 logger.debug("Withdrawn routes lenght: 0x%s (%s)",
1526 binascii.b2a_hex(wdr_length_hex), wdr_length)
1528 wdr_hex = msg[21:21 + wdr_length]
1529 logger.debug("Withdrawn routes: 0x%s",
1530 binascii.b2a_hex(wdr_hex))
1531 wdr_prefix_list = get_prefix_list_from_hex(wdr_hex)
1532 logger.debug("Withdrawn routes prefix list: %s",
1534 for prefix in wdr_prefix_list:
1535 logger.debug("withdrawn_prefix_received: %s", prefix)
1536 # total path attribute length
1537 total_pa_length_offset = 21 + wdr_length
1538 total_pa_length_hex = msg[total_pa_length_offset:total_pa_length_offset + 2]
1539 total_pa_length = int(binascii.b2a_hex(total_pa_length_hex), 16)
1540 logger.debug("Total path attribute lenght: 0x%s (%s)",
1541 binascii.b2a_hex(total_pa_length_hex), total_pa_length)
1543 pa_offset = total_pa_length_offset + 2
1544 pa_hex = msg[pa_offset:pa_offset + total_pa_length]
1545 logger.debug("Path attributes: 0x%s", binascii.b2a_hex(pa_hex))
1546 self.decode_path_attributes(pa_hex)
1547 # network layer reachability information length
1548 nlri_length = msg_length - 23 - total_pa_length - wdr_length
1549 logger.debug("Calculated NLRI length: %s", nlri_length)
1550 # network layer reachability information
1551 nlri_offset = pa_offset + total_pa_length
1552 nlri_hex = msg[nlri_offset:nlri_offset + nlri_length]
1553 logger.debug("NLRI: 0x%s", binascii.b2a_hex(nlri_hex))
1554 nlri_prefix_list = get_prefix_list_from_hex(nlri_hex)
1555 logger.debug("NLRI prefix list: %s", nlri_prefix_list)
1556 for prefix in nlri_prefix_list:
1557 logger.debug("nlri_prefix_received: %s", prefix)
1559 self.updates_received += 1
1560 self.prefixes_introduced += len(nlri_prefix_list)
1561 self.prefixes_withdrawn += len(wdr_prefix_list)
1563 logger.error("Unexpeced message type 0x%s in 0x%s",
1564 binascii.b2a_hex(msg_type_hex), binascii.b2a_hex(msg))
1566 def wait_for_read(self):
1567 """Read message until timeout (next expected event).
1570 Used when no more updates has to be sent to avoid busy-wait.
1571 Currently it does not return anything.
1573 # Compute time to the first predictable state change
1574 event_time = self.timer.get_next_event_time()
1575 # snapshot_time would be imprecise
1576 wait_timedelta = min(event_time - time.time(), self.wfr)
1577 if wait_timedelta < 0:
1578 # The program got around to waiting to an event in "very near
1579 # future" so late that it became a "past" event, thus tell
1580 # "select" to not wait at all. Passing negative timedelta to
1581 # select() would lead to either waiting forever (for -1) or
1582 # select.error("Invalid parameter") (for everything else).
1584 # And wait for event or something to read.
1586 if not self.rx_activity_detected or not (self.updates_received % 100):
1587 # right time to write statistics to the log (not for every update and
1588 # not too frequently to avoid having large log files)
1589 logger.info("total_received_update_message_counter: %s",
1590 self.updates_received)
1591 logger.info("total_received_nlri_prefix_counter: %s",
1592 self.prefixes_introduced)
1593 logger.info("total_received_withdrawn_prefix_counter: %s",
1594 self.prefixes_withdrawn)
1596 start_time = time.time()
1597 select.select([self.socket], [], [self.socket], wait_timedelta)
1598 timedelta = time.time() - start_time
1599 self.rx_idle_time += timedelta
1600 self.rx_activity_detected = timedelta < 1
1602 if not self.rx_activity_detected or not (self.updates_received % 100):
1603 # right time to write statistics to the log (not for every update and
1604 # not too frequently to avoid having large log files)
1605 logger.info("... idle for %.3fs", timedelta)
1606 logger.info("total_rx_idle_time_counter: %.3fs", self.rx_idle_time)
1610 class WriteTracker(object):
1611 """Class tracking enqueueing messages and sending chunks of them."""
1613 def __init__(self, bgp_socket, generator, timer):
1614 """The writter initialisation.
1617 bgp_socket: socket to be used for sending
1618 generator: generator to be used for message generation
1619 timer: timer to be used for scheduling
1621 # References to outside objects,
1622 self.socket = bgp_socket
1623 self.generator = generator
1625 # Really new fields.
1626 # TODO: Would attribute docstrings add anything substantial?
1627 self.sending_message = False
1628 self.bytes_to_send = 0
1631 def enqueue_message_for_sending(self, message):
1632 """Enqueue message and change state.
1635 message: message to be enqueued into the msg_out buffer
1637 self.msg_out += message
1638 self.bytes_to_send += len(message)
1639 self.sending_message = True
1641 def send_message_chunk_is_whole(self):
1642 """Send enqueued data from msg_out buffer
1645 :return: true if no remaining data to send
1647 # We assume there is a msg_out to send and socket is writable.
1648 # print "going to send", repr(self.msg_out)
1649 self.timer.snapshot()
1650 bytes_sent = self.socket.send(self.msg_out)
1651 # Forget the part of message that was sent.
1652 self.msg_out = self.msg_out[bytes_sent:]
1653 self.bytes_to_send -= bytes_sent
1654 if not self.bytes_to_send:
1655 # TODO: Is it possible to hit negative bytes_to_send?
1656 self.sending_message = False
1657 # We should have reset hold timer on peer side.
1658 self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1659 # The possible reason for not prioritizing reads is gone.
1664 class StateTracker(object):
1665 """Main loop has state so complex it warrants this separate class."""
1667 def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
1668 """The state tracker initialisation.
1671 bgp_socket: socket to be used for sending / receiving
1672 generator: generator to be used for message generation
1673 timer: timer to be used for scheduling
1674 inqueue: user initiated messages queue
1675 storage: thread safe dict to store data for the rpc server
1676 cliargs: cli args from the user
1678 # References to outside objects.
1679 self.socket = bgp_socket
1680 self.generator = generator
1683 self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, wait_for_read=cliargs.wfr)
1684 self.writer = WriteTracker(bgp_socket, generator, timer)
1685 # Prioritization state.
1686 self.prioritize_writing = False
1687 # In general, we prioritize reading over writing. But in order
1688 # not to get blocked by neverending reads, we should
1689 # check whether we are not risking running out of holdtime.
1690 # So in some situations, this field is set to True to attempt
1691 # finishing sending a message, after which this field resets
1693 # TODO: Alternative is to switch fairly between reading and
1694 # writing (called round robin from now on).
1695 # Message counting is done in generator.
1696 self.inqueue = inqueue
1698 def perform_one_loop_iteration(self):
1699 """ The main loop iteration
1702 Calculates priority, resolves all conditions, calls
1703 appropriate method and returns to caller to repeat.
1705 self.timer.snapshot()
1706 if not self.prioritize_writing:
1707 if self.timer.is_time_for_my_keepalive():
1708 if not self.writer.sending_message:
1709 # We need to schedule a keepalive ASAP.
1710 self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1711 logger.info("KEEP ALIVE is sent.")
1712 # We are sending a message now, so let's prioritize it.
1713 self.prioritize_writing = True
1716 msg = self.inqueue.get_nowait()
1717 logger.info("Received message: {}".format(msg))
1718 msgbin = binascii.unhexlify(msg)
1719 self.writer.enqueue_message_for_sending(msgbin)
1722 # Now we know what our priorities are, we have to check
1723 # which actions are available.
1724 # socket.socket() returns three lists,
1725 # we store them to list of lists.
1726 list_list = select.select([self.socket], [self.socket], [self.socket],
1727 self.timer.report_timedelta)
1728 read_list, write_list, except_list = list_list
1729 # Lists are unpacked, each is either [] or [self.socket],
1730 # so we will test them as boolean.
1732 logger.error("Exceptional state on the socket.")
1733 raise RuntimeError("Exceptional state on socket", self.socket)
1734 # We will do either read or write.
1735 if not (self.prioritize_writing and write_list):
1736 # Either we have no reason to rush writes,
1737 # or the socket is not writable.
1738 # We are focusing on reading here.
1739 if read_list: # there is something to read indeed
1740 # In this case we want to read chunk of message
1741 # and repeat the select,
1742 self.reader.read_message_chunk()
1744 # We were focusing on reading, but nothing to read was there.
1745 # Good time to check peer for hold timer.
1746 self.timer.check_peer_hold_time(self.timer.snapshot_time)
1747 # Quiet on the read front, we can have attempt to write.
1749 # Either we really want to reset peer's view of our hold
1750 # timer, or there was nothing to read.
1751 # Were we in the middle of sending a message?
1752 if self.writer.sending_message:
1753 # Was it the end of a message?
1754 whole = self.writer.send_message_chunk_is_whole()
1755 # We were pressed to send something and we did it.
1756 if self.prioritize_writing and whole:
1757 # We prioritize reading again.
1758 self.prioritize_writing = False
1760 # Finally to check if still update messages to be generated.
1761 if self.generator.remaining_prefixes:
1762 msg_out = self.generator.compose_update_message()
1763 if not self.generator.remaining_prefixes:
1764 # We have just finished update generation,
1765 # end-of-rib is due.
1766 logger.info("All update messages generated.")
1767 logger.info("Storing performance results.")
1768 self.generator.store_results()
1769 logger.info("Finally an END-OF-RIB is sent.")
1770 msg_out += self.generator.update_message(wr_prefixes=[],
1773 self.writer.enqueue_message_for_sending(msg_out)
1774 # Attempt for real sending to be done in next iteration.
1776 # Nothing to write anymore.
1777 # To avoid busy loop, we do idle waiting here.
1778 self.reader.wait_for_read()
1780 # We can neither read nor write.
1781 logger.warning("Input and output both blocked for " +
1782 str(self.timer.report_timedelta) + " seconds.")
1783 # FIXME: Are we sure select has been really waiting
1788 def create_logger(loglevel, logfile):
1789 """Create logger object
1792 :loglevel: log level
1793 :logfile: log file name
1795 :return: logger object
1797 logger = logging.getLogger("logger")
1798 log_formatter = logging.Formatter("%(asctime)s %(levelname)s BGP-%(threadName)s: %(message)s")
1799 console_handler = logging.StreamHandler()
1800 file_handler = logging.FileHandler(logfile, mode="w")
1801 console_handler.setFormatter(log_formatter)
1802 file_handler.setFormatter(log_formatter)
1803 logger.addHandler(console_handler)
1804 logger.addHandler(file_handler)
1805 logger.setLevel(loglevel)
1809 def job(arguments, inqueue, storage):
1810 """One time initialisation and iterations looping.
1812 Establish BGP connection and run iterations.
1815 :arguments: Command line arguments
1816 :inqueue: Data to be sent from play.py
1817 :storage: Shared dict for rpc server
1821 bgp_socket = establish_connection(arguments)
1822 # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1823 # Receive open message before sending anything.
1824 # FIXME: Add parameter to send default open message first,
1825 # to work with "you first" peers.
1826 msg_in = read_open_message(bgp_socket)
1827 timer = TimeTracker(msg_in)
1828 generator = MessageGenerator(arguments)
1829 msg_out = generator.open_message()
1830 logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1831 # Send our open message to the peer.
1832 bgp_socket.send(msg_out)
1833 # Wait for confirming keepalive.
1834 # TODO: Surely in just one packet?
1835 # Using exact keepalive length to not to see possible updates.
1836 msg_in = bgp_socket.recv(19)
1837 if msg_in != generator.keepalive_message():
1838 error_msg = "Open not confirmed by keepalive, instead got"
1839 logger.error(error_msg + ": " + binascii.hexlify(msg_in))
1840 raise MessageError(error_msg, msg_in)
1841 timer.reset_peer_hold_time()
1842 # Send the keepalive to indicate the connection is accepted.
1843 timer.snapshot() # Remember this time.
1844 msg_out = generator.keepalive_message()
1845 logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1846 bgp_socket.send(msg_out)
1847 # Use the remembered time.
1848 timer.reset_my_keepalive_time(timer.snapshot_time)
1849 # End of initial handshake phase.
1850 state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
1851 while True: # main reactor loop
1852 state.perform_one_loop_iteration()
1856 '''Handler for SimpleXMLRPCServer'''
1858 def __init__(self, sendqueue, storage):
1862 :sendqueue: queue for data to be sent towards odl
1863 :storage: thread safe dict
1865 self.queue = sendqueue
1866 self.storage = storage
1868 def send(self, text):
1872 :text: hes string of the data to be sent
1874 self.queue.put(text)
1876 def get(self, text=''):
1877 '''Reads data form the storage
1879 - returns stored data or an empty string, at the moment only
1883 :text: a key to the storage to get the data
1887 with self.storage as stor:
1888 return stor.get(text, '')
1890 def clean(self, text=''):
1891 '''Cleans data form the storage
1894 :text: a key to the storage to clean the data
1896 with self.storage as stor:
1901 def threaded_job(arguments):
1902 """Run the job threaded
1905 :arguments: Command line arguments
1909 amount_left = arguments.amount
1910 utils_left = arguments.multiplicity
1911 prefix_current = arguments.firstprefix
1912 myip_current = arguments.myip
1914 rpcqueue = Queue.Queue()
1915 storage = SafeDict()
1918 amount_per_util = (amount_left - 1) / utils_left + 1 # round up
1919 amount_left -= amount_per_util
1922 args = deepcopy(arguments)
1923 args.amount = amount_per_util
1924 args.firstprefix = prefix_current
1925 args.myip = myip_current
1926 thread_args.append(args)
1930 prefix_current += amount_per_util * 16
1935 for t in thread_args:
1936 thread.start_new_thread(job, (t, rpcqueue, storage))
1938 print "Error: unable to start thread."
1941 rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
1942 rpcserver.register_instance(Rpcs(rpcqueue, storage))
1943 rpcserver.serve_forever()
1946 if __name__ == "__main__":
1947 arguments = parse_arguments()
1948 logger = create_logger(arguments.loglevel, arguments.logfile)
1949 threaded_job(arguments)