2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils_rfc8040 # nopep8
25 class TransportPCE400GPortMappingTesting(unittest.TestCase):
28 NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1",
29 "supporting-port": "L1",
30 "supported-interface-capability": [
31 "org-openroadm-port-types:if-otsi-otsigroup"
33 "port-direction": "bidirectional",
34 "port-qual": "switch-network",
35 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
36 "xponder-type": "mpdr",
37 'lcp-hash-val': 'LY9PxYJqUbw=',
38 'port-admin-state': 'InService',
39 'port-oper-state': 'InService'}
44 cls.processes = test_utils_rfc8040.start_tpce()
45 cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
48 def tearDownClass(cls):
49 # pylint: disable=not-an-iterable
50 for process in cls.processes:
51 test_utils_rfc8040.shutdown_process(process)
52 print("all processes killed")
55 # pylint: disable=consider-using-f-string
56 print("execution of {}".format(self.id().split(".")[-1]))
59 def test_01_xpdr_device_connection(self):
60 response = test_utils_rfc8040.mount_device("XPDR-A2",
61 ('xpdra2', self.NODE_VERSION))
62 self.assertEqual(response.status_code, requests.codes.created,
63 test_utils_rfc8040.CODE_SHOULD_BE_201)
65 # Check if the node appears in the ietf-network topology
66 # this test has been removed, since it already exists in port-mapping
67 # 1a) create a OTUC2 device renderer
68 def test_02_service_path_create_otuc2(self):
69 response = test_utils_rfc8040.transportpce_api_rpc_request(
70 'transportpce-device-renderer', 'service-path',
72 'service-name': 'service_OTUC2',
74 'modulation-format': 'dp-qpsk',
75 'operation': 'create',
76 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
81 'lower-spectral-slot-number': 755,
82 'higher-spectral-slot-number': 768
84 self.assertEqual(response['status_code'], requests.codes.ok)
85 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
87 {'node-id': 'XPDR-A2',
88 'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
89 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G',
90 'XPDR2-NETWORK1-755:768']},
91 response['output']['node-interface'])
93 def test_03_get_portmapping_network1(self):
94 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
95 self.assertEqual(response['status_code'], requests.codes.ok)
96 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
98 self.NETWORK2_CHECK_DICT,
101 def test_04_check_interface_otsi(self):
102 # pylint: disable=line-too-long
103 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
104 self.assertEqual(response['status_code'], requests.codes.ok)
105 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
106 'administrative-state': 'inService',
107 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
108 'type': 'org-openroadm-interfaces:otsi',
109 'supporting-port': 'L1'}
111 "frequency": 196.0812,
112 "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
113 "fec": "org-openroadm-common-types:ofec",
114 "transmit-power": -5,
115 "provision-mode": "explicit",
116 "modulation-format": "dp-qpsk"}
118 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
119 response['interface'][0])
120 self.assertDictEqual(dict(input_dict_2,
121 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
122 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
123 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
124 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
126 def test_05_check_interface_otsig(self):
127 response = test_utils_rfc8040.check_node_attribute_request(
128 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
129 self.assertEqual(response['status_code'], requests.codes.ok)
130 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
131 'administrative-state': 'inService',
132 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
133 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
134 'type': 'org-openroadm-interfaces:otsi-group',
135 'supporting-port': 'L1'}
136 input_dict_2 = {"group-id": 1,
137 "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
139 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
140 response['interface'][0])
141 self.assertDictEqual(dict(input_dict_2,
142 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
143 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
145 def test_06_check_interface_otuc2(self):
146 response = test_utils_rfc8040.check_node_attribute_request(
147 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
148 self.assertEqual(response['status_code'], requests.codes.ok)
149 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
150 'administrative-state': 'inService',
151 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
152 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
153 'type': 'org-openroadm-interfaces:otnOtu',
154 'supporting-port': 'L1'}
155 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
156 "degthr-percentage": 100,
157 "tim-detect-mode": "Disabled",
161 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
162 response['interface'][0])
163 self.assertDictEqual(dict(input_dict_2,
164 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
165 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
167 # 1b) create a ODUC2 device renderer
168 def test_07_otn_service_path_create_oduc2(self):
169 response = test_utils_rfc8040.transportpce_api_rpc_request(
170 'transportpce-device-renderer', 'otn-service-path',
172 'service-name': 'service_ODUC2',
173 'operation': 'create',
174 'service-rate': '200',
175 'service-format': 'ODU',
176 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
178 self.assertEqual(response['status_code'], requests.codes.ok)
179 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
181 {'node-id': 'XPDR-A2',
182 'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
184 def test_08_get_portmapping_network1(self):
185 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
186 self.assertEqual(response['status_code'], requests.codes.ok)
187 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
189 self.NETWORK2_CHECK_DICT,
192 def test_09_check_interface_oduc2(self):
193 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
194 self.assertEqual(response['status_code'], requests.codes.ok)
196 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
197 'administrative-state': 'inService',
198 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
199 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
200 'type': 'org-openroadm-interfaces:otnOdu',
201 'supporting-port': 'L1'}
203 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
204 'rate': 'org-openroadm-otn-common-types:ODUCn',
205 'tx-sapi': 'LY9PxYJqUbw=',
206 'tx-dapi': 'LY9PxYJqUbw=',
207 'expected-sapi': 'LY9PxYJqUbw=',
208 'expected-dapi': 'LY9PxYJqUbw=',
210 "degthr-percentage": 100,
211 "monitoring-mode": "terminated",
214 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
215 response['interface'][0])
216 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
217 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
218 self.assertDictEqual(
219 {'payload-type': '22', 'exp-payload-type': '22'},
220 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
222 # 1c) create Ethernet device renderer
223 def test_10_otn_service_path_create_100ge(self):
224 response = test_utils_rfc8040.transportpce_api_rpc_request(
225 'transportpce-device-renderer', 'otn-service-path',
227 'service-name': 'service_Ethernet',
228 'operation': 'create',
229 'service-rate': '100',
230 'service-format': 'Ethernet',
231 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
232 'ethernet-encoding': 'eth encode',
233 'opucn-trib-slots': ['1.1', '1.20']
235 self.assertEqual(response['status_code'], requests.codes.ok)
236 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
237 self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id'])
238 self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
239 response['output']['node-interface'][0]['connection-id'])
240 self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id'])
241 self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet',
242 response['output']['node-interface'][0]['odu-interface-id'])
243 self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet',
244 response['output']['node-interface'][0]['odu-interface-id'])
246 def test_11_check_interface_100ge_client(self):
247 response = test_utils_rfc8040.check_node_attribute_request(
248 "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
249 self.assertEqual(response['status_code'], requests.codes.ok)
250 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
251 'administrative-state': 'inService',
252 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
253 'type': 'org-openroadm-interfaces:ethernetCsmacd',
254 'supporting-port': 'C1'
256 input_dict_2 = {'speed': 100000}
257 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
258 response['interface'][0])
259 self.assertDictEqual(dict(input_dict_2,
260 **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
261 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
263 def test_12_check_interface_odu4_client(self):
264 response = test_utils_rfc8040.check_node_attribute_request(
265 "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
266 self.assertEqual(response['status_code'], requests.codes.ok)
267 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet',
268 'administrative-state': 'inService',
269 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
270 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
271 'type': 'org-openroadm-interfaces:otnOdu',
272 'supporting-port': 'C1'}
274 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
275 'rate': 'org-openroadm-otn-common-types:ODU4',
276 'monitoring-mode': 'terminated'}
278 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
279 response['interface'][0])
280 self.assertDictEqual(dict(input_dict_2,
281 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
282 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
283 self.assertDictEqual(
284 {'payload-type': '07', 'exp-payload-type': '07'},
285 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
287 def test_13_check_interface_odu4_network(self):
288 response = test_utils_rfc8040.check_node_attribute_request(
289 "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
290 self.assertEqual(response['status_code'], requests.codes.ok)
291 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet',
292 'administrative-state': 'inService',
293 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
294 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
295 'type': 'org-openroadm-interfaces:otnOdu',
296 'supporting-port': 'L1'}
298 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
299 'rate': 'org-openroadm-otn-common-types:ODU4',
300 'monitoring-mode': 'not-terminated'}
301 input_dict_3 = {'trib-port-number': 1}
303 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
304 response['interface'][0])
305 self.assertDictEqual(dict(input_dict_2,
306 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
307 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
308 self.assertDictEqual(dict(input_dict_3,
309 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
310 'parent-odu-allocation']),
311 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
312 self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
313 ['opucn-trib-slots'])
314 self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
315 ['opucn-trib-slots'])
317 def test_14_check_odu_connection_xpdra2(self):
318 response = test_utils_rfc8040.check_node_attribute_request(
320 "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
321 self.assertEqual(response['status_code'], requests.codes.ok)
324 'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
325 'direction': 'bidirectional'
328 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
329 response['odu-connection'][0])
330 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'},
331 response['odu-connection'][0]['destination'])
332 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'},
333 response['odu-connection'][0]['source'])
335 # 1d) Delete Ethernet device interfaces
336 def test_15_otn_service_path_delete_100ge(self):
337 response = test_utils_rfc8040.transportpce_api_rpc_request(
338 'transportpce-device-renderer', 'otn-service-path',
340 'service-name': 'service_Ethernet',
341 'operation': 'delete',
342 'service-rate': '100',
343 'service-format': 'Ethernet',
344 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
345 'ethernet-encoding': 'eth encode',
347 'trib-port-number': '1'
349 self.assertEqual(response['status_code'], requests.codes.ok)
350 self.assertIn('Request processed', response['output']['result'])
352 def test_16_check_no_odu_connection(self):
353 response = test_utils_rfc8040.check_node_attribute_request(
355 "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
356 self.assertEqual(response['status_code'], requests.codes.conflict)
358 def test_17_check_no_interface_odu_network(self):
359 response = test_utils_rfc8040.check_node_attribute_request(
360 "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
361 self.assertEqual(response['status_code'], requests.codes.conflict)
363 def test_18_check_no_interface_odu_client(self):
364 response = test_utils_rfc8040.check_node_attribute_request(
365 "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
366 self.assertEqual(response['status_code'], requests.codes.conflict)
368 def test_19_check_no_interface_100ge_client(self):
369 response = test_utils_rfc8040.check_node_attribute_request(
370 "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
371 self.assertEqual(response['status_code'], requests.codes.conflict)
373 # 1e) Delete ODUC2 device interfaces
374 def test_20_otn_service_path_delete_oduc2(self):
375 response = test_utils_rfc8040.transportpce_api_rpc_request(
376 'transportpce-device-renderer', 'otn-service-path',
378 'service-name': 'service_ODUC2',
379 'operation': 'delete',
380 'service-rate': '200',
381 'service-format': 'ODU',
382 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
384 self.assertEqual(response['status_code'], requests.codes.ok)
385 self.assertIn('Request processed', response['output']['result'])
386 # Here you have remove the added oducn supporting port interface
387 del self.NETWORK2_CHECK_DICT["supporting-oducn"]
389 def test_21_check_no_interface_oduc2(self):
390 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
391 self.assertEqual(response['status_code'], requests.codes.conflict)
393 # Check if port-mapping data is updated, where the supporting-oducn is deleted
394 def test_21a_check_no_oduc2(self):
395 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
396 self.assertRaises(KeyError, lambda: response["supporting-oducn"])
398 # 1f) Delete OTUC2 device interfaces
400 def test_22_service_path_delete_otuc2(self):
401 response = test_utils_rfc8040.transportpce_api_rpc_request(
402 'transportpce-device-renderer', 'service-path',
404 'service-name': 'service_OTUC2',
406 'modulation-format': 'dp-qpsk',
407 'operation': 'delete',
408 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
409 'center-freq': 196.1,
411 'min-freq': 196.0375,
413 'lower-spectral-slot-number': 755,
414 'higher-spectral-slot-number': 768
416 self.assertEqual(response['status_code'], requests.codes.ok)
417 self.assertIn('Request processed', response['output']['result'])
418 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
420 def test_23_check_no_interface_otuc2(self):
421 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2")
422 self.assertEqual(response['status_code'], requests.codes.conflict)
424 def test_24_check_no_interface_otsig(self):
425 response = test_utils_rfc8040.check_node_attribute_request(
426 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
427 self.assertEqual(response['status_code'], requests.codes.conflict)
429 def test_25_check_no_interface_otsi(self):
430 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
431 self.assertEqual(response['status_code'], requests.codes.conflict)
433 def test_25a_check_no_otuc2(self):
434 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
435 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
437 # 2a) create a OTUC3 device renderer
438 def test_26_service_path_create_otuc3(self):
439 response = test_utils_rfc8040.transportpce_api_rpc_request(
440 'transportpce-device-renderer', 'service-path',
442 'service-name': 'service_OTUC3',
444 'modulation-format': 'dp-qam8',
445 'operation': 'create',
446 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
447 'center-freq': 196.1,
449 'min-freq': 196.0375,
451 'lower-spectral-slot-number': 755,
452 'higher-spectral-slot-number': 768
454 self.assertEqual(response['status_code'], requests.codes.ok)
455 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
457 {'node-id': 'XPDR-A2',
458 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
459 'och-interface-id': ['XPDR2-NETWORK1-755:768',
460 'XPDR2-NETWORK1-OTSIGROUP-300G']},
461 response['output']['node-interface'])
463 def test_27_get_portmapping_network1(self):
464 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
465 self.assertEqual(response['status_code'], requests.codes.ok)
466 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
468 self.NETWORK2_CHECK_DICT,
471 def test_28_check_interface_otsi(self):
472 # pylint: disable=line-too-long
473 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
474 self.assertEqual(response['status_code'], requests.codes.ok)
476 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
477 'administrative-state': 'inService',
478 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
479 'type': 'org-openroadm-interfaces:otsi',
480 'supporting-port': 'L1'}
482 "frequency": 196.0812,
483 "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
484 "fec": "org-openroadm-common-types:ofec",
485 "transmit-power": -5,
486 "provision-mode": "explicit",
487 "modulation-format": "dp-qam8"}
489 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
490 response['interface'][0])
491 self.assertDictEqual(dict(input_dict_2,
492 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
493 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
494 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
495 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
497 def test_29_check_interface_otsig(self):
498 response = test_utils_rfc8040.check_node_attribute_request(
499 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
500 self.assertEqual(response['status_code'], requests.codes.ok)
501 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
502 'administrative-state': 'inService',
503 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
504 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
505 'type': 'org-openroadm-interfaces:otsi-group',
506 'supporting-port': 'L1'}
507 input_dict_2 = {"group-id": 1,
508 "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}
510 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
511 response['interface'][0])
512 self.assertDictEqual(dict(input_dict_2,
513 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
514 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
516 def test_30_check_interface_otuc3(self):
517 response = test_utils_rfc8040.check_node_attribute_request(
518 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
519 self.assertEqual(response['status_code'], requests.codes.ok)
520 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
521 'administrative-state': 'inService',
522 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
523 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
524 'type': 'org-openroadm-interfaces:otnOtu',
525 'supporting-port': 'L1'}
526 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
527 "degthr-percentage": 100,
528 "tim-detect-mode": "Disabled",
532 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
533 response['interface'][0])
534 self.assertDictEqual(dict(input_dict_2,
535 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
536 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
538 # 2b) create a ODUC3 device renderer
539 def test_31_otn_service_path_create_oduc3(self):
540 response = test_utils_rfc8040.transportpce_api_rpc_request(
541 'transportpce-device-renderer', 'otn-service-path',
543 'service-name': 'service_ODUC3',
544 'operation': 'create',
545 'service-rate': '300',
546 'service-format': 'ODU',
547 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
549 self.assertEqual(response['status_code'], requests.codes.ok)
550 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
552 {'node-id': 'XPDR-A2',
553 'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
555 def test_32_get_portmapping_network1(self):
556 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
557 self.assertEqual(response['status_code'], requests.codes.ok)
558 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
560 self.NETWORK2_CHECK_DICT,
563 def test_33_check_interface_oduc3(self):
564 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
565 self.assertEqual(response['status_code'], requests.codes.ok)
567 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
568 'administrative-state': 'inService',
569 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
570 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
571 'type': 'org-openroadm-interfaces:otnOdu',
572 'supporting-port': 'L1'}
574 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
575 'rate': 'org-openroadm-otn-common-types:ODUCn',
576 'tx-sapi': 'LY9PxYJqUbw=',
577 'tx-dapi': 'LY9PxYJqUbw=',
578 'expected-sapi': 'LY9PxYJqUbw=',
579 'expected-dapi': 'LY9PxYJqUbw=',
581 "degthr-percentage": 100,
582 "monitoring-mode": "terminated",
585 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
586 response['interface'][0])
587 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
588 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
589 self.assertDictEqual(
590 {'payload-type': '22', 'exp-payload-type': '22'},
591 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
593 # 2c) create Ethernet device renderer
594 # No change in the ethernet device renderer so skipping those tests
595 # 2d) Delete Ethernet device interfaces
596 # No change in the ethernet device renderer so skipping those tests
598 # 2e) Delete ODUC3 device interfaces
599 def test_34_otn_service_path_delete_oduc3(self):
600 response = test_utils_rfc8040.transportpce_api_rpc_request(
601 'transportpce-device-renderer', 'otn-service-path',
603 'service-name': 'service_ODUC3',
604 'operation': 'delete',
605 'service-rate': '300',
606 'service-format': 'ODU',
607 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
609 self.assertEqual(response['status_code'], requests.codes.ok)
610 self.assertIn('Request processed', response['output']['result'])
611 del self.NETWORK2_CHECK_DICT["supporting-oducn"]
613 def test_35_check_no_interface_oduc3(self):
614 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
615 self.assertEqual(response['status_code'], requests.codes.conflict)
617 def test_35a_check_no_oduc3(self):
618 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
619 self.assertRaises(KeyError, lambda: response["supporting-oducn"])
621 # 2f) Delete OTUC3 device interfaces
622 def test_36_service_path_delete_otuc3(self):
623 response = test_utils_rfc8040.transportpce_api_rpc_request(
624 'transportpce-device-renderer', 'service-path',
626 'service-name': 'service_OTUC3',
628 'modulation-format': 'dp-qam8',
629 'operation': 'delete',
630 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
631 'center-freq': 196.1,
633 'min-freq': 196.0375,
635 'lower-spectral-slot-number': 755,
636 'higher-spectral-slot-number': 768
638 self.assertEqual(response['status_code'], requests.codes.ok)
639 self.assertIn('Request processed', response['output']['result'])
640 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
642 def test_37_check_no_interface_otuc3(self):
643 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
644 self.assertEqual(response['status_code'], requests.codes.conflict)
646 def test_38_check_no_interface_otsig(self):
647 response = test_utils_rfc8040.check_node_attribute_request(
648 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
649 self.assertEqual(response['status_code'], requests.codes.conflict)
651 def test_39_check_no_interface_otsi(self):
652 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
653 self.assertEqual(response['status_code'], requests.codes.conflict)
655 def test_39a_check_no_otuc3(self):
656 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
657 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
659 # 3a) create a OTUC4 device renderer
660 def test_40_service_path_create_otuc4(self):
661 response = test_utils_rfc8040.transportpce_api_rpc_request(
662 'transportpce-device-renderer', 'service-path',
664 'service-name': 'service_OTUC4',
666 'modulation-format': 'dp-qam16',
667 'operation': 'create',
668 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
669 'center-freq': 196.1,
671 'min-freq': 196.0375,
673 'lower-spectral-slot-number': 755,
674 'higher-spectral-slot-number': 768
676 self.assertEqual(response['status_code'], requests.codes.ok)
677 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
679 {'node-id': 'XPDR-A2',
680 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
681 'och-interface-id': ['XPDR2-NETWORK1-755:768',
682 'XPDR2-NETWORK1-OTSIGROUP-400G']},
683 response['output']['node-interface'])
685 def test_41_get_portmapping_network1(self):
686 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
687 self.assertEqual(response['status_code'], requests.codes.ok)
688 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
690 self.NETWORK2_CHECK_DICT,
693 def test_42_check_interface_otsi(self):
694 # pylint: disable=line-too-long
695 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
696 self.assertEqual(response['status_code'], requests.codes.ok)
698 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
699 'administrative-state': 'inService',
700 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
701 'type': 'org-openroadm-interfaces:otsi',
702 'supporting-port': 'L1'}
704 "frequency": 196.0812,
705 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
706 "fec": "org-openroadm-common-types:ofec",
707 "transmit-power": -5,
708 "provision-mode": "explicit",
709 "modulation-format": "dp-qam16"}
711 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
712 response['interface'][0])
713 self.assertDictEqual(dict(input_dict_2,
714 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
715 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
716 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
717 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
719 def test_43_check_interface_otsig(self):
720 response = test_utils_rfc8040.check_node_attribute_request(
721 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
722 self.assertEqual(response['status_code'], requests.codes.ok)
723 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
724 'administrative-state': 'inService',
725 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
726 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
727 'type': 'org-openroadm-interfaces:otsi-group',
728 'supporting-port': 'L1'}
729 input_dict_2 = {"group-id": 1,
730 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
732 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
733 response['interface'][0])
734 self.assertDictEqual(dict(input_dict_2,
735 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
736 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
738 def test_44_check_interface_otuc4(self):
739 response = test_utils_rfc8040.check_node_attribute_request(
740 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
741 self.assertEqual(response['status_code'], requests.codes.ok)
742 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
743 'administrative-state': 'inService',
744 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
745 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
746 'type': 'org-openroadm-interfaces:otnOtu',
747 'supporting-port': 'L1'}
748 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
749 "degthr-percentage": 100,
750 "tim-detect-mode": "Disabled",
754 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
755 response['interface'][0])
756 self.assertDictEqual(dict(input_dict_2,
757 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
758 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
760 # 3b) create a ODUC4 device renderer
761 def test_45_otn_service_path_create_oduc3(self):
762 response = test_utils_rfc8040.transportpce_api_rpc_request(
763 'transportpce-device-renderer', 'otn-service-path',
765 'service-name': 'service_ODUC4',
766 'operation': 'create',
767 'service-rate': '400',
768 'service-format': 'ODU',
769 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
771 self.assertEqual(response['status_code'], requests.codes.ok)
772 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
774 {'node-id': 'XPDR-A2',
775 'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
777 def test_46_get_portmapping_network1(self):
778 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
779 self.assertEqual(response['status_code'], requests.codes.ok)
780 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
782 self.NETWORK2_CHECK_DICT,
785 def test_47_check_interface_oduc4(self):
786 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
787 self.assertEqual(response['status_code'], requests.codes.ok)
789 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
790 'administrative-state': 'inService',
791 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
792 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
793 'type': 'org-openroadm-interfaces:otnOdu',
794 'supporting-port': 'L1'}
796 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
797 'rate': 'org-openroadm-otn-common-types:ODUCn',
798 'tx-sapi': 'LY9PxYJqUbw=',
799 'tx-dapi': 'LY9PxYJqUbw=',
800 'expected-sapi': 'LY9PxYJqUbw=',
801 'expected-dapi': 'LY9PxYJqUbw=',
803 "degthr-percentage": 100,
804 "monitoring-mode": "terminated",
807 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
808 response['interface'][0])
809 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
810 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
811 self.assertDictEqual(
812 {'payload-type': '22', 'exp-payload-type': '22'},
813 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
815 # 3c) create Ethernet device renderer
816 # No change in the ethernet device renderer so skipping those tests
817 # 3d) Delete Ethernet device interfaces
818 # No change in the ethernet device renderer so skipping those tests
820 # 3e) Delete ODUC4 device interfaces
821 def test_48_otn_service_path_delete_oduc4(self):
822 response = test_utils_rfc8040.transportpce_api_rpc_request(
823 'transportpce-device-renderer', 'otn-service-path',
825 'service-name': 'service_ODUC4',
826 'operation': 'delete',
827 'service-rate': '400',
828 'service-format': 'ODU',
829 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
831 self.assertEqual(response['status_code'], requests.codes.ok)
832 self.assertIn('Request processed', response['output']['result'])
833 del self.NETWORK2_CHECK_DICT["supporting-oducn"]
835 def test_49_check_no_interface_oduc4(self):
836 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
837 self.assertEqual(response['status_code'], requests.codes.conflict)
839 def test_49a_check_no_oduc4(self):
840 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
841 self.assertRaises(KeyError, lambda: response["supporting-oducn"])
843 # 3f) Delete OTUC4 device interfaces
844 def test_50_service_path_delete_otuc4(self):
845 response = test_utils_rfc8040.transportpce_api_rpc_request(
846 'transportpce-device-renderer', 'service-path',
848 'service-name': 'service_OTUC4',
850 'modulation-format': 'dp-qam16',
851 'operation': 'delete',
852 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
853 'center-freq': 196.1,
855 'min-freq': 196.0375,
857 'lower-spectral-slot-number': 755,
858 'higher-spectral-slot-number': 768
860 self.assertEqual(response['status_code'], requests.codes.ok)
861 self.assertIn('Request processed', response['output']['result'])
862 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
864 def test_51_check_no_interface_otuc4(self):
865 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
866 self.assertEqual(response['status_code'], requests.codes.conflict)
868 def test_52_check_no_interface_otsig(self):
869 response = test_utils_rfc8040.check_node_attribute_request(
870 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
871 self.assertEqual(response['status_code'], requests.codes.conflict)
873 def test_53_check_no_interface_otsi(self):
874 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
875 self.assertEqual(response['status_code'], requests.codes.conflict)
877 def test_53a_check_no_otuc4(self):
878 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
879 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
881 # Disconnect the XPDR
882 def test_54_xpdr_device_disconnection(self):
883 response = test_utils_rfc8040.unmount_device("XPDR-A2")
884 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
886 def test_55_xpdr_device_disconnected(self):
887 response = test_utils_rfc8040.check_device_connection("XPDR-A2")
888 self.assertEqual(response['status_code'], requests.codes.conflict)
889 self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
890 self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
891 self.assertEqual(response['connection-status']['error-message'],
892 'Request could not be completed because the relevant data model content does not exist')
894 def test_56_xpdr_device_not_connected(self):
895 response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
896 self.assertEqual(response['status_code'], requests.codes.conflict)
897 self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
898 self.assertEqual(response['node-info']['error-tag'], 'data-missing')
899 self.assertEqual(response['node-info']['error-message'],
900 'Request could not be completed because the relevant data model content does not exist')
903 if __name__ == '__main__':
904 unittest.main(verbosity=2)