2 ##############################################################################
3 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
13 # some pylint false positives specific to tapi test
14 # pylint: disable=unsubscriptable-object
15 # pylint: disable=unsupported-assignment-operation
18 # pylint: disable=wrong-import-order
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils # nopep8
# Failure message handed to assertIn (as its msg argument) by the link-creation tests of this file.
29 CREATED_SUCCESSFULLY = 'Result message should contain Xponder Roadm Link created successfully'
# unittest test case whose methods call the 'tapi-topology' get-topology-details RPC while
# devices are mounted, linked, used by services and unmounted again.
# NOTE(review): several lines of the literals below (opening/closing braces, nested keys) are
# not visible in this extract — confirm the nesting against the full file.
32 class TransportTapitesting(unittest.TestCase):
# Version string paired with every simulator name when simulators are started or devices mounted.
36 NODE_VERSION = '2.2.1'
# Input of the 'service-create' RPC (test_31); test_33 edits this same dict in place before reusing it.
37 cr_serv_input_data = {
38 "sdnc-request-header": {
39 "request-id": "request-1",
40 "rpc-action": "service-create",
41 "request-system-id": "appname"
43 "service-name": "service1-OCH-OTU4",
44 "common-id": "commonId",
45 "connection-type": "infrastructure",
47 "service-rate": "100",
48 "node-id": "SPDR-SA1",
49 "service-format": "OTU",
50 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
54 "port-device-name": "SPDR-SA1-XPDR1",
56 "port-name": "XPDR1-NETWORK1",
57 "port-rack": "000000.00",
58 "port-shelf": "Chassis#1"
61 "lgx-device-name": "Some lgx-device-name",
62 "lgx-port-name": "Some lgx-port-name",
63 "lgx-port-rack": "000000.00",
64 "lgx-port-shelf": "00"
70 "port-device-name": "SPDR-SA1-XPDR1",
72 "port-name": "XPDR1-NETWORK1",
73 "port-rack": "000000.00",
74 "port-shelf": "Chassis#1"
77 "lgx-device-name": "Some lgx-device-name",
78 "lgx-port-name": "Some lgx-port-name",
79 "lgx-port-rack": "000000.00",
80 "lgx-port-shelf": "00"
87 "service-rate": "100",
88 "node-id": "SPDR-SC1",
89 "service-format": "OTU",
90 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
94 "port-device-name": "SPDR-SC1-XPDR1",
96 "port-name": "XPDR1-NETWORK1",
97 "port-rack": "000000.00",
98 "port-shelf": "Chassis#1"
101 "lgx-device-name": "Some lgx-device-name",
102 "lgx-port-name": "Some lgx-port-name",
103 "lgx-port-rack": "000000.00",
104 "lgx-port-shelf": "00"
110 "port-device-name": "SPDR-SC1-XPDR1",
111 "port-type": "fixed",
112 "port-name": "XPDR1-NETWORK1",
113 "port-rack": "000000.00",
114 "port-shelf": "Chassis#1"
117 "lgx-device-name": "Some lgx-device-name",
118 "lgx-port-name": "Some lgx-port-name",
119 "lgx-port-rack": "000000.00",
120 "lgx-port-shelf": "00"
126 "due-date": "2018-06-15T00:00:01Z",
127 "operator-contact": "pw1234"
# Input of the 'service-delete' RPC; the "TBD" service-name is overwritten by test_38 and test_39.
130 del_serv_input_data = {
131 "sdnc-request-header": {
132 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
133 "rpc-action": "service-delete",
134 "request-system-id": "appname",
135 "notification-url": "http://localhost:8585/NotificationServer/notify"},
136 "service-delete-req-info": {
137 "service-name": "TBD",
138 "tail-retention": "no"}
# Input of 'get-topology-details'; each topology test stores the wanted id under "topology-id" first.
141 tapi_topo = {"topology-id": "TBD"}
# Class-level fixture body: starts TransportPCE, optionally installs the TAPI Karaf feature and
# restarts the controller, then starts the device simulators.
# NOTE(review): the `def` line (and any decorator) is not visible in this extract; the use of `cls`
# suggests a classmethod, presumably setUpClass — confirm against the full file.
145 cls.init_failed = False
# Export the memory settings through the environment before the controller is started.
146 os.environ['JAVA_MIN_MEM'] = '1024M'
147 os.environ['JAVA_MAX_MEM'] = '4096M'
148 cls.processes = test_utils.start_tpce()
149 # TAPI feature is not installed by default in Karaf
# The branch is skipped only when NO_ODL_STARTUP is set AND USE_LIGHTY is set to 'True'.
150 if "NO_ODL_STARTUP" not in os.environ or "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
151 print("installing tapi feature...")
152 result = test_utils.install_karaf_feature("odl-transportpce-tapi")
153 if result.returncode != 0:
154 cls.init_failed = True
155 print("Restarting OpenDaylight...")
# Replace the first tracked process with a freshly started Karaf, in both process lists.
156 test_utils.shutdown_process(cls.processes[0])
157 cls.processes[0] = test_utils.start_karaf()
158 test_utils.process_list[0] = cls.processes[0]
# NOTE(review): this assignment replaces whatever the returncode check stored in init_failed; if both
# statements run in sequence, a failed feature installation is forgotten once Karaf restarts — confirm.
159 cls.init_failed = not test_utils.wait_until_log_contains(
160 test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
# NOTE(review): the condition guarding this failure path is not visible here
# (presumably a check of cls.init_failed followed by an exit) — confirm.
162 print("tapi installation feature failed...")
163 test_utils.shutdown_process(cls.processes[0])
# Start the seven simulators with NODE_VERSION; cls.processes is rebound to what start_sims returns.
165 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
166 ('roadma', cls.NODE_VERSION),
167 ('roadmb', cls.NODE_VERSION),
168 ('roadmc', cls.NODE_VERSION),
169 ('xpdrc', cls.NODE_VERSION),
170 ('spdra', cls.NODE_VERSION),
171 ('spdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Shut down every process recorded on the class, then report once.
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
# Per-test fixture: fails the test when initialisation did not succeed, otherwise prints the
# name of the test method about to run.
180 def setUp(self): # instruction executed before each test method
# NOTE(review): the condition guarding this call is not visible in this extract
# (presumably a check of self.init_failed) — confirm against the full file.
182 self.fail('Feature installation failed')
183 # pylint: disable=consider-using-f-string
# self.id() is a dotted path; only its last component (the method name) is printed.
184 print("execution of {}".format(self.id().split(".")[-1]))
def test_01_get_tapi_topology_T100G(self):
    # Fetch the topology identified by test_utils.T100GE_UUID and check that it holds
    # a single node (no link, no edge point) with the expected name, layer and rule group.
    self.tapi_topo["topology-id"] = test_utils.T100GE_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topology = reply["output"]["topology"]
    self.assertEqual(len(topology["node"]), 1, 'Topology should contain 1 node')
    self.assertNotIn("link", topology, 'Topology should contain no link')
    # The first node is only dereferenced once the size check above has passed.
    node = topology["node"][0]
    self.assertNotIn("owned-node-edge-point", node,
                     'Node should contain no owned-node-edge-points')
    self.assertEqual("Tpdr100g over WDM node", node["name"][0]["value"],
                     'node name should be: Tpdr100g over WDM node')
    self.assertIn("ETH", node["layer-protocol-name"],
                  'Node layer protocol should contain ETH')
    self.assertEqual(1, len(node["node-rule-group"]),
                     'node should contain 1 node rule group')
def test_02_get_tapi_topology_T0(self):
    # Fetch the T0 multi-layer topology and verify that the reply carries
    # neither a "node" nor a "link" entry.
    self.tapi_topo["topology-id"] = test_utils.T0_MULTILAYER_TOPO_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topology = reply["output"]["topology"]
    self.assertNotIn("node", topology, 'Topology should contain no node')
    self.assertNotIn("link", topology, 'Topology should contain no link')
def test_03_connect_rdmb(self):
    # Mount ROADM-B1 on the 'roadmb' simulator and expect the "created" status code.
    simulator = ('roadmb', self.NODE_VERSION)
    response = test_utils.mount_device("ROADM-B1", simulator)
    self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_check_tapi_topos(self):
    # Both the T100GE and the T0 multi-layer topologies are queried in turn; each one
    # is expected to hold exactly one node and no link.
    for topo_uuid in (test_utils.T100GE_UUID, test_utils.T0_MULTILAYER_TOPO_UUID):
        self.tapi_topo["topology-id"] = topo_uuid
        reply = test_utils.transportpce_api_rpc_request(
            'tapi-topology', 'get-topology-details', self.tapi_topo)
        self.assertEqual(reply['status_code'], requests.codes.ok)
        topology = reply["output"]["topology"]
        self.assertEqual(len(topology["node"]), 1, 'Topology should contain 1 node')
        self.assertNotIn("link", topology, 'Topology should contain no link')
def test_05_disconnect_roadmb(self):
    # Unmount ROADM-B1; both "ok" and "no content" are accepted as a reply.
    response = test_utils.unmount_device("ROADM-B1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(response.status_code, accepted)
def test_06_connect_xpdra(self):
    # Mount XPDR-A1 on the 'xpdra' simulator and expect the "created" status code.
    simulator = ('xpdra', self.NODE_VERSION)
    response = test_utils.mount_device("XPDR-A1", simulator)
    self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_07_check_tapi_topos(self):
    # The T0 multi-layer topology must expose neither a "node" nor a "link" entry here.
    self.tapi_topo["topology-id"] = test_utils.T0_MULTILAYER_TOPO_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topology = reply["output"]["topology"]
    absent_entries = (
        ("node", 'Topology should contain no node'),
        ("link", 'Topology should contain no link'),
    )
    for absent_key, message in absent_entries:
        self.assertNotIn(absent_key, topology, message)
def test_08_connect_rdma(self):
    # Mount ROADM-A1 on the 'roadma' simulator and expect the "created" status code.
    simulator = ('roadma', self.NODE_VERSION)
    response = test_utils.mount_device("ROADM-A1", simulator)
    self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_09_connect_rdmc(self):
    # Mount ROADM-C1 on the 'roadmc' simulator and expect the "created" status code.
    simulator = ('roadmc', self.NODE_VERSION)
    response = test_utils.mount_device("ROADM-C1", simulator)
    self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_10_check_tapi_topos(self):
    # Re-run the T100GE checks of test_01, then verify that the T0 multi-layer topology
    # holds a single "ROADM-infra" node and no link.
    self.test_01_get_tapi_topology_T100G()
    self.tapi_topo["topology-id"] = test_utils.T0_MULTILAYER_TOPO_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topology = reply["output"]["topology"]
    self.assertEqual(1, len(topology["node"]), 'Topology should contain 1 node')
    self.assertNotIn("link", topology, 'Topology should contain no link')
    nodes = topology["node"]
    # The name entry is located through its value-name instead of being read at index 0.
    names = nodes[0]["name"]
    position = count_position_name_from_value_name(names, "otsi node name")
    self.assertEqual("ROADM-infra", names[position]["value"],
                     'node name should be: ROADM-infra')
    otsi_nodes = count_object_with_double_key(nodes, "name", "value-name", "otsi node name")
    self.assertEqual(1, otsi_nodes, 'Topology should contain 1 otsi nodes')
    first_node = nodes[0]
    self.assertIn("PHOTONIC_MEDIA", first_node["layer-protocol-name"],
                  'Node layer protocol should contain PHOTONIC_MEDIA')
    self.assertEqual(1, len(first_node["node-rule-group"]),
                     'node should contain 1 node rule group')
def test_11_connect_xpdra_n1_to_roadma_pp1(self):
    # Call 'init-xpdr-rdm-links' for XPDR-A1 / ROADM-A1 (SRG1-PP1-TXRX) and check
    # the success text of the result.
    links_input = {
        'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
        'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX',
    }
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
    result_text = reply["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text, CREATED_SUCCESSFULLY)
def test_12_connect_roadma_pp1_to_xpdra_n1(self):
    # Call 'init-rdm-xpdr-links' for ROADM-A1 (SRG1-PP1-TXRX) / XPDR-A1 and check
    # the success text of the result.
    links_input = {
        'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
        'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX',
    }
    response = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    # The shared CREATED_SUCCESSFULLY text names the 'Xponder Roadm Link' message, which is
    # not the string checked here, so the failure message spells out the expected text.
    self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
                  'Result message should contain Roadm Xponder links created successfully')
def test_13_check_tapi_topology_T100G(self):
    # The single node of the T100GE topology is expected to own exactly one edge point,
    # named after the XPDR-A1 client port.
    self.tapi_topo["topology-id"] = test_utils.T100GE_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    edge_points = reply["output"]["topology"]["node"][0]["owned-node-edge-point"]
    self.assertEqual(1, len(edge_points), 'Node should contain 1 owned-node-edge-points')
    self.assertEqual("XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1", edge_points[0]["name"][0]["value"],
                     'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1')
def test_14_check_tapi_topology_T0(self):
    # Expect 2 nodes and 1 link in the T0 multi-layer topology, then count the
    # node/link names by value-name.
    self.tapi_topo["topology-id"] = test_utils.T0_MULTILAYER_TOPO_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topology = reply["output"]["topology"]
    nodes = topology["node"]
    links = topology["link"]
    self.assertEqual(2, len(nodes), 'Topology should contain 2 nodes')
    self.assertEqual(1, len(links), 'Topology should contain 1 link')
    # (expected count, inspected list, value-name to match, failure message)
    name_checks = (
        (2, nodes, "otsi node name", 'Topology should contain 2 otsi nodes'),
        (1, nodes, "dsr/odu node name", 'Topology should contain 1 dsr node'),
        (1, links, "OTS link name", 'Topology should contain 1 ots link'),
    )
    for expected, items, value_name, message in name_checks:
        self.assertEqual(
            expected, count_object_with_double_key(items, "name", "value-name", value_name), message)
def test_15_connect_xpdrc(self):
    # Mount XPDR-C1 on the 'xpdrc' simulator and expect the "created" status code.
    simulator = ('xpdrc', self.NODE_VERSION)
    response = test_utils.mount_device("XPDR-C1", simulator)
    self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_16_connect_xpdrc_n1_to_roadmc_pp1(self):
    # Call 'init-xpdr-rdm-links' for XPDR-C1 / ROADM-C1 (SRG1-PP1-TXRX) and check
    # the success text of the result.
    links_input = {
        'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
        'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX',
    }
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
    result_text = reply["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text, CREATED_SUCCESSFULLY)
def test_17_connect_roadmc_pp1_to_xpdrc_n1(self):
    # Call 'init-rdm-xpdr-links' for ROADM-C1 (SRG1-PP1-TXRX) / XPDR-C1 and check
    # the success text of the result.
    links_input = {
        'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
        'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX',
    }
    response = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    # The shared CREATED_SUCCESSFULLY text names the 'Xponder Roadm Link' message, which is
    # not the string checked here, so the failure message spells out the expected text.
    self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
                  'Result message should contain Roadm Xponder links created successfully')
def test_18_check_tapi_topology_T100G(self):
    # The single node of the T100GE topology is expected to own two edge points:
    # the XPDR-C1 client port at index 0 and the XPDR-A1 client port at index 1.
    self.tapi_topo["topology-id"] = test_utils.T100GE_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    edge_points = reply["output"]["topology"]["node"][0]["owned-node-edge-point"]
    self.assertEqual(2, len(edge_points), 'Node should contain 2 owned-node-edge-points')
    # (expected name, failure message), listed in the order the edge points are checked.
    expected_names = (
        ("XPDR-C1-XPDR1+DSR+XPDR1-CLIENT1",
         'name of owned-node-edge-points should be XPDR-C1-XPDR1+DSR+XPDR1-CLIENT1'),
        ("XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1",
         'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1'),
    )
    for position, (expected, message) in enumerate(expected_names):
        self.assertEqual(expected, edge_points[position]["name"][0]["value"], message)
def test_19_check_tapi_topology_T0(self):
    # Expect 3 nodes and 2 links in the T0 multi-layer topology, then count the
    # node/link names by value-name.
    self.tapi_topo["topology-id"] = test_utils.T0_MULTILAYER_TOPO_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topology = reply["output"]["topology"]
    nodes = topology["node"]
    links = topology["link"]
    self.assertEqual(3, len(nodes), 'Topology should contain 3 nodes')
    self.assertEqual(2, len(links), 'Topology should contain 2 links')
    # (expected count, inspected list, value-name to match, failure message)
    name_checks = (
        (3, nodes, "otsi node name", 'Topology should contain 3 otsi nodes'),
        (2, nodes, "dsr/odu node name", 'Topology should contain 2 dsr nodes'),
        (2, links, "OTS link name", 'Topology should contain 2 ots links'),
    )
    for expected, items, value_name, message in name_checks:
        self.assertEqual(
            expected, count_object_with_double_key(items, "name", "value-name", value_name), message)
def test_20_connect_spdr_sa1(self):
    # Mount SPDR-SA1 on the 'spdra' simulator and expect the "created" status code.
    simulator = ('spdra', self.NODE_VERSION)
    response = test_utils.mount_device("SPDR-SA1", simulator)
    self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_21_connect_spdr_sc1(self):
    # Mount SPDR-SC1 on the 'spdrc' simulator and expect the "created" status code.
    simulator = ('spdrc', self.NODE_VERSION)
    response = test_utils.mount_device("SPDR-SC1", simulator)
    self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_22_check_tapi_topology_T100G(self):
    # Same expectations as test_18: the checks are simply run again at this step.
    self.test_18_check_tapi_topology_T100G()
def test_23_check_tapi_topology_T0(self):
    # Same expectations as test_19: the checks are simply run again at this step.
    self.test_19_check_tapi_topology_T0()
def test_24_connect_sprda_n1_to_roadma_pp2(self):
    # Call 'init-xpdr-rdm-links' for SPDR-SA1 / ROADM-A1 (SRG1-PP2-TXRX) and check
    # the success text of the result.
    links_input = {
        'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
        'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
    result_text = reply["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text, CREATED_SUCCESSFULLY)
def test_25_connect_roadma_pp2_to_spdra_n1(self):
    # Call 'init-rdm-xpdr-links' for ROADM-A1 (SRG1-PP2-TXRX) / SPDR-SA1 and check
    # the success text of the result.
    links_input = {
        'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
        'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    response = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    # The shared CREATED_SUCCESSFULLY text names the 'Xponder Roadm Link' message, which is
    # not the string checked here, so the failure message spells out the expected text.
    self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
                  'Result message should contain Roadm Xponder links created successfully')
def test_26_connect_sprdc_n1_to_roadmc_pp2(self):
    # Call 'init-xpdr-rdm-links' for SPDR-SC1 / ROADM-C1 (SRG1-PP2-TXRX) and check
    # the success text of the result.
    links_input = {
        'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
        'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
    result_text = reply["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text, CREATED_SUCCESSFULLY)
def test_27_connect_roadmc_pp2_to_spdrc_n1(self):
    # Call 'init-rdm-xpdr-links' for ROADM-C1 (SRG1-PP2-TXRX) / SPDR-SC1 and check
    # the success text of the result.
    links_input = {
        'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
        'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    response = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    # The shared CREATED_SUCCESSFULLY text names the 'Xponder Roadm Link' message, which is
    # not the string checked here, so the failure message spells out the expected text.
    self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
                  'Result message should contain Roadm Xponder links created successfully')
def test_28_check_tapi_topology_T100G(self):
    # Same expectations as test_18: the checks are simply run again at this step.
    self.test_18_check_tapi_topology_T100G()
def test_29_check_tapi_topology_T0(self):
    # Expect 5 nodes and 4 links in the T0 multi-layer topology, then count the
    # node/link names by value-name.
    self.tapi_topo["topology-id"] = test_utils.T0_MULTILAYER_TOPO_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topology = reply["output"]["topology"]
    nodes = topology["node"]
    links = topology["link"]
    self.assertEqual(5, len(nodes), 'Topology should contain 5 nodes')
    self.assertEqual(4, len(links), 'Topology should contain 4 links')
    # (expected count, inspected list, value-name to match, failure message)
    name_checks = (
        (5, nodes, "otsi node name", 'Topology should contain 5 otsi nodes'),
        (4, nodes, "dsr/odu node name", 'Topology should contain 4 dsr nodes'),
        (4, links, "OTS link name", 'Topology should contain 4 ots links'),
    )
    for expected, items, value_name, message in name_checks:
        self.assertEqual(
            expected, count_object_with_double_key(items, "name", "value-name", value_name), message)
# Posts OMS attributes for the ROADM-A1 -> ROADM-C1 link and for the reverse link, expecting
# the "created" status code each time.
# NOTE(review): the lines that open the payload and bind it to `data` (and the lines closing
# the nested structures) are not visible in this extract — confirm against the full file.
451 def test_30_add_oms_attributes(self):
452 # Config ROADMA-ROADMC oms-attributes
454 "auto-spanloss": "true",
455 "spanloss-base": 11.4,
456 "spanloss-current": 12,
457 "engineered-spanloss": 12.2,
458 "link-concatenation": [{
461 "SRLG-length": 100000,
# First request: link id from ROADM-A1 DEG2 to ROADM-C1 DEG1.
463 response = test_utils.add_oms_attr_request(
464 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
465 self.assertEqual(response.status_code, requests.codes.created)
466 # Config ROADMC-ROADMA oms-attributes
468 "auto-spanloss": "true",
469 "spanloss-base": 11.4,
470 "spanloss-current": 12,
471 "engineered-spanloss": 12.2,
472 "link-concatenation": [{
475 "SRLG-length": 100000,
# Second request: link id from ROADM-C1 DEG1 to ROADM-A1 DEG2.
477 response = test_utils.add_oms_attr_request(
478 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
479 self.assertEqual(response.status_code, requests.codes.created)
def test_31_create_OCH_OTU4_service(self):
    # Send the class-level service-create payload and check that the controller
    # acknowledges it with the "PCE calculation in progress" message.
    reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', self.cr_serv_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    common = reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for self.WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
# Checks the T0 multi-layer topology: 5 nodes, 5 links (4 OTS + 1 otn), and the
# available capacity of the OTU4 link.
490 def test_32_check_tapi_topology_T0(self):
491 self.tapi_topo["topology-id"] = test_utils.T0_MULTILAYER_TOPO_UUID
492 response = test_utils.transportpce_api_rpc_request(
493 'tapi-topology', 'get-topology-details', self.tapi_topo)
494 self.assertEqual(response['status_code'], requests.codes.ok)
495 nodes = response["output"]["topology"]["node"]
496 links = response["output"]["topology"]["link"]
497 self.assertEqual(5, len(nodes), 'Topology should contain 5 nodes')
498 self.assertEqual(5, len(links), 'Topology should contain 5 links')
499 self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "OTS link name"),
500 'Topology should contain 4 ots links')
501 self.assertEqual(1, count_object_with_double_key(links, "name", "value-name", "otn link name"),
502 'Topology should contain 1 otn link')
# NOTE(review): the statement binding `link` is not visible in this extract
# (presumably a loop over `links`) — confirm against the full file.
# The OTU4 link is recognised by the value of its first name entry.
504 if link["name"][0]["value"] == "OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1":
505 self.assertEqual(100000, int(link["available-capacity"]["total-size"]["value"]),
506 'OTU4 link should have an available capacity of 100 000 Mbps')
507 # elif link["name"][0]["value-name"] == "transitional link name":
508 # self.assertEqual(100, int(link["available-capacity"]["total-size"]["value"]),
509 # 'link should have an available capacity of 100 Gbps')
510 self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps')
def test_33_create_ODU4_service(self):
    # Turn the class-level service-create payload into an ODU4 request (edited in place),
    # send it, and check the "PCE calculation in progress" acknowledgement.
    request = self.cr_serv_input_data
    request["service-name"] = "service1-ODU4"
    for end in ("service-a-end", "service-z-end"):
        endpoint = request[end]
        endpoint["service-format"] = "ODU"
        # Swap the OTU rate entry for its ODU counterpart.
        del endpoint["otu-service-rate"]
        endpoint["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"

    reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', request)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    common = reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for self.WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
# Checks the T0 multi-layer topology: 5 nodes, 6 links (4 OTS + 2 otn), and the
# available capacity of the OTU4 and ODU4 links.
529 def test_34_check_tapi_topology_T0(self):
530 self.tapi_topo["topology-id"] = test_utils.T0_MULTILAYER_TOPO_UUID
531 response = test_utils.transportpce_api_rpc_request(
532 'tapi-topology', 'get-topology-details', self.tapi_topo)
533 self.assertEqual(response['status_code'], requests.codes.ok)
534 nodes = response["output"]["topology"]["node"]
535 links = response["output"]["topology"]["link"]
536 self.assertEqual(5, len(nodes), 'Topology should contain 5 nodes')
537 self.assertEqual(6, len(links), 'Topology should contain 6 links')
538 self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "OTS link name"),
539 'Topology should contain 4 ots links')
540 self.assertEqual(2, count_object_with_double_key(links, "name", "value-name", "otn link name"),
541 'Topology should contain 2 otn links')
# NOTE(review): the statement binding `link` is not visible in this extract
# (presumably a loop over `links`) — confirm against the full file.
543 if link["name"][0]["value"] == "OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1":
# NOTE(review): unlike the ODU4 branch below, the value is compared to 0 without int();
# if the RPC returns it as a string this equality cannot hold — confirm the returned type.
544 self.assertEqual(0, link["available-capacity"]["total-size"]["value"],
545 'OTU4 link should have an available capacity of 0 Mbps')
546 elif link["name"][0]["value"] == "ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1":
547 self.assertEqual(100000, int(link["available-capacity"]["total-size"]["value"]),
548 'ODU4 link should have an available capacity of 100 000 Mbps')
549 # elif link["name"][0]["value-name"] == "transitional link name":
550 # self.assertEqual(100, int(link["available-capacity"]["total-size"]["value"]),
551 # 'link should have an available capacity of 100 Gbps')
552 self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps')
def test_35_connect_sprda_2_n2_to_roadma_pp3(self):
    # Call 'init-xpdr-rdm-links' for SPDR-SA1 (xpdr 2, network 2) / ROADM-A1 (SRG1-PP3-TXRX)
    # and check the success text of the result.
    links_input = {
        'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2',
        'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX',
    }
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
    result_text = reply["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text, CREATED_SUCCESSFULLY)
def test_36_connect_roadma_pp3_to_spdra_2_n2(self):
    # Call 'init-rdm-xpdr-links' for ROADM-A1 (SRG1-PP3-TXRX) / SPDR-SA1 (xpdr 2, network 2)
    # and check the success text of the result.
    links_input = {
        'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2',
        'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX',
    }
    response = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    # The shared CREATED_SUCCESSFULLY text names the 'Xponder Roadm Link' message, which is
    # not the string checked here, so the failure message spells out the expected text.
    self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
                  'Result message should contain Roadm Xponder links created successfully')
def test_37_check_tapi_topology_T0(self):
    # Expect 6 nodes and 7 links in the T0 multi-layer topology, then count the
    # node/link names by value-name.
    self.tapi_topo["topology-id"] = test_utils.T0_MULTILAYER_TOPO_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topology = reply["output"]["topology"]
    nodes = topology["node"]
    links = topology["link"]
    self.assertEqual(6, len(nodes), 'Topology should contain 6 nodes')
    self.assertEqual(7, len(links), 'Topology should contain 7 links')
    # (expected count, inspected list, value-name to match, failure message)
    name_checks = (
        (6, nodes, "otsi node name", 'Topology should contain 6 otsi nodes'),
        (5, links, "OTS link name", 'Topology should contain 5 ots links'),
        (2, links, "otn link name", 'Topology should contain 2 otn links'),
        (5, nodes, "dsr/odu node name", 'Topology should contain 5 dsr nodes'),
    )
    for expected, items, value_name, message in name_checks:
        self.assertEqual(
            expected, count_object_with_double_key(items, "name", "value-name", value_name), message)
def test_38_delete_ODU4_service(self):
    # Point the class-level service-delete payload at "service1-ODU4", send it, and
    # check the "Renderer service delete in progress" acknowledgement.
    request = self.del_serv_input_data
    request["service-delete-req-info"]["service-name"] = "service1-ODU4"
    reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', request)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    common = reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Pause for self.WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
def test_39_delete_OCH_OTU4_service(self):
    # Point the class-level service-delete payload at "service1-OCH-OTU4", send it, and
    # check the "Renderer service delete in progress" acknowledgement.
    request = self.del_serv_input_data
    request["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
    reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', request)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    common = reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Pause for self.WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
def test_40_check_tapi_topology_T0(self):
    # Expect 6 nodes, 5 links and no remaining otn link in the T0 multi-layer topology.
    self.tapi_topo["topology-id"] = test_utils.T0_MULTILAYER_TOPO_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topology = reply["output"]["topology"]
    nodes = topology["node"]
    links = topology["link"]
    self.assertEqual(6, len(nodes), 'Topology should contain 6 nodes')
    self.assertEqual(5, len(links), 'Topology should contain 5 links')
    otn_links = count_object_with_double_key(links, "name", "value-name", "otn link name")
    self.assertEqual(0, otn_links, 'Topology should contain 0 otn link')
# Reads the 'openroadm-topology' network from the config datastore and deletes the links whose
# type is XPONDER-OUTPUT or XPONDER-INPUT.
624 def test_41_disconnect_xponders_from_roadm(self):
625 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
626 self.assertEqual(response['status_code'], requests.codes.ok)
627 links = response['network'][0]['ietf-network-topology:link']
# NOTE(review): the statement binding `link` is not visible in this extract
# (presumably a loop over `links`) — confirm against the full file.
629 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
# `response` is rebound here to the reply of the delete request.
630 response = test_utils.del_ietf_network_link_request(
631 'openroadm-topology', link['link-id'], 'config')
632 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_42_check_tapi_topology_T0(self):
    # The T0 multi-layer topology is expected to be back to a single "ROADM-infra"
    # node with no link.
    self.tapi_topo["topology-id"] = test_utils.T0_MULTILAYER_TOPO_UUID
    reply = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topology = reply["output"]["topology"]
    self.assertEqual(1, len(topology["node"]), 'Topology should contain 1 node')
    self.assertNotIn("link", topology, 'Topology should contain no link')
    # The name entry is located through its value-name instead of being read at index 0.
    names = topology["node"][0]["name"]
    position = count_position_name_from_value_name(names, "otsi node name")
    self.assertEqual("ROADM-infra", names[position]["value"],
                     'node name should be: ROADM-infra')
def test_43_get_tapi_topology_T100G(self):
    # Fetch the T100GE topology and check that it holds a single node with no link
    # and no owned edge point.
    # The value is sent as "topology-id", so the UUID constant is used here exactly as
    # in every other T100GE query of this file.
    self.tapi_topo["topology-id"] = test_utils.T100GE_UUID
    response = test_utils.transportpce_api_rpc_request(
        'tapi-topology', 'get-topology-details', self.tapi_topo)
    self.assertEqual(response['status_code'], requests.codes.ok)
    topology = response["output"]["topology"]
    self.assertEqual(len(topology["node"]), 1, 'Topology should contain 1 node')
    self.assertNotIn("link", topology, 'Topology should contain no link')
    self.assertNotIn("owned-node-edge-point", topology["node"][0],
                     'Node should contain no owned-node-edge-points')
def test_44_disconnect_roadma(self):
    # Unmount ROADM-A1; both "ok" and "no content" are accepted as a reply.
    response = test_utils.unmount_device("ROADM-A1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(response.status_code, accepted)
def test_45_disconnect_roadmc(self):
    # Unmount ROADM-C1; both "ok" and "no content" are accepted as a reply.
    response = test_utils.unmount_device("ROADM-C1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(response.status_code, accepted)
def test_46_check_tapi_topos(self):
    # Re-run the checks of test_01 (T100GE topology) and test_02 (T0 topology), in that order.
    self.test_01_get_tapi_topology_T100G()
    self.test_02_get_tapi_topology_T0()
def test_47_disconnect_xpdra(self):
    # Unmount XPDR-A1; both "ok" and "no content" are accepted as a reply.
    response = test_utils.unmount_device("XPDR-A1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(response.status_code, accepted)
def test_48_disconnect_xpdrc(self):
    # Unmount XPDR-C1; both "ok" and "no content" are accepted as a reply.
    response = test_utils.unmount_device("XPDR-C1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(response.status_code, accepted)
def test_49_disconnect_spdr_sa1(self):
    # Unmount SPDR-SA1; both "ok" and "no content" are accepted as a reply.
    response = test_utils.unmount_device("SPDR-SA1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(response.status_code, accepted)
def test_50_disconnect_spdr_sc1(self):
    # Unmount SPDR-SC1; both "ok" and "no content" are accepted as a reply.
    response = test_utils.unmount_device("SPDR-SC1")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(response.status_code, accepted)
687 # def count_object_with_double_key(list_dicts, key1, key2, value):
689 # for dictio in list_dicts:
691 # if dictio[key1][0][key2] == value:
# Helper called by the topology tests with a list of nodes or links, key1="name",
# key2="value-name" and the value-name to look for; its result is compared to an expected count.
# NOTE(review): only the signature, the two loops and the comparison are visible in this extract;
# the counter initialisation, its increment and the return are presumably on the missing lines — confirm.
695 def count_object_with_double_key(list_dicts, key1, key2, value):
# Visit every dict of the list ...
697 for dictio in list_dicts:
# ... and every entry stored under key1 (unlike the commented-out variant above, not only index 0).
698 for name in dictio[key1]:
699 if name[key2] == value:
# Helper whose result is used by the tests as an index into the `name` list passed as first argument.
# NOTE(review): the loop header, the index bookkeeping and the return are not visible in this extract;
# only the comparison of an entry's "value-name" with `valuename` is — confirm against the full file.
704 def count_position_name_from_value_name(name, valuename):
708 if names["value-name"] == valuename:
# When the file is executed as a script, run its test cases with verbosity level 2.
714 if __name__ == "__main__":
715 unittest.main(verbosity=2)