2 ##############################################################################
3 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
13 # some pylint false positives specific to tapi test
14 # pylint: disable=unsubscriptable-object
15 # pylint: disable=unsupported-assignment-operation
18 # pylint: disable=wrong-import-order
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils # nopep8
# Failure message handed to the assertions that check the result text of the
# link-creation RPCs.
CREATED_SUCCESSFULLY = 'Result message should contain Xponder Roadm Link created successfully'
32 class TransportTapitesting(unittest.TestCase):
36 NODE_VERSION = '2.2.1'
37 cr_serv_input_data = {
38 "sdnc-request-header": {
39 "request-id": "request-1",
40 "rpc-action": "service-create",
41 "request-system-id": "appname"
43 "service-name": "service1-OCH-OTU4",
44 "common-id": "commonId",
45 "connection-type": "infrastructure",
47 "service-rate": "100",
48 "node-id": "SPDR-SA1",
49 "service-format": "OTU",
50 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
54 "port-device-name": "SPDR-SA1-XPDR1",
56 "port-name": "XPDR1-NETWORK1",
57 "port-rack": "000000.00",
58 "port-shelf": "Chassis#1"
61 "lgx-device-name": "Some lgx-device-name",
62 "lgx-port-name": "Some lgx-port-name",
63 "lgx-port-rack": "000000.00",
64 "lgx-port-shelf": "00"
70 "port-device-name": "SPDR-SA1-XPDR1",
72 "port-name": "XPDR1-NETWORK1",
73 "port-rack": "000000.00",
74 "port-shelf": "Chassis#1"
77 "lgx-device-name": "Some lgx-device-name",
78 "lgx-port-name": "Some lgx-port-name",
79 "lgx-port-rack": "000000.00",
80 "lgx-port-shelf": "00"
87 "service-rate": "100",
88 "node-id": "SPDR-SC1",
89 "service-format": "OTU",
90 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
94 "port-device-name": "SPDR-SC1-XPDR1",
96 "port-name": "XPDR1-NETWORK1",
97 "port-rack": "000000.00",
98 "port-shelf": "Chassis#1"
101 "lgx-device-name": "Some lgx-device-name",
102 "lgx-port-name": "Some lgx-port-name",
103 "lgx-port-rack": "000000.00",
104 "lgx-port-shelf": "00"
110 "port-device-name": "SPDR-SC1-XPDR1",
111 "port-type": "fixed",
112 "port-name": "XPDR1-NETWORK1",
113 "port-rack": "000000.00",
114 "port-shelf": "Chassis#1"
117 "lgx-device-name": "Some lgx-device-name",
118 "lgx-port-name": "Some lgx-port-name",
119 "lgx-port-rack": "000000.00",
120 "lgx-port-shelf": "00"
126 "due-date": "2018-06-15T00:00:01Z",
127 "operator-contact": "pw1234"
130 del_serv_input_data = {
131 "sdnc-request-header": {
132 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
133 "rpc-action": "service-delete",
134 "request-system-id": "appname",
135 "notification-url": "http://localhost:8585/NotificationServer/notify"},
136 "service-delete-req-info": {
137 "service-name": "TBD",
138 "tail-retention": "no"}
141 tapi_topo = {"topology-id-or-name": "TBD"}
145 cls.init_failed = False
146 os.environ['JAVA_MIN_MEM'] = '1024M'
147 os.environ['JAVA_MAX_MEM'] = '4096M'
148 cls.processes = test_utils.start_tpce()
149 # TAPI feature is not installed by default in Karaf
150 if "NO_ODL_STARTUP" not in os.environ or "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
151 print("installing tapi feature...")
152 result = test_utils.install_karaf_feature("odl-transportpce-tapi")
153 if result.returncode != 0:
154 cls.init_failed = True
155 print("Restarting OpenDaylight...")
156 test_utils.shutdown_process(cls.processes[0])
157 cls.processes[0] = test_utils.start_karaf()
158 test_utils.process_list[0] = cls.processes[0]
159 cls.init_failed = not test_utils.wait_until_log_contains(
160 test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
162 print("tapi installation feature failed...")
163 test_utils.shutdown_process(cls.processes[0])
165 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
166 ('roadma', cls.NODE_VERSION),
167 ('roadmb', cls.NODE_VERSION),
168 ('roadmc', cls.NODE_VERSION),
169 ('xpdrc', cls.NODE_VERSION),
170 ('spdra', cls.NODE_VERSION),
171 ('spdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    """Shut down every process stored in ``cls.processes`` and report it."""
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):  # instruction executed before each test method
    """Fail fast when class-level initialisation failed, then log the test name.

    ``init_failed`` is the flag maintained by the class-level setup. The
    failure is raised only when that flag is set; failing unconditionally
    would make every test of the suite fail.
    """
    if self.init_failed:
        self.fail('Feature installation failed')
    # pylint: disable=consider-using-f-string
    print("execution of {}".format(self.id().split(".")[-1]))
def test_01_get_tapi_topology_T100G(self):
    """Check the content of the T100GE TAPI topology.

    Expects exactly one node named "Tpdr100g over WDM node", carrying the
    ETH layer protocol and one node rule group, with no link and no owned
    node edge point.
    """
    self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    topology = rpc_reply["output"]["topology"]
    self.assertEqual(len(topology["node"]), 1, 'Topology should contain 1 node')
    self.assertNotIn("link", topology, 'Topology should contain no link')
    # Only dereference the first node once the node count has been checked.
    node = topology["node"][0]
    self.assertNotIn("owned-node-edge-point", node,
                     'Node should contain no owned-node-edge-points')
    self.assertEqual("Tpdr100g over WDM node", node["name"][0]["value"],
                     'node name should be: Tpdr100g over WDM node')
    self.assertIn("ETH", node["layer-protocol-name"],
                  'Node layer protocol should contain ETH')
    self.assertEqual(1, len(node["node-rule-group"]),
                     'node should contain 1 node rule group')
def test_02_get_tapi_topology_T0(self):
    """Check that the T0 multi-layer TAPI topology has neither node nor link."""
    self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    topology = rpc_reply["output"]["topology"]
    absent = (("node", 'Topology should contain no node'),
              ("link", 'Topology should contain no link'))
    for key, message in absent:
        self.assertNotIn(key, topology, message)
def test_03_connect_rdmb(self):
    """Mount the 'roadmb' simulator as ROADM-B1 and expect HTTP 201."""
    simulator = ('roadmb', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-B1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_check_tapi_topos(self):
    """Check that the T100GE and T0 multi-layer topologies each hold 1 node and no link."""
    for topology_name in (test_utils.T100GE, test_utils.T0_MULTILAYER_TOPO):
        self.tapi_topo["topology-id-or-name"] = topology_name
        rpc_reply = test_utils.transportpce_api_rpc_request(
            'tapi-topology', 'get-topology-details', self.tapi_topo)
        self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
        topology = rpc_reply["output"]["topology"]
        self.assertEqual(len(topology["node"]), 1, 'Topology should contain 1 node')
        self.assertNotIn("link", topology, 'Topology should contain no link')
def test_05_disconnect_roadmb(self):
    """Unmount ROADM-B1 and accept either HTTP 200 or 204."""
    unmount_reply = test_utils.unmount_device("ROADM-B1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted)
def test_06_connect_xpdra(self):
    """Mount the 'xpdra' simulator as XPDR-A1 and expect HTTP 201."""
    simulator = ('xpdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDR-A1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_07_check_tapi_topos(self):
    """Check that the T0 multi-layer TAPI topology exposes no node and no link."""
    self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    topology = rpc_reply["output"]["topology"]
    absent = (("node", 'Topology should contain no node'),
              ("link", 'Topology should contain no link'))
    for key, message in absent:
        self.assertNotIn(key, topology, message)
def test_08_connect_rdma(self):
    """Mount the 'roadma' simulator as ROADM-A1 and expect HTTP 201."""
    simulator = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-A1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_09_connect_rdmc(self):
    """Mount the 'roadmc' simulator as ROADM-C1 and expect HTTP 201."""
    simulator = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-C1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_10_check_tapi_topos(self):
    """Re-run the T100GE checks, then check the T0 multi-layer topology.

    The T0 topology is expected to hold a single "ROADM-infra" node with
    the PHOTONIC_MEDIA layer protocol and one node rule group, and no link.
    """
    self.test_01_get_tapi_topology_T100G()

    self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    topology = rpc_reply["output"]["topology"]
    self.assertEqual(1, len(topology["node"]), 'Topology should contain 1 node')
    self.assertNotIn("link", topology, 'Topology should contain no link')
    # Only dereference the first node once the node count has been checked.
    node = topology["node"][0]
    self.assertEqual("ROADM-infra", node["name"][0]["value"],
                     'node name should be: ROADM-infra')
    self.assertIn("PHOTONIC_MEDIA", node["layer-protocol-name"],
                  'Node layer protocol should contain PHOTONIC_MEDIA')
    self.assertEqual(1, len(node["node-rule-group"]),
                     'node should contain 1 node rule group')
def test_11_connect_xprda_n1_to_roadma_pp1(self):
    """Call init-xpdr-rdm-links for XPDR-A1 (xpdr 1, network 1) and ROADM-A1 SRG1-PP1-TXRX."""
    link_spec = {
        'xpdr-node': 'XPDR-A1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': link_spec})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Xponder Roadm Link created successfully', rpc_reply["output"]["result"],
                  CREATED_SUCCESSFULLY)
def test_12_connect_roadma_pp1_to_xpdra_n1(self):
    """Call init-rdm-xpdr-links for ROADM-A1 SRG1-PP1-TXRX and XPDR-A1 (xpdr 1, network 1).

    The failure message names the text that is actually asserted ("Roadm
    Xponder links ..."), so a failing run reports the right expectation.
    """
    link_spec = {
        'xpdr-node': 'XPDR-A1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': link_spec})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Roadm Xponder links created successfully', rpc_reply["output"]["result"],
                  'Result message should contain Roadm Xponder links created successfully')
def test_13_check_tapi_topology_T100G(self):
    """Check that the T100GE node owns one edge point, named after XPDR-A1 CLIENT1."""
    self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    edge_points = rpc_reply["output"]["topology"]["node"][0]["owned-node-edge-point"]
    self.assertEqual(1, len(edge_points),
                     'Node should contain 1 owned-node-edge-points')
    self.assertEqual("XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1",
                     edge_points[0]["name"][0]["value"],
                     'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1')
def test_14_check_tapi_topology_T0(self):
    """Check node/link totals and per-kind counts of the T0 multi-layer topology.

    Expects 3 nodes (2 otsi, 1 dsr/odu) and 2 links (1 transitional, 1 OMS).
    """
    self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    nodes = rpc_reply["output"]["topology"]["node"]
    links = rpc_reply["output"]["topology"]["link"]
    self.assertEqual(3, len(nodes), 'Topology should contain 3 nodes')
    self.assertEqual(2, len(links), 'Topology should contain 2 links')
    # (expected count, collection, "value-name" of the first name entry, failure message)
    expectations = (
        (2, nodes, "otsi node name", 'Topology should contain 2 otsi nodes'),
        (1, nodes, "dsr/odu node name", 'Topology should contain 1 dsr node'),
        (1, links, "transitional link name", 'Topology should contain 1 transitional link'),
        (1, links, "OMS link name", 'Topology should contain 1 oms link'),
    )
    for expected, items, label, message in expectations:
        self.assertEqual(expected, count_object_with_double_key(items, "name", "value-name", label),
                         message)
def test_15_connect_xpdrc(self):
    """Mount the 'xpdrc' simulator as XPDR-C1 and expect HTTP 201."""
    simulator = ('xpdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDR-C1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_16_connect_xprdc_n1_to_roadmc_pp1(self):
    """Call init-xpdr-rdm-links for XPDR-C1 (xpdr 1, network 1) and ROADM-C1 SRG1-PP1-TXRX."""
    link_spec = {
        'xpdr-node': 'XPDR-C1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': link_spec})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Xponder Roadm Link created successfully', rpc_reply["output"]["result"],
                  CREATED_SUCCESSFULLY)
def test_17_connect_roadmc_pp1_to_xpdrc_n1(self):
    """Call init-rdm-xpdr-links for ROADM-C1 SRG1-PP1-TXRX and XPDR-C1 (xpdr 1, network 1).

    The failure message names the text that is actually asserted ("Roadm
    Xponder links ..."), so a failing run reports the right expectation.
    """
    link_spec = {
        'xpdr-node': 'XPDR-C1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': link_spec})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Roadm Xponder links created successfully', rpc_reply["output"]["result"],
                  'Result message should contain Roadm Xponder links created successfully')
