Merge "Change the ODU4 type to ODU-TTP-CTP"
[transportpce.git] / tests / transportpce_tests / 7.1 / test02_otn_renderer.py
1 #!/usr/bin/env python
2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others.  All rights reserved.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
10
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
13
14 import unittest
15 import time
16 import requests
17 # pylint: disable=wrong-import-order
18 import sys
19 sys.path.append('transportpce_tests/common')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils_rfc8040  # nopep8
23
24
class TransportPCE400GPortMappingTesting(unittest.TestCase):
    """Device-renderer tests run against the 'xpdra2' simulator (node XPDR-A2)."""

    # Process handles assigned in setUpClass and iterated over in tearDownClass.
    processes = None

    # Port-mapping entry compared against the XPDR2-NETWORK1 mapping. Tests add
    # and remove the "supporting-otucn" / "supporting-oducn" keys on this shared
    # dict as interfaces are created and deleted.
    NETWORK2_CHECK_DICT = {
        'logical-connection-point': 'XPDR2-NETWORK1',
        'supporting-port': 'L1',
        'supported-interface-capability': [
            'org-openroadm-port-types:if-otsi-otsigroup',
        ],
        'port-direction': 'bidirectional',
        'port-qual': 'switch-network',
        'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
        'xponder-type': 'mpdr',
        'lcp-hash-val': 'LY9PxYJqUbw=',
        'port-admin-state': 'InService',
        'port-oper-state': 'InService',
    }

    # Version string passed along with the simulator name.
    NODE_VERSION = '7.1'
