File play.py was reworked to use threads 11/30911/10
authorpavol.cibak@pantheon.sk <pavol.cibak@pantheon.sk>
Tue, 5 Jan 2016 09:09:50 +0000 (10:09 +0100)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 11 Jan 2016 09:13:20 +0000 (09:13 +0000)
File manage_play.py was deleted, robot files were updated.

Change-Id: I9f5e9cdb7821dd26aac276398369410a96b39d83
Signed-off-by: pavol.cibak@pantheon.sk <pavol.cibak@pantheon.sk>
csit/libraries/BGPSpeaker.robot
csit/suites/bgpcep/bgpingest/manypeers_changecount.robot
csit/suites/bgpcep/bgpingest/manypeers_prefixcount.robot
tools/fastbgp/manage_play.py [deleted file]
tools/fastbgp/play.py

index c593890e8b2858d01eff39336fa6dacf9e25090c..dc547cae1a1f475ee4ce5a93d9eb31788a132965 100644 (file)
@@ -40,7 +40,7 @@ Start_BGP_Speaker
 Start_BGP_Manager
     [Arguments]    ${arguments}
     [Documentation]    Start the BGP manager python utility. Redirect its error output to a log file.
-    ${command}=    BuiltIn.Set_Variable    python manage_play.py ${arguments} &> ${BGPSpeaker__OUTPUT_LOG}
+    ${command}=    BuiltIn.Set_Variable    python play.py ${arguments} &> ${BGPSpeaker__OUTPUT_LOG}
     BuiltIn.Log    ${command}
     ${output}=    SSHLibrary.Write    ${command}
 
index 711b0b1d6ddcd783e4cc3cd7554d4a61a877c998..71126b0e5a3901d2fcee0beaf6a4b7b87844dc33 100644 (file)
@@ -183,7 +183,6 @@ Setup_Everything
     # Both TODOs would probably need to update every suite relying on current Variables.
     SSHKeywords.Require_Python
     SSHLibrary.Put_File    ${CURDIR}/../../../../tools/fastbgp/play.py
-    SSHLibrary.Put_File    ${CURDIR}/../../../../tools/fastbgp/manage_play.py
     SSHKeywords.Assure_Library_Ipaddr    target_dir=.
     # Calculate the timeout value based on how many routes are going to be pushed
     ${period} =    DateTime.Convert_Time    ${CHECK_PERIOD_CHANGE_COUNT_MANY}    result_format=number
index eaec63d0725d1198ebac595e620f0e9d36f08d71..ae795679e8e19b7e6e31195045f720ca6c0741d4 100644 (file)
@@ -165,7 +165,6 @@ Setup_Everything
     SSHKeywords.Require_Python
     SSHKeywords.Assure_Library_Ipaddr    target_dir=.
     SSHLibrary.Put_File    ${CURDIR}/../../../../tools/fastbgp/play.py
-    SSHLibrary.Put_File    ${CURDIR}/../../../../tools/fastbgp/manage_play.py
     # Calculate the timeout value based on how many routes are going to be pushed.
     ${period} =    DateTime.Convert_Time    ${CHECK_PERIOD_PREFIX_COUNT_MANY}    result_format=number
     ${timeout} =    BuiltIn.Evaluate    ${TEST_DURATION_MULTIPLIER_PREFIX_COUNT_MANY} * (${COUNT_PREFIX_COUNT_MANY} * 3.0 / 10000 + ${period} * (${REPETITIONS_PREFIX_COUNT_MANY} + 1))
