Merge "Unrecognized if-supported-capabilities"
[transportpce.git] / tests / transportpce_tests / 7.1 / test02_otn_renderer.py
1 #!/usr/bin/env python
2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others.  All rights reserved.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
10
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
13
14 import unittest
15 import time
16 import requests
17 # pylint: disable=wrong-import-order
18 import sys
19 sys.path.append('transportpce_tests/common')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils_rfc8040  # nopep8
23
24
25 class TransportPCE400GPortMappingTesting(unittest.TestCase):
26
    # Handle on the started processes: assigned in setUpClass and iterated in
    # tearDownClass to shut every process down.
    processes = None
    # Expected port-mapping entry for the XPDR2-NETWORK1 logical connection
    # point. The port-mapping tests add keys ("supporting-otucn",
    # "supporting-oducn") through ``self``, which mutates this single
    # class-level dict, so those additions remain visible to later tests.
    NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1",
                           "supporting-port": "L1",
                           "supported-interface-capability": [
                               "org-openroadm-port-types:if-otsi-otsigroup"
                           ],
                           "port-direction": "bidirectional",
                           "port-qual": "switch-network",
                           "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
                           "xponder-type": "mpdr",
                           'lcp-hash-val': 'LY9PxYJqUbw=',
                           'port-admin-state': 'InService',
                           'port-oper-state': 'InService'}
    # Version string handed to start_sims and mount_device together with the
    # 'xpdra2' simulator name.
    NODE_VERSION = '7.1'
