2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils_rfc8040 # nopep8
25 class TransportPCE400GPortMappingTesting(unittest.TestCase):
28 NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1",
29 "supporting-port": "L1",
30 "supported-interface-capability": [
31 "org-openroadm-port-types:if-otsi-otsigroup"
33 "port-direction": "bidirectional",
34 "port-qual": "switch-network",
35 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
36 "xponder-type": "mpdr",
37 'lcp-hash-val': 'LY9PxYJqUbw=',
38 'port-admin-state': 'InService',
39 'port-oper-state': 'InService'}
44 cls.processes = test_utils_rfc8040.start_tpce()
45 cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
48 def tearDownClass(cls):
49 # pylint: disable=not-an-iterable
50 for process in cls.processes:
51 test_utils_rfc8040.shutdown_process(process)
52 print("all processes killed")
55 # pylint: disable=consider-using-f-string
56 print("execution of {}".format(self.id().split(".")[-1]))
59 def test_01_xpdr_device_connection(self):
60 response = test_utils_rfc8040.mount_device("XPDR-A2",
61 ('xpdra2', self.NODE_VERSION))
62 self.assertEqual(response.status_code, requests.codes.created,
63 test_utils_rfc8040.CODE_SHOULD_BE_201)
65 # Check if the node appears in the ietf-network topology
66 # this test has been removed, since it already exists in port-mapping
67 # 1a) create a OTUC2 device renderer
68 def test_02_service_path_create_otuc2(self):
69 response = test_utils_rfc8040.transportpce_api_rpc_request(
70 'transportpce-device-renderer', 'service-path',
72 'service-name': 'service_OTUC2',
74 'modulation-format': 'dp-qpsk',
75 'operation': 'create',
76 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
81 'lower-spectral-slot-number': 755,
82 'higher-spectral-slot-number': 768
84 self.assertEqual(response['status_code'], requests.codes.ok)
85 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
87 {'node-id': 'XPDR-A2',
88 'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
89 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G']}, response['output']['node-interface'])
91 def test_03_get_portmapping_network1(self):
92 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
93 self.assertEqual(response['status_code'], requests.codes.ok)
94 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
96 self.NETWORK2_CHECK_DICT,
99 def test_04_check_interface_otsi(self):
100 # pylint: disable=line-too-long
101 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
102 self.assertEqual(response['status_code'], requests.codes.ok)
103 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
104 'administrative-state': 'inService',
105 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
106 'type': 'org-openroadm-interfaces:otsi',
107 'supporting-port': 'L1'}
109 "frequency": 196.0812,
110 "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
111 "fec": "org-openroadm-common-types:ofec",
112 "transmit-power": -5,
113 "provision-mode": "explicit",
114 "modulation-format": "dp-qpsk"}
116 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
117 response['interface'][0])
118 self.assertDictEqual(dict(input_dict_2,
119 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
120 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
121 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
122 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
124 def test_05_check_interface_otsig(self):
125 response = test_utils_rfc8040.check_node_attribute_request(
126 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
127 self.assertEqual(response['status_code'], requests.codes.ok)
128 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
129 'administrative-state': 'inService',
130 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
131 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
132 'type': 'org-openroadm-interfaces:otsi-group',
133 'supporting-port': 'L1'}
134 input_dict_2 = {"group-id": 1,
135 "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
137 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
138 response['interface'][0])
139 self.assertDictEqual(dict(input_dict_2,
140 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
141 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
143 def test_06_check_interface_otuc2(self):
144 response = test_utils_rfc8040.check_node_attribute_request(
145 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
146 self.assertEqual(response['status_code'], requests.codes.ok)
147 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
148 'administrative-state': 'inService',
149 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
150 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
151 'type': 'org-openroadm-interfaces:otnOtu',
152 'supporting-port': 'L1'}
153 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
154 "degthr-percentage": 100,
155 "tim-detect-mode": "Disabled",
159 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
160 response['interface'][0])
161 self.assertDictEqual(dict(input_dict_2,
162 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
163 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
165 # 1b) create a ODUC2 device renderer
166 def test_07_otn_service_path_create_oduc2(self):
167 response = test_utils_rfc8040.transportpce_api_rpc_request(
168 'transportpce-device-renderer', 'otn-service-path',
170 'service-name': 'service_ODUC2',
171 'operation': 'create',
172 'service-rate': '200',
173 'service-format': 'ODU',
174 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
176 self.assertEqual(response['status_code'], requests.codes.ok)
177 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
179 {'node-id': 'XPDR-A2',
180 'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
182 def test_08_get_portmapping_network1(self):
183 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
184 self.assertEqual(response['status_code'], requests.codes.ok)
185 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
187 self.NETWORK2_CHECK_DICT,
190 def test_09_check_interface_oduc2(self):
191 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
192 self.assertEqual(response['status_code'], requests.codes.ok)
194 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
195 'administrative-state': 'inService',
196 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
197 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
198 'type': 'org-openroadm-interfaces:otnOdu',
199 'supporting-port': 'L1'}
201 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
202 'rate': 'org-openroadm-otn-common-types:ODUCn',
203 'tx-sapi': 'LY9PxYJqUbw=',
204 'tx-dapi': 'LY9PxYJqUbw=',
205 'expected-sapi': 'LY9PxYJqUbw=',
206 'expected-dapi': 'LY9PxYJqUbw=',
208 "degthr-percentage": 100,
209 "monitoring-mode": "terminated",
212 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
213 response['interface'][0])
214 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
215 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
216 self.assertDictEqual(
217 {'payload-type': '22', 'exp-payload-type': '22'},
218 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
220 # 1c) create Ethernet device renderer
221 def test_10_otn_service_path_create_100ge(self):
222 response = test_utils_rfc8040.transportpce_api_rpc_request(
223 'transportpce-device-renderer', 'otn-service-path',
225 'service-name': 'service_Ethernet',
226 'operation': 'create',
227 'service-rate': '100',
228 'service-format': 'Ethernet',
229 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
230 'ethernet-encoding': 'eth encode',
231 'opucn-trib-slots': ['1.1', '1.20']
233 self.assertEqual(response['status_code'], requests.codes.ok)
234 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
235 self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id'])
236 self.assertIn('XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
237 response['output']['node-interface'][0]['connection-id'])
238 self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id'])
239 self.assertIn('XPDR2-NETWORK1-ODU4',
240 response['output']['node-interface'][0]['odu-interface-id'])
241 self.assertIn('XPDR2-CLIENT1-ODU4',
242 response['output']['node-interface'][0]['odu-interface-id'])
244 def test_11_check_interface_100ge_client(self):
245 response = test_utils_rfc8040.check_node_attribute_request(
246 "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
247 self.assertEqual(response['status_code'], requests.codes.ok)
248 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
249 'administrative-state': 'inService',
250 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
251 'type': 'org-openroadm-interfaces:ethernetCsmacd',
252 'supporting-port': 'C1'
254 input_dict_2 = {'speed': 100000}
255 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
256 response['interface'][0])
257 self.assertDictEqual(dict(input_dict_2,
258 **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
259 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
261 def test_12_check_interface_odu4_client(self):
262 response = test_utils_rfc8040.check_node_attribute_request(
263 "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4")
264 self.assertEqual(response['status_code'], requests.codes.ok)
265 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
266 'administrative-state': 'inService',
267 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
268 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
269 'type': 'org-openroadm-interfaces:otnOdu',
270 'supporting-port': 'C1'}
272 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
273 'rate': 'org-openroadm-otn-common-types:ODU4',
274 'monitoring-mode': 'terminated'}
276 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
277 response['interface'][0])
278 self.assertDictEqual(dict(input_dict_2,
279 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
280 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
281 self.assertDictEqual(
282 {'payload-type': '07', 'exp-payload-type': '07'},
283 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
285 def test_13_check_interface_odu4_network(self):
286 response = test_utils_rfc8040.check_node_attribute_request(
287 "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4")
288 self.assertEqual(response['status_code'], requests.codes.ok)
289 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
290 'administrative-state': 'inService',
291 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
292 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
293 'type': 'org-openroadm-interfaces:otnOdu',
294 'supporting-port': 'L1'}
296 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
297 'rate': 'org-openroadm-otn-common-types:ODU4',
298 'monitoring-mode': 'not-terminated'}
299 input_dict_3 = {'trib-port-number': 1}
301 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
302 response['interface'][0])
303 self.assertDictEqual(dict(input_dict_2,
304 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
305 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
306 self.assertDictEqual(dict(input_dict_3,
307 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
308 'parent-odu-allocation']),
309 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
310 self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
311 ['opucn-trib-slots'])
312 self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
313 ['opucn-trib-slots'])
315 def test_14_check_odu_connection_xpdra2(self):
316 response = test_utils_rfc8040.check_node_attribute_request(
318 "odu-connection", "XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
319 self.assertEqual(response['status_code'], requests.codes.ok)
322 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
323 'direction': 'bidirectional'
326 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
327 response['odu-connection'][0])
328 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
329 response['odu-connection'][0]['destination'])
330 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
331 response['odu-connection'][0]['source'])
333 # 1d) Delete Ethernet device interfaces
334 def test_15_otn_service_path_delete_100ge(self):
335 response = test_utils_rfc8040.transportpce_api_rpc_request(
336 'transportpce-device-renderer', 'otn-service-path',
338 'service-name': 'service_Ethernet',
339 'operation': 'delete',
340 'service-rate': '100',
341 'service-format': 'Ethernet',
342 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
343 'ethernet-encoding': 'eth encode',
345 'trib-port-number': '1'
347 self.assertEqual(response['status_code'], requests.codes.ok)
348 self.assertIn('Request processed', response['output']['result'])
350 def test_16_check_no_odu_connection(self):
351 response = test_utils_rfc8040.check_node_attribute_request(
353 "odu-connection", "XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
354 self.assertEqual(response['status_code'], requests.codes.conflict)
356 def test_17_check_no_interface_odu_network(self):
357 response = test_utils_rfc8040.check_node_attribute_request(
358 "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4")
359 self.assertEqual(response['status_code'], requests.codes.conflict)
361 def test_18_check_no_interface_odu_client(self):
362 response = test_utils_rfc8040.check_node_attribute_request(
363 "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4")
364 self.assertEqual(response['status_code'], requests.codes.conflict)
366 def test_19_check_no_interface_100ge_client(self):
367 response = test_utils_rfc8040.check_node_attribute_request(
368 "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
369 self.assertEqual(response['status_code'], requests.codes.conflict)
371 # 1e) Delete ODUC2 device interfaces
372 def test_20_otn_service_path_delete_oduc2(self):
373 response = test_utils_rfc8040.transportpce_api_rpc_request(
374 'transportpce-device-renderer', 'otn-service-path',
376 'service-name': 'service_ODUC2',
377 'operation': 'delete',
378 'service-rate': '200',
379 'service-format': 'ODU',
380 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
382 self.assertEqual(response['status_code'], requests.codes.ok)
383 self.assertIn('Request processed', response['output']['result'])
384 # Here you have remove the added oducn supporting port interface
385 del self.NETWORK2_CHECK_DICT["supporting-oducn"]
387 def test_21_check_no_interface_oduc2(self):
388 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
389 self.assertEqual(response['status_code'], requests.codes.conflict)
391 # Check if port-mapping data is updated, where the supporting-oducn is deleted
392 def test_21a_check_no_oduc2(self):
393 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
394 self.assertRaises(KeyError, lambda: response["supporting-oducn"])
396 # 1f) Delete OTUC2 device interfaces
398 def test_22_service_path_delete_otuc2(self):
399 response = test_utils_rfc8040.transportpce_api_rpc_request(
400 'transportpce-device-renderer', 'service-path',
402 'service-name': 'service_OTUC2',
404 'modulation-format': 'dp-qpsk',
405 'operation': 'delete',
406 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
407 'center-freq': 196.1,
409 'min-freq': 196.0375,
411 'lower-spectral-slot-number': 755,
412 'higher-spectral-slot-number': 768
414 self.assertEqual(response['status_code'], requests.codes.ok)
415 self.assertIn('Request processed', response['output']['result'])
416 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
418 def test_23_check_no_interface_otuc2(self):
419 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2")
420 self.assertEqual(response['status_code'], requests.codes.conflict)
422 def test_24_check_no_interface_otsig(self):
423 response = test_utils_rfc8040.check_node_attribute_request(
424 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
425 self.assertEqual(response['status_code'], requests.codes.conflict)
427 def test_25_check_no_interface_otsi(self):
428 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
429 self.assertEqual(response['status_code'], requests.codes.conflict)
431 def test_25a_check_no_otuc2(self):
432 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
433 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
435 # 2a) create a OTUC3 device renderer
436 def test_26_service_path_create_otuc3(self):
437 response = test_utils_rfc8040.transportpce_api_rpc_request(
438 'transportpce-device-renderer', 'service-path',
440 'service-name': 'service_OTUC3',
442 'modulation-format': 'dp-qam8',
443 'operation': 'create',
444 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
445 'center-freq': 196.1,
447 'min-freq': 196.0375,
449 'lower-spectral-slot-number': 755,
450 'higher-spectral-slot-number': 768
452 self.assertEqual(response['status_code'], requests.codes.ok)
453 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
455 {'node-id': 'XPDR-A2',
456 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
457 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-300G']}, response['output']['node-interface'])
459 def test_27_get_portmapping_network1(self):
460 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
461 self.assertEqual(response['status_code'], requests.codes.ok)
462 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
464 self.NETWORK2_CHECK_DICT,
467 def test_28_check_interface_otsi(self):
468 # pylint: disable=line-too-long
469 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
470 self.assertEqual(response['status_code'], requests.codes.ok)
472 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
473 'administrative-state': 'inService',
474 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
475 'type': 'org-openroadm-interfaces:otsi',
476 'supporting-port': 'L1'}
478 "frequency": 196.0812,
479 "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
480 "fec": "org-openroadm-common-types:ofec",
481 "transmit-power": -5,
482 "provision-mode": "explicit",
483 "modulation-format": "dp-qam8"}
485 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
486 response['interface'][0])
487 self.assertDictEqual(dict(input_dict_2,
488 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
489 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
490 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
491 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
493 def test_29_check_interface_otsig(self):
494 response = test_utils_rfc8040.check_node_attribute_request(
495 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
496 self.assertEqual(response['status_code'], requests.codes.ok)
497 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
498 'administrative-state': 'inService',
499 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
500 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
501 'type': 'org-openroadm-interfaces:otsi-group',
502 'supporting-port': 'L1'}
503 input_dict_2 = {"group-id": 1,
504 "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}
506 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
507 response['interface'][0])
508 self.assertDictEqual(dict(input_dict_2,
509 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
510 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
512 def test_30_check_interface_otuc3(self):
513 response = test_utils_rfc8040.check_node_attribute_request(
514 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
515 self.assertEqual(response['status_code'], requests.codes.ok)
516 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
517 'administrative-state': 'inService',
518 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
519 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
520 'type': 'org-openroadm-interfaces:otnOtu',
521 'supporting-port': 'L1'}
522 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
523 "degthr-percentage": 100,
524 "tim-detect-mode": "Disabled",
528 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
529 response['interface'][0])
530 self.assertDictEqual(dict(input_dict_2,
531 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
532 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
534 # 2b) create a ODUC3 device renderer
535 def test_31_otn_service_path_create_oduc3(self):
536 response = test_utils_rfc8040.transportpce_api_rpc_request(
537 'transportpce-device-renderer', 'otn-service-path',
539 'service-name': 'service_ODUC3',
540 'operation': 'create',
541 'service-rate': '300',
542 'service-format': 'ODU',
543 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
545 self.assertEqual(response['status_code'], requests.codes.ok)
546 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
548 {'node-id': 'XPDR-A2',
549 'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
551 def test_32_get_portmapping_network1(self):
552 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
553 self.assertEqual(response['status_code'], requests.codes.ok)
554 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
556 self.NETWORK2_CHECK_DICT,
559 def test_33_check_interface_oduc3(self):
560 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
561 self.assertEqual(response['status_code'], requests.codes.ok)
563 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
564 'administrative-state': 'inService',
565 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
566 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
567 'type': 'org-openroadm-interfaces:otnOdu',
568 'supporting-port': 'L1'}
570 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
571 'rate': 'org-openroadm-otn-common-types:ODUCn',
572 'tx-sapi': 'LY9PxYJqUbw=',
573 'tx-dapi': 'LY9PxYJqUbw=',
574 'expected-sapi': 'LY9PxYJqUbw=',
575 'expected-dapi': 'LY9PxYJqUbw=',
577 "degthr-percentage": 100,
578 "monitoring-mode": "terminated",
581 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
582 response['interface'][0])
583 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
584 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
585 self.assertDictEqual(
586 {'payload-type': '22', 'exp-payload-type': '22'},
587 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
589 # 2c) create Ethernet device renderer
590 # No change in the ethernet device renderer so skipping those tests
591 # 2d) Delete Ethernet device interfaces
592 # No change in the ethernet device renderer so skipping those tests
594 # 2e) Delete ODUC3 device interfaces
595 def test_34_otn_service_path_delete_oduc3(self):
596 response = test_utils_rfc8040.transportpce_api_rpc_request(
597 'transportpce-device-renderer', 'otn-service-path',
599 'service-name': 'service_ODUC3',
600 'operation': 'delete',
601 'service-rate': '300',
602 'service-format': 'ODU',
603 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
605 self.assertEqual(response['status_code'], requests.codes.ok)
606 self.assertIn('Request processed', response['output']['result'])
607 del self.NETWORK2_CHECK_DICT["supporting-oducn"]
609 def test_35_check_no_interface_oduc3(self):
610 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
611 self.assertEqual(response['status_code'], requests.codes.conflict)
613 def test_35a_check_no_oduc3(self):
614 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
615 self.assertRaises(KeyError, lambda: response["supporting-oducn"])
617 # 2f) Delete OTUC3 device interfaces
618 def test_36_service_path_delete_otuc3(self):
619 response = test_utils_rfc8040.transportpce_api_rpc_request(
620 'transportpce-device-renderer', 'service-path',
622 'service-name': 'service_OTUC3',
624 'modulation-format': 'dp-qam8',
625 'operation': 'delete',
626 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
627 'center-freq': 196.1,
629 'min-freq': 196.0375,
631 'lower-spectral-slot-number': 755,
632 'higher-spectral-slot-number': 768
634 self.assertEqual(response['status_code'], requests.codes.ok)
635 self.assertIn('Request processed', response['output']['result'])
636 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
638 def test_37_check_no_interface_otuc3(self):
639 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
640 self.assertEqual(response['status_code'], requests.codes.conflict)
642 def test_38_check_no_interface_otsig(self):
643 response = test_utils_rfc8040.check_node_attribute_request(
644 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
645 self.assertEqual(response['status_code'], requests.codes.conflict)
647 def test_39_check_no_interface_otsi(self):
648 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
649 self.assertEqual(response['status_code'], requests.codes.conflict)
651 def test_39a_check_no_otuc3(self):
652 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
653 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
655 # 3a) create a OTUC4 device renderer
656 def test_40_service_path_create_otuc4(self):
657 response = test_utils_rfc8040.transportpce_api_rpc_request(
658 'transportpce-device-renderer', 'service-path',
660 'service-name': 'service_OTUC4',
662 'modulation-format': 'dp-qam16',
663 'operation': 'create',
664 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
665 'center-freq': 196.1,
667 'min-freq': 196.0375,
669 'lower-spectral-slot-number': 755,
670 'higher-spectral-slot-number': 768
672 self.assertEqual(response['status_code'], requests.codes.ok)
673 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
675 {'node-id': 'XPDR-A2',
676 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
677 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-400G']}, response['output']['node-interface'])
679 def test_41_get_portmapping_network1(self):
680 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
681 self.assertEqual(response['status_code'], requests.codes.ok)
682 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
684 self.NETWORK2_CHECK_DICT,
687 def test_42_check_interface_otsi(self):
688 # pylint: disable=line-too-long
689 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
690 self.assertEqual(response['status_code'], requests.codes.ok)
692 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
693 'administrative-state': 'inService',
694 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
695 'type': 'org-openroadm-interfaces:otsi',
696 'supporting-port': 'L1'}
698 "frequency": 196.0812,
699 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
700 "fec": "org-openroadm-common-types:ofec",
701 "transmit-power": -5,
702 "provision-mode": "explicit",
703 "modulation-format": "dp-qam16"}
705 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
706 response['interface'][0])
707 self.assertDictEqual(dict(input_dict_2,
708 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
709 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
710 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
711 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
713 def test_43_check_interface_otsig(self):
714 response = test_utils_rfc8040.check_node_attribute_request(
715 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
716 self.assertEqual(response['status_code'], requests.codes.ok)
717 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
718 'administrative-state': 'inService',
719 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
720 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
721 'type': 'org-openroadm-interfaces:otsi-group',
722 'supporting-port': 'L1'}
723 input_dict_2 = {"group-id": 1,
724 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
726 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
727 response['interface'][0])
728 self.assertDictEqual(dict(input_dict_2,
729 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
730 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
732 def test_44_check_interface_otuc4(self):
733 response = test_utils_rfc8040.check_node_attribute_request(
734 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
735 self.assertEqual(response['status_code'], requests.codes.ok)
736 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
737 'administrative-state': 'inService',
738 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
739 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
740 'type': 'org-openroadm-interfaces:otnOtu',
741 'supporting-port': 'L1'}
742 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
743 "degthr-percentage": 100,
744 "tim-detect-mode": "Disabled",
748 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
749 response['interface'][0])
750 self.assertDictEqual(dict(input_dict_2,
751 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
752 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
754 # 3b) create a ODUC4 device renderer
755 def test_45_otn_service_path_create_oduc3(self):
756 response = test_utils_rfc8040.transportpce_api_rpc_request(
757 'transportpce-device-renderer', 'otn-service-path',
759 'service-name': 'service_ODUC4',
760 'operation': 'create',
761 'service-rate': '400',
762 'service-format': 'ODU',
763 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
765 self.assertEqual(response['status_code'], requests.codes.ok)
766 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
768 {'node-id': 'XPDR-A2',
769 'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
771 def test_46_get_portmapping_network1(self):
772 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
773 self.assertEqual(response['status_code'], requests.codes.ok)
774 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
776 self.NETWORK2_CHECK_DICT,
779 def test_47_check_interface_oduc4(self):
780 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
781 self.assertEqual(response['status_code'], requests.codes.ok)
783 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
784 'administrative-state': 'inService',
785 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
786 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
787 'type': 'org-openroadm-interfaces:otnOdu',
788 'supporting-port': 'L1'}
790 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
791 'rate': 'org-openroadm-otn-common-types:ODUCn',
792 'tx-sapi': 'LY9PxYJqUbw=',
793 'tx-dapi': 'LY9PxYJqUbw=',
794 'expected-sapi': 'LY9PxYJqUbw=',
795 'expected-dapi': 'LY9PxYJqUbw=',
797 "degthr-percentage": 100,
798 "monitoring-mode": "terminated",
801 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
802 response['interface'][0])
803 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
804 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
805 self.assertDictEqual(
806 {'payload-type': '22', 'exp-payload-type': '22'},
807 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
809 # 3c) create Ethernet device renderer
810 # No change in the ethernet device renderer so skipping those tests
811 # 3d) Delete Ethernet device interfaces
812 # No change in the ethernet device renderer so skipping those tests
814 # 3e) Delete ODUC4 device interfaces
815 def test_48_otn_service_path_delete_oduc4(self):
816 response = test_utils_rfc8040.transportpce_api_rpc_request(
817 'transportpce-device-renderer', 'otn-service-path',
819 'service-name': 'service_ODUC4',
820 'operation': 'delete',
821 'service-rate': '400',
822 'service-format': 'ODU',
823 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
825 self.assertEqual(response['status_code'], requests.codes.ok)
826 self.assertIn('Request processed', response['output']['result'])
827 del self.NETWORK2_CHECK_DICT["supporting-oducn"]
829 def test_49_check_no_interface_oduc4(self):
830 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
831 self.assertEqual(response['status_code'], requests.codes.conflict)
833 def test_49a_check_no_oduc4(self):
834 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
835 self.assertRaises(KeyError, lambda: response["supporting-oducn"])
837 # 3f) Delete OTUC4 device interfaces
838 def test_50_service_path_delete_otuc4(self):
839 response = test_utils_rfc8040.transportpce_api_rpc_request(
840 'transportpce-device-renderer', 'service-path',
842 'service-name': 'service_OTUC4',
844 'modulation-format': 'dp-qam16',
845 'operation': 'delete',
846 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
847 'center-freq': 196.1,
849 'min-freq': 196.0375,
851 'lower-spectral-slot-number': 755,
852 'higher-spectral-slot-number': 768
854 self.assertEqual(response['status_code'], requests.codes.ok)
855 self.assertIn('Request processed', response['output']['result'])
856 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
858 def test_51_check_no_interface_otuc4(self):
859 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
860 self.assertEqual(response['status_code'], requests.codes.conflict)
862 def test_52_check_no_interface_otsig(self):
863 response = test_utils_rfc8040.check_node_attribute_request(
864 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
865 self.assertEqual(response['status_code'], requests.codes.conflict)
867 def test_53_check_no_interface_otsi(self):
868 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
869 self.assertEqual(response['status_code'], requests.codes.conflict)
871 def test_53a_check_no_otuc4(self):
872 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
873 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
875 # Disconnect the XPDR
876 def test_54_xpdr_device_disconnection(self):
877 response = test_utils_rfc8040.unmount_device("XPDR-A2")
878 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
880 def test_55_xpdr_device_disconnected(self):
881 response = test_utils_rfc8040.check_device_connection("XPDR-A2")
882 self.assertEqual(response['status_code'], requests.codes.conflict)
883 self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
884 self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
885 self.assertEqual(response['connection-status']['error-message'],
886 'Request could not be completed because the relevant data model content does not exist')
888 def test_56_xpdr_device_not_connected(self):
889 response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
890 self.assertEqual(response['status_code'], requests.codes.conflict)
891 self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
892 self.assertEqual(response['node-info']['error-tag'], 'data-missing')
893 self.assertEqual(response['node-info']['error-message'],
894 'Request could not be completed because the relevant data model content does not exist')
897 if __name__ == '__main__':
898 unittest.main(verbosity=2)