def test_18_check_tapi_topology_T100G(self):
    """Check that the T100GE node owns two edge points: XPDR-C1 CLIENT1, then XPDR-A1 CLIENT1."""
    self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    edge_points = rpc_reply["output"]["topology"]["node"][0]["owned-node-edge-point"]
    self.assertEqual(2, len(edge_points),
                     'Node should contain 2 owned-node-edge-points')
    # (position in the list, expected name, failure message)
    expected_names = (
        (0, "XPDR-C1-XPDR1+DSR+XPDR1-CLIENT1",
         'name of owned-node-edge-points should be XPDR-C1-XPDR1+DSR+XPDR1-CLIENT1'),
        (1, "XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1",
         'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1'),
    )
    for position, expected, message in expected_names:
        self.assertEqual(expected, edge_points[position]["name"][0]["value"], message)
def test_19_check_tapi_topology_T0(self):
    """Check node/link totals and per-kind counts of the T0 multi-layer topology.

    Expects 5 nodes (3 otsi, 2 dsr/odu) and 4 links (2 transitional, 2 OMS).
    """
    self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    nodes = rpc_reply["output"]["topology"]["node"]
    links = rpc_reply["output"]["topology"]["link"]
    self.assertEqual(5, len(nodes), 'Topology should contain 5 nodes')
    self.assertEqual(4, len(links), 'Topology should contain 4 links')
    # (expected count, collection, "value-name" of the first name entry, failure message)
    expectations = (
        (3, nodes, "otsi node name", 'Topology should contain 3 otsi nodes'),
        (2, nodes, "dsr/odu node name", 'Topology should contain 2 dsr nodes'),
        (2, links, "transitional link name", 'Topology should contain 2 transitional links'),
        (2, links, "OMS link name", 'Topology should contain 2 oms links'),
    )
    for expected, items, label, message in expectations:
        self.assertEqual(expected, count_object_with_double_key(items, "name", "value-name", label),
                         message)
