import json
import os
+# pylint: disable=wrong-import-order
import sys
import re
import signal
import psutil
import requests
+# pylint: disable=import-error
import simulators
SIMS = simulators.SIMS
executable = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"..", "..", "..", KARAF_INSTALLDIR, "target", "assembly", "bin", "karaf")
- with open('odl.log', 'w') as outfile:
+ with open('odl.log', 'w', encoding='utf-8') as outfile:
return subprocess.Popen(
["sh", executable, "server"], stdout=outfile, stderr=outfile, stdin=None)
os.path.dirname(os.path.realpath(__file__)),
"..", "..", "..", "lighty", "target", "tpce",
"clean-start-controller.sh")
- with open(TPCE_LOG, 'w') as outfile:
+ with open(TPCE_LOG, 'w', encoding='utf-8') as outfile:
return subprocess.Popen(
["sh", executable], stdout=outfile, stderr=outfile, stdin=None)
"tail-retention": "no"}}}
return post_request(URL_SERV_DELETE, attr)
+
def service_path_request(operation: str, servicename: str, wavenumber: str, nodes, centerfreq: str,
slotwidth: int, minfreq: float, maxfreq: float, lowerslotnumber: int,
higherslotnumber: int):
attr.update(other_attr)
return post_request(URL_PATH_COMPUTATION_REQUEST, {"input": attr})
+
def tapi_create_connectivity_request(topologyidorname):
return post_request(URL_TAPI_CREATE_CONNECTIVITY, topologyidorname)
+
def tapi_delete_connectivity_request(serviceidorname):
attr = {
"input": {
"service-id-or-name": serviceidorname}}
- return post_request(URL_TAPI_DELETE_CONNECTIVITY , attr)
+ return post_request(URL_TAPI_DELETE_CONNECTIVITY, attr)
+
def tapi_get_topology_details_request(topologyidorname):
attr = {
"input": {
"topology-id-or-name": topologyidorname}}
- return post_request(URL_TAPI_TOPOLOGY_DETAILS , attr)
+ return post_request(URL_TAPI_TOPOLOGY_DETAILS, attr)
+
def tapi_get_node_details_request(topologyidorname, nodeidorname):
attr = {
"node-id-or-name": nodeidorname}}
return post_request(URL_TAPI_NODE_DETAILS, attr)
+
def tapi_get_sip_details_request():
return post_request(URL_TAPI_SIP_LIST, "")
+
def tapi_get_service_list_request():
return post_request(URL_TAPI_SERVICE_LIST, "")
+
def shutdown_process(process):
if process is not None:
for child in psutil.Process(process.pid).children():
sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs", "openroadm", sim[1])
if os.path.isfile(executable):
- with open(log_file, 'w') as outfile:
+ with open(log_file, 'w', encoding='utf-8') as outfile:
return subprocess.Popen(
[executable, SIMS[sim]['port'], os.path.join(sample_directory, SIMS[sim]['configfile'])],
stdout=outfile, stderr=outfile)
with TimeOut(seconds=time_to_wait):
while not os.path.exists(log_file):
time.sleep(0.2)
- filelogs = open(log_file, 'r')
+ filelogs = open(log_file, 'r', encoding='utf-8')
filelogs.seek(0, 2)
filefound = True
print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)