"XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
self.assertEqual(response["status_code"], requests.codes.conflict)
+ # 200G 31.6 Gbaud mode for muxponder, with qam16
+ def test_32_service_path_create_otuc2(self):
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ "transportpce-device-renderer", "service-path",
+ {
+ "service-name": "service_OTUC2",
+ "wave-number": "0",
+ "modulation-format": "dp-qam16",
+ "operation": "create",
+ "nodes": [{
+ "node-id": "XPDR-A2",
+ "dest-tp": "XPDR2-NETWORK1"
+ },
+ {
+ "node-id": "XPDR-C2",
+ "dest-tp": "XPDR2-NETWORK1"
+ }],
+ "center-freq": 193.0,
+ "nmc-width": 37.5,
+ "min-freq": 192.975,
+ "max-freq": 193.025,
+ "lower-spectral-slot-number": 265,
+ "higher-spectral-slot-number": 272,
+ })
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"])
+ # node-interface is list which does not preserve the order
+ a_side = "XPDR-A2"
+ z_side = "XPDR-C2"
+ if response["output"]["node-interface"][0]["node-id"] == z_side:
+ a_side, z_side = z_side, a_side
+ self.assertEqual(
+ {"node-id": a_side,
+ "otu-interface-id": [
+ "XPDR2-NETWORK1-OTUC2"
+ ],
+ "och-interface-id": [
+ "XPDR2-NETWORK1-OTSIGROUP-200G",
+ "XPDR2-NETWORK1-265:272"
+ ]},
+ response["output"]["node-interface"][0])
+ self.assertEqual(
+ {"node-id": z_side,
+ "otu-interface-id": [
+ "XPDR2-NETWORK1-OTUC2"
+ ],
+ "och-interface-id": [
+ "XPDR2-NETWORK1-OTSIGROUP-200G",
+ "XPDR2-NETWORK1-265:272"
+ ]},
+ response["output"]["node-interface"][1])
+ # Update the network dict variable for mpdr
+ self.NETWORK2_CHECK_DICT["logical-connection-point"] = "XPDR2-NETWORK1"
+ self.NETWORK2_CHECK_DICT["supporting-circuit-pack-name"] = "1/2/2-PLUG-NET"
+ self.NETWORK2_CHECK_DICT["port-qual"] = "switch-network"
+ self.NETWORK2_CHECK_DICT["xponder-type"] = "mpdr"
+ self.NETWORK2_CHECK_DICT["lcp-hash-val"] = "LY9PxYJqUbw="
+
+ def test_33_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response["mapping"])
+
+ def test_34_check_interface_otsi(self):
+ # pylint: disable=line-too-long
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-265:272")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR2-NETWORK1-265:272",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
+ "type": "org-openroadm-interfaces:otsi",
+ "supporting-port": "L1"}
+ input_dict_2 = {
+ "frequency": 193.00000,
+ "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
+ "fec": "org-openroadm-common-types:ofec",
+ "transmit-power": -5,
+ "provision-mode": "explicit",
+ "modulation-format": "dp-qam16"}
+
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]),
+ response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"])
+ self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.8", "iid": [1, 2]},
+ response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
+
+ def test_35_check_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR2-NETWORK1-OTSIGROUP-200G",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
+ ["supporting-interface-list"][0]: "XPDR2-NETWORK1-265:272",
+ "type": "org-openroadm-interfaces:otsi-group",
+ "supporting-port": "L1"}
+ input_dict_2 = {"group-id": 1,
+ "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
+
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]),
+ response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
+
+ def test_36_check_interface_otuc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR2-NETWORK1-OTUC2",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
+ ["supporting-interface-list"][0]: "XPDR2-NETWORK1-OTSIGROUP-200G",
+ "type": "org-openroadm-interfaces:otnOtu",
+ "supporting-port": "L1"}
+ input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+ "degthr-percentage": 100,
+ "tim-detect-mode": "Disabled",
+ "otucn-n-rate": 2,
+ "degm-intervals": 2}
+
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
+ response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
+
+ # We do not need other test cases for the otn-service-path (ODUC2 and Ethernet)
+ # as there is no change in code and are covered in test02_otn_renderer
+
+ def test_37_service_path_delete_otuc2(self):
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ "transportpce-device-renderer", "service-path",
+ {
+ "modulation-format": "dp-qam16",
+ "operation": "delete",
+ "service-name": "service_OTUC2",
+ "wave-number": "0",
+ "center-freq": 193.0,
+ "nmc-width": 37.5,
+ "min-freq": 192.975,
+ "max-freq": 193.025,
+ "lower-spectral-slot-number": 265,
+ "higher-spectral-slot-number": 272,
+ "nodes": [
+ {
+ "node-id": "XPDR-A2",
+ "dest-tp": "XPDR2-NETWORK1"
+ },
+ {
+ "node-id": "XPDR-C2",
+ "dest-tp": "XPDR2-NETWORK1"
+ }
+ ]
+ })
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ self.assertIn("Request processed", response["output"]["result"])
+ del self.NETWORK2_CHECK_DICT["supporting-otucn"]
+
+ def test_38_check_no_interface_otuc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ def test_39_check_no_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ def test_40_check_no_interface_otsi(self):
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ def test_41_check_no_otuc2(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertRaises(KeyError, lambda: response["supporting-otucn"])
+
# Disconnect the XPDR
- def test_32_xpdr_device_disconnection(self):
+ def test_42_xpdr_device_disconnection(self):
response = test_utils_rfc8040.unmount_device("XPDR-A2")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
- def test_33_xpdr_device_disconnected(self):
+ def test_43_xpdr_device_disconnected(self):
response = test_utils_rfc8040.check_device_connection("XPDR-A2")
self.assertEqual(response["status_code"], requests.codes.conflict)
self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
self.assertEqual(response["connection-status"]["error-message"],
"Request could not be completed because the relevant data model content does not exist")
- def test_34_xpdr_device_not_connected(self):
+ def test_44_xpdr_device_not_connected(self):
response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
self.assertEqual(response["status_code"], requests.codes.conflict)
self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
self.assertEqual(response["node-info"]["error-message"],
"Request could not be completed because the relevant data model content does not exist")
- def test_35_xpdr_device_disconnection(self):
+ def test_45_xpdr_device_disconnection(self):
response = test_utils_rfc8040.unmount_device("XPDR-C2")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
- def test_36_xpdr_device_disconnected(self):
+ def test_46_xpdr_device_disconnected(self):
response = test_utils_rfc8040.check_device_connection("XPDR-C2")
self.assertEqual(response["status_code"], requests.codes.conflict)
self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
self.assertEqual(response["connection-status"]["error-message"],
"Request could not be completed because the relevant data model content does not exist")
- def test_37_xpdr_device_not_connected(self):
+ def test_47_xpdr_device_not_connected(self):
response = test_utils_rfc8040.get_portmapping_node_info("XPDR-C2")
self.assertEqual(response["status_code"], requests.codes.conflict)
self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))