def test_20_connect_spdr_sa1(self):
    """Mount the 'spdra' simulator as SPDR-SA1 and expect HTTP 201."""
    simulator = ('spdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SA1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_21_connect_spdr_sc1(self):
    """Mount the 'spdrc' simulator as SPDR-SC1 and expect HTTP 201."""
    simulator = ('spdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SC1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
384 def test_22_check_tapi_topology_T100G(self):
385 self.test_18_check_tapi_topology_T100G()
387 def test_23_check_tapi_topology_T0(self):
388 self.test_19_check_tapi_topology_T0()
def test_24_connect_sprda_n1_to_roadma_pp2(self):
    """Call init-xpdr-rdm-links for SPDR-SA1 (xpdr 1, network 1) and ROADM-A1 SRG1-PP2-TXRX."""
    link_spec = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': link_spec})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Xponder Roadm Link created successfully', rpc_reply["output"]["result"],
                  CREATED_SUCCESSFULLY)
def test_25_connect_roadma_pp2_to_spdra_n1(self):
    """Call init-rdm-xpdr-links for ROADM-A1 SRG1-PP2-TXRX and SPDR-SA1 (xpdr 1, network 1).

    The failure message names the text that is actually asserted ("Roadm
    Xponder links ..."), so a failing run reports the right expectation.
    """
    link_spec = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': link_spec})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Roadm Xponder links created successfully', rpc_reply["output"]["result"],
                  'Result message should contain Roadm Xponder links created successfully')
