2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
14 # pylint: disable=wrong-import-order
21 sys.path.append("transportpce_tests/common")
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils_rfc8040 # nopep8
27 class TransportPCE400GPortMappingTesting(unittest.TestCase):
30 NETWORK2_CHECK_DICT = {"logical-connection-point": "XPDR3-NETWORK1",
31 "supporting-port": "L1",
32 "supported-interface-capability": [
33 "org-openroadm-port-types:if-otsi-otsigroup"
35 "port-direction": "bidirectional",
36 "port-qual": "xpdr-network",
37 "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
38 "xponder-type": "tpdr",
39 "port-admin-state": "InService",
40 "port-oper-state": "InService"}
41 CLIENT_CAPABILITIES = ["org-openroadm-port-types:if-100GE",
42 "org-openroadm-port-types:if-OCH-OTU4-ODU4"]
47 cls.processes = test_utils_rfc8040.start_tpce()
48 cls.processes = test_utils_rfc8040.start_sims([("xpdra2", cls.NODE_VERSION),
49 ("xpdrc2", cls.NODE_VERSION)])
52 def tearDownClass(cls):
53 # pylint: disable=not-an-iterable
54 for process in cls.processes:
55 test_utils_rfc8040.shutdown_process(process)
56 print("all processes killed")
59 # pylint: disable=consider-using-f-string
60 print("execution of {}".format(self.id().split(".")[-1]))
63 def test_01_xpdr_device_connection(self):
64 response = test_utils_rfc8040.mount_device("XPDR-A2",
65 ("xpdra2", self.NODE_VERSION))
66 self.assertEqual(response.status_code, requests.codes.created,
67 test_utils_rfc8040.CODE_SHOULD_BE_201)
69 def test_01a_xpdr_device_connection(self):
70 response = test_utils_rfc8040.mount_device("XPDR-C2",
71 ("xpdrc2", self.NODE_VERSION))
72 self.assertEqual(response.status_code, requests.codes.created,
73 test_utils_rfc8040.CODE_SHOULD_BE_201)
75 # Check the correct capabilities for the client ports (if-100GE, if-100GE-ODU4,
77 def test_02_check_client_capabilities(self):
78 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR3-CLIENT1")
79 self.assertEqual(response["status_code"], requests.codes.ok)
81 self.CLIENT_CAPABILITIES,
82 sorted(response["mapping"][0]["supported-interface-capability"]))
84 def test_03_check_client_capabilities(self):
85 response = test_utils_rfc8040.portmapping_request("XPDR-C2", "XPDR3-CLIENT1")
86 self.assertEqual(response["status_code"], requests.codes.ok)
88 self.CLIENT_CAPABILITIES,
89 sorted(response["mapping"][0]["supported-interface-capability"]))
91 def test_04_100g_ofec_service_path_create(self):
92 response = test_utils_rfc8040.transportpce_api_rpc_request(
93 "transportpce-device-renderer", "service-path",
95 "service-name": "service_100GE_ofec",
97 "modulation-format": "dp-qpsk",
98 "operation": "create",
100 "node-id": "XPDR-A2",
101 "src-tp": "XPDR3-CLIENT1",
102 "dest-tp": "XPDR3-NETWORK1"
105 "node-id": "XPDR-C2",
106 "src-tp": "XPDR3-CLIENT1",
107 "dest-tp": "XPDR3-NETWORK1"
109 "center-freq": 193.0,
113 "lower-spectral-slot-number": 265,
114 "higher-spectral-slot-number": 272,
116 self.assertEqual(response["status_code"], requests.codes.ok)
117 self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"])
118 # node-interface is list which does not preserve the order
121 if response["output"]["node-interface"][0]["node-id"] == z_side:
122 a_side, z_side = z_side, a_side
125 "otu-interface-id": [
126 "XPDR3-NETWORK1-OTUC1"
128 "odu-interface-id": [
129 "XPDR3-NETWORK1-ODU4",
130 "XPDR3-NETWORK1-ODUC1"
132 "och-interface-id": [
133 "XPDR3-NETWORK1-265:272",
134 "XPDR3-NETWORK1-OTSIGROUP-100G"
136 "eth-interface-id": [
137 "XPDR3-CLIENT1-ETHERNET"
139 {x: (sorted(response["output"]["node-interface"][0][x])
140 if isinstance(response["output"]["node-interface"][0][x], list)
141 else response["output"]["node-interface"][0][x])
142 for x in response["output"]["node-interface"][0].keys()})
145 "otu-interface-id": [
146 "XPDR3-NETWORK1-OTUC1"
148 "odu-interface-id": [
149 "XPDR3-NETWORK1-ODU4",
150 "XPDR3-NETWORK1-ODUC1"
152 "och-interface-id": [
153 "XPDR3-NETWORK1-265:272",
154 "XPDR3-NETWORK1-OTSIGROUP-100G"
156 "eth-interface-id": [
157 "XPDR3-CLIENT1-ETHERNET"
159 {x: (sorted(response["output"]["node-interface"][1][x])
160 if isinstance(response["output"]["node-interface"][1][x], list)
161 else response["output"]["node-interface"][1][x])
162 for x in response["output"]["node-interface"][1].keys()})
164 def test_05_get_portmapping_network1(self):
165 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR3-NETWORK1")
166 self.assertEqual(response["status_code"], requests.codes.ok)
167 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
168 self.NETWORK2_CHECK_DICT["lcp-hash-val"] = "FDvaQIf2Z08="
170 self.NETWORK2_CHECK_DICT,
173 def test_06_get_portmapping_network1(self):
174 response = test_utils_rfc8040.portmapping_request("XPDR-C2", "XPDR3-NETWORK1")
175 self.assertEqual(response["status_code"], requests.codes.ok)
176 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
177 self.NETWORK2_CHECK_DICT["lcp-hash-val"] = "AJpkaVmZKJk5"
179 self.NETWORK2_CHECK_DICT,
182 def test_07_check_interface_otsi(self):
183 # pylint: disable=line-too-long
184 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
185 self.assertEqual(response["status_code"], requests.codes.ok)
186 input_dict_1 = {"name": "XPDR3-NETWORK1-265:272",
187 "administrative-state": "inService",
188 "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
189 "type": "org-openroadm-interfaces:otsi",
190 "supporting-port": "L1"}
192 "frequency": 193.00000,
193 "otsi-rate": "org-openroadm-common-optical-channel-types:R100G-otsi",
194 "fec": "org-openroadm-common-types:ofec",
195 "transmit-power": -5,
196 "provision-mode": "explicit",
197 "modulation-format": "dp-qpsk"}
199 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
200 response["interface"][0])
201 self.assertDictEqual(dict(input_dict_2,
202 **response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]),
203 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"])
204 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic1.4", "iid": [1]},
205 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
207 def test_08_check_interface_otsi(self):
208 # pylint: disable=line-too-long
209 response = test_utils_rfc8040.check_node_attribute_request("XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
210 self.assertEqual(response["status_code"], requests.codes.ok)
211 input_dict_1 = {"name": "XPDR3-NETWORK1-265:272",
212 "administrative-state": "inService",
213 "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
214 "type": "org-openroadm-interfaces:otsi",
215 "supporting-port": "L1"}
217 "frequency": 193.00000,
218 "otsi-rate": "org-openroadm-common-optical-channel-types:R100G-otsi",
219 "fec": "org-openroadm-common-types:ofec",
220 "transmit-power": -5,
221 "provision-mode": "explicit",
222 "modulation-format": "dp-qpsk"}
224 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
225 response["interface"][0])
226 self.assertDictEqual(dict(input_dict_2,
227 **response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]),
228 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"])
229 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic1.4", "iid": [1]},
230 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
232 def test_09_check_interface_otsig(self):
233 response = test_utils_rfc8040.check_node_attribute_request(
234 "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
235 self.assertEqual(response["status_code"], requests.codes.ok)
236 input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G",
237 "administrative-state": "inService",
238 "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
239 ["supporting-interface-list"][0]: "XPDR3-NETWORK1-265:272",
240 "type": "org-openroadm-interfaces:otsi-group",
241 "supporting-port": "L1"}
242 input_dict_2 = {"group-id": 1,
243 "group-rate": "org-openroadm-common-optical-channel-types:R100G-otsi"}
245 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
246 response["interface"][0])
247 self.assertDictEqual(dict(input_dict_2,
248 **response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]),
249 response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
251 def test_10_check_interface_otsig(self):
252 response = test_utils_rfc8040.check_node_attribute_request(
253 "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
254 self.assertEqual(response["status_code"], requests.codes.ok)
255 input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G",
256 "administrative-state": "inService",
257 "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
258 ["supporting-interface-list"][0]: "XPDR3-NETWORK1-265:272",
259 "type": "org-openroadm-interfaces:otsi-group",
260 "supporting-port": "L1"}
261 input_dict_2 = {"group-id": 1,
262 "group-rate": "org-openroadm-common-optical-channel-types:R100G-otsi"}
264 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
265 response["interface"][0])
266 self.assertDictEqual(dict(input_dict_2,
267 **response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]),
268 response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
270 def test_11_check_interface_otuc1(self):
271 response = test_utils_rfc8040.check_node_attribute_request(
272 "XPDR-A2", "interface", "XPDR3-NETWORK1-OTUC1")
273 self.assertEqual(response["status_code"], requests.codes.ok)
274 input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
275 "administrative-state": "inService",
276 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
277 ["supporting-interface-list"][0]: "XPDR3-NETWORK1-OTSIGROUP-100G",
278 "type": "org-openroadm-interfaces:otnOtu",
279 "supporting-port": "L1"}
280 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
281 "degthr-percentage": 100,
282 "tim-detect-mode": "Disabled",
286 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
287 response["interface"][0])
288 self.assertDictEqual(dict(input_dict_2,
289 **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
290 response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
292 def test_12_check_interface_otuc1(self):
293 response = test_utils_rfc8040.check_node_attribute_request(
294 "XPDR-C2", "interface", "XPDR3-NETWORK1-OTUC1")
295 self.assertEqual(response["status_code"], requests.codes.ok)
296 input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
297 "administrative-state": "inService",
298 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
299 ["supporting-interface-list"][0]: "XPDR3-NETWORK1-OTSIGROUP-100G",
300 "type": "org-openroadm-interfaces:otnOtu",
301 "supporting-port": "L1"}
302 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
303 "degthr-percentage": 100,
304 "tim-detect-mode": "Disabled",
308 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
309 response["interface"][0])
310 self.assertDictEqual(dict(input_dict_2,
311 **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
312 response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
314 def test_13_check_interface_oduc1(self):
315 response = test_utils_rfc8040.check_node_attribute_request(
316 "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
317 self.assertEqual(response["status_code"], requests.codes.ok)
318 input_dict_1 = {"name": "XPDR3-NETWORK1-ODUC1",
319 "administrative-state": "inService",
320 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
321 "supporting-interface-list": "XPDR3-NETWORK1-OTUC1",
322 "type": "org-openroadm-interfaces:otnOdu",
323 "supporting-port": "L1"}
325 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
326 "rate": "org-openroadm-otn-common-types:ODUCn",
328 "degthr-percentage": 100,
329 "monitoring-mode": "terminated",
332 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
333 response["interface"][0])
334 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
335 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
336 self.assertDictEqual(
337 {"payload-type": "22", "exp-payload-type": "22"},
338 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
340 def test_14_check_interface_oduc1(self):
341 response = test_utils_rfc8040.check_node_attribute_request(
342 "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
343 self.assertEqual(response["status_code"], requests.codes.ok)
345 input_dict_1 = {"name": "XPDR3-NETWORK1-ODUC1",
346 "administrative-state": "inService",
347 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
348 "supporting-interface-list": "XPDR3-NETWORK1-OTUC1",
349 "type": "org-openroadm-interfaces:otnOdu",
350 "supporting-port": "L1"}
352 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
353 "rate": "org-openroadm-otn-common-types:ODUCn",
355 "degthr-percentage": 100,
356 "monitoring-mode": "terminated",
359 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
360 response["interface"][0])
361 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
362 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
363 self.assertDictEqual(
364 {"payload-type": "22", "exp-payload-type": "22"},
365 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
367 def test_15_check_interface_odu4(self):
368 response = test_utils_rfc8040.check_node_attribute_request(
369 "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
370 self.assertEqual(response["status_code"], requests.codes.ok)
372 input_dict_1 = {"name": "XPDR3-NETWORK1-ODU4",
373 "administrative-state": "inService",
374 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
375 "supporting-interface-list": "XPDR3-NETWORK1-ODUC1",
376 "type": "org-openroadm-interfaces:otnOdu",
377 "supporting-port": "L1"}
379 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
380 "rate": "org-openroadm-otn-common-types:ODU4",
382 "degthr-percentage": 100,
383 "monitoring-mode": "terminated"
385 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
386 response["interface"][0])
387 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
388 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
389 self.assertDictEqual(
390 {"payload-type": "07", "exp-payload-type": "07"},
391 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
393 # TODO: Check the trib-port numbering
395 def test_16_check_interface_odu4(self):
396 response = test_utils_rfc8040.check_node_attribute_request(
397 "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
398 self.assertEqual(response["status_code"], requests.codes.ok)
400 input_dict_1 = {"name": "XPDR3-NETWORK1-ODU4",
401 "administrative-state": "inService",
402 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
403 "supporting-interface-list": "XPDR3-NETWORK1-ODUC1",
404 "type": "org-openroadm-interfaces:otnOdu",
405 "supporting-port": "L1"}
407 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
408 "rate": "org-openroadm-otn-common-types:ODU4",
410 "degthr-percentage": 100,
411 "monitoring-mode": "terminated"
413 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
414 response["interface"][0])
415 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
416 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
417 self.assertDictEqual(
418 {"payload-type": "07", "exp-payload-type": "07"},
419 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
421 # TODO: Check the trib-port numbering
423 def test_17_check_interface_100ge_client(self):
424 response = test_utils_rfc8040.check_node_attribute_request(
425 "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
426 self.assertEqual(response["status_code"], requests.codes.ok)
427 input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G",
428 "administrative-state": "inService",
429 "supporting-circuit-pack-name": "1/1/3-PLUG-CLIENT",
430 "type": "org-openroadm-interfaces:ethernetCsmacd",
431 "supporting-port": "C1"
433 input_dict_2 = {"speed": 100000,
434 "fec": "org-openroadm-common-types:off"}
435 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
436 response["interface"][0])
437 self.assertDictEqual(dict(input_dict_2,
438 **response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"]),
439 response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"])
441 def test_18_check_interface_100ge_client(self):
442 response = test_utils_rfc8040.check_node_attribute_request(
443 "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
444 self.assertEqual(response["status_code"], requests.codes.ok)
445 input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G",
446 "administrative-state": "inService",
447 "supporting-circuit-pack-name": "1/1/3-PLUG-CLIENT",
448 "type": "org-openroadm-interfaces:ethernetCsmacd",
449 "supporting-port": "C1"
451 input_dict_2 = {"speed": 100000,
452 "fec": "org-openroadm-common-types:off"}
453 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
454 response["interface"][0])
455 self.assertDictEqual(dict(input_dict_2,
456 **response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"]),
457 response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"])
459 # Delete the service path
460 def test_19_service_path_delete_100ge(self):
461 response = test_utils_rfc8040.transportpce_api_rpc_request(
462 "transportpce-device-renderer", "service-path",
464 "service-name": "service_100GE_ofec",
466 "modulation-format": "dp-qpsk",
467 "operation": "delete",
469 "node-id": "XPDR-A2",
470 "src-tp": "XPDR3-CLIENT1",
471 "dest-tp": "XPDR3-NETWORK1"
474 "node-id": "XPDR-C2",
475 "src-tp": "XPDR3-CLIENT1",
476 "dest-tp": "XPDR3-NETWORK1"
478 "center-freq": 193.0,
482 "lower-spectral-slot-number": 265,
483 "higher-spectral-slot-number": 272,
485 self.assertEqual(response["status_code"], requests.codes.ok)
486 self.assertIn("Request processed", response["output"]["result"])
488 def test_20_check_no_interface_100ge_client(self):
489 response = test_utils_rfc8040.check_node_attribute_request(
490 "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
491 self.assertEqual(response["status_code"], requests.codes.conflict)
493 def test_21_check_no_interface_100ge_client(self):
494 response = test_utils_rfc8040.check_node_attribute_request(
495 "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
496 self.assertEqual(response["status_code"], requests.codes.conflict)
498 def test_22_check_no_interface_odu4(self):
499 response = test_utils_rfc8040.check_node_attribute_request(
500 "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
501 self.assertEqual(response["status_code"], requests.codes.conflict)
503 def test_23_check_no_interface_odu4(self):
504 response = test_utils_rfc8040.check_node_attribute_request(
505 "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
506 self.assertEqual(response["status_code"], requests.codes.conflict)
508 def test_24_check_no_interface_otuc2(self):
509 response = test_utils_rfc8040.check_node_attribute_request(
510 "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
511 self.assertEqual(response["status_code"], requests.codes.conflict)
513 def test_25_check_no_interface_otuc2(self):
514 response = test_utils_rfc8040.check_node_attribute_request(
515 "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
516 self.assertEqual(response["status_code"], requests.codes.conflict)
518 # Check if port-mapping data is updated, where the supporting-otucn is deleted
519 def test_26_check_no_otuc1(self):
520 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR3-NETWORK1")
521 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
523 def test_27_check_no_otuc1(self):
524 response = test_utils_rfc8040.portmapping_request("XPDR-C2", "XPDR3-NETWORK1")
525 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
527 def test_28_check_no_interface_otsig(self):
528 response = test_utils_rfc8040.check_node_attribute_request(
529 "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
530 self.assertEqual(response["status_code"], requests.codes.conflict)
532 def test_29_check_no_interface_otsig(self):
533 response = test_utils_rfc8040.check_node_attribute_request(
534 "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
535 self.assertEqual(response["status_code"], requests.codes.conflict)
537 def test_30_check_no_interface_otsi(self):
538 response = test_utils_rfc8040.check_node_attribute_request(
539 "XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
540 self.assertEqual(response["status_code"], requests.codes.conflict)
542 def test_31_check_no_interface_otsi(self):
543 response = test_utils_rfc8040.check_node_attribute_request(
544 "XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
545 self.assertEqual(response["status_code"], requests.codes.conflict)
547 # 200G 31.6 Gbaud mode for muxponder, with qam16
548 def test_32_service_path_create_otuc2(self):
549 response = test_utils_rfc8040.transportpce_api_rpc_request(
550 "transportpce-device-renderer", "service-path",
552 "service-name": "service_OTUC2",
554 "modulation-format": "dp-qam16",
555 "operation": "create",
557 "node-id": "XPDR-A2",
558 "dest-tp": "XPDR2-NETWORK1"
561 "node-id": "XPDR-C2",
562 "dest-tp": "XPDR2-NETWORK1"
564 "center-freq": 193.0,
568 "lower-spectral-slot-number": 265,
569 "higher-spectral-slot-number": 272,
571 self.assertEqual(response["status_code"], requests.codes.ok)
572 self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"])
573 # node-interface is list which does not preserve the order
576 if response["output"]["node-interface"][0]["node-id"] == z_side:
577 a_side, z_side = z_side, a_side
580 "otu-interface-id": [
581 "XPDR2-NETWORK1-OTUC2"
583 "och-interface-id": [
584 "XPDR2-NETWORK1-OTSIGROUP-200G",
585 "XPDR2-NETWORK1-265:272"
587 response["output"]["node-interface"][0])
590 "otu-interface-id": [
591 "XPDR2-NETWORK1-OTUC2"
593 "och-interface-id": [
594 "XPDR2-NETWORK1-OTSIGROUP-200G",
595 "XPDR2-NETWORK1-265:272"
597 response["output"]["node-interface"][1])
598 # Update the network dict variable for mpdr
599 self.NETWORK2_CHECK_DICT["logical-connection-point"] = "XPDR2-NETWORK1"
600 self.NETWORK2_CHECK_DICT["supporting-circuit-pack-name"] = "1/2/2-PLUG-NET"
601 self.NETWORK2_CHECK_DICT["port-qual"] = "switch-network"
602 self.NETWORK2_CHECK_DICT["xponder-type"] = "mpdr"
603 self.NETWORK2_CHECK_DICT["lcp-hash-val"] = "LY9PxYJqUbw="
605 def test_33_get_portmapping_network1(self):
606 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
607 self.assertEqual(response["status_code"], requests.codes.ok)
608 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
610 self.NETWORK2_CHECK_DICT,
613 def test_34_check_interface_otsi(self):
614 # pylint: disable=line-too-long
615 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-265:272")
616 self.assertEqual(response["status_code"], requests.codes.ok)
617 input_dict_1 = {"name": "XPDR2-NETWORK1-265:272",
618 "administrative-state": "inService",
619 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
620 "type": "org-openroadm-interfaces:otsi",
621 "supporting-port": "L1"}
623 "frequency": 193.00000,
624 "otsi-rate": "org-openroadm-common-optical-channel-types:R200G-otsi",
625 "fec": "org-openroadm-common-types:ofec",
626 "transmit-power": -5,
627 "provision-mode": "explicit",
628 "modulation-format": "dp-qam16"}
630 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
631 response["interface"][0])
632 self.assertDictEqual(dict(input_dict_2,
633 **response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]),
634 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"])
635 self.assertDictEqual({"foic-type": "org-openroadm-common-optical-channel-types:foic2.8", "iid": [1, 2]},
636 response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
638 def test_35_check_interface_otsig(self):
639 response = test_utils_rfc8040.check_node_attribute_request(
640 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
641 self.assertEqual(response["status_code"], requests.codes.ok)
642 input_dict_1 = {"name": "XPDR2-NETWORK1-OTSIGROUP-200G",
643 "administrative-state": "inService",
644 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
645 ["supporting-interface-list"][0]: "XPDR2-NETWORK1-265:272",
646 "type": "org-openroadm-interfaces:otsi-group",
647 "supporting-port": "L1"}
648 input_dict_2 = {"group-id": 1,
649 "group-rate": "org-openroadm-common-optical-channel-types:R200G-otsi"}
651 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
652 response["interface"][0])
653 self.assertDictEqual(dict(input_dict_2,
654 **response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]),
655 response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
657 def test_36_check_interface_otuc2(self):
658 response = test_utils_rfc8040.check_node_attribute_request(
659 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
660 self.assertEqual(response["status_code"], requests.codes.ok)
661 input_dict_1 = {"name": "XPDR2-NETWORK1-OTUC2",
662 "administrative-state": "inService",
663 "supporting-circuit-pack-name": "1/2/2-PLUG-NET",
664 ["supporting-interface-list"][0]: "XPDR2-NETWORK1-OTSIGROUP-200G",
665 "type": "org-openroadm-interfaces:otnOtu",
666 "supporting-port": "L1"}
667 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
668 "degthr-percentage": 100,
669 "tim-detect-mode": "Disabled",
673 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
674 response["interface"][0])
675 self.assertDictEqual(dict(input_dict_2,
676 **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
677 response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
679 # We do not need other test cases for the otn-service-path (ODUC2 and Ethernet)
680 # as there is no change in code and are covered in test02_otn_renderer
682 def test_37_service_path_delete_otuc2(self):
683 response = test_utils_rfc8040.transportpce_api_rpc_request(
684 "transportpce-device-renderer", "service-path",
686 "modulation-format": "dp-qam16",
687 "operation": "delete",
688 "service-name": "service_OTUC2",
690 "center-freq": 193.0,
694 "lower-spectral-slot-number": 265,
695 "higher-spectral-slot-number": 272,
698 "node-id": "XPDR-A2",
699 "dest-tp": "XPDR2-NETWORK1"
702 "node-id": "XPDR-C2",
703 "dest-tp": "XPDR2-NETWORK1"
707 self.assertEqual(response["status_code"], requests.codes.ok)
708 self.assertIn("Request processed", response["output"]["result"])
709 del self.NETWORK2_CHECK_DICT["supporting-otucn"]
711 def test_38_check_no_interface_otuc2(self):
712 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
713 self.assertEqual(response["status_code"], requests.codes.conflict)
715 def test_39_check_no_interface_otsig(self):
716 response = test_utils_rfc8040.check_node_attribute_request(
717 "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
718 self.assertEqual(response["status_code"], requests.codes.conflict)
720 def test_40_check_no_interface_otsi(self):
721 response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
722 self.assertEqual(response["status_code"], requests.codes.conflict)
724 def test_41_check_no_otuc2(self):
725 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
726 self.assertRaises(KeyError, lambda: response["supporting-otucn"])
728 # Disconnect the XPDR
729 def test_42_xpdr_device_disconnection(self):
730 response = test_utils_rfc8040.unmount_device("XPDR-A2")
731 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
733 def test_43_xpdr_device_disconnected(self):
734 response = test_utils_rfc8040.check_device_connection("XPDR-A2")
735 self.assertEqual(response["status_code"], requests.codes.conflict)
736 self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
737 self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
738 self.assertEqual(response["connection-status"]["error-message"],
739 "Request could not be completed because the relevant data model content does not exist")
741 def test_44_xpdr_device_not_connected(self):
742 response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
743 self.assertEqual(response["status_code"], requests.codes.conflict)
744 self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
745 self.assertEqual(response["node-info"]["error-tag"], "data-missing")
746 self.assertEqual(response["node-info"]["error-message"],
747 "Request could not be completed because the relevant data model content does not exist")
749 def test_45_xpdr_device_disconnection(self):
750 response = test_utils_rfc8040.unmount_device("XPDR-C2")
751 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
753 def test_46_xpdr_device_disconnected(self):
754 response = test_utils_rfc8040.check_device_connection("XPDR-C2")
755 self.assertEqual(response["status_code"], requests.codes.conflict)
756 self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
757 self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
758 self.assertEqual(response["connection-status"]["error-message"],
759 "Request could not be completed because the relevant data model content does not exist")
761 def test_47_xpdr_device_not_connected(self):
762 response = test_utils_rfc8040.get_portmapping_node_info("XPDR-C2")
763 self.assertEqual(response["status_code"], requests.codes.conflict)
764 self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
765 self.assertEqual(response["node-info"]["error-tag"], "data-missing")
766 self.assertEqual(response["node-info"]["error-message"],
767 "Request could not be completed because the relevant data model content does not exist")
770 if __name__ == "__main__":
771 unittest.main(verbosity=2)