2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils # nopep8
23 import test_utils_rfc8040 # nopep8
26 class TransportPCE400GPortMappingTesting(unittest.TestCase):
29 NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR2-NETWORK1",
30 "supporting-port": "L1",
31 "supported-interface-capability": [
32 "org-openroadm-port-types:if-otsi-otsigroup"
34 "port-direction": "bidirectional",
35 "port-qual": "switch-network",
36 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
37 "xponder-type": "mpdr",
38 'lcp-hash-val': 'LY9PxYJqUbw=',
39 'port-admin-state': 'InService',
40 'port-oper-state': 'InService'}
45 cls.processes = test_utils_rfc8040.start_tpce()
46 cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
49 def tearDownClass(cls):
50 # pylint: disable=not-an-iterable
51 for process in cls.processes:
52 test_utils_rfc8040.shutdown_process(process)
53 print("all processes killed")
56 # pylint: disable=consider-using-f-string
57 print("execution of {}".format(self.id().split(".")[-1]))
60 def test_01_xpdr_device_connection(self):
61 response = test_utils_rfc8040.mount_device("XPDR-A2",
62 ('xpdra2', self.NODE_VERSION))
63 self.assertEqual(response.status_code, requests.codes.created,
64 test_utils_rfc8040.CODE_SHOULD_BE_201)
66 # Check if the node appears in the ietf-network topology
67 # this test has been removed, since it already exists in port-mapping
68 # 1a) create a OTUC2 device renderer
69 def test_02_service_path_create_otuc2(self):
70 response = test_utils.service_path_request("create", "service_OTUC2", "0",
71 [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
72 196.1, 75, 196.0375, 196.125, 755,
75 self.assertEqual(response.status_code, requests.codes.ok)
77 self.assertIn('Interfaces created successfully for nodes: ', res["output"]["result"])
78 self.assertTrue(res["output"]["success"])
80 {'node-id': 'XPDR-A2',
81 'otu-interface-id': ['XPDR2-NETWORK1-OTUC2'],
82 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-200G']}, res["output"]['node-interface'])
84 def test_03_get_portmapping_network1(self):
85 response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
86 self.assertEqual(response.status_code, requests.codes.ok)
88 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
90 self.NETWORK2_CHECK_DICT,
93 def test_04_check_interface_otsi(self):
94 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
95 self.assertEqual(response.status_code, requests.codes.ok)
98 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
99 'administrative-state': 'inService',
100 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
101 'type': 'org-openroadm-interfaces:otsi',
102 'supporting-port': 'L1'}
104 "frequency": 196.0812,
105 "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
106 "fec": "org-openroadm-common-types:ofec",
107 "transmit-power": -5,
108 "provision-mode": "explicit",
109 "modulation-format": "dp-qpsk"}
111 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
113 self.assertDictEqual(dict(input_dict_2,
114 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
115 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
116 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.4", "iid": [1, 2]},
117 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
119 def test_05_check_interface_otsig(self):
120 response = test_utils.check_netconf_node_request(
121 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSIGROUP-200G")
122 self.assertEqual(response.status_code, requests.codes.ok)
123 res = response.json()
124 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
125 'administrative-state': 'inService',
126 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
127 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
128 'type': 'org-openroadm-interfaces:otsi-group',
129 'supporting-port': 'L1'}
130 input_dict_2 = {"group-id": 1,
131 "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
133 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
135 self.assertDictEqual(dict(input_dict_2,
136 **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
137 res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
139 def test_06_check_interface_otuc2(self):
140 response = test_utils.check_netconf_node_request(
141 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC2")
142 self.assertEqual(response.status_code, requests.codes.ok)
143 res = response.json()
144 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
145 'administrative-state': 'inService',
146 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
147 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-200G',
148 'type': 'org-openroadm-interfaces:otnOtu',
149 'supporting-port': 'L1'}
150 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
151 "degthr-percentage": 100,
152 "tim-detect-mode": "Disabled",
156 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
158 self.assertDictEqual(dict(input_dict_2,
159 **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
160 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
162 # 1b) create a ODUC2 device renderer
163 def test_07_otn_service_path_create_oduc2(self):
164 response = test_utils.otn_service_path_request("create", "service_ODUC2", "200", "ODU",
165 [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
167 self.assertEqual(response.status_code, requests.codes.ok)
168 res = response.json()
169 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', res["output"]["result"])
170 self.assertTrue(res["output"]["success"])
172 {'node-id': 'XPDR-A2',
173 'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, res["output"]['node-interface'])
175 def test_08_get_portmapping_network1(self):
176 response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
177 self.assertEqual(response.status_code, requests.codes.ok)
178 res = response.json()
179 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
181 self.NETWORK2_CHECK_DICT,
184 def test_09_check_interface_oduc2(self):
185 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC2")
186 self.assertEqual(response.status_code, requests.codes.ok)
187 res = response.json()
189 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
190 'administrative-state': 'inService',
191 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
192 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC2',
193 'type': 'org-openroadm-interfaces:otnOdu',
194 'supporting-port': 'L1'}
196 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
197 'rate': 'org-openroadm-otn-common-types:ODUCn',
198 'tx-sapi': 'LY9PxYJqUbw=',
199 'tx-dapi': 'LY9PxYJqUbw=',
200 'expected-sapi': 'LY9PxYJqUbw=',
201 'expected-dapi': 'LY9PxYJqUbw=',
203 "degthr-percentage": 100,
204 "monitoring-mode": "terminated",
207 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
209 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
210 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
211 self.assertDictEqual(
212 {'payload-type': '22', 'exp-payload-type': '22'},
213 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
215 # 1c) create Ethernet device renderer
216 def test_10_otn_service_path_create_100ge(self):
217 response = test_utils.otn_service_path_request("create", "service_Ethernet", "100", "Ethernet",
218 [{"node-id": "XPDR-A2", "client-tp": "XPDR2-CLIENT1",
219 "network-tp": "XPDR2-NETWORK1"}],
220 {"ethernet-encoding": "eth encode",
221 "opucn-trib-slots": [
226 self.assertEqual(response.status_code, requests.codes.ok)
227 res = response.json()
228 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', res["output"]["result"])
229 self.assertTrue(res["output"]["success"])
230 self.assertEqual('XPDR-A2', res["output"]['node-interface'][0]['node-id'])
231 self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
232 res["output"]['node-interface'][0]['connection-id'])
233 self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', res["output"]['node-interface'][0]['eth-interface-id'])
234 self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet', res["output"]['node-interface'][0]['odu-interface-id'])
235 self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet', res["output"]['node-interface'][0]['odu-interface-id'])
237 def test_11_check_interface_100ge_client(self):
238 response = test_utils.check_netconf_node_request(
239 "XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET-100G")
240 self.assertEqual(response.status_code, requests.codes.ok)
241 res = response.json()
242 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
243 'administrative-state': 'inService',
244 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
245 'type': 'org-openroadm-interfaces:ethernetCsmacd',
246 'supporting-port': 'C1'
248 input_dict_2 = {'speed': 100000}
249 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
251 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
252 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
254 def test_12_check_interface_odu4_client(self):
255 response = test_utils.check_netconf_node_request(
256 "XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service_Ethernet")
257 self.assertEqual(response.status_code, requests.codes.ok)
258 res = response.json()
259 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet',
260 'administrative-state': 'inService',
261 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
262 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
263 'type': 'org-openroadm-interfaces:otnOdu',
264 'supporting-port': 'C1'}
266 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
267 'rate': 'org-openroadm-otn-common-types:ODU4',
268 'monitoring-mode': 'terminated'}
270 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
272 self.assertDictEqual(dict(input_dict_2,
273 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
274 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
275 self.assertDictEqual(
276 {'payload-type': '07', 'exp-payload-type': '07'},
277 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
279 def test_13_check_interface_odu4_network(self):
280 response = test_utils.check_netconf_node_request(
281 "XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service_Ethernet")
282 self.assertEqual(response.status_code, requests.codes.ok)
283 res = response.json()
284 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet',
285 'administrative-state': 'inService',
286 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
287 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
288 'type': 'org-openroadm-interfaces:otnOdu',
289 'supporting-port': 'L1'}
291 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
292 'rate': 'org-openroadm-otn-common-types:ODU4',
293 'monitoring-mode': 'not-terminated'}
294 input_dict_3 = {'trib-port-number': 1}
296 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
298 self.assertDictEqual(dict(input_dict_2,
299 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
300 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
301 self.assertDictEqual(dict(input_dict_3,
302 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
303 'parent-odu-allocation']),
304 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
305 self.assertIn('1.1', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
306 ['opucn-trib-slots'])
307 self.assertIn('1.20', res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
308 ['opucn-trib-slots'])
310 def test_14_check_odu_connection_xpdra2(self):
311 response = test_utils.check_netconf_node_request(
313 "odu-connection/XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
314 self.assertEqual(response.status_code, requests.codes.ok)
315 res = response.json()
318 'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
319 'direction': 'bidirectional'
322 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
323 res['odu-connection'][0])
324 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'},
325 res['odu-connection'][0]['destination'])
326 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'},
327 res['odu-connection'][0]['source'])
329 # 1d) Delete Ethernet device interfaces
330 def test_15_otn_service_path_delete_100ge(self):
331 response = test_utils.otn_service_path_request("delete", "service_Ethernet", "100", "Ethernet",
332 [{"node-id": "XPDR-A2", "client-tp": "XPDR2-CLIENT1",
333 "network-tp": "XPDR2-NETWORK1"}],
334 {"ethernet-encoding": "eth encode",
335 "trib-slot": ["1"], "trib-port-number": "1"})
337 self.assertEqual(response.status_code, requests.codes.ok)
338 res = response.json()
339 self.assertIn('Request processed', res["output"]["result"])
340 self.assertTrue(res["output"]["success"])
342 def test_16_check_no_odu_connection(self):
343 response = test_utils.check_netconf_node_request(
345 "odu-connection/XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
346 self.assertEqual(response.status_code, requests.codes.conflict)
348 def test_17_check_no_interface_odu_network(self):
349 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODU4-service_Ethernet")
350 self.assertEqual(response.status_code, requests.codes.conflict)
352 def test_18_check_no_interface_odu_client(self):
353 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-CLIENT1-ODU4-service_Ethernet")
354 self.assertEqual(response.status_code, requests.codes.conflict)
356 def test_19_check_no_interface_100ge_client(self):
357 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-CLIENT1-ETHERNET-100G")
358 self.assertEqual(response.status_code, requests.codes.conflict)
360 # 1e) Delete ODUC2 device interfaces
361 def test_20_otn_service_path_delete_oduc2(self):
362 response = test_utils.otn_service_path_request("delete", "service_ODUC2", "200", "ODU",
363 [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
365 self.assertEqual(response.status_code, requests.codes.ok)
366 res = response.json()
367 self.assertIn('Request processed', res["output"]["result"])
368 self.assertTrue(res["output"]["success"])
370 def test_21_check_no_interface_oduc2(self):
371 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC2")
372 self.assertEqual(response.status_code, requests.codes.conflict)
374 # 1f) Delete OTUC2 device interfaces
375 def test_22_service_path_delete_otuc2(self):
376 response = test_utils.service_path_request("delete", "service_OTUC2", "0",
377 [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
378 196.1, 75, 196.0375, 196.125, 755,
381 self.assertEqual(response.status_code, requests.codes.ok)
382 res = response.json()
383 self.assertIn('Request processed', res["output"]["result"])
384 self.assertTrue(res["output"]["success"])
386 def test_23_check_no_interface_otuc2(self):
387 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTUC2")
388 self.assertEqual(response.status_code, requests.codes.conflict)
390 def test_24_check_no_interface_otsig(self):
391 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTSIGROUP-200G")
392 self.assertEqual(response.status_code, requests.codes.conflict)
394 def test_25_check_no_interface_otsi(self):
395 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-755:768")
396 self.assertEqual(response.status_code, requests.codes.conflict)
398 # 2a) create a OTUC3 device renderer
399 def test_26_service_path_create_otuc3(self):
400 response = test_utils.service_path_request("create", "service_OTUC3", "0",
401 [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
402 196.1, 75, 196.0375, 196.125, 755,
405 self.assertEqual(response.status_code, requests.codes.ok)
406 res = response.json()
407 self.assertIn('Interfaces created successfully for nodes: ', res["output"]["result"])
408 self.assertTrue(res["output"]["success"])
410 {'node-id': 'XPDR-A2',
411 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
412 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-300G']}, res["output"]['node-interface'])
414 def test_27_get_portmapping_network1(self):
415 response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
416 self.assertEqual(response.status_code, requests.codes.ok)
417 res = response.json()
418 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
420 self.NETWORK2_CHECK_DICT,
423 def test_28_check_interface_otsi(self):
424 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
425 self.assertEqual(response.status_code, requests.codes.ok)
426 res = response.json()
428 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
429 'administrative-state': 'inService',
430 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
431 'type': 'org-openroadm-interfaces:otsi',
432 'supporting-port': 'L1'}
434 "frequency": 196.0812,
435 "otsi-rate": "org-openroadm-common-optical-channel-types:R300G-otsi",
436 "fec": "org-openroadm-common-types:ofec",
437 "transmit-power": -5,
438 "provision-mode": "explicit",
439 "modulation-format": "dp-qam8"}
441 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
443 self.assertDictEqual(dict(input_dict_2,
444 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
445 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
446 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic3.6", "iid": [1, 2, 3]},
447 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
449 def test_29_check_interface_otsig(self):
450 response = test_utils.check_netconf_node_request(
451 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSIGROUP-300G")
452 self.assertEqual(response.status_code, requests.codes.ok)
453 res = response.json()
454 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
455 'administrative-state': 'inService',
456 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
457 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
458 'type': 'org-openroadm-interfaces:otsi-group',
459 'supporting-port': 'L1'}
460 input_dict_2 = {"group-id": 1,
461 "group-rate": "org-openroadm-common-optical-channel-types:R300G-otsi"}
463 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
465 self.assertDictEqual(dict(input_dict_2,
466 **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
467 res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
469 def test_30_check_interface_otuc3(self):
470 response = test_utils.check_netconf_node_request(
471 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC3")
472 self.assertEqual(response.status_code, requests.codes.ok)
473 res = response.json()
474 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
475 'administrative-state': 'inService',
476 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
477 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-300G',
478 'type': 'org-openroadm-interfaces:otnOtu',
479 'supporting-port': 'L1'}
480 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
481 "degthr-percentage": 100,
482 "tim-detect-mode": "Disabled",
486 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
488 self.assertDictEqual(dict(input_dict_2,
489 **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
490 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
492 # 2b) create a ODUC3 device renderer
493 def test_31_otn_service_path_create_oduc3(self):
494 response = test_utils.otn_service_path_request("create", "service_ODUC3", "300", "ODU",
495 [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
497 self.assertEqual(response.status_code, requests.codes.ok)
498 res = response.json()
499 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', res["output"]["result"])
500 self.assertTrue(res["output"]["success"])
502 {'node-id': 'XPDR-A2',
503 'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, res["output"]['node-interface'])
505 def test_32_get_portmapping_network1(self):
506 response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
507 self.assertEqual(response.status_code, requests.codes.ok)
508 res = response.json()
509 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
511 self.NETWORK2_CHECK_DICT,
514 def test_33_check_interface_oduc3(self):
515 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC3")
516 self.assertEqual(response.status_code, requests.codes.ok)
517 res = response.json()
519 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
520 'administrative-state': 'inService',
521 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
522 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC3',
523 'type': 'org-openroadm-interfaces:otnOdu',
524 'supporting-port': 'L1'}
526 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
527 'rate': 'org-openroadm-otn-common-types:ODUCn',
528 'tx-sapi': 'LY9PxYJqUbw=',
529 'tx-dapi': 'LY9PxYJqUbw=',
530 'expected-sapi': 'LY9PxYJqUbw=',
531 'expected-dapi': 'LY9PxYJqUbw=',
533 "degthr-percentage": 100,
534 "monitoring-mode": "terminated",
537 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
539 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
540 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
541 self.assertDictEqual(
542 {'payload-type': '22', 'exp-payload-type': '22'},
543 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
545 # 2c) create Ethernet device renderer
546 # No change in the ethernet device renderer so skipping those tests
547 # 2d) Delete Ethernet device interfaces
548 # No change in the ethernet device renderer so skipping those tests
550 # 2e) Delete ODUC3 device interfaces
551 def test_34_otn_service_path_delete_oduc3(self):
552 response = test_utils.otn_service_path_request("delete", "service_ODUC3", "300", "ODU",
553 [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
555 self.assertEqual(response.status_code, requests.codes.ok)
556 res = response.json()
557 self.assertIn('Request processed', res["output"]["result"])
558 self.assertTrue(res["output"]["success"])
560 def test_35_check_no_interface_oduc3(self):
561 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC3")
562 self.assertEqual(response.status_code, requests.codes.conflict)
564 # 2f) Delete OTUC3 device interfaces
565 def test_36_service_path_delete_otuc3(self):
566 response = test_utils.service_path_request("delete", "service_OTUC3", "0",
567 [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
568 196.1, 75, 196.0375, 196.125, 755,
571 self.assertEqual(response.status_code, requests.codes.ok)
572 res = response.json()
573 self.assertIn('Request processed', res["output"]["result"])
574 self.assertTrue(res["output"]["success"])
576 def test_37_check_no_interface_otuc3(self):
577 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTUC3")
578 self.assertEqual(response.status_code, requests.codes.conflict)
580 def test_38_check_no_interface_otsig(self):
581 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTSIGROUP-300G")
582 self.assertEqual(response.status_code, requests.codes.conflict)
584 def test_39_check_no_interface_otsi(self):
585 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-755:768")
586 self.assertEqual(response.status_code, requests.codes.conflict)
588 # 3a) create a OTUC4 device renderer
589 def test_40_service_path_create_otuc3(self):
590 response = test_utils.service_path_request("create", "service_OTUC4", "0",
591 [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
592 196.1, 75, 196.0375, 196.125, 755,
595 self.assertEqual(response.status_code, requests.codes.ok)
596 res = response.json()
597 self.assertIn('Interfaces created successfully for nodes: ', res["output"]["result"])
598 self.assertTrue(res["output"]["success"])
600 {'node-id': 'XPDR-A2',
601 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
602 'och-interface-id': ['XPDR2-NETWORK1-OTSIGROUP-400G']}, res["output"]['node-interface'])
604 def test_41_get_portmapping_network1(self):
605 response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
606 self.assertEqual(response.status_code, requests.codes.ok)
607 res = response.json()
608 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
610 self.NETWORK2_CHECK_DICT,
613 def test_42_check_interface_otsi(self):
614 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-755:768")
615 self.assertEqual(response.status_code, requests.codes.ok)
616 res = response.json()
618 input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
619 'administrative-state': 'inService',
620 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
621 'type': 'org-openroadm-interfaces:otsi',
622 'supporting-port': 'L1'}
624 "frequency": 196.0812,
625 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
626 "fec": "org-openroadm-common-types:ofec",
627 "transmit-power": -5,
628 "provision-mode": "explicit",
629 "modulation-format": "dp-qam16"}
631 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
633 self.assertDictEqual(dict(input_dict_2,
634 **res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
635 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
636 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
637 res['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
639 def test_43_check_interface_otsig(self):
640 response = test_utils.check_netconf_node_request(
641 "XPDR-A2", "interface/XPDR2-NETWORK1-OTSIGROUP-400G")
642 self.assertEqual(response.status_code, requests.codes.ok)
643 res = response.json()
644 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
645 'administrative-state': 'inService',
646 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
647 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-755:768',
648 'type': 'org-openroadm-interfaces:otsi-group',
649 'supporting-port': 'L1'}
650 input_dict_2 = {"group-id": 1,
651 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
653 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
655 self.assertDictEqual(dict(input_dict_2,
656 **res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
657 res['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
659 def test_44_check_interface_otuc4(self):
660 response = test_utils.check_netconf_node_request(
661 "XPDR-A2", "interface/XPDR2-NETWORK1-OTUC4")
662 self.assertEqual(response.status_code, requests.codes.ok)
663 res = response.json()
664 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
665 'administrative-state': 'inService',
666 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
667 ['supporting-interface-list'][0]: 'XPDR2-NETWORK1-OTSIGROUP-400G',
668 'type': 'org-openroadm-interfaces:otnOtu',
669 'supporting-port': 'L1'}
670 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
671 "degthr-percentage": 100,
672 "tim-detect-mode": "Disabled",
676 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
678 self.assertDictEqual(dict(input_dict_2,
679 **res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
680 res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
682 # 3b) create a ODUC4 device renderer
683 def test_45_otn_service_path_create_oduc3(self):
684 response = test_utils.otn_service_path_request("create", "service_ODUC4", "400", "ODU",
685 [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
687 self.assertEqual(response.status_code, requests.codes.ok)
688 res = response.json()
689 self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', res["output"]["result"])
690 self.assertTrue(res["output"]["success"])
692 {'node-id': 'XPDR-A2',
693 'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, res["output"]['node-interface'])
695 def test_46_get_portmapping_network1(self):
696 response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
697 self.assertEqual(response.status_code, requests.codes.ok)
698 res = response.json()
699 self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
701 self.NETWORK2_CHECK_DICT,
704 def test_47_check_interface_oduc4(self):
705 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
706 self.assertEqual(response.status_code, requests.codes.ok)
707 res = response.json()
709 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
710 'administrative-state': 'inService',
711 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
712 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
713 'type': 'org-openroadm-interfaces:otnOdu',
714 'supporting-port': 'L1'}
716 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
717 'rate': 'org-openroadm-otn-common-types:ODUCn',
718 'tx-sapi': 'LY9PxYJqUbw=',
719 'tx-dapi': 'LY9PxYJqUbw=',
720 'expected-sapi': 'LY9PxYJqUbw=',
721 'expected-dapi': 'LY9PxYJqUbw=',
723 "degthr-percentage": 100,
724 "monitoring-mode": "terminated",
727 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
729 self.assertDictEqual(dict(input_dict_2, **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
730 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
731 self.assertDictEqual(
732 {'payload-type': '22', 'exp-payload-type': '22'},
733 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
735 # 3c) create Ethernet device renderer
736 # No change in the ethernet device renderer so skipping those tests
737 # 3d) Delete Ethernet device interfaces
738 # No change in the ethernet device renderer so skipping those tests
740 # 3e) Delete ODUC4 device interfaces
741 def test_48_otn_service_path_delete_oduc4(self):
742 response = test_utils.otn_service_path_request("delete", "service_ODUC4", "400", "ODU",
743 [{"node-id": "XPDR-A2", "network-tp": "XPDR2-NETWORK1"}])
745 self.assertEqual(response.status_code, requests.codes.ok)
746 res = response.json()
747 self.assertIn('Request processed', res["output"]["result"])
748 self.assertTrue(res["output"]["success"])
750 def test_49_check_no_interface_oduc4(self):
751 response = test_utils.check_netconf_node_request("XPDR-A2", "interface/XPDR2-NETWORK1-ODUC4")
752 self.assertEqual(response.status_code, requests.codes.conflict)
754 # 3f) Delete OTUC4 device interfaces
755 def test_50_service_path_delete_otuc4(self):
756 response = test_utils.service_path_request("delete", "service_OTUC4", "0",
757 [{"node-id": "XPDR-A2", "dest-tp": "XPDR2-NETWORK1"}],
758 196.1, 75, 196.0375, 196.125, 755,
761 self.assertEqual(response.status_code, requests.codes.ok)
762 res = response.json()
763 self.assertIn('Request processed', res["output"]["result"])
764 self.assertTrue(res["output"]["success"])
766 def test_51_check_no_interface_otuc4(self):
767 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTUC4")
768 self.assertEqual(response.status_code, requests.codes.conflict)
770 def test_52_check_no_interface_otsig(self):
771 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-OTSIGROUP-400G")
772 self.assertEqual(response.status_code, requests.codes.conflict)
774 def test_53_check_no_interface_otsi(self):
775 response = test_utils.check_netconf_node_request("XPDR-A1", "interface/XPDR2-NETWORK1-755:768")
776 self.assertEqual(response.status_code, requests.codes.conflict)
778 # Disconnect the XPDR
779 def test_54_xpdr_device_disconnection(self):
780 response = test_utils_rfc8040.unmount_device("XPDR-A2")
781 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
783 def test_55_xpdr_device_disconnected(self):
784 response = test_utils_rfc8040.check_device_connection("XPDR-A2")
785 self.assertEqual(response['status_code'], requests.codes.conflict)
786 self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
787 self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
788 self.assertEqual(response['connection-status']['error-message'],
789 'Request could not be completed because the relevant data model content does not exist')
791 def test_56_xpdr_device_not_connected(self):
792 response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
793 self.assertEqual(response['status_code'], requests.codes.conflict)
794 self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
795 self.assertEqual(response['node-info']['error-tag'], 'data-missing')
796 self.assertEqual(response['node-info']['error-message'],
797 'Request could not be completed because the relevant data model content does not exist')
800 if __name__ == '__main__':
801 unittest.main(verbosity=2)