2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils_rfc8040 # nopep8
25 class TransportPCE400GPortMappingTesting(unittest.TestCase):
28 NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1",
29 "supporting-port": "L1",
30 "supported-interface-capability": [
31 "org-openroadm-port-types:if-otsi-otsigroup"
33 "port-direction": "bidirectional",
34 "port-qual": "switch-network",
35 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
36 "xponder-type": "mpdr",
37 'lcp-hash-val': 'LY9PxYJqUbw=',
38 'port-admin-state': 'InService',
39 'port-oper-state': 'InService'}
44 cls.processes = test_utils_rfc8040.start_tpce()
45 cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Shut down every process recorded in cls.processes, one by one,
    # then print a single confirmation line.
    # pylint: disable=not-an-iterable
    for running_process in cls.processes:
        test_utils_rfc8040.shutdown_process(running_process)
    print("all processes killed")
55 # pylint: disable=consider-using-f-string
56 print("execution of {}".format(self.id().split(".")[-1]))
def test_01_xpdr_device_connection(self):
    # Mount the 'xpdra2' simulator under the node name XPDR-A2 and expect
    # the "created" HTTP status in return.
    simulator = ('xpdra2', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("XPDR-A2", simulator)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
65 # Check if the node appears in the ietf-network topology
66 # this test has been removed, since it already exists in port-mapping
# 1a) create an OTUC2 device renderer
68 def test_02_service_path_create_otuc2(self):
69 response = test_utils_rfc8040.transportpce_api_rpc_request(
70 'transportpce-device-renderer', 'service-path',
72 'service-name': 'service_OTUC2',
74 'modulation-format': 'dp-qpsk',
75 'operation': 'create',
76 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
81 'lower-spectral-slot-number': 755,
82 'higher-spectral-slot-number': 768
84 self.assertEqual(response['status_code'], requests.codes.ok)
85 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
87 {'node-id': 'XPDR-A2',
88 'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
89 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G',
90 'XPDR2-NETWORK1-755:768']},
91 response['output']['node-interface'])
93 def test_03_get_portmapping_network1(self):
94 response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
95 self.assertEqual(response['status_code'], requests.codes.ok)
96 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
98 self.NETWORK2_CHECK_DICT,
101 def test_04_check_interface_otsi(self):
102 # pylint: disable=line-too-long
103 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
104 self.assertEqual(response['status_code'], requests.codes.ok)
105 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
106 'administrative-state': 'inService',
107 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
108 'type': 'org-openroadm-interfaces:otsi',
109 'supporting-port': 'L1'}
111 "frequency": 196.0812,
112 "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
113 "fec": "org-openroadm-common-types:ofec",
114 "transmit-power": -5,
115 "provision-mode": "explicit",
116 "modulation-format": "dp-qpsk"}
118 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
119 response['interface'][0])
120 self.assertDictEqual(dict(input_dict_2,
121 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
122 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
123 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
124 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
def test_05_check_interface_otsig(self):
    # Read back the XPDR2-NETWORK1-OTSIGROUP-200G interface of XPDR-A2 and
    # check the reply against the expected top-level attributes and the
    # expected otsi-group container attributes.
    response = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
                    'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
                    'type': 'org-openroadm-interfaces:otsi-group',
                    'supporting-port': 'L1'}
    input_dict_2 = {"group-id": 1,
                    "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}

    # NOTE(review): the reply is merged on top of the expected dict, so the
    # reply values override the expected ones and these assertions only
    # fail when an expected key is missing from the reply. Confirm whether
    # the values were meant to be compared as well.
    interface = response['interface'][0]
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    otsi_group = interface['org-openroadm-otsi-group-interfaces:otsi-group']
    self.assertDictEqual(dict(input_dict_2, **otsi_group), otsi_group)
145 def test_06_check_interface_otuc2(self):
146 response = test_utils_rfc8040.check_node_attribute_request(
147 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
148 self.assertEqual(response['status_code'], requests.codes.ok)
149 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
150 'administrative-state': 'inService',
151 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
152 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
153 'type': 'org-openroadm-interfaces:otnOtu',
154 'supporting-port': 'L1'}
155 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
156 "degthr-percentage": 100,
157 "tim-detect-mode": "Disabled",
161 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
162 response['interface'][0])
163 self.assertDictEqual(dict(input_dict_2,
164 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
165 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# 1b) create an ODUC2 device renderer
168 def test_07_otn_service_path_create_oduc2(self):
169 response = test_utils_rfc8040.transportpce_api_rpc_request(
170 'transportpce-device-renderer', 'otn-service-path',
172 'service-name': 'service_ODUC2',
173 'operation': 'create',
174 'service-rate': '200',
175 'service-format': 'ODU',
176 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
178 self.assertEqual(response['status_code'], requests.codes.ok)
179 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
181 {'node-id': 'XPDR-A2',
182 'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
184 def test_08_get_portmapping_network1(self):
185 response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
186 self.assertEqual(response['status_code'], requests.codes.ok)
187 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
189 self.NETWORK2_CHECK_DICT,
192 def test_09_check_interface_oduc2(self):
193 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
194 self.assertEqual(response['status_code'], requests.codes.ok)
196 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
197 'administrative-state': 'inService',
198 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
199 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
200 'type': 'org-openroadm-interfaces:otnOdu',
201 'supporting-port': 'L1'}
203 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
204 'rate': 'org-openroadm-otn-common-types:ODUCn',
205 'tx-sapi': 'LY9PxYJqUbw=',
206 'tx-dapi': 'LY9PxYJqUbw=',
207 'expected-sapi': 'LY9PxYJqUbw=',
208 'expected-dapi': 'LY9PxYJqUbw=',
210 "degthr-percentage": 100,
211 "monitoring-mode": "terminated",
214 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
215 response['interface'][0])
216 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
217 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
218 self.assertDictEqual(
219 {'payload-type': '22', 'exp-payload-type': '22'},
220 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
222 # 1c) create Ethernet device renderer
223 def test_10_otn_service_path_create_100ge(self):
224 response = test_utils_rfc8040.transportpce_api_rpc_request(
225 'transportpce-device-renderer', 'otn-service-path',
227 'service-name': 'service_Ethernet',
228 'operation': 'create',
229 'service-rate': '100',
230 'service-format': 'Ethernet',
231 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
232 'ethernet-encoding': 'eth encode',
233 'opucn-trib-slots': ['1.1', '1.20']
235 self.assertEqual(response['status_code'], requests.codes.ok)
236 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
237 self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id'])
238 self.assertIn('XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
239 response['output']['node-interface'][0]['connection-id'])
240 self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id'])
241 self.assertIn('XPDR2-NETWORK1-ODU4',
242 response['output']['node-interface'][0]['odu-interface-id'])
243 self.assertIn('XPDR2-CLIENT1-ODU4',
244 response['output']['node-interface'][0]['odu-interface-id'])
246 def test_11_check_interface_100ge_client(self):
247 response = test_utils_rfc8040.check_node_attribute_request(
248 "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
249 self.assertEqual(response['status_code'], requests.codes.ok)
250 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
251 'administrative-state': 'inService',
252 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
253 'type': 'org-openroadm-interfaces:ethernetCsmacd',
254 'supporting-port': 'C1'
256 input_dict_2 = {'speed': 100000}
257 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
258 response['interface'][0])
259 self.assertDictEqual(dict(input_dict_2,
260 **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
261 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
263 def test_12_check_interface_odu4_client(self):
264 response = test_utils_rfc8040.check_node_attribute_request(
265 "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4")
266 self.assertEqual(response['status_code'], requests.codes.ok)
267 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
268 'administrative-state': 'inService',
269 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
270 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
271 'type': 'org-openroadm-interfaces:otnOdu',
272 'supporting-port': 'C1'}
274 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
275 'rate': 'org-openroadm-otn-common-types:ODU4',
276 'monitoring-mode': 'terminated'}
278 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
279 response['interface'][0])
280 self.assertDictEqual(dict(input_dict_2,
281 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
282 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
283 self.assertDictEqual(
284 {'payload-type': '07', 'exp-payload-type': '07'},
285 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
287 def test_13_check_interface_odu4_network(self):
288 response = test_utils_rfc8040.check_node_attribute_request(
289 "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4")
290 self.assertEqual(response['status_code'], requests.codes.ok)
291 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
292 'administrative-state': 'inService',
293 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
294 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
295 'type': 'org-openroadm-interfaces:otnOdu',
296 'supporting-port': 'L1'}
298 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
299 'rate': 'org-openroadm-otn-common-types:ODU4',
300 'monitoring-mode': 'not-terminated'}
301 input_dict_3 = {'trib-port-number': 1}
303 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
304 response['interface'][0])
305 self.assertDictEqual(dict(input_dict_2,
306 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
307 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
308 self.assertDictEqual(dict(input_dict_3,
309 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
310 'parent-odu-allocation']),
311 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
312 self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
313 ['opucn-trib-slots'])
314 self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
315 ['opucn-trib-slots'])
317 def test_14_check_odu_connection_xpdra2(self):
318 response = test_utils_rfc8040.check_node_attribute_request(
320 "odu-connection", "XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
321 self.assertEqual(response['status_code'], requests.codes.ok)
324 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
325 'direction': 'bidirectional'
328 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
329 response['odu-connection'][0])
330 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
331 response['odu-connection'][0]['destination'])
332 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
333 response['odu-connection'][0]['source'])
335 # 1d) Delete Ethernet device interfaces
336 def test_15_otn_service_path_delete_100ge(self):
337 response = test_utils_rfc8040.transportpce_api_rpc_request(
338 'transportpce-device-renderer', 'otn-service-path',
340 'service-name': 'service_Ethernet',
341 'operation': 'delete',
342 'service-rate': '100',
343 'service-format': 'Ethernet',
344 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
345 'ethernet-encoding': 'eth encode',
347 'trib-port-number': '1'
349 self.assertEqual(response['status_code'], requests.codes.ok)
350 self.assertIn('Request processed', response['output']['result'])
352 def test_16_check_no_odu_connection(self):
353 response = test_utils_rfc8040.check_node_attribute_request(
355 "odu-connection", "XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
356 self.assertEqual(response['status_code'], requests.codes.conflict)
def test_17_check_no_interface_odu_network(self):
    # Requesting the network-side ODU4 interface is expected to be answered
    # with the HTTP "conflict" status.
    node_id, interface_name = "XPDR-A2", "XPDR2-NETWORK1-ODU4"
    reply = test_utils_rfc8040.check_node_attribute_request(node_id, "interface", interface_name)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_18_check_no_interface_odu_client(self):
    # Requesting the client-side ODU4 interface is expected to be answered
    # with the HTTP "conflict" status.
    node_id, interface_name = "XPDR-A2", "XPDR2-CLIENT1-ODU4"
    reply = test_utils_rfc8040.check_node_attribute_request(node_id, "interface", interface_name)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_19_check_no_interface_100ge_client(self):
    # Requesting the client 100G Ethernet interface is expected to be
    # answered with the HTTP "conflict" status.
    node_id, interface_name = "XPDR-A2", "XPDR2-CLIENT1-ETHERNET-100G"
    reply = test_utils_rfc8040.check_node_attribute_request(node_id, "interface", interface_name)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
373 # 1e) Delete ODUC2 device interfaces
374 def test_20_otn_service_path_delete_oduc2(self):
375 response = test_utils_rfc8040.transportpce_api_rpc_request(
376 'transportpce-device-renderer', 'otn-service-path',
378 'service-name': 'service_ODUC2',
379 'operation': 'delete',
380 'service-rate': '200',
381 'service-format': 'ODU',
382 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
384 self.assertEqual(response['status_code'], requests.codes.ok)
385 self.assertIn('Request processed', response['output']['result'])
# The supporting-oducn entry added earlier must be removed from the expected mapping here
387 del self.NETWORK2_CHECK_DICT["supporting-oducn"]
def test_21_check_no_interface_oduc2(self):
    # Requesting the ODUC2 interface is expected to be answered with the
    # HTTP "conflict" status.
    node_id, interface_name = "XPDR-A2", "XPDR2-NETWORK1-ODUC2"
    reply = test_utils_rfc8040.check_node_attribute_request(node_id, "interface", interface_name)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
393 # Check if port-mapping data is updated, where the supporting-oducn is deleted
def test_21a_check_no_oduc2(self):
    # The port-mapping of XPDR2-NETWORK1 must not advertise a
    # "supporting-oducn" attribute any more.
    # Looking the key up on the helper's wrapper dict (the one holding
    # 'status_code') can never find it, so the mapping entries themselves
    # are inspected instead.
    # NOTE(review): assumes the helper returns the entries as a list under
    # the 'mapping' key - confirm against test_utils_rfc8040.
    response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
    self.assertEqual(response['status_code'], requests.codes.ok)
    for mapping_entry in response['mapping']:
        self.assertNotIn("supporting-oducn", mapping_entry)
398 # 1f) Delete OTUC2 device interfaces
400 def test_22_service_path_delete_otuc2(self):
401 response = test_utils_rfc8040.transportpce_api_rpc_request(
402 'transportpce-device-renderer', 'service-path',
404 'service-name': 'service_OTUC2',
406 'modulation-format': 'dp-qpsk',
407 'operation': 'delete',
408 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
409 'center-freq': 196.1,
411 'min-freq': 196.0375,
413 'lower-spectral-slot-number': 755,
414 'higher-spectral-slot-number': 768
416 self.assertEqual(response['status_code'], requests.codes.ok)
417 self.assertIn('Request processed', response['output']['result'])
418 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
def test_23_check_no_interface_otuc2(self):
    # Requesting the OTUC2 interface is expected to be answered with the
    # HTTP "conflict" status.
    node_id, interface_name = "XPDR-A2", "XPDR2-NETWORK1-OTUC2"
    reply = test_utils_rfc8040.check_node_attribute_request(node_id, "interface", interface_name)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_24_check_no_interface_otsig(self):
    # Requesting the 200G OTSi-group interface is expected to be answered
    # with the HTTP "conflict" status.
    node_id, interface_name = "XPDR-A2", "XPDR2-NETWORK1-OTSIGROUP-200G"
    reply = test_utils_rfc8040.check_node_attribute_request(node_id, "interface", interface_name)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_25_check_no_interface_otsi(self):
    # Requesting the OTSi interface on slots 755:768 is expected to be
    # answered with the HTTP "conflict" status.
    node_id, interface_name = "XPDR-A2", "XPDR2-NETWORK1-755:768"
    reply = test_utils_rfc8040.check_node_attribute_request(node_id, "interface", interface_name)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_25a_check_no_otuc2(self):
    # The port-mapping of XPDR2-NETWORK1 must not advertise a
    # "supporting-otucn" attribute any more.
    # Looking the key up on the helper's wrapper dict (the one holding
    # 'status_code') can never find it, so the mapping entries themselves
    # are inspected instead.
    # NOTE(review): assumes the helper returns the entries as a list under
    # the 'mapping' key - confirm against test_utils_rfc8040.
    response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
    self.assertEqual(response['status_code'], requests.codes.ok)
    for mapping_entry in response['mapping']:
        self.assertNotIn("supporting-otucn", mapping_entry)
# 2a) create an OTUC3 device renderer
438 def test_26_service_path_create_otuc3(self):
439 response = test_utils_rfc8040.transportpce_api_rpc_request(
440 'transportpce-device-renderer', 'service-path',
442 'service-name': 'service_OTUC3',
444 'modulation-format': 'dp-qam8',
445 'operation': 'create',
446 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
447 'center-freq': 196.1,
449 'min-freq': 196.0375,
451 'lower-spectral-slot-number': 755,
452 'higher-spectral-slot-number': 768
454 self.assertEqual(response['status_code'], requests.codes.ok)
455 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
456 expected_subset_response = {
457 'node-id': 'XPDR-A2',
458 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3']}
459 expected_sorted_list = ['XPDR2-NETWORK1-755:768',
460 'XPDR2-NETWORK1-OTSIGROUP-300G']
461 subset = {k: v for k, v in response['output']['node-interface'][0].items() if k in expected_subset_response}
462 self.assertDictEqual(subset, expected_subset_response)
463 self.assertEqual(sorted(response['output']['node-interface'][0]['och-interface-id']), expected_sorted_list)
465 def test_27_get_portmapping_network1(self):
466 response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
467 self.assertEqual(response['status_code'], requests.codes.ok)
468 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
470 self.NETWORK2_CHECK_DICT,
473 def test_28_check_interface_otsi(self):
474 # pylint: disable=line-too-long
475 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
476 self.assertEqual(response['status_code'], requests.codes.ok)
478 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
479 'administrative-state': 'inService',
480 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
481 'type': 'org-openroadm-interfaces:otsi',
482 'supporting-port': 'L1'}
484 "frequency": 196.0812,
485 "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
486 "fec": "org-openroadm-common-types:ofec",
487 "transmit-power": -5,
488 "provision-mode": "explicit",
489 "modulation-format": "dp-qam8"}
491 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
492 response['interface'][0])
493 self.assertDictEqual(dict(input_dict_2,
494 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
495 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
496 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
497 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
def test_29_check_interface_otsig(self):
    # Read back the XPDR2-NETWORK1-OTSIGROUP-300G interface of XPDR-A2 and
    # check the reply against the expected top-level attributes and the
    # expected otsi-group container attributes.
    response = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
                    'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
                    'type': 'org-openroadm-interfaces:otsi-group',
                    'supporting-port': 'L1'}
    input_dict_2 = {"group-id": 1,
                    "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}

    # NOTE(review): the reply is merged on top of the expected dict, so the
    # reply values override the expected ones and these assertions only
    # fail when an expected key is missing from the reply. Confirm whether
    # the values were meant to be compared as well.
    interface = response['interface'][0]
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    otsi_group = interface['org-openroadm-otsi-group-interfaces:otsi-group']
    self.assertDictEqual(dict(input_dict_2, **otsi_group), otsi_group)
518 def test_30_check_interface_otuc3(self):
519 response = test_utils_rfc8040.check_node_attribute_request(
520 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
521 self.assertEqual(response['status_code'], requests.codes.ok)
522 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
523 'administrative-state': 'inService',
524 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
525 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
526 'type': 'org-openroadm-interfaces:otnOtu',
527 'supporting-port': 'L1'}
528 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
529 "degthr-percentage": 100,
530 "tim-detect-mode": "Disabled",
534 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
535 response['interface'][0])
536 self.assertDictEqual(dict(input_dict_2,
537 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
538 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# 2b) create an ODUC3 device renderer
541 def test_31_otn_service_path_create_oduc3(self):
542 response = test_utils_rfc8040.transportpce_api_rpc_request(
543 'transportpce-device-renderer', 'otn-service-path',
545 'service-name': 'service_ODUC3',
546 'operation': 'create',
547 'service-rate': '300',
548 'service-format': 'ODU',
549 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
551 self.assertEqual(response['status_code'], requests.codes.ok)
552 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
554 {'node-id': 'XPDR-A2',
555 'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
557 def test_32_get_portmapping_network1(self):
558 response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
559 self.assertEqual(response['status_code'], requests.codes.ok)
560 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
562 self.NETWORK2_CHECK_DICT,
565 def test_33_check_interface_oduc3(self):
566 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
567 self.assertEqual(response['status_code'], requests.codes.ok)
569 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
570 'administrative-state': 'inService',
571 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
572 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
573 'type': 'org-openroadm-interfaces:otnOdu',
574 'supporting-port': 'L1'}
576 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
577 'rate': 'org-openroadm-otn-common-types:ODUCn',
578 'tx-sapi': 'LY9PxYJqUbw=',
579 'tx-dapi': 'LY9PxYJqUbw=',
580 'expected-sapi': 'LY9PxYJqUbw=',
581 'expected-dapi': 'LY9PxYJqUbw=',
583 "degthr-percentage": 100,
584 "monitoring-mode": "terminated",
587 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
588 response['interface'][0])
589 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
590 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
591 self.assertDictEqual(
592 {'payload-type': '22', 'exp-payload-type': '22'},
593 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
595 # 2c) create Ethernet device renderer
596 # No change in the ethernet device renderer so skipping those tests
597 # 2d) Delete Ethernet device interfaces
598 # No change in the ethernet device renderer so skipping those tests
600 # 2e) Delete ODUC3 device interfaces
601 def test_34_otn_service_path_delete_oduc3(self):
602 response = test_utils_rfc8040.transportpce_api_rpc_request(
603 'transportpce-device-renderer', 'otn-service-path',
605 'service-name': 'service_ODUC3',
606 'operation': 'delete',
607 'service-rate': '300',
608 'service-format': 'ODU',
609 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
611 self.assertEqual(response['status_code'], requests.codes.ok)
612 self.assertIn('Request processed', response['output']['result'])
613 del self.NETWORK2_CHECK_DICT["supporting-oducn"]
def test_35_check_no_interface_oduc3(self):
    # Requesting the ODUC3 interface is expected to be answered with the
    # HTTP "conflict" status.
    node_id, interface_name = "XPDR-A2", "XPDR2-NETWORK1-ODUC3"
    reply = test_utils_rfc8040.check_node_attribute_request(node_id, "interface", interface_name)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_35a_check_no_oduc3(self):
    # The port-mapping of XPDR2-NETWORK1 must not advertise a
    # "supporting-oducn" attribute any more.
    # Looking the key up on the helper's wrapper dict (the one holding
    # 'status_code') can never find it, so the mapping entries themselves
    # are inspected instead.
    # NOTE(review): assumes the helper returns the entries as a list under
    # the 'mapping' key - confirm against test_utils_rfc8040.
    response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
    self.assertEqual(response['status_code'], requests.codes.ok)
    for mapping_entry in response['mapping']:
        self.assertNotIn("supporting-oducn", mapping_entry)
623 # 2f) Delete OTUC3 device interfaces
624 def test_36_service_path_delete_otuc3(self):
625 response = test_utils_rfc8040.transportpce_api_rpc_request(
626 'transportpce-device-renderer', 'service-path',
628 'service-name': 'service_OTUC3',
630 'modulation-format': 'dp-qam8',
631 'operation': 'delete',
632 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
633 'center-freq': 196.1,
635 'min-freq': 196.0375,
637 'lower-spectral-slot-number': 755,
638 'higher-spectral-slot-number': 768
640 self.assertEqual(response['status_code'], requests.codes.ok)
641 self.assertIn('Request processed', response['output']['result'])
642 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
def test_37_check_no_interface_otuc3(self):
    # Requesting the OTUC3 interface is expected to be answered with the
    # HTTP "conflict" status.
    # The request targets XPDR-A2, the node mounted by this test class and
    # used by the equivalent OTUC2 check; asking any other node would make
    # the check independent of whether the interface was really removed.
    response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
    self.assertEqual(response['status_code'], requests.codes.conflict)
def test_38_check_no_interface_otsig(self):
    # Requesting the 300G OTSi-group interface is expected to be answered
    # with the HTTP "conflict" status.
    # The request targets XPDR-A2, the node mounted by this test class and
    # used by the equivalent 200G check; asking any other node would make
    # the check independent of whether the interface was really removed.
    response = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
    self.assertEqual(response['status_code'], requests.codes.conflict)
def test_39_check_no_interface_otsi(self):
    # Requesting the OTSi interface on slots 755:768 is expected to be
    # answered with the HTTP "conflict" status.
    # The request targets XPDR-A2, the node mounted by this test class and
    # used by the equivalent earlier check; asking any other node would make
    # the check independent of whether the interface was really removed.
    response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
    self.assertEqual(response['status_code'], requests.codes.conflict)
def test_39a_check_no_otuc3(self):
    # The port-mapping of XPDR2-NETWORK1 must not advertise a
    # "supporting-otucn" attribute any more.
    # Looking the key up on the helper's wrapper dict (the one holding
    # 'status_code') can never find it, so the mapping entries themselves
    # are inspected instead.
    # NOTE(review): assumes the helper returns the entries as a list under
    # the 'mapping' key - confirm against test_utils_rfc8040.
    response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
    self.assertEqual(response['status_code'], requests.codes.ok)
    for mapping_entry in response['mapping']:
        self.assertNotIn("supporting-otucn", mapping_entry)
# 3a) create an OTUC4 device renderer
662 def test_40_service_path_create_otuc4(self):
663 response = test_utils_rfc8040.transportpce_api_rpc_request(
664 'transportpce-device-renderer', 'service-path',
666 'service-name': 'service_OTUC4',
668 'modulation-format': 'dp-qam16',
669 'operation': 'create',
670 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
671 'center-freq': 196.1,
673 'min-freq': 196.0375,
675 'lower-spectral-slot-number': 755,
676 'higher-spectral-slot-number': 768
678 self.assertEqual(response['status_code'], requests.codes.ok)
679 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
680 expected_subset_response = {
681 'node-id': 'XPDR-A2',
682 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4']}
683 expected_sorted_list = ['XPDR2-NETWORK1-755:768',
684 'XPDR2-NETWORK1-OTSIGROUP-400G']
685 subset = {k: v for k, v in response['output']['node-interface'][0].items() if k in expected_subset_response}
686 self.assertDictEqual(subset, expected_subset_response)
687 self.assertEqual(sorted(response['output']['node-interface'][0]['och-interface-id']), expected_sorted_list)
689 def test_41_get_portmapping_network1(self):
690 response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
691 self.assertEqual(response['status_code'], requests.codes.ok)
692 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
694 self.NETWORK2_CHECK_DICT,
697 def test_42_check_interface_otsi(self):
698 # pylint: disable=line-too-long
699 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
700 self.assertEqual(response['status_code'], requests.codes.ok)
702 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
703 'administrative-state': 'inService',
704 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
705 'type': 'org-openroadm-interfaces:otsi',
706 'supporting-port': 'L1'}
708 "frequency": 196.0812,
709 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
710 "fec": "org-openroadm-common-types:ofec",
711 "transmit-power": -5,
712 "provision-mode": "explicit",
713 "modulation-format": "dp-qam16"}
715 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
716 response['interface'][0])
717 self.assertDictEqual(dict(input_dict_2,
718 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
719 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
720 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
721 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
def test_43_check_interface_otsig(self):
    # Read back the XPDR2-NETWORK1-OTSIGROUP-400G interface of XPDR-A2 and
    # check the reply against the expected top-level attributes and the
    # expected otsi-group container attributes.
    response = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
                    'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
                    'type': 'org-openroadm-interfaces:otsi-group',
                    'supporting-port': 'L1'}
    input_dict_2 = {"group-id": 1,
                    "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}

    # NOTE(review): the reply is merged on top of the expected dict, so the
    # reply values override the expected ones and these assertions only
    # fail when an expected key is missing from the reply. Confirm whether
    # the values were meant to be compared as well.
    interface = response['interface'][0]
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    otsi_group = interface['org-openroadm-otsi-group-interfaces:otsi-group']
    self.assertDictEqual(dict(input_dict_2, **otsi_group), otsi_group)
# Read back interface "XPDR2-NETWORK1-OTUC4" from node "XPDR-A2", expect an
# OK status, then compare the answer with the expected top-level attributes
# (input_dict_1) and the expected OTU container attributes (input_dict_2).
742 def test_44_check_interface_otuc4(self):
743 response = test_utils_rfc8040.check_node_attribute_request(
744 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
745 self.assertEqual(response['status_code'], requests.codes.ok)
746 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
747 'administrative-state': 'inService',
748 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
# The key expression below indexes a one-element list literal, so it is
# simply the string 'supporting-interface-list'.
749 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
750 'type': 'org-openroadm-interfaces:otnOtu',
751 'supporting-port': 'L1'}
752 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
753 "degthr-percentage": 100,
754 "tim-detect-mode": "Disabled",
# NOTE(review): the remaining entries and the closing brace of input_dict_2
# are not visible in this listing - confirm them against the full file.
# dict(expected, **actual) merges the response over the expectation, so each
# comparison passes when every expected key is present in the response.
# NOTE(review): response values win in the merge, so expected values are not
# themselves compared - confirm this is intended.
758 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
759 response['interface'][0])
760 self.assertDictEqual(dict(input_dict_2,
761 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
762 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
764 # 3b) create a ODUC4 device renderer
# Call the device-renderer 'otn-service-path' RPC with operation 'create'
# for service 'service_ODUC4' on node XPDR-A2 / XPDR2-NETWORK1, then check
# the status code, the result message and the reported ODU interface.
# NOTE(review): the method name says "oduc3" while the payload and the
# expected interface are ODUC4 - probably a naming slip; renaming would
# change the test id, so it is left untouched here.
765 def test_45_otn_service_path_create_oduc3(self):
766 response = test_utils_rfc8040.transportpce_api_rpc_request(
767 'transportpce-device-renderer', 'otn-service-path',
# NOTE(review): the braces delimiting the RPC payload are not visible in this
# listing - confirm against the full file.
769 'service-name': 'service_ODUC4',
770 'operation': 'create',
771 'service-rate': '400',
772 'service-format': 'ODU',
773 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
775 self.assertEqual(response['status_code'], requests.codes.ok)
776 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
# NOTE(review): the call that consumes the two arguments below is not visible
# in this listing; presumably a membership assertion on 'node-interface'.
778 {'node-id': 'XPDR-A2',
779 'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
# Read the port mapping of XPDR2-NETWORK1 on XPDR-A2 and expect an OK status.
781 def test_46_get_portmapping_network1(self):
782 response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
783 self.assertEqual(response['status_code'], requests.codes.ok)
# Adds the ODUC4 interface name to the shared expectation dict. The dict is
# mutated in place, so the entry stays visible to later tests until
# test_48 deletes it again.
784 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
# NOTE(review): the assertion that uses the expectation dict is only partly
# visible in this listing - confirm its opener and second argument.
786 self.NETWORK2_CHECK_DICT,
# Read back interface "XPDR2-NETWORK1-ODUC4" from node "XPDR-A2", expect an
# OK status, then compare the answer with the expected top-level attributes
# (input_dict_1), the expected ODU container attributes (input_dict_2) and
# the expected OPU payload types.
789 def test_47_check_interface_oduc4(self):
790 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
791 self.assertEqual(response['status_code'], requests.codes.ok)
793 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
794 'administrative-state': 'inService',
795 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
796 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
797 'type': 'org-openroadm-interfaces:otnOdu',
798 'supporting-port': 'L1'}
800 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
801 'rate': 'org-openroadm-otn-common-types:ODUCn',
802 'tx-sapi': 'LY9PxYJqUbw=',
803 'tx-dapi': 'LY9PxYJqUbw=',
804 'expected-sapi': 'LY9PxYJqUbw=',
805 'expected-dapi': 'LY9PxYJqUbw=',
# NOTE(review): some entries and the closing brace of input_dict_2 are not
# visible in this listing - confirm them against the full file.
807 "degthr-percentage": 100,
808 "monitoring-mode": "terminated",
# dict(expected, **actual) merges the response over the expectation, so the
# two comparisons below pass when every expected key is present.
# NOTE(review): response values win in the merge, so expected values are not
# themselves compared - confirm this is intended.
811 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
812 response['interface'][0])
813 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
814 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# Unlike the merged comparisons above, the OPU container is compared exactly.
815 self.assertDictEqual(
816 {'payload-type': '22', 'exp-payload-type': '22'},
817 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
819 # 3c) create Ethernet device renderer
820 # No change in the ethernet device renderer so skipping those tests
821 # 3d) Delete Ethernet device interfaces
822 # No change in the ethernet device renderer so skipping those tests
824 # 3e) Delete ODUC4 device interfaces
def test_48_otn_service_path_delete_oduc4(self):
    # Ask the device renderer to delete the ODUC4 OTN service path that was
    # set up on XPDR-A2 / XPDR2-NETWORK1.
    rpc_input = {
        'service-name': 'service_ODUC4',
        'operation': 'delete',
        'service-rate': '400',
        'service-format': 'ODU',
        'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
    }
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-device-renderer', 'otn-service-path', rpc_input)

    self.assertEqual(response['status_code'], requests.codes.ok)
    self.assertIn('Request processed', response['output']['result'])

    # The ODUC4 is gone, so the shared port-mapping expectation must no
    # longer list it.
    del self.NETWORK2_CHECK_DICT["supporting-oducn"]
def test_49_check_no_interface_oduc4(self):
    # Reading the ODUC4 interface on XPDR-A2 is expected to be answered
    # with a conflict status at this point of the sequence.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_49a_check_no_oduc4(self):
    # After the ODUC4 deletion the port mapping of XPDR2-NETWORK1 must not
    # reference a supporting ODUCn any more.
    #
    # The helper returns a wrapper dict holding 'status_code' plus the
    # requested attribute ('mapping' here), so looking the field up on the
    # wrapper itself raises KeyError unconditionally and proves nothing.
    # The check therefore inspects the returned mapping entries.
    response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
    # Guard against an error answer, whose payload is not a mapping entry.
    self.assertEqual(response['status_code'], requests.codes.ok)
    for mapping in response['mapping']:
        self.assertNotIn("supporting-oducn", mapping)
847 # 3f) Delete OTUC4 device interfaces
# Call the device-renderer 'service-path' RPC with operation 'delete' for
# service 'service_OTUC4' on node XPDR-A2 (dest-tp XPDR2-NETWORK1), then check
# the status code and the result message.
848 def test_50_service_path_delete_otuc4(self):
849 response = test_utils_rfc8040.transportpce_api_rpc_request(
850 'transportpce-device-renderer', 'service-path',
# NOTE(review): the braces delimiting the RPC payload and some of its entries
# are not visible in this listing - confirm against the full file.
852 'service-name': 'service_OTUC4',
854 'modulation-format': 'dp-qam16',
855 'operation': 'delete',
856 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
857 'center-freq': 196.1,
859 'min-freq': 196.0375,
861 'lower-spectral-slot-number': 755,
862 'higher-spectral-slot-number': 768
864 self.assertEqual(response['status_code'], requests.codes.ok)
865 self.assertIn('Request processed', response['output']['result'])
# Drops the OTUCn entry from the shared expectation dict (mutated in place);
# this raises KeyError if the key was never added by an earlier test.
866 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
def test_51_check_no_interface_otuc4(self):
    # The OTUC4 interface must no longer be readable once its service path
    # has been deleted.
    #
    # The lookup targets XPDR-A2, the node this module mounts and
    # configures. A node id that was never mounted would presumably be
    # answered with an error whatever the interface state, which would
    # make the check meaningless.
    response = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
    self.assertEqual(response['status_code'], requests.codes.conflict)
def test_52_check_no_interface_otsig(self):
    # The OTSi-group interface must no longer be readable once the OTUC4
    # service path has been deleted.
    #
    # The lookup targets XPDR-A2, the node this module mounts and
    # configures. A node id that was never mounted would presumably be
    # answered with an error whatever the interface state, which would
    # make the check meaningless.
    response = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
    self.assertEqual(response['status_code'], requests.codes.conflict)
def test_53_check_no_interface_otsi(self):
    # The OTSi interface must no longer be readable once the OTUC4 service
    # path has been deleted.
    #
    # The lookup targets XPDR-A2, the node this module mounts and
    # configures. A node id that was never mounted would presumably be
    # answered with an error whatever the interface state, which would
    # make the check meaningless.
    response = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
    self.assertEqual(response['status_code'], requests.codes.conflict)
def test_53a_check_no_otuc4(self):
    # After the OTUC4 deletion the port mapping of XPDR2-NETWORK1 must not
    # reference a supporting OTUCn any more.
    #
    # The helper returns a wrapper dict holding 'status_code' plus the
    # requested attribute ('mapping' here), so looking the field up on the
    # wrapper itself raises KeyError unconditionally and proves nothing.
    # The check therefore inspects the returned mapping entries.
    response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
    # Guard against an error answer, whose payload is not a mapping entry.
    self.assertEqual(response['status_code'], requests.codes.ok)
    for mapping in response['mapping']:
        self.assertNotIn("supporting-otucn", mapping)
885 # Disconnect the XPDR
def test_54_xpdr_device_disconnection(self):
    # Unmount XPDR-A2; either an OK or a no-content status is accepted.
    result = test_utils_rfc8040.unmount_device("XPDR-A2")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(result.status_code, accepted_codes)
def test_55_xpdr_device_disconnected(self):
    # Once unmounted, the connection lookup for XPDR-A2 must come back as
    # a conflict carrying a "data-missing" error.
    response = test_utils_rfc8040.check_device_connection("XPDR-A2")
    self.assertEqual(response['status_code'], requests.codes.conflict)

    error = response['connection-status']
    self.assertIn(error['error-type'], ('protocol', 'application'))
    self.assertEqual(error['error-tag'], 'data-missing')
    self.assertEqual(
        error['error-message'],
        'Request could not be completed because the relevant data model content does not exist')
def test_56_xpdr_device_not_connected(self):
    # The port-mapping node-info of XPDR-A2 must come back as a conflict
    # carrying a "data-missing" error.
    response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "node-info", None)
    self.assertEqual(response['status_code'], requests.codes.conflict)

    error = response['node-info']
    self.assertIn(error['error-type'], ('protocol', 'application'))
    self.assertEqual(error['error-tag'], 'data-missing')
    self.assertEqual(
        error['error-message'],
        'Request could not be completed because the relevant data model content does not exist')
if __name__ == "__main__":
    # Script entry point: run the tests of this module with per-test
    # verbose reporting.
    unittest.main(verbosity=2)