2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
14 # pylint: disable=wrong-import-order
21 sys.path.append("transportpce_tests/common")
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils # nopep8
27 class TransportPCE400GPortMappingTesting(unittest.TestCase):
30 NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR3-NETWORK1",
31 "supporting-port": "L1",
32 "supported-interface-capability": [
33 "org-openroadm-port-types:if-otsi-otsigroup"
35 "port-direction": "bidirectional",
36 "port-qual": "xpdr-network",
37 "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
38 "xponder-type": "tpdr",
39 "port-admin-state": "InService",
40 "port-oper-state": "InService"}
41 CLIENT_CAPABILITIES = ["org-openroadm-port-types:if-100GE",
42 "org-openroadm-port-types:if-OCH-OTU4-ODU4"]
47 cls.processes = test_utils.start_tpce()
48 cls.processes = test_utils.start_sims([("xpdra2", cls.NODE_VERSION),
49 ("xpdrc2", cls.NODE_VERSION)])
52 def tearDownClass(cls):
53 # pylint: disable=not-an-iterable
54 for process in cls.processes:
55 test_utils.shutdown_process(process)
56 print("all processes killed")
59 # pylint: disable=consider-using-f-string
60 print("execution of {}".format(self.id().split(".")[-1]))
63 def test_01_xpdr_device_connection(self):
64 response = test_utils.mount_device("XPDR-A2",
65 ("xpdra2", self.NODE_VERSION))
66 self.assertEqual(response.status_code, requests.codes.created,
67 test_utils.CODE_SHOULD_BE_201)
69 def test_01a_xpdr_device_connection(self):
70 response = test_utils.mount_device("XPDR-C2",
71 ("xpdrc2", self.NODE_VERSION))
72 self.assertEqual(response.status_code, requests.codes.created,
73 test_utils.CODE_SHOULD_BE_201)
75 # Check the correct capabilities for the client ports (if-100GE, if-100GE-ODU4,
77 def test_02_check_client_capabilities(self):
78 response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-CLIENT1")
79 self.assertEqual(response["status_code"], requests.codes.ok)
81 self.CLIENT_CAPABILITIES,
82 sorted(response["mapping"][0]["supported-interface-capability"]))
84 def test_03_check_client_capabilities(self):
85 response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-CLIENT1")
86 self.assertEqual(response["status_code"], requests.codes.ok)
88 self.CLIENT_CAPABILITIES,
89 sorted(response["mapping"][0]["supported-interface-capability"]))
91 def test_04_100g_ofec_service_path_create(self):
92 response = test_utils.transportpce_api_rpc_request(
93 "transportpce-device-renderer", "service-path",
95 "service-name": "service_100GE_ofec",
97 "modulation-format": "dp-qpsk",
98 "operation": "create",
100 "node-id": "XPDR-A2",
101 "src-tp": "XPDR3-CLIENT1",
102 "dest-tp": "XPDR3-NETWORK1"
105 "node-id": "XPDR-C2",
106 "src-tp": "XPDR3-CLIENT1",
107 "dest-tp": "XPDR3-NETWORK1"
109 "center-freq": 193.0,
113 "lower-spectral-slot-number": 265,
114 "higher-spectral-slot-number": 272,
116 self.assertEqual(response["status_code"], requests.codes.ok)
117 self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"])
118 # node-interface is list which does not preserve the order
121 if response["output"]["node-interface"][0]["node-id"] == z_side:
122 a_side, z_side = z_side, a_side
125 "otu-interface-id": [
126 "XPDR3-NETWORK1-OTUC1"
128 "odu-interface-id": [
129 "XPDR3-NETWORK1-ODU4",
130 "XPDR3-NETWORK1-ODUC1"
132 "och-interface-id": [
133 "XPDR3-NETWORK1-265:272",
134 "XPDR3-NETWORK1-OTSIGROUP-100G"
136 "eth-interface-id": [
137 "XPDR3-CLIENT1-ETHERNET"
139 {x: (sorted(response["output"]["node-interface"][0][x])
140 if isinstance(response["output"]["node-interface"][0][x], list)
141 else response["output"]["node-interface"][0][x])
142 for x in response["output"]["node-interface"][0].keys()})
145 "otu-interface-id": [
146 "XPDR3-NETWORK1-OTUC1"
148 "odu-interface-id": [
149 "XPDR3-NETWORK1-ODU4",
150 "XPDR3-NETWORK1-ODUC1"
152 "och-interface-id": [
153 "XPDR3-NETWORK1-265:272",
154 "XPDR3-NETWORK1-OTSIGROUP-100G"
156 "eth-interface-id": [
157 "XPDR3-CLIENT1-ETHERNET"
159 {x: (sorted(response["output"]["node-interface"][1][x])
160 if isinstance(response["output"]["node-interface"][1][x], list)
161 else response["output"]["node-interface"][1][x])
162 for x in response["output"]["node-interface"][1].keys()})
164 def test_05_get_portmapping_network1(self):
165 response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1")
166 self.assertEqual(response["status_code"], requests.codes.ok)
167 self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
168 self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "FDvaQIf2Z08="
169 self.NETWORK1_CHECK_DICT["connection-map-lcp"] = "XPDR3-CLIENT1"
171 self.NETWORK1_CHECK_DICT,
174 def test_06_get_portmapping_network1(self):
175 response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1")
176 self.assertEqual(response["status_code"], requests.codes.ok)
177 self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
178 self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "AJpkaVmZKJk5"
179 self.NETWORK1_CHECK_DICT["connection-map-lcp"] = "XPDR3-CLIENT1"
181 self.NETWORK1_CHECK_DICT,
184 def test_07_check_interface_otsi(self):
185 # pylint: disable=line-too-long
186 response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
187 self.assertEqual(response["status_code"], requests.codes.ok)
188 input_dict_1 = {"name": "XPDR3-NETWORK1-265:272",
189 "administrative-state": "inService",
190 "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
191 "type": "org-openroadm-interfaces:otsi",
192 "supporting-port": "L1"}
194 "frequency": 193.00000,
195 "otsi-rate": "org-openroadm-common-optical-channel-types:R100G-otsi",
196 "fec": "org-openroadm-common-types:ofec",
197 "transmit-power": -5,
198 "provision-mode": "explicit",
199 "modulation-format": "dp-qpsk"}
201 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
202 response["interface"][0])
203 self.assertDictEqual(dict(input_dict_2,
204 **response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]),
205 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"])
206 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic1.4", "iid": [1]},
207 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
209 def test_08_check_interface_otsi(self):
210 # pylint: disable=line-too-long
211 response = test_utils.check_node_attribute_request("XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
212 self.assertEqual(response["status_code"], requests.codes.ok)
213 input_dict_1 = {"name": "XPDR3-NETWORK1-265:272",
214 "administrative-state": "inService",
215 "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
216 "type": "org-openroadm-interfaces:otsi",
217 "supporting-port": "L1"}
219 "frequency": 193.00000,
220 "otsi-rate": "org-openroadm-common-optical-channel-types:R100G-otsi",
221 "fec": "org-openroadm-common-types:ofec",
222 "transmit-power": -5,
223 "provision-mode": "explicit",
224 "modulation-format": "dp-qpsk"}
226 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
227 response["interface"][0])
228 self.assertDictEqual(dict(input_dict_2,
229 **response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]),
230 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"])
231 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic1.4", "iid": [1]},
232 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
234 def test_09_check_interface_otsig(self):
235 response = test_utils.check_node_attribute_request(
236 "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
237 self.assertEqual(response["status_code"], requests.codes.ok)
238 input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G",
239 "administrative-state": "inService",
240 "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
241 ["supporting-interface-list"][0]: "XPDR3-NETWORK1-265:272",
242 "type": "org-openroadm-interfaces:otsi-group",
243 "supporting-port": "L1"}
244 input_dict_2 = {"group-id": 1,
245 "group-rate": "org-openroadm-common-optical-channel-types:R100G-otsi"}
247 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
248 response["interface"][0])
249 self.assertDictEqual(dict(input_dict_2,
250 **response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]),
251 response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
253 def test_10_check_interface_otsig(self):
254 response = test_utils.check_node_attribute_request(
255 "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
256 self.assertEqual(response["status_code"], requests.codes.ok)
257 input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G",
258 "administrative-state": "inService",
259 "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
260 ["supporting-interface-list"][0]: "XPDR3-NETWORK1-265:272",
261 "type": "org-openroadm-interfaces:otsi-group",
262 "supporting-port": "L1"}
263 input_dict_2 = {"group-id": 1,
264 "group-rate": "org-openroadm-common-optical-channel-types:R100G-otsi"}
266 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
267 response["interface"][0])
268 self.assertDictEqual(dict(input_dict_2,
269 **response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]),
270 response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
272 def test_11_check_interface_otuc1(self):
273 response = test_utils.check_node_attribute_request(
274 "XPDR-A2", "interface", "XPDR3-NETWORK1-OTUC1")
275 self.assertEqual(response["status_code"], requests.codes.ok)
276 input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
277 "administrative-state": "inService",
278 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
279 ["supporting-interface-list"][0]: "XPDR3-NETWORK1-OTSIGROUP-100G",
280 "type": "org-openroadm-interfaces:otnOtu",
281 "supporting-port": "L1"}
282 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
283 "degthr-percentage": 100,
284 "tim-detect-mode": "Disabled",
288 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
289 response["interface"][0])
290 self.assertDictEqual(dict(input_dict_2,
291 **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
292 response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
294 def test_12_check_interface_otuc1(self):
295 response = test_utils.check_node_attribute_request(
296 "XPDR-C2", "interface", "XPDR3-NETWORK1-OTUC1")
297 self.assertEqual(response["status_code"], requests.codes.ok)
298 input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
299 "administrative-state": "inService",
300 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
301 ["supporting-interface-list"][0]: "XPDR3-NETWORK1-OTSIGROUP-100G",
302 "type": "org-openroadm-interfaces:otnOtu",
303 "supporting-port": "L1"}
304 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
305 "degthr-percentage": 100,
306 "tim-detect-mode": "Disabled",
310 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
311 response["interface"][0])
312 self.assertDictEqual(dict(input_dict_2,
313 **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
314 response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
316 def test_13_check_interface_oduc1(self):
317 response = test_utils.check_node_attribute_request(
318 "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
319 self.assertEqual(response["status_code"], requests.codes.ok)
320 input_dict_1 = {"name": "XPDR3-NETWORK1-ODUC1",
321 "administrative-state": "inService",
322 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
323 "supporting-interface-list": "XPDR3-NETWORK1-OTUC1",
324 "type": "org-openroadm-interfaces:otnOdu",
325 "supporting-port": "L1"}
327 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
328 "rate": "org-openroadm-otn-common-types:ODUCn",
330 "degthr-percentage": 100,
331 "monitoring-mode": "terminated",
334 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
335 response["interface"][0])
336 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
337 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
338 self.assertDictEqual(
339 {"payload-type": "22", "exp-payload-type": "22"},
340 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
342 def test_14_check_interface_oduc1(self):
343 response = test_utils.check_node_attribute_request(
344 "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
345 self.assertEqual(response["status_code"], requests.codes.ok)
347 input_dict_1 = {"name": "XPDR3-NETWORK1-ODUC1",
348 "administrative-state": "inService",
349 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
350 "supporting-interface-list": "XPDR3-NETWORK1-OTUC1",
351 "type": "org-openroadm-interfaces:otnOdu",
352 "supporting-port": "L1"}
354 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
355 "rate": "org-openroadm-otn-common-types:ODUCn",
357 "degthr-percentage": 100,
358 "monitoring-mode": "terminated",
361 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
362 response["interface"][0])
363 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
364 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
365 self.assertDictEqual(
366 {"payload-type": "22", "exp-payload-type": "22"},
367 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
369 def test_15_check_interface_odu4(self):
370 response = test_utils.check_node_attribute_request(
371 "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
372 self.assertEqual(response["status_code"], requests.codes.ok)
374 input_dict_1 = {"name": "XPDR3-NETWORK1-ODU4",
375 "administrative-state": "inService",
376 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
377 "supporting-interface-list": "XPDR3-NETWORK1-ODUC1",
378 "type": "org-openroadm-interfaces:otnOdu",
379 "supporting-port": "L1"}
381 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
382 "rate": "org-openroadm-otn-common-types:ODU4",
384 "degthr-percentage": 100,
385 "monitoring-mode": "terminated"
387 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
388 response["interface"][0])
389 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
390 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
391 self.assertDictEqual(
392 {"payload-type": "07", "exp-payload-type": "07"},
393 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
395 # TODO: Check the trib-port numbering
397 def test_16_check_interface_odu4(self):
398 response = test_utils.check_node_attribute_request(
399 "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
400 self.assertEqual(response["status_code"], requests.codes.ok)
402 input_dict_1 = {"name": "XPDR3-NETWORK1-ODU4",
403 "administrative-state": "inService",
404 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
405 "supporting-interface-list": "XPDR3-NETWORK1-ODUC1",
406 "type": "org-openroadm-interfaces:otnOdu",
407 "supporting-port": "L1"}
409 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
410 "rate": "org-openroadm-otn-common-types:ODU4",
412 "degthr-percentage": 100,
413 "monitoring-mode": "terminated"
415 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
416 response["interface"][0])
417 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
418 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
419 self.assertDictEqual(
420 {"payload-type": "07", "exp-payload-type": "07"},
421 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
423 # TODO: Check the trib-port numbering
425 def test_17_check_interface_100ge_client(self):
426 response = test_utils.check_node_attribute_request(
427 "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
428 self.assertEqual(response["status_code"], requests.codes.ok)
429 input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G",
430 "administrative-state": "inService",
431 "supporting-circuit-pack-name": "1/1/3-PLUG-CLIENT",
432 "type": "org-openroadm-interfaces:ethernetCsmacd",
433 "supporting-port": "C1"
435 input_dict_2 = {"speed": 100000,
436 "fec": "org-openroadm-common-types:off"}
437 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
438 response["interface"][0])
439 self.assertDictEqual(dict(input_dict_2,
440 **response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"]),
441 response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"])
443 def test_18_check_interface_100ge_client(self):
444 response = test_utils.check_node_attribute_request(
445 "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
446 self.assertEqual(response["status_code"], requests.codes.ok)
447 input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G",
448 "administrative-state": "inService",
449 "supporting-circuit-pack-name": "1/1/3-PLUG-CLIENT",
450 "type": "org-openroadm-interfaces:ethernetCsmacd",
451 "supporting-port": "C1"
453 input_dict_2 = {"speed": 100000,
454 "fec": "org-openroadm-common-types:off"}
455 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
456 response["interface"][0])
457 self.assertDictEqual(dict(input_dict_2,
458 **response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"]),
459 response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"])
461 # Delete the service path
462 def test_19_service_path_delete_100ge(self):
463 response = test_utils.transportpce_api_rpc_request(
464 "transportpce-device-renderer", "service-path",
466 "service-name": "service_100GE_ofec",
468 "modulation-format": "dp-qpsk",
469 "operation": "delete",
471 "node-id": "XPDR-A2",
472 "src-tp": "XPDR3-CLIENT1",
473 "dest-tp": "XPDR3-NETWORK1"
476 "node-id": "XPDR-C2",
477 "src-tp": "XPDR3-CLIENT1",
478 "dest-tp": "XPDR3-NETWORK1"
480 "center-freq": 193.0,
484 "lower-spectral-slot-number": 265,
485 "higher-spectral-slot-number": 272,
487 self.assertEqual(response["status_code"], requests.codes.ok)
488 self.assertIn("Request processed", response["output"]["result"])
490 def test_20_check_no_interface_100ge_client(self):
491 response = test_utils.check_node_attribute_request(
492 "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
493 self.assertEqual(response["status_code"], requests.codes.conflict)
495 def test_21_check_no_interface_100ge_client(self):
496 response = test_utils.check_node_attribute_request(
497 "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
498 self.assertEqual(response["status_code"], requests.codes.conflict)
500 def test_22_check_no_interface_odu4(self):
501 response = test_utils.check_node_attribute_request(
502 "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
503 self.assertEqual(response["status_code"], requests.codes.conflict)
505 def test_23_check_no_interface_odu4(self):
506 response = test_utils.check_node_attribute_request(
507 "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
508 self.assertEqual(response["status_code"], requests.codes.conflict)
510 def test_24_check_no_interface_otuc2(self):
511 response = test_utils.check_node_attribute_request(
512 "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
513 self.assertEqual(response["status_code"], requests.codes.conflict)
515 def test_25_check_no_interface_otuc2(self):
516 response = test_utils.check_node_attribute_request(
517 "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
518 self.assertEqual(response["status_code"], requests.codes.conflict)
520 # Check if port-mapping data is updated, where the supporting-otucn is deleted
521 def test_26_check_no_otuc1(self):
522 response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1")
523 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
525 def test_27_check_no_otuc1(self):
526 response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1")
527 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
529 def test_28_check_no_interface_otsig(self):
530 response = test_utils.check_node_attribute_request(
531 "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
532 self.assertEqual(response["status_code"], requests.codes.conflict)
534 def test_29_check_no_interface_otsig(self):
535 response = test_utils.check_node_attribute_request(
536 "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
537 self.assertEqual(response["status_code"], requests.codes.conflict)
539 def test_30_check_no_interface_otsi(self):
540 response = test_utils.check_node_attribute_request(
541 "XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
542 self.assertEqual(response["status_code"], requests.codes.conflict)
544 def test_31_check_no_interface_otsi(self):
545 response = test_utils.check_node_attribute_request(
546 "XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
547 self.assertEqual(response["status_code"], requests.codes.conflict)
549 # 200G 31.6 Gbaud mode for muxponder, with qam16
550 def test_32_service_path_create_otuc2(self):
551 response = test_utils.transportpce_api_rpc_request(
552 "transportpce-device-renderer", "service-path",
554 "service-name": "service_OTUC2",
556 "modulation-format": "dp-qam16",
557 "operation": "create",
559 "node-id": "XPDR-A2",
560 "dest-tp": "XPDR2-NETWORK1"
563 "node-id": "XPDR-C2",
564 "dest-tp": "XPDR2-NETWORK1"
566 "center-freq": 193.0,
570 "lower-spectral-slot-number": 265,
571 "higher-spectral-slot-number": 272,
573 self.assertEqual(response["status_code"], requests.codes.ok)
574 self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"])
575 # node-interface is list which does not preserve the order
578 if response["output"]["node-interface"][0]["node-id"] == z_side:
579 a_side, z_side = z_side, a_side
582 "otu-interface-id": [
583 "XPDR2-NETWORK1-OTUC2"
585 "och-interface-id": [
586 "XPDR2-NETWORK1-OTSIGROUP-200G",
587 "XPDR2-NETWORK1-265:272"
589 response["output"]["node-interface"][0])
592 "otu-interface-id": [
593 "XPDR2-NETWORK1-OTUC2"
595 "och-interface-id": [
596 "XPDR2-NETWORK1-OTSIGROUP-200G",
597 "XPDR2-NETWORK1-265:272"
599 response["output"]["node-interface"][1])
600 # Update the network dict variable for mpdr
601 self.NETWORK1_CHECK_DICT["logical-connection-point"] = "XPDR2-NETWORK1"
602 self.NETWORK1_CHECK_DICT["supporting-circuit-pack-name"] = "1/2/2-PLUG-NET"
603 self.NETWORK1_CHECK_DICT["port-qual"] = "switch-network"
604 self.NETWORK1_CHECK_DICT["xponder-type"] = "mpdr"
605 self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "LY9PxYJqUbw="
607 def test_33_get_portmapping_network1(self):
608 response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
609 self.assertEqual(response["status_code"], requests.codes.ok)
610 self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
611 del self.NETWORK1_CHECK_DICT["connection-map-lcp"]
613 self.NETWORK1_CHECK_DICT,
616 def test_34_check_interface_otsi(self):
617 # pylint: disable=line-too-long
618 response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-265:272")
619 self.assertEqual(response["status_code"], requests.codes.ok)
620 input_dict_1 = {"name": "XPDR2-NETWORK1-265:272",
621 "administrative-state": "inService",
622 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
623 "type": "org-openroadm-interfaces:otsi",
624 "supporting-port": "L1"}
626 "frequency": 193.00000,
627 "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
628 "fec": "org-openroadm-common-types:ofec",
629 "transmit-power": -5,
630 "provision-mode": "explicit",
631 "modulation-format": "dp-qam16"}
633 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
634 response["interface"][0])
635 self.assertDictEqual(dict(input_dict_2,
636 **response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]),
637 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"])
638 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.8", "iid": [1, 2]},
639 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
641 def test_35_check_interface_otsig(self):
642 response = test_utils.check_node_attribute_request(
643 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
644 self.assertEqual(response["status_code"], requests.codes.ok)
645 input_dict_1 = {"name": "XPDR2-NETWORK1-OTSIGROUP-200G",
646 "administrative-state": "inService",
647 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
648 ["supporting-interface-list"][0]: "XPDR2-NETWORK1-265:272",
649 "type": "org-openroadm-interfaces:otsi-group",
650 "supporting-port": "L1"}
651 input_dict_2 = {"group-id": 1,
652 "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
654 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
655 response["interface"][0])
656 self.assertDictEqual(dict(input_dict_2,
657 **response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]),
658 response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
660 def test_36_check_interface_otuc2(self):
661 response = test_utils.check_node_attribute_request(
662 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
663 self.assertEqual(response["status_code"], requests.codes.ok)
664 input_dict_1 = {"name": "XPDR2-NETWORK1-OTUC2",
665 "administrative-state": "inService",
666 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
667 ["supporting-interface-list"][0]: "XPDR2-NETWORK1-OTSIGROUP-200G",
668 "type": "org-openroadm-interfaces:otnOtu",
669 "supporting-port": "L1"}
670 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
671 "degthr-percentage": 100,
672 "tim-detect-mode": "Disabled",
676 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
677 response["interface"][0])
678 self.assertDictEqual(dict(input_dict_2,
679 **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
680 response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
682 # We do not need other test cases for the otn-service-path (ODUC2 and Ethernet)
683 # as there is no change in code and are covered in test02_otn_renderer
685 def test_37_service_path_delete_otuc2(self):
686 response = test_utils.transportpce_api_rpc_request(
687 "transportpce-device-renderer", "service-path",
689 "modulation-format": "dp-qam16",
690 "operation": "delete",
691 "service-name": "service_OTUC2",
693 "center-freq": 193.0,
697 "lower-spectral-slot-number": 265,
698 "higher-spectral-slot-number": 272,
701 "node-id": "XPDR-A2",
702 "dest-tp": "XPDR2-NETWORK1"
705 "node-id": "XPDR-C2",
706 "dest-tp": "XPDR2-NETWORK1"
710 self.assertEqual(response["status_code"], requests.codes.ok)
711 self.assertIn("Request processed", response["output"]["result"])
712 del self.NETWORK1_CHECK_DICT["supporting-otucn"]
714 def test_38_check_no_interface_otuc2(self):
715 response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
716 self.assertEqual(response["status_code"], requests.codes.conflict)
718 def test_39_check_no_interface_otsig(self):
719 response = test_utils.check_node_attribute_request(
720 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
721 self.assertEqual(response["status_code"], requests.codes.conflict)
723 def test_40_check_no_interface_otsi(self):
724 response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
725 self.assertEqual(response["status_code"], requests.codes.conflict)
727 def test_41_check_no_otuc2(self):
728 response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
729 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
731 # Disconnect the XPDR
732 def test_42_xpdr_device_disconnection(self):
733 response = test_utils.unmount_device("XPDR-A2")
734 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
736 def test_43_xpdr_device_disconnected(self):
737 response = test_utils.check_device_connection("XPDR-A2")
738 self.assertEqual(response["status_code"], requests.codes.conflict)
739 self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
740 self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
741 self.assertEqual(response["connection-status"]["error-message"],
742 "Request could not be completed because the relevant data model content does not exist")
744 def test_44_xpdr_device_not_connected(self):
745 response = test_utils.get_portmapping_node_attr("XPDR-A2", "node-info", None)
746 self.assertEqual(response["status_code"], requests.codes.conflict)
747 self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
748 self.assertEqual(response["node-info"]["error-tag"], "data-missing")
749 self.assertEqual(response["node-info"]["error-message"],
750 "Request could not be completed because the relevant data model content does not exist")
752 def test_45_xpdr_device_disconnection(self):
753 response = test_utils.unmount_device("XPDR-C2")
754 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
756 def test_46_xpdr_device_disconnected(self):
757 response = test_utils.check_device_connection("XPDR-C2")
758 self.assertEqual(response["status_code"], requests.codes.conflict)
759 self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
760 self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
761 self.assertEqual(response["connection-status"]["error-message"],
762 "Request could not be completed because the relevant data model content does not exist")
764 def test_47_xpdr_device_not_connected(self):
765 response = test_utils.get_portmapping_node_attr("XPDR-C2", "node-info", None)
766 self.assertEqual(response["status_code"], requests.codes.conflict)
767 self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
768 self.assertEqual(response["node-info"]["error-tag"], "data-missing")
769 self.assertEqual(response["node-info"]["error-message"],
770 "Request could not be completed because the relevant data model content does not exist")
# Script entry point: run every test of this module with verbose reporting.
if __name__ == "__main__":
    unittest.main(verbosity=2)