41
42     @classmethod
43     def setUpClass(cls):
44         cls.processes = test_utils_rfc8040.start_tpce()
45         cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
46
47     @classmethod
48     def tearDownClass(cls):
49         # pylint: disable=not-an-iterable
50         for process in cls.processes:
51             test_utils_rfc8040.shutdown_process(process)
52         print("all processes killed")
53
54     def setUp(self):
55         # pylint: disable=consider-using-f-string
56         print("execution of {}".format(self.id().split(".")[-1]))
57         time.sleep(10)
58
59     def test_01_xpdr_device_connection(self):
60         response = test_utils_rfc8040.mount_device("XPDR-A2",
61                                                    ('xpdra2', self.NODE_VERSION))
62         self.assertEqual(response.status_code, requests.codes.created,
63                          test_utils_rfc8040.CODE_SHOULD_BE_201)
64
65     # Check if the node appears in the ietf-network topology
66     # this test has been removed, since it already exists in port-mapping
67     # 1a) create a OTUC2 device renderer
68     def test_02_service_path_create_otuc2(self):
69         response = test_utils_rfc8040.device_renderer_service_path_request(
70             {
71                 'service-name': 'service_OTUC2',
72                 'wave-number': '0',
73                 'modulation-format': 'dp-qpsk',
74                 'operation': 'create',
75                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
76                 'center-freq': 196.1,
77                 'nmc-width': 75,
78                 'min-freq': 196.0375,
79                 'max-freq': 196.125,
80                 'lower-spectral-slot-number': 755,
81                 'higher-spectral-slot-number': 768
82             })
83         self.assertEqual(response['status_code'], requests.codes.ok)
84         self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
85         self.assertIn(
86             {'node-id': 'XPDR-A2',
87              'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
88              'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G']}, response['output']['node-interface'])
89
90     def test_03_get_portmapping_network1(self):
91         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
92         self.assertEqual(response['status_code'], requests.codes.ok)
93         self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
94         self.assertIn(
95             self.NETWORK2_CHECK_DICT,
96             response['mapping'])
97
98     def test_04_check_interface_otsi(self):
99         # pylint: disable=line-too-long
100         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
101         self.assertEqual(response['status_code'], requests.codes.ok)
102         input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
103                         'administrative-state': 'inService',
104                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
105                         'type': 'org-openroadm-interfaces:otsi',
106                         'supporting-port': 'L1'}
107         input_dict_2 = {
108             "frequency": 196.0812,
109             "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
110             "fec": "org-openroadm-common-types:ofec",
111             "transmit-power": -5,
112             "provision-mode": "explicit",
113             "modulation-format": "dp-qpsk"}
114
115         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
116                              response['interface'][0])
117         self.assertDictEqual(dict(input_dict_2,
118                                   **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
119                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
120         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
121                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
122
123     def test_05_check_interface_otsig(self):
124         response = test_utils_rfc8040.check_node_attribute_request(
125             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
126         self.assertEqual(response['status_code'], requests.codes.ok)
127         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
128                         'administrative-state': 'inService',
129                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
130                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
131                         'type': 'org-openroadm-interfaces:otsi-group',
132                         'supporting-port': 'L1'}
133         input_dict_2 = {"group-id": 1,
134                         "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
135
136         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
137                              response['interface'][0])
138         self.assertDictEqual(dict(input_dict_2,
139                                   **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
140                              response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
141
142     def test_06_check_interface_otuc2(self):
143         response = test_utils_rfc8040.check_node_attribute_request(
144             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
145         self.assertEqual(response['status_code'], requests.codes.ok)
146         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
147                         'administrative-state': 'inService',
148                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
149                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
150                         'type': 'org-openroadm-interfaces:otnOtu',
151                         'supporting-port': 'L1'}
152         input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
153                         "degthr-percentage": 100,
154                         "tim-detect-mode": "Disabled",
155                         "otucn-n-rate": 2,
156                         "degm-intervals": 2}
157
158         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
159                              response['interface'][0])
160         self.assertDictEqual(dict(input_dict_2,
161                                   **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
162                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
163
164     # 1b) create a ODUC2 device renderer
165     def test_07_otn_service_path_create_oduc2(self):
166         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
167             {
168                 'service-name': 'service_ODUC2',
169                 'operation': 'create',
170                 'service-rate': '200',
171                 'service-format': 'ODU',
172                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
173             })
174         self.assertEqual(response['status_code'], requests.codes.ok)
175         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
176         self.assertIn(
177             {'node-id': 'XPDR-A2',
178              'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
179
180     def test_08_get_portmapping_network1(self):
181         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
182         self.assertEqual(response['status_code'], requests.codes.ok)
183         self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
184         self.assertIn(
185             self.NETWORK2_CHECK_DICT,
186             response['mapping'])
187
188     def test_09_check_interface_oduc2(self):
189         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
190         self.assertEqual(response['status_code'], requests.codes.ok)
191
192         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
193                         'administrative-state': 'inService',
194                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
195                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
196                         'type': 'org-openroadm-interfaces:otnOdu',
197                         'supporting-port': 'L1'}
198
199         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
200                         'rate': 'org-openroadm-otn-common-types:ODUCn',
201                         'tx-sapi': 'LY9PxYJqUbw=',
202                         'tx-dapi': 'LY9PxYJqUbw=',
203                         'expected-sapi': 'LY9PxYJqUbw=',
204                         'expected-dapi': 'LY9PxYJqUbw=',
205                         "degm-intervals": 2,
206                         "degthr-percentage": 100,
207                         "monitoring-mode": "terminated",
208                         "oducn-n-rate": 2
209                         }
210         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
211                              response['interface'][0])
212         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
213                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
214         self.assertDictEqual(
215             {'payload-type': '22', 'exp-payload-type': '22'},
216             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
217
218     # 1c) create Ethernet device renderer
219     def test_10_otn_service_path_create_100ge(self):
220         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
221             {
222                 'service-name': 'service_Ethernet',
223                 'operation': 'create',
224                 'service-rate': '100',
225                 'service-format': 'Ethernet',
226                 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
227                 'ethernet-encoding': 'eth encode',
228                 'opucn-trib-slots': ['1.1', '1.20']
229             })
230         self.assertEqual(response['status_code'], requests.codes.ok)
231         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
232         self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id'])
233         self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
234                       response['output']['node-interface'][0]['connection-id'])
235         self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id'])
236         self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet',
237                       response['output']['node-interface'][0]['odu-interface-id'])
238         self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet',
239                       response['output']['node-interface'][0]['odu-interface-id'])
240
241     def test_11_check_interface_100ge_client(self):
242         response = test_utils_rfc8040.check_node_attribute_request(
243             "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
244         self.assertEqual(response['status_code'], requests.codes.ok)
245         input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
246                         'administrative-state': 'inService',
247                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
248                         'type': 'org-openroadm-interfaces:ethernetCsmacd',
249                         'supporting-port': 'C1'
250                         }
251         input_dict_2 = {'speed': 100000}
252         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
253                              response['interface'][0])
254         self.assertDictEqual(dict(input_dict_2,
255                                   **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
256                              response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
257
258     def test_12_check_interface_odu4_client(self):
259         response = test_utils_rfc8040.check_node_attribute_request(
260             "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
261         self.assertEqual(response['status_code'], requests.codes.ok)
262         input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet',
263                         'administrative-state': 'inService',
264                         'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
265                         'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
266                         'type': 'org-openroadm-interfaces:otnOdu',
267                         'supporting-port': 'C1'}
268         input_dict_2 = {
269             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
270             'rate': 'org-openroadm-otn-common-types:ODU4',
271             'monitoring-mode': 'terminated'}
272
273         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
274                              response['interface'][0])
275         self.assertDictEqual(dict(input_dict_2,
276                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
277                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
278         self.assertDictEqual(
279             {'payload-type': '07', 'exp-payload-type': '07'},
280             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
281
282     def test_13_check_interface_odu4_network(self):
283         response = test_utils_rfc8040.check_node_attribute_request(
284             "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
285         self.assertEqual(response['status_code'], requests.codes.ok)
286         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet',
287                         'administrative-state': 'inService',
288                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
289                         'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
290                         'type': 'org-openroadm-interfaces:otnOdu',
291                         'supporting-port': 'L1'}
292         input_dict_2 = {
293             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
294             'rate': 'org-openroadm-otn-common-types:ODU4',
295             'monitoring-mode': 'not-terminated'}
296         input_dict_3 = {'trib-port-number': 1}
297
298         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
299                              response['interface'][0])
300         self.assertDictEqual(dict(input_dict_2,
301                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
302                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
303         self.assertDictEqual(dict(input_dict_3,
304                                   **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
305                                       'parent-odu-allocation']),
306                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
307         self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
308                       ['opucn-trib-slots'])
309         self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
310                       ['opucn-trib-slots'])
311
312     def test_14_check_odu_connection_xpdra2(self):
313         response = test_utils_rfc8040.check_node_attribute_request(
314             "XPDR-A2",
315             "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
316         self.assertEqual(response['status_code'], requests.codes.ok)
317         input_dict_1 = {
318             'connection-name':
319             'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
320             'direction': 'bidirectional'
321         }
322
323         self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
324                              response['odu-connection'][0])
325         self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'},
326                              response['odu-connection'][0]['destination'])
327         self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'},
328                              response['odu-connection'][0]['source'])
329
330     # 1d) Delete Ethernet device interfaces
331     def test_15_otn_service_path_delete_100ge(self):
332         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
333             {
334                 'service-name': 'service_Ethernet',
335                 'operation': 'delete',
336                 'service-rate': '100',
337                 'service-format': 'Ethernet',
338                 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
339                 'ethernet-encoding': 'eth encode',
340                 'trib-slot': ['1'],
341                 'trib-port-number': '1'
342             })
343         self.assertEqual(response['status_code'], requests.codes.ok)
344         self.assertIn('Request processed', response['output']['result'])
345
346     def test_16_check_no_odu_connection(self):
347         response = test_utils_rfc8040.check_node_attribute_request(
348             "XPDR-A2",
349             "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
350         self.assertEqual(response['status_code'], requests.codes.conflict)
351
352     def test_17_check_no_interface_odu_network(self):
353         response = test_utils_rfc8040.check_node_attribute_request(
354             "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
355         self.assertEqual(response['status_code'], requests.codes.conflict)
356
357     def test_18_check_no_interface_odu_client(self):
358         response = test_utils_rfc8040.check_node_attribute_request(
359             "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
360         self.assertEqual(response['status_code'], requests.codes.conflict)
361
362     def test_19_check_no_interface_100ge_client(self):
363         response = test_utils_rfc8040.check_node_attribute_request(
364             "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
365         self.assertEqual(response['status_code'], requests.codes.conflict)
366
367     # 1e) Delete ODUC2 device interfaces
368     def test_20_otn_service_path_delete_oduc2(self):
369         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
370             {
371                 'service-name': 'service_ODUC2',
372                 'operation': 'delete',
373                 'service-rate': '200',
374                 'service-format': 'ODU',
375                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
376             })
377         self.assertEqual(response['status_code'], requests.codes.ok)
378         self.assertIn('Request processed', response['output']['result'])
379
380     def test_21_check_no_interface_oduc2(self):
381         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
382         self.assertEqual(response['status_code'], requests.codes.conflict)
383
384     # 1f) Delete OTUC2 device interfaces
385     def test_22_service_path_delete_otuc2(self):
386         response = test_utils_rfc8040.device_renderer_service_path_request(
387             {
388                 'service-name': 'service_OTUC2',
389                 'wave-number': '0',
390                 'modulation-format': 'dp-qpsk',
391                 'operation': 'delete',
392                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
393                 'center-freq': 196.1,
394                 'nmc-width': 75,
395                 'min-freq': 196.0375,
396                 'max-freq': 196.125,
397                 'lower-spectral-slot-number': 755,
398                 'higher-spectral-slot-number': 768
399             })
400         self.assertEqual(response['status_code'], requests.codes.ok)
401         self.assertIn('Request processed', response['output']['result'])
402
403     def test_23_check_no_interface_otuc2(self):
404         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2")
405         self.assertEqual(response['status_code'], requests.codes.conflict)
406
407     def test_24_check_no_interface_otsig(self):
408         response = test_utils_rfc8040.check_node_attribute_request(
409             "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
410         self.assertEqual(response['status_code'], requests.codes.conflict)
411
412     def test_25_check_no_interface_otsi(self):
413         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
414         self.assertEqual(response['status_code'], requests.codes.conflict)
415
416     # 2a) create a OTUC3 device renderer
417     def test_26_service_path_create_otuc3(self):
418         response = test_utils_rfc8040.device_renderer_service_path_request(
419             {
420                 'service-name': 'service_OTUC3',
421                 'wave-number': '0',
422                 'modulation-format': 'dp-qam8',
423                 'operation': 'create',
424                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
425                 'center-freq': 196.1,
426                 'nmc-width': 75,
427                 'min-freq': 196.0375,
428                 'max-freq': 196.125,
429                 'lower-spectral-slot-number': 755,
430                 'higher-spectral-slot-number': 768
431             })
432         self.assertEqual(response['status_code'], requests.codes.ok)
433         self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
434         self.assertIn(
435             {'node-id': 'XPDR-A2',
436              'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
437              'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-300G']}, response['output']['node-interface'])
438
439     def test_27_get_portmapping_network1(self):
440         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
441         self.assertEqual(response['status_code'], requests.codes.ok)
442         self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
443         self.assertIn(
444             self.NETWORK2_CHECK_DICT,
445             response['mapping'])
446
447     def test_28_check_interface_otsi(self):
448         # pylint: disable=line-too-long
449         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
450         self.assertEqual(response['status_code'], requests.codes.ok)
451
452         input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
453                         'administrative-state': 'inService',
454                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
455                         'type': 'org-openroadm-interfaces:otsi',
456                         'supporting-port': 'L1'}
457         input_dict_2 = {
458             "frequency": 196.0812,
459             "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
460             "fec": "org-openroadm-common-types:ofec",
461             "transmit-power": -5,
462             "provision-mode": "explicit",
463             "modulation-format": "dp-qam8"}
464
465         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
466                              response['interface'][0])
467         self.assertDictEqual(dict(input_dict_2,
468                                   **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
469                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
470         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
471                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
472
473     def test_29_check_interface_otsig(self):
474         response = test_utils_rfc8040.check_node_attribute_request(
475             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
476         self.assertEqual(response['status_code'], requests.codes.ok)
477         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
478                         'administrative-state': 'inService',
479                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
480                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
481                         'type': 'org-openroadm-interfaces:otsi-group',
482                         'supporting-port': 'L1'}
483         input_dict_2 = {"group-id": 1,
484                         "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}
485
486         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
487                              response['interface'][0])
488         self.assertDictEqual(dict(input_dict_2,
489                                   **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
490                              response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
491
492     def test_30_check_interface_otuc3(self):
493         response = test_utils_rfc8040.check_node_attribute_request(
494             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
495         self.assertEqual(response['status_code'], requests.codes.ok)
496         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
497                         'administrative-state': 'inService',
498                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
499                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
500                         'type': 'org-openroadm-interfaces:otnOtu',
501                         'supporting-port': 'L1'}
502         input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
503                         "degthr-percentage": 100,
504                         "tim-detect-mode": "Disabled",
505                         "otucn-n-rate": 3,
506                         "degm-intervals": 2}
507
508         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
509                              response['interface'][0])
510         self.assertDictEqual(dict(input_dict_2,
511                                   **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
512                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
513
514     # 2b) create a ODUC3 device renderer
515     def test_31_otn_service_path_create_oduc3(self):
516         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
517             {
518                 'service-name': 'service_ODUC3',
519                 'operation': 'create',
520                 'service-rate': '300',
521                 'service-format': 'ODU',
522                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
523             })
524         self.assertEqual(response['status_code'], requests.codes.ok)
525         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
526         self.assertIn(
527             {'node-id': 'XPDR-A2',
528              'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
529
530     def test_32_get_portmapping_network1(self):
531         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
532         self.assertEqual(response['status_code'], requests.codes.ok)
533         self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
534         self.assertIn(
535             self.NETWORK2_CHECK_DICT,
536             response['mapping'])
537
538     def test_33_check_interface_oduc3(self):
539         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
540         self.assertEqual(response['status_code'], requests.codes.ok)
541
542         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
543                         'administrative-state': 'inService',
544                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
545                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
546                         'type': 'org-openroadm-interfaces:otnOdu',
547                         'supporting-port': 'L1'}
548
549         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
550                         'rate': 'org-openroadm-otn-common-types:ODUCn',
551                         'tx-sapi': 'LY9PxYJqUbw=',
552                         'tx-dapi': 'LY9PxYJqUbw=',
553                         'expected-sapi': 'LY9PxYJqUbw=',
554                         'expected-dapi': 'LY9PxYJqUbw=',
555                         "degm-intervals": 2,
556                         "degthr-percentage": 100,
557                         "monitoring-mode": "terminated",
558                         "oducn-n-rate": 3
559                         }
560         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
561                              response['interface'][0])
562         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
563                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
564         self.assertDictEqual(
565             {'payload-type': '22', 'exp-payload-type': '22'},
566             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
567
568     # 2c) create Ethernet device renderer
569     # No change in the ethernet device renderer so skipping those tests
570     # 2d) Delete Ethernet device interfaces
571     # No change in the ethernet device renderer so skipping those tests
572
573     # 2e) Delete ODUC3 device interfaces
574     def test_34_otn_service_path_delete_oduc3(self):
575         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
576             {
577                 'service-name': 'service_ODUC3',
578                 'operation': 'delete',
579                 'service-rate': '300',
580                 'service-format': 'ODU',
581                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
582             })
583         self.assertEqual(response['status_code'], requests.codes.ok)
584         self.assertIn('Request processed', response['output']['result'])
585
586     def test_35_check_no_interface_oduc3(self):
587         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
588         self.assertEqual(response['status_code'], requests.codes.conflict)
589
590     # 2f) Delete OTUC3 device interfaces
591     def test_36_service_path_delete_otuc3(self):
592         response = test_utils_rfc8040.device_renderer_service_path_request(
593             {
594                 'service-name': 'service_OTUC3',
595                 'wave-number': '0',
596                 'modulation-format': 'dp-qam8',
597                 'operation': 'delete',
598                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
599                 'center-freq': 196.1,
600                 'nmc-width': 75,
601                 'min-freq': 196.0375,
602                 'max-freq': 196.125,
603                 'lower-spectral-slot-number': 755,
604                 'higher-spectral-slot-number': 768
605             })
606         self.assertEqual(response['status_code'], requests.codes.ok)
607         self.assertIn('Request processed', response['output']['result'])
608
609     def test_37_check_no_interface_otuc3(self):
610         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
611         self.assertEqual(response['status_code'], requests.codes.conflict)
612
613     def test_38_check_no_interface_otsig(self):
614         response = test_utils_rfc8040.check_node_attribute_request(
615             "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
616         self.assertEqual(response['status_code'], requests.codes.conflict)
617
618     def test_39_check_no_interface_otsi(self):
619         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
620         self.assertEqual(response['status_code'], requests.codes.conflict)
621
    # 3a) create an OTUC4 device renderer
623     def test_40_service_path_create_otuc3(self):
624         response = test_utils_rfc8040.device_renderer_service_path_request(
625             {
626                 'service-name': 'service_OTUC4',
627                 'wave-number': '0',
628                 'modulation-format': 'dp-qam16',
629                 'operation': 'create',
630                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
631                 'center-freq': 196.1,
632                 'nmc-width': 75,
633                 'min-freq': 196.0375,
634                 'max-freq': 196.125,
635                 'lower-spectral-slot-number': 755,
636                 'higher-spectral-slot-number': 768
637             })
638         self.assertEqual(response['status_code'], requests.codes.ok)
639         self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
640         self.assertIn(
641             {'node-id': 'XPDR-A2',
642              'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
643              'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-400G']}, response['output']['node-interface'])
644
645     def test_41_get_portmapping_network1(self):
646         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
647         self.assertEqual(response['status_code'], requests.codes.ok)
648         self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
649         self.assertIn(
650             self.NETWORK2_CHECK_DICT,
651             response['mapping'])
652
653     def test_42_check_interface_otsi(self):
654         # pylint: disable=line-too-long
655         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
656         self.assertEqual(response['status_code'], requests.codes.ok)
657
658         input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
659                         'administrative-state': 'inService',
660                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
661                         'type': 'org-openroadm-interfaces:otsi',
662                         'supporting-port': 'L1'}
663         input_dict_2 = {
664             "frequency": 196.0812,
665             "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
666             "fec": "org-openroadm-common-types:ofec",
667             "transmit-power": -5,
668             "provision-mode": "explicit",
669             "modulation-format": "dp-qam16"}
670
671         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
672                              response['interface'][0])
673         self.assertDictEqual(dict(input_dict_2,
674                                   **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
675                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
676         self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
677                              response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
678
679     def test_43_check_interface_otsig(self):
680         response = test_utils_rfc8040.check_node_attribute_request(
681             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
682         self.assertEqual(response['status_code'], requests.codes.ok)
683         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
684                         'administrative-state': 'inService',
685                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
686                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
687                         'type': 'org-openroadm-interfaces:otsi-group',
688                         'supporting-port': 'L1'}
689         input_dict_2 = {"group-id": 1,
690                         "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
691
692         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
693                              response['interface'][0])
694         self.assertDictEqual(dict(input_dict_2,
695                                   **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
696                              response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
697
698     def test_44_check_interface_otuc4(self):
699         response = test_utils_rfc8040.check_node_attribute_request(
700             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
701         self.assertEqual(response['status_code'], requests.codes.ok)
702         input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
703                         'administrative-state': 'inService',
704                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
705                         ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
706                         'type': 'org-openroadm-interfaces:otnOtu',
707                         'supporting-port': 'L1'}
708         input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
709                         "degthr-percentage": 100,
710                         "tim-detect-mode": "Disabled",
711                         "otucn-n-rate": 4,
712                         "degm-intervals": 2}
713
714         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
715                              response['interface'][0])
716         self.assertDictEqual(dict(input_dict_2,
717                                   **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
718                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
719
    # 3b) create an ODUC4 device renderer
721     def test_45_otn_service_path_create_oduc3(self):
722         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
723             {
724                 'service-name': 'service_ODUC4',
725                 'operation': 'create',
726                 'service-rate': '400',
727                 'service-format': 'ODU',
728                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
729             })
730         self.assertEqual(response['status_code'], requests.codes.ok)
731         self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
732         self.assertIn(
733             {'node-id': 'XPDR-A2',
734              'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
735
736     def test_46_get_portmapping_network1(self):
737         response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
738         self.assertEqual(response['status_code'], requests.codes.ok)
739         self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
740         self.assertIn(
741             self.NETWORK2_CHECK_DICT,
742             response['mapping'])
743
744     def test_47_check_interface_oduc4(self):
745         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
746         self.assertEqual(response['status_code'], requests.codes.ok)
747
748         input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
749                         'administrative-state': 'inService',
750                         'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
751                         'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
752                         'type': 'org-openroadm-interfaces:otnOdu',
753                         'supporting-port': 'L1'}
754
755         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
756                         'rate': 'org-openroadm-otn-common-types:ODUCn',
757                         'tx-sapi': 'LY9PxYJqUbw=',
758                         'tx-dapi': 'LY9PxYJqUbw=',
759                         'expected-sapi': 'LY9PxYJqUbw=',
760                         'expected-dapi': 'LY9PxYJqUbw=',
761                         "degm-intervals": 2,
762                         "degthr-percentage": 100,
763                         "monitoring-mode": "terminated",
764                         "oducn-n-rate": 4
765                         }
766         self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
767                              response['interface'][0])
768         self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
769                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
770         self.assertDictEqual(
771             {'payload-type': '22', 'exp-payload-type': '22'},
772             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
773
774     # 3c) create Ethernet device renderer
775     # No change in the ethernet device renderer so skipping those tests
776     # 3d) Delete Ethernet device interfaces
777     # No change in the ethernet device renderer so skipping those tests
778
779     # 3e) Delete ODUC4 device interfaces
780     def test_48_otn_service_path_delete_oduc4(self):
781         response = test_utils_rfc8040.device_renderer_otn_service_path_request(
782             {
783                 'service-name': 'service_ODUC4',
784                 'operation': 'delete',
785                 'service-rate': '400',
786                 'service-format': 'ODU',
787                 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
788             })
789         self.assertEqual(response['status_code'], requests.codes.ok)
790         self.assertIn('Request processed', response['output']['result'])
791
792     def test_49_check_no_interface_oduc4(self):
793         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
794         self.assertEqual(response['status_code'], requests.codes.conflict)
795
796     # 3f) Delete OTUC4 device interfaces
797     def test_50_service_path_delete_otuc4(self):
798         response = test_utils_rfc8040.device_renderer_service_path_request(
799             {
800                 'service-name': 'service_OTUC4',
801                 'wave-number': '0',
802                 'modulation-format': 'dp-qam16',
803                 'operation': 'delete',
804                 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
805                 'center-freq': 196.1,
806                 'nmc-width': 75,
807                 'min-freq': 196.0375,
808                 'max-freq': 196.125,
809                 'lower-spectral-slot-number': 755,
810                 'higher-spectral-slot-number': 768
811             })
812         self.assertEqual(response['status_code'], requests.codes.ok)
813         self.assertIn('Request processed', response['output']['result'])
814
815     def test_51_check_no_interface_otuc4(self):
816         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
817         self.assertEqual(response['status_code'], requests.codes.conflict)
818
819     def test_52_check_no_interface_otsig(self):
820         response = test_utils_rfc8040.check_node_attribute_request(
821             "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
822         self.assertEqual(response['status_code'], requests.codes.conflict)
823
824     def test_53_check_no_interface_otsi(self):
825         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
826         self.assertEqual(response['status_code'], requests.codes.conflict)
827
828     # Disconnect the XPDR
829     def test_54_xpdr_device_disconnection(self):
830         response = test_utils_rfc8040.unmount_device("XPDR-A2")
831         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
832
833     def test_55_xpdr_device_disconnected(self):
834         response = test_utils_rfc8040.check_device_connection("XPDR-A2")
835         self.assertEqual(response['status_code'], requests.codes.conflict)
836         self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
837         self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
838         self.assertEqual(response['connection-status']['error-message'],
839                          'Request could not be completed because the relevant data model content does not exist')
840
841     def test_56_xpdr_device_not_connected(self):
842         response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
843         self.assertEqual(response['status_code'], requests.codes.conflict)
844         self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
845         self.assertEqual(response['node-info']['error-tag'], 'data-missing')
846         self.assertEqual(response['node-info']['error-message'],
847                          'Request could not be completed because the relevant data model content does not exist')
848
849
if __name__ == "__main__":
    # Script entry point: run every test of this module with verbose output.
    unittest.main(verbosity=2)