c9dba81ecca07754c7b1b37a626f4d36e6ea20af
[integration/test.git] / tools / fastbgp / play.py
1 """Utility for playing generated BGP data to ODL.
2
3 It needs to be run with sudo-able user when you want to use ports below 1024
4 as --myip. This utility is used to avoid excessive waiting times which EXABGP
5 exhibits when used with huge router tables and also avoid the memory cost of
6 EXABGP in this type of scenario."""
7
8 # Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
9 #
10 # This program and the accompanying materials are made available under the
11 # terms of the Eclipse Public License v1.0 which accompanies this distribution,
12 # and is available at http://www.eclipse.org/legal/epl-v10.html
13
14 __author__ = "Vratko Polak"
15 __copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
16 __license__ = "Eclipse Public License v1.0"
17 __email__ = "vrpolak@cisco.com"
18
19 import argparse
20 import binascii
21 import ipaddr
22 import select
23 import socket
24 import time
25 import logging
26 import struct
27
28
29 def parse_arguments():
30     """Use argparse to get arguments,
31
32     Returns:
33         :return: args object.
34     """
35     parser = argparse.ArgumentParser()
36     # TODO: Should we use --argument-names-with-spaces?
37     str_help = "Autonomous System number use in the stream."
38     parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
39     # FIXME: We are acting as iBGP peer,
40     # we should mirror AS number from peer's open message.
41     str_help = "Amount of IP prefixes to generate. (negative means ""infinite"")."
42     parser.add_argument("--amount", default="1", type=int, help=str_help)
43     str_help = "Maximum number of IP prefixes to be announced in one iteration"
44     parser.add_argument("--insert", default="1", type=int, help=str_help)
45     str_help = "Maximum number of IP prefixes to be withdrawn in one iteration"
46     parser.add_argument("--withdraw", default="0", type=int, help=str_help)
47     str_help = "The number of prefixes to process without withdrawals"
48     parser.add_argument("--prefill", default="0", type=int, help=str_help)
49     str_help = "Single or two separate UPDATEs for NLRI and WITHDRAWN lists sent"
50     parser.add_argument("--updates", choices=["single", "mixed"],
51                         default=["mixed"], help=str_help)
52     str_help = "Base prefix IP address for prefix generation"
53     parser.add_argument("--firstprefix", default="8.0.1.0",
54                         type=ipaddr.IPv4Address, help=str_help)
55     str_help = "The prefix length."
56     parser.add_argument("--prefixlen", default=28, type=int, help=str_help)
57     str_help = "Listen for connection, instead of initiating it."
58     parser.add_argument("--listen", action="store_true", help=str_help)
59     str_help = ("Numeric IP Address to bind to and derive BGP ID from." +
60                 "Default value only suitable for listening.")
61     parser.add_argument("--myip", default="0.0.0.0",
62                         type=ipaddr.IPv4Address, help=str_help)
63     str_help = ("TCP port to bind to when listening or initiating connection." +
64                 "Default only suitable for initiating.")
65     parser.add_argument("--myport", default="0", type=int, help=str_help)
66     str_help = "The IP of the next hop to be placed into the update messages."
67     parser.add_argument("--nexthop", default="192.0.2.1",
68                         type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
69     str_help = ("Numeric IP Address to try to connect to." +
70                 "Currently no effect in listening mode.")
71     parser.add_argument("--peerip", default="127.0.0.2",
72                         type=ipaddr.IPv4Address, help=str_help)
73     str_help = "TCP port to try to connect to. No effect in listening mode."
74     parser.add_argument("--peerport", default="179", type=int, help=str_help)
75     str_help = "Local hold time."
76     parser.add_argument("--holdtime", default="180", type=int, help=str_help)
77     str_help = "Log level (--error, --warning, --info, --debug)"
78     parser.add_argument("--error", dest="loglevel", action="store_const",
79                         const=logging.ERROR, default=logging.ERROR,
80                         help=str_help)
81     parser.add_argument("--warning", dest="loglevel", action="store_const",
82                         const=logging.WARNING, default=logging.ERROR,
83                         help=str_help)
84     parser.add_argument("--info", dest="loglevel", action="store_const",
85                         const=logging.INFO, default=logging.ERROR,
86                         help=str_help)
87     parser.add_argument("--debug", dest="loglevel", action="store_const",
88                         const=logging.DEBUG, default=logging.ERROR,
89                         help=str_help)
90     str_help = "Trailing part of the csv result files for plotting purposes"
91     parser.add_argument("--results", default="bgp.csv", type=str, help=str_help)
92     str_help = "Minimum number of updates to reach to include result into csv."
93     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
94     arguments = parser.parse_args()
95     # TODO: Are sanity checks (such as asnumber>=0) required?
96     return arguments
97
98
99 def establish_connection(arguments):
100     """Establish connection to BGP peer.
101
102     Arguments:
103         :arguments: following command-line argumets are used
104             - arguments.myip: local IP address
105             - arguments.myport: local port
106             - arguments.peerip: remote IP address
107             - arguments.peerport: remote port
108     Returns:
109         :return: socket.
110     """
111     if arguments.listen:
112         logging.info("Connecting in the listening mode.")
113         logging.debug("Local IP address: " + str(arguments.myip))
114         logging.debug("Local port: " + str(arguments.myport))
115         listening_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
116         listening_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
117         # bind need single tuple as argument
118         listening_socket.bind((str(arguments.myip), arguments.myport))
119         listening_socket.listen(1)
120         bgp_socket, _ = listening_socket.accept()
121         # TODO: Verify client IP is cotroller IP.
122         listening_socket.close()
123     else:
124         logging.info("Connecting in the talking mode.")
125         logging.debug("Local IP address: " + str(arguments.myip))
126         logging.debug("Local port: " + str(arguments.myport))
127         logging.debug("Remote IP address: " + str(arguments.peerip))
128         logging.debug("Remote port: " + str(arguments.peerport))
129         talking_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
130         talking_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
131         # bind to force specified address and port
132         talking_socket.bind((str(arguments.myip), arguments.myport))
133         # socket does not spead ipaddr, hence str()
134         talking_socket.connect((str(arguments.peerip), arguments.peerport))
135         bgp_socket = talking_socket
136     logging.info("Connected to ODL.")
137     return bgp_socket
138
139
140 def get_short_int_from_message(message, offset=16):
141     """Extract 2-bytes number from provided message.
142
143     Arguments:
144         :message: given message
145         :offset: offset of the short_int inside the message
146     Returns:
147         :return: required short_inf value.
148     Notes:
149         default offset value is the BGP message size offset.
150     """
151     high_byte_int = ord(message[offset])
152     low_byte_int = ord(message[offset + 1])
153     short_int = high_byte_int * 256 + low_byte_int
154     return short_int
155
156
157 class MessageError(ValueError):
158     """Value error with logging optimized for hexlified messages.
159     """
160
161     def __init__(self, text, message, *args):
162         """Initialisation.
163
164         Store and call super init for textual comment,
165         store raw message which caused it.
166         """
167         self.text = text
168         self.msg = message
169         super(MessageError, self).__init__(text, message, *args)
170
171     def __str__(self):
172         """Generate human readable error message.
173
174         Returns:
175             :return: human readable message as string
176         Notes:
177             Use a placeholder string if the message is to be empty.
178         """
179         message = binascii.hexlify(self.msg)
180         if message == "":
181             message = "(empty message)"
182         return self.text + ": " + message
183
184
185 def read_open_message(bgp_socket):
186     """Receive peer's OPEN message
187
188     Arguments:
189         :bgp_socket: the socket to be read
190     Returns:
191         :return: received OPEN message.
192     Notes:
193         Performs just basic incomming message checks
194     """
195     msg_in = bgp_socket.recv(65535)  # TODO: Is smaller buffer size safe?
196     # TODO: Can the incoming open message be split in more than one packet?
197     # Some validation.
198     if len(msg_in) < 37:
199         # 37 is minimal length of open message with 4-byte AS number.
200         logging.error("Got something else than open with 4-byte AS number: " +
201                       binascii.hexlify(msg_in))
202         raise MessageError("Got something else than open with 4-byte AS number",
203                            msg_in)
204     # TODO: We could check BGP marker, but it is defined only later;
205     # decide what to do.
206     reported_length = get_short_int_from_message(msg_in)
207     if len(msg_in) != reported_length:
208         logging.error("Message length is not " + str(reported_length) +
209                       " as stated in " + binascii.hexlify(msg_in))
210         raise MessageError("Message length is not " + reported_length +
211                            " as stated in ", msg_in)
212     logging.info("Open message received.")
213     return msg_in
214
215
216 class MessageGenerator(object):
217     """Class which generates messages, holds states and configuration values.
218     """
219
220     # TODO: Define bgp marker as a class (constant) variable.
221     def __init__(self, args):
222         """Initialisation according to command-line args.
223
224         Arguments:
225             :args: argsparser's Namespace object which contains command-line
226                 options for MesageGenerator initialisation
227         Notes:
228             Calculates and stores default values used later on for
229             message geeration.
230         """
231         self.total_prefix_amount = args.amount
232         # Number of update messages left to be sent.
233         self.remaining_prefixes = self.total_prefix_amount
234
235         # New parameters initialisation
236         self.iteration = 0
237         self.prefix_base_default = args.firstprefix
238         self.prefix_length_default = args.prefixlen
239         self.wr_prefixes_default = []
240         self.nlri_prefixes_default = []
241         self.version_default = 4
242         self.my_autonomous_system_default = args.asnumber
243         self.hold_time_default = args.holdtime  # Local hold time.
244         self.bgp_identifier_default = int(args.myip)
245         self.next_hop_default = args.nexthop
246         self.single_update_default = args.updates == "single"
247         self.randomize_updates_default = args.updates == "random"
248         self.prefix_count_to_add_default = args.insert
249         self.prefix_count_to_del_default = args.withdraw
250         if self.prefix_count_to_del_default < 0:
251             self.prefix_count_to_del_default = 0
252         if not self.single_update_default and not self.prefix_count_to_del_default:
253             self.prefix_count_to_del_default = 1
254             # we need some content for the 2nd UPDATE in the iteration
255         if self.prefix_count_to_add_default <= self.prefix_count_to_del_default:
256             # total number of prefixes must grow to avoid infinite test loop
257             self.prefix_count_to_add_default = self.prefix_count_to_del_default + 1
258         self.slot_size_default = self.prefix_count_to_add_default
259         self.remaining_prefixes_threshold = self.total_prefix_amount - args.prefill
260         self.results_file_name_default = args.results
261         self.performance_threshold_default = args.threshold
262         # Default values used for randomized part
263         s1_slots = ((self.total_prefix_amount -
264                      self.remaining_prefixes_threshold - 1) /
265                     self.prefix_count_to_add_default + 1)
266         s2_slots = ((self.remaining_prefixes_threshold - 1) /
267                     (self.prefix_count_to_add_default -
268                     self.prefix_count_to_del_default) + 1)
269         # S1_First_Index = 0
270         # S1_Last_Index = s1_slots * self.prefix_count_to_add_default - 1
271         s2_first_index = s1_slots * self.prefix_count_to_add_default
272         s2_last_index = (s2_first_index +
273                          s2_slots * (self.prefix_count_to_add_default -
274                                      self.prefix_count_to_del_default) - 1)
275         self.slot_gap_default = ((self.total_prefix_amount -
276                                   self.remaining_prefixes_threshold - 1) /
277                                  self.prefix_count_to_add_default + 1)
278         self.randomize_lowest_default = s2_first_index
279         self.randomize_highest_default = s2_last_index
280
281         # Initialising counters
282         self.phase1_start_time = 0
283         self.phase1_stop_time = 0
284         self.phase2_start_time = 0
285         self.phase2_stop_time = 0
286         self.phase1_updates_sent = 0
287         self.phase2_updates_sent = 0
288         self.updates_sent = 0
289
290         # Needed for the MessageGenerator performance optimization
291         self.log_info = args.loglevel <= logging.INFO
292         self.log_debug = args.loglevel <= logging.DEBUG
293
294         logging.info("Generator initialisation")
295         logging.info("  Target total number of prefixes to be introduced: " +
296                      str(self.total_prefix_amount))
297         logging.info("  Prefix base: " + str(self.prefix_base_default) + "/" +
298                      str(self.prefix_length_default))
299         logging.info("  My Autonomous System number: " +
300                      str(self.my_autonomous_system_default))
301         logging.info("  My Hold Time: " + str(self.hold_time_default))
302         logging.info("  My BGP Identifier: " + str(self.bgp_identifier_default))
303         logging.info("  Next Hop: " + str(self.next_hop_default))
304         logging.info("  Prefix count to be inserted at once: " +
305                      str(self.prefix_count_to_add_default))
306         logging.info("  Prefix count to be withdrawn at once: " +
307                      str(self.prefix_count_to_del_default))
308         logging.info("  Fast pre-fill up to " +
309                      str(self.total_prefix_amount -
310                          self.remaining_prefixes_threshold) + " prefixes")
311         logging.info("  Remaining number of prefixes to be processed " +
312                      "in parallel with withdrawals: " +
313                      str(self.remaining_prefixes_threshold))
314         logging.debug("  Prefix index range used after pre-fill procedure [" +
315                       str(self.randomize_lowest_default) + ", " +
316                       str(self.randomize_highest_default) + "]")
317         if self.single_update_default:
318             logging.info("  Common single UPDATE will be generated " +
319                          "for both NLRI & WITHDRAWN lists")
320         else:
321             logging.info("  Two separate UPDATEs will be generated " +
322                          "for each NLRI & WITHDRAWN lists")
323         if self.randomize_updates_default:
324             logging.info("  Generation of UPDATE messages will be randomized")
325         logging.info("  Let\"s go ...\n")
326
327         # TODO: Notification for hold timer expiration can be handy.
328
329     def store_results(self, file_name=None, threshold=None):
330         """ Stores specified results into files based on file_name value.
331
332         Arguments:
333             :param file_name: Trailing (common) part of result file names
334             :param threshold: Minimum number of sent updates needed for each
335                               result to be included into result csv file
336                               (mainly needed because of the result accuracy)
337         Returns:
338             :return: n/a
339         """
340         # default values handling
341         if file_name is None:
342             file_name = self.results_file_name_default
343         if threshold is None:
344             threshold = self.performance_threshold_default
345         # performance calculation
346         if self.phase1_updates_sent >= threshold:
347             totals1 = self.phase1_updates_sent
348             performance1 = int(self.phase1_updates_sent /
349                                (self.phase1_stop_time - self.phase1_start_time))
350         else:
351             totals1 = None
352             performance1 = None
353         if self.phase2_updates_sent >= threshold:
354             totals2 = self.phase2_updates_sent
355             performance2 = int(self.phase2_updates_sent /
356                                (self.phase2_stop_time - self.phase2_start_time))
357         else:
358             totals2 = None
359             performance2 = None
360
361         logging.info("#" * 10 + " Final results " + "#" * 10)
362         logging.info("Number of iterations: " + str(self.iteration))
363         logging.info("Number of UPDATE messages sent in the pre-fill phase: " +
364                      str(self.phase1_updates_sent))
365         logging.info("The pre-fill phase duration: " +
366                      str(self.phase1_stop_time - self.phase1_start_time) + "s")
367         logging.info("Number of UPDATE messages sent in the 2nd test phase: " +
368                      str(self.phase2_updates_sent))
369         logging.info("The 2nd test phase duration: " +
370                      str(self.phase2_stop_time - self.phase2_start_time) + "s")
371         logging.info("Threshold for performance reporting: " + str(threshold))
372
373         # making labels
374         phase1_label = ("pre-fill " + str(self.prefix_count_to_add_default) +
375                         " route(s) per UPDATE")
376         if self.single_update_default:
377             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
378                                   "/-" + str(self.prefix_count_to_del_default) +
379                                   " routes per UPDATE")
380         else:
381             phase2_label = "+" + (str(self.prefix_count_to_add_default) +
382                                   "/-" + str(self.prefix_count_to_del_default) +
383                                   " routes in two UPDATEs")
384         # collecting capacity and performance results
385         totals = {}
386         performance = {}
387         if totals1 is not None:
388             totals[phase1_label] = totals1
389             performance[phase1_label] = performance1
390         if totals2 is not None:
391             totals[phase2_label] = totals2
392             performance[phase2_label] = performance2
393         self.write_results_to_file(totals, "totals-" + file_name)
394         self.write_results_to_file(performance, "performance-" + file_name)
395
396     def write_results_to_file(self, results, file_name):
397         """Writes results to the csv plot file consumable by Jenkins.
398
399         Arguments:
400             :param file_name: Name of the (csv) file to be created
401         Returns:
402             :return: none
403         """
404         first_line = ""
405         second_line = ""
406         f = open(file_name, "wt")
407         try:
408             for key in sorted(results):
409                 first_line += key + ", "
410                 second_line += str(results[key]) + ", "
411             first_line = first_line[:-2]
412             second_line = second_line[:-2]
413             f.write(first_line + "\n")
414             f.write(second_line + "\n")
415             logging.info("Performance results of message generator stored in " +
416                          file_name + ':')
417             logging.info("  " + first_line)
418             logging.info("  " + second_line)
419         finally:
420             f.close()
421
422     # Return pseudo-randomized (reproducible) index for selected range
423     def randomize_index(self, index, lowest=None, highest=None):
424         """Calculates pseudo-randomized index from selected range.
425
426         Arguments:
427             :param index: input index
428             :param lowest: the lowes index from the randomized area
429             :param highest: the highest index from the randomized area
430         Returns:
431             :return: the (pseudo)randomized index
432         Notes:
433             Created just as a fame for future generator enhancement.
434         """
435         # default values handling
436         if lowest is None:
437             lowest = self.randomize_lowest_default
438         if highest is None:
439             highest = self.randomize_highest_default
440         # randomize
441         if (index >= lowest) and (index <= highest):
442             # we are in the randomized range -> shuffle it inside
443             # the range (now just reverse the order)
444             new_index = highest - (index - lowest)
445         else:
446             # we are out of the randomized range -> nothing to do
447             new_index = index
448         return new_index
449
450     # Get list of prefixes
451     def get_prefix_list(self, slot_index, slot_size=None, prefix_base=None,
452                         prefix_len=None, prefix_count=None, randomize=None):
453         """Generates list of IP address prefixes.
454
455         Arguments:
456             :param slot_index: index of group of prefix addresses
457             :param slot_size: size of group of prefix addresses
458                 in [number of included prefixes]
459             :param prefix_base: IP address of the first prefix
460                 (slot_index = 0, prefix_index = 0)
461             :param prefix_len: length of the prefix in bites
462                 (the same as size of netmask)
463             :param prefix_count: number of prefixes to be returned
464                 from the specified slot
465         Returns:
466             :return: list of generated IP address prefixes
467         """
468         # default values handling
469         if slot_size is None:
470             slot_size = self.slot_size_default
471         if prefix_base is None:
472             prefix_base = self.prefix_base_default
473         if prefix_len is None:
474             prefix_len = self.prefix_length_default
475         if prefix_count is None:
476             prefix_count = slot_size
477         if randomize is None:
478             randomize = self.randomize_updates_default
479         # generating list of prefixes
480         indexes = []
481         prefixes = []
482         prefix_gap = 2 ** (32 - prefix_len)
483         for i in range(prefix_count):
484             prefix_index = slot_index * slot_size + i
485             if randomize:
486                 prefix_index = self.randomize_index(prefix_index)
487             indexes.append(prefix_index)
488             prefixes.append(prefix_base + prefix_index * prefix_gap)
489         if self.log_debug:
490             logging.debug("  Prefix slot index: " + str(slot_index))
491             logging.debug("  Prefix slot size: " + str(slot_size))
492             logging.debug("  Prefix count: " + str(prefix_count))
493             logging.debug("  Prefix indexes: " + str(indexes))
494             logging.debug("  Prefix list: " + str(prefixes))
495         return prefixes
496
497     def compose_update_message(self, prefix_count_to_add=None,
498                                prefix_count_to_del=None):
499         """Composes an UPDATE message
500
501         Arguments:
502             :param prefix_count_to_add: # of prefixes to put into NLRI list
503             :param prefix_count_to_del: # of prefixes to put into WITHDRAWN list
504         Returns:
505             :return: encoded UPDATE message in HEX
506         Notes:
507             Optionally generates separate UPDATEs for NLRI and WITHDRAWN
508             lists or common message wich includes both prefix lists.
509             Updates global counters.
510         """
511         # default values handling
512         if prefix_count_to_add is None:
513             prefix_count_to_add = self.prefix_count_to_add_default
514         if prefix_count_to_del is None:
515             prefix_count_to_del = self.prefix_count_to_del_default
516         # logging
517         if self.log_info and not (self.iteration % 1000):
518             logging.info("Iteration: " + str(self.iteration) +
519                          " - total remaining prefixes: " +
520                          str(self.remaining_prefixes))
521         if self.log_debug:
522             logging.debug("#" * 10 + " Iteration: " +
523                           str(self.iteration) + " " + "#" * 10)
524             logging.debug("Remaining prefixes: " +
525                           str(self.remaining_prefixes))
526         # scenario type & one-shot counter
527         straightforward_scenario = (self.remaining_prefixes >
528                                     self.remaining_prefixes_threshold)
529         if straightforward_scenario:
530             prefix_count_to_del = 0
531             if self.log_debug:
532                 logging.debug("--- STARAIGHTFORWARD SCENARIO ---")
533             if not self.phase1_start_time:
534                 self.phase1_start_time = time.time()
535         else:
536             if self.log_debug:
537                 logging.debug("--- COMBINED SCENARIO ---")
538             if not self.phase2_start_time:
539                 self.phase2_start_time = time.time()
540         # tailor the number of prefixes if needed
541         prefix_count_to_add = (prefix_count_to_del +
542                                min(prefix_count_to_add - prefix_count_to_del,
543                                    self.remaining_prefixes))
544         # prefix slots selection for insertion and withdrawal
545         slot_index_to_add = self.iteration
546         slot_index_to_del = slot_index_to_add - self.slot_gap_default
547         # getting lists of prefixes for insertion in this iteration
548         if self.log_debug:
549             logging.debug("Prefixes to be inserted in this iteration:")
550         prefix_list_to_add = self.get_prefix_list(slot_index_to_add,
551                                                   prefix_count=prefix_count_to_add)
552         # getting lists of prefixes for withdrawal in this iteration
553         if self.log_debug:
554             logging.debug("Prefixes to be withdrawn in this iteration:")
555         prefix_list_to_del = self.get_prefix_list(slot_index_to_del,
556                                                   prefix_count=prefix_count_to_del)
557         # generating the mesage
558         if self.single_update_default:
559             # Send prefixes to be introduced and withdrawn
560             # in one UPDATE message
561             msg_out = self.update_message(wr_prefixes=prefix_list_to_del,
562                                           nlri_prefixes=prefix_list_to_add)
563         else:
564             # Send prefixes to be introduced and withdrawn
565             # in separate UPDATE messages (if needed)
566             msg_out = self.update_message(wr_prefixes=[],
567                                           nlri_prefixes=prefix_list_to_add)
568             if prefix_count_to_del:
569                 msg_out += self.update_message(wr_prefixes=prefix_list_to_del,
570                                                nlri_prefixes=[])
571         # updating counters - who knows ... maybe I am last time here ;)
572         if straightforward_scenario:
573             self.phase1_stop_time = time.time()
574             self.phase1_updates_sent = self.updates_sent
575         else:
576             self.phase2_stop_time = time.time()
577             self.phase2_updates_sent = (self.updates_sent -
578                                         self.phase1_updates_sent)
579         # updating totals for the next iteration
580         self.iteration += 1
581         self.remaining_prefixes -= (prefix_count_to_add - prefix_count_to_del)
582         # returning the encoded message
583         return msg_out
584
585     # Section of message encoders
586
587     def open_message(self, version=None, my_autonomous_system=None,
588                      hold_time=None, bgp_identifier=None):
589         """Generates an OPEN Message (rfc4271#section-4.2)
590
591         Arguments:
592             :param version: see the rfc4271#section-4.2
593             :param my_autonomous_system: see the rfc4271#section-4.2
594             :param hold_time: see the rfc4271#section-4.2
595             :param bgp_identifier: see the rfc4271#section-4.2
596         Returns:
597             :return: encoded OPEN message in HEX
598         """
599
600         # Default values handling
601         if version is None:
602             version = self.version_default
603         if my_autonomous_system is None:
604             my_autonomous_system = self.my_autonomous_system_default
605         if hold_time is None:
606             hold_time = self.hold_time_default
607         if bgp_identifier is None:
608             bgp_identifier = self.bgp_identifier_default
609
610         # Marker
611         marker_hex = "\xFF" * 16
612
613         # Type
614         type = 1
615         type_hex = struct.pack("B", type)
616
617         # version
618         version_hex = struct.pack("B", version)
619
620         # my_autonomous_system
621         # AS_TRANS value, 23456 decadic.
622         my_autonomous_system_2_bytes = 23456
623         # AS number is mappable to 2 bytes
624         if my_autonomous_system < 65536:
625             my_autonomous_system_2_bytes = my_autonomous_system
626         my_autonomous_system_hex_2_bytes = struct.pack(">H",
627                                                        my_autonomous_system)
628
629         # Hold Time
630         hold_time_hex = struct.pack(">H", hold_time)
631
632         # BGP Identifier
633         bgp_identifier_hex = struct.pack(">I", bgp_identifier)
634
635         # Optional Parameters
636         optional_parameters_hex = (
637             "\x02"  # Param type ("Capability Ad")
638             "\x06"  # Length (6 bytes)
639             "\x01"  # Capability type (NLRI Unicast),
640                     # see RFC 4760, secton 8
641             "\x04"  # Capability value length
642             "\x00\x01"  # AFI (Ipv4)
643             "\x00"  # (reserved)
644             "\x01"  # SAFI (Unicast)
645
646             "\x02"  # Param type ("Capability Ad")
647             "\x06"  # Length (6 bytes)
648             "\x41"  # "32 bit AS Numbers Support"
649                     # (see RFC 6793, section 3)
650             "\x04"  # Capability value length
651                     # My AS in 32 bit format
652             + struct.pack(">I", my_autonomous_system)
653         )
654
655         # Optional Parameters Length
656         optional_parameters_length = len(optional_parameters_hex)
657         optional_parameters_length_hex = struct.pack("B",
658                                                      optional_parameters_length)
659
660         # Length (big-endian)
661         length = (
662             len(marker_hex) + 2 + len(type_hex) + len(version_hex) +
663             len(my_autonomous_system_hex_2_bytes) +
664             len(hold_time_hex) + len(bgp_identifier_hex) +
665             len(optional_parameters_length_hex) +
666             len(optional_parameters_hex)
667         )
668         length_hex = struct.pack(">H", length)
669
670         # OPEN Message
671         message_hex = (
672             marker_hex +
673             length_hex +
674             type_hex +
675             version_hex +
676             my_autonomous_system_hex_2_bytes +
677             hold_time_hex +
678             bgp_identifier_hex +
679             optional_parameters_length_hex +
680             optional_parameters_hex
681         )
682
683         if self.log_debug:
684             logging.debug("OPEN Message encoding")
685             logging.debug("  Marker=0x" + binascii.hexlify(marker_hex))
686             logging.debug("  Length=" + str(length) + " (0x" +
687                           binascii.hexlify(length_hex) + ")")
688             logging.debug("  Type=" + str(type) + " (0x" +
689                           binascii.hexlify(type_hex) + ")")
690             logging.debug("  Version=" + str(version) + " (0x" +
691                           binascii.hexlify(version_hex) + ")")
692             logging.debug("  My Autonomous System=" +
693                           str(my_autonomous_system_2_bytes) + " (0x" +
694                           binascii.hexlify(my_autonomous_system_hex_2_bytes) +
695                           ")")
696             logging.debug("  Hold Time=" + str(hold_time) + " (0x" +
697                           binascii.hexlify(hold_time_hex) + ")")
698             logging.debug("  BGP Identifier=" + str(bgp_identifier) +
699                           " (0x" + binascii.hexlify(bgp_identifier_hex) + ")")
700             logging.debug("  Optional Parameters Length=" +
701                           str(optional_parameters_length) + " (0x" +
702                           binascii.hexlify(optional_parameters_length_hex) +
703                           ")")
704             logging.debug("  Optional Parameters=0x" +
705                           binascii.hexlify(optional_parameters_hex))
706             logging.debug("  OPEN Message encoded: 0x" +
707                           binascii.b2a_hex(message_hex))
708
709         return message_hex
710
711     def update_message(self, wr_prefixes=None, nlri_prefixes=None,
712                        wr_prefix_length=None, nlri_prefix_length=None,
713                        my_autonomous_system=None, next_hop=None):
714         """Generates an UPDATE Message (rfc4271#section-4.3)
715
716         Arguments:
717             :param wr_prefixes: see the rfc4271#section-4.3
718             :param nlri_prefixes: see the rfc4271#section-4.3
719             :param wr_prefix_length: see the rfc4271#section-4.3
720             :param nlri_prefix_length: see the rfc4271#section-4.3
721             :param my_autonomous_system: see the rfc4271#section-4.3
722             :param next_hop: see the rfc4271#section-4.3
723         Returns:
724             :return: encoded UPDATE message in HEX
725         """
726
727         # Default values handling
728         if wr_prefixes is None:
729             wr_prefixes = self.wr_prefixes_default
730         if nlri_prefixes is None:
731             nlri_prefixes = self.nlri_prefixes_default
732         if wr_prefix_length is None:
733             wr_prefix_length = self.prefix_length_default
734         if nlri_prefix_length is None:
735             nlri_prefix_length = self.prefix_length_default
736         if my_autonomous_system is None:
737             my_autonomous_system = self.my_autonomous_system_default
738         if next_hop is None:
739             next_hop = self.next_hop_default
740
741         # Marker
742         marker_hex = "\xFF" * 16
743
744         # Type
745         type = 2
746         type_hex = struct.pack("B", type)
747
748         # Withdrawn Routes
749         bytes = ((wr_prefix_length - 1) / 8) + 1
750         withdrawn_routes_hex = ""
751         for prefix in wr_prefixes:
752             withdrawn_route_hex = (struct.pack("B", wr_prefix_length) +
753                                    struct.pack(">I", int(prefix))[:bytes])
754             withdrawn_routes_hex += withdrawn_route_hex
755
756         # Withdrawn Routes Length
757         withdrawn_routes_length = len(withdrawn_routes_hex)
758         withdrawn_routes_length_hex = struct.pack(">H", withdrawn_routes_length)
759
760         # TODO: to replace hardcoded string by encoding?
761         # Path Attributes
762         if nlri_prefixes != []:
763             path_attributes_hex = (
764                 "\x40"  # Flags ("Well-Known")
765                 "\x01"  # Type (ORIGIN)
766                 "\x01"  # Length (1)
767                 "\x00"  # Origin: IGP
768                 "\x40"  # Flags ("Well-Known")
769                 "\x02"  # Type (AS_PATH)
770                 "\x06"  # Length (6)
771                 "\x02"  # AS segment type (AS_SEQUENCE)
772                 "\x01"  # AS segment length (1)
773                         # AS segment (4 bytes)
774                 + struct.pack(">I", my_autonomous_system) +
775                 "\x40"  # Flags ("Well-Known")
776                 "\x03"  # Type (NEXT_HOP)
777                 "\x04"  # Length (4)
778                         # IP address of the next hop (4 bytes)
779                 + struct.pack(">I", int(next_hop))
780             )
781         else:
782             path_attributes_hex = ""
783
784         # Total Path Attributes Length
785         total_path_attributes_length = len(path_attributes_hex)
786         total_path_attributes_length_hex = struct.pack(">H", total_path_attributes_length)
787
788         # Network Layer Reachability Information
789         bytes = ((nlri_prefix_length - 1) / 8) + 1
790         nlri_hex = ""
791         for prefix in nlri_prefixes:
792             nlri_prefix_hex = (struct.pack("B", nlri_prefix_length) +
793                                struct.pack(">I", int(prefix))[:bytes])
794             nlri_hex += nlri_prefix_hex
795
796         # Length (big-endian)
797         length = (
798             len(marker_hex) + 2 + len(type_hex) +
799             len(withdrawn_routes_length_hex) + len(withdrawn_routes_hex) +
800             len(total_path_attributes_length_hex) + len(path_attributes_hex) +
801             len(nlri_hex))
802         length_hex = struct.pack(">H", length)
803
804         # UPDATE Message
805         message_hex = (
806             marker_hex +
807             length_hex +
808             type_hex +
809             withdrawn_routes_length_hex +
810             withdrawn_routes_hex +
811             total_path_attributes_length_hex +
812             path_attributes_hex +
813             nlri_hex
814         )
815
816         if self.log_debug:
817             logging.debug("UPDATE Message encoding")
818             logging.debug("  Marker=0x" + binascii.hexlify(marker_hex))
819             logging.debug("  Length=" + str(length) + " (0x" +
820                           binascii.hexlify(length_hex) + ")")
821             logging.debug("  Type=" + str(type) + " (0x" +
822                           binascii.hexlify(type_hex) + ")")
823             logging.debug("  withdrawn_routes_length=" +
824                           str(withdrawn_routes_length) + " (0x" +
825                           binascii.hexlify(withdrawn_routes_length_hex) + ")")
826             logging.debug("  Withdrawn_Routes=" + str(wr_prefixes) + "/" +
827                           str(wr_prefix_length) + " (0x" +
828                           binascii.hexlify(withdrawn_routes_hex) + ")")
829             logging.debug("  Total Path Attributes Length=" +
830                           str(total_path_attributes_length) + " (0x" +
831                           binascii.hexlify(total_path_attributes_length_hex) +
832                           ")")
833             logging.debug("  Path Attributes=" + "(0x" +
834                           binascii.hexlify(path_attributes_hex) + ")")
835             logging.debug("  Network Layer Reachability Information=" +
836                           str(nlri_prefixes) + "/" + str(nlri_prefix_length) +
837                           " (0x" + binascii.hexlify(nlri_hex) + ")")
838             logging.debug("  UPDATE Message encoded: 0x" +
839                           binascii.b2a_hex(message_hex))
840
841         # updating counter
842         self.updates_sent += 1
843         # returning encoded message
844         return message_hex
845
846     def notification_message(self, error_code, error_subcode, data_hex=""):
847         """Generates a NOTIFICATION Message (rfc4271#section-4.5)
848
849         Arguments:
850             :param error_code: see the rfc4271#section-4.5
851             :param error_subcode: see the rfc4271#section-4.5
852             :param data_hex: see the rfc4271#section-4.5
853         Returns:
854             :return: encoded NOTIFICATION message in HEX
855         """
856
857         # Marker
858         marker_hex = "\xFF" * 16
859
860         # Type
861         type = 3
862         type_hex = struct.pack("B", type)
863
864         # Error Code
865         error_code_hex = struct.pack("B", error_code)
866
867         # Error Subode
868         error_subcode_hex = struct.pack("B", error_subcode)
869
870         # Length (big-endian)
871         length = (len(marker_hex) + 2 + len(type_hex) + len(error_code_hex) +
872                   len(error_subcode_hex) + len(data_hex))
873         length_hex = struct.pack(">H", length)
874
875         # NOTIFICATION Message
876         message_hex = (
877             marker_hex +
878             length_hex +
879             type_hex +
880             error_code_hex +
881             error_subcode_hex +
882             data_hex
883         )
884
885         if self.log_debug:
886             logging.debug("NOTIFICATION Message encoding")
887             logging.debug("  Marker=0x" + binascii.hexlify(marker_hex))
888             logging.debug("  Length=" + str(length) + " (0x" +
889                           binascii.hexlify(length_hex) + ")")
890             logging.debug("  Type=" + str(type) + " (0x" +
891                           binascii.hexlify(type_hex) + ")")
892             logging.debug("  Error Code=" + str(error_code) + " (0x" +
893                           binascii.hexlify(error_code_hex) + ")")
894             logging.debug("  Error Subode=" + str(error_subcode) + " (0x" +
895                           binascii.hexlify(error_subcode_hex) + ")")
896             logging.debug("  Data=" + " (0x" + binascii.hexlify(data_hex) + ")")
897             logging.debug("  NOTIFICATION Message encoded: 0x" +
898                           binascii.b2a_hex(message_hex))
899
900         return message_hex
901
902     def keepalive_message(self):
903         """Generates a KEEP ALIVE Message (rfc4271#section-4.4)
904
905         Returns:
906             :return: encoded KEEP ALIVE message in HEX
907         """
908
909         # Marker
910         marker_hex = "\xFF" * 16
911
912         # Type
913         type = 4
914         type_hex = struct.pack("B", type)
915
916         # Length (big-endian)
917         length = len(marker_hex) + 2 + len(type_hex)
918         length_hex = struct.pack(">H", length)
919
920         # KEEP ALIVE Message
921         message_hex = (
922             marker_hex +
923             length_hex +
924             type_hex
925         )
926
927         if self.log_debug:
928             logging.debug("KEEP ALIVE Message encoding")
929             logging.debug("  Marker=0x" + binascii.hexlify(marker_hex))
930             logging.debug("  Length=" + str(length) + " (0x" +
931                           binascii.hexlify(length_hex) + ")")
932             logging.debug("  Type=" + str(type) + " (0x" +
933                           binascii.hexlify(type_hex) + ")")
934             logging.debug("  KEEP ALIVE Message encoded: 0x" +
935                           binascii.b2a_hex(message_hex))
936
937         return message_hex
938
939
940 class TimeTracker(object):
941     """Class for tracking timers, both for my keepalives and
942     peer's hold time.
943     """
944
945     def __init__(self, msg_in):
946         """Initialisation. based on defaults and OPEN message from peer.
947
948         Arguments:
949             msg_in: the OPEN message received from peer.
950         """
951         # Note: Relative time is always named timedelta, to stress that
952         # the (non-delta) time is absolute.
953         self.report_timedelta = 1.0  # In seconds. TODO: Configurable?
954         # Upper bound for being stuck in the same state, we should
955         # at least report something before continuing.
956         # Negotiate the hold timer by taking the smaller
957         # of the 2 values (mine and the peer's).
958         hold_timedelta = 180  # Not an attribute of self yet.
959         # TODO: Make the default value configurable,
960         # default value could mirror what peer said.
961         peer_hold_timedelta = get_short_int_from_message(msg_in, offset=22)
962         if hold_timedelta > peer_hold_timedelta:
963             hold_timedelta = peer_hold_timedelta
964         if hold_timedelta != 0 and hold_timedelta < 3:
965             logging.error("Invalid hold timedelta value: " + str(hold_timedelta))
966             raise ValueError("Invalid hold timedelta value: ", hold_timedelta)
967         self.hold_timedelta = hold_timedelta
968         # If we do not hear from peer this long, we assume it has died.
969         self.keepalive_timedelta = int(hold_timedelta / 3.0)
970         # Upper limit for duration between messages, to avoid being
971         # declared to be dead.
972         # The same as calling snapshot(), but also declares a field.
973         self.snapshot_time = time.time()
974         # Sometimes we need to store time. This is where to get
975         # the value from afterwards. Time_keepalive may be too strict.
976         self.peer_hold_time = self.snapshot_time + self.hold_timedelta
977         # At this time point, peer will be declared dead.
978         self.my_keepalive_time = None  # to be set later
979         # At this point, we should be sending keepalive message.
980
981     def snapshot(self):
982         """Store current time in instance data to use later."""
983         # Read as time before something interesting was called.
984         self.snapshot_time = time.time()
985
986     def reset_peer_hold_time(self):
987         """Move hold time to future as peer has just proven it still lives."""
988         self.peer_hold_time = time.time() + self.hold_timedelta
989
990     # Some methods could rely on self.snapshot_time, but it is better
991     # to require user to provide it explicitly.
992     def reset_my_keepalive_time(self, keepalive_time):
993         """Calculate and set the next my KEEP ALIVE timeout time
994
995         Arguments:
996             :keepalive_time: the initial value of the KEEP ALIVE timer
997         """
998         self.my_keepalive_time = keepalive_time + self.keepalive_timedelta
999
1000     def is_time_for_my_keepalive(self):
1001         """Check for my KEEP ALIVE timeout occurence"""
1002         if self.hold_timedelta == 0:
1003             return False
1004         return self.snapshot_time >= self.my_keepalive_time
1005
1006     def get_next_event_time(self):
1007         """Set the time of the next expected or to be sent KEEP ALIVE"""
1008         if self.hold_timedelta == 0:
1009             return self.snapshot_time + 86400
1010         return min(self.my_keepalive_time, self.peer_hold_time)
1011
1012     def check_peer_hold_time(self, snapshot_time):
1013         """Raise error if nothing was read from peer until specified time."""
1014         # Hold time = 0 means keepalive checking off.
1015         if self.hold_timedelta != 0:
1016             # time.time() may be too strict
1017             if snapshot_time > self.peer_hold_time:
1018                 logging.error("Peer has overstepped the hold timer.")
1019                 raise RuntimeError("Peer has overstepped the hold timer.")
1020                 # TODO: Include hold_timedelta?
1021                 # TODO: Add notification sending (attempt). That means
1022                 # move to write tracker.
1023
1024
1025 class ReadTracker(object):
1026     """Class for tracking read of mesages chunk by chunk and
1027     for idle waiting.
1028     """
1029
1030     def __init__(self, bgp_socket, timer):
1031         """The reader initialisation.
1032
1033         Arguments:
1034             bgp_socket: socket to be used for sending
1035             timer: timer to be used for scheduling
1036         """
1037         # References to outside objects.
1038         self.socket = bgp_socket
1039         self.timer = timer
1040         # BGP marker length plus length field length.
1041         self.header_length = 18
1042         # TODO: make it class (constant) attribute
1043         # Computation of where next chunk ends depends on whether
1044         # we are beyond length field.
1045         self.reading_header = True
1046         # Countdown towards next size computation.
1047         self.bytes_to_read = self.header_length
1048         # Incremental buffer for message under read.
1049         self.msg_in = ""
1050
1051     def read_message_chunk(self):
1052         """Read up to one message
1053
1054         Note:
1055             Currently it does not return anything.
1056         """
1057         # TODO: We could return the whole message, currently not needed.
1058         # We assume the socket is readable.
1059         chunk_message = self.socket.recv(self.bytes_to_read)
1060         self.msg_in += chunk_message
1061         self.bytes_to_read -= len(chunk_message)
1062         # TODO: bytes_to_read < 0 is not possible, right?
1063         if not self.bytes_to_read:
1064             # Finished reading a logical block.
1065             if self.reading_header:
1066                 # The logical block was a BGP header.
1067                 # Now we know the size of the message.
1068                 self.reading_header = False
1069                 self.bytes_to_read = get_short_int_from_message(self.msg_in)
1070             else:  # We have finished reading the body of the message.
1071                 # Peer has just proven it is still alive.
1072                 self.timer.reset_peer_hold_time()
1073                 # TODO: Do we want to count received messages?
1074                 # This version ignores the received message.
1075                 # TODO: Should we do validation and exit on anything
1076                 # besides update or keepalive?
1077                 # Prepare state for reading another message.
1078                 self.msg_in = ""
1079                 self.reading_header = True
1080                 self.bytes_to_read = self.header_length
1081         # We should not act upon peer_hold_time if we are reading
1082         # something right now.
1083         return
1084
1085     def wait_for_read(self):
1086         """Read message until timeout (next expected event).
1087
1088         Note:
1089             Used when no more updates has to be sent to avoid busy-wait.
1090             Currently it does not return anything.
1091         """
1092         # Compute time to the first predictable state change
1093         event_time = self.timer.get_next_event_time()
1094         # snapshot_time would be imprecise
1095         wait_timedelta = event_time - time.time()
1096         if wait_timedelta < 0:
1097             # The program got around to waiting to an event in "very near
1098             # future" so late that it became a "past" event, thus tell
1099             # "select" to not wait at all. Passing negative timedelta to
1100             # select() would lead to either waiting forever (for -1) or
1101             # select.error("Invalid parameter") (for everything else).
1102             wait_timedelta = 0
1103         # And wait for event or something to read.
1104         select.select([self.socket], [], [self.socket], wait_timedelta)
1105         # Not checking anything, that will be done in next iteration.
1106         return
1107
1108
1109 class WriteTracker(object):
1110     """Class tracking enqueueing messages and sending chunks of them.
1111     """
1112
1113     def __init__(self, bgp_socket, generator, timer):
1114         """The writter initialisation.
1115
1116         Arguments:
1117             bgp_socket: socket to be used for sending
1118             generator: generator to be used for message generation
1119             timer: timer to be used for scheduling
1120         """
1121         # References to outside objects,
1122         self.socket = bgp_socket
1123         self.generator = generator
1124         self.timer = timer
1125         # Really new fields.
1126         # TODO: Would attribute docstrings add anything substantial?
1127         self.sending_message = False
1128         self.bytes_to_send = 0
1129         self.msg_out = ""
1130
1131     def enqueue_message_for_sending(self, message):
1132         """Enqueue message and change state.
1133
1134         Arguments:
1135             message: message to be enqueued into the msg_out buffer
1136         """
1137         self.msg_out += message
1138         self.bytes_to_send += len(message)
1139         self.sending_message = True
1140
1141     def send_message_chunk_is_whole(self):
1142         """Send enqueued data from msg_out buffer
1143
1144         Returns:
1145             :return: true if no remaining data to send
1146         """
1147         # We assume there is a msg_out to send and socket is writable.
1148         # print "going to send", repr(self.msg_out)
1149         self.timer.snapshot()
1150         bytes_sent = self.socket.send(self.msg_out)
1151         # Forget the part of message that was sent.
1152         self.msg_out = self.msg_out[bytes_sent:]
1153         self.bytes_to_send -= bytes_sent
1154         if not self.bytes_to_send:
1155             # TODO: Is it possible to hit negative bytes_to_send?
1156             self.sending_message = False
1157             # We should have reset hold timer on peer side.
1158             self.timer.reset_my_keepalive_time(self.timer.snapshot_time)
1159             # The possible reason for not prioritizing reads is gone.
1160             return True
1161         return False
1162
1163
1164 class StateTracker(object):
1165     """Main loop has state so complex it warrants this separate class."""
1166
1167     def __init__(self, bgp_socket, generator, timer):
1168         """The state tracker initialisation.
1169
1170         Arguments:
1171             bgp_socket: socket to be used for sending / receiving
1172             generator: generator to be used for message generation
1173             timer: timer to be used for scheduling
1174         """
1175         # References to outside objects.
1176         self.socket = bgp_socket
1177         self.generator = generator
1178         self.timer = timer
1179         # Sub-trackers.
1180         self.reader = ReadTracker(bgp_socket, timer)
1181         self.writer = WriteTracker(bgp_socket, generator, timer)
1182         # Prioritization state.
1183         self.prioritize_writing = False
1184         # In general, we prioritize reading over writing. But in order
1185         # not to get blocked by neverending reads, we should
1186         # check whether we are not risking running out of holdtime.
1187         # So in some situations, this field is set to True to attempt
1188         # finishing sending a message, after which this field resets
1189         # back to False.
1190         # TODO: Alternative is to switch fairly between reading and
1191         # writing (called round robin from now on).
1192         # Message counting is done in generator.
1193
1194     def perform_one_loop_iteration(self):
1195         """ The main loop iteration
1196
1197         Notes:
1198             Calculates priority, resolves all conditions, calls
1199             appropriate method and returns to caller to repeat.
1200         """
1201         self.timer.snapshot()
1202         if not self.prioritize_writing:
1203             if self.timer.is_time_for_my_keepalive():
1204                 if not self.writer.sending_message:
1205                     # We need to schedule a keepalive ASAP.
1206                     self.writer.enqueue_message_for_sending(self.generator.keepalive_message())
1207                 # We are sending a message now, so let's prioritize it.
1208                 self.prioritize_writing = True
1209         # Now we know what our priorities are, we have to check
1210         # which actions are available.
1211         # socket.socket() returns three lists,
1212         # we store them to list of lists.
1213         list_list = select.select([self.socket], [self.socket], [self.socket],
1214                                   self.timer.report_timedelta)
1215         read_list, write_list, except_list = list_list
1216         # Lists are unpacked, each is either [] or [self.socket],
1217         # so we will test them as boolean.
1218         if except_list:
1219             logging.error("Exceptional state on the socket.")
1220             raise RuntimeError("Exceptional state on socket", self.socket)
1221         # We will do either read or write.
1222         if not (self.prioritize_writing and write_list):
1223             # Either we have no reason to rush writes,
1224             # or the socket is not writable.
1225             # We are focusing on reading here.
1226             if read_list:  # there is something to read indeed
1227                 # In this case we want to read chunk of message
1228                 # and repeat the select,
1229                 self.reader.read_message_chunk()
1230                 return
1231             # We were focusing on reading, but nothing to read was there.
1232             # Good time to check peer for hold timer.
1233             self.timer.check_peer_hold_time(self.timer.snapshot_time)
1234             # Quiet on the read front, we can have attempt to write.
1235         if write_list:
1236             # Either we really want to reset peer's view of our hold
1237             # timer, or there was nothing to read.
1238             # Were we in the middle of sending a message?
1239             if self.writer.sending_message:
1240                 # Was it the end of a message?
1241                 whole = self.writer.send_message_chunk_is_whole()
1242                 # We were pressed to send something and we did it.
1243                 if self.prioritize_writing and whole:
1244                     # We prioritize reading again.
1245                     self.prioritize_writing = False
1246                 return
1247             # Finally to check if still update messages to be generated.
1248             if self.generator.remaining_prefixes:
1249                 msg_out = self.generator.compose_update_message()
1250                 if not self.generator.remaining_prefixes:
1251                     # We have just finished update generation,
1252                     # end-of-rib is due.
1253                     logging.info("All update messages generated.")
1254                     logging.info("Storing performance results.")
1255                     self.generator.store_results()
1256                     logging.info("Finally an END-OF-RIB is going to be sent.")
1257                     msg_out += self.generator.update_message(wr_prefixes=[],
1258                                                              nlri_prefixes=[])
1259                 self.writer.enqueue_message_for_sending(msg_out)
1260                 # Attempt for real sending to be done in next iteration.
1261                 return
1262             # Nothing to write anymore, except occasional keepalives.
1263             logging.info("Everything has been done." +
1264                          "Now just waiting for possible incomming message.")
1265             # To avoid busy loop, we do idle waiting here.
1266             self.reader.wait_for_read()
1267             return
1268         # We can neither read nor write.
1269         logging.warning("Input and output both blocked for " +
1270                         str(self.timer.report_timedelta) + " seconds.")
1271         # FIXME: Are we sure select has been really waiting
1272         # the whole period?
1273         return
1274
1275
1276 def main():
1277     """ One time initialisation and iterations looping.
1278
1279     Notes:
1280         Establish BGP connection and run iterations.
1281     """
1282     arguments = parse_arguments()
1283     logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s",
1284                         level=arguments.loglevel)
1285     bgp_socket = establish_connection(arguments)
1286     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
1287     # Receive open message before sending anything.
1288     # FIXME: Add parameter to send default open message first,
1289     # to work with "you first" peers.
1290     msg_in = read_open_message(bgp_socket)
1291     timer = TimeTracker(msg_in)
1292     generator = MessageGenerator(arguments)
1293     msg_out = generator.open_message()
1294     logging.debug("Sending the OPEN message: " + binascii.hexlify(msg_out))
1295     # Send our open message to the peer.
1296     bgp_socket.send(msg_out)
1297     # Wait for confirming keepalive.
1298     # TODO: Surely in just one packet?
1299     # Using exact keepalive length to not to see possible updates.
1300     msg_in = bgp_socket.recv(19)
1301     if msg_in != generator.keepalive_message():
1302         logging.error("Open not confirmed by keepalive, instead got " +
1303                       binascii.hexlify(msg_in))
1304         raise MessageError("Open not confirmed by keepalive, instead got",
1305                            msg_in)
1306     timer.reset_peer_hold_time()
1307     # Send the keepalive to indicate the connection is accepted.
1308     timer.snapshot()  # Remember this time.
1309     msg_out = generator.keepalive_message()
1310     logging.debug("Sending a KEEP ALIVE message: " + binascii.hexlify(msg_out))
1311     bgp_socket.send(msg_out)
1312     # Use the remembered time.
1313     timer.reset_my_keepalive_time(timer.snapshot_time)
1314     # End of initial handshake phase.
1315     state = StateTracker(bgp_socket, generator, timer)
1316     while True:  # main reactor loop
1317         state.perform_one_loop_iteration()
1318
1319 if __name__ == "__main__":
1320     main()