diff --git a/tools/fastbgp/manage_play.py b/tools/fastbgp/manage_play.py
deleted file mode 100644 (file)
index 1954a0d..0000000
+++ /dev/null
@@ -1,100 +0,0 @@
-"""Utility for running several play.py utilities in parallel.
-
-Only a subset of play.py functionality is fully supported.
-Notably, listening play.py is not tested yet.
-
-Only one hardcoded strategy is there to distinguish peers.
-File play.py is to be present in current working directory.
-
-It needs to be run with sudo-able user when you want to use ports below 1024
-as --myip.
-"""
-
-# Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
-#
-# This program and the accompanying materials are made available under the
-# terms of the Eclipse Public License v1.0 which accompanies this distribution,
-# and is available at http://www.eclipse.org/legal/epl-v10.html
-
-__author__ = "Vratko Polak"
-__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
-__license__ = "Eclipse Public License v1.0"
-__email__ = "vrpolak@cisco.com"
-
-
-import argparse
-import ipaddr
-import multiprocessing
-import subprocess
-
-
-def main():
-    """Use argparse to get arguments, return mgr_args object."""
-
-    parser = argparse.ArgumentParser()
-    # TODO: Should we use --argument-names-with-spaces?
-    str_help = "Autonomous System number use in the stream (current default as in ODL: 64496)."
-    parser.add_argument("--asnumber", default=64496, type=int, help=str_help)
-    # FIXME: We are acting as iBGP peer, we should mirror AS number from peer's open message.
-    str_help = "Amount of IP prefixes to generate. Negative number is taken as an overflown positive."
-    parser.add_argument("--amount", default="1", type=int, help=str_help)
-    str_help = "The first IPv4 prefix to announce, given as numeric IPv4 address."
-    parser.add_argument("--firstprefix", default="8.0.1.0", type=ipaddr.IPv4Address, help=str_help)
-    str_help = "If present, this tool will be listening for connection, instead of initiating it."
-    parser.add_argument("--listen", action="store_true", help=str_help)
-    str_help = "How many play.py utilities are to be started."
-    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
-    str_help = "Numeric IP Address to bind to and derive BGP ID from, for the first player."
-    parser.add_argument("--myip", default="0.0.0.0", type=ipaddr.IPv4Address, help=str_help)
-    str_help = "TCP port to bind to when listening or initiating connection."
-    parser.add_argument("--myport", default="0", type=int, help=str_help)
-    str_help = "The IP of the next hop to be placed into the update messages."
-    parser.add_argument("--nexthop", default="192.0.2.1", type=ipaddr.IPv4Address, dest="nexthop", help=str_help)
-    str_help = "Numeric IP Address to try to connect to. Currently no effect in listening mode."
-    parser.add_argument("--peerip", default="127.0.0.2", type=ipaddr.IPv4Address, help=str_help)
-    str_help = "TCP port to try to connect to. No effect in listening mode."
-    parser.add_argument("--peerport", default="179", type=int, help=str_help)
-    # TODO: The step between IP prefixes is currently hardcoded to 16. Should we make it configurable?
-    # Yes, the argument list above is sorted alphabetically.
-    mgr_args = parser.parse_args()
-    # TODO: Are sanity checks (such as asnumber>=0) required?
-
-    if mgr_args.multiplicity < 1:
-        print "Multiplicity", mgr_args.multiplicity, "is not positive."
-        raise SystemExit(1)
-    amount_left = mgr_args.amount
-    utils_left = mgr_args.multiplicity
-    prefix_current = mgr_args.firstprefix
-    myip_current = mgr_args.myip
-    processes = []
-    while 1:
-        amount_per_util = (amount_left - 1) / utils_left + 1  # round up
-        amount_left -= amount_per_util
-        utils_left -= 1
-        util_args = ["python", "play.py"]
-        util_args.extend(["--asnumber", str(mgr_args.asnumber)])
-        util_args.extend(["--amount", str(amount_per_util)])
-        util_args.extend(["--firstprefix", str(prefix_current)])
-        if mgr_args.listen:
-            util_args.append("--listen")
-        util_args.extend(["--myip", str(myip_current)])
-        util_args.extend(["--myport", str(mgr_args.myport)])
-        util_args.extend(["--nexthop", str(mgr_args.nexthop)])
-        util_args.extend(["--peerip", str(mgr_args.peerip)])
-        util_args.extend(["--peerport", str(mgr_args.peerport)])
-        process = multiprocessing.Process(target=subprocess.call, args=[util_args])
-        # No shell, inherited std* file descriptors, same process group, so gets SIGINT.
-        processes.append(process)
-        if not utils_left:
-            break
-        prefix_current += amount_per_util * 16
-        myip_current += 1
-    for process in processes:
-        process.start()
-    # Usually processes run until SIGINT, but if an error happens we want to exit quickly.
-    for process in processes:
-        process.join()
-
-
-if __name__ == "__main__":
-    main()
index dcec923def4b9574f210fa7f7c2843a4b0e1a980..e79dfaf3d571e066d65b4510eef0e66fb348c9dc 100755 (executable)
@@ -25,6 +25,9 @@ import time
 import logging
 import struct
 
