2 ##############################################################################
3 # Copyright (c) 2023 AT&T, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
14 # pylint: disable=wrong-import-order
21 sys.path.append("transportpce_tests/common")
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils # nopep8
27 class TransportPCE400GPortMappingTesting(unittest.TestCase):
30 NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR4-NETWORK1",
31 "supporting-port": "L1",
32 "supported-interface-capability": [
33 "org-openroadm-port-types:if-otsi-otsigroup"
35 "port-direction": "bidirectional",
36 "port-qual": "xpdr-network",
37 "supporting-circuit-pack-name": "1/1/6-PLUG-NET",
39 "port-admin-state": "InService",
40 "port-oper-state": "InService",
49 SUBSET_NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR4-NETWORK1",
50 "supporting-port": "L1",
51 "port-direction": "bidirectional",
52 "port-qual": "xpdr-network",
53 "supporting-circuit-pack-name": "1/1/6-PLUG-NET",
55 "port-admin-state": "InService",
56 "port-oper-state": "InService"
58 REGEN_CAPABILITIES = ["OTUC2-REGEN",
62 SUP_INT_CAPA = ["org-openroadm-port-types:if-otsi-otsigroup"]
67 cls.processes = test_utils.start_tpce()
68 cls.processes = test_utils.start_sims([("xpdra2", cls.NODE_VERSION)])
71 def tearDownClass(cls):
72 # pylint: disable=not-an-iterable
73 for process in cls.processes:
74 test_utils.shutdown_process(process)
75 print("all processes killed")
78 # pylint: disable=consider-using-f-string
79 print("execution of {}".format(self.id().split(".")[-1]))
82 def test_01_xpdr_device_connection(self):
83 response = test_utils.mount_device("XPDR-A2",
84 ("xpdra2", self.NODE_VERSION))
85 self.assertEqual(response.status_code, requests.codes.created,
86 test_utils.CODE_SHOULD_BE_201)
87 # Check the xpdr-type as regen
89 def test_02_check_xpdr_type(self):
90 response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK1")
91 self.assertEqual(response["status_code"], requests.codes.ok)
93 "regen", response["mapping"][0]["xpdr-type"])
95 self.REGEN_CAPABILITIES,
96 sorted(response["mapping"][0]["regen-profiles"]["regen-profile"]))
98 # Check the xpdr-type as regen
100 def test_03_check_xpdr_type(self):
101 response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK2")
102 self.assertEqual(response["status_code"], requests.codes.ok)
104 "regen", response["mapping"][0]["xpdr-type"])
106 self.REGEN_CAPABILITIES,
107 sorted(response["mapping"][0]["regen-profiles"]["regen-profile"]))
109 def test_04_400g_regen_service_path_create(self):
110 response = test_utils.transportpce_api_rpc_request(
111 "transportpce-device-renderer", "service-path",
113 "service-name": "service_400g_regen",
115 "modulation-format": "dp-qam16",
116 "operation": "create",
119 "node-id": "XPDR-A2",
120 "src-tp": "XPDR4-NETWORK1",
121 "dest-tp": "XPDR4-NETWORK2"
124 "center-freq": 195.0,
127 "min-freq": 194.95625,
128 "max-freq": 195.04375,
129 "lower-spectral-slot-number": 582,
130 "higher-spectral-slot-number": 595,
132 self.assertEqual(response["status_code"], requests.codes.ok)
133 self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"])
136 {"node-id": "XPDR-A2",
137 "otu-interface-id": [
138 "XPDR4-NETWORK1-OTUC4",
139 "XPDR4-NETWORK2-OTUC4"
141 "odu-interface-id": [
142 "XPDR4-NETWORK1-ODUC4",
143 "XPDR4-NETWORK2-ODUC4"
145 "och-interface-id": [
146 "XPDR4-NETWORK1-582:595",
147 "XPDR4-NETWORK1-OTSIGROUP-400G",
148 "XPDR4-NETWORK2-582:595",
149 "XPDR4-NETWORK2-OTSIGROUP-400G",
151 {x: (sorted(response["output"]["node-interface"][0][x])
152 if isinstance(response["output"]["node-interface"][0][x], list)
153 else response["output"]["node-interface"][0][x])
154 for x in response["output"]["node-interface"][0].keys()})
156 def test_05_get_portmapping_network1(self):
157 response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK1")
158 self.assertEqual(response["status_code"], requests.codes.ok)
159 self.SUBSET_NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR4-NETWORK1-OTUC4"
160 self.SUBSET_NETWORK1_CHECK_DICT["lcp-hash-val"] = "AKkLPpWaa8x+"
161 self.SUBSET_NETWORK1_CHECK_DICT["connection-map-lcp"] = "XPDR4-NETWORK2"
162 subset = {k: v for k, v in response["mapping"][0].items() if k in self.SUBSET_NETWORK1_CHECK_DICT}
163 self.assertDictEqual(subset, self.SUBSET_NETWORK1_CHECK_DICT)
164 self.assertEqual(sorted(response["mapping"][0]["regen-profiles"]["regen-profile"]),
165 self.REGEN_CAPABILITIES)
166 self.assertEqual(sorted(response["mapping"][0]["supported-interface-capability"]),
169 def test_06_get_portmapping_network1(self):
170 response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK2")
171 self.assertEqual(response["status_code"], requests.codes.ok)
172 self.SUBSET_NETWORK1_CHECK_DICT["logical-connection-point"] = "XPDR4-NETWORK2"
173 self.SUBSET_NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR4-NETWORK2-OTUC4"
174 self.SUBSET_NETWORK1_CHECK_DICT["lcp-hash-val"] = "AKkLPpWaa8x9"
175 self.SUBSET_NETWORK1_CHECK_DICT["connection-map-lcp"] = "XPDR4-NETWORK1"
176 self.SUBSET_NETWORK1_CHECK_DICT["supporting-circuit-pack-name"] = "1/1/5-PLUG-NET"
177 subset = {k: v for k, v in response["mapping"][0].items() if k in self.SUBSET_NETWORK1_CHECK_DICT}
178 self.assertDictEqual(subset, self.SUBSET_NETWORK1_CHECK_DICT)
179 self.assertEqual(sorted(response["mapping"][0]["regen-profiles"]["regen-profile"]),
180 self.REGEN_CAPABILITIES)
181 self.assertEqual(sorted(response["mapping"][0]["supported-interface-capability"]),
184 def test_07_check_interface_oduc4(self):
185 response = test_utils.check_node_attribute_request(
186 "XPDR-A2", "interface", "XPDR4-NETWORK1-ODUC4")
187 self.assertEqual(response["status_code"], requests.codes.ok)
189 input_dict_1 = {"name": "XPDR4-NETWORK1-ODUC4",
190 "administrative-state": "inService",
191 "supporting-circuit-pack-name": "1/1/6-PLUG_NET",
192 "supporting-interface-list": "XPDR4-NETWORK1-OTUC4",
193 "type": "org-openroadm-interfaces:otnOdu",
194 "supporting-port": "L1"}
196 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-CTP",
197 "rate": "org-openroadm-otn-common-types:ODUCn",
199 "degthr-percentage": 100,
200 "monitoring-mode": "not-terminated"
202 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
203 response["interface"][0])
204 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
205 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
207 def test_08_check_interface_oduc4(self):
208 response = test_utils.check_node_attribute_request(
209 "XPDR-A2", "interface", "XPDR4-NETWORK2-ODUC4")
210 self.assertEqual(response["status_code"], requests.codes.ok)
212 input_dict_1 = {"name": "XPDR4-NETWORK2-ODUC4",
213 "administrative-state": "inService",
214 "supporting-circuit-pack-name": "1/1/5-PLUG_NET",
215 "supporting-interface-list": "XPDR4-NETWORK2-ODUC4",
216 "type": "org-openroadm-interfaces:otnOdu",
217 "supporting-port": "L1"}
219 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-CTP",
220 "rate": "org-openroadm-otn-common-types:ODUCn",
222 "degthr-percentage": 100,
223 "monitoring-mode": "not-terminated"
225 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
226 response["interface"][0])
227 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
228 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
230 # Delete the service path
232 def test_09_service_path_delete_regen(self):
233 response = test_utils.transportpce_api_rpc_request(
234 "transportpce-device-renderer", "service-path",
236 "modulation-format": "dp-qam16",
237 "operation": "delete",
238 "service-name": "test1_regen",
240 "center-freq": 195.0,
243 "min-freq": 194.95625,
244 "max-freq": 195.04375,
245 "lower-spectral-slot-number": 582,
246 "higher-spectral-slot-number": 595,
249 "node-id": "XPDR-A2",
250 "src-tp": "XPDR4-NETWORK1",
251 "dest-tp": "XPDR4-NETWORK2"
255 self.assertEqual(response["status_code"], requests.codes.ok)
256 self.assertIn("Request processed", response["output"]["result"])
258 def test_10_check_no_interface_oduc4(self):
259 response = test_utils.check_node_attribute_request(
260 "XPDR-A2", "interface", "XPDR4-NETWORK1-ODUC4")
261 self.assertEqual(response["status_code"], requests.codes.conflict)
263 def test_11_check_no_interface_oduc4(self):
264 response = test_utils.check_node_attribute_request(
265 "XPDR-A2", "interface", "XPDR4-NETWORK2-ODUC4")
266 self.assertEqual(response["status_code"], requests.codes.conflict)
268 def test_12_check_no_interface_otuc4(self):
269 response = test_utils.check_node_attribute_request(
270 "XPDR-A2", "interface", "XPDR4-NETWORK1-OTUC1")
271 self.assertEqual(response["status_code"], requests.codes.conflict)
273 def test_13_check_no_interface_otuc4(self):
274 response = test_utils.check_node_attribute_request(
275 "XPDR-A2", "interface", "XPDR4-NETWORK2-OTUC1")
276 self.assertEqual(response["status_code"], requests.codes.conflict)
278 # Check if port-mapping data is updated, where the supporting-otucn is deleted
279 def test_14_check_no_otuc4(self):
280 response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK1")
281 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
283 def test_15_check_no_otuc4(self):
284 response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK1")
285 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
287 def test_16_check_no_interface_otsig(self):
288 response = test_utils.check_node_attribute_request(
289 "XPDR-A2", "interface", "XPDR4-NETWORK1-OTSIGROUP-400G")
290 self.assertEqual(response["status_code"], requests.codes.conflict)
292 def test_17_check_no_interface_otsig(self):
293 response = test_utils.check_node_attribute_request(
294 "XPDR-A2", "interface", "XPDR4-NETWORK2-OTSIGROUP-400G")
295 self.assertEqual(response["status_code"], requests.codes.conflict)
297 def test_18_check_no_interface_otsi(self):
298 response = test_utils.check_node_attribute_request(
299 "XPDR-A2", "interface", "XPDR4-NETWORK1-582:595")
300 self.assertEqual(response["status_code"], requests.codes.conflict)
302 def test_19_check_no_interface_otsi(self):
303 response = test_utils.check_node_attribute_request(
304 "XPDR-A2", "interface", "XPDR4-NETWORK2-582:595")
305 self.assertEqual(response["status_code"], requests.codes.conflict)
307 # Disconnect the XPDR
308 def test_20_xpdr_device_disconnection(self):
309 response = test_utils.unmount_device("XPDR-A2")
310 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
312 def test_21_xpdr_device_disconnected(self):
313 response = test_utils.check_device_connection("XPDR-A2")
314 self.assertEqual(response["status_code"], requests.codes.conflict)
315 self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
316 self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
317 self.assertEqual(response["connection-status"]["error-message"],
318 "Request could not be completed because the relevant data model content does not exist")
320 def test_22_xpdr_device_not_connected(self):
321 response = test_utils.get_portmapping_node_attr("XPDR-A2", "node-info", None)
322 self.assertEqual(response["status_code"], requests.codes.conflict)
323 self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
324 self.assertEqual(response["node-info"]["error-tag"], "data-missing")
325 self.assertEqual(response["node-info"]["error-message"],
326 "Request could not be completed because the relevant data model content does not exist")
329 if __name__ == "__main__":
330 unittest.main(verbosity=2)