def test_26_connect_sprdc_n1_to_roadmc_pp2(self):
    """Call init-xpdr-rdm-links for SPDR-SC1 (xpdr 1, network 1) and ROADM-C1 SRG1-PP2-TXRX."""
    link_spec = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': link_spec})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Xponder Roadm Link created successfully', rpc_reply["output"]["result"],
                  CREATED_SUCCESSFULLY)
def test_27_connect_roadmc_pp2_to_spdrc_n1(self):
    """Call init-rdm-xpdr-links for ROADM-C1 SRG1-PP2-TXRX and SPDR-SC1 (xpdr 1, network 1).

    The failure message names the text that is actually asserted ("Roadm
    Xponder links ..."), so a failing run reports the right expectation.
    """
    link_spec = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': link_spec})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Roadm Xponder links created successfully', rpc_reply["output"]["result"],
                  'Result message should contain Roadm Xponder links created successfully')
430 def test_28_check_tapi_topology_T100G(self):
431 self.test_18_check_tapi_topology_T100G()
def test_29_check_tapi_topology_T0(self):
    """Check node/link totals and per-kind counts of the T0 multi-layer topology.

    Expects 9 nodes (5 otsi, 4 dsr/odu) and 8 links (4 transitional, 4 OMS).
    """
    self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    nodes = rpc_reply["output"]["topology"]["node"]
    links = rpc_reply["output"]["topology"]["link"]
    self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes')
    self.assertEqual(8, len(links), 'Topology should contain 8 links')
    # (expected count, collection, "value-name" of the first name entry, failure message)
    expectations = (
        (5, nodes, "otsi node name", 'Topology should contain 5 otsi nodes'),
        (4, nodes, "dsr/odu node name", 'Topology should contain 4 dsr nodes'),
        (4, links, "transitional link name", 'Topology should contain 4 transitional links'),
        (4, links, "OMS link name", 'Topology should contain 4 oms links'),
    )
    for expected, items, label, message in expectations:
        self.assertEqual(expected, count_object_with_double_key(items, "name", "value-name", label),
                         message)
