From 27444f6cbb1711c988dc255736b26613f6b3114c Mon Sep 17 00:00:00 2001 From: Gilles Thouenon Date: Thu, 25 Aug 2022 11:15:03 +0200 Subject: [PATCH] Migrate TAPI functional tests to RFC8040 JIRA: TRNSPRTPCE-567 Signed-off-by: Gilles Thouenon Change-Id: I168b6ce08c11433f82067bbe613f73797a185591 --- .../common/test_utils_rfc8040.py | 8 +- .../tapi/test01_abstracted_topology.py | 636 ++++++++---------- .../tapi/test02_full_topology.py | 457 ++++++------- ...test03_tapi_device_change_notifications.py | 573 ++++++++-------- 4 files changed, 781 insertions(+), 893 deletions(-) diff --git a/tests/transportpce_tests/common/test_utils_rfc8040.py b/tests/transportpce_tests/common/test_utils_rfc8040.py index 576797315..63ab18735 100644 --- a/tests/transportpce_tests/common/test_utils_rfc8040.py +++ b/tests/transportpce_tests/common/test_utils_rfc8040.py @@ -44,6 +44,9 @@ REQUEST_TIMEOUT = 10 CODE_SHOULD_BE_200 = 'Http status code should be 200' CODE_SHOULD_BE_201 = 'Http status code should be 201' +T100GE = 'Transponder 100GE' +T0_MULTILAYER_TOPO = 'T0 - Multi-layer topology' +T0_FULL_MULTILAYER_TOPO = 'T0 - Full Multi-layer topology' SIM_LOG_DIRECTORY = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'log') @@ -632,6 +635,9 @@ def transportpce_api_rpc_request(api_module: str, rpc: str, payload: dict): res = response.json() return_key = {'rfc8040': api_module + ':output', 'draft-bierman02': 'output'} - return_output = res[return_key[RESTCONF_VERSION]] + if response.status_code == requests.codes.internal_server_error: + return_output = res + else: + return_output = res[return_key[RESTCONF_VERSION]] return {'status_code': response.status_code, 'output': return_output} diff --git a/tests/transportpce_tests/tapi/test01_abstracted_topology.py b/tests/transportpce_tests/tapi/test01_abstracted_topology.py index f3960b1ab..467dd19a7 100644 --- a/tests/transportpce_tests/tapi/test01_abstracted_topology.py +++ b/tests/transportpce_tests/tapi/test01_abstracted_topology.py @@ -23,7 +23,7 @@ import requests sys.path.append('transportpce_tests/common/') # pylint: disable=wrong-import-position # pylint: disable=import-error -import test_utils # nopep8 +import test_utils_rfc8040 # nopep8 CREATED_SUCCESSFULLY = 'Result message should contain Xponder Roadm Link created successfully' @@ -34,7 +34,7 @@ class TransportTapitesting(unittest.TestCase): processes = None WAITING = 20 NODE_VERSION = '2.2.1' - cr_serv_sample_data = {"input": { + cr_serv_input_data = { "sdnc-request-header": { "request-id": "request-1", "rpc-action": "service-create", @@ -126,43 +126,55 @@ class TransportTapitesting(unittest.TestCase): "due-date": "2018-06-15T00:00:01Z", "operator-contact": "pw1234" } + + del_serv_input_data = { + "sdnc-request-header": { + "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58", + "rpc-action": "service-delete", + "request-system-id": "appname", + "notification-url": "http://localhost:8585/NotificationServer/notify"}, + "service-delete-req-info": { + "service-name": "TBD", + "tail-retention": "no"} } + tapi_topo = {"topology-id-or-name": "TBD"} + @classmethod def setUpClass(cls): cls.init_failed = False os.environ['JAVA_MIN_MEM'] = '1024M' os.environ['JAVA_MAX_MEM'] = '4096M' - cls.processes = test_utils.start_tpce() + cls.processes = test_utils_rfc8040.start_tpce() # TAPI feature is not installed by default in Karaf if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True': print("installing tapi feature...") - result = test_utils.install_karaf_feature("odl-transportpce-tapi") + result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-tapi") if result.returncode != 0: cls.init_failed = True print("Restarting OpenDaylight...") - test_utils.shutdown_process(cls.processes[0]) - cls.processes[0] = test_utils.start_karaf() - test_utils.process_list[0] = cls.processes[0] - cls.init_failed = not test_utils.wait_until_log_contains( - test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60) + test_utils_rfc8040.shutdown_process(cls.processes[0]) + cls.processes[0] = test_utils_rfc8040.start_karaf() + test_utils_rfc8040.process_list[0] = cls.processes[0] + cls.init_failed = not test_utils_rfc8040.wait_until_log_contains( + test_utils_rfc8040.KARAF_LOG, test_utils_rfc8040.KARAF_OK_START_MSG, time_to_wait=60) if cls.init_failed: print("tapi installation feature failed...") - test_utils.shutdown_process(cls.processes[0]) + test_utils_rfc8040.shutdown_process(cls.processes[0]) sys.exit(2) - cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), - ('roadma', cls.NODE_VERSION), - ('roadmb', cls.NODE_VERSION), - ('roadmc', cls.NODE_VERSION), - ('xpdrc', cls.NODE_VERSION), - ('spdra', cls.NODE_VERSION), - ('spdrc', cls.NODE_VERSION)]) + cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), + ('roadma', cls.NODE_VERSION), + ('roadmb', cls.NODE_VERSION), + ('roadmc', cls.NODE_VERSION), + ('xpdrc', cls.NODE_VERSION), + ('spdra', cls.NODE_VERSION), + ('spdrc', cls.NODE_VERSION)]) @classmethod def tearDownClass(cls): # pylint: disable=not-an-iterable for process in cls.processes: - test_utils.shutdown_process(process) + test_utils_rfc8040.shutdown_process(process) print("all processes killed") def setUp(self): # instruction executed before each test method @@ -172,169 +184,126 @@ class TransportTapitesting(unittest.TestCase): print("execution of {}".format(self.id().split(".")[-1])) def test_01_get_tapi_topology_T100G(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "Transponder 100GE" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertEqual(len(res["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') - self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link') - self.assertNotIn("owned-node-edge-point", res["output"]["topology"]["node"][0], + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') + self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') + self.assertNotIn("owned-node-edge-point", response["output"]["topology"]["node"][0], 'Node should contain no owned-node-edge-points') - self.assertEqual("Tpdr100g over WDM node", res["output"]["topology"]["node"][0]["name"][0]["value"], + self.assertEqual("Tpdr100g over WDM node", response["output"]["topology"]["node"][0]["name"][0]["value"], 'node name should be: Tpdr100g over WDM node') - self.assertIn("ETH", res["output"]["topology"]["node"][0]["layer-protocol-name"], + self.assertIn("ETH", response["output"]["topology"]["node"][0]["layer-protocol-name"], 'Node layer protocol should contain ETH') - self.assertEqual(1, len(res["output"]["topology"]["node"][0]["node-rule-group"]), + self.assertEqual(1, len(response["output"]["topology"]["node"][0]["node-rule-group"]), 'node should contain 1 node rule group') def test_02_get_tapi_topology_T0(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertNotIn("node", res["output"]["topology"], 'Topology should contain no node') - self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link') + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertNotIn("node", response["output"]["topology"], 'Topology should contain no node') + self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') def test_03_connect_rdmb(self): - response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) - time.sleep(10) + response = test_utils_rfc8040.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) def test_04_check_tapi_topos(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "Transponder 100GE" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertEqual(len(res["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') - self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link') - - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertEqual(len(res["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') - self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link') + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') + self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') + + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') + self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') def test_05_disconnect_roadmb(self): - response = test_utils.unmount_device("ROADM-B1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - time.sleep(5) + response = test_utils_rfc8040.unmount_device("ROADM-B1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_06_connect_xpdra(self): - response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) - time.sleep(10) + response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) def test_07_check_tapi_topos(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertNotIn("node", res["output"]["topology"], 'Topology should contain no node') - self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link') + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertNotIn("node", response["output"]["topology"], 'Topology should contain no node') + self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') def test_08_connect_rdma(self): - response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) - time.sleep(10) + response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) def test_09_connect_rdmc(self): - response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) - time.sleep(10) + response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) def test_10_check_tapi_topos(self): self.test_01_get_tapi_topology_T100G() - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertEqual(1, len(res["output"]["topology"]["node"]), 'Topology should contain 1 node') - self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link') - self.assertEqual("ROADM-infra", res["output"]["topology"]["node"][0]["name"][0]["value"], + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(1, len(response["output"]["topology"]["node"]), 'Topology should contain 1 node') + self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') + self.assertEqual("ROADM-infra", response["output"]["topology"]["node"][0]["name"][0]["value"], 'node name should be: ROADM-infra') - self.assertIn("PHOTONIC_MEDIA", res["output"]["topology"]["node"][0]["layer-protocol-name"], + self.assertIn("PHOTONIC_MEDIA", response["output"]["topology"]["node"][0]["layer-protocol-name"], 'Node layer protocol should contain PHOTONIC_MEDIA') - self.assertEqual(1, len(res["output"]["topology"]["node"][0]["node-rule-group"]), + self.assertEqual(1, len(response["output"]["topology"]["node"][0]["node-rule-group"]), 'node should contain 1 node rule group') def test_11_connect_xprda_n1_to_roadma_pp1(self): - response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1", - "ROADM-A1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"], + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-xpdr-rdm-links', + {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"], CREATED_SUCCESSFULLY) time.sleep(2) def test_12_connect_roadma_pp1_to_xpdra_n1(self): - response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1", - "ROADM-A1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertIn('Roadm Xponder links created successfully', res["output"]["result"], + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-rdm-xpdr-links', + {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Roadm Xponder links created successfully', response["output"]["result"], CREATED_SUCCESSFULLY) time.sleep(2) def test_13_check_tapi_topology_T100G(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "Transponder 100GE" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertEqual(1, len(res["output"]["topology"]["node"][0]["owned-node-edge-point"]), + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(1, len(response["output"]["topology"]["node"][0]["owned-node-edge-point"]), 'Node should contain 1 owned-node-edge-points') self.assertEqual("XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1", - res["output"]["topology"]["node"][0]["owned-node-edge-point"][0]["name"][0]["value"], + response["output"]["topology"]["node"][0]["owned-node-edge-point"][0]["name"][0]["value"], 'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1') def test_14_check_tapi_topology_T0(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - nodes = res["output"]["topology"]["node"] - links = res["output"]["topology"]["link"] + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + nodes = response["output"]["topology"]["node"] + links = response["output"]["topology"]["link"] self.assertEqual(3, len(nodes), 'Topology should contain 3 nodes') self.assertEqual(2, len(links), 'Topology should contain 2 links') self.assertEqual(2, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"), @@ -347,59 +316,50 @@ class TransportTapitesting(unittest.TestCase): 'Topology should contain 1 oms link') def test_15_connect_xpdrc(self): - response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) - time.sleep(10) + response = test_utils_rfc8040.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) def test_16_connect_xprdc_n1_to_roadmc_pp1(self): - response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1", - "ROADM-C1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"], + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-xpdr-rdm-links', + {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"], CREATED_SUCCESSFULLY) time.sleep(2) def test_17_connect_roadmc_pp1_to_xpdrc_n1(self): - response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1", - "ROADM-C1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertIn('Roadm Xponder links created successfully', res["output"]["result"], + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-rdm-xpdr-links', + {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Roadm Xponder links created successfully', response["output"]["result"], CREATED_SUCCESSFULLY) time.sleep(2) def test_18_check_tapi_topology_T100G(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "Transponder 100GE" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertEqual(2, len(res["output"]["topology"]["node"][0]["owned-node-edge-point"]), + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(2, len(response["output"]["topology"]["node"][0]["owned-node-edge-point"]), 'Node should contain 2 owned-node-edge-points') self.assertEqual("XPDR-C1-XPDR1+DSR+XPDR1-CLIENT1", - res["output"]["topology"]["node"][0]["owned-node-edge-point"][0]["name"][0]["value"], + response["output"]["topology"]["node"][0]["owned-node-edge-point"][0]["name"][0]["value"], 'name of owned-node-edge-points should be XPDR-C1-XPDR1+DSR+XPDR1-CLIENT1') self.assertEqual("XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1", - res["output"]["topology"]["node"][0]["owned-node-edge-point"][1]["name"][0]["value"], + response["output"]["topology"]["node"][0]["owned-node-edge-point"][1]["name"][0]["value"], 'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1') def test_19_check_tapi_topology_T0(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - nodes = res["output"]["topology"]["node"] - links = res["output"]["topology"]["link"] + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + nodes = response["output"]["topology"]["node"] + links = response["output"]["topology"]["link"] self.assertEqual(5, len(nodes), 'Topology should contain 5 nodes') self.assertEqual(4, len(links), 'Topology should contain 4 links') self.assertEqual(3, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"), @@ -412,16 +372,14 @@ class TransportTapitesting(unittest.TestCase): 'Topology should contain 2 oms links') def test_20_connect_spdr_sa1(self): - response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) - time.sleep(10) - # TODO replace connect and disconnect timers with test_utils.wait_until_log_contains + response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION)) + self.assertEqual(response.status_code, + requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) def test_21_connect_spdr_sc1(self): - response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) - time.sleep(10) - # TODO replace connect and disconnect timers with test_utils.wait_until_log_contains + response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION)) + self.assertEqual(response.status_code, + requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) def test_22_check_tapi_topology_T100G(self): self.test_18_check_tapi_topology_T100G() @@ -430,38 +388,42 @@ class TransportTapitesting(unittest.TestCase): self.test_19_check_tapi_topology_T0() def test_24_connect_sprda_n1_to_roadma_pp2(self): - response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1", - "ROADM-A1", "1", "SRG1-PP2-TXRX") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"], + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-xpdr-rdm-links', + {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"], CREATED_SUCCESSFULLY) time.sleep(2) def test_25_connect_roadma_pp2_to_spdra_n1(self): - response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1", - "ROADM-A1", "1", "SRG1-PP2-TXRX") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertIn('Roadm Xponder links created successfully', res["output"]["result"], + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-rdm-xpdr-links', + {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Roadm Xponder links created successfully', response["output"]["result"], CREATED_SUCCESSFULLY) time.sleep(2) def test_26_connect_sprdc_n1_to_roadmc_pp2(self): - response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1", - "ROADM-C1", "1", "SRG1-PP2-TXRX") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"], + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-xpdr-rdm-links', + {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"], CREATED_SUCCESSFULLY) time.sleep(2) def test_27_connect_roadmc_pp2_to_spdrc_n1(self): - response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1", - "ROADM-C1", "1", "SRG1-PP2-TXRX") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertIn('Roadm Xponder links created successfully', res["output"]["result"], + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-rdm-xpdr-links', + {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Roadm Xponder links created successfully', response["output"]["result"], CREATED_SUCCESSFULLY) time.sleep(2) @@ -469,17 +431,12 @@ class TransportTapitesting(unittest.TestCase): self.test_18_check_tapi_topology_T100G() def test_29_check_tapi_topology_T0(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - nodes = res["output"]["topology"]["node"] - links = res["output"]["topology"]["link"] + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + nodes = response["output"]["topology"]["node"] + links = response["output"]["topology"]["link"] self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes') self.assertEqual(8, len(links), 'Topology should contain 8 links') self.assertEqual(5, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"), @@ -503,7 +460,8 @@ class TransportTapitesting(unittest.TestCase): "fiber-type": "smf", "SRLG-length": 100000, "pmd": 0.5}]}} - response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data) + response = test_utils_rfc8040.add_oms_attr_request( + "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data) self.assertEqual(response.status_code, requests.codes.created) # Config ROADMC-ROADMA oms-attributes data = {"span": { @@ -516,29 +474,26 @@ class TransportTapitesting(unittest.TestCase): "fiber-type": "smf", "SRLG-length": 100000, "pmd": 0.5}]}} - response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data) + response = test_utils_rfc8040.add_oms_attr_request( + "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data) self.assertEqual(response.status_code, requests.codes.created) def test_31_create_OCH_OTU4_service(self): - response = test_utils.service_create_request(self.cr_serv_sample_data) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'org-openroadm-service', 'service-create', + self.cr_serv_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('PCE calculation in progress', - res['output']['configuration-response-common']['response-message']) + response['output']['configuration-response-common']['response-message']) time.sleep(self.WAITING) def test_32_check_tapi_topology_T0(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - nodes = res["output"]["topology"]["node"] - links = res["output"]["topology"]["link"] + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + nodes = response["output"]["topology"]["node"] + links = response["output"]["topology"]["link"] self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes') self.assertEqual(9, len(links), 'Topology should contain 9 links') self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "transitional link name"), @@ -549,41 +504,37 @@ class TransportTapitesting(unittest.TestCase): 'Topology should contain 1 otn link') for link in links: if link["name"][0]["value"] == "OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1": - self.assertEqual(100000, link["available-capacity"]["total-size"]["value"], + self.assertEqual(100000, int(link["available-capacity"]["total-size"]["value"]), 'OTU4 link should have an available capacity of 100 000 Mbps') elif link["name"][0]["value-name"] == "transitional link name": - self.assertEqual(100, link["available-capacity"]["total-size"]["value"], + self.assertEqual(100, int(link["available-capacity"]["total-size"]["value"]), 'link should have an available capacity of 100 Gbps') self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps') def test_33_create_ODU4_service(self): - self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4" - self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU" - del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] - self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4" - self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU" - del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] - self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4" - - response = test_utils.service_create_request(self.cr_serv_sample_data) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + self.cr_serv_input_data["service-name"] = "service1-ODU4" + self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU" + del self.cr_serv_input_data["service-a-end"]["otu-service-rate"] + self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4" + self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU" + del self.cr_serv_input_data["service-z-end"]["otu-service-rate"] + self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4" + + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'org-openroadm-service', 'service-create', + self.cr_serv_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('PCE calculation in progress', - res['output']['configuration-response-common']['response-message']) + response['output']['configuration-response-common']['response-message']) time.sleep(self.WAITING) def test_34_check_tapi_topology_T0(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - nodes = res["output"]["topology"]["node"] - links = res["output"]["topology"]["link"] + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + nodes = response["output"]["topology"]["node"] + links = response["output"]["topology"]["link"] self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes') self.assertEqual(10, len(links), 'Topology should contain 10 links') self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "transitional link name"), @@ -597,43 +548,40 @@ class TransportTapitesting(unittest.TestCase): self.assertEqual(0, link["available-capacity"]["total-size"]["value"], 'OTU4 link should have an available capacity of 0 Mbps') elif link["name"][0]["value"] == "ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1": - self.assertEqual(100000, link["available-capacity"]["total-size"]["value"], + self.assertEqual(100000, int(link["available-capacity"]["total-size"]["value"]), 'ODU4 link should have an available capacity of 100 000 Mbps') elif link["name"][0]["value-name"] == "transitional link name": - self.assertEqual(100, link["available-capacity"]["total-size"]["value"], + self.assertEqual(100, int(link["available-capacity"]["total-size"]["value"]), 'link should have an available capacity of 100 Gbps') self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps') def test_35_connect_sprda_2_n2_to_roadma_pp3(self): - response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "2", "2", - "ROADM-A1", "1", "SRG1-PP3-TXRX") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"], + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-xpdr-rdm-links', + {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2', + 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"], CREATED_SUCCESSFULLY) time.sleep(2) def test_36_connect_roadma_pp3_to_spdra_2_n2(self): - response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "2", "2", - "ROADM-A1", "1", "SRG1-PP3-TXRX") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertIn('Roadm Xponder links created successfully', res["output"]["result"], + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-rdm-xpdr-links', + {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2', + 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Roadm Xponder links created successfully', response["output"]["result"], CREATED_SUCCESSFULLY) time.sleep(2) def test_37_check_tapi_topology_T0(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - nodes = res["output"]["topology"]["node"] - links = res["output"]["topology"]["link"] + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + nodes = response["output"]["topology"]["node"] + links = response["output"]["topology"]["link"] self.assertEqual(11, len(nodes), 'Topology should contain 11 nodes') self.assertEqual(12, len(links), 'Topology should contain 12 links') self.assertEqual(6, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"), @@ -648,120 +596,94 @@ class TransportTapitesting(unittest.TestCase): 'Topology should contain 2 otn links') def test_38_delete_ODU4_service(self): - response = test_utils.service_delete_request("service1-ODU4") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4" + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'org-openroadm-service', 'service-delete', + self.del_serv_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Renderer service delete in progress', - res['output']['configuration-response-common']['response-message']) - time.sleep(20) + response['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) def test_39_delete_OCH_OTU4_service(self): - response = test_utils.service_delete_request("service1-OCH-OTU4") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4" + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'org-openroadm-service', 'service-delete', + self.del_serv_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Renderer service delete in progress', - res['output']['configuration-response-common']['response-message']) - time.sleep(20) + response['output']['configuration-response-common']['response-message']) + time.sleep(self.WAITING) def test_40_check_tapi_topology_T0(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - nodes = res["output"]["topology"]["node"] - links = res["output"]["topology"]["link"] + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + nodes = response["output"]["topology"]["node"] + links = response["output"]["topology"]["link"] self.assertEqual(11, len(nodes), 'Topology should contain 11 nodes') self.assertEqual(10, len(links), 'Topology should contain 10 links') self.assertEqual(0, count_object_with_double_key(links, "name", "value-name", "otn link name"), 'Topology should contain 0 otn link') def test_41_disconnect_xponders_from_roadm(self): - url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/" - response = test_utils.get_ordm_topo_request("") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - links = res['network'][0]['ietf-network-topology:link'] + response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + links = response['network'][0]['ietf-network-topology:link'] for link in links: if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'): - link_name = link["link-id"] - response = test_utils.delete_request(url+link_name) - self.assertEqual(response.status_code, requests.codes.ok) + response = test_utils_rfc8040.del_ietf_network_link_request( + 'openroadm-topology', link['link-id'], 'config') + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_42_check_tapi_topology_T0(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - nodes = res["output"]["topology"]["node"] - self.assertEqual(1, len(nodes), 'Topology should contain 1 node') - self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link') - self.assertEqual("ROADM-infra", res["output"]["topology"]["node"][0]["name"][0]["value"], + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(1, len(response["output"]["topology"]["node"]), 'Topology should contain 1 node') + self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') + self.assertEqual("ROADM-infra", response["output"]["topology"]["node"][0]["name"][0]["value"], 'node name should be: ROADM-infra') def test_43_get_tapi_topology_T100G(self): - url = "{}/operations/tapi-topology:get-topology-details" - data = { - "tapi-topology:input": { - "tapi-topology:topology-id-or-name": "Transponder 100GE" - } - } - response = test_utils.post_request(url, data) - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - res = response.json() - self.assertEqual(len(res["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') - self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link') - self.assertNotIn("owned-node-edge-point", res["output"]["topology"]["node"][0], + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node') + self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link') + self.assertNotIn("owned-node-edge-point", response["output"]["topology"]["node"][0], 'Node should contain no owned-node-edge-points') def test_44_disconnect_roadma(self): - response = test_utils.unmount_device("ROADM-A1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - time.sleep(5) + response = test_utils_rfc8040.unmount_device("ROADM-A1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_45_disconnect_roadmc(self): - response = test_utils.unmount_device("ROADM-C1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - time.sleep(5) + response = test_utils_rfc8040.unmount_device("ROADM-C1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_46_check_tapi_topos(self): self.test_01_get_tapi_topology_T100G() self.test_02_get_tapi_topology_T0() def test_47_disconnect_xpdra(self): - response = test_utils.unmount_device("XPDR-A1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - time.sleep(5) + response = test_utils_rfc8040.unmount_device("XPDR-A1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_48_disconnect_xpdrc(self): - response = test_utils.unmount_device("XPDR-C1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - time.sleep(5) + response = test_utils_rfc8040.unmount_device("XPDR-C1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_49_disconnect_spdr_sa1(self): - response = test_utils.unmount_device("SPDR-SA1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - time.sleep(5) + response = test_utils_rfc8040.unmount_device("SPDR-SA1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_50_disconnect_spdr_sc1(self): - response = test_utils.unmount_device("SPDR-SC1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) - - -def find_object_with_key(list_dicts, key, value): - for dict_ in list_dicts: - if dict_[key] == value: - return dict_ - return None + response = test_utils_rfc8040.unmount_device("SPDR-SC1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def count_object_with_double_key(list_dicts, key1, key2, value): diff --git a/tests/transportpce_tests/tapi/test02_full_topology.py b/tests/transportpce_tests/tapi/test02_full_topology.py index 93d1eb5dd..03502fd53 100644 --- a/tests/transportpce_tests/tapi/test02_full_topology.py +++ b/tests/transportpce_tests/tapi/test02_full_topology.py @@ -21,7 +21,7 @@ import sys sys.path.append('transportpce_tests/common/') # pylint: disable=wrong-import-position # pylint: disable=import-error -import test_utils # nopep8 +import test_utils_rfc8040 # nopep8 # pylint: disable=too-few-public-methods @@ -40,58 +40,61 @@ class TransportPCEtesting(unittest.TestCase): NODE_VERSION = '2.2.1' uuid_services = UuidServices() - cr_serv_sample_data = { - "input": { - "end-point": [ - { - "layer-protocol-name": "PHOTONIC_MEDIA", - "service-interface-point": { - "service-interface-point-uuid": "b1a0d883-32b8-3b0b-93d6-7ed074f6f107" - }, - "administrative-state": "UNLOCKED", - "operational-state": "ENABLED", - "direction": "BIDIRECTIONAL", - "role": "SYMMETRIC", - "protection-role": "WORK", - "local-id": "SPDR-SA1-XPDR1", - "name": [ + cr_serv_input_data = { + "end-point": [ + { + "layer-protocol-name": "PHOTONIC_MEDIA", + "service-interface-point": { + "service-interface-point-uuid": "b1a0d883-32b8-3b0b-93d6-7ed074f6f107" + }, + "administrative-state": "UNLOCKED", + "operational-state": "ENABLED", + "direction": "BIDIRECTIONAL", + "role": "SYMMETRIC", + "protection-role": "WORK", + "local-id": "SPDR-SA1-XPDR1", + "name": [ { "value-name": "OpenROADM node id", "value": "SPDR-SA1-XPDR1" } - ] + ] + }, + { + "layer-protocol-name": "PHOTONIC_MEDIA", + "service-interface-point": { + "service-interface-point-uuid": "d1d6305e-179b-346f-b02d-8260aebe1ce8" }, - { - "layer-protocol-name": "PHOTONIC_MEDIA", - "service-interface-point": { - "service-interface-point-uuid": "d1d6305e-179b-346f-b02d-8260aebe1ce8" - }, - "administrative-state": "UNLOCKED", - "operational-state": "ENABLED", - "direction": "BIDIRECTIONAL", - "role": "SYMMETRIC", - "protection-role": "WORK", - "local-id": "SPDR-SC1-XPDR1", - "name": [ + "administrative-state": "UNLOCKED", + "operational-state": "ENABLED", + "direction": "BIDIRECTIONAL", + "role": "SYMMETRIC", + "protection-role": "WORK", + "local-id": "SPDR-SC1-XPDR1", + "name": [ { "value-name": "OpenROADM node id", "value": "SPDR-SC1-XPDR1" } - ] - } - ], - "connectivity-constraint": { - "service-layer": "PHOTONIC_MEDIA", - "service-type": "POINT_TO_POINT_CONNECTIVITY", - "service-level": "Some service-level", - "requested-capacity": { - "total-size": { - "value": "100", - "unit": "GB" - } + ] + } + ], + "connectivity-constraint": { + "service-layer": "PHOTONIC_MEDIA", + "service-type": "POINT_TO_POINT_CONNECTIVITY", + "service-level": "Some service-level", + "requested-capacity": { + "total-size": { + "value": "100", + "unit": "GB" } - }, - "state": "Some state"}} + } + }, + "state": "Some state"} + + del_serv_input_data = {"service-id-or-name": "TBD"} + + tapi_topo = {"topology-id-or-name": "TBD"} @classmethod def setUpClass(cls): @@ -99,33 +102,33 @@ class TransportPCEtesting(unittest.TestCase): cls.init_failed = False os.environ['JAVA_MIN_MEM'] = '1024M' os.environ['JAVA_MAX_MEM'] = '4096M' - cls.processes = test_utils.start_tpce() + cls.processes = test_utils_rfc8040.start_tpce() # TAPI feature is not installed by default in Karaf if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True': print("installing tapi feature...") - result = test_utils.install_karaf_feature("odl-transportpce-tapi") + result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-tapi") if result.returncode != 0: cls.init_failed = True print("Restarting OpenDaylight...") - test_utils.shutdown_process(cls.processes[0]) - cls.processes[0] = test_utils.start_karaf() - test_utils.process_list[0] = cls.processes[0] - cls.init_failed = not test_utils.wait_until_log_contains( - test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60) + test_utils_rfc8040.shutdown_process(cls.processes[0]) + cls.processes[0] = test_utils_rfc8040.start_karaf() + test_utils_rfc8040.process_list[0] = cls.processes[0] + cls.init_failed = not test_utils_rfc8040.wait_until_log_contains( + test_utils_rfc8040.KARAF_LOG, test_utils_rfc8040.KARAF_OK_START_MSG, time_to_wait=60) if cls.init_failed: print("tapi installation feature failed...") - test_utils.shutdown_process(cls.processes[0]) + test_utils_rfc8040.shutdown_process(cls.processes[0]) sys.exit(2) - cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION), - ('roadma', cls.NODE_VERSION), - ('roadmc', cls.NODE_VERSION), - ('spdrc', cls.NODE_VERSION)]) + cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION), + ('roadma', cls.NODE_VERSION), + ('roadmc', cls.NODE_VERSION), + ('spdrc', cls.NODE_VERSION)]) @classmethod def tearDownClass(cls): # pylint: disable=not-an-iterable for process in cls.processes: - test_utils.shutdown_process(process) + test_utils_rfc8040.shutdown_process(process) print("all processes killed") def setUp(self): @@ -133,66 +136,62 @@ class TransportPCEtesting(unittest.TestCase): def test_01_connect_spdrA(self): print("Connecting SPDRA") - response = test_utils.mount_tapi_device("SPDR-SA1", ('spdra', self.NODE_VERSION)) - self.assertEqual(response.status_code, - requests.codes.created, test_utils.CODE_SHOULD_BE_201) + response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) time.sleep(2) def test_02_connect_spdrC(self): print("Connecting SPDRC") - response = test_utils.mount_tapi_device("SPDR-SC1", ('spdrc', self.NODE_VERSION)) - self.assertEqual(response.status_code, - requests.codes.created, test_utils.CODE_SHOULD_BE_201) + response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) time.sleep(2) def test_03_connect_rdmA(self): print("Connecting ROADMA") - response = test_utils.mount_tapi_device("ROADM-A1", ('roadma', self.NODE_VERSION)) - self.assertEqual(response.status_code, - requests.codes.created, test_utils.CODE_SHOULD_BE_201) + response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) time.sleep(2) def test_04_connect_rdmC(self): print("Connecting ROADMC") - response = test_utils.mount_tapi_device("ROADM-C1", ('roadmc', self.NODE_VERSION)) - self.assertEqual(response.status_code, - requests.codes.created, test_utils.CODE_SHOULD_BE_201) + response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) time.sleep(2) def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self): - response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1", - "ROADM-A1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Xponder Roadm Link created successfully', - res["output"]["result"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-xpdr-rdm-links', + {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"]) time.sleep(2) def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self): - response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1", - "ROADM-A1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Roadm Xponder links created successfully', - res["output"]["result"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-rdm-xpdr-links', + {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Roadm Xponder links created successfully', response["output"]["result"]) time.sleep(2) def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self): - response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1", - "ROADM-C1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Xponder Roadm Link created successfully', - res["output"]["result"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-xpdr-rdm-links', + {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"]) time.sleep(2) def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self): - response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1", - "ROADM-C1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Roadm Xponder links created successfully', - res["output"]["result"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-rdm-xpdr-links', + {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Roadm Xponder links created successfully', response["output"]["result"]) time.sleep(2) def test_09_add_omsAttributes_ROADMA_ROADMC(self): @@ -207,10 +206,9 @@ class TransportPCEtesting(unittest.TestCase): "fiber-type": "smf", "SRLG-length": 100000, "pmd": 0.5}]}} - response = test_utils.add_oms_attr_request( + response = test_utils_rfc8040.add_oms_attr_request( "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data) self.assertEqual(response.status_code, requests.codes.created) - time.sleep(2) def test_10_add_omsAttributes_ROADMC_ROADMA(self): # Config ROADMC-ROADMA oms-attributes @@ -224,57 +222,44 @@ class TransportPCEtesting(unittest.TestCase): "fiber-type": "smf", "SRLG-length": 100000, "pmd": 0.5}]}} - response = test_utils.add_oms_attr_request( + response = test_utils_rfc8040.add_oms_attr_request( "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data) self.assertEqual(response.status_code, requests.codes.created) - time.sleep(2) def test_11_check_otn_topology(self): - response = test_utils.get_otn_topo_request() - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nbNode = len(res['network'][0]['node']) - self.assertEqual(nbNode, 6, 'There should be 6 otn nodes') - self.assertNotIn('ietf-network-topology:link', res['network'][0]) - time.sleep(2) + response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(len(response['network'][0]['node']), 6, 'There should be 6 otn nodes') + self.assertNotIn('ietf-network-topology:link', response['network'][0]) def test_12_check_openroadm_topology(self): - response = test_utils.get_ordm_topo_request("") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nbNode = len(res['network'][0]['node']) - nbLink = len(res['network'][0]['ietf-network-topology:link']) - self.assertEqual(nbNode, 13, 'There should be 13 openroadm nodes') - self.assertEqual(nbLink, 22, 'There should be 22 openroadm links') - time.sleep(2) + response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(len(response['network'][0]['node']), 13, 'There should be 13 openroadm nodes') + self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 22, + 'There should be 22 openroadm links') def test_13_get_tapi_topology_details(self): - response = test_utils.tapi_get_topology_details_request( - "T0 - Full Multi-layer topology") - time.sleep(2) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nbNode = len(res['output']['topology']['node']) - nbLink = len(res['output']['topology']['link']) - self.assertEqual(nbNode, 14, 'There should be 14 TAPI nodes') - self.assertEqual(nbLink, 15, 'There should be 15 TAPI links') + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) time.sleep(2) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(len(response['output']['topology']['node']), 14, 'There should be 14 TAPI nodes') + self.assertEqual(len(response['output']['topology']['link']), 15, 'There should be 15 TAPI links') def test_14_check_sip_details(self): - response = test_utils.tapi_get_sip_details_request() - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nbSip = len(res['output']['sip']) - self.assertEqual(nbSip, 60, 'There should be 60 service interface point') - time.sleep(2) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-common', 'get-service-interface-point-list', None) + self.assertEqual(len(response['output']['sip']), 60, 'There should be 60 service interface point') # test create connectivity service from spdrA to spdrC for Photonic_media def test_15_create_connectivity_service_PhotonicMedia(self): - response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data) time.sleep(self.WAITING) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.uuid_services.pm = res['output']['service']['uuid'] + self.assertEqual(response['status_code'], requests.codes.ok) + self.uuid_services.pm = response['output']['service']['uuid'] # pylint: disable=consider-using-f-string print("photonic media service uuid : {}".format(self.uuid_services.pm)) @@ -290,45 +275,39 @@ class TransportPCEtesting(unittest.TestCase): input_dict_3 = {'value-name': 'OpenROADM node id', 'value': 'SPDR-SA1-XPDR1'} - self.assertDictEqual(dict(input_dict_1, **res['output']['service']), - res['output']['service']) - self.assertDictEqual(dict(input_dict_2, **res['output']['service']['end-point'][0]['name'][0]), - res['output']['service']['end-point'][0]['name'][0]) - self.assertDictEqual(dict(input_dict_3, **res['output']['service']['end-point'][1]['name'][0]), - res['output']['service']['end-point'][1]['name'][0]) + self.assertDictEqual(dict(input_dict_1, **response['output']['service']), + response['output']['service']) + self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]), + response['output']['service']['end-point'][0]['name'][0]) + self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]), + response['output']['service']['end-point'][1]['name'][0]) # If the gate fails is because of the waiting time not being enough - time.sleep(self.WAITING) +# time.sleep(self.WAITING) def test_16_get_service_PhotonicMedia(self): - response = test_utils.get_service_list_request( - "services/" + str(self.uuid_services.pm)) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['services'][0]['administrative-state'], 'inService') - self.assertEqual( - res['services'][0]['service-name'], self.uuid_services.pm) - self.assertEqual( - res['services'][0]['connection-type'], 'infrastructure') - self.assertEqual( - res['services'][0]['lifecycle-state'], 'planned') - time.sleep(2) + response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.pm)) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['services'][0]['administrative-state'], 'inService') + self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.pm)) + self.assertEqual(response['services'][0]['connection-type'], 'infrastructure') + self.assertEqual(response['services'][0]['lifecycle-state'], 'planned') + time.sleep(1) # test create connectivity service from spdrA to spdrC for odu def test_17_create_connectivity_service_ODU(self): # pylint: disable=line-too-long - self.cr_serv_sample_data["input"]["end-point"][0]["layer-protocol-name"] = "ODU" - self.cr_serv_sample_data["input"]["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "5efda776-f8de-3e0b-9bbd-2c702e210946" - self.cr_serv_sample_data["input"]["end-point"][1]["layer-protocol-name"] = "ODU" - self.cr_serv_sample_data["input"]["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "8116d0af-39fa-3df5-bed2-dd2cd5e8217d" - self.cr_serv_sample_data["input"]["connectivity-constraint"]["service-layer"] = "ODU" - self.cr_serv_sample_data["input"]["connectivity-constraint"]["service-level"] = self.uuid_services.pm - - response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data) + self.cr_serv_input_data["end-point"][0]["layer-protocol-name"] = "ODU" + self.cr_serv_input_data["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "5efda776-f8de-3e0b-9bbd-2c702e210946" + self.cr_serv_input_data["end-point"][1]["layer-protocol-name"] = "ODU" + self.cr_serv_input_data["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "8116d0af-39fa-3df5-bed2-dd2cd5e8217d" + self.cr_serv_input_data["connectivity-constraint"]["service-layer"] = "ODU" + self.cr_serv_input_data["connectivity-constraint"]["service-level"] = self.uuid_services.pm + + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data) time.sleep(self.WAITING) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.uuid_services.odu = res['output']['service']['uuid'] + self.assertEqual(response['status_code'], requests.codes.ok) + self.uuid_services.odu = response['output']['service']['uuid'] # pylint: disable=consider-using-f-string print("odu service uuid : {}".format(self.uuid_services.odu)) @@ -344,46 +323,40 @@ class TransportPCEtesting(unittest.TestCase): input_dict_3 = {'value-name': 'OpenROADM node id', 'value': 'SPDR-SA1-XPDR1'} - self.assertDictEqual(dict(input_dict_1, **res['output']['service']), - res['output']['service']) - self.assertDictEqual(dict(input_dict_2, **res['output']['service']['end-point'][0]['name'][0]), - res['output']['service']['end-point'][0]['name'][0]) - self.assertDictEqual(dict(input_dict_3, **res['output']['service']['end-point'][1]['name'][0]), - res['output']['service']['end-point'][1]['name'][0]) + self.assertDictEqual(dict(input_dict_1, **response['output']['service']), + response['output']['service']) + self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]), + response['output']['service']['end-point'][0]['name'][0]) + self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]), + response['output']['service']['end-point'][1]['name'][0]) # If the gate fails is because of the waiting time not being enough - time.sleep(self.WAITING) +# time.sleep(self.WAITING) def test_18_get_service_ODU(self): - response = test_utils.get_service_list_request( - "services/" + str(self.uuid_services.odu)) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['services'][0]['administrative-state'], 'inService') - self.assertEqual( - res['services'][0]['service-name'], self.uuid_services.odu) - self.assertEqual( - res['services'][0]['connection-type'], 'infrastructure') - self.assertEqual( - res['services'][0]['lifecycle-state'], 'planned') - time.sleep(2) + response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.odu)) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['services'][0]['administrative-state'], 'inService') + self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.odu)) + self.assertEqual(response['services'][0]['connection-type'], 'infrastructure') + self.assertEqual(response['services'][0]['lifecycle-state'], 'planned') + time.sleep(1) # test create connectivity service from spdrA to spdrC for dsr def test_19_create_connectivity_service_DSR(self): # pylint: disable=line-too-long - self.cr_serv_sample_data["input"]["end-point"][0]["layer-protocol-name"] = "DSR" - self.cr_serv_sample_data["input"]["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "c14797a0-adcc-3875-a1fe-df8949d1a2d7" - self.cr_serv_sample_data["input"]["end-point"][1]["layer-protocol-name"] = "DSR" - self.cr_serv_sample_data["input"]["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "25812ef2-625d-3bf8-af55-5e93946d1c22" - self.cr_serv_sample_data["input"]["connectivity-constraint"]["service-layer"] = "DSR" - self.cr_serv_sample_data["input"]["connectivity-constraint"]["requested-capacity"]["total-size"]["value"] = "10" - self.cr_serv_sample_data["input"]["connectivity-constraint"]["service-level"] = self.uuid_services.odu - - response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data) + self.cr_serv_input_data["end-point"][0]["layer-protocol-name"] = "DSR" + self.cr_serv_input_data["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "c14797a0-adcc-3875-a1fe-df8949d1a2d7" + self.cr_serv_input_data["end-point"][1]["layer-protocol-name"] = "DSR" + self.cr_serv_input_data["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "25812ef2-625d-3bf8-af55-5e93946d1c22" + self.cr_serv_input_data["connectivity-constraint"]["service-layer"] = "DSR" + self.cr_serv_input_data["connectivity-constraint"]["requested-capacity"]["total-size"]["value"] = "10" + self.cr_serv_input_data["connectivity-constraint"]["service-level"] = self.uuid_services.odu + + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data) time.sleep(self.WAITING) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.uuid_services.dsr = res['output']['service']['uuid'] + self.assertEqual(response['status_code'], requests.codes.ok) + self.uuid_services.dsr = response['output']['service']['uuid'] # pylint: disable=consider-using-f-string print("dsr service uuid : {}".format(self.uuid_services.dsr)) @@ -400,37 +373,31 @@ class TransportPCEtesting(unittest.TestCase): 'value': 'SPDR-SA1-XPDR1'} self.assertDictEqual(dict(input_dict_1, - **res['output']['service']), - res['output']['service']) + **response['output']['service']), + response['output']['service']) self.assertDictEqual(dict(input_dict_2, - **res['output']['service']['end-point'][0]['name'][0]), - res['output']['service']['end-point'][0]['name'][0]) + **response['output']['service']['end-point'][0]['name'][0]), + response['output']['service']['end-point'][0]['name'][0]) self.assertDictEqual(dict(input_dict_3, - **res['output']['service']['end-point'][1]['name'][0]), - res['output']['service']['end-point'][1]['name'][0]) + **response['output']['service']['end-point'][1]['name'][0]), + response['output']['service']['end-point'][1]['name'][0]) # The sleep here is okey as the DSR service creation is very fast - time.sleep(self.WAITING) +# time.sleep(self.WAITING) def test_20_get_service_DSR(self): - response = test_utils.get_service_list_request( - "services/" + str(self.uuid_services.dsr)) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['services'][0]['administrative-state'], 'inService') - self.assertEqual( - res['services'][0]['service-name'], self.uuid_services.dsr) - self.assertEqual( - res['services'][0]['connection-type'], 'service') - self.assertEqual( - res['services'][0]['lifecycle-state'], 'planned') - time.sleep(2) + response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.dsr)) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['services'][0]['administrative-state'], 'inService') + self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.dsr)) + self.assertEqual(response['services'][0]['connection-type'], 'service') + self.assertEqual(response['services'][0]['lifecycle-state'], 'planned') + time.sleep(1) def test_21_get_connectivity_service_list(self): - response = test_utils.tapi_get_service_list_request() - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - liste_service = res['output']['service'] + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'get-connectivity-service-list', None) + self.assertEqual(response['status_code'], requests.codes.ok) + liste_service = response['output']['service'] for ele in liste_service: if ele['uuid'] == self.uuid_services.pm: self.assertEqual(ele['operational-state'], 'ENABLED') @@ -452,59 +419,55 @@ class TransportPCEtesting(unittest.TestCase): time.sleep(2) def test_22_delete_connectivity_service_DSR(self): - response = test_utils.tapi_delete_connectivity_request(self.uuid_services.dsr) - self.assertEqual(response.status_code, requests.codes.no_content) + self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.dsr) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data) + self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content)) time.sleep(self.WAITING) def test_23_delete_connectivity_service_ODU(self): - response = test_utils.tapi_delete_connectivity_request(self.uuid_services.odu) - self.assertEqual(response.status_code, requests.codes.no_content) + self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.odu) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data) + self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content)) time.sleep(self.WAITING) def test_24_delete_connectivity_service_PhotonicMedia(self): - response = test_utils.tapi_delete_connectivity_request(self.uuid_services.pm) - self.assertEqual(response.status_code, requests.codes.no_content) + self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.pm) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data) + self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content)) time.sleep(self.WAITING) def test_25_get_no_tapi_services(self): - response = test_utils.tapi_get_service_list_request() - res = response.json() + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'get-connectivity-service-list', None) + self.assertEqual(response['status_code'], requests.codes.internal_server_error) self.assertIn( {"error-type": "rpc", "error-tag": "operation-failed", "error-message": "No services exist in datastore", "error-info": "error"}, - res['errors']['error']) - time.sleep(2) + response['output']['errors']['error']) def test_26_get_no_openroadm_services(self): - response = test_utils.get_service_list_request("") - self.assertEqual(response.status_code, requests.codes.conflict) - res = response.json() - self.assertIn( - {"error-type": "application", "error-tag": "data-missing", - "error-message": "Request could not be completed because the relevant data model content does not exist"}, - res['errors']['error']) - time.sleep(2) + response = test_utils_rfc8040.get_ordm_serv_list_request() + self.assertEqual(response['status_code'], requests.codes.conflict) def test_27_disconnect_spdrA(self): - response = test_utils.unmount_device("SPDR-SA1") - self.assertEqual(response.status_code, requests.codes.ok, - test_utils.CODE_SHOULD_BE_200) + response = test_utils_rfc8040.unmount_device("SPDR-SA1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_28_disconnect_spdrC(self): - response = test_utils.unmount_device("SPDR-SC1") - self.assertEqual(response.status_code, requests.codes.ok, - test_utils.CODE_SHOULD_BE_200) + response = test_utils_rfc8040.unmount_device("SPDR-SC1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_29_disconnect_roadmA(self): - response = test_utils.unmount_device("ROADM-A1") - self.assertEqual(response.status_code, requests.codes.ok, - test_utils.CODE_SHOULD_BE_200) + response = test_utils_rfc8040.unmount_device("ROADM-A1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_30_disconnect_roadmC(self): - response = test_utils.unmount_device("ROADM-C1") - self.assertEqual(response.status_code, requests.codes.ok, - test_utils.CODE_SHOULD_BE_200) + response = test_utils_rfc8040.unmount_device("ROADM-C1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) if __name__ == "__main__": diff --git a/tests/transportpce_tests/tapi/test03_tapi_device_change_notifications.py b/tests/transportpce_tests/tapi/test03_tapi_device_change_notifications.py index 3e105a2f9..69de9452f 100644 --- a/tests/transportpce_tests/tapi/test03_tapi_device_change_notifications.py +++ b/tests/transportpce_tests/tapi/test03_tapi_device_change_notifications.py @@ -20,7 +20,7 @@ import sys sys.path.append('transportpce_tests/common/') # pylint: disable=wrong-import-position # pylint: disable=import-error -import test_utils # nopep8 +import test_utils_rfc8040 # nopep8 # pylint: disable=too-few-public-methods @@ -36,61 +36,70 @@ class UuidServices: class TransportPCEFulltesting(unittest.TestCase): - cr_serv_sample_data = { - "input": { - "end-point": [ - { - "layer-protocol-name": "DSR", - "service-interface-point": { - "service-interface-point-uuid": "b1f4bd3b-7fa9-367b-a8ab-6e80293238df" - }, - "administrative-state": "UNLOCKED", - "operational-state": "ENABLED", - "direction": "BIDIRECTIONAL", - "role": "SYMMETRIC", - "protection-role": "WORK", - "local-id": "XPDR-C1-XPDR1", - "name": [ + cr_serv_input_data = { + "end-point": [ + { + "layer-protocol-name": "DSR", + "service-interface-point": { + "service-interface-point-uuid": "b1f4bd3b-7fa9-367b-a8ab-6e80293238df" + }, + "administrative-state": "UNLOCKED", + "operational-state": "ENABLED", + "direction": "BIDIRECTIONAL", + "role": "SYMMETRIC", + "protection-role": "WORK", + "local-id": "XPDR-C1-XPDR1", + "name": [ { "value-name": "OpenROADM node id", "value": "XPDR-C1-XPDR1" } - ] + ] + }, + { + "layer-protocol-name": "DSR", + "service-interface-point": { + "service-interface-point-uuid": "b5964ce9-274c-3f68-b4d1-83c0b61bc74e" }, - { - "layer-protocol-name": "DSR", - "service-interface-point": { - "service-interface-point-uuid": "b5964ce9-274c-3f68-b4d1-83c0b61bc74e" - }, - "administrative-state": "UNLOCKED", - "operational-state": "ENABLED", - "direction": "BIDIRECTIONAL", - "role": "SYMMETRIC", - "protection-role": "WORK", - "local-id": "XPDR-A1-XPDR1", - "name": [ + "administrative-state": "UNLOCKED", + "operational-state": "ENABLED", + "direction": "BIDIRECTIONAL", + "role": "SYMMETRIC", + "protection-role": "WORK", + "local-id": "XPDR-A1-XPDR1", + "name": [ { "value-name": "OpenROADM node id", "value": "XPDR-A1-XPDR1" } - ] + ] + } + ], + "connectivity-constraint": { + "service-layer": "ETH", + "service-type": "POINT_TO_POINT_CONNECTIVITY", + "service-level": "Some service-level", + "requested-capacity": { + "total-size": { + "value": "100", + "unit": "GB" } - ], - "connectivity-constraint": { - "service-layer": "ETH", - "service-type": "POINT_TO_POINT_CONNECTIVITY", - "service-level": "Some service-level", - "requested-capacity": { - "total-size": { - "value": "100", - "unit": "GB" - } - } - }, - "state": "Some state" - } + } + }, + "state": "Some state" } + del_serv_input_data = {"service-id-or-name": "TBD"} + + tapi_topo = {"topology-id-or-name": "TBD"} + + node_details = { + "topology-id-or-name": "TBD", + "node-id-or-name": "TBD" + } + + tapi_serv_details = {"service-id-or-name": "TBD"} + processes = [] uuid_services = UuidServices() WAITING = 25 # nominal value is 300 @@ -102,87 +111,91 @@ class TransportPCEFulltesting(unittest.TestCase): cls.init_failed = False os.environ['JAVA_MIN_MEM'] = '1024M' os.environ['JAVA_MAX_MEM'] = '4096M' - cls.processes = test_utils.start_tpce() + cls.processes = test_utils_rfc8040.start_tpce() # TAPI feature is not installed by default in Karaf if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True': print("installing tapi feature...") - result = test_utils.install_karaf_feature("odl-transportpce-tapi") + result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-tapi") if result.returncode != 0: cls.init_failed = True print("Restarting OpenDaylight...") - test_utils.shutdown_process(cls.processes[0]) - cls.processes[0] = test_utils.start_karaf() - test_utils.process_list[0] = cls.processes[0] - cls.init_failed = not test_utils.wait_until_log_contains( - test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60) + test_utils_rfc8040.shutdown_process(cls.processes[0]) + cls.processes[0] = test_utils_rfc8040.start_karaf() + test_utils_rfc8040.process_list[0] = cls.processes[0] + cls.init_failed = not test_utils_rfc8040.wait_until_log_contains( + test_utils_rfc8040.KARAF_LOG, test_utils_rfc8040.KARAF_OK_START_MSG, time_to_wait=60) if cls.init_failed: print("tapi installation feature failed...") - test_utils.shutdown_process(cls.processes[0]) + test_utils_rfc8040.shutdown_process(cls.processes[0]) sys.exit(2) - cls.processes = test_utils.start_tpce() - cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_221), - ('roadma', cls.NODE_VERSION_221), - ('roadmc', cls.NODE_VERSION_221), - ('xpdrc', cls.NODE_VERSION_221)]) + cls.processes = test_utils_rfc8040.start_tpce() + cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION_221), + ('roadma', cls.NODE_VERSION_221), + ('roadmc', cls.NODE_VERSION_221), + ('xpdrc', cls.NODE_VERSION_221)]) @classmethod def tearDownClass(cls): # pylint: disable=not-an-iterable for process in cls.processes: - test_utils.shutdown_process(process) + test_utils_rfc8040.shutdown_process(process) print("all processes killed") - time.sleep(10) def setUp(self): # instruction executed before each test method # pylint: disable=consider-using-f-string print("execution of {}".format(self.id().split(".")[-1])) + time.sleep(1) def test_01_connect_xpdrA(self): - response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION_221)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) + response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION_221)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) def test_02_connect_xpdrC(self): - response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_221)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) + response = test_utils_rfc8040.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_221)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) def test_03_connect_rdmA(self): - response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) + response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) def test_04_connect_rdmC(self): - response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) + response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) def test_05_connect_xprdA_N1_to_roadmA_PP1(self): - response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1", - "ROADM-A1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-xpdr-rdm-links', + {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"]) time.sleep(2) def test_06_connect_roadmA_PP1_to_xpdrA_N1(self): - response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1", - "ROADM-A1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Roadm Xponder links created successfully', res["output"]["result"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-rdm-xpdr-links', + {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Roadm Xponder links created successfully', response["output"]["result"]) time.sleep(2) def test_07_connect_xprdC_N1_to_roadmC_PP1(self): - response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1", - "ROADM-C1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-xpdr-rdm-links', + {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"]) time.sleep(2) def test_08_connect_roadmC_PP1_to_xpdrC_N1(self): - response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1", - "ROADM-C1", "1", "SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Roadm Xponder links created successfully', res["output"]["result"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-networkutils', 'init-rdm-xpdr-links', + {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1', + 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}}) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Roadm Xponder links created successfully', response["output"]["result"]) time.sleep(2) def test_09_add_omsAttributes_ROADMA_ROADMC(self): @@ -197,7 +210,8 @@ class TransportPCEFulltesting(unittest.TestCase): "fiber-type": "smf", "SRLG-length": 100000, "pmd": 0.5}]}} - response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data) + response = test_utils_rfc8040.add_oms_attr_request( + "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data) self.assertEqual(response.status_code, requests.codes.created) def test_10_add_omsAttributes_ROADMC_ROADMA(self): @@ -212,16 +226,16 @@ class TransportPCEFulltesting(unittest.TestCase): "fiber-type": "smf", "SRLG-length": 100000, "pmd": 0.5}]}} - response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data) + response = test_utils_rfc8040.add_oms_attr_request( + "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data) self.assertEqual(response.status_code, requests.codes.created) # test service-create for Eth service from xpdr to xpdr def test_11_create_connectivity_service_Ethernet(self): - response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data) time.sleep(self.WAITING) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.uuid_services.eth = res['output']['service']['uuid'] + self.uuid_services.eth = response['output']['service']['uuid'] # pylint: disable=consider-using-f-string input_dict_1 = {'administrative-state': 'LOCKED', @@ -236,42 +250,33 @@ class TransportPCEFulltesting(unittest.TestCase): input_dict_3 = {'value-name': 'OpenROADM node id', 'value': 'XPDR-A1-XPDR1'} - self.assertDictEqual(dict(input_dict_1, **res['output']['service']), - res['output']['service']) - self.assertDictEqual(dict(input_dict_2, **res['output']['service']['end-point'][0]['name'][0]), - res['output']['service']['end-point'][0]['name'][0]) - self.assertDictEqual(dict(input_dict_3, **res['output']['service']['end-point'][1]['name'][0]), - res['output']['service']['end-point'][1]['name'][0]) + self.assertDictEqual(dict(input_dict_1, **response['output']['service']), + response['output']['service']) + self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]), + response['output']['service']['end-point'][0]['name'][0]) + self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]), + response['output']['service']['end-point'][1]['name'][0]) # If the gate fails is because of the waiting time not being enough - time.sleep(self.WAITING) +# time.sleep(self.WAITING) def test_12_get_service_Ethernet(self): - response = test_utils.get_service_list_request("services/" + str(self.uuid_services.eth)) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['services'][0]['administrative-state'], 'inService') - self.assertEqual( - res['services'][0]['service-name'], self.uuid_services.eth) - self.assertEqual( - res['services'][0]['connection-type'], 'service') - self.assertEqual( - res['services'][0]['lifecycle-state'], 'planned') - time.sleep(2) + response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth)) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['services'][0]['administrative-state'], 'inService') + self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.eth)) + self.assertEqual(response['services'][0]['connection-type'], 'service') + self.assertEqual(response['services'][0]['lifecycle-state'], 'planned') + time.sleep(1) def test_13_get_connectivity_service_Ethernet(self): - response = test_utils.tapi_get_connectivity_request(str(self.uuid_services.eth)) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['output']['service']['operational-state'], 'ENABLED') - self.assertEqual( - res['output']['service']['name'][0]['value'], self.uuid_services.eth) - self.assertEqual( - res['output']['service']['administrative-state'], 'UNLOCKED') - self.assertEqual( - res['output']['service']['lifecycle-state'], 'INSTALLED') - time.sleep(2) + self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['output']['service']['operational-state'], 'ENABLED') + self.assertEqual(response['output']['service']['name'][0]['value'], self.uuid_services.eth) + self.assertEqual(response['output']['service']['administrative-state'], 'UNLOCKED') + self.assertEqual(response['output']['service']['lifecycle-state'], 'INSTALLED') def test_14_change_status_line_port_xpdrc(self): url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1" @@ -281,17 +286,16 @@ class TransportPCEFulltesting(unittest.TestCase): "administrative-state": "outOfService", "port-qual": "xpdr-network"}]} response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"), - data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON, - auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD), - timeout=test_utils.REQUEST_TIMEOUT) + data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON, + auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD), + timeout=test_utils_rfc8040.REQUEST_TIMEOUT) self.assertEqual(response.status_code, requests.codes.ok) time.sleep(2) def test_15_check_update_portmapping(self): - response = test_utils.portmapping_request("XPDR-C1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - mapping_list = res['nodes'][0]['mapping'] + response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C1", None, None) + self.assertEqual(response['status_code'], requests.codes.ok) + mapping_list = response['nodes'][0]['mapping'] for mapping in mapping_list: if mapping['logical-connection-point'] == 'XPDR1-NETWORK1': self.assertEqual(mapping['port-oper-state'], 'OutOfService', @@ -306,11 +310,9 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_16_check_update_openroadm_topo(self): - url = test_utils.URL_CONFIG_ORDM_TOPO - response = test_utils.get_request(url) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - node_list = res['network'][0]['node'] + response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + node_list = response['network'][0]['node'] nb_updated_tp = 0 for node in node_list: self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService') @@ -326,7 +328,7 @@ class TransportPCEFulltesting(unittest.TestCase): self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService') self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified") - link_list = res['network'][0]['ietf-network-topology:link'] + link_list = response['network'][0]['ietf-network-topology:link'] updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX', 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1'] nb_updated_link = 0 @@ -342,10 +344,12 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_17_check_update_tapi_neps(self): - response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+OTSi") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nep_list = res['output']['node']['owned-node-edge-point'] + self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi" + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-node-details', self.node_details) + self.assertEqual(response['status_code'], requests.codes.ok) + nep_list = response['output']['node']['owned-node-edge-point'] nb_updated_neps = 0 for nep in nep_list: if 'XPDR1-NETWORK1' in nep['name'][0]['value']: @@ -359,10 +363,11 @@ class TransportPCEFulltesting(unittest.TestCase): "Operational State should be 'ENABLED'") self.assertEqual(nep['administrative-state'], 'UNLOCKED', "Administrative State should be 'UNLOCKED'") - response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+DSR") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nep_list = res['output']['node']['owned-node-edge-point'] + self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR" + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-node-details', self.node_details) + self.assertEqual(response['status_code'], requests.codes.ok) + nep_list = response['output']['node']['owned-node-edge-point'] for nep in nep_list: if 'XPDR1-NETWORK1' in nep['name'][0]['value']: self.assertEqual(nep['operational-state'], 'DISABLED', @@ -379,11 +384,12 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_18_check_update_tapi_links(self): - response = test_utils.tapi_get_topology_details_request("T0 - Full Multi-layer topology") + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) time.sleep(2) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - link_list = res['output']['topology']['link'] + self.assertEqual(response['status_code'], requests.codes.ok) + link_list = response['output']['topology']['link'] nb_updated_link = 0 for link in link_list: if all(x in link['name'][0]['value'] for x in ['XPDR-C1-XPDR1', 'XPDR1-NETWORK1']): @@ -398,22 +404,18 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_19_check_update_service_Ethernet(self): - response = test_utils.get_service_list_request( - "services/" + str(self.uuid_services.eth)) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual(res['services'][0]['operational-state'], 'outOfService') - self.assertEqual(res['services'][0]['administrative-state'], 'inService') - time.sleep(1) + response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth)) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['services'][0]['operational-state'], 'outOfService') + self.assertEqual(response['services'][0]['administrative-state'], 'inService') def test_20_check_update_connectivity_service_Ethernet(self): - response = test_utils.tapi_get_connectivity_request(str(self.uuid_services.eth)) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['output']['service']['operational-state'], 'DISABLED') - self.assertEqual( - res['output']['service']['administrative-state'], 'LOCKED') + self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['output']['service']['operational-state'], 'DISABLED') + self.assertEqual(response['output']['service']['administrative-state'], 'LOCKED') time.sleep(1) def test_21_restore_status_line_port_xpdrc(self): @@ -424,17 +426,16 @@ class TransportPCEFulltesting(unittest.TestCase): "administrative-state": "inService", "port-qual": "xpdr-network"}]} response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"), - data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON, - auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD), - timeout=test_utils.REQUEST_TIMEOUT) + data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON, + auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD), + timeout=test_utils_rfc8040.REQUEST_TIMEOUT) self.assertEqual(response.status_code, requests.codes.ok) time.sleep(2) def test_22_check_update_portmapping_ok(self): - response = test_utils.portmapping_request("XPDR-C1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - mapping_list = res['nodes'][0]['mapping'] + response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C1", None, None) + self.assertEqual(response['status_code'], requests.codes.ok) + mapping_list = response['nodes'][0]['mapping'] for mapping in mapping_list: self.assertEqual(mapping['port-oper-state'], 'InService', "Operational State should be 'InService'") @@ -443,11 +444,9 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_23_check_update_openroadm_topo_ok(self): - url = test_utils.URL_CONFIG_ORDM_TOPO - response = test_utils.get_request(url) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - node_list = res['network'][0]['node'] + response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + node_list = response['network'][0]['node'] for node in node_list: self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService') self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService') @@ -456,27 +455,30 @@ class TransportPCEFulltesting(unittest.TestCase): self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService') self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService') - link_list = res['network'][0]['ietf-network-topology:link'] + link_list = response['network'][0]['ietf-network-topology:link'] for link in link_list: self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService') self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService') time.sleep(1) def test_24_check_update_tapi_neps_ok(self): - response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+OTSi") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nep_list = res['output']['node']['owned-node-edge-point'] + self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi" + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-node-details', self.node_details) + self.assertEqual(response['status_code'], requests.codes.ok) + nep_list = response['output']['node']['owned-node-edge-point'] for nep in nep_list: self.assertEqual(nep['operational-state'], 'ENABLED', "Operational State should be 'ENABLED'") self.assertEqual(nep['administrative-state'], 'UNLOCKED', "Administrative State should be 'UNLOCKED'") - response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+DSR") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nep_list = res['output']['node']['owned-node-edge-point'] + self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR" + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-node-details', self.node_details) + self.assertEqual(response['status_code'], requests.codes.ok) + nep_list = response['output']['node']['owned-node-edge-point'] for nep in nep_list: self.assertEqual(nep['operational-state'], 'ENABLED', "Operational State should be 'ENABLED'") @@ -485,16 +487,14 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_25_check_update_tapi_links_ok(self): - response = test_utils.tapi_get_topology_details_request( - "T0 - Full Multi-layer topology") + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) time.sleep(2) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - link_list = res['output']['topology']['link'] + link_list = response['output']['topology']['link'] for link in link_list: self.assertEqual(link['operational-state'], 'ENABLED') self.assertEqual(link['administrative-state'], 'UNLOCKED') - time.sleep(1) def test_26_check_update_service1_ok(self): @@ -513,17 +513,16 @@ class TransportPCEFulltesting(unittest.TestCase): "administrative-state": "outOfService", "port-qual": "roadm-external"}]} response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"), - data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON, - auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD), - timeout=test_utils.REQUEST_TIMEOUT) + data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON, + auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD), + timeout=test_utils_rfc8040.REQUEST_TIMEOUT) self.assertEqual(response.status_code, requests.codes.ok) time.sleep(2) def test_29_check_update_portmapping(self): - response = test_utils.portmapping_request("ROADM-A1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - mapping_list = res['nodes'][0]['mapping'] + response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None) + self.assertEqual(response['status_code'], requests.codes.ok) + mapping_list = response['nodes'][0]['mapping'] for mapping in mapping_list: if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX': self.assertEqual(mapping['port-oper-state'], 'OutOfService', @@ -538,11 +537,9 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_30_check_update_openroadm_topo(self): - url = test_utils.URL_CONFIG_ORDM_TOPO - response = test_utils.get_request(url) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - node_list = res['network'][0]['node'] + response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + node_list = response['network'][0]['node'] nb_updated_tp = 0 for node in node_list: self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService') @@ -558,7 +555,7 @@ class TransportPCEFulltesting(unittest.TestCase): self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService') self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified") - link_list = res['network'][0]['ietf-network-topology:link'] + link_list = response['network'][0]['ietf-network-topology:link'] updated_links = ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX', 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1'] nb_updated_link = 0 @@ -574,10 +571,12 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_31_check_update_tapi_neps(self): - response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nep_list = res['output']['node']['owned-node-edge-point'] + self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA" + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-node-details', self.node_details) + self.assertEqual(response['status_code'], requests.codes.ok) + nep_list = response['output']['node']['owned-node-edge-point'] nb_updated_neps = 0 for nep in nep_list: if 'SRG1-PP1-TXRX' in nep['name'][0]['value']: @@ -595,12 +594,11 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_32_check_update_tapi_links(self): - response = test_utils.tapi_get_topology_details_request( - "T0 - Full Multi-layer topology") + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) time.sleep(2) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - link_list = res['output']['topology']['link'] + link_list = response['output']['topology']['link'] nb_updated_link = 0 for link in link_list: if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP1-TXRX']): @@ -630,17 +628,16 @@ class TransportPCEFulltesting(unittest.TestCase): "administrative-state": "inService", "port-qual": "roadm-external"}]} response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"), - data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON, - auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD), - timeout=test_utils.REQUEST_TIMEOUT) + data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON, + auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD), + timeout=test_utils_rfc8040.REQUEST_TIMEOUT) self.assertEqual(response.status_code, requests.codes.ok) time.sleep(2) def test_36_check_update_portmapping_ok(self): - response = test_utils.portmapping_request("ROADM-A1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - mapping_list = res['nodes'][0]['mapping'] + response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None) + self.assertEqual(response['status_code'], requests.codes.ok) + mapping_list = response['nodes'][0]['mapping'] for mapping in mapping_list: self.assertEqual(mapping['port-oper-state'], 'InService', "Operational State should be 'InService'") @@ -652,10 +649,12 @@ class TransportPCEFulltesting(unittest.TestCase): self.test_23_check_update_openroadm_topo_ok() def test_38_check_update_tapi_neps_ok(self): - response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nep_list = res['output']['node']['owned-node-edge-point'] + self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA" + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-node-details', self.node_details) + self.assertEqual(response['status_code'], requests.codes.ok) + nep_list = response['output']['node']['owned-node-edge-point'] for nep in nep_list: self.assertEqual(nep['operational-state'], 'ENABLED', "Operational State should be 'ENABLED'") @@ -683,17 +682,16 @@ class TransportPCEFulltesting(unittest.TestCase): "administrative-state": "outOfService", "port-qual": "roadm-external"}]} response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"), - data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON, - auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD), - timeout=test_utils.REQUEST_TIMEOUT) + data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON, + auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD), + timeout=test_utils_rfc8040.REQUEST_TIMEOUT) self.assertEqual(response.status_code, requests.codes.ok) time.sleep(2) def test_43_check_update_portmapping(self): - response = test_utils.portmapping_request("ROADM-A1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - mapping_list = res['nodes'][0]['mapping'] + response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None) + self.assertEqual(response['status_code'], requests.codes.ok) + mapping_list = response['nodes'][0]['mapping'] for mapping in mapping_list: if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX': self.assertEqual(mapping['port-oper-state'], 'OutOfService', @@ -708,11 +706,9 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_44_check_update_openroadm_topo(self): - url = test_utils.URL_CONFIG_ORDM_TOPO - response = test_utils.get_request(url) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - node_list = res['network'][0]['node'] + response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + node_list = response['network'][0]['node'] nb_updated_tp = 0 for node in node_list: self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService') @@ -728,7 +724,7 @@ class TransportPCEFulltesting(unittest.TestCase): self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService') self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified") - link_list = res['network'][0]['ietf-network-topology:link'] + link_list = response['network'][0]['ietf-network-topology:link'] updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX', 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX'] nb_updated_link = 0 @@ -744,10 +740,12 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_45_check_update_tapi_neps(self): - response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nep_list = res['output']['node']['owned-node-edge-point'] + self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA" + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-node-details', self.node_details) + self.assertEqual(response['status_code'], requests.codes.ok) + nep_list = response['output']['node']['owned-node-edge-point'] nb_updated_neps = 0 for nep in nep_list: if 'DEG2-TTP-TXRX' in nep['name'][0]['value']: @@ -765,12 +763,12 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_46_check_update_tapi_links(self): - response = test_utils.tapi_get_topology_details_request( - "T0 - Full Multi-layer topology") + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) time.sleep(2) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - link_list = res['output']['topology']['link'] + self.assertEqual(response['status_code'], requests.codes.ok) + link_list = response['output']['topology']['link'] nb_updated_link = 0 for link in link_list: if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'DEG2-TTP-TXRX']): @@ -800,9 +798,9 @@ class TransportPCEFulltesting(unittest.TestCase): "administrative-state": "inService", "port-qual": "roadm-external"}]} response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"), - data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON, - auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD), - timeout=test_utils.REQUEST_TIMEOUT) + data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON, + auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD), + timeout=test_utils_rfc8040.REQUEST_TIMEOUT) self.assertEqual(response.status_code, requests.codes.ok) time.sleep(2) @@ -834,17 +832,16 @@ class TransportPCEFulltesting(unittest.TestCase): "administrative-state": "outOfService", "port-qual": "roadm-external"}]} response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"), - data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON, - auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD), - timeout=test_utils.REQUEST_TIMEOUT) + data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON, + auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD), + timeout=test_utils_rfc8040.REQUEST_TIMEOUT) self.assertEqual(response.status_code, requests.codes.ok) time.sleep(2) def test_57_check_update_portmapping(self): - response = test_utils.portmapping_request("ROADM-A1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - mapping_list = res['nodes'][0]['mapping'] + response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None) + self.assertEqual(response['status_code'], requests.codes.ok) + mapping_list = response['nodes'][0]['mapping'] for mapping in mapping_list: if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX': self.assertEqual(mapping['port-oper-state'], 'OutOfService', @@ -859,11 +856,9 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_58_check_update_openroadm_topo(self): - url = test_utils.URL_CONFIG_ORDM_TOPO - response = test_utils.get_request(url) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - node_list = res['network'][0]['node'] + response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + node_list = response['network'][0]['node'] nb_updated_tp = 0 for node in node_list: self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService') @@ -879,7 +874,7 @@ class TransportPCEFulltesting(unittest.TestCase): self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService') self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified") - link_list = res['network'][0]['ietf-network-topology:link'] + link_list = response['network'][0]['ietf-network-topology:link'] nb_updated_link = 0 for link in link_list: self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService') @@ -888,10 +883,12 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_59_check_update_tapi_neps(self): - response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - nep_list = res['output']['node']['owned-node-edge-point'] + self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA" + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-node-details', self.node_details) + self.assertEqual(response['status_code'], requests.codes.ok) + nep_list = response['output']['node']['owned-node-edge-point'] nb_updated_neps = 0 for nep in nep_list: if 'SRG1-PP2-TXRX' in nep['name'][0]['value']: @@ -909,12 +906,12 @@ class TransportPCEFulltesting(unittest.TestCase): time.sleep(1) def test_60_check_update_tapi_links(self): - response = test_utils.tapi_get_topology_details_request( - "T0 - Full Multi-layer topology") + self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-topology', 'get-topology-details', self.tapi_topo) time.sleep(2) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - link_list = res['output']['topology']['link'] + self.assertEqual(response['status_code'], requests.codes.ok) + link_list = response['output']['topology']['link'] nb_updated_link = 0 for link in link_list: if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP2-TXRX']): @@ -935,37 +932,37 @@ class TransportPCEFulltesting(unittest.TestCase): self.test_13_get_connectivity_service_Ethernet() def test_63_delete_connectivity_service_Ethernet(self): - response = test_utils.tapi_delete_connectivity_request(str(self.uuid_services.eth)) - self.assertEqual(response.status_code, requests.codes.no_content) + self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.eth) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data) + self.assertIn(response['status_code'], (requests.codes.ok, requests.codes.no_content)) time.sleep(self.WAITING) def test_64_disconnect_xponders_from_roadm(self): - url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/" - response = test_utils.get_ordm_topo_request("") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - links = res['network'][0]['ietf-network-topology:link'] + response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + links = response['network'][0]['ietf-network-topology:link'] for link in links: if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'): - link_name = link["link-id"] - response = test_utils.delete_request(url+link_name) - self.assertEqual(response.status_code, requests.codes.ok) + response = test_utils_rfc8040.del_ietf_network_link_request( + 'openroadm-topology', link['link-id'], 'config') + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_65_disconnect_XPDRA(self): - response = test_utils.unmount_device("XPDR-A1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) + response = test_utils_rfc8040.unmount_device("XPDR-A1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_66_disconnect_XPDRC(self): - response = test_utils.unmount_device("XPDR-C1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) + response = test_utils_rfc8040.unmount_device("XPDR-C1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_67_disconnect_ROADMA(self): - response = test_utils.unmount_device("ROADM-A1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) + response = test_utils_rfc8040.unmount_device("ROADM-A1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_68_disconnect_ROADMC(self): - response = test_utils.unmount_device("ROADM-C1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) + response = test_utils_rfc8040.unmount_device("ROADM-C1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) if __name__ == "__main__": -- 2.36.6