2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils_rfc8040 # nopep8
25 class TransportPCE400GPortMappingTesting(unittest.TestCase):
28 NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1",
29 "supporting-port": "L1",
30 "supported-interface-capability": [
31 "org-openroadm-port-types:if-otsi-otsigroup"
33 "port-direction": "bidirectional",
34 "port-qual": "switch-network",
35 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
36 "xponder-type": "mpdr",
37 'lcp-hash-val': 'LY9PxYJqUbw=',
38 'port-admin-state': 'InService',
39 'port-oper-state': 'InService'}
44 cls.processes = test_utils_rfc8040.start_tpce()
45 cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
48 def tearDownClass(cls):
49 # pylint: disable=not-an-iterable
50 for process in cls.processes:
51 test_utils_rfc8040.shutdown_process(process)
52 print("all processes killed")
55 # pylint: disable=consider-using-f-string
56 print("execution of {}".format(self.id().split(".")[-1]))
59 def test_01_xpdr_device_connection(self):
60 response = test_utils_rfc8040.mount_device("XPDR-A2",
61 ('xpdra2', self.NODE_VERSION))
62 self.assertEqual(response.status_code, requests.codes.created,
63 test_utils_rfc8040.CODE_SHOULD_BE_201)
65 # Check if the node appears in the ietf-network topology
66 # this test has been removed, since it already exists in port-mapping
67 # 1a) create a OTUC2 device renderer
68 def test_02_service_path_create_otuc2(self):
69 response = test_utils_rfc8040.device_renderer_service_path_request(
71 'service-name': 'service_OTUC2',
73 'modulation-format': 'dp-qpsk',
74 'operation': 'create',
75 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
80 'lower-spectral-slot-number': 755,
81 'higher-spectral-slot-number': 768
83 self.assertEqual(response['status_code'], requests.codes.ok)
84 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
86 {'node-id': 'XPDR-A2',
87 'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
88 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G']}, response['output']['node-interface'])
90 def test_03_get_portmapping_network1(self):
91 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
92 self.assertEqual(response['status_code'], requests.codes.ok)
93 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
95 self.NETWORK2_CHECK_DICT,
98 def test_04_check_interface_otsi(self):
99 # pylint: disable=line-too-long
100 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
101 self.assertEqual(response['status_code'], requests.codes.ok)
102 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
103 'administrative-state': 'inService',
104 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
105 'type': 'org-openroadm-interfaces:otsi',
106 'supporting-port': 'L1'}
108 "frequency": 196.0812,
109 "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
110 "fec": "org-openroadm-common-types:ofec",
111 "transmit-power": -5,
112 "provision-mode": "explicit",
113 "modulation-format": "dp-qpsk"}
115 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
116 response['interface'][0])
117 self.assertDictEqual(dict(input_dict_2,
118 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
119 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
120 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
121 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
123 def test_05_check_interface_otsig(self):
124 response = test_utils_rfc8040.check_node_attribute_request(
125 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
126 self.assertEqual(response['status_code'], requests.codes.ok)
127 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
128 'administrative-state': 'inService',
129 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
130 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
131 'type': 'org-openroadm-interfaces:otsi-group',
132 'supporting-port': 'L1'}
133 input_dict_2 = {"group-id": 1,
134 "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
136 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
137 response['interface'][0])
138 self.assertDictEqual(dict(input_dict_2,
139 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
140 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
142 def test_06_check_interface_otuc2(self):
143 response = test_utils_rfc8040.check_node_attribute_request(
144 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
145 self.assertEqual(response['status_code'], requests.codes.ok)
146 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
147 'administrative-state': 'inService',
148 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
149 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
150 'type': 'org-openroadm-interfaces:otnOtu',
151 'supporting-port': 'L1'}
152 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
153 "degthr-percentage": 100,
154 "tim-detect-mode": "Disabled",
158 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
159 response['interface'][0])
160 self.assertDictEqual(dict(input_dict_2,
161 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
162 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
164 # 1b) create a ODUC2 device renderer
165 def test_07_otn_service_path_create_oduc2(self):
166 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
168 'service-name': 'service_ODUC2',
169 'operation': 'create',
170 'service-rate': '200',
171 'service-format': 'ODU',
172 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
174 self.assertEqual(response['status_code'], requests.codes.ok)
175 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
177 {'node-id': 'XPDR-A2',
178 'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
180 def test_08_get_portmapping_network1(self):
181 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
182 self.assertEqual(response['status_code'], requests.codes.ok)
183 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
185 self.NETWORK2_CHECK_DICT,
188 def test_09_check_interface_oduc2(self):
189 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
190 self.assertEqual(response['status_code'], requests.codes.ok)
192 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
193 'administrative-state': 'inService',
194 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
195 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
196 'type': 'org-openroadm-interfaces:otnOdu',
197 'supporting-port': 'L1'}
199 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
200 'rate': 'org-openroadm-otn-common-types:ODUCn',
201 'tx-sapi': 'LY9PxYJqUbw=',
202 'tx-dapi': 'LY9PxYJqUbw=',
203 'expected-sapi': 'LY9PxYJqUbw=',
204 'expected-dapi': 'LY9PxYJqUbw=',
206 "degthr-percentage": 100,
207 "monitoring-mode": "terminated",
210 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
211 response['interface'][0])
212 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
213 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
214 self.assertDictEqual(
215 {'payload-type': '22', 'exp-payload-type': '22'},
216 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
218 # 1c) create Ethernet device renderer
219 def test_10_otn_service_path_create_100ge(self):
220 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
222 'service-name': 'service_Ethernet',
223 'operation': 'create',
224 'service-rate': '100',
225 'service-format': 'Ethernet',
226 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
227 'ethernet-encoding': 'eth encode',
228 'opucn-trib-slots': ['1.1', '1.20']
230 self.assertEqual(response['status_code'], requests.codes.ok)
231 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
232 self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id'])
233 self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
234 response['output']['node-interface'][0]['connection-id'])
235 self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id'])
236 self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet',
237 response['output']['node-interface'][0]['odu-interface-id'])
238 self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet',
239 response['output']['node-interface'][0]['odu-interface-id'])
241 def test_11_check_interface_100ge_client(self):
242 response = test_utils_rfc8040.check_node_attribute_request(
243 "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
244 self.assertEqual(response['status_code'], requests.codes.ok)
245 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
246 'administrative-state': 'inService',
247 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
248 'type': 'org-openroadm-interfaces:ethernetCsmacd',
249 'supporting-port': 'C1'
251 input_dict_2 = {'speed': 100000}
252 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
253 response['interface'][0])
254 self.assertDictEqual(dict(input_dict_2,
255 **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
256 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
258 def test_12_check_interface_odu4_client(self):
259 response = test_utils_rfc8040.check_node_attribute_request(
260 "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
261 self.assertEqual(response['status_code'], requests.codes.ok)
262 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet',
263 'administrative-state': 'inService',
264 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
265 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
266 'type': 'org-openroadm-interfaces:otnOdu',
267 'supporting-port': 'C1'}
269 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
270 'rate': 'org-openroadm-otn-common-types:ODU4',
271 'monitoring-mode': 'terminated'}
273 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
274 response['interface'][0])
275 self.assertDictEqual(dict(input_dict_2,
276 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
277 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
278 self.assertDictEqual(
279 {'payload-type': '07', 'exp-payload-type': '07'},
280 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
282 def test_13_check_interface_odu4_network(self):
283 response = test_utils_rfc8040.check_node_attribute_request(
284 "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
285 self.assertEqual(response['status_code'], requests.codes.ok)
286 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet',
287 'administrative-state': 'inService',
288 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
289 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
290 'type': 'org-openroadm-interfaces:otnOdu',
291 'supporting-port': 'L1'}
293 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
294 'rate': 'org-openroadm-otn-common-types:ODU4',
295 'monitoring-mode': 'not-terminated'}
296 input_dict_3 = {'trib-port-number': 1}
298 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
299 response['interface'][0])
300 self.assertDictEqual(dict(input_dict_2,
301 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
302 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
303 self.assertDictEqual(dict(input_dict_3,
304 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
305 'parent-odu-allocation']),
306 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
307 self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
308 ['opucn-trib-slots'])
309 self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
310 ['opucn-trib-slots'])
312 def test_14_check_odu_connection_xpdra2(self):
313 response = test_utils_rfc8040.check_node_attribute_request(
315 "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
316 self.assertEqual(response['status_code'], requests.codes.ok)
319 'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
320 'direction': 'bidirectional'
323 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
324 response['odu-connection'][0])
325 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'},
326 response['odu-connection'][0]['destination'])
327 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'},
328 response['odu-connection'][0]['source'])
330 # 1d) Delete Ethernet device interfaces
331 def test_15_otn_service_path_delete_100ge(self):
332 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
334 'service-name': 'service_Ethernet',
335 'operation': 'delete',
336 'service-rate': '100',
337 'service-format': 'Ethernet',
338 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
339 'ethernet-encoding': 'eth encode',
341 'trib-port-number': '1'
343 self.assertEqual(response['status_code'], requests.codes.ok)
344 self.assertIn('Request processed', response['output']['result'])
346 def test_16_check_no_odu_connection(self):
347 response = test_utils_rfc8040.check_node_attribute_request(
349 "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
350 self.assertEqual(response['status_code'], requests.codes.conflict)
352 def test_17_check_no_interface_odu_network(self):
353 response = test_utils_rfc8040.check_node_attribute_request(
354 "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
355 self.assertEqual(response['status_code'], requests.codes.conflict)
357 def test_18_check_no_interface_odu_client(self):
358 response = test_utils_rfc8040.check_node_attribute_request(
359 "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
360 self.assertEqual(response['status_code'], requests.codes.conflict)
362 def test_19_check_no_interface_100ge_client(self):
363 response = test_utils_rfc8040.check_node_attribute_request(
364 "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
365 self.assertEqual(response['status_code'], requests.codes.conflict)
367 # 1e) Delete ODUC2 device interfaces
368 def test_20_otn_service_path_delete_oduc2(self):
369 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
371 'service-name': 'service_ODUC2',
372 'operation': 'delete',
373 'service-rate': '200',
374 'service-format': 'ODU',
375 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
377 self.assertEqual(response['status_code'], requests.codes.ok)
378 self.assertIn('Request processed', response['output']['result'])
380 def test_21_check_no_interface_oduc2(self):
381 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
382 self.assertEqual(response['status_code'], requests.codes.conflict)
384 # 1f) Delete OTUC2 device interfaces
385 def test_22_service_path_delete_otuc2(self):
386 response = test_utils_rfc8040.device_renderer_service_path_request(
388 'service-name': 'service_OTUC2',
390 'modulation-format': 'dp-qpsk',
391 'operation': 'delete',
392 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
393 'center-freq': 196.1,
395 'min-freq': 196.0375,
397 'lower-spectral-slot-number': 755,
398 'higher-spectral-slot-number': 768
400 self.assertEqual(response['status_code'], requests.codes.ok)
401 self.assertIn('Request processed', response['output']['result'])
403 def test_23_check_no_interface_otuc2(self):
404 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2")
405 self.assertEqual(response['status_code'], requests.codes.conflict)
407 def test_24_check_no_interface_otsig(self):
408 response = test_utils_rfc8040.check_node_attribute_request(
409 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
410 self.assertEqual(response['status_code'], requests.codes.conflict)
412 def test_25_check_no_interface_otsi(self):
413 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
414 self.assertEqual(response['status_code'], requests.codes.conflict)
416 # 2a) create a OTUC3 device renderer
417 def test_26_service_path_create_otuc3(self):
418 response = test_utils_rfc8040.device_renderer_service_path_request(
420 'service-name': 'service_OTUC3',
422 'modulation-format': 'dp-qam8',
423 'operation': 'create',
424 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
425 'center-freq': 196.1,
427 'min-freq': 196.0375,
429 'lower-spectral-slot-number': 755,
430 'higher-spectral-slot-number': 768
432 self.assertEqual(response['status_code'], requests.codes.ok)
433 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
435 {'node-id': 'XPDR-A2',
436 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
437 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-300G']}, response['output']['node-interface'])
439 def test_27_get_portmapping_network1(self):
440 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
441 self.assertEqual(response['status_code'], requests.codes.ok)
442 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
444 self.NETWORK2_CHECK_DICT,
447 def test_28_check_interface_otsi(self):
448 # pylint: disable=line-too-long
449 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
450 self.assertEqual(response['status_code'], requests.codes.ok)
452 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
453 'administrative-state': 'inService',
454 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
455 'type': 'org-openroadm-interfaces:otsi',
456 'supporting-port': 'L1'}
458 "frequency": 196.0812,
459 "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
460 "fec": "org-openroadm-common-types:ofec",
461 "transmit-power": -5,
462 "provision-mode": "explicit",
463 "modulation-format": "dp-qam8"}
465 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
466 response['interface'][0])
467 self.assertDictEqual(dict(input_dict_2,
468 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
469 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
470 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
471 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
473 def test_29_check_interface_otsig(self):
474 response = test_utils_rfc8040.check_node_attribute_request(
475 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
476 self.assertEqual(response['status_code'], requests.codes.ok)
477 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
478 'administrative-state': 'inService',
479 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
480 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
481 'type': 'org-openroadm-interfaces:otsi-group',
482 'supporting-port': 'L1'}
483 input_dict_2 = {"group-id": 1,
484 "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}
486 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
487 response['interface'][0])
488 self.assertDictEqual(dict(input_dict_2,
489 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
490 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
492 def test_30_check_interface_otuc3(self):
493 response = test_utils_rfc8040.check_node_attribute_request(
494 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
495 self.assertEqual(response['status_code'], requests.codes.ok)
496 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
497 'administrative-state': 'inService',
498 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
499 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
500 'type': 'org-openroadm-interfaces:otnOtu',
501 'supporting-port': 'L1'}
502 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
503 "degthr-percentage": 100,
504 "tim-detect-mode": "Disabled",
508 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
509 response['interface'][0])
510 self.assertDictEqual(dict(input_dict_2,
511 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
512 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
514 # 2b) create a ODUC3 device renderer
515 def test_31_otn_service_path_create_oduc3(self):
516 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
518 'service-name': 'service_ODUC3',
519 'operation': 'create',
520 'service-rate': '300',
521 'service-format': 'ODU',
522 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
524 self.assertEqual(response['status_code'], requests.codes.ok)
525 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
527 {'node-id': 'XPDR-A2',
528 'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
530 def test_32_get_portmapping_network1(self):
531 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
532 self.assertEqual(response['status_code'], requests.codes.ok)
533 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
535 self.NETWORK2_CHECK_DICT,
538 def test_33_check_interface_oduc3(self):
539 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
540 self.assertEqual(response['status_code'], requests.codes.ok)
542 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
543 'administrative-state': 'inService',
544 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
545 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
546 'type': 'org-openroadm-interfaces:otnOdu',
547 'supporting-port': 'L1'}
549 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
550 'rate': 'org-openroadm-otn-common-types:ODUCn',
551 'tx-sapi': 'LY9PxYJqUbw=',
552 'tx-dapi': 'LY9PxYJqUbw=',
553 'expected-sapi': 'LY9PxYJqUbw=',
554 'expected-dapi': 'LY9PxYJqUbw=',
556 "degthr-percentage": 100,
557 "monitoring-mode": "terminated",
560 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
561 response['interface'][0])
562 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
563 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
564 self.assertDictEqual(
565 {'payload-type': '22', 'exp-payload-type': '22'},
566 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
568 # 2c) create Ethernet device renderer
569 # No change in the ethernet device renderer so skipping those tests
570 # 2d) Delete Ethernet device interfaces
571 # No change in the ethernet device renderer so skipping those tests
573 # 2e) Delete ODUC3 device interfaces
574 def test_34_otn_service_path_delete_oduc3(self):
575 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
577 'service-name': 'service_ODUC3',
578 'operation': 'delete',
579 'service-rate': '300',
580 'service-format': 'ODU',
581 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
583 self.assertEqual(response['status_code'], requests.codes.ok)
584 self.assertIn('Request processed', response['output']['result'])
586 def test_35_check_no_interface_oduc3(self):
587 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
588 self.assertEqual(response['status_code'], requests.codes.conflict)
590 # 2f) Delete OTUC3 device interfaces
591 def test_36_service_path_delete_otuc3(self):
592 response = test_utils_rfc8040.device_renderer_service_path_request(
594 'service-name': 'service_OTUC3',
596 'modulation-format': 'dp-qam8',
597 'operation': 'delete',
598 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
599 'center-freq': 196.1,
601 'min-freq': 196.0375,
603 'lower-spectral-slot-number': 755,
604 'higher-spectral-slot-number': 768
606 self.assertEqual(response['status_code'], requests.codes.ok)
607 self.assertIn('Request processed', response['output']['result'])
609 def test_37_check_no_interface_otuc3(self):
610 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
611 self.assertEqual(response['status_code'], requests.codes.conflict)
613 def test_38_check_no_interface_otsig(self):
614 response = test_utils_rfc8040.check_node_attribute_request(
615 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
616 self.assertEqual(response['status_code'], requests.codes.conflict)
618 def test_39_check_no_interface_otsi(self):
619 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
620 self.assertEqual(response['status_code'], requests.codes.conflict)
622 # 3a) create a OTUC4 device renderer
623 def test_40_service_path_create_otuc3(self):
624 response = test_utils_rfc8040.device_renderer_service_path_request(
626 'service-name': 'service_OTUC4',
628 'modulation-format': 'dp-qam16',
629 'operation': 'create',
630 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
631 'center-freq': 196.1,
633 'min-freq': 196.0375,
635 'lower-spectral-slot-number': 755,
636 'higher-spectral-slot-number': 768
638 self.assertEqual(response['status_code'], requests.codes.ok)
639 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
641 {'node-id': 'XPDR-A2',
642 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
643 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-400G']}, response['output']['node-interface'])
645 def test_41_get_portmapping_network1(self):
646 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
647 self.assertEqual(response['status_code'], requests.codes.ok)
648 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
650 self.NETWORK2_CHECK_DICT,
653 def test_42_check_interface_otsi(self):
654 # pylint: disable=line-too-long
655 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
656 self.assertEqual(response['status_code'], requests.codes.ok)
658 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
659 'administrative-state': 'inService',
660 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
661 'type': 'org-openroadm-interfaces:otsi',
662 'supporting-port': 'L1'}
664 "frequency": 196.0812,
665 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
666 "fec": "org-openroadm-common-types:ofec",
667 "transmit-power": -5,
668 "provision-mode": "explicit",
669 "modulation-format": "dp-qam16"}
671 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
672 response['interface'][0])
673 self.assertDictEqual(dict(input_dict_2,
674 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
675 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
676 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
677 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
679 def test_43_check_interface_otsig(self):
680 response = test_utils_rfc8040.check_node_attribute_request(
681 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
682 self.assertEqual(response['status_code'], requests.codes.ok)
683 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
684 'administrative-state': 'inService',
685 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
686 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
687 'type': 'org-openroadm-interfaces:otsi-group',
688 'supporting-port': 'L1'}
689 input_dict_2 = {"group-id": 1,
690 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
692 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
693 response['interface'][0])
694 self.assertDictEqual(dict(input_dict_2,
695 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
696 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
698 def test_44_check_interface_otuc4(self):
699 response = test_utils_rfc8040.check_node_attribute_request(
700 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
701 self.assertEqual(response['status_code'], requests.codes.ok)
702 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
703 'administrative-state': 'inService',
704 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
705 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
706 'type': 'org-openroadm-interfaces:otnOtu',
707 'supporting-port': 'L1'}
708 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
709 "degthr-percentage": 100,
710 "tim-detect-mode": "Disabled",
714 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
715 response['interface'][0])
716 self.assertDictEqual(dict(input_dict_2,
717 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
718 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
720 # 3b) create a ODUC4 device renderer
721 def test_45_otn_service_path_create_oduc3(self):
722 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
724 'service-name': 'service_ODUC4',
725 'operation': 'create',
726 'service-rate': '400',
727 'service-format': 'ODU',
728 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
730 self.assertEqual(response['status_code'], requests.codes.ok)
731 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
733 {'node-id': 'XPDR-A2',
734 'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
736 def test_46_get_portmapping_network1(self):
737 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
738 self.assertEqual(response['status_code'], requests.codes.ok)
739 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
741 self.NETWORK2_CHECK_DICT,
744 def test_47_check_interface_oduc4(self):
745 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
746 self.assertEqual(response['status_code'], requests.codes.ok)
748 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
749 'administrative-state': 'inService',
750 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
751 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
752 'type': 'org-openroadm-interfaces:otnOdu',
753 'supporting-port': 'L1'}
755 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
756 'rate': 'org-openroadm-otn-common-types:ODUCn',
757 'tx-sapi': 'LY9PxYJqUbw=',
758 'tx-dapi': 'LY9PxYJqUbw=',
759 'expected-sapi': 'LY9PxYJqUbw=',
760 'expected-dapi': 'LY9PxYJqUbw=',
762 "degthr-percentage": 100,
763 "monitoring-mode": "terminated",
766 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
767 response['interface'][0])
768 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
769 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
770 self.assertDictEqual(
771 {'payload-type': '22', 'exp-payload-type': '22'},
772 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
774 # 3c) create Ethernet device renderer
775 # No change in the ethernet device renderer so skipping those tests
776 # 3d) Delete Ethernet device interfaces
777 # No change in the ethernet device renderer so skipping those tests
779 # 3e) Delete ODUC4 device interfaces
780 def test_48_otn_service_path_delete_oduc4(self):
781 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
783 'service-name': 'service_ODUC4',
784 'operation': 'delete',
785 'service-rate': '400',
786 'service-format': 'ODU',
787 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
789 self.assertEqual(response['status_code'], requests.codes.ok)
790 self.assertIn('Request processed', response['output']['result'])
792 def test_49_check_no_interface_oduc4(self):
793 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
794 self.assertEqual(response['status_code'], requests.codes.conflict)
796 # 3f) Delete OTUC4 device interfaces
797 def test_50_service_path_delete_otuc4(self):
798 response = test_utils_rfc8040.device_renderer_service_path_request(
800 'service-name': 'service_OTUC4',
802 'modulation-format': 'dp-qam16',
803 'operation': 'delete',
804 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
805 'center-freq': 196.1,
807 'min-freq': 196.0375,
809 'lower-spectral-slot-number': 755,
810 'higher-spectral-slot-number': 768
812 self.assertEqual(response['status_code'], requests.codes.ok)
813 self.assertIn('Request processed', response['output']['result'])
815 def test_51_check_no_interface_otuc4(self):
816 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
817 self.assertEqual(response['status_code'], requests.codes.conflict)
819 def test_52_check_no_interface_otsig(self):
820 response = test_utils_rfc8040.check_node_attribute_request(
821 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
822 self.assertEqual(response['status_code'], requests.codes.conflict)
824 def test_53_check_no_interface_otsi(self):
825 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
826 self.assertEqual(response['status_code'], requests.codes.conflict)
828 # Disconnect the XPDR
829 def test_54_xpdr_device_disconnection(self):
830 response = test_utils_rfc8040.unmount_device("XPDR-A2")
831 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
833 def test_55_xpdr_device_disconnected(self):
834 response = test_utils_rfc8040.check_device_connection("XPDR-A2")
835 self.assertEqual(response['status_code'], requests.codes.conflict)
836 self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
837 self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
838 self.assertEqual(response['connection-status']['error-message'],
839 'Request could not be completed because the relevant data model content does not exist')
841 def test_56_xpdr_device_not_connected(self):
842 response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
843 self.assertEqual(response['status_code'], requests.codes.conflict)
844 self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
845 self.assertEqual(response['node-info']['error-tag'], 'data-missing')
846 self.assertEqual(response['node-info']['error-message'],
847 'Request could not be completed because the relevant data model content does not exist')
850 if __name__ == '__main__':
851 unittest.main(verbosity=2)