# terms of the Eclipse Public License v1.0 which accompanies this distribution,
# and is available at http://www.eclipse.org/legal/epl-v10.html
+from copy import deepcopy
+from SimpleXMLRPCServer import SimpleXMLRPCServer
import argparse
import binascii
import ipaddr
+import logging
+import Queue
import select
import socket
-import time
-import logging
import struct
-
import thread
-from copy import deepcopy
+import threading
+import time
__author__ = "Vratko Polak"
__email__ = "vrpolak@cisco.com"
+class SafeDict(dict):
+ '''Thread safe dictionary
+
+ The object will serve as thread safe data storage.
+ It should be used with "with" statement.
+ '''
+
+ def __init__(self, * p_arg, ** n_arg):
+ super(SafeDict, self).__init__()
+ self._lock = threading.Lock()
+
+ def __enter__(self):
+ self._lock.acquire()
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self._lock.release()
+
+
def parse_arguments():
"""Use argparse to get arguments,
parser.add_argument("-lsteaddrstep", default="1", type=int, help=str_help)
str_help = "How many play utilities are to be started."
parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
+ str_help = "Open message includes multiprotocol extension capability l2vpn-evpn.\
+Enabling this flag makes the script not decoding the update mesage, because of not\
+supported decoding for these elements."
+ parser.add_argument("--evpn", default=False, action="store_true", help=str_help)
+ parser.add_argument("--wfr", default=10, type=int, help="Wait for read timeout")
arguments = parser.parse_args()
if arguments.multiplicity < 1:
print "Multiplicity", arguments.multiplicity, "is not positive."
self.performance_threshold_default = args.threshold
self.rfc4760 = args.rfc4760
self.bgpls = args.bgpls
+ self.evpn = args.evpn
# Default values when BGP-LS Attributes are used
if self.bgpls:
self.prefix_count_to_add_default = 1
)
optional_parameters_hex += optional_parameter_hex
+ if self.evpn:
+ optional_parameter_hex = (
+ "\x02" # Param type ("Capability Ad")
+ "\x06" # Length (6 bytes)
+ "\x01" # Multiprotocol extetension capability,
+ "\x04" # Capability value length
+ "\x00\x19" # AFI (L2-VPN)
+ "\x00" # (reserved)
+ "\x46" # SAFI (EVPN)
+ )
+ optional_parameters_hex += optional_parameter_hex
+
optional_parameter_hex = (
"\x02" # Param type ("Capability Ad")
"\x06" # Length (6 bytes)
for idle waiting.
"""
- def __init__(self, bgp_socket, timer):
+ def __init__(self, bgp_socket, timer, storage, evpn=False, wait_for_read=10):
"""The reader initialisation.
Arguments:
bgp_socket: socket to be used for sending
timer: timer to be used for scheduling
+ storage: thread safe dict
+ evpn: flag that evpn functionality is tested
"""
# References to outside objects.
self.socket = bgp_socket
self.prefixes_withdrawn = 0
self.rx_idle_time = 0
self.rx_activity_detected = True
+ self.storage = storage
+ self.evpn = evpn
+ self.wfr = wait_for_read
def read_message_chunk(self):
"""Read up to one message
# message header - message type
msg_type_hex = msg[18:19]
msg_type = int(binascii.b2a_hex(msg_type_hex), 16)
+
+ with self.storage as stor:
+ # this will replace the previously stored message
+ stor['update'] = binascii.hexlify(msg)
+
+ logger.debug("Evpn {}".format(self.evpn))
+ if self.evpn:
+ logger.debug("Skipping update decoding due to evpn data expected")
+ return
+
if msg_type == 2:
logger.debug("Message type: 0x%s (update)",
binascii.b2a_hex(msg_type_hex))
# Compute time to the first predictable state change
event_time = self.timer.get_next_event_time()
# snapshot_time would be imprecise
- wait_timedelta = min(event_time - time.time(), 10)
+ wait_timedelta = min(event_time - time.time(), self.wfr)
if wait_timedelta < 0:
# The program got around to waiting to an event in "very near
# future" so late that it became a "past" event, thus tell
class StateTracker(object):
"""Main loop has state so complex it warrants this separate class."""
- def __init__(self, bgp_socket, generator, timer):
+ def __init__(self, bgp_socket, generator, timer, inqueue, storage, cliargs):
"""The state tracker initialisation.
Arguments:
bgp_socket: socket to be used for sending / receiving
generator: generator to be used for message generation
timer: timer to be used for scheduling
+ inqueue: user initiated messages queue
+ storage: thread safe dict to store data for the rpc server
+ cliargs: cli args from the user
"""
# References to outside objects.
self.socket = bgp_socket
self.generator = generator
self.timer = timer
# Sub-trackers.
- self.reader = ReadTracker(bgp_socket, timer)
+ self.reader = ReadTracker(bgp_socket, timer, storage, evpn=cliargs.evpn, wait_for_read=cliargs.wfr)
self.writer = WriteTracker(bgp_socket, generator, timer)
# Prioritization state.
self.prioritize_writing = False
# TODO: Alternative is to switch fairly between reading and
# writing (called round robin from now on).
# Message counting is done in generator.
+ self.inqueue = inqueue
def perform_one_loop_iteration(self):
""" The main loop iteration
logger.info("KEEP ALIVE is sent.")
# We are sending a message now, so let's prioritize it.
self.prioritize_writing = True
+
+ try:
+ msg = self.inqueue.get_nowait()
+ logger.info("Received message: {}".format(msg))
+ msgbin = binascii.unhexlify(msg)
+ self.writer.enqueue_message_for_sending(msgbin)
+ except Queue.Empty:
+ pass
# Now we know what our priorities are, we have to check
# which actions are available.
# socket.socket() returns three lists,
return logger
-def job(arguments):
+def job(arguments, inqueue, storage):
"""One time initialisation and iterations looping.
Notes:
Establish BGP connection and run iterations.
Arguments:
:arguments: Command line arguments
+ :inqueue: Data to be sent from play.py
+ :storage: Shared dict for rpc server
Returns:
:return: None
"""
# Use the remembered time.
timer.reset_my_keepalive_time(timer.snapshot_time)
# End of initial handshake phase.
- state = StateTracker(bgp_socket, generator, timer)
+ state = StateTracker(bgp_socket, generator, timer, inqueue, storage, arguments)
while True: # main reactor loop
state.perform_one_loop_iteration()
+class Rpcs:
+ '''Handler for SimpleXMLRPCServer'''
+ def __init__(self, sendqueue, storage):
+ '''Init method
+
+ Arguments:
+ :sendqueue: queue for data to be sent towards odl
+ :storage: thread safe dict
+ '''
+ self.queue = sendqueue
+ self.storage = storage
+
+ def send(self, text):
+ '''Data to be sent
+
+ Arguments:
+ :text: hes string of the data to be sent
+ '''
+ self.queue.put(text)
+
+ def get(self, text=''):
+ '''Reads data form the storage
+
+ - returns stored data or an empty string, at the moment only
+ 'update' is stored
+
+ Arguments:
+ :text: a key to the storage to get the data
+ Returns:
+ :data: stored data
+ '''
+ with self.storage as stor:
+ return stor.get(text, '')
+
+ def clean(self, text=''):
+ '''Cleans data form the storage
+
+ Arguments:
+ :text: a key to the storage to clean the data
+ '''
+ with self.storage as stor:
+ if text in stor:
+ del stor[text]
+
+
def threaded_job(arguments):
"""Run the job threaded
prefix_current = arguments.firstprefix
myip_current = arguments.myip
thread_args = []
+ rpcqueue = Queue.Queue()
+ storage = SafeDict()
while 1:
amount_per_util = (amount_left - 1) / utils_left + 1 # round up
try:
# Create threads
for t in thread_args:
- thread.start_new_thread(job, (t,))
+ thread.start_new_thread(job, (t, rpcqueue, storage))
except Exception:
print "Error: unable to start thread."
raise SystemExit(2)
- # Work remains forever
- while 1:
- time.sleep(5)
+ rpcserver = SimpleXMLRPCServer((arguments.myip.compressed, 8000), allow_none=True)
+ rpcserver.register_instance(Rpcs(rpcqueue, storage))
+ rpcserver.serve_forever()
if __name__ == "__main__":