+import thread
+from copy import deepcopy
+
 
 def parse_arguments():
     """Use argparse to get arguments,
@@ -95,7 +98,12 @@ def parse_arguments():
     parser.add_argument("--threshold", default="1000", type=int, help=str_help)
     str_help = "RFC 4760 Multiprotocol Extensions for BGP-4 supported"
     parser.add_argument("--rfc4760", default="yes", type=str, help=str_help)
+    str_help = "How many play utilities are to be started."
+    parser.add_argument("--multiplicity", default="1", type=int, help=str_help)
     arguments = parser.parse_args()
+    if arguments.multiplicity < 1:
+        print "Multiplicity", arguments.multiplicity, "is not positive."
+        raise SystemExit(1)
     # TODO: Are sanity checks (such as asnumber>=0) required?
     return arguments
 
@@ -1531,22 +1539,37 @@ class StateTracker(object):
         return
 
 
-if __name__ == "__main__":
-    """ One time initialisation and iterations looping.
+def create_logger(loglevel, logfile):
+    """Create logger object
 
-    Notes:
-        Establish BGP connection and run iterations.
+    Arguments:
+        :loglevel: log level
+        :logfile: log file name
+    Returns:
+        :return: logger object
     """
-    arguments = parse_arguments()
     logger = logging.getLogger("logger")
     log_formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
     console_handler = logging.StreamHandler()
-    file_handler = logging.FileHandler(arguments.logfile, mode="w")
+    file_handler = logging.FileHandler(logfile, mode="w")
     console_handler.setFormatter(log_formatter)
     file_handler.setFormatter(log_formatter)
     logger.addHandler(console_handler)
     logger.addHandler(file_handler)
-    logger.setLevel(arguments.loglevel)
+    logger.setLevel(loglevel)
+    return logger
+
+
+def job(arguments):
+    """One time initialisation and iterations looping.
+    Notes:
+        Establish BGP connection and run iterations.
+
+    Arguments:
+        :arguments: Command line arguments
+    Returns:
+        :return: None
+    """
     bgp_socket = establish_connection(arguments)
     # Initial handshake phase. TODO: Can it be also moved to StateTracker?
     # Receive open message before sending anything.
@@ -1580,3 +1603,55 @@ if __name__ == "__main__":
     state = StateTracker(bgp_socket, generator, timer)
     while True:  # main reactor loop
         state.perform_one_loop_iteration()
+
+
+def threaded_job(arguments):
+    """Run the job threaded
+
+    Arguments:
+        :arguments: Command line arguments
+    Returns:
+        :return: None
+    """
+    amount_left = arguments.amount
+    utils_left = arguments.multiplicity
+    prefix_current = arguments.firstprefix
+    myip_current = arguments.myip
+    thread_args = []
+
+    while 1:
+        amount_per_util = (amount_left - 1) / utils_left + 1  # round up
+        amount_left -= amount_per_util
+        utils_left -= 1
+
+        args = deepcopy(arguments)
+        args.amount = amount_per_util
+        args.firstprefix = prefix_current
+        args.myip = myip_current
+        thread_args.append(args)
+
+        if not utils_left:
+            break
+        prefix_current += amount_per_util * 16
+        myip_current += 1
+
+    try:
+        # Create threads
+        for t in thread_args:
+            thread.start_new_thread(job, (t,))
+    except Exception:
+        print "Error: unable to start thread."
+        raise SystemExit(2)
+
+    # Work remains forever
+    while 1:
+        time.sleep(5)
+
+
+if __name__ == "__main__":
+    arguments = parse_arguments()
+    logger = create_logger(arguments.loglevel, arguments.logfile)
+    if arguments.multiplicity > 1:
+        threaded_job(arguments)
+    else:
+        job(arguments)