def parse_arguments():
"""Return parsed form of command-line arguments."""
parser = argparse.ArgumentParser()
- parser.add_argument("--odladdress", default="127.0.0.1",
- help="IP address of ODL Restconf to be used")
- parser.add_argument("--restconfport", default="8181",
- help="Port on which ODL Restconf to be used")
- parser.add_argument("--restconfuser", default="admin",
- help="Username for ODL Restconf authentication")
- parser.add_argument("--restconfpassword", default="admin",
- help="Password for ODL Restconf authentication")
- parser.add_argument("--scope", default="sdn",
- help="Scope for ODL Restconf authentication")
- parser.add_argument("--deviceaddress", default="127.0.0.1",
- help="Common IP address for all available devices")
- parser.add_argument("--devices", default="1", type=int,
- help="Number of devices available for connecting")
- parser.add_argument("--deviceuser", default="admin",
- help="Username for netconf device authentication")
- parser.add_argument("--devicepassword", default="admin",
- help="Password for netconf device authentication")
- parser.add_argument("--startport", default="17830", type=int,
- help="Port number of first device")
+ parser.add_argument(
+ "--odladdress",
+ default="127.0.0.1",
+ help="IP address of ODL Restconf to be used",
+ )
+ parser.add_argument(
+ "--restconfport", default="8181", help="Port on which ODL Restconf to be used"
+ )
+ parser.add_argument(
+ "--restconfuser",
+ default="admin",
+ help="Username for ODL Restconf authentication",
+ )
+ parser.add_argument(
+ "--restconfpassword",
+ default="admin",
+ help="Password for ODL Restconf authentication",
+ )
+ parser.add_argument(
+ "--scope", default="sdn", help="Scope for ODL Restconf authentication"
+ )
+ parser.add_argument(
+ "--deviceaddress",
+ default="127.0.0.1",
+ help="Common IP address for all available devices",
+ )
+ parser.add_argument(
+ "--devices",
+ default="1",
+ type=int,
+ help="Number of devices available for connecting",
+ )
+ parser.add_argument(
+ "--deviceuser",
+ default="admin",
+ help="Username for netconf device authentication",
+ )
+ parser.add_argument(
+ "--devicepassword",
+ default="admin",
+ help="Password for netconf device authentication",
+ )
+ parser.add_argument(
+ "--startport", default="17830", type=int, help="Port number of first device"
+ )
# FIXME: There has to be a better name, "delay" evokes seconds, not number of connections.
- parser.add_argument("--disconndelay", default="0", type=int,
- help="Deconfigure oldest device if more than this devices were configured")
- parser.add_argument("--connsleep", default="0.0", type=float,
- help="Sleep this many seconds after configuration to allow operational update.")
- parser.add_argument("--basename", default="sim-device",
- help="Name of device without the generated suffixes")
- parser.add_argument("--reuse", default="True", type=str2bool,
- help="Should single requests session be re-used")
+ parser.add_argument(
+ "--disconndelay",
+ default="0",
+ type=int,
+ help="Deconfigure oldest device if more than this devices were configured",
+ )
+ parser.add_argument(
+ "--connsleep",
+ default="0.0",
+ type=float,
+ help="Sleep this many seconds after configuration to allow operational update.",
+ )
+ parser.add_argument(
+ "--basename",
+ default="sim-device",
+ help="Name of device without the generated suffixes",
+ )
+ parser.add_argument(
+ "--reuse",
+ default="True",
+ type=str2bool,
+ help="Should single requests session be re-used",
+ )
return parser.parse_args() # arguments are read
-DATA_TEMPLATE = string.Template('''{
+DATA_TEMPLATE = string.Template(
+ """{
"network-topology:node": {
"node-id": "$DEVICE_NAME",
"netconf-node-topology:host": "$DEVICE_IP",
"netconf-node-topology:tcp-only": "false",
"netconf-node-topology:keepalive-delay": 0
}
-}''')
+}"""
+)
def count_response(counter, response, method):
for key_tuple in counter:
short_counter[(key_tuple[0], key_tuple[1])] += counter[key_tuple]
short_list = sorted(short_counter.keys())
- short_text = ", ".join(["(" + item[0] + ":" + item[1] + ")x" + str(short_counter[item]) for item in short_list])
+ short_text = ", ".join(
+ [
+ "(" + item[0] + ":" + item[1] + ")x" + str(short_counter[item])
+ for item in short_list
+ ]
+ )
long_text = "\n".join([item[2] for item in sorted(counter.keys(), reverse=True)])
return short_text + "\nresponses:\n" + long_text
def main():
"""Top-level logic to execute."""
args = parse_arguments()
- uri_part = "config/network-topology:network-topology/topology/topology-netconf/node/"
+ uri_part = (
+ "config/network-topology:network-topology/topology/topology-netconf/node/"
+ )
put_headers = {"Content-Type": "application/json", "Accept": "application/json"}
delete_headers = {"Accept": "application/json"}
counter = collections.Counter()
- def handle_sigint(received_signal, frame): # This is a closure as it refers to the counter.
+ def handle_sigint(
+ received_signal, frame
+ ): # This is a closure as it refers to the counter.
"""Upon SIGINT, print counter contents and exit gracefully."""
signal.signal(signal.SIGINT, signal.SIG_DFL)
print(sorted_repr(counter))
signal.signal(signal.SIGINT, handle_sigint)
session = AuthStandalone.Init_Session(
- args.odladdress, args.restconfuser, args.restconfpassword, args.scope, args.reuse)
+ args.odladdress,
+ args.restconfuser,
+ args.restconfpassword,
+ args.scope,
+ args.reuse,
+ )
subst_dict = {}
subst_dict["DEVICE_IP"] = args.deviceaddress
subst_dict["DEVICE_USER"] = args.deviceuser
while port < wrap_port:
if len(delayed) > args.disconndelay:
delete_name = delayed.popleft()
- response = AuthStandalone.Delete_Using_Session(session, uri_part + delete_name, headers=delete_headers)
+ response = AuthStandalone.Delete_Using_Session(
+ session, uri_part + delete_name, headers=delete_headers
+ )
count_response(counter, response, "delete")
put_name = args.basename + "-" + str(port) + "-" + str(iteration)
subst_dict["DEVICE_NAME"] = put_name
subst_dict["DEVICE_PORT"] = str(port)
put_data = DATA_TEMPLATE.substitute(subst_dict)
uri = uri_part + put_name
- response = AuthStandalone.Put_Using_Session(session, uri, data=put_data, headers=put_headers)
+ response = AuthStandalone.Put_Using_Session(
+ session, uri, data=put_data, headers=put_headers
+ )
count_response(counter, response, "put")
delayed.append(put_name) # schedule for deconfiguration unconditionally
time.sleep(args.connsleep)