Add functional tests for regenerator type
[transportpce.git] / tests / transportpce_tests / 7.1 / test04_renderer_regen_mode.py
1 #!/usr/bin/env python
2 ##############################################################################
3 # Copyright (c) 2023 AT&T, Inc. and others.  All rights reserved.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
10
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
13
14 # pylint: disable=wrong-import-order
15 import sys
16 import time
17 import unittest
18
19 import requests
20
21 sys.path.append("transportpce_tests/common")
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils  # nopep8
25
26
class TransportPCE400GPortMappingTesting(unittest.TestCase):
    """Functional tests of the device renderer for a 7.1 regenerator (XPDR4).

    The tests are numbered because they are order-dependent: the device is
    mounted first, a 400G regen service path is created between
    XPDR4-NETWORK1 and XPDR4-NETWORK2, the resulting port-mapping and
    interfaces are checked, then the service path is deleted and the
    clean-up is verified before the device is unmounted.
    """

    processes = None
    # Baseline port-mapping entry expected for XPDR4-NETWORK1.  Tests must not
    # modify it in place; use _expected_mapping() to derive variants.
    NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR4-NETWORK1",
                           "supporting-port": "L1",
                           "supported-interface-capability": [
                               "org-openroadm-port-types:if-otsi-otsigroup"
                           ],
                           "port-direction": "bidirectional",
                           "port-qual": "xpdr-network",
                           "supporting-circuit-pack-name": "1/1/6-PLUG-NET",
                           "xpdr-type": "regen",
                           "port-admin-state": "InService",
                           "port-oper-state": "InService",
                           "regen-profiles": {
                               "regen-profile": [
                                   "OTUC4-REGEN",
                                   "OTUC2-REGEN",
                                   "OTUC3-REGEN"
                               ]
                           }
                           }
    # Sorted list of regen profiles, compared against the sorted device answer.
    REGEN_CAPABILITIES = ["OTUC2-REGEN",
                          "OTUC3-REGEN",
                          "OTUC4-REGEN",
                          ]
    NODE_VERSION = "7.1"

    @classmethod
    def setUpClass(cls):
        """Start the controller and the xpdra2 simulator once for the class."""
        # NOTE(review): the second assignment replaces the first; this presumably
        # relies on both helpers returning the same shared process list - confirm
        # against test_utils.
        cls.processes = test_utils.start_tpce()
        cls.processes = test_utils.start_sims([("xpdra2", cls.NODE_VERSION)])

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded by setUpClass."""
        # pylint: disable=not-an-iterable
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):
        """Print the name of the test about to run, then pause for 2 seconds."""
        # pylint: disable=consider-using-f-string
        print("execution of {}".format(self.id().split(".")[-1]))
        time.sleep(2)

    @classmethod
    def _expected_mapping(cls, overrides):
        """Return a copy of NETWORK1_CHECK_DICT updated with ``overrides``.

        The class-level dictionary is left untouched so that the expected
        value built by one test cannot leak into another one.
        """
        expected = dict(cls.NETWORK1_CHECK_DICT)
        expected.update(overrides)
        return expected

    def test_01_xpdr_device_connection(self):
        """Mount the XPDR-A2 device and expect a 201 answer."""
        response = test_utils.mount_device("XPDR-A2",
                                           ("xpdra2", self.NODE_VERSION))
        self.assertEqual(response.status_code, requests.codes.created,
                         test_utils.CODE_SHOULD_BE_201)

    def test_02_check_xpdr_type(self):
        """Check that XPDR4-NETWORK1 is mapped as a regen with its profiles."""
        response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK1")
        self.assertEqual(response["status_code"], requests.codes.ok)
        mapping = response["mapping"][0]
        self.assertEqual("regen", mapping["xpdr-type"])
        self.assertEqual(
            self.REGEN_CAPABILITIES,
            sorted(mapping["regen-profiles"]["regen-profile"]))

    def test_03_check_xpdr_type(self):
        """Check that XPDR4-NETWORK2 is mapped as a regen with its profiles."""
        response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK2")
        self.assertEqual(response["status_code"], requests.codes.ok)
        mapping = response["mapping"][0]
        self.assertEqual("regen", mapping["xpdr-type"])
        self.assertEqual(
            self.REGEN_CAPABILITIES,
            sorted(mapping["regen-profiles"]["regen-profile"]))

    def test_04_400g_regen_service_path_create(self):
        """Create the 400G regen service path and check the created interfaces."""
        response = test_utils.transportpce_api_rpc_request(
            "transportpce-device-renderer", "service-path",
            {
                "service-name": "service_400g_regen",
                "wave-number": "0",
                "modulation-format": "dp-qam16",
                "operation": "create",
                "nodes": [
                    {
                        "node-id": "XPDR-A2",
                        "src-tp": "XPDR4-NETWORK1",
                        "dest-tp": "XPDR4-NETWORK2"
                    }
                ],
                "center-freq": 195.0,
                "nmc-width": 75,
                "mc-width": 87.5,
                "min-freq": 194.95625,
                "max-freq": 195.04375,
                "lower-spectral-slot-number": 582,
                "higher-spectral-slot-number": 595,
            })
        self.assertEqual(response["status_code"], requests.codes.ok)
        self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"])

        # List values are sorted before the comparison so that the order in
        # which the controller reports the interfaces does not matter.
        node_interface = response["output"]["node-interface"][0]
        self.assertEqual(
            {"node-id": "XPDR-A2",
             "otu-interface-id": [
                 "XPDR4-NETWORK1-OTUC4",
                 "XPDR4-NETWORK2-OTUC4"
             ],
             "odu-interface-id": [
                 "XPDR4-NETWORK1-ODUC4",
                 "XPDR4-NETWORK2-ODUC4"
             ],
             "och-interface-id": [
                 "XPDR4-NETWORK1-582:595",
                 "XPDR4-NETWORK1-OTSIGROUP-400G",
                 "XPDR4-NETWORK2-582:595",
                 "XPDR4-NETWORK2-OTSIGROUP-400G",
             ]},
            {key: (sorted(value) if isinstance(value, list) else value)
             for key, value in node_interface.items()})

    def test_05_get_portmapping_network1(self):
        """Check the XPDR4-NETWORK1 mapping once the service path exists."""
        response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK1")
        self.assertEqual(response["status_code"], requests.codes.ok)
        expected = self._expected_mapping({
            "supporting-otucn": "XPDR4-NETWORK1-OTUC4",
            "lcp-hash-val": "AKkLPpWaa8x+",
            "connection-map-lcp": "XPDR4-NETWORK2",
        })
        self.assertIn(expected, response["mapping"])

    def test_06_get_portmapping_network1(self):
        """Check the XPDR4-NETWORK2 mapping once the service path exists."""
        response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK2")
        self.assertEqual(response["status_code"], requests.codes.ok)
        expected = self._expected_mapping({
            "logical-connection-point": "XPDR4-NETWORK2",
            "supporting-otucn": "XPDR4-NETWORK2-OTUC4",
            "lcp-hash-val": "AKkLPpWaa8x9",
            "connection-map-lcp": "XPDR4-NETWORK1",
            "supporting-circuit-pack-name": "1/1/5-PLUG-NET",
        })
        self.assertIn(expected, response["mapping"])

    def test_07_check_interface_oduc4(self):
        """Check the ODUC4 interface created on XPDR4-NETWORK1."""
        response = test_utils.check_node_attribute_request(
            "XPDR-A2", "interface", "XPDR4-NETWORK1-ODUC4")
        self.assertEqual(response["status_code"], requests.codes.ok)

        input_dict_1 = {"name": "XPDR4-NETWORK1-ODUC4",
                        "administrative-state": "inService",
                        "supporting-circuit-pack-name": "1/1/6-PLUG-NET",
                        "supporting-interface-list": "XPDR4-NETWORK1-OTUC4",
                        "type": "org-openroadm-interfaces:otnOdu",
                        "supporting-port": "L1"}

        input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-CTP",
                        "rate": "org-openroadm-otn-common-types:ODUCn",
                        "degm-intervals": 2,
                        "degthr-percentage": 100,
                        "monitoring-mode": "not-terminated"
                        }
        # NOTE(review): dict(expected, **actual) lets the actual values override
        # the expected ones, so these assertions only prove that every expected
        # key is present in the answer; the expected values are not compared.
        interface = response["interface"][0]
        odu = interface["org-openroadm-otn-odu-interfaces:odu"]
        self.assertDictEqual(dict(input_dict_1, **interface), interface)
        self.assertDictEqual(dict(input_dict_2, **odu), odu)

    def test_08_check_interface_oduc4(self):
        """Check the ODUC4 interface created on XPDR4-NETWORK2."""
        response = test_utils.check_node_attribute_request(
            "XPDR-A2", "interface", "XPDR4-NETWORK2-ODUC4")
        self.assertEqual(response["status_code"], requests.codes.ok)

        input_dict_1 = {"name": "XPDR4-NETWORK2-ODUC4",
                        "administrative-state": "inService",
                        "supporting-circuit-pack-name": "1/1/5-PLUG-NET",
                        "supporting-interface-list": "XPDR4-NETWORK2-OTUC4",
                        "type": "org-openroadm-interfaces:otnOdu",
                        "supporting-port": "L1"}

        input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-CTP",
                        "rate": "org-openroadm-otn-common-types:ODUCn",
                        "degm-intervals": 2,
                        "degthr-percentage": 100,
                        "monitoring-mode": "not-terminated"
                        }
        # NOTE(review): as in test_07, only the presence of the expected keys
        # is verified here, not their values.
        interface = response["interface"][0]
        odu = interface["org-openroadm-otn-odu-interfaces:odu"]
        self.assertDictEqual(dict(input_dict_1, **interface), interface)
        self.assertDictEqual(dict(input_dict_2, **odu), odu)

    def test_09_service_path_delete_regen(self):
        """Delete the service path created by test_04."""
        response = test_utils.transportpce_api_rpc_request(
            "transportpce-device-renderer", "service-path",
            {
                "modulation-format": "dp-qam16",
                "operation": "delete",
                # Same service name as the one used at creation time.
                "service-name": "service_400g_regen",
                "wave-number": "0",
                "center-freq": 195.0,
                "nmc-width": 75,
                "mc-width": 87.5,
                "min-freq": 194.95625,
                "max-freq": 195.04375,
                "lower-spectral-slot-number": 582,
                "higher-spectral-slot-number": 595,
                "nodes": [
                    {
                        "node-id": "XPDR-A2",
                        "src-tp": "XPDR4-NETWORK1",
                        "dest-tp": "XPDR4-NETWORK2"
                    }
                ]
            })
        self.assertEqual(response["status_code"], requests.codes.ok)
        self.assertIn("Request processed", response["output"]["result"])

    def test_10_check_no_interface_oduc4(self):
        """The ODUC4 interface of XPDR4-NETWORK1 must be gone after deletion."""
        response = test_utils.check_node_attribute_request(
            "XPDR-A2", "interface", "XPDR4-NETWORK1-ODUC4")
        self.assertEqual(response["status_code"], requests.codes.conflict)

    def test_11_check_no_interface_oduc4(self):
        """The ODUC4 interface of XPDR4-NETWORK2 must be gone after deletion."""
        response = test_utils.check_node_attribute_request(
            "XPDR-A2", "interface", "XPDR4-NETWORK2-ODUC4")
        self.assertEqual(response["status_code"], requests.codes.conflict)

    def test_12_check_no_interface_otuc4(self):
        """The OTUC4 interface of XPDR4-NETWORK1 must be gone after deletion."""
        # Query the OTUC4 interface that test_04 created; an interface that
        # never existed would make this check pass unconditionally.
        response = test_utils.check_node_attribute_request(
            "XPDR-A2", "interface", "XPDR4-NETWORK1-OTUC4")
        self.assertEqual(response["status_code"], requests.codes.conflict)

    def test_13_check_no_interface_otuc4(self):
        """The OTUC4 interface of XPDR4-NETWORK2 must be gone after deletion."""
        response = test_utils.check_node_attribute_request(
            "XPDR-A2", "interface", "XPDR4-NETWORK2-OTUC4")
        self.assertEqual(response["status_code"], requests.codes.conflict)

    def test_14_check_no_otuc4(self):
        """supporting-otucn must be removed from the XPDR4-NETWORK1 mapping."""
        response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK1")
        self.assertEqual(response["status_code"], requests.codes.ok)
        # Inspect the mapping entry itself: the wrapper dictionary never holds
        # a "supporting-otucn" key, so testing it would always succeed.
        self.assertNotIn("supporting-otucn", response["mapping"][0])

    def test_15_check_no_otuc4(self):
        """supporting-otucn must be removed from the XPDR4-NETWORK2 mapping."""
        response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR4-NETWORK2")
        self.assertEqual(response["status_code"], requests.codes.ok)
        self.assertNotIn("supporting-otucn", response["mapping"][0])

    def test_16_check_no_interface_otsig(self):
        """The OTSi group interface of XPDR4-NETWORK1 must be gone."""
        response = test_utils.check_node_attribute_request(
            "XPDR-A2", "interface", "XPDR4-NETWORK1-OTSIGROUP-400G")
        self.assertEqual(response["status_code"], requests.codes.conflict)

    def test_17_check_no_interface_otsig(self):
        """The OTSi group interface of XPDR4-NETWORK2 must be gone."""
        response = test_utils.check_node_attribute_request(
            "XPDR-A2", "interface", "XPDR4-NETWORK2-OTSIGROUP-400G")
        self.assertEqual(response["status_code"], requests.codes.conflict)

    def test_18_check_no_interface_otsi(self):
        """The OTSi interface of XPDR4-NETWORK1 must be gone."""
        response = test_utils.check_node_attribute_request(
            "XPDR-A2", "interface", "XPDR4-NETWORK1-582:595")
        self.assertEqual(response["status_code"], requests.codes.conflict)

    def test_19_check_no_interface_otsi(self):
        """The OTSi interface of XPDR4-NETWORK2 must be gone."""
        response = test_utils.check_node_attribute_request(
            "XPDR-A2", "interface", "XPDR4-NETWORK2-582:595")
        self.assertEqual(response["status_code"], requests.codes.conflict)

    def test_20_xpdr_device_disconnection(self):
        """Unmount the XPDR-A2 device."""
        response = test_utils.unmount_device("XPDR-A2")
        self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))

    def test_21_xpdr_device_disconnected(self):
        """The connection status of the unmounted device must be data-missing."""
        response = test_utils.check_device_connection("XPDR-A2")
        self.assertEqual(response["status_code"], requests.codes.conflict)
        self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
        self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
        self.assertEqual(response["connection-status"]["error-message"],
                         "Request could not be completed because the relevant data model content does not exist")

    def test_22_xpdr_device_not_connected(self):
        """The port-mapping node-info of the unmounted device must be missing."""
        response = test_utils.get_portmapping_node_attr("XPDR-A2", "node-info", None)
        self.assertEqual(response["status_code"], requests.codes.conflict)
        self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
        self.assertEqual(response["node-info"]["error-tag"], "data-missing")
        self.assertEqual(response["node-info"]["error-message"],
                         "Request could not be completed because the relevant data model content does not exist")
311
312
if __name__ == "__main__":
    # Executed as a script: run every test of this module with verbose output.
    unittest.main(verbosity=2)