2 ##############################################################################
3 # Copyright (c) 2022 AT&T, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append("transportpce_tests/common")
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils_rfc8040 # nopep8
25 class TransportPCE400GPortMappingTesting(unittest.TestCase):
# Expected port-mapping entry for logical connection point XPDR3-NETWORK1.
# test_05 and test_06 add "supporting-otucn" and "lcp-hash-val" to this
# dict in place before using it.
# The "supported-interface-capability" list was never closed, which left
# the whole literal unterminated; it is closed here after its single entry.
NETWORK2_CHECK_DICT = {
    "logical-connection-point": "XPDR3-NETWORK1",
    "supporting-port": "L1",
    "supported-interface-capability": [
        "org-openroadm-port-types:if-otsi-otsigroup",
    ],
    "port-direction": "bidirectional",
    "port-qual": "xpdr-network",
    "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
    "xponder-type": "tpdr",
    "port-admin-state": "InService",
    "port-oper-state": "InService",
}
# Capabilities that test_02/test_03 compare with the XPDR3-CLIENT1 mapping.
CLIENT_CAPABILITIES = [
    "org-openroadm-port-types:if-OCH-OTU4-ODU4",
    "org-openroadm-port-types:if-100GE",
]
# NOTE(review): the `def` header (and any decorator) of this class-level
# fixture is not visible in this listing.
# Calls start_tpce() and then start_sims() for the xpdra2 and xpdrc2 entries.
# Both results are assigned to cls.processes, so the second assignment
# replaces the first -- presumably start_sims() returns the complete process
# list; confirm in test_utils_rfc8040.
# NOTE(review): NODE_VERSION is read here but its definition is not visible.
45 cls.processes = test_utils_rfc8040.start_tpce()
46 cls.processes = test_utils_rfc8040.start_sims([("xpdra2", cls.NODE_VERSION),
47 ("xpdrc2", cls.NODE_VERSION)])
def tearDownClass(cls):
    # Hand every entry of cls.processes to shutdown_process, then print a
    # confirmation line.
    # NOTE(review): no @classmethod decorator is visible above this def in
    # the listing; confirm it exists in the real file.
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils_rfc8040.shutdown_process(running)
    print("all processes killed")
