print("all processes killed")
def setUp(self):
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
time.sleep(10)
print("all processes killed")
def setUp(self):
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
time.sleep(10)
print("all processes killed")
def setUp(self):
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
time.sleep(1)
print("all processes killed")
def setUp(self): # instruction executed before each test method
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
# connect netconf devices
self.test_35_check_topo_ROADMA_DEG1()
def test_49_loop_create_eth_service(self):
+ # pylint: disable=consider-using-f-string
for i in range(1, 6):
print("iteration number {}".format(i))
print("eth service creation")
response = test_utils.service_delete_request("service1")
time.sleep(5)
+ # pylint: disable=consider-using-f-string
for i in range(1, 6):
print("iteration number {}".format(i))
print("oc service creation")
print("all processes killed")
def setUp(self):
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
time.sleep(10)
nbMappings = 0
for i in range(0, len(firstEntry)):
nodeId = firstEntry[i]['node-id']
+ # pylint: disable=consider-using-f-string
print("nodeId={}".format(nodeId))
nodeMapId = nodeId.split("-")[0] + "-" + nodeId.split("-")[1]
print("nodeMapId={}".format(nodeMapId))
tpId = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
if tpType == 'XPONDER-CLIENT':
client += 1
+ # pylint: disable=consider-using-f-string
print("tpId = {}".format(tpId))
print("tp= {}".format(res['network'][0]['node'][i]
['ietf-network-topology:termination-point'][j]))
print("all processes killed")
def setUp(self):
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
time.sleep(10)
print("all processes killed")
def setUp(self):
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
time.sleep(1)
def setUp(self): # instruction executed before each test method
if self.init_failed:
self.fail('Feature installation failed')
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
def test_01_get_tapi_topology_T100G(self):
print("all processes killed")
def setUp(self): # instruction executed before each test method
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
def test_01_connect_xpdrA(self):
def test_49_loop_create_eth_service(self):
for i in range(1, 6):
+ # pylint: disable=consider-using-f-string
print("iteration number {}".format(i))
print("eth service creation")
self.test_11_create_eth_service1()
time.sleep(5)
for i in range(1, 6):
+ # pylint: disable=consider-using-f-string
print("iteration number {}".format(i))
print("oc service creation")
self.test_36_create_oc_service1()
time.sleep(self.WAITING)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- global service_pm_uuid
- service_pm_uuid = res['output']['service']['uuid']
+ global SERVICE_PM_UUID
+ SERVICE_PM_UUID = res['output']['service']['uuid']
+ # pylint: disable=consider-using-f-string
print("photonic media service uuid : {}".format(res['output']['service']['uuid']))
input_dict_1 = {'administrative-state': 'LOCKED',
def test_16_get_service_PhotonicMedia(self):
response = test_utils.get_service_list_request(
- "services/" + str(service_pm_uuid))
+ "services/" + str(SERVICE_PM_UUID))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
res['services'][0]['administrative-state'], 'inService')
self.assertEqual(
- res['services'][0]['service-name'], service_pm_uuid)
+ res['services'][0]['service-name'], SERVICE_PM_UUID)
self.assertEqual(
res['services'][0]['connection-type'], 'infrastructure')
self.assertEqual(
time.sleep(self.WAITING)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- global service_odu_uuid
- service_odu_uuid = res['output']['service']['uuid']
+ global SERVICE_ODU_UUID
+ SERVICE_ODU_UUID = res['output']['service']['uuid']
+ # pylint: disable=consider-using-f-string
print("odu service uuid : {}".format(res['output']['service']['uuid']))
input_dict_1 = {'administrative-state': 'LOCKED',
def test_18_get_service_ODU(self):
response = test_utils.get_service_list_request(
- "services/" + str(service_odu_uuid))
+ "services/" + str(SERVICE_ODU_UUID))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
res['services'][0]['administrative-state'], 'inService')
self.assertEqual(
- res['services'][0]['service-name'], service_odu_uuid)
+ res['services'][0]['service-name'], SERVICE_ODU_UUID)
self.assertEqual(
res['services'][0]['connection-type'], 'infrastructure')
self.assertEqual(
time.sleep(self.WAITING)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- global service_dsr_uuid
- service_dsr_uuid = res['output']['service']['uuid']
+ global SERVICE_DSR_UUID
+ SERVICE_DSR_UUID = res['output']['service']['uuid']
+ # pylint: disable=consider-using-f-string
print("dsr service uuid : {}".format(res['output']['service']['uuid']))
input_dict_1 = {'administrative-state': 'LOCKED',
def test_20_get_service_DSR(self):
response = test_utils.get_service_list_request(
- "services/" + str(service_dsr_uuid))
+ "services/" + str(SERVICE_DSR_UUID))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
res['services'][0]['administrative-state'], 'inService')
self.assertEqual(
- res['services'][0]['service-name'], service_dsr_uuid)
+ res['services'][0]['service-name'], SERVICE_DSR_UUID)
self.assertEqual(
res['services'][0]['connection-type'], 'service')
self.assertEqual(
res = response.json()
liste_service = res['output']['service']
for ele in liste_service:
- if ele['uuid'] == service_pm_uuid:
+ if ele['uuid'] == SERVICE_PM_UUID:
self.assertEqual(ele['operational-state'], 'ENABLED')
self.assertEqual(ele['service-layer'], 'PHOTONIC_MEDIA')
nbconnection = len(ele['connection'])
self.assertEqual(nbconnection, 9, 'There should be 9 connections')
- elif ele['uuid'] == service_odu_uuid:
+ elif ele['uuid'] == SERVICE_ODU_UUID:
self.assertEqual(ele['operational-state'], 'ENABLED')
self.assertEqual(ele['service-layer'], 'ODU')
nbconnection = len(ele['connection'])
self.assertEqual(nbconnection, 3, 'There should be 3 connections')
- elif ele['uuid'] == service_dsr_uuid:
+ elif ele['uuid'] == SERVICE_DSR_UUID:
self.assertEqual(ele['operational-state'], 'ENABLED')
self.assertEqual(ele['service-layer'], 'DSR')
nbconnection = len(ele['connection'])
time.sleep(2)
def test_22_delete_connectivity_service_DSR(self):
- response = test_utils.tapi_delete_connectivity_request(service_dsr_uuid)
+ response = test_utils.tapi_delete_connectivity_request(SERVICE_DSR_UUID)
self.assertEqual(response.status_code, requests.codes.no_content)
time.sleep(self.WAITING)
def test_23_delete_connectivity_service_ODU(self):
- response = test_utils.tapi_delete_connectivity_request(service_odu_uuid)
+ response = test_utils.tapi_delete_connectivity_request(SERVICE_ODU_UUID)
self.assertEqual(response.status_code, requests.codes.no_content)
time.sleep(self.WAITING)
def test_24_delete_connectivity_service_PhotonicMedia(self):
- response = test_utils.tapi_delete_connectivity_request(service_pm_uuid)
+ response = test_utils.tapi_delete_connectivity_request(SERVICE_PM_UUID)
self.assertEqual(response.status_code, requests.codes.no_content)
time.sleep(self.WAITING)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
+ # pylint: disable=consider-using-f-string
for ele in liste_tp:
if ele['tp-id'] == 'XPDR2-NETWORK1':
self.assertEqual({'frequency': 196.1,
print("all processes killed")
def setUp(self):
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
time.sleep(10)
executable = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"..", "..", "..", KARAF_INSTALLDIR, "target", "assembly", "bin", "karaf")
- with open('odl.log', 'w') as outfile:
+ with open('odl.log', 'w', encoding='utf-8') as outfile:
return subprocess.Popen(
["sh", executable, "server"], stdout=outfile, stderr=outfile, stdin=None)
os.path.dirname(os.path.realpath(__file__)),
"..", "..", "..", "lighty", "target", "tpce",
"clean-start-controller.sh")
- with open(TPCE_LOG, 'w') as outfile:
+ with open(TPCE_LOG, 'w', encoding='utf-8') as outfile:
return subprocess.Popen(
["sh", executable], stdout=outfile, stderr=outfile, stdin=None)
sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs", "openroadm", sim[1])
if os.path.isfile(executable):
- with open(log_file, 'w') as outfile:
+ with open(log_file, 'w', encoding='utf-8') as outfile:
return subprocess.Popen(
[executable, SIMS[sim]['port'], os.path.join(sample_directory, SIMS[sim]['configfile'])],
stdout=outfile, stderr=outfile)
with TimeOut(seconds=time_to_wait):
while not os.path.exists(log_file):
time.sleep(0.2)
- filelogs = open(log_file, 'r')
+ filelogs = open(log_file, 'r', encoding='utf-8')
filelogs.seek(0, 2)
filefound = True
print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
executable = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"..", "..", "..", KARAF_INSTALLDIR, "target", "assembly", "bin", "karaf")
- with open('odl.log', 'w') as outfile:
+ with open('odl.log', 'w', encoding='utf-8') as outfile:
return subprocess.Popen(
["sh", executable, "server"], stdout=outfile, stderr=outfile, stdin=None)
os.path.dirname(os.path.realpath(__file__)),
"..", "..", "..", "lighty", "target", "tpce",
"clean-start-controller.sh")
- with open(TPCE_LOG, 'w') as outfile:
+ with open(TPCE_LOG, 'w', encoding='utf-8') as outfile:
return subprocess.Popen(
["sh", executable], stdout=outfile, stderr=outfile, stdin=None)
sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs", "openroadm", sim[1])
if os.path.isfile(executable):
- with open(log_file, 'w') as outfile:
+ with open(log_file, 'w', encoding='utf-8') as outfile:
return subprocess.Popen(
[executable, SIMS[sim]['port'], os.path.join(sample_directory, SIMS[sim]['configfile'])],
stdout=outfile, stderr=outfile)
with TimeOut(seconds=time_to_wait):
while not os.path.exists(log_file):
time.sleep(0.2)
- filelogs = open(log_file, 'r')
+ filelogs = open(log_file, 'r', encoding='utf-8')
filelogs.seek(0, 2)
filefound = True
print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
url = "{}/data/transportpce-portmapping:network/nodes={}/node-info"
response = get_request(url.format('{}', node))
res = response.json()
- key = u'transportpce-portmapping:node-info'
+ key = 'transportpce-portmapping:node-info'
if key in res.keys():
node_info = res[key]
else:
time.sleep(10)
def setUp(self): # instruction executed before each test method
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
def test_01_connect_xpdrA(self):
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+# pylint: disable=invalid-name
# pylint: disable=no-member
# pylint: disable=too-many-public-methods
# pylint: disable=too-many-lines
sample_files_parsed = False
TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs", "honeynode-topo.xml")
- with open(TOPO_BI_DIR_FILE, 'r') as topo_bi_dir:
+ with open(TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as topo_bi_dir:
cls.simple_topo_bi_dir_data = topo_bi_dir.read()
TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs", "NW-simple-topology.xml")
- with open(TOPO_UNI_DIR_FILE, 'r') as topo_uni_dir:
+ with open(TOPO_UNI_DIR_FILE, 'r', encoding='utf-8') as topo_uni_dir:
cls.simple_topo_uni_dir_data = topo_uni_dir.read()
TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs", "NW-for-test-5-4.xml")
- with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r') as topo_uni_dir_complex:
+ with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r', encoding='utf-8') as topo_uni_dir_complex:
cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs", "pce_portmapping_121.json")
- with open(PORT_MAPPING_FILE, 'r') as port_mapping:
+ with open(PORT_MAPPING_FILE, 'r', encoding='utf-8') as port_mapping:
cls.port_mapping_data = port_mapping.read()
sample_files_parsed = True
except PermissionError as err:
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+# pylint: disable=invalid-name
# pylint: disable=no-member
# pylint: disable=too-many-public-methods
TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs",
"honeynode-topo400G.json")
- with open(TOPO_BI_DIR_FILE, 'r') as topo_bi_dir:
+ with open(TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as topo_bi_dir:
cls.topo_bi_dir_data = topo_bi_dir.read()
OTN_TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs",
"honeynode-otntopo400G.json")
- with open(OTN_TOPO_BI_DIR_FILE, 'r') as otn_topo_bi_dir:
+ with open(OTN_TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as otn_topo_bi_dir:
cls.otn_topo_bi_dir_data = otn_topo_bi_dir.read()
OTUC4_OTN_TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs",
"honeynode-otntopo400GwithOTUC4.json")
- with open(OTUC4_OTN_TOPO_BI_DIR_FILE, 'r') as otuc4_otn_topo_bi_dir:
+ with open(OTUC4_OTN_TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as otuc4_otn_topo_bi_dir:
cls.otuc4_otn_topo_bi_dir_data = otuc4_otn_topo_bi_dir.read()
ODUC4_OTN_TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs",
"honeynode-otntopo400GwithODUC4.json")
- with open(ODUC4_OTN_TOPO_BI_DIR_FILE, 'r') as oduc4_otn_topo_bi_dir:
+ with open(ODUC4_OTN_TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as oduc4_otn_topo_bi_dir:
cls.oduc4_otn_topo_bi_dir_data = oduc4_otn_topo_bi_dir.read()
PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs",
"pce_portmapping_71.json")
- with open(PORT_MAPPING_FILE, 'r') as port_mapping:
+ with open(PORT_MAPPING_FILE, 'r', encoding='utf-8') as port_mapping:
cls.port_mapping_data = port_mapping.read()
sample_files_parsed = True
except PermissionError as err:
print("all processes killed")
def setUp(self): # instruction executed before each test method
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
time.sleep(1)
sample_files_parsed = False
TOPO_CLLINET_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs", "gnpy", "clliNetwork.json")
- with open(TOPO_CLLINET_FILE, 'r') as topo_cllinet:
+ with open(TOPO_CLLINET_FILE, 'r', encoding='utf-8') as topo_cllinet:
cls.topo_cllinet_data = topo_cllinet.read()
TOPO_ORDNET_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs", "gnpy", "openroadmNetwork.json")
- with open(TOPO_ORDNET_FILE, 'r') as topo_ordnet:
+ with open(TOPO_ORDNET_FILE, 'r', encoding='utf-8') as topo_ordnet:
cls.topo_ordnet_data = topo_ordnet.read()
TOPO_ORDTOPO_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs", "gnpy", "openroadmTopology.json")
- with open(TOPO_ORDTOPO_FILE, 'r') as topo_ordtopo:
+ with open(TOPO_ORDTOPO_FILE, 'r', encoding='utf-8') as topo_ordtopo:
cls.topo_ordtopo_data = topo_ordtopo.read()
PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
"..", "..", "sample_configs", "gnpy", "gnpy_portmapping_121.json")
- with open(PORT_MAPPING_FILE, 'r') as port_mapping:
+ with open(PORT_MAPPING_FILE, 'r', encoding='utf-8') as port_mapping:
cls.port_mapping_data = port_mapping.read()
sample_files_parsed = True
except PermissionError as err:
print("all processes killed")
def setUp(self): # instruction executed before each test method
+ # pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
def test_01_connect_xpdrA(self):