451 def test_30_add_oms_attributes(self):
452 # Config ROADMA-ROADMC oms-attributes
454 "auto-spanloss": "true",
455 "spanloss-base": 11.4,
456 "spanloss-current": 12,
457 "engineered-spanloss": 12.2,
458 "link-concatenation": [{
461 "SRLG-length": 100000,
463 response = test_utils.add_oms_attr_request(
464 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
465 self.assertEqual(response.status_code, requests.codes.created)
466 # Config ROADMC-ROADMA oms-attributes
468 "auto-spanloss": "true",
469 "spanloss-base": 11.4,
470 "spanloss-current": 12,
471 "engineered-spanloss": 12.2,
472 "link-concatenation": [{
475 "SRLG-length": 100000,
477 response = test_utils.add_oms_attr_request(
478 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
479 self.assertEqual(response.status_code, requests.codes.created)
def test_31_create_OCH_OTU4_service(self):
    """Send service-create with ``cr_serv_input_data`` and then sleep for ``WAITING``."""
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create',
        self.cr_serv_input_data)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common_part = rpc_reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common_part['response-message'])
    time.sleep(self.WAITING)
def test_32_check_tapi_topology_T0(self):
    """Check the T0 multi-layer topology after the OTU4 service creation.

    Expects 9 nodes and 9 links (4 transitional, 4 OMS, 1 otn), then
    checks the available capacity and the node-edge-point count of the
    links by iterating over them explicitly.
    """
    self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    nodes = rpc_reply["output"]["topology"]["node"]
    links = rpc_reply["output"]["topology"]["link"]
    self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes')
    self.assertEqual(9, len(links), 'Topology should contain 9 links')
    self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "transitional link name"),
                     'Topology should contain 4 transitional links')
    self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "OMS link name"),
                     'Topology should contain 4 oms links')
    self.assertEqual(1, count_object_with_double_key(links, "name", "value-name", "otn link name"),
                     'Topology should contain 1 otn link')
    # Per-link checks need the loop variable; without the loop `link` is undefined.
    for link in links:
        first_name = link["name"][0]
        if first_name["value"] == "OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1":
            self.assertEqual(100000, int(link["available-capacity"]["total-size"]["value"]),
                             'OTU4 link should have an available capacity of 100 000 Mbps')
        elif first_name["value-name"] == "transitional link name":
            self.assertEqual(100, int(link["available-capacity"]["total-size"]["value"]),
                             'link should have an available capacity of 100 Gbps')
        # NOTE(review): applied to every link; confirm it was not meant for
        # transitional links only.
        self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps')
def test_33_create_ODU4_service(self):
    """Turn the stored service-create request into an ODU4 one and send it.

    Both service ends get the ODU format, lose their "otu-service-rate"
    entry and gain an ODU4 "odu-service-rate"; the test then sleeps for
    ``WAITING``.
    """
    request = self.cr_serv_input_data
    request["service-name"] = "service1-ODU4"
    for end_key in ("service-a-end", "service-z-end"):
        service_end = request[end_key]
        service_end["service-format"] = "ODU"
        del service_end["otu-service-rate"]
        service_end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"

    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create',
        self.cr_serv_input_data)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common_part = rpc_reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common_part['response-message'])
    time.sleep(self.WAITING)
