Update release in docs/conf.yaml
[transportpce.git] / tests / transportpce_tests / 7.1 / test04_renderer_regen_mode.py
1 #!/usr/bin/env python
2 ##############################################################################
3 # Copyright (c) 2023 AT&T, Inc. and others.  All rights reserved.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
10
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
13
14 # pylint: disable=wrong-import-order
15 import sys
16 import time
17 import unittest
18
19 import requests
20
21 sys.path.append("transportpce_tests/common")
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils  # nopep8
25
26
27 class TransportPCE400GPortMappingTesting(unittest.TestCase):
28
29     processes = None
30     NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR4-NETWORK1",
31                            "supporting-port": "L1",
32                            "supported-interface-capability": [
33                                "org-openroadm-port-types:if-otsi-otsigroup"
34                            ],
35                            "port-direction": "bidirectional",
36                            "port-qual": "xpdr-network",
37                            "supporting-circuit-pack-name": "1/1/6-PLUG-NET",
38                            "xpdr-type": "regen",
39                            "port-admin-state": "InService",
40                            "port-oper-state": "InService",
41                            "regen-profiles": {
42                                "regen-profile": [
43                                    "OTUC4-REGEN",
44                                    "OTUC2-REGEN",
45                                    "OTUC3-REGEN"
46                                ]
47                            }
48                            }
49     SUBSET_NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR4-NETWORK1",
50                                   "supporting-port": "L1",
51                                   "port-direction": "bidirectional",
52                                   "port-qual": "xpdr-network",
53                                   "supporting-circuit-pack-name": "1/1/6-PLUG-NET",
54                                   "xpdr-type": "regen",
55                                   "port-admin-state": "InService",
56                                   "port-oper-state": "InService"
57                                   }
58     REGEN_CAPABILITIES = ["OTUC2-REGEN",
59                           "OTUC3-REGEN",
60                           "OTUC4-REGEN",
61                           ]
62     SUP_INT_CAPA = ["org-openroadm-port-types:if-otsi-otsigroup"]
63     NODE_VERSION = "7.1"
64
65     @classmethod
66     def setUpClass(cls):
67         cls.processes = test_utils.start_tpce()
68         cls.processes = test_utils.start_sims([("xpdra2", cls.NODE_VERSION)])
69
70     @classmethod
71     def tearDownClass(cls):
72         # pylint: disable=not-an-iterable
73         for process in cls.processes:
74             test_utils.shutdown_process(process)
75         print("all processes killed")
76
77     def setUp(self):
78         # pylint: disable=consider-using-f-string
79         print("execution of {}".format(self.id().split(".")[-1]))
80         time.sleep(2)
81
82     def test_01_xpdr_device_connection(self):
83         response = test_utils.mount_device("XPDR-A2",
84                                            ("xpdra2", self.NODE_VERSION))
85         self.assertEqual(response.status_code, requests.codes.created,
86                          test_utils.CODE_SHOULD_BE_201)
87     # Check the xpdr-type as regen
88
89     def test_02_check_xpdr_type(self):
90         response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK1")
91         self.assertEqual(response["status_code"], requests.codes.ok)
92         self.assertEqual(
93             "regen", response["mapping"][0]["xpdr-type"])
94         self.assertEqual(
95             self.REGEN_CAPABILITIES,
96             sorted(response["mapping"][0]["regen-profiles"]["regen-profile"]))
97
98     # Check the xpdr-type as regen
99
100     def test_03_check_xpdr_type(self):
101         response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK2")
102         self.assertEqual(response["status_code"], requests.codes.ok)
103         self.assertEqual(
104             "regen", response["mapping"][0]["xpdr-type"])
105         self.assertEqual(
106             self.REGEN_CAPABILITIES,
107             sorted(response["mapping"][0]["regen-profiles"]["regen-profile"]))
108
109     def test_04_400g_regen_service_path_create(self):
110         response = test_utils.transportpce_api_rpc_request(
111             "transportpce-device-renderer", "service-path",
112             {
113                 "service-name": "service_400g_regen",
114                 "wave-number": "0",
115                 "modulation-format": "dp-qam16",
116                 "operation": "create",
117                 "nodes": [
118                     {
119                         "node-id": "XPDR-A2",
120                         "src-tp": "XPDR4-NETWORK1",
121                         "dest-tp": "XPDR4-NETWORK2"
122                     }
123                 ],
124                 "center-freq": 195.0,
125                 "nmc-width": 75,
126                 "mc-width": 87.5,
127                 "min-freq": 194.95625,
128                 "max-freq": 195.04375,
129                 "lower-spectral-slot-number": 582,
130                 "higher-spectral-slot-number": 595,
131             })
132         self.assertEqual(response["status_code"], requests.codes.ok)
133         self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"])
134
135         self.assertEqual(
136             {"node-id": "XPDR-A2",
137              "otu-interface-id": [
138                  "XPDR4-NETWORK1-OTUC4",
139                  "XPDR4-NETWORK2-OTUC4"
140              ],
141              "odu-interface-id": [
142                  "XPDR4-NETWORK1-ODUC4",
143                  "XPDR4-NETWORK2-ODUC4"
144              ],
145              "och-interface-id": [
146                  "XPDR4-NETWORK1-582:595",
147                  "XPDR4-NETWORK1-OTSIGROUP-400G",
148                  "XPDR4-NETWORK2-582:595",
149                  "XPDR4-NETWORK2-OTSIGROUP-400G",
150              ]},
151             {x: (sorted(response["output"]["node-interface"][0][x])
152                  if isinstance(response["output"]["node-interface"][0][x], list)
153                  else response["output"]["node-interface"][0][x])
154              for x in response["output"]["node-interface"][0].keys()})
155
156     def test_05_get_portmapping_network1(self):
157         response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK1")
158         self.assertEqual(response["status_code"], requests.codes.ok)
159         self.SUBSET_NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR4-NETWORK1-OTUC4"
160         self.SUBSET_NETWORK1_CHECK_DICT["lcp-hash-val"] = "AKkLPpWaa8x+"
161         self.SUBSET_NETWORK1_CHECK_DICT["connection-map-lcp"] = "XPDR4-NETWORK2"
162         subset = {k: v for k, v in response["mapping"][0].items() if k in self.SUBSET_NETWORK1_CHECK_DICT}
163         self.assertDictEqual(subset, self.SUBSET_NETWORK1_CHECK_DICT)
164         self.assertEqual(sorted(response["mapping"][0]["regen-profiles"]["regen-profile"]),
165                          self.REGEN_CAPABILITIES)
166         self.assertEqual(sorted(response["mapping"][0]["supported-interface-capability"]),
167                          self.SUP_INT_CAPA)
168
169     def test_06_get_portmapping_network1(self):
170         response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK2")
171         self.assertEqual(response["status_code"], requests.codes.ok)
172         self.SUBSET_NETWORK1_CHECK_DICT["logical-connection-point"] = "XPDR4-NETWORK2"
173         self.SUBSET_NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR4-NETWORK2-OTUC4"
174         self.SUBSET_NETWORK1_CHECK_DICT["lcp-hash-val"] = "AKkLPpWaa8x9"
175         self.SUBSET_NETWORK1_CHECK_DICT["connection-map-lcp"] = "XPDR4-NETWORK1"
176         self.SUBSET_NETWORK1_CHECK_DICT["supporting-circuit-pack-name"] = "1/1/5-PLUG-NET"
177         subset = {k: v for k, v in response["mapping"][0].items() if k in self.SUBSET_NETWORK1_CHECK_DICT}
178         self.assertDictEqual(subset, self.SUBSET_NETWORK1_CHECK_DICT)
179         self.assertEqual(sorted(response["mapping"][0]["regen-profiles"]["regen-profile"]),
180                          self.REGEN_CAPABILITIES)
181         self.assertEqual(sorted(response["mapping"][0]["supported-interface-capability"]),
182                          self.SUP_INT_CAPA)
183
184     def test_07_check_interface_oduc4(self):
185         response = test_utils.check_node_attribute_request(
186             "XPDR-A2", "interface", "XPDR4-NETWORK1-ODUC4")
187         self.assertEqual(response["status_code"], requests.codes.ok)
188
189         input_dict_1 = {"name": "XPDR4-NETWORK1-ODUC4",
190                         "administrative-state": "inService",
191                         "supporting-circuit-pack-name": "1/1/6-PLUG_NET",
192                         "supporting-interface-list": "XPDR4-NETWORK1-OTUC4",
193                         "type": "org-openroadm-interfaces:otnOdu",
194                         "supporting-port": "L1"}
195
196         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-CTP",
197                         "rate": "org-openroadm-otn-common-types:ODUCn",
198                         "degm-intervals": 2,
199                         "degthr-percentage": 100,
200                         "monitoring-mode": "not-terminated"
201                         }
202         self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
203                              response["interface"][0])
204         self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
205                              response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
206
207     def test_08_check_interface_oduc4(self):
208         response = test_utils.check_node_attribute_request(
209             "XPDR-A2", "interface", "XPDR4-NETWORK2-ODUC4")
210         self.assertEqual(response["status_code"], requests.codes.ok)
211
212         input_dict_1 = {"name": "XPDR4-NETWORK2-ODUC4",
213                         "administrative-state": "inService",
214                         "supporting-circuit-pack-name": "1/1/5-PLUG_NET",
215                         "supporting-interface-list": "XPDR4-NETWORK2-ODUC4",
216                         "type": "org-openroadm-interfaces:otnOdu",
217                         "supporting-port": "L1"}
218
219         input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-CTP",
220                         "rate": "org-openroadm-otn-common-types:ODUCn",
221                         "degm-intervals": 2,
222                         "degthr-percentage": 100,
223                         "monitoring-mode": "not-terminated"
224                         }
225         self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
226                              response["interface"][0])
227         self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
228                              response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
229
230     # Delete the service path
231
232     def test_09_service_path_delete_regen(self):
233         response = test_utils.transportpce_api_rpc_request(
234             "transportpce-device-renderer", "service-path",
235             {
236                 "modulation-format": "dp-qam16",
237                 "operation": "delete",
238                 "service-name": "test1_regen",
239                 "wave-number": "0",
240                 "center-freq": 195.0,
241                 "nmc-width": 75,
242                 "mc-width": 87.5,
243                 "min-freq": 194.95625,
244                 "max-freq": 195.04375,
245                 "lower-spectral-slot-number": 582,
246                 "higher-spectral-slot-number": 595,
247                 "nodes": [
248                     {
249                         "node-id": "XPDR-A2",
250                         "src-tp": "XPDR4-NETWORK1",
251                         "dest-tp": "XPDR4-NETWORK2"
252                     }
253                 ]
254             })
255         self.assertEqual(response["status_code"], requests.codes.ok)
256         self.assertIn("Request processed", response["output"]["result"])
257
258     def test_10_check_no_interface_oduc4(self):
259         response = test_utils.check_node_attribute_request(
260             "XPDR-A2", "interface", "XPDR4-NETWORK1-ODUC4")
261         self.assertEqual(response["status_code"], requests.codes.conflict)
262
263     def test_11_check_no_interface_oduc4(self):
264         response = test_utils.check_node_attribute_request(
265             "XPDR-A2", "interface", "XPDR4-NETWORK2-ODUC4")
266         self.assertEqual(response["status_code"], requests.codes.conflict)
267
268     def test_12_check_no_interface_otuc4(self):
269         response = test_utils.check_node_attribute_request(
270             "XPDR-A2", "interface", "XPDR4-NETWORK1-OTUC1")
271         self.assertEqual(response["status_code"], requests.codes.conflict)
272
273     def test_13_check_no_interface_otuc4(self):
274         response = test_utils.check_node_attribute_request(
275             "XPDR-A2", "interface", "XPDR4-NETWORK2-OTUC1")
276         self.assertEqual(response["status_code"], requests.codes.conflict)
277
278     # Check if port-mapping data is updated, where the supporting-otucn is deleted
279     def test_14_check_no_otuc4(self):
280         response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK1")
281         self.assertRaises(KeyError, lambda: response["supporting-otucn"])
282
283     def test_15_check_no_otuc4(self):
284         response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK1")
285         self.assertRaises(KeyError, lambda: response["supporting-otucn"])
286
287     def test_16_check_no_interface_otsig(self):
288         response = test_utils.check_node_attribute_request(
289             "XPDR-A2", "interface", "XPDR4-NETWORK1-OTSIGROUP-400G")
290         self.assertEqual(response["status_code"], requests.codes.conflict)
291
292     def test_17_check_no_interface_otsig(self):
293         response = test_utils.check_node_attribute_request(
294             "XPDR-A2", "interface", "XPDR4-NETWORK2-OTSIGROUP-400G")
295         self.assertEqual(response["status_code"], requests.codes.conflict)
296
297     def test_18_check_no_interface_otsi(self):
298         response = test_utils.check_node_attribute_request(
299             "XPDR-A2", "interface", "XPDR4-NETWORK1-582:595")
300         self.assertEqual(response["status_code"], requests.codes.conflict)
301
302     def test_19_check_no_interface_otsi(self):
303         response = test_utils.check_node_attribute_request(
304             "XPDR-A2", "interface", "XPDR4-NETWORK2-582:595")
305         self.assertEqual(response["status_code"], requests.codes.conflict)
306
307     # Disconnect the XPDR
308     def test_20_xpdr_device_disconnection(self):
309         response = test_utils.unmount_device("XPDR-A2")
310         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
311
312     def test_21_xpdr_device_disconnected(self):
313         response = test_utils.check_device_connection("XPDR-A2")
314         self.assertEqual(response["status_code"], requests.codes.conflict)
315         self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
316         self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
317         self.assertEqual(response["connection-status"]["error-message"],
318                          "Request could not be completed because the relevant data model content does not exist")
319
320     def test_22_xpdr_device_not_connected(self):
321         response = test_utils.get_portmapping_node_attr("XPDR-A2", "node-info", None)
322         self.assertEqual(response["status_code"], requests.codes.conflict)
323         self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
324         self.assertEqual(response["node-info"]["error-tag"], "data-missing")
325         self.assertEqual(response["node-info"]["error-message"],
326                          "Request could not be completed because the relevant data model content does not exist")
327
328
329 if __name__ == "__main__":
330     unittest.main(verbosity=2)