X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tests%2Ftransportpce_tests%2Ftapi%2Ftest01_abstracted_topology.py;fp=tests%2Ftransportpce_tests%2Ftapi%2Ftest01_abstracted_topology.py;h=9f33dd822f3c9825818f96411a212d06ae0d57c2;hb=90ecc9a5431eec82b8f4e7cb7d66cd29884e8f9a;hp=467dd19a72d4dd45fff15a23d6751ab4f4624ef7;hpb=3ec5306a2e0eb12c1ffc9b0e4b06e0f2fe1aa403;p=transportpce.git diff --git a/tests/transportpce_tests/tapi/test01_abstracted_topology.py b/tests/transportpce_tests/tapi/test01_abstracted_topology.py index 467dd19a7..9f33dd822 100644 --- a/tests/transportpce_tests/tapi/test01_abstracted_topology.py +++ b/tests/transportpce_tests/tapi/test01_abstracted_topology.py @@ -23,7 +23,7 @@ import requests sys.path.append('transportpce_tests/common/') # pylint: disable=wrong-import-position # pylint: disable=import-error -import test_utils_rfc8040 # nopep8 +import test_utils # nopep8 CREATED_SUCCESSFULLY = 'Result message should contain Xponder Roadm Link created successfully' @@ -145,36 +145,36 @@ class TransportTapitesting(unittest.TestCase): cls.init_failed = False os.environ['JAVA_MIN_MEM'] = '1024M' os.environ['JAVA_MAX_MEM'] = '4096M' - cls.processes = test_utils_rfc8040.start_tpce() + cls.processes = test_utils.start_tpce() # TAPI feature is not installed by default in Karaf if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True': print("installing tapi feature...") - result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-tapi") + result = test_utils.install_karaf_feature("odl-transportpce-tapi") if result.returncode != 0: cls.init_failed = True print("Restarting OpenDaylight...") - test_utils_rfc8040.shutdown_process(cls.processes[0]) - cls.processes[0] = test_utils_rfc8040.start_karaf() - test_utils_rfc8040.process_list[0] = cls.processes[0] - cls.init_failed = not test_utils_rfc8040.wait_until_log_contains( - test_utils_rfc8040.KARAF_LOG, test_utils_rfc8040.KARAF_OK_START_MSG, time_to_wait=60) + test_utils.shutdown_process(cls.processes[0]) + cls.processes[0] = test_utils.start_karaf() + test_utils.process_list[0] = cls.processes[0] + cls.init_failed = not test_utils.wait_until_log_contains( + test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60) if cls.init_failed: print("tapi installation feature failed...") - test_utils_rfc8040.shutdown_process(cls.processes[0]) + test_utils.shutdown_process(cls.processes[0]) sys.exit(2) - cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), - ('roadma', cls.NODE_VERSION), - ('roadmb', cls.NODE_VERSION), - ('roadmc', cls.NODE_VERSION), - ('xpdrc', cls.NODE_VERSION), - ('spdra', cls.NODE_VERSION), - ('spdrc', cls.NODE_VERSION)]) + cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), + ('roadma', cls.NODE_VERSION), + ('roadmb', cls.NODE_VERSION), + ('roadmc', cls.NODE_VERSION), + ('xpdrc', cls.NODE_VERSION), + ('spdra', cls.NODE_VERSION), + ('spdrc', cls.NODE_VERSION)]) @classmethod def tearDownClass(cls): # pylint: disable=not-an-iterable for process in cls.processes: - test_utils_rfc8040.shutdown_process(process) + test_utils.shutdown_process(process) print("all processes killed") def setUp(self): # instruction executed before each test method @@ -184,8 +184,8 @@ class TransportTapitesting(unittest.TestCase): print("execution of {}".format(self.id().split(".")[-1])) def test_01_get_tapi_topology_T100G(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T100GE + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') @@ -200,61 +200,61 @@ class TransportTapitesting(unittest.TestCase): 'node should contain 1 node rule group') def test_02_get_tapi_topology_T0(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) self.assertNotIn("node", response["output"]["topology"], 'Topology should contain no node') self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') def test_03_connect_rdmb(self): - response = test_utils_rfc8040.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) + response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) def test_04_check_tapi_topos(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T100GE + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') def test_05_disconnect_roadmb(self): - response = test_utils_rfc8040.unmount_device("ROADM-B1") + response = test_utils.unmount_device("ROADM-B1") self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_06_connect_xpdra(self): - response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) + response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) def test_07_check_tapi_topos(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) self.assertNotIn("node", response["output"]["topology"], 'Topology should contain no node') self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') def test_08_connect_rdma(self): - response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) + response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) def test_09_connect_rdmc(self): - response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) + response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) def test_10_check_tapi_topos(self): self.test_01_get_tapi_topology_T100G() - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) self.assertEqual(1, len(response["output"]["topology"]["node"]), 'Topology should contain 1 node') @@ -267,7 +267,7 @@ class TransportTapitesting(unittest.TestCase): 'node should contain 1 node rule group') def test_11_connect_xprda_n1_to_roadma_pp1(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1', 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) @@ -277,7 +277,7 @@ class TransportTapitesting(unittest.TestCase): time.sleep(2) def test_12_connect_roadma_pp1_to_xpdra_n1(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1', 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) @@ -287,8 +287,8 @@ class TransportTapitesting(unittest.TestCase): time.sleep(2) def test_13_check_tapi_topology_T100G(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T100GE + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) self.assertEqual(1, len(response["output"]["topology"]["node"][0]["owned-node-edge-point"]), @@ -298,8 +298,8 @@ class TransportTapitesting(unittest.TestCase): 'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1') def test_14_check_tapi_topology_T0(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) nodes = response["output"]["topology"]["node"] @@ -316,11 +316,11 @@ class TransportTapitesting(unittest.TestCase): 'Topology should contain 1 oms link') def test_15_connect_xpdrc(self): - response = test_utils_rfc8040.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) + response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) def test_16_connect_xprdc_n1_to_roadmc_pp1(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1', 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) @@ -330,7 +330,7 @@ class TransportTapitesting(unittest.TestCase): time.sleep(2) def test_17_connect_roadmc_pp1_to_xpdrc_n1(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1', 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) @@ -340,8 +340,8 @@ class TransportTapitesting(unittest.TestCase): time.sleep(2) def test_18_check_tapi_topology_T100G(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T100GE + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) self.assertEqual(2, len(response["output"]["topology"]["node"][0]["owned-node-edge-point"]), @@ -354,8 +354,8 @@ class TransportTapitesting(unittest.TestCase): 'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1') def test_19_check_tapi_topology_T0(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) nodes = response["output"]["topology"]["node"] @@ -372,14 +372,14 @@ class TransportTapitesting(unittest.TestCase): 'Topology should contain 2 oms links') def test_20_connect_spdr_sa1(self): - response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION)) + response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION)) self.assertEqual(response.status_code, - requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) + requests.codes.created, test_utils.CODE_SHOULD_BE_201) def test_21_connect_spdr_sc1(self): - response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION)) + response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION)) self.assertEqual(response.status_code, - requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) + requests.codes.created, test_utils.CODE_SHOULD_BE_201) def test_22_check_tapi_topology_T100G(self): self.test_18_check_tapi_topology_T100G() @@ -388,7 +388,7 @@ class TransportTapitesting(unittest.TestCase): self.test_19_check_tapi_topology_T0() def test_24_connect_sprda_n1_to_roadma_pp2(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1', 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}}) @@ -398,7 +398,7 @@ class TransportTapitesting(unittest.TestCase): time.sleep(2) def test_25_connect_roadma_pp2_to_spdra_n1(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1', 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}}) @@ -408,7 +408,7 @@ class TransportTapitesting(unittest.TestCase): time.sleep(2) def test_26_connect_sprdc_n1_to_roadmc_pp2(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1', 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}}) @@ -418,7 +418,7 @@ class TransportTapitesting(unittest.TestCase): time.sleep(2) def test_27_connect_roadmc_pp2_to_spdrc_n1(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1', 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}}) @@ -431,8 +431,8 @@ class TransportTapitesting(unittest.TestCase): self.test_18_check_tapi_topology_T100G() def test_29_check_tapi_topology_T0(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) nodes = response["output"]["topology"]["node"] @@ -460,7 +460,7 @@ class TransportTapitesting(unittest.TestCase): "fiber-type": "smf", "SRLG-length": 100000, "pmd": 0.5}]}} - response = test_utils_rfc8040.add_oms_attr_request( + response = test_utils.add_oms_attr_request( "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data) self.assertEqual(response.status_code, requests.codes.created) # Config ROADMC-ROADMA oms-attributes @@ -474,12 +474,12 @@ class TransportTapitesting(unittest.TestCase): "fiber-type": "smf", "SRLG-length": 100000, "pmd": 0.5}]}} - response = test_utils_rfc8040.add_oms_attr_request( + response = test_utils.add_oms_attr_request( "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data) self.assertEqual(response.status_code, requests.codes.created) def test_31_create_OCH_OTU4_service(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'org-openroadm-service', 'service-create', self.cr_serv_input_data) self.assertEqual(response['status_code'], requests.codes.ok) @@ -488,8 +488,8 @@ class TransportTapitesting(unittest.TestCase): time.sleep(self.WAITING) def test_32_check_tapi_topology_T0(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) nodes = response["output"]["topology"]["node"] @@ -520,7 +520,7 @@ class TransportTapitesting(unittest.TestCase): del self.cr_serv_input_data["service-z-end"]["otu-service-rate"] self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4" - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'org-openroadm-service', 'service-create', self.cr_serv_input_data) self.assertEqual(response['status_code'], requests.codes.ok) @@ -529,8 +529,8 @@ class TransportTapitesting(unittest.TestCase): time.sleep(self.WAITING) def test_34_check_tapi_topology_T0(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) nodes = response["output"]["topology"]["node"] @@ -556,7 +556,7 @@ class TransportTapitesting(unittest.TestCase): self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps') def test_35_connect_sprda_2_n2_to_roadma_pp3(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2', 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}}) @@ -566,7 +566,7 @@ class TransportTapitesting(unittest.TestCase): time.sleep(2) def test_36_connect_roadma_pp3_to_spdra_2_n2(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2', 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}}) @@ -576,8 +576,8 @@ class TransportTapitesting(unittest.TestCase): time.sleep(2) def test_37_check_tapi_topology_T0(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) nodes = response["output"]["topology"]["node"] @@ -597,7 +597,7 @@ class TransportTapitesting(unittest.TestCase): def test_38_delete_ODU4_service(self): self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4" - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'org-openroadm-service', 'service-delete', self.del_serv_input_data) self.assertEqual(response['status_code'], requests.codes.ok) @@ -607,7 +607,7 @@ class TransportTapitesting(unittest.TestCase): def test_39_delete_OCH_OTU4_service(self): self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4" - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( 'org-openroadm-service', 'service-delete', self.del_serv_input_data) self.assertEqual(response['status_code'], requests.codes.ok) @@ -616,8 +616,8 @@ class TransportTapitesting(unittest.TestCase): time.sleep(self.WAITING) def test_40_check_tapi_topology_T0(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) nodes = response["output"]["topology"]["node"] @@ -628,18 +628,18 @@ class TransportTapitesting(unittest.TestCase): 'Topology should contain 0 otn link') def test_41_disconnect_xponders_from_roadm(self): - response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config') + response = test_utils.get_ietf_network_request('openroadm-topology', 'config') self.assertEqual(response['status_code'], requests.codes.ok) links = response['network'][0]['ietf-network-topology:link'] for link in links: if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'): - response = test_utils_rfc8040.del_ietf_network_link_request( + response = test_utils.del_ietf_network_link_request( 'openroadm-topology', link['link-id'], 'config') self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_42_check_tapi_topology_T0(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) self.assertEqual(1, len(response["output"]["topology"]["node"]), 'Topology should contain 1 node') @@ -648,8 +648,8 @@ class TransportTapitesting(unittest.TestCase): 'node name should be: ROADM-infra') def test_43_get_tapi_topology_T100G(self): - self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE - response = test_utils_rfc8040.transportpce_api_rpc_request( + self.tapi_topo["topology-id-or-name"] = test_utils.T100GE + response = test_utils.transportpce_api_rpc_request( 'tapi-topology', 'get-topology-details', self.tapi_topo) self.assertEqual(response['status_code'], requests.codes.ok) self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') @@ -658,11 +658,11 @@ class TransportTapitesting(unittest.TestCase): 'Node should contain no owned-node-edge-points') def test_44_disconnect_roadma(self): - response = test_utils_rfc8040.unmount_device("ROADM-A1") + response = test_utils.unmount_device("ROADM-A1") self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_45_disconnect_roadmc(self): - response = test_utils_rfc8040.unmount_device("ROADM-C1") + response = test_utils.unmount_device("ROADM-C1") self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_46_check_tapi_topos(self): @@ -670,19 +670,19 @@ class TransportTapitesting(unittest.TestCase): self.test_02_get_tapi_topology_T0() def test_47_disconnect_xpdra(self): - response = test_utils_rfc8040.unmount_device("XPDR-A1") + response = test_utils.unmount_device("XPDR-A1") self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_48_disconnect_xpdrc(self): - response = test_utils_rfc8040.unmount_device("XPDR-C1") + response = test_utils.unmount_device("XPDR-C1") self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_49_disconnect_spdr_sa1(self): - response = test_utils_rfc8040.unmount_device("SPDR-SA1") + response = test_utils.unmount_device("SPDR-SA1") self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_50_disconnect_spdr_sc1(self): - response = test_utils_rfc8040.unmount_device("SPDR-SC1") + response = test_utils.unmount_device("SPDR-SC1") self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))