def test_34_check_tapi_topology_T0(self):
    """Check the T0 multi-layer topology after the ODU4 service creation.

    Expects 9 nodes and 10 links (4 transitional, 4 OMS, 2 otn), then
    checks the available capacity and the node-edge-point count of the
    links by iterating over them explicitly.
    """
    self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    nodes = rpc_reply["output"]["topology"]["node"]
    links = rpc_reply["output"]["topology"]["link"]
    self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes')
    self.assertEqual(10, len(links), 'Topology should contain 10 links')
    self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "transitional link name"),
                     'Topology should contain 4 transitional links')
    self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "OMS link name"),
                     'Topology should contain 4 oms links')
    self.assertEqual(2, count_object_with_double_key(links, "name", "value-name", "otn link name"),
                     'Topology should contain 2 otn links')
    # Per-link checks need the loop variable; without the loop `link` is undefined.
    for link in links:
        first_name = link["name"][0]
        if first_name["value"] == "OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1":
            # int() keeps this check consistent with the other capacity checks.
            self.assertEqual(0, int(link["available-capacity"]["total-size"]["value"]),
                             'OTU4 link should have an available capacity of 0 Mbps')
        elif first_name["value"] == "ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1":
            self.assertEqual(100000, int(link["available-capacity"]["total-size"]["value"]),
                             'ODU4 link should have an available capacity of 100 000 Mbps')
        elif first_name["value-name"] == "transitional link name":
            self.assertEqual(100, int(link["available-capacity"]["total-size"]["value"]),
                             'link should have an available capacity of 100 Gbps')
        # NOTE(review): applied to every link; confirm it was not meant for
        # transitional links only.
        self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps')
def test_35_connect_sprda_2_n2_to_roadma_pp3(self):
    """Call init-xpdr-rdm-links for SPDR-SA1 (xpdr 2, network 2) and ROADM-A1 SRG1-PP3-TXRX."""
    link_spec = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '2',
        'network-num': '2',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP3-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': link_spec})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Xponder Roadm Link created successfully', rpc_reply["output"]["result"],
                  CREATED_SUCCESSFULLY)
def test_36_connect_roadma_pp3_to_spdra_2_n2(self):
    """Call init-rdm-xpdr-links for ROADM-A1 SRG1-PP3-TXRX and SPDR-SA1 (xpdr 2, network 2).

    The failure message names the text that is actually asserted ("Roadm
    Xponder links ..."), so a failing run reports the right expectation.
    """
    link_spec = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '2',
        'network-num': '2',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP3-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': link_spec})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Roadm Xponder links created successfully', rpc_reply["output"]["result"],
                  'Result message should contain Roadm Xponder links created successfully')
def test_37_check_tapi_topology_T0(self):
    """Check node/link totals and per-kind counts of the T0 multi-layer topology.

    Expects 11 nodes (6 otsi, 5 dsr/odu) and 12 links (5 transitional,
    5 OMS, 2 otn).
    """
    self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    nodes = rpc_reply["output"]["topology"]["node"]
    links = rpc_reply["output"]["topology"]["link"]
    self.assertEqual(11, len(nodes), 'Topology should contain 11 nodes')
    self.assertEqual(12, len(links), 'Topology should contain 12 links')
    # (expected count, collection, "value-name" of the first name entry, failure message)
    expectations = (
        (6, nodes, "otsi node name", 'Topology should contain 6 otsi nodes'),
        (5, nodes, "dsr/odu node name", 'Topology should contain 5 dsr nodes'),
        (5, links, "transitional link name", 'Topology should contain 5 transitional links'),
        (5, links, "OMS link name", 'Topology should contain 5 oms links'),
        (2, links, "otn link name", 'Topology should contain 2 otn links'),
    )
    for expected, items, label, message in expectations:
        self.assertEqual(expected, count_object_with_double_key(items, "name", "value-name", label),
                         message)
def test_38_delete_ODU4_service(self):
    """Send service-delete for "service1-ODU4" and then sleep for ``WAITING``."""
    delete_info = self.del_serv_input_data["service-delete-req-info"]
    delete_info["service-name"] = "service1-ODU4"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete',
        self.del_serv_input_data)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common_part = rpc_reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common_part['response-message'])
    time.sleep(self.WAITING)
def test_39_delete_OCH_OTU4_service(self):
    """Send service-delete for "service1-OCH-OTU4" and then sleep for ``WAITING``."""
    delete_info = self.del_serv_input_data["service-delete-req-info"]
    delete_info["service-name"] = "service1-OCH-OTU4"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete',
        self.del_serv_input_data)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common_part = rpc_reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common_part['response-message'])
    time.sleep(self.WAITING)
