f01a2c7fb11d081c4987e65c7a742d10f25748f9
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 __author__ = "Vratko Polak"
15 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
16 __license__ = "Eclipse Public License v1.0"
17 __email__ = "vrpolak@cisco.com"
18
19 import argparse
20 import binascii
21 import ipaddr
22 import select
23 import socket
24 import time
25 import logging
26 import struct
27
28
29 def parse_arguments():
30     """Use argparse to get arguments,
31
32     Returns:
33         :return: args object.
34     """
35     parser = argparse.ArgumentParser()
36     # TODO: Should we use --argument-names-with-spaces?
37     str_help = "Autonomous System number use in the stream."
38     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
39     # FIXME: We are acting as iBGP peer,
40     # we should mirror AS number from peer's open message.
41     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
42     parser.add_argument("--amount", default="1", type=int, help=str_help)
43     str_help = "Maximum number of IP prefixes to be announced in one iteration"
44     parser.add_argument("--insert", default="1", type=int, help=str_help)
45     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
46     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
47     str_help = "The number of prefixes to process without withdrawals"
48     parser.add_argument("--prefill", default="0", type=int, help=str_help)
49     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
50     parser.add_argument("--updates", choices=["single", "separate"],
51                         default=["separate"], help=str_help)
52     str_help = "Base prefix IP address for prefix generation"
53     parser.add_argument("--firstprefix", default="8.0.1.0",
54                         type=ipaddr.IPv4Address, help=str_help)
55     str_help = "The prefix length."
56     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
57     str_help = "Listen for connection, instead of initiating it."
58     parser.add_argument("--listen", action="store_true", help=str_help)
59     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
60                 "Default value only suitable for listening.")
61     parser.add_argument("--myip", default="0.0.0.0",
62                         type=ipaddr.IPv4Address, help=str_help)
63     str_help = ("TCP port to bind to when listening or initiating connection." +
64                 "Default only suitable for initiating.")
65     parser.add_argument("--myport", default="0", type=int, help=str_help)
66     str_help = "The IP of the next hop to be placed into the update messages."
67     parser.add_argument("--nexthop", default="192.0.2.1",
68                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
69     str_help = ("Numeric IP Address to try to connect to." +
70                 "Currently no effect in listening mode.")
71     parser.add_argument("--peerip", default="127.0.0.2",
72                         type=ipaddr.IPv4Address, help=str_help)
73     str_help = "TCP port to try to connect to. No effect in listening mode."
74     parser.add_argument("--peerport", default="179", type=int, help=str_help)
75     str_help = "Local hold time."
76     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
77     str_help = "Log level (--error, --warning, --info, --debug)"
78     parser.add_argument("--error", dest="loglevel", action="store_const",
79                         const=logging.ERROR, default=logging.INFO,
80                         help=str_help)
81     parser.add_argument("--warning", dest="loglevel", action="store_const",
82                         const=logging.WARNING, default=logging.INFO,
83                         help=str_help)
84     parser.add_argument("--info", dest="loglevel", action="store_const",
85                         const=logging.INFO, default=logging.INFO,
86                         help=str_help)
87     parser.add_argument("--debug", dest="loglevel", action="store_const",
88                         const=logging.DEBUG, default=logging.INFO,
89                         help=str_help)
90     str_help = "Trailing part of the csv result files for plotting purposes"
91     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
92     str_help = "Minimum number of updates to reach to include result into csv."
93     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
94     arguments = parser.parse_args()
95     # TODO: Are sanity checks (such as asnumber>=0) required?
96     return arguments
97
98
99 def establish_connection(arguments):
100     """Establish connection to BGP peer.
101
102     Arguments:
103         :arguments: following command-line argumets are used
104             - arguments.myip: local IP address
105             - arguments.myport: local port
106             - arguments.peerip: remote IP address
107             - arguments.peerport: remote port
108     Returns:
109         :return: socket.
110     """
111     if arguments.listen:
112         stdout_logger.info("Connecting in the listening mode.")
113         stdout_logger.debug("Local IP address: " + str(arguments.myip))
114         stdout_logger.debug("Local port: " + str(arguments.myport))
115         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
116         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
117         # bind need single tuple as argument
118         listening_socket.bind((str(arguments.myip), arguments.myport))
119         listening_socket.listen(1)
120         bgp_socket, _ = listening_socket.accept()
121         # TODO: Verify client IP is cotroller IP.
122         listening_socket.close()
123     else:
124         stdout_logger.info("Connecting in the talking mode.")
125         stdout_logger.debug("Local IP address: " + str(arguments.myip))
126         stdout_logger.debug("Local port: " + str(arguments.myport))
127         stdout_logger.debug("Remote IP address: " + str(arguments.peerip))
128         stdout_logger.debug("Remote port: " + str(arguments.peerport))
129         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
130         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
131         # bind to force specified address and port
132         talking_socket.bind((str(arguments.myip), arguments.myport))
133         # socket does not spead ipaddr, hence str()
134         talking_socket.connect((str(arguments.peerip), arguments.peerport))
135         bgp_socket = talking_socket
136     stdout_logger.info("Connected to ODL.")
137     return bgp_socket
138
139
140 def get_short_int_from_message(message, offset=16):
141     """Extract 2-bytes number from provided message.
142
143     Arguments:
144         :message: given message
145         :offset: offset of the short_int inside the message
146     Returns:
147         :return: required short_inf value.
148     Notes:
149         default offset value is the BGP message size offset.
150     """
151     high_byte_int = ord(message[offset])
152     low_byte_int = ord(message[offset + 1])
153     short_int = high_byte_int * 256 + low_byte_int
154     return short_int
155
156
157 class MessageError(ValueError):
158     """Value error with logging optimized for hexlified messages."""
159
160     def __init__(self, text, message, *args):
161         """Initialisation.
162
163         Store and call super init for textual comment,
164         store raw message which caused it.
165         """
166         self.text = text
167         self.msg = message
168         super(MessageError, self).__init__(text, message, *args)
169
170     def __str__(self):
171         """Generate human readable error message.
172
173         Returns:
174             :return: human readable message as string
175         Notes:
176             Use a placeholder string if the message is to be empty.
177         """
178         message = binascii.hexlify(self.msg)
179         if message == "":
180             message = "(empty message)"
181         return self.text + ": " + message
182
183
184 def read_open_message(bgp_socket):
185     """Receive peer's OPEN message
186
187     Arguments:
188         :bgp_socket: the socket to be read
189     Returns:
190         :return: received OPEN message.
191     Notes:
192         Performs just basic incomming message checks
193     """
194     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
195     # TODO: Can the incoming open message be split in more than one packet?
196     # Some validation.
197     if len(msg_in) < 37:
198         # 37 is minimal length of open message with 4-byte AS number.
199         stdout_logger.error("Got something else than open with 4-byte AS number: " +
200                             binascii.hexlify(msg_in))
201         raise MessageError("Got something else than open with 4-byte AS number",
202                            msg_in)
203     # TODO: We could check BGP marker, but it is defined only later;
204     # decide what to do.
205     reported_length = get_short_int_from_message(msg_in)
206     if len(msg_in) != reported_length:
207         stdout_logger.error("Message length is not " + str(reported_length) +
208                             " as stated in " + binascii.hexlify(msg_in))
209         raise MessageError("Message length is not " + reported_length +
210                            " as stated in ", msg_in)
211     stdout_logger.info("Open message received.")
212     return msg_in
213
214
215 class MessageGenerator(object):
216     """Class which generates messages, holds states and configuration values."""
217
218     # TODO: Define bgp marker as a class (constant) variable.
219     def __init__(self, args):
220         """Initialisation according to command-line args.
221
222         Arguments:
223             :args: argsparser's Namespace object which contains command-line
224                 options for MesageGenerator initialisation
225         Notes:
226             Calculates and stores default values used later on for
227             message geeration.
228         """
229         self.total_prefix_amount = args.amount
230         # Number of update messages left to be sent.
231         self.remaining_prefixes = self.total_prefix_amount
232
233         # New parameters initialisation
234         self.iteration = 0
235         self.prefix_base_default = args.firstprefix
236         self.prefix_length_default = args.prefixlen
237         self.wr_prefixes_default = []
238         self.nlri_prefixes_default = []
239         self.version_default = 4
240         self.my_autonomous_system_default = args.asnumber
241         self.hold_time_default = args.holdtime  # Local hold time.
242         self.bgp_identifier_default = int(args.myip)
243         self.next_hop_default = args.nexthop
244         self.single_update_default = args.updates == "single"
245         self.randomize_updates_default = args.updates == "random"
246         self.prefix_count_to_add_default = args.insert
247         self.prefix_count_to_del_default = args.withdraw
248         if self.prefix_count_to_del_default < 0:
249             self.prefix_count_to_del_default = 0
250         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
251             # total number of prefixes must grow to avoid infinite test loop
252             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
253         self.slot_size_default = self.prefix_count_to_add_default
254         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
255         self.results_file_name_default = args.results
256         self.performance_threshold_default = args.threshold
257         # Default values used for randomized part
258         s1_slots = ((self.total_prefix_amount -
259                      self.remaining_prefixes_threshold - 1) /
260                     self.prefix_count_to_add_default + 1)
261         s2_slots = ((self.remaining_prefixes_threshold - 1) /
262                     (self.prefix_count_to_add_default -
263                     self.prefix_count_to_del_default) + 1)
264         # S1_First_Index = 0
265         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
266         s2_first_index = s1_slots * self.prefix_count_to_add_default
267         s2_last_index = (s2_first_index +
268                          s2_slots * (self.prefix_count_to_add_default -
269                                      self.prefix_count_to_del_default) - 1)
270         self.slot_gap_default = ((self.total_prefix_amount -
271                                   self.remaining_prefixes_threshold - 1) /
272                                  self.prefix_count_to_add_default + 1)
273         self.randomize_lowest_default = s2_first_index
274         self.randomize_highest_default = s2_last_index
275
276         # Initialising counters
277         self.phase1_start_time = 0
278         self.phase1_stop_time = 0
279         self.phase2_start_time = 0
280         self.phase2_stop_time = 0
281         self.phase1_updates_sent = 0
282         self.phase2_updates_sent = 0
283         self.updates_sent = 0
284
285         self.log_info = args.loglevel <= logging.INFO
286         self.log_debug = args.loglevel <= logging.DEBUG
287         """
288         Flags needed for the MessageGenerator performance optimization.
289         Calling logger methods each iteration even with proper log level set
290         slows down significantly the MessageGenerator performance.
291         Measured total generation time (1M updates, dry run, error log level):
292         - logging based on basic logger features: 36,2s
293         - logging based on advanced logger features (lazy logging): 21,2s
294         - conditional calling of logger methods enclosed inside condition: 8,6s
295         """
296
297         stdout_logger.info("Generator initialisation")
298         stdout_logger.info("  Target total number of prefixes to be introduced: " +
299                            str(self.total_prefix_amount))
300         stdout_logger.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
301                            str(self.prefix_length_default))
302         stdout_logger.info("  My Autonomous System number: " +
303                            str(self.my_autonomous_system_default))
304         stdout_logger.info("  My Hold Time: " + str(self.hold_time_default))
305         stdout_logger.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
306         stdout_logger.info("  Next Hop: " + str(self.next_hop_default))
307         stdout_logger.info("  Prefix count to be inserted at once: " +
308                            str(self.prefix_count_to_add_default))
309         stdout_logger.info("  Prefix count to be withdrawn at once: " +
310                            str(self.prefix_count_to_del_default))
311         stdout_logger.info("  Fast pre-fill up to " +
312                            str(self.total_prefix_amount -
313                                self.remaining_prefixes_threshold) + " prefixes")
314         stdout_logger.info("  Remaining number of prefixes to be processed " +
315                            "in parallel with withdrawals: " +
316                            str(self.remaining_prefixes_threshold))
317         stdout_logger.debug("  Prefix index range used after pre-fill procedure [" +
318                             str(self.randomize_lowest_default) + ", " +
319                             str(self.randomize_highest_default) + "]")
320         if self.single_update_default:
321             stdout_logger.info("  Common single UPDATE will be generated " +
322                                "for both NLRI & WITHDRAWN lists")
323         else:
324             stdout_logger.info("  Two separate UPDATEs will be generated " +
325                                "for each NLRI & WITHDRAWN lists")
326         if self.randomize_updates_default:
327             stdout_logger.info("  Generation of UPDATE messages will be randomized")
328         stdout_logger.info("  Let\"s go ...\n")
329
330         # TODO: Notification for hold timer expiration can be handy.
331
332     def store_results(self, file_name=None, threshold=None):
333         """ Stores specified results into files based on file_name value.
334
335         Arguments:
336             :param file_name: Trailing (common) part of result file names
337             :param threshold: Minimum number of sent updates needed for each
338                               result to be included into result csv file
339                               (mainly needed because of the result accuracy)
340         Returns:
341             :return: n/a
342         """
343         # default values handling
344         if file_name is None:
345             file_name = self.results_file_name_default
346         if threshold is None:
347             threshold = self.performance_threshold_default
348         # performance calculation
349         if self.phase1_updates_sent >= threshold:
350             totals1 = self.phase1_updates_sent
351             performance1 = int(self.phase1_updates_sent /
352                                (self.phase1_stop_time - self.phase1_start_time))
353         else:
354             totals1 = None
355             performance1 = None
356         if self.phase2_updates_sent >= threshold:
357             totals2 = self.phase2_updates_sent
358             performance2 = int(self.phase2_updates_sent /
359                                (self.phase2_stop_time - self.phase2_start_time))
360         else:
361             totals2 = None
362             performance2 = None
363
364         stdout_logger.info("#" * 10 + " Final results " + "#" * 10)
365         stdout_logger.info("Number of iterations: " + str(self.iteration))
366         stdout_logger.info("Number of UPDATE messages sent in the pre-fill phase: " +
367                            str(self.phase1_updates_sent))
368         stdout_logger.info("The pre-fill phase duration: " +
369                            str(self.phase1_stop_time - self.phase1_start_time) + "s")
370         stdout_logger.info("Number of UPDATE messages sent in the 2nd test phase: " +
371                            str(self.phase2_updates_sent))
372         stdout_logger.info("The 2nd test phase duration: " +
373                            str(self.phase2_stop_time - self.phase2_start_time) + "s")
374         stdout_logger.info("Threshold for performance reporting: " + str(threshold))
375
376         # making labels
377         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
378                         " route(s) per UPDATE")
379         if self.single_update_default:
380             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
381                                   "/-" + str(self.prefix_count_to_del_default) +
382                                   " routes per UPDATE")
383         else:
384             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
385                                   "/-" + str(self.prefix_count_to_del_default) +
386                                   " routes in two UPDATEs")
387         # collecting capacity and performance results
388         totals = {}
389         performance = {}
390         if totals1 is not None:
391             totals[phase1_label] = totals1
392             performance[phase1_label] = performance1
393         if totals2 is not None:
394             totals[phase2_label] = totals2
395             performance[phase2_label] = performance2
396         self.write_results_to_file(totals, "totals-" + file_name)
397         self.write_results_to_file(performance, "performance-" + file_name)
398
399     def write_results_to_file(self, results, file_name):
400         """Writes results to the csv plot file consumable by Jenkins.
401
402         Arguments:
403             :param file_name: Name of the (csv) file to be created
404         Returns:
405             :return: none
406         """
407         first_line = ""
408         second_line = ""
409         f = open(file_name, "wt")
410         try:
411             for key in sorted(results):
412                 first_line += key + ", "
413                 second_line += str(results[key]) + ", "
414             first_line = first_line[:-2]
415             second_line = second_line[:-2]
416             f.write(first_line + "\n")
417             f.write(second_line + "\n")
418             stdout_logger.info("Message generator performance results stored in " +
419                                file_name + ':')
420             stdout_logger.info("  " + first_line)
421             stdout_logger.info("  " + second_line)
422         finally:
423             f.close()
424
425     # Return pseudo-randomized (reproducible) index for selected range
426     def randomize_index(self, index, lowest=None, highest=None):
427         """Calculates pseudo-randomized index from selected range.
428
429         Arguments:
430             :param index: input index
431             :param lowest: the lowes index from the randomized area
432             :param highest: the highest index from the randomized area
433         Returns:
434             :return: the (pseudo)randomized index
435         Notes:
436             Created just as a fame for future generator enhancement.
437         """
438         # default values handling
439         if lowest is None:
440             lowest = self.randomize_lowest_default
441         if highest is None:
442             highest = self.randomize_highest_default
443         # randomize
444         if (index >= lowest) and (index <= highest):
445             # we are in the randomized range -> shuffle it inside
446             # the range (now just reverse the order)
447             new_index = highest - (index - lowest)
448         else:
449             # we are out of the randomized range -> nothing to do
450             new_index = index
451         return new_index
452
453     # Get list of prefixes
454     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
455                         prefix_len=None, prefix_count=None, randomize=None):
456         """Generates list of IP address prefixes.
457
458         Arguments:
459             :param slot_index: index of group of prefix addresses
460             :param slot_size: size of group of prefix addresses
461                 in [number of included prefixes]
462             :param prefix_base: IP address of the first prefix
463                 (slot_index = 0, prefix_index = 0)
464             :param prefix_len: length of the prefix in bites
465                 (the same as size of netmask)
466             :param prefix_count: number of prefixes to be returned
467                 from the specified slot
468         Returns:
469             :return: list of generated IP address prefixes
470         """
471         # default values handling
472         if slot_size is None:
473             slot_size = self.slot_size_default
474         if prefix_base is None:
475             prefix_base = self.prefix_base_default
476         if prefix_len is None:
477             prefix_len = self.prefix_length_default
478         if prefix_count is None:
479             prefix_count = slot_size
480         if randomize is None:
481             randomize = self.randomize_updates_default
482         # generating list of prefixes
483         indexes = []
484         prefixes = []
485         prefix_gap = 2 ** (32 - prefix_len)
486         for i in range(prefix_count):
487             prefix_index = slot_index * slot_size + i
488             if randomize:
489                 prefix_index = self.randomize_index(prefix_index)
490             indexes.append(prefix_index)
491             prefixes.append(prefix_base + prefix_index * prefix_gap)
492         if self.log_debug:
493             stdout_logger.debug("  Prefix slot index: " + str(slot_index))
494             stdout_logger.debug("  Prefix slot size: " + str(slot_size))
495             stdout_logger.debug("  Prefix count: " + str(prefix_count))
496             stdout_logger.debug("  Prefix indexes: " + str(indexes))
497             stdout_logger.debug("  Prefix list: " + str(prefixes))
498         return prefixes
499
500     def compose_update_message(self, prefix_count_to_add=None,
501                                prefix_count_to_del=None):
502         """Composes an UPDATE message
503
504         Arguments:
505             :param prefix_count_to_add: # of prefixes to put into NLRI list
506             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
507         Returns:
508             :return: encoded UPDATE message in HEX
509         Notes:
510             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
511             lists or common message wich includes both prefix lists.
512             Updates global counters.
513         """
514         # default values handling
515         if prefix_count_to_add is None:
516             prefix_count_to_add = self.prefix_count_to_add_default
517         if prefix_count_to_del is None:
518             prefix_count_to_del = self.prefix_count_to_del_default
519         # logging
520         if self.log_info and not (self.iteration % 1000):
521             stdout_logger.info("Iteration: " + str(self.iteration) +
522                                " - total remaining prefixes: " +
523                                str(self.remaining_prefixes))
524         if self.log_debug:
525             stdout_logger.debug("#" * 10 + " Iteration: " +
526                                 str(self.iteration) + " " + "#" * 10)
527             stdout_logger.debug("Remaining prefixes: " +
528                                 str(self.remaining_prefixes))
529         # scenario type & one-shot counter
530         straightforward_scenario = (self.remaining_prefixes >
531                                     self.remaining_prefixes_threshold)
532         if straightforward_scenario:
533             prefix_count_to_del = 0
534             if self.log_debug:
535                 stdout_logger.debug("--- STARAIGHTFORWARD SCENARIO ---")
536             if not self.phase1_start_time:
537                 self.phase1_start_time = time.time()
538         else:
539             if self.log_debug:
540                 stdout_logger.debug("--- COMBINED SCENARIO ---")
541             if not self.phase2_start_time:
542                 self.phase2_start_time = time.time()
543         # tailor the number of prefixes if needed
544         prefix_count_to_add = (prefix_count_to_del +
545                                min(prefix_count_to_add - prefix_count_to_del,
546                                    self.remaining_prefixes))
547         # prefix slots selection for insertion and withdrawal
548         slot_index_to_add = self.iteration
549         slot_index_to_del = slot_index_to_add - self.slot_gap_default
550         # getting lists of prefixes for insertion in this iteration
551         if self.log_debug:
552             stdout_logger.debug("Prefixes to be inserted in this iteration:")
553         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
554                                                   prefix_count=prefix_count_to_add)
555         # getting lists of prefixes for withdrawal in this iteration
556         if self.log_debug:
557             stdout_logger.debug("Prefixes to be withdrawn in this iteration:")
558         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
559                                                   prefix_count=prefix_count_to_del)
560         # generating the mesage
561         if self.single_update_default:
562             # Send prefixes to be introduced and withdrawn
563             # in one UPDATE message
564             msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
565                                           nlri_prefixes=prefix_list_to_add)
566         else:
567             # Send prefixes to be introduced and withdrawn
568             # in separate UPDATE messages (if needed)
569             msg_out = self.update_message(wr_prefixes=[],
570                                           nlri_prefixes=prefix_list_to_add)
571             if prefix_count_to_del:
572                 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
573                                                nlri_prefixes=[])
574         # updating counters - who knows ... maybe I am last time here ;)
575         if straightforward_scenario:
576             self.phase1_stop_time = time.time()
577             self.phase1_updates_sent = self.updates_sent
578         else:
579             self.phase2_stop_time = time.time()
580             self.phase2_updates_sent = (self.updates_sent -
581                                         self.phase1_updates_sent)
582         # updating totals for the next iteration
583         self.iteration += 1
584         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
585         # returning the encoded message
586         return msg_out
587
588     # Section of message encoders
589
590     def open_message(self, version=None, my_autonomous_system=None,
591                      hold_time=None, bgp_identifier=None):
592         """Generates an OPEN Message (rfc4271#section-4.2)
593
594         Arguments:
595             :param version: see the rfc4271#section-4.2
596             :param my_autonomous_system: see the rfc4271#section-4.2
597             :param hold_time: see the rfc4271#section-4.2
598             :param bgp_identifier: see the rfc4271#section-4.2
599         Returns:
600             :return: encoded OPEN message in HEX
601         """
602
603         # Default values handling
604         if version is None:
605             version = self.version_default
606         if my_autonomous_system is None:
607             my_autonomous_system = self.my_autonomous_system_default
608         if hold_time is None:
609             hold_time = self.hold_time_default
610         if bgp_identifier is None:
611             bgp_identifier = self.bgp_identifier_default
612
613         # Marker
614         marker_hex = "\xFF" * 16
615
616         # Type
617         type = 1
618         type_hex = struct.pack("B", type)
619
620         # version
621         version_hex = struct.pack("B", version)
622
623         # my_autonomous_system
624         # AS_TRANS value, 23456 decadic.
625         my_autonomous_system_2_bytes = 23456
626         # AS number is mappable to 2 bytes
627         if my_autonomous_system < 65536:
628             my_autonomous_system_2_bytes = my_autonomous_system
629         my_autonomous_system_hex_2_bytes = struct.pack(">H",
630                                                        my_autonomous_system)
631
632         # Hold Time
633         hold_time_hex = struct.pack(">H", hold_time)
634
635         # BGP Identifier
636         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
637
638         # Optional Parameters
639         optional_parameters_hex = (
640             "\x02"  # Param type ("Capability Ad")
641             "\x06"  # Length (6 bytes)
642             "\x01"  # Capability type (NLRI Unicast),
643                     # see RFC 4760, secton 8
644             "\x04"  # Capability value length
645             "\x00\x01"  # AFI (Ipv4)
646             "\x00"  # (reserved)
647             "\x01"  # SAFI (Unicast)
648
649             "\x02"  # Param type ("Capability Ad")
650             "\x06"  # Length (6 bytes)
651             "\x41"  # "32 bit AS Numbers Support"
652                     # (see RFC 6793, section 3)
653             "\x04"  # Capability value length
654                     # My AS in 32 bit format
655             + struct.pack(">I", my_autonomous_system)
656         )
657
658         # Optional Parameters Length
659         optional_parameters_length = len(optional_parameters_hex)
660         optional_parameters_length_hex = struct.pack("B",
661                                                      optional_parameters_length)
662
663         # Length (big-endian)
664         length = (
665             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
666             len(my_autonomous_system_hex_2_bytes) +
667             len(hold_time_hex) + len(bgp_identifier_hex) +
668             len(optional_parameters_length_hex) +
669             len(optional_parameters_hex)
670         )
671         length_hex = struct.pack(">H", length)
672
673         # OPEN Message
674         message_hex = (
675             marker_hex +
676             length_hex +
677             type_hex +
678             version_hex +
679             my_autonomous_system_hex_2_bytes +
680             hold_time_hex +
681             bgp_identifier_hex +
682             optional_parameters_length_hex +
683             optional_parameters_hex
684         )
685
686         if self.log_debug:
687             stdout_logger.debug("OPEN Message encoding")
688             stdout_logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
689             stdout_logger.debug("  Length=" + str(length) + " (0x" +
690                                 binascii.hexlify(length_hex) + ")")
691             stdout_logger.debug("  Type=" + str(type) + " (0x" +
692                                 binascii.hexlify(type_hex) + ")")
693             stdout_logger.debug("  Version=" + str(version) + " (0x" +
694                                 binascii.hexlify(version_hex) + ")")
695             stdout_logger.debug("  My Autonomous System=" +
696                                 str(my_autonomous_system_2_bytes) + " (0x" +
697                                 binascii.hexlify(my_autonomous_system_hex_2_bytes) +
698                                 ")")
699             stdout_logger.debug("  Hold Time=" + str(hold_time) + " (0x" +
700                                 binascii.hexlify(hold_time_hex) + ")")
701             stdout_logger.debug("  BGP Identifier=" + str(bgp_identifier) +
702                                 " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
703             stdout_logger.debug("  Optional Parameters Length=" +
704                                 str(optional_parameters_length) + " (0x" +
705                                 binascii.hexlify(optional_parameters_length_hex) +
706                                 ")")
707             stdout_logger.debug("  Optional Parameters=0x" +
708                                 binascii.hexlify(optional_parameters_hex))
709             stdout_logger.debug("  OPEN Message encoded: 0x" +
710                                 binascii.b2a_hex(message_hex))
711
712         return message_hex
713
714     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
715                        wr_prefix_length=None, nlri_prefix_length=None,
716                        my_autonomous_system=None, next_hop=None):
717         """Generates an UPDATE Message (rfc4271#section-4.3)
718
719         Arguments:
720             :param wr_prefixes: see the rfc4271#section-4.3
721             :param nlri_prefixes: see the rfc4271#section-4.3
722             :param wr_prefix_length: see the rfc4271#section-4.3
723             :param nlri_prefix_length: see the rfc4271#section-4.3
724             :param my_autonomous_system: see the rfc4271#section-4.3
725             :param next_hop: see the rfc4271#section-4.3
726         Returns:
727             :return: encoded UPDATE message in HEX
728         """
729
730         # Default values handling
731         if wr_prefixes is None:
732             wr_prefixes = self.wr_prefixes_default
733         if nlri_prefixes is None:
734             nlri_prefixes = self.nlri_prefixes_default
735         if wr_prefix_length is None:
736             wr_prefix_length = self.prefix_length_default
737         if nlri_prefix_length is None:
738             nlri_prefix_length = self.prefix_length_default
739         if my_autonomous_system is None:
740             my_autonomous_system = self.my_autonomous_system_default
741         if next_hop is None:
742             next_hop = self.next_hop_default
743
744         # Marker
745         marker_hex = "\xFF" * 16
746
747         # Type
748         type = 2
749         type_hex = struct.pack("B", type)
750
751         # Withdrawn Routes
752         bytes = ((wr_prefix_length - 1) / 8) + 1
753         withdrawn_routes_hex = ""
754         for prefix in wr_prefixes:
755             withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
756                                    struct.pack(">I", int(prefix))[:bytes])
757             withdrawn_routes_hex += withdrawn_route_hex
758
759         # Withdrawn Routes Length
760         withdrawn_routes_length = len(withdrawn_routes_hex)
761         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
762
763         # TODO: to replace hardcoded string by encoding?
764         # Path Attributes
765         if nlri_prefixes != []:
766             path_attributes_hex = (
767                 "\x40"  # Flags ("Well-Known")
768                 "\x01"  # Type (ORIGIN)
769                 "\x01"  # Length (1)
770                 "\x00"  # Origin: IGP
771                 "\x40"  # Flags ("Well-Known")
772                 "\x02"  # Type (AS_PATH)
773                 "\x06"  # Length (6)
774                 "\x02"  # AS segment type (AS_SEQUENCE)
775                 "\x01"  # AS segment length (1)
776                         # AS segment (4 bytes)
777                 + struct.pack(">I", my_autonomous_system) +
778                 "\x40"  # Flags ("Well-Known")
779                 "\x03"  # Type (NEXT_HOP)
780                 "\x04"  # Length (4)
781                         # IP address of the next hop (4 bytes)
782                 + struct.pack(">I", int(next_hop))
783             )
784         else:
785             path_attributes_hex = ""
786
787         # Total Path Attributes Length
788         total_path_attributes_length = len(path_attributes_hex)
789         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
790
791         # Network Layer Reachability Information
792         bytes = ((nlri_prefix_length - 1) / 8) + 1
793         nlri_hex = ""
794         for prefix in nlri_prefixes:
795             nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
796                                struct.pack(">I", int(prefix))[:bytes])
797             nlri_hex += nlri_prefix_hex
798
799         # Length (big-endian)
800         length = (
801             len(marker_hex) + 2 + len(type_hex) +
802             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
803             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
804             len(nlri_hex))
805         length_hex = struct.pack(">H", length)
806
807         # UPDATE Message
808         message_hex = (
809             marker_hex +
810             length_hex +
811             type_hex +
812             withdrawn_routes_length_hex +
813             withdrawn_routes_hex +
814             total_path_attributes_length_hex +
815             path_attributes_hex +
816             nlri_hex
817         )
818
819         if self.log_debug:
820             stdout_logger.debug("UPDATE Message encoding")
821             stdout_logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
822             stdout_logger.debug("  Length=" + str(length) + " (0x" +
823                                 binascii.hexlify(length_hex) + ")")
824             stdout_logger.debug("  Type=" + str(type) + " (0x" +
825                                 binascii.hexlify(type_hex) + ")")
826             stdout_logger.debug("  withdrawn_routes_length=" +
827                                 str(withdrawn_routes_length) + " (0x" +
828                                 binascii.hexlify(withdrawn_routes_length_hex) + ")")
829             stdout_logger.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
830                                 str(wr_prefix_length) + " (0x" +
831                                 binascii.hexlify(withdrawn_routes_hex) + ")")
832             stdout_logger.debug("  Total Path Attributes Length=" +
833                                 str(total_path_attributes_length) + " (0x" +
834                                 binascii.hexlify(total_path_attributes_length_hex) +
835                                 ")")
836             stdout_logger.debug("  Path Attributes=" + "(0x" +
837                                 binascii.hexlify(path_attributes_hex) + ")")
838             stdout_logger.debug("  Network Layer Reachability Information=" +
839                                 str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
840                                 " (0x" + binascii.hexlify(nlri_hex) + ")")
841             stdout_logger.debug("  UPDATE Message encoded: 0x" +
842                                 binascii.b2a_hex(message_hex))
843
844         # updating counter
845         self.updates_sent += 1
846         # returning encoded message
847         return message_hex
848
849     def notification_message(self, error_code, error_subcode, data_hex=""):
850         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
851
852         Arguments:
853             :param error_code: see the rfc4271#section-4.5
854             :param error_subcode: see the rfc4271#section-4.5
855             :param data_hex: see the rfc4271#section-4.5
856         Returns:
857             :return: encoded NOTIFICATION message in HEX
858         """
859
860         # Marker
861         marker_hex = "\xFF" * 16
862
863         # Type
864         type = 3
865         type_hex = struct.pack("B", type)
866
867         # Error Code
868         error_code_hex = struct.pack("B", error_code)
869
870         # Error Subode
871         error_subcode_hex = struct.pack("B", error_subcode)
872
873         # Length (big-endian)
874         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
875                   len(error_subcode_hex) + len(data_hex))
876         length_hex = struct.pack(">H", length)
877
878         # NOTIFICATION Message
879         message_hex = (
880             marker_hex +
881             length_hex +
882             type_hex +
883             error_code_hex +
884             error_subcode_hex +
885             data_hex
886         )
887
888         if self.log_debug:
889             stdout_logger.debug("NOTIFICATION Message encoding")
890             stdout_logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
891             stdout_logger.debug("  Length=" + str(length) + " (0x" +
892                                 binascii.hexlify(length_hex) + ")")
893             stdout_logger.debug("  Type=" + str(type) + " (0x" +
894                                 binascii.hexlify(type_hex) + ")")
895             stdout_logger.debug("  Error Code=" + str(error_code) + " (0x" +
896                                 binascii.hexlify(error_code_hex) + ")")
897             stdout_logger.debug("  Error Subode=" + str(error_subcode) + " (0x" +
898                                 binascii.hexlify(error_subcode_hex) + ")")
899             stdout_logger.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
900             stdout_logger.debug("  NOTIFICATION Message encoded: 0x" +
901                                 binascii.b2a_hex(message_hex))
902
903         return message_hex
904
905     def keepalive_message(self):
906         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
907
908         Returns:
909             :return: encoded KEEP ALIVE message in HEX
910         """
911
912         # Marker
913         marker_hex = "\xFF" * 16
914
915         # Type
916         type = 4
917         type_hex = struct.pack("B", type)
918
919         # Length (big-endian)
920         length = len(marker_hex) + 2 + len(type_hex)
921         length_hex = struct.pack(">H", length)
922
923         # KEEP ALIVE Message
924         message_hex = (
925             marker_hex +
926             length_hex +
927             type_hex
928         )
929
930         if self.log_debug:
931             stdout_logger.debug("KEEP ALIVE Message encoding")
932             stdout_logger.debug("  Marker=0x" + binascii.hexlify(marker_hex))
933             stdout_logger.debug("  Length=" + str(length) + " (0x" +
934                                 binascii.hexlify(length_hex) + ")")
935             stdout_logger.debug("  Type=" + str(type) + " (0x" +
936                                 binascii.hexlify(type_hex) + ")")
937             stdout_logger.debug("  KEEP ALIVE Message encoded: 0x" +
938                                 binascii.b2a_hex(message_hex))
939
940         return message_hex
941
942
943 class TimeTracker(object):
944     """Class for tracking timers, both for my keepalives and
945     peer's hold time.
946     """
947
948     def __init__(self, msg_in):
949         """Initialisation. based on defaults and OPEN message from peer.
950
951         Arguments:
952             msg_in: the OPEN message received from peer.
953         """
954         # Note: Relative time is always named timedelta, to stress that
955         # the (non-delta) time is absolute.
956         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
957         # Upper bound for being stuck in the same state, we should
958         # at least report something before continuing.
959         # Negotiate the hold timer by taking the smaller
960         # of the 2 values (mine and the peer's).
961         hold_timedelta = 180  # Not an attribute of self yet.
962         # TODO: Make the default value configurable,
963         # default value could mirror what peer said.
964         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
965         if hold_timedelta > peer_hold_timedelta:
966             hold_timedelta = peer_hold_timedelta
967         if hold_timedelta != 0 and hold_timedelta < 3:
968             stdout_logger.error("Invalid hold timedelta value: " + str(hold_timedelta))
969             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
970         self.hold_timedelta = hold_timedelta
971         # If we do not hear from peer this long, we assume it has died.
972         self.keepalive_timedelta = int(hold_timedelta / 3.0)
973         # Upper limit for duration between messages, to avoid being
974         # declared to be dead.
975         # The same as calling snapshot(), but also declares a field.
976         self.snapshot_time = time.time()
977         # Sometimes we need to store time. This is where to get
978         # the value from afterwards. Time_keepalive may be too strict.
979         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
980         # At this time point, peer will be declared dead.
981         self.my_keepalive_time = None  # to be set later
982         # At this point, we should be sending keepalive message.
983
984     def snapshot(self):
985         """Store current time in instance data to use later."""
986         # Read as time before something interesting was called.
987         self.snapshot_time = time.time()
988
989     def reset_peer_hold_time(self):
990         """Move hold time to future as peer has just proven it still lives."""
991         self.peer_hold_time = time.time() + self.hold_timedelta
992
993     # Some methods could rely on self.snapshot_time, but it is better
994     # to require user to provide it explicitly.
995     def reset_my_keepalive_time(self, keepalive_time):
996         """Calculate and set the next my KEEP ALIVE timeout time
997
998         Arguments:
999             :keepalive_time: the initial value of the KEEP ALIVE timer
1000         """
1001         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
1002
1003     def is_time_for_my_keepalive(self):
1004         """Check for my KEEP ALIVE timeout occurence"""
1005         if self.hold_timedelta == 0:
1006             return False
1007         return self.snapshot_time >= self.my_keepalive_time
1008
1009     def get_next_event_time(self):
1010         """Set the time of the next expected or to be sent KEEP ALIVE"""
1011         if self.hold_timedelta == 0:
1012             return self.snapshot_time + 86400
1013         return min(self.my_keepalive_time, self.peer_hold_time)
1014
1015     def check_peer_hold_time(self, snapshot_time):
1016         """Raise error if nothing was read from peer until specified time."""
1017         # Hold time = 0 means keepalive checking off.
1018         if self.hold_timedelta != 0:
1019             # time.time() may be too strict
1020             if snapshot_time > self.peer_hold_time:
1021                 stdout_logger.error("Peer has overstepped the hold timer.")
1022                 raise RuntimeError("Peer has overstepped the hold timer.")
1023                 # TODO: Include hold_timedelta?
1024                 # TODO: Add notification sending (attempt). That means
1025                 # move to write tracker.
1026
1027
1028 class ReadTracker(object):
1029     """Class for tracking read of mesages chunk by chunk and
1030     for idle waiting.
1031     """
1032
1033     def __init__(self, bgp_socket, timer):
1034         """The reader initialisation.
1035
1036         Arguments:
1037             bgp_socket: socket to be used for sending
1038             timer: timer to be used for scheduling
1039         """
1040         # References to outside objects.
1041         self.socket = bgp_socket
1042         self.timer = timer
1043         # BGP marker length plus length field length.
1044         self.header_length = 18
1045         # TODO: make it class (constant) attribute
1046         # Computation of where next chunk ends depends on whether
1047         # we are beyond length field.
1048         self.reading_header = True
1049         # Countdown towards next size computation.
1050         self.bytes_to_read = self.header_length
1051         # Incremental buffer for message under read.
1052         self.msg_in = ""
1053
1054     def read_message_chunk(self):
1055         """Read up to one message
1056
1057         Note:
1058             Currently it does not return anything.
1059         """
1060         # TODO: We could return the whole message, currently not needed.
1061         # We assume the socket is readable.
1062         chunk_message = self.socket.recv(self.bytes_to_read)
1063         self.msg_in += chunk_message
1064         self.bytes_to_read -= len(chunk_message)
1065         # TODO: bytes_to_read < 0 is not possible, right?
1066         if not self.bytes_to_read:
1067             # Finished reading a logical block.
1068             if self.reading_header:
1069                 # The logical block was a BGP header.
1070                 # Now we know the size of the message.
1071                 self.reading_header = False
1072                 self.bytes_to_read = (get_short_int_from_message(self.msg_in) -
1073                                       self.header_length)
1074             else:  # We have finished reading the body of the message.
1075                 # Peer has just proven it is still alive.
1076                 self.timer.reset_peer_hold_time()
1077                 # TODO: Do we want to count received messages?
1078                 # This version ignores the received message.
1079                 # TODO: Should we do validation and exit on anything
1080                 # besides update or keepalive?
1081                 # Prepare state for reading another message.
1082                 message_type_hex = self.msg_in[self.header_length]
1083                 if message_type_hex == "\x01":
1084                     stdout_logger.info("OPEN message received: 0x%s",
1085                                        binascii.b2a_hex(self.msg_in))
1086                 elif message_type_hex == "\x02":
1087                     stdout_logger.debug("UPDATE message received: 0x%s",
1088                                         binascii.b2a_hex(self.msg_in))
1089                 elif message_type_hex == "\x03":
1090                     stdout_logger.info("NOTIFICATION message received: 0x%s",
1091                                        binascii.b2a_hex(self.msg_in))
1092                 elif message_type_hex == "\x04":
1093                     stdout_logger.info("KEEP ALIVE message received: 0x%s",
1094                                        binascii.b2a_hex(self.msg_in))
1095                 else:
1096                     stdout_logger.warning("Unexpected message received: 0x%s",
1097                                           binascii.b2a_hex(self.msg_in))
1098                 self.msg_in = ""
1099                 self.reading_header = True
1100                 self.bytes_to_read = self.header_length
1101         # We should not act upon peer_hold_time if we are reading
1102         # something right now.
1103         return
1104
1105     def wait_for_read(self):
1106         """Read message until timeout (next expected event).
1107
1108         Note:
1109             Used when no more updates has to be sent to avoid busy-wait.
1110             Currently it does not return anything.
1111         """
1112         # Compute time to the first predictable state change
1113         event_time = self.timer.get_next_event_time()
1114         # snapshot_time would be imprecise
1115         wait_timedelta = event_time - time.time()
1116         if wait_timedelta < 0:
1117             # The program got around to waiting to an event in "very near
1118             # future" so late that it became a "past" event, thus tell
1119             # "select" to not wait at all. Passing negative timedelta to
1120             # select() would lead to either waiting forever (for -1) or
1121             # select.error("Invalid parameter") (for everything else).
1122             wait_timedelta = 0
1123         # And wait for event or something to read.
1124         select.select([self.socket], [], [self.socket], wait_timedelta)
1125         # Not checking anything, that will be done in next iteration.
1126         return
1127
1128
1129 class WriteTracker(object):
1130     """Class tracking enqueueing messages and sending chunks of them."""
1131
1132     def __init__(self, bgp_socket, generator, timer):
1133         """The writter initialisation.
1134
1135         Arguments:
1136             bgp_socket: socket to be used for sending
1137             generator: generator to be used for message generation
1138             timer: timer to be used for scheduling
1139         """
1140         # References to outside objects,
1141         self.socket = bgp_socket
1142         self.generator = generator
1143         self.timer = timer
1144         # Really new fields.
1145         # TODO: Would attribute docstrings add anything substantial?
1146         self.sending_message = False
1147         self.bytes_to_send = 0
1148         self.msg_out = ""
1149
1150     def enqueue_message_for_sending(self, message):
1151         """Enqueue message and change state.
1152
1153         Arguments:
1154             message: message to be enqueued into the msg_out buffer
1155         """
1156         self.msg_out += message
1157         self.bytes_to_send += len(message)
1158         self.sending_message = True
1159
1160     def send_message_chunk_is_whole(self):
1161         """Send enqueued data from msg_out buffer
1162
1163         Returns:
1164             :return: true if no remaining data to send
1165         """
1166         # We assume there is a msg_out to send and socket is writable.
1167         # print "going to send", repr(self.msg_out)
1168         self.timer.snapshot()
1169         bytes_sent = self.socket.send(self.msg_out)
1170         # Forget the part of message that was sent.
1171         self.msg_out = self.msg_out[bytes_sent:]
1172         self.bytes_to_send -= bytes_sent
1173         if not self.bytes_to_send:
1174             # TODO: Is it possible to hit negative bytes_to_send?
1175             self.sending_message = False
1176             # We should have reset hold timer on peer side.
1177             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1178             # The possible reason for not prioritizing reads is gone.
1179             return True
1180         return False
1181
1182
1183 class StateTracker(object):
1184     """Main loop has state so complex it warrants this separate class."""
1185
1186     def __init__(self, bgp_socket, generator, timer):
1187         """The state tracker initialisation.
1188
1189         Arguments:
1190             bgp_socket: socket to be used for sending / receiving
1191             generator: generator to be used for message generation
1192             timer: timer to be used for scheduling
1193         """
1194         # References to outside objects.
1195         self.socket = bgp_socket
1196         self.generator = generator
1197         self.timer = timer
1198         # Sub-trackers.
1199         self.reader = ReadTracker(bgp_socket, timer)
1200         self.writer = WriteTracker(bgp_socket, generator, timer)
1201         # Prioritization state.
1202         self.prioritize_writing = False
1203         # In general, we prioritize reading over writing. But in order
1204         # not to get blocked by neverending reads, we should
1205         # check whether we are not risking running out of holdtime.
1206         # So in some situations, this field is set to True to attempt
1207         # finishing sending a message, after which this field resets
1208         # back to False.
1209         # TODO: Alternative is to switch fairly between reading and
1210         # writing (called round robin from now on).
1211         # Message counting is done in generator.
1212
1213     def perform_one_loop_iteration(self):
1214         """ The main loop iteration
1215
1216         Notes:
1217             Calculates priority, resolves all conditions, calls
1218             appropriate method and returns to caller to repeat.
1219         """
1220         self.timer.snapshot()
1221         if not self.prioritize_writing:
1222             if self.timer.is_time_for_my_keepalive():
1223                 if not self.writer.sending_message:
1224                     # We need to schedule a keepalive ASAP.
1225                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1226                 # We are sending a message now, so let's prioritize it.
1227                 self.prioritize_writing = True
1228         # Now we know what our priorities are, we have to check
1229         # which actions are available.
1230         # socket.socket() returns three lists,
1231         # we store them to list of lists.
1232         list_list = select.select([self.socket], [self.socket], [self.socket],
1233                                   self.timer.report_timedelta)
1234         read_list, write_list, except_list = list_list
1235         # Lists are unpacked, each is either [] or [self.socket],
1236         # so we will test them as boolean.
1237         if except_list:
1238             stdout_logger.error("Exceptional state on the socket.")
1239             raise RuntimeError("Exceptional state on socket", self.socket)
1240         # We will do either read or write.
1241         if not (self.prioritize_writing and write_list):
1242             # Either we have no reason to rush writes,
1243             # or the socket is not writable.
1244             # We are focusing on reading here.
1245             if read_list:  # there is something to read indeed
1246                 # In this case we want to read chunk of message
1247                 # and repeat the select,
1248                 self.reader.read_message_chunk()
1249                 return
1250             # We were focusing on reading, but nothing to read was there.
1251             # Good time to check peer for hold timer.
1252             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1253             # Quiet on the read front, we can have attempt to write.
1254         if write_list:
1255             # Either we really want to reset peer's view of our hold
1256             # timer, or there was nothing to read.
1257             # Were we in the middle of sending a message?
1258             if self.writer.sending_message:
1259                 # Was it the end of a message?
1260                 whole = self.writer.send_message_chunk_is_whole()
1261                 # We were pressed to send something and we did it.
1262                 if self.prioritize_writing and whole:
1263                     # We prioritize reading again.
1264                     self.prioritize_writing = False
1265                 return
1266             # Finally to check if still update messages to be generated.
1267             if self.generator.remaining_prefixes:
1268                 msg_out = self.generator.compose_update_message()
1269                 if not self.generator.remaining_prefixes:
1270                     # We have just finished update generation,
1271                     # end-of-rib is due.
1272                     stdout_logger.info("All update messages generated.")
1273                     stdout_logger.info("Storing performance results.")
1274                     self.generator.store_results()
1275                     stdout_logger.info("Finally an END-OF-RIB is going to be sent.")
1276                     msg_out += self.generator.update_message(wr_prefixes=[],
1277                                                              nlri_prefixes=[])
1278                 self.writer.enqueue_message_for_sending(msg_out)
1279                 # Attempt for real sending to be done in next iteration.
1280                 return
1281             # Nothing to write anymore, except occasional keepalives.
1282             stdout_logger.info("Everything has been done." +
1283                                "Now just waiting for possible incomming message.")
1284             # To avoid busy loop, we do idle waiting here.
1285             self.reader.wait_for_read()
1286             return
1287         # We can neither read nor write.
1288         stdout_logger.warning("Input and output both blocked for " +
1289                               str(self.timer.report_timedelta) + " seconds.")
1290         # FIXME: Are we sure select has been really waiting
1291         # the whole period?
1292         return
1293
1294
1295 if __name__ == "__main__":
1296     """ One time initialisation and iterations looping.
1297
1298     Notes:
1299         Establish BGP connection and run iterations.
1300     """
1301     arguments = parse_arguments()
1302     logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s")
1303     stdout_logger = logging.getLogger("stdout_logger")
1304     stdout_logger.setLevel(arguments.loglevel)
1305     bgp_socket = establish_connection(arguments)
1306     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1307     # Receive open message before sending anything.
1308     # FIXME: Add parameter to send default open message first,
1309     # to work with "you first" peers.
1310     msg_in = read_open_message(bgp_socket)
1311     timer = TimeTracker(msg_in)
1312     generator = MessageGenerator(arguments)
1313     msg_out = generator.open_message()
1314     stdout_logger.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1315     # Send our open message to the peer.
1316     bgp_socket.send(msg_out)
1317     # Wait for confirming keepalive.
1318     # TODO: Surely in just one packet?
1319     # Using exact keepalive length to not to see possible updates.
1320     msg_in = bgp_socket.recv(19)
1321     if msg_in != generator.keepalive_message():
1322         stdout_logger.error("Open not confirmed by keepalive, instead got " +
1323                             binascii.hexlify(msg_in))
1324         raise MessageError("Open not confirmed by keepalive, instead got",
1325                            msg_in)
1326     timer.reset_peer_hold_time()
1327     # Send the keepalive to indicate the connection is accepted.
1328     timer.snapshot()  # Remember this time.
1329     msg_out = generator.keepalive_message()
1330     stdout_logger.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1331     bgp_socket.send(msg_out)
1332     # Use the remembered time.
1333     timer.reset_my_keepalive_time(timer.snapshot_time)
1334     # End of initial handshake phase.
1335     state = StateTracker(bgp_socket, generator, timer)
1336     while True:  # main reactor loop
1337         state.perform_one_loop_iteration()