sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
CREATED_SUCCESSFULLY = 'Result message should contain Xponder Roadm Link created successfully'
cls.init_failed = False
os.environ['JAVA_MIN_MEM'] = '1024M'
os.environ['JAVA_MAX_MEM'] = '4096M'
- cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils.start_tpce()
# TAPI feature is not installed by default in Karaf
if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
print("installing tapi feature...")
- result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-tapi")
+ result = test_utils.install_karaf_feature("odl-transportpce-tapi")
if result.returncode != 0:
cls.init_failed = True
print("Restarting OpenDaylight...")
- test_utils_rfc8040.shutdown_process(cls.processes[0])
- cls.processes[0] = test_utils_rfc8040.start_karaf()
- test_utils_rfc8040.process_list[0] = cls.processes[0]
- cls.init_failed = not test_utils_rfc8040.wait_until_log_contains(
- test_utils_rfc8040.KARAF_LOG, test_utils_rfc8040.KARAF_OK_START_MSG, time_to_wait=60)
+ test_utils.shutdown_process(cls.processes[0])
+ cls.processes[0] = test_utils.start_karaf()
+ test_utils.process_list[0] = cls.processes[0]
+ cls.init_failed = not test_utils.wait_until_log_contains(
+ test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
if cls.init_failed:
print("tapi installation feature failed...")
- test_utils_rfc8040.shutdown_process(cls.processes[0])
+ test_utils.shutdown_process(cls.processes[0])
sys.exit(2)
- cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION),
- ('roadma', cls.NODE_VERSION),
- ('roadmb', cls.NODE_VERSION),
- ('roadmc', cls.NODE_VERSION),
- ('xpdrc', cls.NODE_VERSION),
- ('spdra', cls.NODE_VERSION),
- ('spdrc', cls.NODE_VERSION)])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmb', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('xpdrc', cls.NODE_VERSION),
+ ('spdra', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self): # instruction executed before each test method
print("execution of {}".format(self.id().split(".")[-1]))
def test_01_get_tapi_topology_T100G(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
'node should contain 1 node rule group')
def test_02_get_tapi_topology_T0(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertNotIn("node", response["output"]["topology"], 'Topology should contain no node')
self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
def test_03_connect_rdmb(self):
- response = test_utils_rfc8040.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_check_tapi_topos(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
def test_05_disconnect_roadmb(self):
- response = test_utils_rfc8040.unmount_device("ROADM-B1")
+ response = test_utils.unmount_device("ROADM-B1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_06_connect_xpdra(self):
- response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_07_check_tapi_topos(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertNotIn("node", response["output"]["topology"], 'Topology should contain no node')
self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
def test_08_connect_rdma(self):
- response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_09_connect_rdmc(self):
- response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_10_check_tapi_topos(self):
self.test_01_get_tapi_topology_T100G()
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(1, len(response["output"]["topology"]["node"]), 'Topology should contain 1 node')
'node should contain 1 node rule group')
def test_11_connect_xprda_n1_to_roadma_pp1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_12_connect_roadma_pp1_to_xpdra_n1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_13_check_tapi_topology_T100G(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(1, len(response["output"]["topology"]["node"][0]["owned-node-edge-point"]),
'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1')
def test_14_check_tapi_topology_T0(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
nodes = response["output"]["topology"]["node"]
'Topology should contain 1 oms link')
def test_15_connect_xpdrc(self):
- response = test_utils_rfc8040.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_16_connect_xprdc_n1_to_roadmc_pp1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_17_connect_roadmc_pp1_to_xpdrc_n1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_18_check_tapi_topology_T100G(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(2, len(response["output"]["topology"]["node"][0]["owned-node-edge-point"]),
'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1')
def test_19_check_tapi_topology_T0(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
nodes = response["output"]["topology"]["node"]
'Topology should contain 2 oms links')
def test_20_connect_spdr_sa1(self):
- response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_21_connect_spdr_sc1(self):
- response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+ response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_22_check_tapi_topology_T100G(self):
self.test_18_check_tapi_topology_T100G()
self.test_19_check_tapi_topology_T0()
def test_24_connect_sprda_n1_to_roadma_pp2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
time.sleep(2)
def test_25_connect_roadma_pp2_to_spdra_n1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
time.sleep(2)
def test_26_connect_sprdc_n1_to_roadmc_pp2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
time.sleep(2)
def test_27_connect_roadmc_pp2_to_spdrc_n1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
self.test_18_check_tapi_topology_T100G()
def test_29_check_tapi_topology_T0(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
nodes = response["output"]["topology"]["node"]
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
# Config ROADMC-ROADMA oms-attributes
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_31_create_OCH_OTU4_service(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_32_check_tapi_topology_T0(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
nodes = response["output"]["topology"]["node"]
del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-create',
self.cr_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_34_check_tapi_topology_T0(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
nodes = response["output"]["topology"]["node"]
self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps')
def test_35_connect_sprda_2_n2_to_roadma_pp3(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
time.sleep(2)
def test_36_connect_roadma_pp3_to_spdra_2_n2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
time.sleep(2)
def test_37_check_tapi_topology_T0(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
nodes = response["output"]["topology"]["node"]
def test_38_delete_ODU4_service(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
def test_39_delete_OCH_OTU4_service(self):
self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'org-openroadm-service', 'service-delete',
self.del_serv_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
time.sleep(self.WAITING)
def test_40_check_tapi_topology_T0(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
nodes = response["output"]["topology"]["node"]
'Topology should contain 0 otn link')
def test_41_disconnect_xponders_from_roadm(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
links = response['network'][0]['ietf-network-topology:link']
for link in links:
if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
- response = test_utils_rfc8040.del_ietf_network_link_request(
+ response = test_utils.del_ietf_network_link_request(
'openroadm-topology', link['link-id'], 'config')
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_42_check_tapi_topology_T0(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(1, len(response["output"]["topology"]["node"]), 'Topology should contain 1 node')
'node name should be: ROADM-infra')
def test_43_get_tapi_topology_T100G(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
'Node should contain no owned-node-edge-points')
def test_44_disconnect_roadma(self):
- response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ response = test_utils.unmount_device("ROADM-A1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_45_disconnect_roadmc(self):
- response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ response = test_utils.unmount_device("ROADM-C1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_46_check_tapi_topos(self):
self.test_02_get_tapi_topology_T0()
def test_47_disconnect_xpdra(self):
- response = test_utils_rfc8040.unmount_device("XPDR-A1")
+ response = test_utils.unmount_device("XPDR-A1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_48_disconnect_xpdrc(self):
- response = test_utils_rfc8040.unmount_device("XPDR-C1")
+ response = test_utils.unmount_device("XPDR-C1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_49_disconnect_spdr_sa1(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ response = test_utils.unmount_device("SPDR-SA1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_50_disconnect_spdr_sc1(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SC1")
+ response = test_utils.unmount_device("SPDR-SC1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))