def test_40_check_tapi_topology_T0(self):
    """Check that the T0 multi-layer topology holds 11 nodes, 10 links and no otn link."""
    self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    nodes = rpc_reply["output"]["topology"]["node"]
    links = rpc_reply["output"]["topology"]["link"]
    self.assertEqual(11, len(nodes), 'Topology should contain 11 nodes')
    self.assertEqual(10, len(links), 'Topology should contain 10 links')
    otn_link_count = count_object_with_double_key(links, "name", "value-name", "otn link name")
    self.assertEqual(0, otn_link_count, 'Topology should contain 0 otn link')
def test_41_disconnect_xponders_from_roadm(self):
    """Delete every XPONDER-OUTPUT / XPONDER-INPUT link of the openroadm-topology.

    The links are read from the 'config' view of the topology and removed
    one by one; each deletion must answer HTTP 200 or 204.
    """
    topo_reply = test_utils.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(topo_reply['status_code'], requests.codes.ok)
    links = topo_reply['network'][0]['ietf-network-topology:link']
    xponder_link_types = ('XPONDER-OUTPUT', 'XPONDER-INPUT')
    accepted = (requests.codes.ok, requests.codes.no_content)
    # Iterate explicitly: the per-link statements need `link` to be bound.
    for link in links:
        if link["org-openroadm-common-network:link-type"] in xponder_link_types:
            delete_reply = test_utils.del_ietf_network_link_request(
                'openroadm-topology', link['link-id'], 'config')
            self.assertIn(delete_reply.status_code, accepted)
def test_42_check_tapi_topology_T0(self):
    """Check that the T0 multi-layer topology holds only the "ROADM-infra" node and no link."""
    self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    topology = rpc_reply["output"]["topology"]
    self.assertEqual(1, len(topology["node"]), 'Topology should contain 1 node')
    self.assertNotIn("link", topology, 'Topology should contain no link')
    self.assertEqual("ROADM-infra", topology["node"][0]["name"][0]["value"],
                     'node name should be: ROADM-infra')
def test_43_get_tapi_topology_T100G(self):
    """Check that the T100GE topology holds one node, no link and no owned node edge point."""
    self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    topology = rpc_reply["output"]["topology"]
    self.assertEqual(len(topology["node"]), 1, 'Topology should contain 1 node')
    self.assertNotIn("link", topology, 'Topology should contain no link')
    self.assertNotIn("owned-node-edge-point", topology["node"][0],
                     'Node should contain no owned-node-edge-points')
def test_44_disconnect_roadma(self):
    """Unmount ROADM-A1 and accept either HTTP 200 or 204."""
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted)
def test_45_disconnect_roadmc(self):
    """Unmount ROADM-C1 and accept either HTTP 200 or 204."""
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted)
668 def test_46_check_tapi_topos(self):
669 self.test_01_get_tapi_topology_T100G()
670 self.test_02_get_tapi_topology_T0()
def test_47_disconnect_xpdra(self):
    """Unmount XPDR-A1 and accept either HTTP 200 or 204."""
    unmount_reply = test_utils.unmount_device("XPDR-A1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted)
def test_48_disconnect_xpdrc(self):
    """Unmount XPDR-C1 and accept either HTTP 200 or 204."""
    unmount_reply = test_utils.unmount_device("XPDR-C1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted)
def test_49_disconnect_spdr_sa1(self):
    """Unmount SPDR-SA1 and accept either HTTP 200 or 204."""
    unmount_reply = test_utils.unmount_device("SPDR-SA1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted)
def test_50_disconnect_spdr_sc1(self):
    """Unmount SPDR-SC1 and accept either HTTP 200 or 204."""
    unmount_reply = test_utils.unmount_device("SPDR-SC1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted)
def count_object_with_double_key(list_dicts, key1, key2, value):
    """Count the dictionaries whose ``dictio[key1][0][key2]`` equals ``value``.

    Args:
        list_dicts: iterable of dictionaries; each must map ``key1`` to a
            non-empty sequence whose first element is a mapping holding ``key2``.
        key1: outer key looked up in each dictionary.
        key2: inner key looked up in the first element found under ``key1``.
        value: value the inner entry is compared with.

    Returns:
        The number of matching dictionaries (0 for an empty input).

    Raises:
        KeyError, IndexError: when an element does not have the expected shape.
    """
    # Callers compare the result with an integer, so the count must be returned.
    return sum(1 for dictio in list_dicts if dictio[key1][0][key2] == value)
# Run the test suite with verbose output when the file is executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)