Keep AvailFreqMaps when netconf session restores
[transportpce.git] / tests / transportpce_tests / 7.1 / test02_device_renderer.py
1 #!/usr/bin/env python
2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others.  All rights reserved.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
10
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
13
14 import unittest
15 import time
16 import requests
17 # pylint: disable=wrong-import-order
18 import sys
19 sys.path.append('transportpce_tests/common')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils  # nopep8
23 import test_utils_rfc8040  # nopep8
24
25
26 class TransportPCE400GPortMappingTesting(unittest.TestCase):
27
28     processes = None
29     NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1",
30                            "supporting-port": "L1",
31                            "supported-interface-capability": [
32                                "org-openroadm-port-types:if-otsi-otsigroup"
33                            ],
34                            "port-direction": "bidirectional",
35                            "port-qual": "switch-network",
36                            "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
37                            "xponder-type": "mpdr",
38                            'lcp-hash-val': 'LY9PxYJqUbw=',
39                            'port-admin-state': 'InService',
40                            'port-oper-state': 'InService'}
41     NODE_VERSION = '7.1'
42
43     @classmethod
44     def setUpClass(cls):
45         cls.processes = test_utils_rfc8040.start_tpce()
46         cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
47
48     @classmethod
49     def tearDownClass(cls):
50         # pylint: disable=not-an-iterable
51         for process in cls.processes:
52             test_utils_rfc8040.shutdown_process(process)
53         print("all processes killed")
54
55     def setUp(self):
56         # pylint: disable=consider-using-f-string
57         print("execution of {}".format(self.id().split(".")[-1]))
58         time.sleep(10)
59
60     def test_01_xpdr_device_connection(self):
61         response = test_utils_rfc8040.mount_device("XPDR-A2",
62                                                    ('xpdra2', self.NODE_VERSION))
63         self.assertEqual(response.status_code, requests.codes.created,
64                          test_utils_rfc8040.CODE_SHOULD_BE_201)
65
66     # Check if the node appears in the ietf-network topology
67     # this test has been removed, since it already exists in port-mapping
68     # 1a) create a OTUC2 device renderer
69     def test_02_service_path_create_otuc2(self):
70         response = test_utils.service_path_request("create", "service_OTUC2", "0",
71                                                    [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
72                                                    196.1, 75, 196.0375, 196.125, 755,
73                                                    768, "dp-qpsk")
74         time.sleep(3)
75         self.assertEqual(response.status_code, requests.codes.ok)
76         res = response.json()
77         self.assertIn('Interfaces created successfully for nodes: ', res["output"]["result"])
78         self.assertTrue(res["output"]["success"])
79         self.assertIn(
80             {'node-id': 'XPDR-A2',
81              'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
82              'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G']}, res["output"]['node-interface'])
83
84     def test_03_get_portmapping_network1(self):
85         response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
86         self.assertEqual(response.status_code, requests.codes.ok)
87         res = response.json()
88         self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
89         self.assertIn(
90             self.NETWORK2_CHECK_DICT,
91             res['mapping'])
92
93     def test_04_check_interface_otsi(self):
94         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
95         self.assertEqual(response.status_code, requests.codes.ok)
96         res = response.json()
97
98         input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
99                         'administrative-state': 'inService',
100                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
101                         'type': 'org-openroadm-interfaces:otsi',
102                         'supporting-port': 'L1'}
103         input_dict_2 = {
104             "frequency": 196.0812,
105             "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
106             "fec": "org-openroadm-common-types:ofec",
107             "transmit-power": -5,
108             "provision-mode": "explicit",
109             "modulation-format": "dp-qpsk"}
110
111         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
112                              res['interface'][0])
113         self.assertDictEqual(dict(input_dict_2,
114                                   **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
115                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
116         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
117                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
118
119     def test_05_check_interface_otsig(self):
120         response = test_utils.check_netconf_node_request(
121             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSIGROUP-200G")
122         self.assertEqual(response.status_code, requests.codes.ok)
123         res = response.json()
124         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
125                         'administrative-state': 'inService',
126                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
127                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
128                         'type': 'org-openroadm-interfaces:otsi-group',
129                         'supporting-port': 'L1'}
130         input_dict_2 = {"group-id": 1,
131                         "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
132
133         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
134                              res['interface'][0])
135         self.assertDictEqual(dict(input_dict_2,
136                                   **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
137                              res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
138
139     def test_06_check_interface_otuc2(self):
140         response = test_utils.check_netconf_node_request(
141             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC2")
142         self.assertEqual(response.status_code, requests.codes.ok)
143         res = response.json()
144         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
145                         'administrative-state': 'inService',
146                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
147                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
148                         'type': 'org-openroadm-interfaces:otnOtu',
149                         'supporting-port': 'L1'}
150         input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
151                         "degthr-percentage": 100,
152                         "tim-detect-mode": "Disabled",
153                         "otucn-n-rate": 2,
154                         "degm-intervals": 2}
155
156         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
157                              res['interface'][0])
158         self.assertDictEqual(dict(input_dict_2,
159                                   **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
160                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
161
162     # 1b) create a ODUC2 device renderer
163     def test_07_otn_service_path_create_oduc2(self):
164         response = test_utils.otn_service_path_request("create", "service_ODUC2", "200", "ODU",
165                                                        [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
166         time.sleep(3)
167         self.assertEqual(response.status_code, requests.codes.ok)
168         res = response.json()
169         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', res["output"]["result"])
170         self.assertTrue(res["output"]["success"])
171         self.assertIn(
172             {'node-id': 'XPDR-A2',
173              'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, res["output"]['node-interface'])
174
175     def test_08_get_portmapping_network1(self):
176         response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
177         self.assertEqual(response.status_code, requests.codes.ok)
178         res = response.json()
179         self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
180         self.assertIn(
181             self.NETWORK2_CHECK_DICT,
182             res['mapping'])
183
184     def test_09_check_interface_oduc2(self):
185         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC2")
186         self.assertEqual(response.status_code, requests.codes.ok)
187         res = response.json()
188
189         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
190                         'administrative-state': 'inService',
191                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
192                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
193                         'type': 'org-openroadm-interfaces:otnOdu',
194                         'supporting-port': 'L1'}
195
196         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
197                         'rate': 'org-openroadm-otn-common-types:ODUCn',
198                         'tx-sapi': 'LY9PxYJqUbw=',
199                         'tx-dapi': 'LY9PxYJqUbw=',
200                         'expected-sapi': 'LY9PxYJqUbw=',
201                         'expected-dapi': 'LY9PxYJqUbw=',
202                         "degm-intervals": 2,
203                         "degthr-percentage": 100,
204                         "monitoring-mode": "terminated",
205                         "oducn-n-rate": 2
206                         }
207         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
208                              res['interface'][0])
209         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
210                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
211         self.assertDictEqual(
212             {'payload-type': '22', 'exp-payload-type': '22'},
213             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
214
215     # 1c) create Ethernet device renderer
216     def test_10_otn_service_path_create_100ge(self):
217         response = test_utils.otn_service_path_request("create", "service_Ethernet", "100", "Ethernet",
218                                                        [{"node-id": "XPDR-A2", "client-tp": "XPDR2-CLIENT1",
219                                                          "network-tp": "XPDR2-NETWORK1"}],
220                                                        {"ethernet-encoding": "eth encode",
221                                                         "opucn-trib-slots": [
222                                                             "1.1",
223                                                             "1.20"
224                                                         ]})
225         time.sleep(3)
226         self.assertEqual(response.status_code, requests.codes.ok)
227         res = response.json()
228         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', res["output"]["result"])
229         self.assertTrue(res["output"]["success"])
230         self.assertEqual('XPDR-A2', res["output"]['node-interface'][0]['node-id'])
231         self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
232                       res["output"]['node-interface'][0]['connection-id'])
233         self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', res["output"]['node-interface'][0]['eth-interface-id'])
234         self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet', res["output"]['node-interface'][0]['odu-interface-id'])
235         self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet', res["output"]['node-interface'][0]['odu-interface-id'])
236
237     def test_11_check_interface_100ge_client(self):
238         response = test_utils.check_netconf_node_request(
239             "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET-100G")
240         self.assertEqual(response.status_code, requests.codes.ok)
241         res = response.json()
242         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
243                         'administrative-state': 'inService',
244                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
245                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
246                         'supporting-port': 'C1'
247                         }
248         input_dict_2 = {'speed': 100000}
249         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
250                              res['interface'][0])
251         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
252                              res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
253
254     def test_12_check_interface_odu4_client(self):
255         response = test_utils.check_netconf_node_request(
256             "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service_Ethernet")
257         self.assertEqual(response.status_code, requests.codes.ok)
258         res = response.json()
259         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet',
260                         'administrative-state': 'inService',
261                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
262                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
263                         'type': 'org-openroadm-interfaces:otnOdu',
264                         'supporting-port': 'C1'}
265         input_dict_2 = {
266             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
267             'rate': 'org-openroadm-otn-common-types:ODU4',
268             'monitoring-mode': 'terminated'}
269
270         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
271                              res['interface'][0])
272         self.assertDictEqual(dict(input_dict_2,
273                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
274                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
275         self.assertDictEqual(
276             {'payload-type': '07', 'exp-payload-type': '07'},
277             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
278
279     def test_13_check_interface_odu4_network(self):
280         response = test_utils.check_netconf_node_request(
281             "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service_Ethernet")
282         self.assertEqual(response.status_code, requests.codes.ok)
283         res = response.json()
284         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet',
285                         'administrative-state': 'inService',
286                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
287                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
288                         'type': 'org-openroadm-interfaces:otnOdu',
289                         'supporting-port': 'L1'}
290         input_dict_2 = {
291             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
292             'rate': 'org-openroadm-otn-common-types:ODU4',
293             'monitoring-mode': 'not-terminated'}
294         input_dict_3 = {'trib-port-number': 1}
295
296         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
297                              res['interface'][0])
298         self.assertDictEqual(dict(input_dict_2,
299                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
300                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
301         self.assertDictEqual(dict(input_dict_3,
302                                   **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
303                                       'parent-odu-allocation']),
304                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
305         self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
306                       ['opucn-trib-slots'])
307         self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
308                       ['opucn-trib-slots'])
309
310     def test_14_check_odu_connection_xpdra2(self):
311         response = test_utils.check_netconf_node_request(
312             "XPDR-A2",
313             "odu-connection/XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
314         self.assertEqual(response.status_code, requests.codes.ok)
315         res = response.json()
316         input_dict_1 = {
317             'connection-name':
318             'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
319             'direction': 'bidirectional'
320         }
321
322         self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
323                              res['odu-connection'][0])
324         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'},
325                              res['odu-connection'][0]['destination'])
326         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'},
327                              res['odu-connection'][0]['source'])
328
329     # 1d) Delete Ethernet device interfaces
330     def test_15_otn_service_path_delete_100ge(self):
331         response = test_utils.otn_service_path_request("delete", "service_Ethernet", "100", "Ethernet",
332                                                        [{"node-id": "XPDR-A2", "client-tp": "XPDR2-CLIENT1",
333                                                          "network-tp": "XPDR2-NETWORK1"}],
334                                                        {"ethernet-encoding": "eth encode",
335                                                         "trib-slot": ["1"], "trib-port-number": "1"})
336         time.sleep(3)
337         self.assertEqual(response.status_code, requests.codes.ok)
338         res = response.json()
339         self.assertIn('Request processed', res["output"]["result"])
340         self.assertTrue(res["output"]["success"])
341
342     def test_16_check_no_odu_connection(self):
343         response = test_utils.check_netconf_node_request(
344             "XPDR-A2",
345             "odu-connection/XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
346         self.assertEqual(response.status_code, requests.codes.conflict)
347
348     def test_17_check_no_interface_odu_network(self):
349         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service_Ethernet")
350         self.assertEqual(response.status_code, requests.codes.conflict)
351
352     def test_18_check_no_interface_odu_client(self):
353         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service_Ethernet")
354         self.assertEqual(response.status_code, requests.codes.conflict)
355
356     def test_19_check_no_interface_100ge_client(self):
357         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET-100G")
358         self.assertEqual(response.status_code, requests.codes.conflict)
359
360     # 1e) Delete ODUC2 device interfaces
361     def test_20_otn_service_path_delete_oduc2(self):
362         response = test_utils.otn_service_path_request("delete", "service_ODUC2", "200", "ODU",
363                                                        [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
364         time.sleep(3)
365         self.assertEqual(response.status_code, requests.codes.ok)
366         res = response.json()
367         self.assertIn('Request processed', res["output"]["result"])
368         self.assertTrue(res["output"]["success"])
369
370     def test_21_check_no_interface_oduc2(self):
371         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC2")
372         self.assertEqual(response.status_code, requests.codes.conflict)
373
374     # 1f) Delete OTUC2 device interfaces
375     def test_22_service_path_delete_otuc2(self):
376         response = test_utils.service_path_request("delete", "service_OTUC2", "0",
377                                                    [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
378                                                    196.1, 75, 196.0375, 196.125, 755,
379                                                    768, "dp-qpsk")
380         time.sleep(3)
381         self.assertEqual(response.status_code, requests.codes.ok)
382         res = response.json()
383         self.assertIn('Request processed', res["output"]["result"])
384         self.assertTrue(res["output"]["success"])
385
386     def test_23_check_no_interface_otuc2(self):
387         response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTUC2")
388         self.assertEqual(response.status_code, requests.codes.conflict)
389
390     def test_24_check_no_interface_otsig(self):
391         response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTSIGROUP-200G")
392         self.assertEqual(response.status_code, requests.codes.conflict)
393
394     def test_25_check_no_interface_otsi(self):
395         response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-755:768")
396         self.assertEqual(response.status_code, requests.codes.conflict)
397
398     # 2a) create a OTUC3 device renderer
399     def test_26_service_path_create_otuc3(self):
400         response = test_utils.service_path_request("create", "service_OTUC3", "0",
401                                                    [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
402                                                    196.1, 75, 196.0375, 196.125, 755,
403                                                    768, "dp-qam8")
404         time.sleep(3)
405         self.assertEqual(response.status_code, requests.codes.ok)
406         res = response.json()
407         self.assertIn('Interfaces created successfully for nodes: ', res["output"]["result"])
408         self.assertTrue(res["output"]["success"])
409         self.assertIn(
410             {'node-id': 'XPDR-A2',
411              'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
412              'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-300G']}, res["output"]['node-interface'])
413
414     def test_27_get_portmapping_network1(self):
415         response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
416         self.assertEqual(response.status_code, requests.codes.ok)
417         res = response.json()
418         self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
419         self.assertIn(
420             self.NETWORK2_CHECK_DICT,
421             res['mapping'])
422
423     def test_28_check_interface_otsi(self):
424         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
425         self.assertEqual(response.status_code, requests.codes.ok)
426         res = response.json()
427
428         input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
429                         'administrative-state': 'inService',
430                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
431                         'type': 'org-openroadm-interfaces:otsi',
432                         'supporting-port': 'L1'}
433         input_dict_2 = {
434             "frequency": 196.0812,
435             "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
436             "fec": "org-openroadm-common-types:ofec",
437             "transmit-power": -5,
438             "provision-mode": "explicit",
439             "modulation-format": "dp-qam8"}
440
441         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
442                              res['interface'][0])
443         self.assertDictEqual(dict(input_dict_2,
444                                   **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
445                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
446         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
447                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
448
449     def test_29_check_interface_otsig(self):
450         response = test_utils.check_netconf_node_request(
451             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSIGROUP-300G")
452         self.assertEqual(response.status_code, requests.codes.ok)
453         res = response.json()
454         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
455                         'administrative-state': 'inService',
456                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
457                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
458                         'type': 'org-openroadm-interfaces:otsi-group',
459                         'supporting-port': 'L1'}
460         input_dict_2 = {"group-id": 1,
461                         "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}
462
463         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
464                              res['interface'][0])
465         self.assertDictEqual(dict(input_dict_2,
466                                   **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
467                              res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
468
469     def test_30_check_interface_otuc3(self):
470         response = test_utils.check_netconf_node_request(
471             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC3")
472         self.assertEqual(response.status_code, requests.codes.ok)
473         res = response.json()
474         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
475                         'administrative-state': 'inService',
476                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
477                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
478                         'type': 'org-openroadm-interfaces:otnOtu',
479                         'supporting-port': 'L1'}
480         input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
481                         "degthr-percentage": 100,
482                         "tim-detect-mode": "Disabled",
483                         "otucn-n-rate": 3,
484                         "degm-intervals": 2}
485
486         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
487                              res['interface'][0])
488         self.assertDictEqual(dict(input_dict_2,
489                                   **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
490                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
491
492     # 2b) create a ODUC3 device renderer
493     def test_31_otn_service_path_create_oduc3(self):
494         response = test_utils.otn_service_path_request("create", "service_ODUC3", "300", "ODU",
495                                                        [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
496         time.sleep(3)
497         self.assertEqual(response.status_code, requests.codes.ok)
498         res = response.json()
499         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', res["output"]["result"])
500         self.assertTrue(res["output"]["success"])
501         self.assertIn(
502             {'node-id': 'XPDR-A2',
503              'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, res["output"]['node-interface'])
504
505     def test_32_get_portmapping_network1(self):
506         response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
507         self.assertEqual(response.status_code, requests.codes.ok)
508         res = response.json()
509         self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
510         self.assertIn(
511             self.NETWORK2_CHECK_DICT,
512             res['mapping'])
513
514     def test_33_check_interface_oduc3(self):
515         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC3")
516         self.assertEqual(response.status_code, requests.codes.ok)
517         res = response.json()
518
519         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
520                         'administrative-state': 'inService',
521                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
522                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
523                         'type': 'org-openroadm-interfaces:otnOdu',
524                         'supporting-port': 'L1'}
525
526         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
527                         'rate': 'org-openroadm-otn-common-types:ODUCn',
528                         'tx-sapi': 'LY9PxYJqUbw=',
529                         'tx-dapi': 'LY9PxYJqUbw=',
530                         'expected-sapi': 'LY9PxYJqUbw=',
531                         'expected-dapi': 'LY9PxYJqUbw=',
532                         "degm-intervals": 2,
533                         "degthr-percentage": 100,
534                         "monitoring-mode": "terminated",
535                         "oducn-n-rate": 3
536                         }
537         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
538                              res['interface'][0])
539         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
540                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
541         self.assertDictEqual(
542             {'payload-type': '22', 'exp-payload-type': '22'},
543             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
544
545     # 2c) create Ethernet device renderer
546     # No change in the ethernet device renderer so skipping those tests
547     # 2d) Delete Ethernet device interfaces
548     # No change in the ethernet device renderer so skipping those tests
549
550     # 2e) Delete ODUC3 device interfaces
551     def test_34_otn_service_path_delete_oduc3(self):
552         response = test_utils.otn_service_path_request("delete", "service_ODUC3", "300", "ODU",
553                                                        [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
554         time.sleep(3)
555         self.assertEqual(response.status_code, requests.codes.ok)
556         res = response.json()
557         self.assertIn('Request processed', res["output"]["result"])
558         self.assertTrue(res["output"]["success"])
559
560     def test_35_check_no_interface_oduc3(self):
561         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC3")
562         self.assertEqual(response.status_code, requests.codes.conflict)
563
564     # 2f) Delete OTUC3 device interfaces
565     def test_36_service_path_delete_otuc3(self):
566         response = test_utils.service_path_request("delete", "service_OTUC3", "0",
567                                                    [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
568                                                    196.1, 75, 196.0375, 196.125, 755,
569                                                    768, "dp-qam8")
570         time.sleep(3)
571         self.assertEqual(response.status_code, requests.codes.ok)
572         res = response.json()
573         self.assertIn('Request processed', res["output"]["result"])
574         self.assertTrue(res["output"]["success"])
575
576     def test_37_check_no_interface_otuc3(self):
577         response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTUC3")
578         self.assertEqual(response.status_code, requests.codes.conflict)
579
580     def test_38_check_no_interface_otsig(self):
581         response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTSIGROUP-300G")
582         self.assertEqual(response.status_code, requests.codes.conflict)
583
584     def test_39_check_no_interface_otsi(self):
585         response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-755:768")
586         self.assertEqual(response.status_code, requests.codes.conflict)
587
588     # 3a) create a OTUC4 device renderer
589     def test_40_service_path_create_otuc3(self):
590         response = test_utils.service_path_request("create", "service_OTUC4", "0",
591                                                    [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
592                                                    196.1, 75, 196.0375, 196.125, 755,
593                                                    768, "dp-qam16")
594         time.sleep(3)
595         self.assertEqual(response.status_code, requests.codes.ok)
596         res = response.json()
597         self.assertIn('Interfaces created successfully for nodes: ', res["output"]["result"])
598         self.assertTrue(res["output"]["success"])
599         self.assertIn(
600             {'node-id': 'XPDR-A2',
601              'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
602              'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-400G']}, res["output"]['node-interface'])
603
604     def test_41_get_portmapping_network1(self):
605         response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
606         self.assertEqual(response.status_code, requests.codes.ok)
607         res = response.json()
608         self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
609         self.assertIn(
610             self.NETWORK2_CHECK_DICT,
611             res['mapping'])
612
613     def test_42_check_interface_otsi(self):
614         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
615         self.assertEqual(response.status_code, requests.codes.ok)
616         res = response.json()
617
618         input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
619                         'administrative-state': 'inService',
620                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
621                         'type': 'org-openroadm-interfaces:otsi',
622                         'supporting-port': 'L1'}
623         input_dict_2 = {
624             "frequency": 196.0812,
625             "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
626             "fec": "org-openroadm-common-types:ofec",
627             "transmit-power": -5,
628             "provision-mode": "explicit",
629             "modulation-format": "dp-qam16"}
630
631         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
632                              res['interface'][0])
633         self.assertDictEqual(dict(input_dict_2,
634                                   **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
635                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
636         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
637                              res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
638
639     def test_43_check_interface_otsig(self):
640         response = test_utils.check_netconf_node_request(
641             "XPDR-A2", "interface/XPDR2-NETWORK1-OTSIGROUP-400G")
642         self.assertEqual(response.status_code, requests.codes.ok)
643         res = response.json()
644         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
645                         'administrative-state': 'inService',
646                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
647                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
648                         'type': 'org-openroadm-interfaces:otsi-group',
649                         'supporting-port': 'L1'}
650         input_dict_2 = {"group-id": 1,
651                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
652
653         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
654                              res['interface'][0])
655         self.assertDictEqual(dict(input_dict_2,
656                                   **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
657                              res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
658
659     def test_44_check_interface_otuc4(self):
660         response = test_utils.check_netconf_node_request(
661             "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
662         self.assertEqual(response.status_code, requests.codes.ok)
663         res = response.json()
664         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
665                         'administrative-state': 'inService',
666                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
667                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
668                         'type': 'org-openroadm-interfaces:otnOtu',
669                         'supporting-port': 'L1'}
670         input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
671                         "degthr-percentage": 100,
672                         "tim-detect-mode": "Disabled",
673                         "otucn-n-rate": 4,
674                         "degm-intervals": 2}
675
676         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
677                              res['interface'][0])
678         self.assertDictEqual(dict(input_dict_2,
679                                   **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
680                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
681
682     # 3b) create a ODUC4 device renderer
683     def test_45_otn_service_path_create_oduc3(self):
684         response = test_utils.otn_service_path_request("create", "service_ODUC4", "400", "ODU",
685                                                        [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
686         time.sleep(3)
687         self.assertEqual(response.status_code, requests.codes.ok)
688         res = response.json()
689         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', res["output"]["result"])
690         self.assertTrue(res["output"]["success"])
691         self.assertIn(
692             {'node-id': 'XPDR-A2',
693              'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, res["output"]['node-interface'])
694
695     def test_46_get_portmapping_network1(self):
696         response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
697         self.assertEqual(response.status_code, requests.codes.ok)
698         res = response.json()
699         self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
700         self.assertIn(
701             self.NETWORK2_CHECK_DICT,
702             res['mapping'])
703
704     def test_47_check_interface_oduc4(self):
705         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
706         self.assertEqual(response.status_code, requests.codes.ok)
707         res = response.json()
708
709         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
710                         'administrative-state': 'inService',
711                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
712                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
713                         'type': 'org-openroadm-interfaces:otnOdu',
714                         'supporting-port': 'L1'}
715
716         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
717                         'rate': 'org-openroadm-otn-common-types:ODUCn',
718                         'tx-sapi': 'LY9PxYJqUbw=',
719                         'tx-dapi': 'LY9PxYJqUbw=',
720                         'expected-sapi': 'LY9PxYJqUbw=',
721                         'expected-dapi': 'LY9PxYJqUbw=',
722                         "degm-intervals": 2,
723                         "degthr-percentage": 100,
724                         "monitoring-mode": "terminated",
725                         "oducn-n-rate": 4
726                         }
727         self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
728                              res['interface'][0])
729         self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
730                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
731         self.assertDictEqual(
732             {'payload-type': '22', 'exp-payload-type': '22'},
733             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
734
735     # 3c) create Ethernet device renderer
736     # No change in the ethernet device renderer so skipping those tests
737     # 3d) Delete Ethernet device interfaces
738     # No change in the ethernet device renderer so skipping those tests
739
740     # 3e) Delete ODUC4 device interfaces
741     def test_48_otn_service_path_delete_oduc4(self):
742         response = test_utils.otn_service_path_request("delete", "service_ODUC4", "400", "ODU",
743                                                        [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
744         time.sleep(3)
745         self.assertEqual(response.status_code, requests.codes.ok)
746         res = response.json()
747         self.assertIn('Request processed', res["output"]["result"])
748         self.assertTrue(res["output"]["success"])
749
750     def test_49_check_no_interface_oduc4(self):
751         response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
752         self.assertEqual(response.status_code, requests.codes.conflict)
753
754     # 3f) Delete OTUC4 device interfaces
755     def test_50_service_path_delete_otuc4(self):
756         response = test_utils.service_path_request("delete", "service_OTUC4", "0",
757                                                    [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
758                                                    196.1, 75, 196.0375, 196.125, 755,
759                                                    768, "dp-qam16")
760         time.sleep(3)
761         self.assertEqual(response.status_code, requests.codes.ok)
762         res = response.json()
763         self.assertIn('Request processed', res["output"]["result"])
764         self.assertTrue(res["output"]["success"])
765
766     def test_51_check_no_interface_otuc4(self):
767         response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTUC4")
768         self.assertEqual(response.status_code, requests.codes.conflict)
769
770     def test_52_check_no_interface_otsig(self):
771         response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTSIGROUP-400G")
772         self.assertEqual(response.status_code, requests.codes.conflict)
773
774     def test_53_check_no_interface_otsi(self):
775         response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-755:768")
776         self.assertEqual(response.status_code, requests.codes.conflict)
777
778     # Disconnect the XPDR
779     def test_54_xpdr_device_disconnection(self):
780         response = test_utils_rfc8040.unmount_device("XPDR-A2")
781         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
782
783     def test_55_xpdr_device_disconnected(self):
784         response = test_utils_rfc8040.check_device_connection("XPDR-A2")
785         self.assertEqual(response['status_code'], requests.codes.conflict)
786         self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
787         self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
788         self.assertEqual(response['connection-status']['error-message'],
789                          'Request could not be completed because the relevant data model content does not exist')
790
791     def test_56_xpdr_device_not_connected(self):
792         response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
793         self.assertEqual(response['status_code'], requests.codes.conflict)
794         self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
795         self.assertEqual(response['node-info']['error-tag'], 'data-missing')
796         self.assertEqual(response['node-info']['error-message'],
797                          'Request could not be completed because the relevant data model content does not exist')
798
799
800 if __name__ == '__main__':
801     unittest.main(verbosity=2)