2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils_rfc8040 # nopep8
25 class TransportPCE400GPortMappingTesting(unittest.TestCase):
28 NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1",
29 "supporting-port": "L1",
30 "supported-interface-capability": [
31 "org-openroadm-port-types:if-otsi-otsigroup"
33 "port-direction": "bidirectional",
34 "port-qual": "switch-network",
35 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
36 "xponder-type": "mpdr",
37 'lcp-hash-val': 'LY9PxYJqUbw=',
38 'port-admin-state': 'InService',
39 'port-oper-state': 'InService'}
44 cls.processes = test_utils_rfc8040.start_tpce()
45 cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
48 def tearDownClass(cls):
49 # pylint: disable=not-an-iterable
50 for process in cls.processes:
51 test_utils_rfc8040.shutdown_process(process)
52 print("all processes killed")
55 # pylint: disable=consider-using-f-string
56 print("execution of {}".format(self.id().split(".")[-1]))
59 def test_01_xpdr_device_connection(self):
60 response = test_utils_rfc8040.mount_device("XPDR-A2",
61 ('xpdra2', self.NODE_VERSION))
62 self.assertEqual(response.status_code, requests.codes.created,
63 test_utils_rfc8040.CODE_SHOULD_BE_201)
65 # Check if the node appears in the ietf-network topology
66 # this test has been removed, since it already exists in port-mapping
67 # 1a) create a OTUC2 device renderer
68 def test_02_service_path_create_otuc2(self):
69 response = test_utils_rfc8040.device_renderer_service_path_request(
71 'service-name': 'service_OTUC2',
73 'modulation-format': 'dp-qpsk',
74 'operation': 'create',
75 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
80 'lower-spectral-slot-number': 755,
81 'higher-spectral-slot-number': 768
83 self.assertEqual(response['status_code'], requests.codes.ok)
84 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
86 {'node-id': 'XPDR-A2',
87 'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
88 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G']}, response['output']['node-interface'])
90 def test_03_get_portmapping_network1(self):
91 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
92 self.assertEqual(response['status_code'], requests.codes.ok)
93 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
95 self.NETWORK2_CHECK_DICT,
98 def test_04_check_interface_otsi(self):
99 # pylint: disable=line-too-long
100 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
101 self.assertEqual(response['status_code'], requests.codes.ok)
102 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
103 'administrative-state': 'inService',
104 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
105 'type': 'org-openroadm-interfaces:otsi',
106 'supporting-port': 'L1'}
108 "frequency": 196.0812,
109 "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
110 "fec": "org-openroadm-common-types:ofec",
111 "transmit-power": -5,
112 "provision-mode": "explicit",
113 "modulation-format": "dp-qpsk"}
115 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
116 response['interface'][0])
117 self.assertDictEqual(dict(input_dict_2,
118 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
119 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
120 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
121 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
123 def test_05_check_interface_otsig(self):
124 response = test_utils_rfc8040.check_node_attribute_request(
125 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
126 self.assertEqual(response['status_code'], requests.codes.ok)
127 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
128 'administrative-state': 'inService',
129 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
130 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
131 'type': 'org-openroadm-interfaces:otsi-group',
132 'supporting-port': 'L1'}
133 input_dict_2 = {"group-id": 1,
134 "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
136 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
137 response['interface'][0])
138 self.assertDictEqual(dict(input_dict_2,
139 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
140 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
142 def test_06_check_interface_otuc2(self):
143 response = test_utils_rfc8040.check_node_attribute_request(
144 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
145 self.assertEqual(response['status_code'], requests.codes.ok)
146 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
147 'administrative-state': 'inService',
148 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
149 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
150 'type': 'org-openroadm-interfaces:otnOtu',
151 'supporting-port': 'L1'}
152 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
153 "degthr-percentage": 100,
154 "tim-detect-mode": "Disabled",
158 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
159 response['interface'][0])
160 self.assertDictEqual(dict(input_dict_2,
161 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
162 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
164 # 1b) create a ODUC2 device renderer
165 def test_07_otn_service_path_create_oduc2(self):
166 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
168 'service-name': 'service_ODUC2',
169 'operation': 'create',
170 'service-rate': '200',
171 'service-format': 'ODU',
172 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
174 self.assertEqual(response['status_code'], requests.codes.ok)
175 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
177 {'node-id': 'XPDR-A2',
178 'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
180 def test_08_get_portmapping_network1(self):
181 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
182 self.assertEqual(response['status_code'], requests.codes.ok)
183 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
185 self.NETWORK2_CHECK_DICT,
188 def test_09_check_interface_oduc2(self):
189 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
190 self.assertEqual(response['status_code'], requests.codes.ok)
192 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
193 'administrative-state': 'inService',
194 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
195 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
196 'type': 'org-openroadm-interfaces:otnOdu',
197 'supporting-port': 'L1'}
199 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
200 'rate': 'org-openroadm-otn-common-types:ODUCn',
201 'tx-sapi': 'LY9PxYJqUbw=',
202 'tx-dapi': 'LY9PxYJqUbw=',
203 'expected-sapi': 'LY9PxYJqUbw=',
204 'expected-dapi': 'LY9PxYJqUbw=',
206 "degthr-percentage": 100,
207 "monitoring-mode": "terminated",
210 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
211 response['interface'][0])
212 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
213 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
214 self.assertDictEqual(
215 {'payload-type': '22', 'exp-payload-type': '22'},
216 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
218 # 1c) create Ethernet device renderer
219 def test_10_otn_service_path_create_100ge(self):
220 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
222 'service-name': 'service_Ethernet',
223 'operation': 'create',
224 'service-rate': '100',
225 'service-format': 'Ethernet',
226 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
227 'ethernet-encoding': 'eth encode',
228 'opucn-trib-slots': ['1.1', '1.20']
230 self.assertEqual(response['status_code'], requests.codes.ok)
231 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
232 self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id'])
233 self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
234 response['output']['node-interface'][0]['connection-id'])
235 self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id'])
236 self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet',
237 response['output']['node-interface'][0]['odu-interface-id'])
238 self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet',
239 response['output']['node-interface'][0]['odu-interface-id'])
241 def test_11_check_interface_100ge_client(self):
242 response = test_utils_rfc8040.check_node_attribute_request(
243 "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
244 self.assertEqual(response['status_code'], requests.codes.ok)
245 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
246 'administrative-state': 'inService',
247 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
248 'type': 'org-openroadm-interfaces:ethernetCsmacd',
249 'supporting-port': 'C1'
251 input_dict_2 = {'speed': 100000}
252 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
253 response['interface'][0])
254 self.assertDictEqual(dict(input_dict_2,
255 **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
256 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
258 def test_12_check_interface_odu4_client(self):
259 response = test_utils_rfc8040.check_node_attribute_request(
260 "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
261 self.assertEqual(response['status_code'], requests.codes.ok)
262 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet',
263 'administrative-state': 'inService',
264 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
265 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
266 'type': 'org-openroadm-interfaces:otnOdu',
267 'supporting-port': 'C1'}
269 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
270 'rate': 'org-openroadm-otn-common-types:ODU4',
271 'monitoring-mode': 'terminated'}
273 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
274 response['interface'][0])
275 self.assertDictEqual(dict(input_dict_2,
276 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
277 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
278 self.assertDictEqual(
279 {'payload-type': '07', 'exp-payload-type': '07'},
280 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
282 def test_13_check_interface_odu4_network(self):
283 response = test_utils_rfc8040.check_node_attribute_request(
284 "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
285 self.assertEqual(response['status_code'], requests.codes.ok)
286 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet',
287 'administrative-state': 'inService',
288 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
289 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
290 'type': 'org-openroadm-interfaces:otnOdu',
291 'supporting-port': 'L1'}
293 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
294 'rate': 'org-openroadm-otn-common-types:ODU4',
295 'monitoring-mode': 'not-terminated'}
296 input_dict_3 = {'trib-port-number': 1}
298 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
299 response['interface'][0])
300 self.assertDictEqual(dict(input_dict_2,
301 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
302 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
303 self.assertDictEqual(dict(input_dict_3,
304 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
305 'parent-odu-allocation']),
306 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
307 self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
308 ['opucn-trib-slots'])
309 self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
310 ['opucn-trib-slots'])
312 def test_14_check_odu_connection_xpdra2(self):
313 response = test_utils_rfc8040.check_node_attribute_request(
315 "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
316 self.assertEqual(response['status_code'], requests.codes.ok)
319 'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
320 'direction': 'bidirectional'
323 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
324 response['odu-connection'][0])
325 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'},
326 response['odu-connection'][0]['destination'])
327 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'},
328 response['odu-connection'][0]['source'])
330 # 1d) Delete Ethernet device interfaces
331 def test_15_otn_service_path_delete_100ge(self):
332 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
334 'service-name': 'service_Ethernet',
335 'operation': 'delete',
336 'service-rate': '100',
337 'service-format': 'Ethernet',
338 'nodes': [{'node-id': 'XPDR-A2', 'client-tp': 'XPDR2-CLIENT1', 'network-tp': 'XPDR2-NETWORK1'}],
339 'ethernet-encoding': 'eth encode',
341 'trib-port-number': '1'
343 self.assertEqual(response['status_code'], requests.codes.ok)
344 self.assertIn('Request processed', response['output']['result'])
346 def test_16_check_no_odu_connection(self):
347 response = test_utils_rfc8040.check_node_attribute_request(
349 "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
350 self.assertEqual(response['status_code'], requests.codes.conflict)
352 def test_17_check_no_interface_odu_network(self):
353 response = test_utils_rfc8040.check_node_attribute_request(
354 "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
355 self.assertEqual(response['status_code'], requests.codes.conflict)
357 def test_18_check_no_interface_odu_client(self):
358 response = test_utils_rfc8040.check_node_attribute_request(
359 "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
360 self.assertEqual(response['status_code'], requests.codes.conflict)
362 def test_19_check_no_interface_100ge_client(self):
363 response = test_utils_rfc8040.check_node_attribute_request(
364 "XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
365 self.assertEqual(response['status_code'], requests.codes.conflict)
367 # 1e) Delete ODUC2 device interfaces
368 def test_20_otn_service_path_delete_oduc2(self):
369 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
371 'service-name': 'service_ODUC2',
372 'operation': 'delete',
373 'service-rate': '200',
374 'service-format': 'ODU',
375 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
377 self.assertEqual(response['status_code'], requests.codes.ok)
378 self.assertIn('Request processed', response['output']['result'])
379 # Here you have remove the added oducn supporting port interface
380 del self.NETWORK2_CHECK_DICT["supporting-oducn"]
382 def test_21_check_no_interface_oduc2(self):
383 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
384 self.assertEqual(response['status_code'], requests.codes.conflict)
386 # Check if port-mapping data is updated, where the supporting-oducn is deleted
387 def test_21a_check_no_oduc2(self):
388 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
389 self.assertRaises(KeyError, lambda: response["supporting-oducn"])
391 # 1f) Delete OTUC2 device interfaces
393 def test_22_service_path_delete_otuc2(self):
394 response = test_utils_rfc8040.device_renderer_service_path_request(
396 'service-name': 'service_OTUC2',
398 'modulation-format': 'dp-qpsk',
399 'operation': 'delete',
400 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
401 'center-freq': 196.1,
403 'min-freq': 196.0375,
405 'lower-spectral-slot-number': 755,
406 'higher-spectral-slot-number': 768
408 self.assertEqual(response['status_code'], requests.codes.ok)
409 self.assertIn('Request processed', response['output']['result'])
410 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
412 def test_23_check_no_interface_otuc2(self):
413 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2")
414 self.assertEqual(response['status_code'], requests.codes.conflict)
416 def test_24_check_no_interface_otsig(self):
417 response = test_utils_rfc8040.check_node_attribute_request(
418 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
419 self.assertEqual(response['status_code'], requests.codes.conflict)
421 def test_25_check_no_interface_otsi(self):
422 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
423 self.assertEqual(response['status_code'], requests.codes.conflict)
425 def test_25a_check_no_otuc2(self):
426 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
427 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
429 # 2a) create a OTUC3 device renderer
430 def test_26_service_path_create_otuc3(self):
431 response = test_utils_rfc8040.device_renderer_service_path_request(
433 'service-name': 'service_OTUC3',
435 'modulation-format': 'dp-qam8',
436 'operation': 'create',
437 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
438 'center-freq': 196.1,
440 'min-freq': 196.0375,
442 'lower-spectral-slot-number': 755,
443 'higher-spectral-slot-number': 768
445 self.assertEqual(response['status_code'], requests.codes.ok)
446 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
448 {'node-id': 'XPDR-A2',
449 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
450 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-300G']}, response['output']['node-interface'])
452 def test_27_get_portmapping_network1(self):
453 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
454 self.assertEqual(response['status_code'], requests.codes.ok)
455 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
457 self.NETWORK2_CHECK_DICT,
460 def test_28_check_interface_otsi(self):
461 # pylint: disable=line-too-long
462 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
463 self.assertEqual(response['status_code'], requests.codes.ok)
465 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
466 'administrative-state': 'inService',
467 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
468 'type': 'org-openroadm-interfaces:otsi',
469 'supporting-port': 'L1'}
471 "frequency": 196.0812,
472 "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
473 "fec": "org-openroadm-common-types:ofec",
474 "transmit-power": -5,
475 "provision-mode": "explicit",
476 "modulation-format": "dp-qam8"}
478 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
479 response['interface'][0])
480 self.assertDictEqual(dict(input_dict_2,
481 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
482 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
483 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
484 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
486 def test_29_check_interface_otsig(self):
487 response = test_utils_rfc8040.check_node_attribute_request(
488 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
489 self.assertEqual(response['status_code'], requests.codes.ok)
490 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
491 'administrative-state': 'inService',
492 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
493 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
494 'type': 'org-openroadm-interfaces:otsi-group',
495 'supporting-port': 'L1'}
496 input_dict_2 = {"group-id": 1,
497 "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}
499 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
500 response['interface'][0])
501 self.assertDictEqual(dict(input_dict_2,
502 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
503 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
505 def test_30_check_interface_otuc3(self):
506 response = test_utils_rfc8040.check_node_attribute_request(
507 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
508 self.assertEqual(response['status_code'], requests.codes.ok)
509 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
510 'administrative-state': 'inService',
511 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
512 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
513 'type': 'org-openroadm-interfaces:otnOtu',
514 'supporting-port': 'L1'}
515 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
516 "degthr-percentage": 100,
517 "tim-detect-mode": "Disabled",
521 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
522 response['interface'][0])
523 self.assertDictEqual(dict(input_dict_2,
524 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
525 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
527 # 2b) create a ODUC3 device renderer
528 def test_31_otn_service_path_create_oduc3(self):
529 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
531 'service-name': 'service_ODUC3',
532 'operation': 'create',
533 'service-rate': '300',
534 'service-format': 'ODU',
535 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
537 self.assertEqual(response['status_code'], requests.codes.ok)
538 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
540 {'node-id': 'XPDR-A2',
541 'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
543 def test_32_get_portmapping_network1(self):
544 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
545 self.assertEqual(response['status_code'], requests.codes.ok)
546 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
548 self.NETWORK2_CHECK_DICT,
551 def test_33_check_interface_oduc3(self):
552 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
553 self.assertEqual(response['status_code'], requests.codes.ok)
555 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
556 'administrative-state': 'inService',
557 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
558 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
559 'type': 'org-openroadm-interfaces:otnOdu',
560 'supporting-port': 'L1'}
562 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
563 'rate': 'org-openroadm-otn-common-types:ODUCn',
564 'tx-sapi': 'LY9PxYJqUbw=',
565 'tx-dapi': 'LY9PxYJqUbw=',
566 'expected-sapi': 'LY9PxYJqUbw=',
567 'expected-dapi': 'LY9PxYJqUbw=',
569 "degthr-percentage": 100,
570 "monitoring-mode": "terminated",
573 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
574 response['interface'][0])
575 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
576 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
577 self.assertDictEqual(
578 {'payload-type': '22', 'exp-payload-type': '22'},
579 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
581 # 2c) create Ethernet device renderer
582 # No change in the ethernet device renderer so skipping those tests
583 # 2d) Delete Ethernet device interfaces
584 # No change in the ethernet device renderer so skipping those tests
586 # 2e) Delete ODUC3 device interfaces
587 def test_34_otn_service_path_delete_oduc3(self):
588 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
590 'service-name': 'service_ODUC3',
591 'operation': 'delete',
592 'service-rate': '300',
593 'service-format': 'ODU',
594 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
596 self.assertEqual(response['status_code'], requests.codes.ok)
597 self.assertIn('Request processed', response['output']['result'])
598 del self.NETWORK2_CHECK_DICT["supporting-oducn"]
600 def test_35_check_no_interface_oduc3(self):
601 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
602 self.assertEqual(response['status_code'], requests.codes.conflict)
604 def test_35a_check_no_oduc3(self):
605 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
606 self.assertRaises(KeyError, lambda: response["supporting-oducn"])
608 # 2f) Delete OTUC3 device interfaces
609 def test_36_service_path_delete_otuc3(self):
610 response = test_utils_rfc8040.device_renderer_service_path_request(
612 'service-name': 'service_OTUC3',
614 'modulation-format': 'dp-qam8',
615 'operation': 'delete',
616 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
617 'center-freq': 196.1,
619 'min-freq': 196.0375,
621 'lower-spectral-slot-number': 755,
622 'higher-spectral-slot-number': 768
624 self.assertEqual(response['status_code'], requests.codes.ok)
625 self.assertIn('Request processed', response['output']['result'])
626 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
628 def test_37_check_no_interface_otuc3(self):
629 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
630 self.assertEqual(response['status_code'], requests.codes.conflict)
632 def test_38_check_no_interface_otsig(self):
633 response = test_utils_rfc8040.check_node_attribute_request(
634 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
635 self.assertEqual(response['status_code'], requests.codes.conflict)
637 def test_39_check_no_interface_otsi(self):
638 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
639 self.assertEqual(response['status_code'], requests.codes.conflict)
641 def test_39a_check_no_otuc3(self):
642 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
643 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
645 # 3a) create a OTUC4 device renderer
646 def test_40_service_path_create_otuc4(self):
647 response = test_utils_rfc8040.device_renderer_service_path_request(
649 'service-name': 'service_OTUC4',
651 'modulation-format': 'dp-qam16',
652 'operation': 'create',
653 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
654 'center-freq': 196.1,
656 'min-freq': 196.0375,
658 'lower-spectral-slot-number': 755,
659 'higher-spectral-slot-number': 768
661 self.assertEqual(response['status_code'], requests.codes.ok)
662 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
664 {'node-id': 'XPDR-A2',
665 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
666 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-400G']}, response['output']['node-interface'])
668 def test_41_get_portmapping_network1(self):
669 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
670 self.assertEqual(response['status_code'], requests.codes.ok)
671 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
673 self.NETWORK2_CHECK_DICT,
676 def test_42_check_interface_otsi(self):
677 # pylint: disable=line-too-long
678 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
679 self.assertEqual(response['status_code'], requests.codes.ok)
681 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
682 'administrative-state': 'inService',
683 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
684 'type': 'org-openroadm-interfaces:otsi',
685 'supporting-port': 'L1'}
687 "frequency": 196.0812,
688 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
689 "fec": "org-openroadm-common-types:ofec",
690 "transmit-power": -5,
691 "provision-mode": "explicit",
692 "modulation-format": "dp-qam16"}
694 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
695 response['interface'][0])
696 self.assertDictEqual(dict(input_dict_2,
697 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
698 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
699 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
700 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
702 def test_43_check_interface_otsig(self):
703 response = test_utils_rfc8040.check_node_attribute_request(
704 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
705 self.assertEqual(response['status_code'], requests.codes.ok)
706 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
707 'administrative-state': 'inService',
708 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
709 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
710 'type': 'org-openroadm-interfaces:otsi-group',
711 'supporting-port': 'L1'}
712 input_dict_2 = {"group-id": 1,
713 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
715 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
716 response['interface'][0])
717 self.assertDictEqual(dict(input_dict_2,
718 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
719 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
721 def test_44_check_interface_otuc4(self):
722 response = test_utils_rfc8040.check_node_attribute_request(
723 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
724 self.assertEqual(response['status_code'], requests.codes.ok)
725 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
726 'administrative-state': 'inService',
727 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
728 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
729 'type': 'org-openroadm-interfaces:otnOtu',
730 'supporting-port': 'L1'}
731 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
732 "degthr-percentage": 100,
733 "tim-detect-mode": "Disabled",
737 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
738 response['interface'][0])
739 self.assertDictEqual(dict(input_dict_2,
740 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
741 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
743 # 3b) create a ODUC4 device renderer
744 def test_45_otn_service_path_create_oduc3(self):
745 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
747 'service-name': 'service_ODUC4',
748 'operation': 'create',
749 'service-rate': '400',
750 'service-format': 'ODU',
751 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
753 self.assertEqual(response['status_code'], requests.codes.ok)
754 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
756 {'node-id': 'XPDR-A2',
757 'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
759 def test_46_get_portmapping_network1(self):
760 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
761 self.assertEqual(response['status_code'], requests.codes.ok)
762 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
764 self.NETWORK2_CHECK_DICT,
767 def test_47_check_interface_oduc4(self):
768 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
769 self.assertEqual(response['status_code'], requests.codes.ok)
771 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
772 'administrative-state': 'inService',
773 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
774 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
775 'type': 'org-openroadm-interfaces:otnOdu',
776 'supporting-port': 'L1'}
778 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
779 'rate': 'org-openroadm-otn-common-types:ODUCn',
780 'tx-sapi': 'LY9PxYJqUbw=',
781 'tx-dapi': 'LY9PxYJqUbw=',
782 'expected-sapi': 'LY9PxYJqUbw=',
783 'expected-dapi': 'LY9PxYJqUbw=',
785 "degthr-percentage": 100,
786 "monitoring-mode": "terminated",
789 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
790 response['interface'][0])
791 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
792 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
793 self.assertDictEqual(
794 {'payload-type': '22', 'exp-payload-type': '22'},
795 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
797 # 3c) create Ethernet device renderer
798 # No change in the ethernet device renderer so skipping those tests
799 # 3d) Delete Ethernet device interfaces
800 # No change in the ethernet device renderer so skipping those tests
802 # 3e) Delete ODUC4 device interfaces
803 def test_48_otn_service_path_delete_oduc4(self):
804 response = test_utils_rfc8040.device_renderer_otn_service_path_request(
806 'service-name': 'service_ODUC4',
807 'operation': 'delete',
808 'service-rate': '400',
809 'service-format': 'ODU',
810 'nodes': [{'node-id': 'XPDR-A2', 'network-tp': 'XPDR2-NETWORK1'}]
812 self.assertEqual(response['status_code'], requests.codes.ok)
813 self.assertIn('Request processed', response['output']['result'])
814 del self.NETWORK2_CHECK_DICT["supporting-oducn"]
816 def test_49_check_no_interface_oduc4(self):
817 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
818 self.assertEqual(response['status_code'], requests.codes.conflict)
820 def test_49a_check_no_oduc4(self):
821 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
822 self.assertRaises(KeyError, lambda: response["supporting-oducn"])
824 # 3f) Delete OTUC4 device interfaces
825 def test_50_service_path_delete_otuc4(self):
826 response = test_utils_rfc8040.device_renderer_service_path_request(
828 'service-name': 'service_OTUC4',
830 'modulation-format': 'dp-qam16',
831 'operation': 'delete',
832 'nodes': [{'node-id': 'XPDR-A2', 'dest-tp': 'XPDR2-NETWORK1'}],
833 'center-freq': 196.1,
835 'min-freq': 196.0375,
837 'lower-spectral-slot-number': 755,
838 'higher-spectral-slot-number': 768
840 self.assertEqual(response['status_code'], requests.codes.ok)
841 self.assertIn('Request processed', response['output']['result'])
842 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
844 def test_51_check_no_interface_otuc4(self):
845 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
846 self.assertEqual(response['status_code'], requests.codes.conflict)
848 def test_52_check_no_interface_otsig(self):
849 response = test_utils_rfc8040.check_node_attribute_request(
850 "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
851 self.assertEqual(response['status_code'], requests.codes.conflict)
853 def test_53_check_no_interface_otsi(self):
854 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
855 self.assertEqual(response['status_code'], requests.codes.conflict)
857 def test_53a_check_no_otuc4(self):
858 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
859 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
861 # Disconnect the XPDR
862 def test_54_xpdr_device_disconnection(self):
863 response = test_utils_rfc8040.unmount_device("XPDR-A2")
864 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
866 def test_55_xpdr_device_disconnected(self):
867 response = test_utils_rfc8040.check_device_connection("XPDR-A2")
868 self.assertEqual(response['status_code'], requests.codes.conflict)
869 self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
870 self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
871 self.assertEqual(response['connection-status']['error-message'],
872 'Request could not be completed because the relevant data model content does not exist')
874 def test_56_xpdr_device_not_connected(self):
875 response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
876 self.assertEqual(response['status_code'], requests.codes.conflict)
877 self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
878 self.assertEqual(response['node-info']['error-tag'], 'data-missing')
879 self.assertEqual(response['node-info']['error-message'],
880 'Request could not be completed because the relevant data model content does not exist')
883 if __name__ == '__main__':
884 unittest.main(verbosity=2)