Renderer functional tests for 200G rate 31.6 Gbaud 04/101304/2
authorBalagangadhar Bathula <bb4341@att.com>
Thu, 26 May 2022 16:27:34 +0000 (12:27 -0400)
committerBalagangadhar Bathula <bb4341@att.com>
Thu, 26 May 2022 19:33:24 +0000 (15:33 -0400)
- Renderer service-path create and delete
- Fix a minor bug

JIRA: TRNSPRTPCE-637
Change-Id: Ibc4de5354e1188dec792fbc719513e4261d47312
Signed-off-by: Balagangadhar Bathula <bb4341@att.com>
tests/transportpce_tests/7.1/test02_otn_renderer.py
tests/transportpce_tests/7.1/test03_renderer_or_modes.py

index b7510bc42e4e8d89879dae409540568b808a2cd9..e5e8ed4c2e5bc45b4535795d23b76e190c50b05e 100644 (file)
@@ -418,16 +418,16 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
         del self.NETWORK2_CHECK_DICT["supporting-otucn"]
 
     def test_23_check_no_interface_otuc2(self):
-        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2")
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
         self.assertEqual(response['status_code'], requests.codes.conflict)
 
     def test_24_check_no_interface_otsig(self):
         response = test_utils_rfc8040.check_node_attribute_request(
-            "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
         self.assertEqual(response['status_code'], requests.codes.conflict)
 
     def test_25_check_no_interface_otsi(self):
-        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
         self.assertEqual(response['status_code'], requests.codes.conflict)
 
     def test_25a_check_no_otuc2(self):
index 4f988f6ab4363ca04d33a5ac223dabad689a08ac..92e733541a87b09ed9fc1980f668d12f288ec0c6 100644 (file)
@@ -536,12 +536,193 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
             "XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
+    # 200G 31.6 Gbaud mode for muxponder, with qam16
+    def test_32_service_path_create_otuc2(self):
+        response = test_utils_rfc8040.transportpce_api_rpc_request(
+            "transportpce-device-renderer", "service-path",
+            {
+                "service-name": "service_OTUC2",
+                "wave-number": "0",
+                "modulation-format": "dp-qam16",
+                "operation": "create",
+                "nodes": [{
+                    "node-id": "XPDR-A2",
+                    "dest-tp": "XPDR2-NETWORK1"
+                },
+                    {
+                    "node-id": "XPDR-C2",
+                    "dest-tp": "XPDR2-NETWORK1"
+                }],
+                "center-freq": 193.0,
+                "nmc-width": 37.5,
+                "min-freq": 192.975,
+                "max-freq": 193.025,
+                "lower-spectral-slot-number": 265,
+                "higher-spectral-slot-number": 272,
+            })
+        self.assertEqual(response["status_code"], requests.codes.ok)
+        self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"])
+        # node-interface is list which does not preserve the order
+        a_side = "XPDR-A2"
+        z_side = "XPDR-C2"
+        if response["output"]["node-interface"][0]["node-id"] == z_side:
+            a_side, z_side = z_side, a_side
+        self.assertEqual(
+            {"node-id": a_side,
+             "otu-interface-id": [
+                 "XPDR2-NETWORK1-OTUC2"
+             ],
+             "och-interface-id": [
+                 "XPDR2-NETWORK1-OTSIGROUP-200G",
+                 "XPDR2-NETWORK1-265:272"
+             ]},
+            response["output"]["node-interface"][0])
+        self.assertEqual(
+            {"node-id": z_side,
+             "otu-interface-id": [
+                 "XPDR2-NETWORK1-OTUC2"
+             ],
+             "och-interface-id": [
+                 "XPDR2-NETWORK1-OTSIGROUP-200G",
+                 "XPDR2-NETWORK1-265:272"
+             ]},
+            response["output"]["node-interface"][1])
+        # Update the network dict variable for mpdr
+        self.NETWORK2_CHECK_DICT["logical-connection-point"] = "XPDR2-NETWORK1"
+        self.NETWORK2_CHECK_DICT["supporting-circuit-pack-name"] = "1/2/2-PLUG-NET"
+        self.NETWORK2_CHECK_DICT["port-qual"] = "switch-network"
+        self.NETWORK2_CHECK_DICT["xponder-type"] = "mpdr"
+        self.NETWORK2_CHECK_DICT["lcp-hash-val"] = "LY9PxYJqUbw="
+
+    def test_33_get_portmapping_network1(self):
+        response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+        self.assertEqual(response["status_code"], requests.codes.ok)
+        self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
+        self.assertIn(
+            self.NETWORK2_CHECK_DICT,
+            response["mapping"])
+
+    def test_34_check_interface_otsi(self):
+        # pylint: disable=line-too-long
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-265:272")
+        self.assertEqual(response["status_code"], requests.codes.ok)
+        input_dict_1 = {"name": "XPDR2-NETWORK1-265:272",
+                        "administrative-state": "inService",
+                        "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
+                        "type": "org-openroadm-interfaces:otsi",
+                        "supporting-port": "L1"}
+        input_dict_2 = {
+            "frequency": 193.00000,
+            "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
+            "fec": "org-openroadm-common-types:ofec",
+            "transmit-power": -5,
+            "provision-mode": "explicit",
+            "modulation-format": "dp-qam16"}
+
+        self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+                             response["interface"][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]),
+                             response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"])
+        self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.8", "iid": [1, 2]},
+                             response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
+
+    def test_35_check_interface_otsig(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+        self.assertEqual(response["status_code"], requests.codes.ok)
+        input_dict_1 = {"name": "XPDR2-NETWORK1-OTSIGROUP-200G",
+                        "administrative-state": "inService",
+                        "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
+                        ["supporting-interface-list"][0]: "XPDR2-NETWORK1-265:272",
+                        "type": "org-openroadm-interfaces:otsi-group",
+                        "supporting-port": "L1"}
+        input_dict_2 = {"group-id": 1,
+                        "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
+
+        self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+                             response["interface"][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]),
+                             response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
+
+    def test_36_check_interface_otuc2(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
+        self.assertEqual(response["status_code"], requests.codes.ok)
+        input_dict_1 = {"name": "XPDR2-NETWORK1-OTUC2",
+                        "administrative-state": "inService",
+                        "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
+                        ["supporting-interface-list"][0]: "XPDR2-NETWORK1-OTSIGROUP-200G",
+                        "type": "org-openroadm-interfaces:otnOtu",
+                        "supporting-port": "L1"}
+        input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+                        "degthr-percentage": 100,
+                        "tim-detect-mode": "Disabled",
+                        "otucn-n-rate": 2,
+                        "degm-intervals": 2}
+
+        self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+                             response["interface"][0])
+        self.assertDictEqual(dict(input_dict_2,
+                                  **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
+                             response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
+
+    # We do not need other test cases for the otn-service-path (ODUC2 and Ethernet)
+    # as there is no change in code and are covered in test02_otn_renderer
+
+    def test_37_service_path_delete_otuc2(self):
+        response = test_utils_rfc8040.transportpce_api_rpc_request(
+            "transportpce-device-renderer", "service-path",
+            {
+                "modulation-format": "dp-qam16",
+                "operation": "delete",
+                "service-name": "service_OTUC2",
+                "wave-number": "0",
+                "center-freq": 193.0,
+                "nmc-width": 37.5,
+                "min-freq": 192.975,
+                "max-freq": 193.025,
+                "lower-spectral-slot-number": 265,
+                "higher-spectral-slot-number": 272,
+                "nodes": [
+                    {
+                        "node-id": "XPDR-A2",
+                        "dest-tp": "XPDR2-NETWORK1"
+                    },
+                    {
+                        "node-id": "XPDR-C2",
+                        "dest-tp": "XPDR2-NETWORK1"
+                    }
+                ]
+            })
+        self.assertEqual(response["status_code"], requests.codes.ok)
+        self.assertIn("Request processed", response["output"]["result"])
+        del self.NETWORK2_CHECK_DICT["supporting-otucn"]
+
+    def test_38_check_no_interface_otuc2(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
+        self.assertEqual(response["status_code"], requests.codes.conflict)
+
+    def test_39_check_no_interface_otsig(self):
+        response = test_utils_rfc8040.check_node_attribute_request(
+            "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+        self.assertEqual(response["status_code"], requests.codes.conflict)
+
+    def test_40_check_no_interface_otsi(self):
+        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+        self.assertEqual(response["status_code"], requests.codes.conflict)
+
+    def test_41_check_no_otuc2(self):
+        response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+        self.assertRaises(KeyError, lambda: response["supporting-otucn"])
+
     # Disconnect the XPDR
-    def test_32_xpdr_device_disconnection(self):
+    def test_42_xpdr_device_disconnection(self):
         response = test_utils_rfc8040.unmount_device("XPDR-A2")
         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
 
-    def test_33_xpdr_device_disconnected(self):
+    def test_43_xpdr_device_disconnected(self):
         response = test_utils_rfc8040.check_device_connection("XPDR-A2")
         self.assertEqual(response["status_code"], requests.codes.conflict)
         self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
@@ -549,7 +730,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
         self.assertEqual(response["connection-status"]["error-message"],
                          "Request could not be completed because the relevant data model content does not exist")
 
-    def test_34_xpdr_device_not_connected(self):
+    def test_44_xpdr_device_not_connected(self):
         response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
         self.assertEqual(response["status_code"], requests.codes.conflict)
         self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
@@ -557,11 +738,11 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
         self.assertEqual(response["node-info"]["error-message"],
                          "Request could not be completed because the relevant data model content does not exist")
 
-    def test_35_xpdr_device_disconnection(self):
+    def test_45_xpdr_device_disconnection(self):
         response = test_utils_rfc8040.unmount_device("XPDR-C2")
         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
 
-    def test_36_xpdr_device_disconnected(self):
+    def test_46_xpdr_device_disconnected(self):
         response = test_utils_rfc8040.check_device_connection("XPDR-C2")
         self.assertEqual(response["status_code"], requests.codes.conflict)
         self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
@@ -569,7 +750,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
         self.assertEqual(response["connection-status"]["error-message"],
                          "Request could not be completed because the relevant data model content does not exist")
 
-    def test_37_xpdr_device_not_connected(self):
+    def test_47_xpdr_device_not_connected(self):
         response = test_utils_rfc8040.get_portmapping_node_info("XPDR-C2")
         self.assertEqual(response["status_code"], requests.codes.conflict)
         self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))