# NOTE(review): the `def` header of this fixture is not visible in this
# listing.
# Prints "execution of <name>", where <name> is the last dot-separated
# component of self.id().
57 # pylint: disable=consider-using-f-string
58 print("execution of {}".format(self.id().split(".")[-1]))
def test_01_xpdr_device_connection(self):
    # Mount the ("xpdra2", NODE_VERSION) device under the name XPDR-A2; the
    # reply must carry status `created`.
    device = ("xpdra2", self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("XPDR-A2", device)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
def test_01a_xpdr_device_connection(self):
    # Mount the ("xpdrc2", NODE_VERSION) device under the name XPDR-C2; the
    # reply must carry status `created`.
    device = ("xpdrc2", self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("XPDR-C2", device)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
73 # Check the correct capabilities for the client ports (if-100GE, if-100GE-ODU4,
# Requests the XPDR-A2 port mapping for XPDR3-CLIENT1 and asserts status `ok`.
# NOTE(review): the line that opens the call receiving the two arguments
# below is not visible in this listing, so the assertion that relates
# CLIENT_CAPABILITIES to the mapping's capabilities cannot be told from here.
75 def test_02_check_client_capabilities(self):
76 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR3-CLIENT1")
77 self.assertEqual(response["status_code"], requests.codes.ok)
79 self.CLIENT_CAPABILITIES,
80 response["mapping"][0]["supported-interface-capability"])
# Requests the XPDR-C2 port mapping for XPDR3-CLIENT1 and asserts status `ok`.
# NOTE(review): the line that opens the call receiving the two arguments
# below is not visible in this listing, so the assertion that relates
# CLIENT_CAPABILITIES to the mapping's capabilities cannot be told from here.
82 def test_03_check_client_capabilities(self):
83 response = test_utils_rfc8040.portmapping_request("XPDR-C2", "XPDR3-CLIENT1")
84 self.assertEqual(response["status_code"], requests.codes.ok)
86 self.CLIENT_CAPABILITIES,
87 response["mapping"][0]["supported-interface-capability"])
# Sends a "service-path" RPC of "transportpce-device-renderer" with
# operation "create" for service_100GE_ofec, asserts status `ok` and that
# the result text contains "Interfaces created successfully for nodes: ",
# then compares both "node-interface" entries of the output with the
# expected interface-id lists.
# NOTE(review): many lines of this method are not visible in this listing
# (payload brackets, the first node entry's "node-id", the a_side/z_side
# bindings, the assertion openers), so the block is not valid Python as shown.
89 def test_04_100g_ofec_service_path_create(self):
90 response = test_utils_rfc8040.transportpce_api_rpc_request(
91 "transportpce-device-renderer", "service-path",
93 "service-name": "service_100GE_ofec",
95 "modulation-format": "dp-qpsk",
96 "operation": "create",
99 "src-tp": "XPDR3-CLIENT1",
100 "dest-tp": "XPDR3-NETWORK1"
103 "node-id": "XPDR-C2",
104 "src-tp": "XPDR3-CLIENT1",
105 "dest-tp": "XPDR3-NETWORK1"
107 "center-freq": 193.0,
111 "lower-spectral-slot-number": 265,
112 "higher-spectral-slot-number": 272,
114 self.assertEqual(response["status_code"], requests.codes.ok)
115 self.assertIn("Interfaces created successfully for nodes: ", response["output"]["result"])
116 # node-interface is a list, which does not preserve order
119 if response["output"]["node-interface"][0]["node-id"] == z_side:
120 a_side, z_side = z_side, a_side
123 "otu-interface-id": [
124 "XPDR3-NETWORK1-OTUC1"
126 "odu-interface-id": [
127 "XPDR3-NETWORK1-ODUC1",
128 "XPDR3-NETWORK1-ODU4"
130 "och-interface-id": [
131 "XPDR3-NETWORK1-265:272",
132 "XPDR3-NETWORK1-OTSIGROUP-100G"
134 "eth-interface-id": [
135 "XPDR3-CLIENT1-ETHERNET"
137 response["output"]["node-interface"][0])
140 "otu-interface-id": [
141 "XPDR3-NETWORK1-OTUC1"
143 "odu-interface-id": [
144 "XPDR3-NETWORK1-ODUC1",
145 "XPDR3-NETWORK1-ODU4"
147 "och-interface-id": [
148 "XPDR3-NETWORK1-265:272",
149 "XPDR3-NETWORK1-OTSIGROUP-100G"
151 "eth-interface-id": [
152 "XPDR3-CLIENT1-ETHERNET"
154 response["output"]["node-interface"][1])
# Requests the XPDR-A2 port mapping for XPDR3-NETWORK1, asserts status `ok`,
# then adds "supporting-otucn" and "lcp-hash-val" to NETWORK2_CHECK_DICT.
# The item assignments go through self but mutate the dict object stored on
# the class, so the additions stay visible to later tests.
# NOTE(review): the opener of the final assertion and its last argument are
# not visible in this listing.
156 def test_05_get_portmapping_network1(self):
157 response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR3-NETWORK1")
158 self.assertEqual(response["status_code"], requests.codes.ok)
159 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
160 self.NETWORK2_CHECK_DICT["lcp-hash-val"] = "FDvaQIf2Z08="
162 self.NETWORK2_CHECK_DICT,
# Requests the XPDR-C2 port mapping for XPDR3-NETWORK1, asserts status `ok`,
# then sets "supporting-otucn" and "lcp-hash-val" on NETWORK2_CHECK_DICT.
# The item assignments go through self but mutate the dict object stored on
# the class, overwriting the "lcp-hash-val" written by test_05.
# NOTE(review): the opener of the final assertion and its last argument are
# not visible in this listing.
165 def test_06_get_portmapping_network1(self):
166 response = test_utils_rfc8040.portmapping_request("XPDR-C2", "XPDR3-NETWORK1")
167 self.assertEqual(response["status_code"], requests.codes.ok)
168 self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
169 self.NETWORK2_CHECK_DICT["lcp-hash-val"] = "AJpkaVmZKJk5"
171 self.NETWORK2_CHECK_DICT,
def test_07_check_interface_otsi(self):
    # Read interface XPDR3-NETWORK1-265:272 on XPDR-A2 and compare its
    # top-level attributes, its otsi container and the flexo sub-container.
    # NOTE(review): dict(expected, **actual) lets the actual values win, so
    # the first two assertDictEqual calls only prove that the expected KEYS
    # are present in the reply; only the flexo comparison checks values.
    # pylint: disable=line-too-long
    response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
    self.assertEqual(response["status_code"], requests.codes.ok)
    input_dict_1 = {
        "name": "XPDR3-NETWORK1-265:272",
        "administrative-state": "inService",
        "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
        "type": "org-openroadm-interfaces:otsi",
        "supporting-port": "L1",
    }
    # input_dict_2 is consumed below, so its entries need this binding.
    # NOTE(review): confirm the opening line carried no additional entry.
    input_dict_2 = {
        "frequency": 193.00000,
        "otsi-rate": "org-openroadm-common-optical-channel-types:R100G-otsi",
        "fec": "org-openroadm-common-types:ofec",
        "transmit-power": -5,
        "provision-mode": "explicit",
        "modulation-format": "dp-qpsk",
    }
    interface = response["interface"][0]
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    otsi = interface["org-openroadm-optical-tributary-signal-interfaces:otsi"]
    self.assertDictEqual(dict(input_dict_2, **otsi), otsi)
    self.assertDictEqual(
        {"foic-type": "org-openroadm-common-optical-channel-types:foic1.4", "iid": [1]},
        otsi["flexo"])
def test_08_check_interface_otsi(self):
    # Read interface XPDR3-NETWORK1-265:272 on XPDR-C2 and compare its
    # top-level attributes, its otsi container and the flexo sub-container.
    # NOTE(review): dict(expected, **actual) lets the actual values win, so
    # the first two assertDictEqual calls only prove that the expected KEYS
    # are present in the reply; only the flexo comparison checks values.
    # pylint: disable=line-too-long
    response = test_utils_rfc8040.check_node_attribute_request("XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
    self.assertEqual(response["status_code"], requests.codes.ok)
    input_dict_1 = {
        "name": "XPDR3-NETWORK1-265:272",
        "administrative-state": "inService",
        "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
        "type": "org-openroadm-interfaces:otsi",
        "supporting-port": "L1",
    }
    # input_dict_2 is consumed below, so its entries need this binding.
    # NOTE(review): confirm the opening line carried no additional entry.
    input_dict_2 = {
        "frequency": 193.00000,
        "otsi-rate": "org-openroadm-common-optical-channel-types:R100G-otsi",
        "fec": "org-openroadm-common-types:ofec",
        "transmit-power": -5,
        "provision-mode": "explicit",
        "modulation-format": "dp-qpsk",
    }
    interface = response["interface"][0]
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    otsi = interface["org-openroadm-optical-tributary-signal-interfaces:otsi"]
    self.assertDictEqual(dict(input_dict_2, **otsi), otsi)
    self.assertDictEqual(
        {"foic-type": "org-openroadm-common-optical-channel-types:foic1.4", "iid": [1]},
        otsi["flexo"])
def test_09_check_interface_otsig(self):
    # Read interface XPDR3-NETWORK1-OTSIGROUP-100G on XPDR-A2 and compare its
    # top-level attributes and its otsi-group container.
    # NOTE(review): dict(expected, **actual) lets the actual values win, so
    # both assertDictEqual calls only prove that the expected KEYS are
    # present in the reply.
    reply = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
    self.assertEqual(reply["status_code"], requests.codes.ok)
    expected_interface = {
        "name": "XPDR3-NETWORK1-OTSIGROUP-100G",
        "administrative-state": "inService",
        "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
        "supporting-interface-list": "XPDR3-NETWORK1-265:272",
        "type": "org-openroadm-interfaces:otsi-group",
        "supporting-port": "L1",
    }
    expected_group = {
        "group-id": 1,
        "group-rate": "org-openroadm-common-optical-channel-types:R100G-otsi",
    }
    interface = reply["interface"][0]
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    otsi_group = interface["org-openroadm-otsi-group-interfaces:otsi-group"]
    self.assertDictEqual(dict(expected_group, **otsi_group), otsi_group)
def test_10_check_interface_otsig(self):
    # Read interface XPDR3-NETWORK1-OTSIGROUP-100G on XPDR-C2 and compare its
    # top-level attributes and its otsi-group container.
    # NOTE(review): dict(expected, **actual) lets the actual values win, so
    # both assertDictEqual calls only prove that the expected KEYS are
    # present in the reply.
    reply = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
    self.assertEqual(reply["status_code"], requests.codes.ok)
    expected_interface = {
        "name": "XPDR3-NETWORK1-OTSIGROUP-100G",
        "administrative-state": "inService",
        "supporting-circuit-pack-name": "1/1/4-PLUG-NET",
        "supporting-interface-list": "XPDR3-NETWORK1-265:272",
        "type": "org-openroadm-interfaces:otsi-group",
        "supporting-port": "L1",
    }
    expected_group = {
        "group-id": 1,
        "group-rate": "org-openroadm-common-optical-channel-types:R100G-otsi",
    }
    interface = reply["interface"][0]
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    otsi_group = interface["org-openroadm-otsi-group-interfaces:otsi-group"]
    self.assertDictEqual(dict(expected_group, **otsi_group), otsi_group)
# Reads interface XPDR3-NETWORK1-OTUC1 on XPDR-A2, asserts status `ok`, and
# compares the top-level attributes and the otu container with the expected
# dicts.
# The key expression ["supporting-interface-list"][0] evaluates to the
# string "supporting-interface-list".
# dict(expected, **actual) lets the actual values win, so the assertDictEqual
# calls only prove that the expected keys are present in the reply.
# NOTE(review): the lines that finish input_dict_2 are not visible in this
# listing, so the literal is unterminated as shown.
262 def test_11_check_interface_otuc1(self):
263 response = test_utils_rfc8040.check_node_attribute_request(
264 "XPDR-A2", "interface", "XPDR3-NETWORK1-OTUC1")
265 self.assertEqual(response["status_code"], requests.codes.ok)
266 input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
267 "administrative-state": "inService",
268 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
269 ["supporting-interface-list"][0]: "XPDR3-NETWORK1-OTSIGROUP-100G",
270 "type": "org-openroadm-interfaces:otnOtu",
271 "supporting-port": "L1"}
272 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
273 "degthr-percentage": 100,
274 "tim-detect-mode": "Disabled",
278 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
279 response["interface"][0])
280 self.assertDictEqual(dict(input_dict_2,
281 **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
282 response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
# Reads interface XPDR3-NETWORK1-OTUC1 on XPDR-C2, asserts status `ok`, and
# compares the top-level attributes and the otu container with the expected
# dicts.
# The key expression ["supporting-interface-list"][0] evaluates to the
# string "supporting-interface-list".
# dict(expected, **actual) lets the actual values win, so the assertDictEqual
# calls only prove that the expected keys are present in the reply.
# NOTE(review): the lines that finish input_dict_2 are not visible in this
# listing, so the literal is unterminated as shown.
284 def test_12_check_interface_otuc1(self):
285 response = test_utils_rfc8040.check_node_attribute_request(
286 "XPDR-C2", "interface", "XPDR3-NETWORK1-OTUC1")
287 self.assertEqual(response["status_code"], requests.codes.ok)
288 input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
289 "administrative-state": "inService",
290 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
291 ["supporting-interface-list"][0]: "XPDR3-NETWORK1-OTSIGROUP-100G",
292 "type": "org-openroadm-interfaces:otnOtu",
293 "supporting-port": "L1"}
294 input_dict_2 = {"rate": "org-openroadm-otn-common-types:OTUCn",
295 "degthr-percentage": 100,
296 "tim-detect-mode": "Disabled",
300 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
301 response["interface"][0])
302 self.assertDictEqual(dict(input_dict_2,
303 **response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]),
304 response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
# Reads interface XPDR3-NETWORK1-ODUC1 on XPDR-A2, asserts status `ok`, and
# compares the top-level attributes, the odu container and its "opu"
# sub-container with the expected dicts.
# dict(expected, **actual) lets the actual values win, so the first two
# assertDictEqual calls only prove that the expected keys are present; the
# "opu" comparison checks values.
# NOTE(review): some entries and the closing of input_dict_2 are not visible
# in this listing, so the literal is unterminated as shown.
306 def test_13_check_interface_oduc1(self):
307 response = test_utils_rfc8040.check_node_attribute_request(
308 "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
309 self.assertEqual(response["status_code"], requests.codes.ok)
310 input_dict_1 = {"name": "XPDR3-NETWORK1-ODUC1",
311 "administrative-state": "inService",
312 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
313 "supporting-interface-list": "XPDR3-NETWORK1-OTUC1",
314 "type": "org-openroadm-interfaces:otnOdu",
315 "supporting-port": "L1"}
317 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
318 "rate": "org-openroadm-otn-common-types:ODUCn",
320 "degthr-percentage": 100,
321 "monitoring-mode": "terminated",
324 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
325 response["interface"][0])
326 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
327 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
328 self.assertDictEqual(
329 {"payload-type": "22", "exp-payload-type": "22"},
330 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
# Reads interface XPDR3-NETWORK1-ODUC1 on XPDR-C2, asserts status `ok`, and
# compares the top-level attributes, the odu container and its "opu"
# sub-container with the expected dicts.
# dict(expected, **actual) lets the actual values win, so the first two
# assertDictEqual calls only prove that the expected keys are present; the
# "opu" comparison checks values.
# NOTE(review): some entries and the closing of input_dict_2 are not visible
# in this listing, so the literal is unterminated as shown.
332 def test_14_check_interface_oduc1(self):
333 response = test_utils_rfc8040.check_node_attribute_request(
334 "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
335 self.assertEqual(response["status_code"], requests.codes.ok)
337 input_dict_1 = {"name": "XPDR3-NETWORK1-ODUC1",
338 "administrative-state": "inService",
339 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
340 "supporting-interface-list": "XPDR3-NETWORK1-OTUC1",
341 "type": "org-openroadm-interfaces:otnOdu",
342 "supporting-port": "L1"}
344 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
345 "rate": "org-openroadm-otn-common-types:ODUCn",
347 "degthr-percentage": 100,
348 "monitoring-mode": "terminated",
351 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
352 response["interface"][0])
353 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
354 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
355 self.assertDictEqual(
356 {"payload-type": "22", "exp-payload-type": "22"},
357 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
# Reads interface XPDR3-NETWORK1-ODU4 on XPDR-A2, asserts status `ok`, and
# compares the top-level attributes, the odu container and its "opu"
# sub-container with the expected dicts.
# dict(expected, **actual) lets the actual values win, so the first two
# assertDictEqual calls only prove that the expected keys are present; the
# "opu" comparison checks values.
# NOTE(review): an entry and the closing of input_dict_2 are not visible in
# this listing, so the literal is unterminated as shown.
359 def test_15_check_interface_odu4(self):
360 response = test_utils_rfc8040.check_node_attribute_request(
361 "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
362 self.assertEqual(response["status_code"], requests.codes.ok)
364 input_dict_1 = {"name": "XPDR3-NETWORK1-ODU4",
365 "administrative-state": "inService",
366 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
367 "supporting-interface-list": "XPDR3-NETWORK1-ODUC1",
368 "type": "org-openroadm-interfaces:otnOdu",
369 "supporting-port": "L1"}
371 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
372 "rate": "org-openroadm-otn-common-types:ODU4",
374 "degthr-percentage": 100,
375 "monitoring-mode": "terminated"
377 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
378 response["interface"][0])
379 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
380 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
381 self.assertDictEqual(
382 {"payload-type": "07", "exp-payload-type": "07"},
383 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
385 # TODO: Check the trib-port numbering
# Reads interface XPDR3-NETWORK1-ODU4 on XPDR-C2, asserts status `ok`, and
# compares the top-level attributes, the odu container and its "opu"
# sub-container with the expected dicts.
# dict(expected, **actual) lets the actual values win, so the first two
# assertDictEqual calls only prove that the expected keys are present; the
# "opu" comparison checks values.
# NOTE(review): an entry and the closing of input_dict_2 are not visible in
# this listing, so the literal is unterminated as shown.
387 def test_16_check_interface_odu4(self):
388 response = test_utils_rfc8040.check_node_attribute_request(
389 "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
390 self.assertEqual(response["status_code"], requests.codes.ok)
392 input_dict_1 = {"name": "XPDR3-NETWORK1-ODU4",
393 "administrative-state": "inService",
394 "supporting-circuit-pack-name": "1/1/4-PLUG_NET",
395 "supporting-interface-list": "XPDR3-NETWORK1-ODUC1",
396 "type": "org-openroadm-interfaces:otnOdu",
397 "supporting-port": "L1"}
399 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
400 "rate": "org-openroadm-otn-common-types:ODU4",
402 "degthr-percentage": 100,
403 "monitoring-mode": "terminated"
405 self.assertDictEqual(dict(input_dict_1, **response["interface"][0]),
406 response["interface"][0])
407 self.assertDictEqual(dict(input_dict_2, **response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]),
408 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"])
409 self.assertDictEqual(
410 {"payload-type": "07", "exp-payload-type": "07"},
411 response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
413 # TODO: Check the trib-port numbering
def test_17_check_interface_100ge_client(self):
    # Read interface XPDR3-CLIENT1-ETHERNET on XPDR-A2 and compare its
    # top-level attributes and its ethernet container.
    # NOTE(review): dict(expected, **actual) lets the actual values win, so
    # both assertDictEqual calls only prove that the expected KEYS are
    # present in the reply; the expected values are never compared.
    response = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
    self.assertEqual(response["status_code"], requests.codes.ok)
    # The literal below needs its closing brace before input_dict_2 starts.
    input_dict_1 = {
        "name": "XPDR3-CLIENT1-ETHERNET-100G",
        "administrative-state": "inService",
        "supporting-circuit-pack-name": "1/1/3-PLUG-CLIENT",
        "type": "org-openroadm-interfaces:ethernetCsmacd",
        "supporting-port": "C1",
    }
    input_dict_2 = {
        "speed": 100000,
        "fec": "org-openroadm-common-types:off",
    }
    interface = response["interface"][0]
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    ethernet = interface["org-openroadm-ethernet-interfaces:ethernet"]
    self.assertDictEqual(dict(input_dict_2, **ethernet), ethernet)
def test_18_check_interface_100ge_client(self):
    # Read interface XPDR3-CLIENT1-ETHERNET on XPDR-C2 and compare its
    # top-level attributes and its ethernet container.
    # NOTE(review): dict(expected, **actual) lets the actual values win, so
    # both assertDictEqual calls only prove that the expected KEYS are
    # present in the reply; the expected values are never compared.
    response = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
    self.assertEqual(response["status_code"], requests.codes.ok)
    # The literal below needs its closing brace before input_dict_2 starts.
    input_dict_1 = {
        "name": "XPDR3-CLIENT1-ETHERNET-100G",
        "administrative-state": "inService",
        "supporting-circuit-pack-name": "1/1/3-PLUG-CLIENT",
        "type": "org-openroadm-interfaces:ethernetCsmacd",
        "supporting-port": "C1",
    }
    input_dict_2 = {
        "speed": 100000,
        "fec": "org-openroadm-common-types:off",
    }
    interface = response["interface"][0]
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    ethernet = interface["org-openroadm-ethernet-interfaces:ethernet"]
    self.assertDictEqual(dict(input_dict_2, **ethernet), ethernet)
451 # Delete the service path
# Sends a "service-path" RPC of "transportpce-device-renderer" with
# operation "delete" for service_100GE_ofec on XPDR-A2 and XPDR-C2, then
# asserts status `ok` and that the result text contains "Request processed".
# NOTE(review): several payload lines (brackets and other fields) are not
# visible in this listing, so the call is not valid Python as shown.
452 def test_19_service_path_delete_100ge(self):
453 response = test_utils_rfc8040.transportpce_api_rpc_request(
454 "transportpce-device-renderer", "service-path",
456 "service-name": "service_100GE_ofec",
458 "modulation-format": "dp-qpsk",
459 "operation": "delete",
461 "node-id": "XPDR-A2",
462 "src-tp": "XPDR3-CLIENT1",
463 "dest-tp": "XPDR3-NETWORK1"
466 "node-id": "XPDR-C2",
467 "src-tp": "XPDR3-CLIENT1",
468 "dest-tp": "XPDR3-NETWORK1"
470 "center-freq": 193.0,
474 "lower-spectral-slot-number": 265,
475 "higher-spectral-slot-number": 272,
477 self.assertEqual(response["status_code"], requests.codes.ok)
478 self.assertIn("Request processed", response["output"]["result"])
def test_20_check_no_interface_100ge_client(self):
    # XPDR-A2: a lookup of the client Ethernet interface must answer with
    # status `conflict`.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
    status = lookup["status_code"]
    self.assertEqual(status, requests.codes.conflict)
def test_21_check_no_interface_100ge_client(self):
    # XPDR-C2: a lookup of the client Ethernet interface must answer with
    # status `conflict`.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
    status = lookup["status_code"]
    self.assertEqual(status, requests.codes.conflict)
def test_22_check_no_interface_odu4(self):
    # XPDR-A2: a lookup of the ODU4 interface must answer with status
    # `conflict`.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
    status = lookup["status_code"]
    self.assertEqual(status, requests.codes.conflict)
def test_23_check_no_interface_odu4(self):
    # XPDR-C2: a lookup of the ODU4 interface must answer with status
    # `conflict`.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
    status = lookup["status_code"]
    self.assertEqual(status, requests.codes.conflict)
def test_24_check_no_interface_otuc2(self):
    # XPDR-A2: a lookup of interface XPDR3-NETWORK1-ODUC1 must answer with
    # status `conflict`.
    # NOTE(review): the method name says "otuc2" but the interface queried
    # is ODUC1; no test in the visible listing queries XPDR3-NETWORK1-OTUC1
    # after the delete -- confirm which interface was intended.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
    status = lookup["status_code"]
    self.assertEqual(status, requests.codes.conflict)
def test_25_check_no_interface_otuc2(self):
    # XPDR-C2: a lookup of interface XPDR3-NETWORK1-ODUC1 must answer with
    # status `conflict`.
    # NOTE(review): the method name says "otuc2" but the interface queried
    # is ODUC1; no test in the visible listing queries XPDR3-NETWORK1-OTUC1
    # after the delete -- confirm which interface was intended.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
    status = lookup["status_code"]
    self.assertEqual(status, requests.codes.conflict)
510 # Check if port-mapping data is updated, where the supporting-otucn is deleted
def test_26_check_no_otuc1(self):
    # The port-mapping reply keeps its entries under "mapping" (test_02
    # reads response["mapping"][0]), so "supporting-otucn" has to be looked
    # up on the entry. A lookup on the reply wrapper raises KeyError whether
    # or not the entry still carries the attribute, which proves nothing.
    response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR3-NETWORK1")
    self.assertEqual(response["status_code"], requests.codes.ok)
    self.assertNotIn("supporting-otucn", response["mapping"][0])
def test_27_check_no_otuc1(self):
    # The port-mapping reply keeps its entries under "mapping" (test_03
    # reads response["mapping"][0]), so "supporting-otucn" has to be looked
    # up on the entry. A lookup on the reply wrapper raises KeyError whether
    # or not the entry still carries the attribute, which proves nothing.
    response = test_utils_rfc8040.portmapping_request("XPDR-C2", "XPDR3-NETWORK1")
    self.assertEqual(response["status_code"], requests.codes.ok)
    self.assertNotIn("supporting-otucn", response["mapping"][0])
def test_28_check_no_interface_otsig(self):
    # XPDR-A2: a lookup of the OTSi-group interface must answer with status
    # `conflict`.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
    status = lookup["status_code"]
    self.assertEqual(status, requests.codes.conflict)
def test_29_check_no_interface_otsig(self):
    # XPDR-C2: a lookup of the OTSi-group interface must answer with status
    # `conflict`.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
    status = lookup["status_code"]
    self.assertEqual(status, requests.codes.conflict)
def test_30_check_no_interface_otsi(self):
    # XPDR-A2: a lookup of interface XPDR3-NETWORK1-265:272 must answer with
    # status `conflict`.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
    status = lookup["status_code"]
    self.assertEqual(status, requests.codes.conflict)
def test_31_check_no_interface_otsi(self):
    # XPDR-C2: a lookup of interface XPDR3-NETWORK1-265:272 must answer with
    # status `conflict`.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        "XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
    status = lookup["status_code"]
    self.assertEqual(status, requests.codes.conflict)
539 # Disconnect the XPDR
def test_32_xpdr_device_disconnection(self):
    # Unmount XPDR-A2; either `ok` or `no_content` is accepted as the reply
    # status.
    accepted = (requests.codes.ok, requests.codes.no_content)
    unmount_reply = test_utils_rfc8040.unmount_device("XPDR-A2")
    self.assertIn(unmount_reply.status_code, accepted)
def test_33_xpdr_device_disconnected(self):
    # The connection check for XPDR-A2 must answer `conflict` with a
    # "data-missing" error of type protocol or application.
    reply = test_utils_rfc8040.check_device_connection("XPDR-A2")
    self.assertEqual(reply["status_code"], requests.codes.conflict)
    error = reply["connection-status"]
    self.assertIn(error["error-type"], ("protocol", "application"))
    self.assertEqual(error["error-tag"], "data-missing")
    self.assertEqual(
        error["error-message"],
        "Request could not be completed because the relevant data model content does not exist")
def test_34_xpdr_device_not_connected(self):
    # The port-mapping node-info request for XPDR-A2 must answer `conflict`
    # with a "data-missing" error of type protocol or application.
    reply = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
    self.assertEqual(reply["status_code"], requests.codes.conflict)
    error = reply["node-info"]
    self.assertIn(error["error-type"], ("protocol", "application"))
    self.assertEqual(error["error-tag"], "data-missing")
    self.assertEqual(
        error["error-message"],
        "Request could not be completed because the relevant data model content does not exist")
def test_35_xpdr_device_disconnection(self):
    # Unmount XPDR-C2; either `ok` or `no_content` is accepted as the reply
    # status.
    accepted = (requests.codes.ok, requests.codes.no_content)
    unmount_reply = test_utils_rfc8040.unmount_device("XPDR-C2")
    self.assertIn(unmount_reply.status_code, accepted)
def test_36_xpdr_device_disconnected(self):
    # The connection check for XPDR-C2 must answer `conflict` with a
    # "data-missing" error of type protocol or application.
    reply = test_utils_rfc8040.check_device_connection("XPDR-C2")
    self.assertEqual(reply["status_code"], requests.codes.conflict)
    error = reply["connection-status"]
    self.assertIn(error["error-type"], ("protocol", "application"))
    self.assertEqual(error["error-tag"], "data-missing")
    self.assertEqual(
        error["error-message"],
        "Request could not be completed because the relevant data model content does not exist")
def test_37_xpdr_device_not_connected(self):
    # The port-mapping node-info request for XPDR-C2 must answer `conflict`
    # with a "data-missing" error of type protocol or application.
    reply = test_utils_rfc8040.get_portmapping_node_info("XPDR-C2")
    self.assertEqual(reply["status_code"], requests.codes.conflict)
    error = reply["node-info"]
    self.assertIn(error["error-type"], ("protocol", "application"))
    self.assertEqual(error["error-tag"], "data-missing")
    self.assertEqual(
        error["error-message"],
        "Request could not be completed because the relevant data model content does not exist")
if __name__ == "__main__":
    # Run the suite with per-test output when the file is executed directly.
    unittest.main(verbosity=2)