From 5a68673a3b4389adc85fc0afb8e12a07c9d11eb0 Mon Sep 17 00:00:00 2001 From: Balagangadhar Bathula Date: Thu, 26 May 2022 12:27:34 -0400 Subject: [PATCH] Renderer functional tests for 200G rate 31.6 Gbaud - Renderer service-path create and delete - Fix a minor bug JIRA: TRNSPRTPCE-637 Change-Id: Ibc4de5354e1188dec792fbc719513e4261d47312 Signed-off-by: Balagangadhar Bathula --- .../7.1/test02_otn_renderer.py | 6 +- .../7.1/test03_renderer_or_modes.py | 193 +++++++++++++++++- 2 files changed, 190 insertions(+), 9 deletions(-) diff --git a/tests/transportpce_tests/7.1/test02_otn_renderer.py b/tests/transportpce_tests/7.1/test02_otn_renderer.py index b7510bc42..e5e8ed4c2 100644 --- a/tests/transportpce_tests/7.1/test02_otn_renderer.py +++ b/tests/transportpce_tests/7.1/test02_otn_renderer.py @@ -418,16 +418,16 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): del self.NETWORK2_CHECK_DICT["supporting-otucn"] def test_23_check_no_interface_otuc2(self): - response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2") + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2") self.assertEqual(response['status_code'], requests.codes.conflict) def test_24_check_no_interface_otsig(self): response = test_utils_rfc8040.check_node_attribute_request( - "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G") + "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G") self.assertEqual(response['status_code'], requests.codes.conflict) def test_25_check_no_interface_otsi(self): - response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768") + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768") self.assertEqual(response['status_code'], requests.codes.conflict) def test_25a_check_no_otuc2(self): diff --git a/tests/transportpce_tests/7.1/test03_renderer_or_modes.py b/tests/transportpce_tests/7.1/test03_renderer_or_modes.py index 4f988f6ab..92e733541 100644 --- a/tests/transportpce_tests/7.1/test03_renderer_or_modes.py +++ b/tests/transportpce_tests/7.1/test03_renderer_or_modes.py @@ -536,12 +536,193 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): "XPDR-C2", "interface", "XPDR3-NETWORK1-265:272") self.assertEqual(response["status_code"], requests.codes.conflict) + # 200G 31.6 Gbaud mode for muxponder, with qam16 + def test_32_service_path_create_otuc2(self): + response = test_utils_rfc8040.transportpce_api_rpc_request( + "transportpce-device-renderer", "service-path", + { + "service-name": "service_OTUC2", + "wave-number": "0", + "modulation-format": "dp-qam16", + "operation": "create", + "nodes": [{ + "node-id": "XPDR-A2", + "dest-tp": "XPDR2-NETWORK1" + }, + { + "node-id": "XPDR-C2", + "dest-tp": "XPDR2-NETWORK1" + }], + "center-freq": 193.0, + "nmc-width": 37.5, + "min-freq": 192.975, + "max-freq": 193.025, + "lower-spectral-slot-number": 265, + "higher-spectral-slot-number": 272, + }) + self.assertEqual(response["status_code"], requests.codes.ok) + self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"]) + # node-interface is list which does not preserve the order + a_side = "XPDR-A2" + z_side = "XPDR-C2" + if response["output"]["node-interface"][0]["node-id"] == z_side: + a_side, z_side = z_side, a_side + self.assertEqual( + {"node-id": a_side, + "otu-interface-id": [ + "XPDR2-NETWORK1-OTUC2" + ], + "och-interface-id": [ + "XPDR2-NETWORK1-OTSIGROUP-200G", + "XPDR2-NETWORK1-265:272" + ]}, + response["output"]["node-interface"][0]) + self.assertEqual( + {"node-id": z_side, + "otu-interface-id": [ + "XPDR2-NETWORK1-OTUC2" + ], + "och-interface-id": [ + "XPDR2-NETWORK1-OTSIGROUP-200G", + "XPDR2-NETWORK1-265:272" + ]}, + response["output"]["node-interface"][1]) + # Update the network dict variable for mpdr + self.NETWORK2_CHECK_DICT["logical-connection-point"] = "XPDR2-NETWORK1" + self.NETWORK2_CHECK_DICT["supporting-circuit-pack-name"] = "1/2/2-PLUG-NET" + self.NETWORK2_CHECK_DICT["port-qual"] = "switch-network" + self.NETWORK2_CHECK_DICT["xponder-type"] = "mpdr" + self.NETWORK2_CHECK_DICT["lcp-hash-val"] = "LY9PxYJqUbw=" + + def test_33_get_portmapping_network1(self): + response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1") + self.assertEqual(response["status_code"], requests.codes.ok) + self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2" + self.assertIn( + self.NETWORK2_CHECK_DICT, + response["mapping"]) + + def test_34_check_interface_otsi(self): + # pylint: disable=line-too-long + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-265:272") + self.assertEqual(response["status_code"], requests.codes.ok) + input_dict_1 = {"name": "XPDR2-NETWORK1-265:272", + "administrative-state": "inService", + "supporting-circuit-pack-name": "1/2/2-PLUG-NET", + "type": "org-openroadm-interfaces:otsi", + "supporting-port": "L1"} + input_dict_2 = { + "frequency": 193.00000, + "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi", + "fec": "org-openroadm-common-types:ofec", + "transmit-power": -5, + "provision-mode": "explicit", + "modulation-format": "dp-qam16"} + + self.assertDictEqual(dict(input_dict_1, **response["interface"][0]), + response["interface"][0]) + self.assertDictEqual(dict(input_dict_2, + **response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]), + response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]) + self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.8", "iid": [1, 2]}, + response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"]) + + def test_35_check_interface_otsig(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G") + self.assertEqual(response["status_code"], requests.codes.ok) + input_dict_1 = {"name": "XPDR2-NETWORK1-OTSIGROUP-200G", + "administrative-state": "inService", + "supporting-circuit-pack-name": "1/2/2-PLUG-NET", + ["supporting-interface-list"][0]: "XPDR2-NETWORK1-265:272", + "type": "org-openroadm-interfaces:otsi-group", + "supporting-port": "L1"} + input_dict_2 = {"group-id": 1, + "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"} + + self.assertDictEqual(dict(input_dict_1, **response["interface"][0]), + response["interface"][0]) + self.assertDictEqual(dict(input_dict_2, + **response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]), + response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]) + + def test_36_check_interface_otuc2(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2") + self.assertEqual(response["status_code"], requests.codes.ok) + input_dict_1 = {"name": "XPDR2-NETWORK1-OTUC2", + "administrative-state": "inService", + "supporting-circuit-pack-name": "1/2/2-PLUG-NET", + ["supporting-interface-list"][0]: "XPDR2-NETWORK1-OTSIGROUP-200G", + "type": "org-openroadm-interfaces:otnOtu", + "supporting-port": "L1"} + input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn", + "degthr-percentage": 100, + "tim-detect-mode": "Disabled", + "otucn-n-rate": 2, + "degm-intervals": 2} + + self.assertDictEqual(dict(input_dict_1, **response["interface"][0]), + response["interface"][0]) + self.assertDictEqual(dict(input_dict_2, + **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]), + response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]) + + # We do not need other test cases for the otn-service-path (ODUC2 and Ethernet) + # as there is no change in code and are covered in test02_otn_renderer + + def test_37_service_path_delete_otuc2(self): + response = test_utils_rfc8040.transportpce_api_rpc_request( + "transportpce-device-renderer", "service-path", + { + "modulation-format": "dp-qam16", + "operation": "delete", + "service-name": "service_OTUC2", + "wave-number": "0", + "center-freq": 193.0, + "nmc-width": 37.5, + "min-freq": 192.975, + "max-freq": 193.025, + "lower-spectral-slot-number": 265, + "higher-spectral-slot-number": 272, + "nodes": [ + { + "node-id": "XPDR-A2", + "dest-tp": "XPDR2-NETWORK1" + }, + { + "node-id": "XPDR-C2", + "dest-tp": "XPDR2-NETWORK1" + } + ] + }) + self.assertEqual(response["status_code"], requests.codes.ok) + self.assertIn("Request processed", response["output"]["result"]) + del self.NETWORK2_CHECK_DICT["supporting-otucn"] + + def test_38_check_no_interface_otuc2(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2") + self.assertEqual(response["status_code"], requests.codes.conflict) + + def test_39_check_no_interface_otsig(self): + response = test_utils_rfc8040.check_node_attribute_request( + "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G") + self.assertEqual(response["status_code"], requests.codes.conflict) + + def test_40_check_no_interface_otsi(self): + response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768") + self.assertEqual(response["status_code"], requests.codes.conflict) + + def test_41_check_no_otuc2(self): + response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1") + self.assertRaises(KeyError, lambda: response["supporting-otucn"]) + # Disconnect the XPDR - def test_32_xpdr_device_disconnection(self): + def test_42_xpdr_device_disconnection(self): response = test_utils_rfc8040.unmount_device("XPDR-A2") self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) - def test_33_xpdr_device_disconnected(self): + def test_43_xpdr_device_disconnected(self): response = test_utils_rfc8040.check_device_connection("XPDR-A2") self.assertEqual(response["status_code"], requests.codes.conflict) self.assertIn(response["connection-status"]["error-type"], ("protocol", "application")) @@ -549,7 +730,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): self.assertEqual(response["connection-status"]["error-message"], "Request could not be completed because the relevant data model content does not exist") - def test_34_xpdr_device_not_connected(self): + def test_44_xpdr_device_not_connected(self): response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2") self.assertEqual(response["status_code"], requests.codes.conflict) self.assertIn(response["node-info"]["error-type"], ("protocol", "application")) @@ -557,11 +738,11 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): self.assertEqual(response["node-info"]["error-message"], "Request could not be completed because the relevant data model content does not exist") - def test_35_xpdr_device_disconnection(self): + def test_45_xpdr_device_disconnection(self): response = test_utils_rfc8040.unmount_device("XPDR-C2") self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) - def test_36_xpdr_device_disconnected(self): + def test_46_xpdr_device_disconnected(self): response = test_utils_rfc8040.check_device_connection("XPDR-C2") self.assertEqual(response["status_code"], requests.codes.conflict) self.assertIn(response["connection-status"]["error-type"], ("protocol", "application")) @@ -569,7 +750,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): self.assertEqual(response["connection-status"]["error-message"], "Request could not be completed because the relevant data model content does not exist") - def test_37_xpdr_device_not_connected(self): + def test_47_xpdr_device_not_connected(self): response = test_utils_rfc8040.get_portmapping_node_info("XPDR-C2") self.assertEqual(response["status_code"], requests.codes.conflict) self.assertIn(response["node-info"]["error-type"], ("protocol", "application")) -- 2.36.6