+#!/usr/bin/env python
+##############################################################################
+# Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+# pylint: disable=no-member
+# pylint: disable=too-many-public-methods
+
+import unittest
+import time
+import requests
+# pylint: disable=wrong-import-order
+import sys
+sys.path.append("transportpce_tests/common")
+# pylint: disable=wrong-import-position
+# pylint: disable=import-error
+import test_utils_rfc8040 # nopep8
+
+
+class TransportPCE400GPortMappingTesting(unittest.TestCase):
+
+ processes = None
+ NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR3-NETWORK1",
+ "supporting-port": "L1",
+ "supported-interface-capability": [
+ "org-openroadm-port-types:if-otsi-otsigroup"
+ ],
+ "port-direction": "bidirectional",
+ "port-qual": "xpdr-network",
+ "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
+ "xponder-type": "tpdr",
+ "port-admin-state": "InService",
+ "port-oper-state": "InService"}
+ CLIENT_CAPABILITIES = ["org-openroadm-port-types:if-OCH-OTU4-ODU4",
+ "org-openroadm-port-types:if-100GE"]
+ NODE_VERSION = "7.1"
+
+ @classmethod
+ def setUpClass(cls):
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([("xpdra2", cls.NODE_VERSION),
+ ("xpdrc2", cls.NODE_VERSION)])
+
+ @classmethod
+ def tearDownClass(cls):
+ # pylint: disable=not-an-iterable
+ for process in cls.processes:
+ test_utils_rfc8040.shutdown_process(process)
+ print("all processes killed")
+
+ def setUp(self):
+ # pylint: disable=consider-using-f-string
+ print("execution of {}".format(self.id().split(".")[-1]))
+ time.sleep(10)
+
+ def test_01_xpdr_device_connection(self):
+ response = test_utils_rfc8040.mount_device("XPDR-A2",
+ ("xpdra2", self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created,
+ test_utils_rfc8040.CODE_SHOULD_BE_201)
+
+ def test_01a_xpdr_device_connection(self):
+ response = test_utils_rfc8040.mount_device("XPDR-C2",
+ ("xpdrc2", self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created,
+ test_utils_rfc8040.CODE_SHOULD_BE_201)
+
+ # Check the correct capabilities for the client ports (if-100GE, if-100GE-ODU4,
+ # if-OCH-OTU4-ODU4)
+ def test_02_check_client_capabilities(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR3-CLIENT1")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ self.assertEqual(
+ self.CLIENT_CAPABILITIES,
+ response["mapping"][0]["supported-interface-capability"])
+
+ def test_03_check_client_capabilities(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-C2", "XPDR3-CLIENT1")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ self.assertEqual(
+ self.CLIENT_CAPABILITIES,
+ response["mapping"][0]["supported-interface-capability"])
+
+ def test_04_100g_ofec_service_path_create(self):
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ "transportpce-device-renderer", "service-path",
+ {
+ "service-name": "service_100GE_ofec",
+ "wave-number": "0",
+ "modulation-format": "dp-qpsk",
+ "operation": "create",
+ "nodes": [{
+ "node-id": "XPDR-A2",
+ "src-tp": "XPDR3-CLIENT1",
+ "dest-tp": "XPDR3-NETWORK1"
+ },
+ {
+ "node-id": "XPDR-C2",
+ "src-tp": "XPDR3-CLIENT1",
+ "dest-tp": "XPDR3-NETWORK1"
+ }],
+ "center-freq": 193.0,
+ "nmc-width": 37.5,
+ "min-freq": 192.975,
+ "max-freq": 193.025,
+ "lower-spectral-slot-number": 265,
+ "higher-spectral-slot-number": 272,
+ })
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"])
+ # node-interface is list which does not preserve the order
+ a_side = "XPDR-A2"
+ z_side = "XPDR-C2"
+ if response["output"]["node-interface"][0]["node-id"] == z_side:
+ a_side, z_side = z_side, a_side
+ self.assertEqual(
+ {"node-id": a_side,
+ "otu-interface-id": [
+ "XPDR3-NETWORK1-OTUC1"
+ ],
+ "odu-interface-id": [
+ "XPDR3-NETWORK1-ODUC1",
+ "XPDR3-NETWORK1-ODU4"
+ ],
+ "och-interface-id": [
+ "XPDR3-NETWORK1-265:272",
+ "XPDR3-NETWORK1-OTSIGROUP-100G"
+ ],
+ "eth-interface-id": [
+ "XPDR3-CLIENT1-ETHERNET"
+ ]},
+ response["output"]["node-interface"][0])
+ self.assertEqual(
+ {"node-id": z_side,
+ "otu-interface-id": [
+ "XPDR3-NETWORK1-OTUC1"
+ ],
+ "odu-interface-id": [
+ "XPDR3-NETWORK1-ODUC1",
+ "XPDR3-NETWORK1-ODU4"
+ ],
+ "och-interface-id": [
+ "XPDR3-NETWORK1-265:272",
+ "XPDR3-NETWORK1-OTSIGROUP-100G"
+ ],
+ "eth-interface-id": [
+ "XPDR3-CLIENT1-ETHERNET"
+ ]},
+ response["output"]["node-interface"][1])
+
+ def test_05_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR3-NETWORK1")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
+ self.NETWORK2_CHECK_DICT["lcp-hash-val"] = "FDvaQIf2Z08="
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response["mapping"])
+
+ def test_06_get_portmapping_network1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-C2", "XPDR3-NETWORK1")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
+ self.NETWORK2_CHECK_DICT["lcp-hash-val"] = "AJpkaVmZKJk5"
+ self.assertIn(
+ self.NETWORK2_CHECK_DICT,
+ response["mapping"])
+
+ def test_07_check_interface_otsi(self):
+ # pylint: disable=line-too-long
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR3-NETWORK1-265:272",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
+ "type": "org-openroadm-interfaces:otsi",
+ "supporting-port": "L1"}
+ input_dict_2 = {
+ "frequency": 193.00000,
+ "otsi-rate": "org-openroadm-common-optical-channel-types:R100G-otsi",
+ "fec": "org-openroadm-common-types:ofec",
+ "transmit-power": -5,
+ "provision-mode": "explicit",
+ "modulation-format": "dp-qpsk"}
+
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]),
+ response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"])
+ self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic1.4", "iid": [1]},
+ response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
+
+ def test_08_check_interface_otsi(self):
+ # pylint: disable=line-too-long
+ response = test_utils_rfc8040.check_node_attribute_request("XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR3-NETWORK1-265:272",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
+ "type": "org-openroadm-interfaces:otsi",
+ "supporting-port": "L1"}
+ input_dict_2 = {
+ "frequency": 193.00000,
+ "otsi-rate": "org-openroadm-common-optical-channel-types:R100G-otsi",
+ "fec": "org-openroadm-common-types:ofec",
+ "transmit-power": -5,
+ "provision-mode": "explicit",
+ "modulation-format": "dp-qpsk"}
+
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]),
+ response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"])
+ self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic1.4", "iid": [1]},
+ response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
+
+ def test_09_check_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
+ ["supporting-interface-list"][0]: "XPDR3-NETWORK1-265:272",
+ "type": "org-openroadm-interfaces:otsi-group",
+ "supporting-port": "L1"}
+ input_dict_2 = {"group-id": 1,
+ "group-rate": "org-openroadm-common-optical-channel-types:R100G-otsi"}
+
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]),
+ response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
+
+ def test_10_check_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
+ ["supporting-interface-list"][0]: "XPDR3-NETWORK1-265:272",
+ "type": "org-openroadm-interfaces:otsi-group",
+ "supporting-port": "L1"}
+ input_dict_2 = {"group-id": 1,
+ "group-rate": "org-openroadm-common-optical-channel-types:R100G-otsi"}
+
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]),
+ response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
+
+ def test_11_check_interface_otuc1(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR3-NETWORK1-OTUC1")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
+ ["supporting-interface-list"][0]: "XPDR3-NETWORK1-OTSIGROUP-100G",
+ "type": "org-openroadm-interfaces:otnOtu",
+ "supporting-port": "L1"}
+ input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+ "degthr-percentage": 100,
+ "tim-detect-mode": "Disabled",
+ "otucn-n-rate": 1,
+ "degm-intervals": 2}
+
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
+ response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
+
+ def test_12_check_interface_otuc1(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-C2", "interface", "XPDR3-NETWORK1-OTUC1")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
+ ["supporting-interface-list"][0]: "XPDR3-NETWORK1-OTSIGROUP-100G",
+ "type": "org-openroadm-interfaces:otnOtu",
+ "supporting-port": "L1"}
+ input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
+ "degthr-percentage": 100,
+ "tim-detect-mode": "Disabled",
+ "otucn-n-rate": 1,
+ "degm-intervals": 2}
+
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
+ response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
+
+ def test_13_check_interface_oduc1(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR3-NETWORK1-ODUC1",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
+ "supporting-interface-list": "XPDR3-NETWORK1-OTUC1",
+ "type": "org-openroadm-interfaces:otnOdu",
+ "supporting-port": "L1"}
+
+ input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
+ "rate": "org-openroadm-otn-common-types:ODUCn",
+ "degm-intervals": 2,
+ "degthr-percentage": 100,
+ "monitoring-mode": "terminated",
+ "oducn-n-rate": 1
+ }
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
+ response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
+ self.assertDictEqual(
+ {"payload-type": "22", "exp-payload-type": "22"},
+ response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
+
+ def test_14_check_interface_oduc1(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+
+ input_dict_1 = {"name": "XPDR3-NETWORK1-ODUC1",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
+ "supporting-interface-list": "XPDR3-NETWORK1-OTUC1",
+ "type": "org-openroadm-interfaces:otnOdu",
+ "supporting-port": "L1"}
+
+ input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
+ "rate": "org-openroadm-otn-common-types:ODUCn",
+ "degm-intervals": 2,
+ "degthr-percentage": 100,
+ "monitoring-mode": "terminated",
+ "oducn-n-rate": 1
+ }
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
+ response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
+ self.assertDictEqual(
+ {"payload-type": "22", "exp-payload-type": "22"},
+ response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
+
+ def test_15_check_interface_odu4(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+
+ input_dict_1 = {"name": "XPDR3-NETWORK1-ODU4",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
+ "supporting-interface-list": "XPDR3-NETWORK1-ODUC1",
+ "type": "org-openroadm-interfaces:otnOdu",
+ "supporting-port": "L1"}
+
+ input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
+ "rate": "org-openroadm-otn-common-types:ODU4",
+ "degm-intervals": 2,
+ "degthr-percentage": 100,
+ "monitoring-mode": "terminated"
+ }
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
+ response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
+ self.assertDictEqual(
+ {"payload-type": "07", "exp-payload-type": "07"},
+ response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
+
+ # TODO: Check the trib-port numbering
+
+ def test_16_check_interface_odu4(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+
+ input_dict_1 = {"name": "XPDR3-NETWORK1-ODU4",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
+ "supporting-interface-list": "XPDR3-NETWORK1-ODUC1",
+ "type": "org-openroadm-interfaces:otnOdu",
+ "supporting-port": "L1"}
+
+ input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
+ "rate": "org-openroadm-otn-common-types:ODU4",
+ "degm-intervals": 2,
+ "degthr-percentage": 100,
+ "monitoring-mode": "terminated"
+ }
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
+ response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
+ self.assertDictEqual(
+ {"payload-type": "07", "exp-payload-type": "07"},
+ response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
+
+ # TODO: Check the trib-port numbering
+
+ def test_17_check_interface_100ge_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/3-PLUG-CLIENT",
+ "type": "org-openroadm-interfaces:ethernetCsmacd",
+ "supporting-port": "C1"
+ }
+ input_dict_2 = {"speed": 100000,
+ "fec": "org-openroadm-common-types:off"}
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"]),
+ response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"])
+
+ def test_18_check_interface_100ge_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G",
+ "administrative-state": "inService",
+ "supporting-circuit-pack-name": "1/1/3-PLUG-CLIENT",
+ "type": "org-openroadm-interfaces:ethernetCsmacd",
+ "supporting-port": "C1"
+ }
+ input_dict_2 = {"speed": 100000,
+ "fec": "org-openroadm-common-types:off"}
+ self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
+ response["interface"][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"]),
+ response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"])
+
+ # Delete the service path
+ def test_19_service_path_delete_100ge(self):
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ "transportpce-device-renderer", "service-path",
+ {
+ "service-name": "service_100GE_ofec",
+ "wave-number": "0",
+ "modulation-format": "dp-qpsk",
+ "operation": "delete",
+ "nodes": [{
+ "node-id": "XPDR-A2",
+ "src-tp": "XPDR3-CLIENT1",
+ "dest-tp": "XPDR3-NETWORK1"
+ },
+ {
+ "node-id": "XPDR-C2",
+ "src-tp": "XPDR3-CLIENT1",
+ "dest-tp": "XPDR3-NETWORK1"
+ }],
+ "center-freq": 193.0,
+ "nmc-width": 37.5,
+ "min-freq": 192.975,
+ "max-freq": 193.025,
+ "lower-spectral-slot-number": 265,
+ "higher-spectral-slot-number": 272,
+ })
+ self.assertEqual(response["status_code"], requests.codes.ok)
+ self.assertIn("Request processed", response["output"]["result"])
+
+ def test_20_check_no_interface_100ge_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ def test_21_check_no_interface_100ge_client(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ def test_22_check_no_interface_odu4(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ def test_23_check_no_interface_odu4(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ def test_24_check_no_interface_otuc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ def test_25_check_no_interface_otuc2(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ # Check if port-mapping data is updated, where the supporting-otucn is deleted
+ def test_26_check_no_otuc1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR3-NETWORK1")
+ self.assertRaises(KeyError, lambda: response["supporting-otucn"])
+
+ def test_27_check_no_otuc1(self):
+ response = test_utils_rfc8040.portmapping_request("XPDR-C2", "XPDR3-NETWORK1")
+ self.assertRaises(KeyError, lambda: response["supporting-otucn"])
+
+ def test_28_check_no_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ def test_29_check_no_interface_otsig(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ def test_30_check_no_interface_otsi(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ def test_31_check_no_interface_otsi(self):
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+
+ # Disconnect the XPDR
+ def test_32_xpdr_device_disconnection(self):
+ response = test_utils_rfc8040.unmount_device("XPDR-A2")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
+
+ def test_33_xpdr_device_disconnected(self):
+ response = test_utils_rfc8040.check_device_connection("XPDR-A2")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+ self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
+ self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
+ self.assertEqual(response["connection-status"]["error-message"],
+ "Request could not be completed because the relevant data model content does not exist")
+
+ def test_34_xpdr_device_not_connected(self):
+ response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+ self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
+ self.assertEqual(response["node-info"]["error-tag"], "data-missing")
+ self.assertEqual(response["node-info"]["error-message"],
+ "Request could not be completed because the relevant data model content does not exist")
+
+ def test_35_xpdr_device_disconnection(self):
+ response = test_utils_rfc8040.unmount_device("XPDR-C2")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
+
+ def test_36_xpdr_device_disconnected(self):
+ response = test_utils_rfc8040.check_device_connection("XPDR-C2")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+ self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
+ self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
+ self.assertEqual(response["connection-status"]["error-message"],
+ "Request could not be completed because the relevant data model content does not exist")
+
+ def test_37_xpdr_device_not_connected(self):
+ response = test_utils_rfc8040.get_portmapping_node_info("XPDR-C2")
+ self.assertEqual(response["status_code"], requests.codes.conflict)
+ self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
+ self.assertEqual(response["node-info"]["error-tag"], "data-missing")
+ self.assertEqual(response["node-info"]["error-message"],
+ "Request could not be completed because the relevant data model content does not exist")
+
+
+if __name__ == "__main__":
+ unittest.main(verbosity=2)