41
42     @classmethod
43     def setUpClass(cls):
44         cls.processes = test_utils_rfc8040.start_tpce()
45         cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
46
47     @classmethod
48     def tearDownClass(cls):
49         # pylint: disable=not-an-iterable
50         for process in cls.processes:
51             test_utils_rfc8040.shutdown_process(process)
52         print("all processes killed")
53
54     def setUp(self):
55         # pylint: disable=consider-using-f-string
56         print("execution of {}".format(self.id().split(".")[-1]))
57         time.sleep(10)
58
59     def test_01_xpdr_device_connection(self):
60         response = test_utils_rfc8040.mount_device("XPDR-A2",
61                                                    ('xpdra2', self.NODE_VERSION))
62         self.assertEqual(response.status_code, requests.codes.created,
63                          test_utils_rfc8040.CODE_SHOULD_BE_201)
64
65     # Check if the node appears in the ietf-network topology
66     # this test has been removed, since it already exists in port-mapping
67     # 1a) create a OTUC2 device renderer
68     def test_02_service_path_create_otuc2(self):
69         response = test_utils_rfc8040.device_renderer_service_path_request(
70             {
71                 'service-name': 'service_OTUC2',
72                 'wave-number': '0',
73                 'modulation-format': 'dp-qpsk',
74                 'operation': 'create',
75                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
76                 'center-freq': 196.1,
77                 'nmc-width': 75,
78                 'min-freq': 196.0375,
79                 'max-freq': 196.125,
80                 'lower-spectral-slot-number': 755,
81                 'higher-spectral-slot-number': 768
82             })
83         self.assertEqual(response['status_code'], requests.codes.ok)
84         self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
85         self.assertIn(
86             {'node-id': 'XPDR-A2',
87              'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
88              'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G']}, response['output']['node-interface'])
89
90     def test_03_get_portmapping_network1(self):
91         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
92         self.assertEqual(response['status_code'], requests.codes.ok)
93         self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
94         self.assertIn(
95             self.NETWORK2_CHECK_DICT,
96             response['mapping'])
97
98     def test_04_check_interface_otsi(self):
99         # pylint: disable=line-too-long
100         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
101         self.assertEqual(response['status_code'], requests.codes.ok)
102         input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
103                         'administrative-state': 'inService',
104                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
105                         'type': 'org-openroadm-interfaces:otsi',
106                         'supporting-port': 'L1'}
107         input_dict_2 = {
108             "frequency": 196.0812,
109             "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
110             "fec": "org-openroadm-common-types:ofec",
111             "transmit-power": -5,
112             "provision-mode": "explicit",
113             "modulation-format": "dp-qpsk"}
114
115         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
116                              response['interface'][0])
117         self.assertDictEqual(dict(input_dict_2,
118                                   **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
119                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
120         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
121                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
122
123     def test_05_check_interface_otsig(self):
124         response = test_utils_rfc8040.check_node_attribute_request(
125             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
126         self.assertEqual(response['status_code'], requests.codes.ok)
127         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
128                         'administrative-state': 'inService',
129                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
130                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
131                         'type': 'org-openroadm-interfaces:otsi-group',
132                         'supporting-port': 'L1'}
133         input_dict_2 = {"group-id": 1,
134                         "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
135
136         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
137                              response['interface'][0])
138         self.assertDictEqual(dict(input_dict_2,
139                                   **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
140                              response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
141
142     def test_06_check_interface_otuc2(self):
143         response = test_utils_rfc8040.check_node_attribute_request(
144             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
145         self.assertEqual(response['status_code'], requests.codes.ok)
146         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
147                         'administrative-state': 'inService',
148                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
149                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
150                         'type': 'org-openroadm-interfaces:otnOtu',
151                         'supporting-port': 'L1'}
152         input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
153                         "degthr-percentage": 100,
154                         "tim-detect-mode": "Disabled",
155                         "otucn-n-rate": 2,
156                         "degm-intervals": 2}
157
158         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
159                              response['interface'][0])
160         self.assertDictEqual(dict(input_dict_2,
161                                   **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
162                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
163
164     # 1b) create a ODUC2 device renderer
165     def test_07_otn_service_path_create_oduc2(self):
166         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
167             {
168                 'service-name': 'service_ODUC2',
169                 'operation': 'create',
170                 'service-rate': '200',
171                 'service-format': 'ODU',
172                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
173             })
174         self.assertEqual(response['status_code'], requests.codes.ok)
175         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
176         self.assertIn(
177             {'node-id': 'XPDR-A2',
178              'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
179
180     def test_08_get_portmapping_network1(self):
181         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
182         self.assertEqual(response['status_code'], requests.codes.ok)
183         self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
184         self.assertIn(
185             self.NETWORK2_CHECK_DICT,
186             response['mapping'])
187
188     def test_09_check_interface_oduc2(self):
189         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
190         self.assertEqual(response['status_code'], requests.codes.ok)
191
192         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
193                         'administrative-state': 'inService',
194                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
195                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
196                         'type': 'org-openroadm-interfaces:otnOdu',
197                         'supporting-port': 'L1'}
198
199         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
200                         'rate': 'org-openroadm-otn-common-types:ODUCn',
201                         'tx-sapi': 'LY9PxYJqUbw=',
202                         'tx-dapi': 'LY9PxYJqUbw=',
203                         'expected-sapi': 'LY9PxYJqUbw=',
204                         'expected-dapi': 'LY9PxYJqUbw=',
205                         "degm-intervals": 2,
206                         "degthr-percentage": 100,
207                         "monitoring-mode": "terminated",
208                         "oducn-n-rate": 2
209                         }
210         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
211                              response['interface'][0])
212         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
213                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
214         self.assertDictEqual(
215             {'payload-type': '22', 'exp-payload-type': '22'},
216             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
217
218     # 1c) create Ethernet device renderer
219     def test_10_otn_service_path_create_100ge(self):
220         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
221             {
222                 'service-name': 'service_Ethernet',
223                 'operation': 'create',
224                 'service-rate': '100',
225                 'service-format': 'Ethernet',
226                 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
227                 'ethernet-encoding': 'eth encode',
228                 'opucn-trib-slots': ['1.1', '1.20']
229             })
230         self.assertEqual(response['status_code'], requests.codes.ok)
231         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
232         self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id'])
233         self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
234                       response['output']['node-interface'][0]['connection-id'])
235         self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id'])
236         self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet',
237                       response['output']['node-interface'][0]['odu-interface-id'])
238         self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet',
239                       response['output']['node-interface'][0]['odu-interface-id'])
240
241     def test_11_check_interface_100ge_client(self):
242         response = test_utils_rfc8040.check_node_attribute_request(
243             "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
244         self.assertEqual(response['status_code'], requests.codes.ok)
245         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
246                         'administrative-state': 'inService',
247                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
248                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
249                         'supporting-port': 'C1'
250                         }
251         input_dict_2 = {'speed': 100000}
252         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
253                              response['interface'][0])
254         self.assertDictEqual(dict(input_dict_2,
255                                   **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
256                              response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
257
258     def test_12_check_interface_odu4_client(self):
259         response = test_utils_rfc8040.check_node_attribute_request(
260             "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
261         self.assertEqual(response['status_code'], requests.codes.ok)
262         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet',
263                         'administrative-state': 'inService',
264                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
265                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
266                         'type': 'org-openroadm-interfaces:otnOdu',
267                         'supporting-port': 'C1'}
268         input_dict_2 = {
269             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
270             'rate': 'org-openroadm-otn-common-types:ODU4',
271             'monitoring-mode': 'terminated'}
272
273         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
274                              response['interface'][0])
275         self.assertDictEqual(dict(input_dict_2,
276                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
277                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
278         self.assertDictEqual(
279             {'payload-type': '07', 'exp-payload-type': '07'},
280             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
281
282     def test_13_check_interface_odu4_network(self):
283         response = test_utils_rfc8040.check_node_attribute_request(
284             "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
285         self.assertEqual(response['status_code'], requests.codes.ok)
286         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet',
287                         'administrative-state': 'inService',
288                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
289                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
290                         'type': 'org-openroadm-interfaces:otnOdu',
291                         'supporting-port': 'L1'}
292         input_dict_2 = {
293             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
294             'rate': 'org-openroadm-otn-common-types:ODU4',
295             'monitoring-mode': 'not-terminated'}
296         input_dict_3 = {'trib-port-number': 1}
297
298         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
299                              response['interface'][0])
300         self.assertDictEqual(dict(input_dict_2,
301                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
302                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
303         self.assertDictEqual(dict(input_dict_3,
304                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
305                                       'parent-odu-allocation']),
306                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
307         self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
308                       ['opucn-trib-slots'])
309         self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
310                       ['opucn-trib-slots'])
311
312     def test_14_check_odu_connection_xpdra2(self):
313         response = test_utils_rfc8040.check_node_attribute_request(
314             "XPDR-A2",
315             "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
316         self.assertEqual(response['status_code'], requests.codes.ok)
317         input_dict_1 = {
318             'connection-name':
319             'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
320             'direction': 'bidirectional'
321         }
322
323         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
324                              response['odu-connection'][0])
325         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'},
326                              response['odu-connection'][0]['destination'])
327         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'},
328                              response['odu-connection'][0]['source'])
329
330     # 1d) Delete Ethernet device interfaces
331     def test_15_otn_service_path_delete_100ge(self):
332         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
333             {
334                 'service-name': 'service_Ethernet',
335                 'operation': 'delete',
336                 'service-rate': '100',
337                 'service-format': 'Ethernet',
338                 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
339                 'ethernet-encoding': 'eth encode',
340                 'trib-slot': ['1'],
341                 'trib-port-number': '1'
342             })
343         self.assertEqual(response['status_code'], requests.codes.ok)
344         self.assertIn('Request processed', response['output']['result'])
345
346     def test_16_check_no_odu_connection(self):
347         response = test_utils_rfc8040.check_node_attribute_request(
348             "XPDR-A2",
349             "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
350         self.assertEqual(response['status_code'], requests.codes.conflict)
351
352     def test_17_check_no_interface_odu_network(self):
353         response = test_utils_rfc8040.check_node_attribute_request(
354             "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
355         self.assertEqual(response['status_code'], requests.codes.conflict)
356
357     def test_18_check_no_interface_odu_client(self):
358         response = test_utils_rfc8040.check_node_attribute_request(
359             "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
360         self.assertEqual(response['status_code'], requests.codes.conflict)
361
362     def test_19_check_no_interface_100ge_client(self):
363         response = test_utils_rfc8040.check_node_attribute_request(
364             "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
365         self.assertEqual(response['status_code'], requests.codes.conflict)
366
367     # 1e) Delete ODUC2 device interfaces
368     def test_20_otn_service_path_delete_oduc2(self):
369         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
370             {
371                 'service-name': 'service_ODUC2',
372                 'operation': 'delete',
373                 'service-rate': '200',
374                 'service-format': 'ODU',
375                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
376             })
377         self.assertEqual(response['status_code'], requests.codes.ok)
378         self.assertIn('Request processed', response['output']['result'])
379         # Here you have remove the added oducn supporting port interface
380         del self.NETWORK2_CHECK_DICT["supporting-oducn"]
381
382     def test_21_check_no_interface_oduc2(self):
383         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
384         self.assertEqual(response['status_code'], requests.codes.conflict)
385
386     # Check if port-mapping data is updated, where the supporting-oducn is deleted
387     def test_21a_check_no_oduc2(self):
388         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
389         self.assertRaises(KeyError, lambda: response["supporting-oducn"])
390
391     # 1f) Delete OTUC2 device interfaces
392
393     def test_22_service_path_delete_otuc2(self):
394         response = test_utils_rfc8040.device_renderer_service_path_request(
395             {
396                 'service-name': 'service_OTUC2',
397                 'wave-number': '0',
398                 'modulation-format': 'dp-qpsk',
399                 'operation': 'delete',
400                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
401                 'center-freq': 196.1,
402                 'nmc-width': 75,
403                 'min-freq': 196.0375,
404                 'max-freq': 196.125,
405                 'lower-spectral-slot-number': 755,
406                 'higher-spectral-slot-number': 768
407             })
408         self.assertEqual(response['status_code'], requests.codes.ok)
409         self.assertIn('Request processed', response['output']['result'])
410         del self.NETWORK2_CHECK_DICT["supporting-otucn"]
411
412     def test_23_check_no_interface_otuc2(self):
413         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2")
414         self.assertEqual(response['status_code'], requests.codes.conflict)
415
416     def test_24_check_no_interface_otsig(self):
417         response = test_utils_rfc8040.check_node_attribute_request(
418             "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
419         self.assertEqual(response['status_code'], requests.codes.conflict)
420
421     def test_25_check_no_interface_otsi(self):
422         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
423         self.assertEqual(response['status_code'], requests.codes.conflict)
424
425     def test_25a_check_no_otuc2(self):
426         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
427         self.assertRaises(KeyError, lambda: response["supporting-otucn"])
428
429     # 2a) create a OTUC3 device renderer
430     def test_26_service_path_create_otuc3(self):
431         response = test_utils_rfc8040.device_renderer_service_path_request(
432             {
433                 'service-name': 'service_OTUC3',
434                 'wave-number': '0',
435                 'modulation-format': 'dp-qam8',
436                 'operation': 'create',
437                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
438                 'center-freq': 196.1,
439                 'nmc-width': 75,
440                 'min-freq': 196.0375,
441                 'max-freq': 196.125,
442                 'lower-spectral-slot-number': 755,
443                 'higher-spectral-slot-number': 768
444             })
445         self.assertEqual(response['status_code'], requests.codes.ok)
446         self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
447         self.assertIn(
448             {'node-id': 'XPDR-A2',
449              'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
450              'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-300G']}, response['output']['node-interface'])
451
452     def test_27_get_portmapping_network1(self):
453         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
454         self.assertEqual(response['status_code'], requests.codes.ok)
455         self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
456         self.assertIn(
457             self.NETWORK2_CHECK_DICT,
458             response['mapping'])
459
460     def test_28_check_interface_otsi(self):
461         # pylint: disable=line-too-long
462         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
463         self.assertEqual(response['status_code'], requests.codes.ok)
464
465         input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
466                         'administrative-state': 'inService',
467                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
468                         'type': 'org-openroadm-interfaces:otsi',
469                         'supporting-port': 'L1'}
470         input_dict_2 = {
471             "frequency": 196.0812,
472             "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
473             "fec": "org-openroadm-common-types:ofec",
474             "transmit-power": -5,
475             "provision-mode": "explicit",
476             "modulation-format": "dp-qam8"}
477
478         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
479                              response['interface'][0])
480         self.assertDictEqual(dict(input_dict_2,
481                                   **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
482                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
483         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
484                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
485
486     def test_29_check_interface_otsig(self):
487         response = test_utils_rfc8040.check_node_attribute_request(
488             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
489         self.assertEqual(response['status_code'], requests.codes.ok)
490         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
491                         'administrative-state': 'inService',
492                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
493                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
494                         'type': 'org-openroadm-interfaces:otsi-group',
495                         'supporting-port': 'L1'}
496         input_dict_2 = {"group-id": 1,
497                         "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}
498
499         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
500                              response['interface'][0])
501         self.assertDictEqual(dict(input_dict_2,
502                                   **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
503                              response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
504
505     def test_30_check_interface_otuc3(self):
506         response = test_utils_rfc8040.check_node_attribute_request(
507             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
508         self.assertEqual(response['status_code'], requests.codes.ok)
509         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
510                         'administrative-state': 'inService',
511                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
512                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
513                         'type': 'org-openroadm-interfaces:otnOtu',
514                         'supporting-port': 'L1'}
515         input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
516                         "degthr-percentage": 100,
517                         "tim-detect-mode": "Disabled",
518                         "otucn-n-rate": 3,
519                         "degm-intervals": 2}
520
521         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
522                              response['interface'][0])
523         self.assertDictEqual(dict(input_dict_2,
524                                   **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
525                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
526
527     # 2b) create a ODUC3 device renderer
528     def test_31_otn_service_path_create_oduc3(self):
529         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
530             {
531                 'service-name': 'service_ODUC3',
532                 'operation': 'create',
533                 'service-rate': '300',
534                 'service-format': 'ODU',
535                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
536             })
537         self.assertEqual(response['status_code'], requests.codes.ok)
538         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
539         self.assertIn(
540             {'node-id': 'XPDR-A2',
541              'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
542
543     def test_32_get_portmapping_network1(self):
544         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
545         self.assertEqual(response['status_code'], requests.codes.ok)
546         self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
547         self.assertIn(
548             self.NETWORK2_CHECK_DICT,
549             response['mapping'])
550
551     def test_33_check_interface_oduc3(self):
552         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
553         self.assertEqual(response['status_code'], requests.codes.ok)
554
555         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
556                         'administrative-state': 'inService',
557                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
558                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
559                         'type': 'org-openroadm-interfaces:otnOdu',
560                         'supporting-port': 'L1'}
561
562         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
563                         'rate': 'org-openroadm-otn-common-types:ODUCn',
564                         'tx-sapi': 'LY9PxYJqUbw=',
565                         'tx-dapi': 'LY9PxYJqUbw=',
566                         'expected-sapi': 'LY9PxYJqUbw=',
567                         'expected-dapi': 'LY9PxYJqUbw=',
568                         "degm-intervals": 2,
569                         "degthr-percentage": 100,
570                         "monitoring-mode": "terminated",
571                         "oducn-n-rate": 3
572                         }
573         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
574                              response['interface'][0])
575         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
576                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
577         self.assertDictEqual(
578             {'payload-type': '22', 'exp-payload-type': '22'},
579             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
580
581     # 2c) create Ethernet device renderer
582     # No change in the ethernet device renderer so skipping those tests
583     # 2d) Delete Ethernet device interfaces
584     # No change in the ethernet device renderer so skipping those tests
585
586     # 2e) Delete ODUC3 device interfaces
587     def test_34_otn_service_path_delete_oduc3(self):
588         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
589             {
590                 'service-name': 'service_ODUC3',
591                 'operation': 'delete',
592                 'service-rate': '300',
593                 'service-format': 'ODU',
594                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
595             })
596         self.assertEqual(response['status_code'], requests.codes.ok)
597         self.assertIn('Request processed', response['output']['result'])
598         del self.NETWORK2_CHECK_DICT["supporting-oducn"]
599
600     def test_35_check_no_interface_oduc3(self):
601         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
602         self.assertEqual(response['status_code'], requests.codes.conflict)
603
604     def test_35a_check_no_oduc3(self):
605         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
606         self.assertRaises(KeyError, lambda: response["supporting-oducn"])
607
608     # 2f) Delete OTUC3 device interfaces
609     def test_36_service_path_delete_otuc3(self):
610         response = test_utils_rfc8040.device_renderer_service_path_request(
611             {
612                 'service-name': 'service_OTUC3',
613                 'wave-number': '0',
614                 'modulation-format': 'dp-qam8',
615                 'operation': 'delete',
616                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
617                 'center-freq': 196.1,
618                 'nmc-width': 75,
619                 'min-freq': 196.0375,
620                 'max-freq': 196.125,
621                 'lower-spectral-slot-number': 755,
622                 'higher-spectral-slot-number': 768
623             })
624         self.assertEqual(response['status_code'], requests.codes.ok)
625         self.assertIn('Request processed', response['output']['result'])
626         del self.NETWORK2_CHECK_DICT["supporting-otucn"]
627
628     def test_37_check_no_interface_otuc3(self):
629         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
630         self.assertEqual(response['status_code'], requests.codes.conflict)
631
632     def test_38_check_no_interface_otsig(self):
633         response = test_utils_rfc8040.check_node_attribute_request(
634             "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
635         self.assertEqual(response['status_code'], requests.codes.conflict)
636
637     def test_39_check_no_interface_otsi(self):
638         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
639         self.assertEqual(response['status_code'], requests.codes.conflict)
640
641     def test_39a_check_no_otuc3(self):
642         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
643         self.assertRaises(KeyError, lambda: response["supporting-otucn"])
644
    # 3a) create an OTUC4 device renderer
646     def test_40_service_path_create_otuc4(self):
647         response = test_utils_rfc8040.device_renderer_service_path_request(
648             {
649                 'service-name': 'service_OTUC4',
650                 'wave-number': '0',
651                 'modulation-format': 'dp-qam16',
652                 'operation': 'create',
653                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
654                 'center-freq': 196.1,
655                 'nmc-width': 75,
656                 'min-freq': 196.0375,
657                 'max-freq': 196.125,
658                 'lower-spectral-slot-number': 755,
659                 'higher-spectral-slot-number': 768
660             })
661         self.assertEqual(response['status_code'], requests.codes.ok)
662         self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
663         self.assertIn(
664             {'node-id': 'XPDR-A2',
665              'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
666              'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-400G']}, response['output']['node-interface'])
667
668     def test_41_get_portmapping_network1(self):
669         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
670         self.assertEqual(response['status_code'], requests.codes.ok)
671         self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
672         self.assertIn(
673             self.NETWORK2_CHECK_DICT,
674             response['mapping'])
675
676     def test_42_check_interface_otsi(self):
677         # pylint: disable=line-too-long
678         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
679         self.assertEqual(response['status_code'], requests.codes.ok)
680
681         input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
682                         'administrative-state': 'inService',
683                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
684                         'type': 'org-openroadm-interfaces:otsi',
685                         'supporting-port': 'L1'}
686         input_dict_2 = {
687             "frequency": 196.0812,
688             "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
689             "fec": "org-openroadm-common-types:ofec",
690             "transmit-power": -5,
691             "provision-mode": "explicit",
692             "modulation-format": "dp-qam16"}
693
694         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
695                              response['interface'][0])
696         self.assertDictEqual(dict(input_dict_2,
697                                   **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
698                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
699         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
700                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
701
702     def test_43_check_interface_otsig(self):
703         response = test_utils_rfc8040.check_node_attribute_request(
704             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
705         self.assertEqual(response['status_code'], requests.codes.ok)
706         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
707                         'administrative-state': 'inService',
708                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
709                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
710                         'type': 'org-openroadm-interfaces:otsi-group',
711                         'supporting-port': 'L1'}
712         input_dict_2 = {"group-id": 1,
713                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
714
715         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
716                              response['interface'][0])
717         self.assertDictEqual(dict(input_dict_2,
718                                   **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
719                              response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
720
721     def test_44_check_interface_otuc4(self):
722         response = test_utils_rfc8040.check_node_attribute_request(
723             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
724         self.assertEqual(response['status_code'], requests.codes.ok)
725         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
726                         'administrative-state': 'inService',
727                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
728                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
729                         'type': 'org-openroadm-interfaces:otnOtu',
730                         'supporting-port': 'L1'}
731         input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
732                         "degthr-percentage": 100,
733                         "tim-detect-mode": "Disabled",
734                         "otucn-n-rate": 4,
735                         "degm-intervals": 2}
736
737         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
738                              response['interface'][0])
739         self.assertDictEqual(dict(input_dict_2,
740                                   **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
741                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
742
    # 3b) create an ODUC4 device renderer
744     def test_45_otn_service_path_create_oduc3(self):
745         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
746             {
747                 'service-name': 'service_ODUC4',
748                 'operation': 'create',
749                 'service-rate': '400',
750                 'service-format': 'ODU',
751                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
752             })
753         self.assertEqual(response['status_code'], requests.codes.ok)
754         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
755         self.assertIn(
756             {'node-id': 'XPDR-A2',
757              'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
758
759     def test_46_get_portmapping_network1(self):
760         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
761         self.assertEqual(response['status_code'], requests.codes.ok)
762         self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
763         self.assertIn(
764             self.NETWORK2_CHECK_DICT,
765             response['mapping'])
766
767     def test_47_check_interface_oduc4(self):
768         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
769         self.assertEqual(response['status_code'], requests.codes.ok)
770
771         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
772                         'administrative-state': 'inService',
773                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
774                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
775                         'type': 'org-openroadm-interfaces:otnOdu',
776                         'supporting-port': 'L1'}
777
778         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
779                         'rate': 'org-openroadm-otn-common-types:ODUCn',
780                         'tx-sapi': 'LY9PxYJqUbw=',
781                         'tx-dapi': 'LY9PxYJqUbw=',
782                         'expected-sapi': 'LY9PxYJqUbw=',
783                         'expected-dapi': 'LY9PxYJqUbw=',
784                         "degm-intervals": 2,
785                         "degthr-percentage": 100,
786                         "monitoring-mode": "terminated",
787                         "oducn-n-rate": 4
788                         }
789         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
790                              response['interface'][0])
791         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
792                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
793         self.assertDictEqual(
794             {'payload-type': '22', 'exp-payload-type': '22'},
795             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
796
797     # 3c) create Ethernet device renderer
798     # No change in the ethernet device renderer so skipping those tests
799     # 3d) Delete Ethernet device interfaces
800     # No change in the ethernet device renderer so skipping those tests
801
802     # 3e) Delete ODUC4 device interfaces
803     def test_48_otn_service_path_delete_oduc4(self):
804         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
805             {
806                 'service-name': 'service_ODUC4',
807                 'operation': 'delete',
808                 'service-rate': '400',
809                 'service-format': 'ODU',
810                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
811             })
812         self.assertEqual(response['status_code'], requests.codes.ok)
813         self.assertIn('Request processed', response['output']['result'])
814         del self.NETWORK2_CHECK_DICT["supporting-oducn"]
815
816     def test_49_check_no_interface_oduc4(self):
817         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
818         self.assertEqual(response['status_code'], requests.codes.conflict)
819
820     def test_49a_check_no_oduc4(self):
821         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
822         self.assertRaises(KeyError, lambda: response["supporting-oducn"])
823
824     # 3f) Delete OTUC4 device interfaces
825     def test_50_service_path_delete_otuc4(self):
826         response = test_utils_rfc8040.device_renderer_service_path_request(
827             {
828                 'service-name': 'service_OTUC4',
829                 'wave-number': '0',
830                 'modulation-format': 'dp-qam16',
831                 'operation': 'delete',
832                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
833                 'center-freq': 196.1,
834                 'nmc-width': 75,
835                 'min-freq': 196.0375,
836                 'max-freq': 196.125,
837                 'lower-spectral-slot-number': 755,
838                 'higher-spectral-slot-number': 768
839             })
840         self.assertEqual(response['status_code'], requests.codes.ok)
841         self.assertIn('Request processed', response['output']['result'])
842         del self.NETWORK2_CHECK_DICT["supporting-otucn"]
843
844     def test_51_check_no_interface_otuc4(self):
845         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
846         self.assertEqual(response['status_code'], requests.codes.conflict)
847
848     def test_52_check_no_interface_otsig(self):
849         response = test_utils_rfc8040.check_node_attribute_request(
850             "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
851         self.assertEqual(response['status_code'], requests.codes.conflict)
852
853     def test_53_check_no_interface_otsi(self):
854         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
855         self.assertEqual(response['status_code'], requests.codes.conflict)
856
857     def test_53a_check_no_otuc4(self):
858         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
859         self.assertRaises(KeyError, lambda: response["supporting-otucn"])
860
861     # Disconnect the XPDR
862     def test_54_xpdr_device_disconnection(self):
863         response = test_utils_rfc8040.unmount_device("XPDR-A2")
864         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
865
866     def test_55_xpdr_device_disconnected(self):
867         response = test_utils_rfc8040.check_device_connection("XPDR-A2")
868         self.assertEqual(response['status_code'], requests.codes.conflict)
869         self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
870         self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
871         self.assertEqual(response['connection-status']['error-message'],
872                          'Request could not be completed because the relevant data model content does not exist')
873
874     def test_56_xpdr_device_not_connected(self):
875         response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
876         self.assertEqual(response['status_code'], requests.codes.conflict)
877         self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
878         self.assertEqual(response['node-info']['error-tag'], 'data-missing')
879         self.assertEqual(response['node-info']['error-message'],
880                          'Request could not be completed because the relevant data model content does not exist')
881
882
if __name__ == '__main__':
    # Run the tests of this module with verbose (per-test) reporting when the
    # file is executed directly.
    unittest.main(verbosity=2)