2 ##############################################################################
3 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
13 # some pylint false positives specific to tapi test
14 # pylint: disable=unsubscriptable-object
15 # pylint: disable=unsupported-assignment-operation
18 # pylint: disable=wrong-import-order
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils # nopep8
29 CREATED_SUCCESSFULLY = 'Result message should contain Xponder Roadm Link created successfully'
32 class TransportTapitesting(unittest.TestCase):
36 NODE_VERSION = '2.2.1'
37 cr_serv_input_data = {
38 "sdnc-request-header": {
39 "request-id": "request-1",
40 "rpc-action": "service-create",
41 "request-system-id": "appname"
43 "service-name": "service1-OCH-OTU4",
44 "common-id": "commonId",
45 "connection-type": "infrastructure",
47 "service-rate": "100",
48 "node-id": "SPDR-SA1",
49 "service-format": "OTU",
50 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
54 "port-device-name": "SPDR-SA1-XPDR1",
56 "port-name": "XPDR1-NETWORK1",
57 "port-rack": "000000.00",
58 "port-shelf": "Chassis#1"
61 "lgx-device-name": "Some lgx-device-name",
62 "lgx-port-name": "Some lgx-port-name",
63 "lgx-port-rack": "000000.00",
64 "lgx-port-shelf": "00"
70 "port-device-name": "SPDR-SA1-XPDR1",
72 "port-name": "XPDR1-NETWORK1",
73 "port-rack": "000000.00",
74 "port-shelf": "Chassis#1"
77 "lgx-device-name": "Some lgx-device-name",
78 "lgx-port-name": "Some lgx-port-name",
79 "lgx-port-rack": "000000.00",
80 "lgx-port-shelf": "00"
87 "service-rate": "100",
88 "node-id": "SPDR-SC1",
89 "service-format": "OTU",
90 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
94 "port-device-name": "SPDR-SC1-XPDR1",
96 "port-name": "XPDR1-NETWORK1",
97 "port-rack": "000000.00",
98 "port-shelf": "Chassis#1"
101 "lgx-device-name": "Some lgx-device-name",
102 "lgx-port-name": "Some lgx-port-name",
103 "lgx-port-rack": "000000.00",
104 "lgx-port-shelf": "00"
110 "port-device-name": "SPDR-SC1-XPDR1",
111 "port-type": "fixed",
112 "port-name": "XPDR1-NETWORK1",
113 "port-rack": "000000.00",
114 "port-shelf": "Chassis#1"
117 "lgx-device-name": "Some lgx-device-name",
118 "lgx-port-name": "Some lgx-port-name",
119 "lgx-port-rack": "000000.00",
120 "lgx-port-shelf": "00"
126 "due-date": "2018-06-15T00:00:01Z",
127 "operator-contact": "pw1234"
130 del_serv_input_data = {
131 "sdnc-request-header": {
132 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
133 "rpc-action": "service-delete",
134 "request-system-id": "appname",
135 "notification-url": "http://localhost:8585/NotificationServer/notify"},
136 "service-delete-req-info": {
137 "service-name": "TBD",
138 "tail-retention": "no"}
141 tapi_topo = {"topology-id-or-name": "TBD"}
145 cls.init_failed = False
146 os.environ['JAVA_MIN_MEM'] = '1024M'
147 os.environ['JAVA_MAX_MEM'] = '4096M'
148 cls.processes = test_utils.start_tpce()
149 # TAPI feature is not installed by default in Karaf
150 if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
151 print("installing tapi feature...")
152 result = test_utils.install_karaf_feature("odl-transportpce-tapi")
153 if result.returncode != 0:
154 cls.init_failed = True
156 print("tapi installation feature failed...")
157 test_utils.shutdown_process(cls.processes[0])
159 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
160 ('roadma', cls.NODE_VERSION),
161 ('roadmb', cls.NODE_VERSION),
162 ('roadmc', cls.NODE_VERSION),
163 ('xpdrc', cls.NODE_VERSION),
164 ('spdra', cls.NODE_VERSION),
165 ('spdrc', cls.NODE_VERSION)])
168 def tearDownClass(cls):
169 # pylint: disable=not-an-iterable
170 for process in cls.processes:
171 test_utils.shutdown_process(process)
172 print("all processes killed")
174 def setUp(self): # instruction executed before each test method
176 self.fail('Feature installation failed')
177 # pylint: disable=consider-using-f-string
178 print("execution of {}".format(self.id().split(".")[-1]))
180 def test_01_get_tapi_topology_T100G(self):
181 self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
182 response = test_utils.transportpce_api_rpc_request(
183 'tapi-topology', 'get-topology-details', self.tapi_topo)
184 self.assertEqual(response['status_code'], requests.codes.ok)
185 self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
186 self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
187 self.assertNotIn("owned-node-edge-point", response["output"]["topology"]["node"][0],
188 'Node should contain no owned-node-edge-points')
189 self.assertEqual("Tpdr100g over WDM node", response["output"]["topology"]["node"][0]["name"][0]["value"],
190 'node name should be: Tpdr100g over WDM node')
191 self.assertIn("ETH", response["output"]["topology"]["node"][0]["layer-protocol-name"],
192 'Node layer protocol should contain ETH')
193 self.assertEqual(1, len(response["output"]["topology"]["node"][0]["node-rule-group"]),
194 'node should contain 1 node rule group')
196 def test_02_get_tapi_topology_T0(self):
197 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
198 response = test_utils.transportpce_api_rpc_request(
199 'tapi-topology', 'get-topology-details', self.tapi_topo)
200 self.assertEqual(response['status_code'], requests.codes.ok)
201 self.assertNotIn("node", response["output"]["topology"], 'Topology should contain no node')
202 self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
204 def test_03_connect_rdmb(self):
205 response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
206 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
208 def test_04_check_tapi_topos(self):
209 self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
210 response = test_utils.transportpce_api_rpc_request(
211 'tapi-topology', 'get-topology-details', self.tapi_topo)
212 self.assertEqual(response['status_code'], requests.codes.ok)
213 self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
214 self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
216 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
217 response = test_utils.transportpce_api_rpc_request(
218 'tapi-topology', 'get-topology-details', self.tapi_topo)
219 self.assertEqual(response['status_code'], requests.codes.ok)
220 self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
221 self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
223 def test_05_disconnect_roadmb(self):
224 response = test_utils.unmount_device("ROADM-B1")
225 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
227 def test_06_connect_xpdra(self):
228 response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
229 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
231 def test_07_check_tapi_topos(self):
232 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
233 response = test_utils.transportpce_api_rpc_request(
234 'tapi-topology', 'get-topology-details', self.tapi_topo)
235 self.assertEqual(response['status_code'], requests.codes.ok)
236 self.assertNotIn("node", response["output"]["topology"], 'Topology should contain no node')
237 self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
239 def test_08_connect_rdma(self):
240 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
241 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
243 def test_09_connect_rdmc(self):
244 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
245 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
247 def test_10_check_tapi_topos(self):
248 self.test_01_get_tapi_topology_T100G()
250 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
251 response = test_utils.transportpce_api_rpc_request(
252 'tapi-topology', 'get-topology-details', self.tapi_topo)
253 self.assertEqual(response['status_code'], requests.codes.ok)
254 self.assertEqual(1, len(response["output"]["topology"]["node"]), 'Topology should contain 1 node')
255 self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
256 self.assertEqual("ROADM-infra", response["output"]["topology"]["node"][0]["name"][0]["value"],
257 'node name should be: ROADM-infra')
258 self.assertIn("PHOTONIC_MEDIA", response["output"]["topology"]["node"][0]["layer-protocol-name"],
259 'Node layer protocol should contain PHOTONIC_MEDIA')
260 self.assertEqual(1, len(response["output"]["topology"]["node"][0]["node-rule-group"]),
261 'node should contain 1 node rule group')
263 def test_11_connect_xprda_n1_to_roadma_pp1(self):
264 response = test_utils.transportpce_api_rpc_request(
265 'transportpce-networkutils', 'init-xpdr-rdm-links',
266 {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
267 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
268 self.assertEqual(response['status_code'], requests.codes.ok)
269 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"],
270 CREATED_SUCCESSFULLY)
273 def test_12_connect_roadma_pp1_to_xpdra_n1(self):
274 response = test_utils.transportpce_api_rpc_request(
275 'transportpce-networkutils', 'init-rdm-xpdr-links',
276 {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
277 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
278 self.assertEqual(response['status_code'], requests.codes.ok)
279 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
280 CREATED_SUCCESSFULLY)
283 def test_13_check_tapi_topology_T100G(self):
284 self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
285 response = test_utils.transportpce_api_rpc_request(
286 'tapi-topology', 'get-topology-details', self.tapi_topo)
287 self.assertEqual(response['status_code'], requests.codes.ok)
288 self.assertEqual(1, len(response["output"]["topology"]["node"][0]["owned-node-edge-point"]),
289 'Node should contain 1 owned-node-edge-points')
290 self.assertEqual("XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1",
291 response["output"]["topology"]["node"][0]["owned-node-edge-point"][0]["name"][0]["value"],
292 'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1')
294 def test_14_check_tapi_topology_T0(self):
295 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
296 response = test_utils.transportpce_api_rpc_request(
297 'tapi-topology', 'get-topology-details', self.tapi_topo)
298 self.assertEqual(response['status_code'], requests.codes.ok)
299 nodes = response["output"]["topology"]["node"]
300 links = response["output"]["topology"]["link"]
301 self.assertEqual(3, len(nodes), 'Topology should contain 3 nodes')
302 self.assertEqual(2, len(links), 'Topology should contain 2 links')
303 self.assertEqual(2, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"),
304 'Topology should contain 2 otsi nodes')
305 self.assertEqual(1, count_object_with_double_key(nodes, "name", "value-name", "dsr/odu node name"),
306 'Topology should contain 1 dsr node')
307 self.assertEqual(1, count_object_with_double_key(links, "name", "value-name", "transitional link name"),
308 'Topology should contain 1 transitional link')
309 self.assertEqual(1, count_object_with_double_key(links, "name", "value-name", "OMS link name"),
310 'Topology should contain 1 oms link')
312 def test_15_connect_xpdrc(self):
313 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
314 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
316 def test_16_connect_xprdc_n1_to_roadmc_pp1(self):
317 response = test_utils.transportpce_api_rpc_request(
318 'transportpce-networkutils', 'init-xpdr-rdm-links',
319 {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
320 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
321 self.assertEqual(response['status_code'], requests.codes.ok)
322 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"],
323 CREATED_SUCCESSFULLY)
326 def test_17_connect_roadmc_pp1_to_xpdrc_n1(self):
327 response = test_utils.transportpce_api_rpc_request(
328 'transportpce-networkutils', 'init-rdm-xpdr-links',
329 {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
330 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
331 self.assertEqual(response['status_code'], requests.codes.ok)
332 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
333 CREATED_SUCCESSFULLY)
336 def test_18_check_tapi_topology_T100G(self):
337 self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
338 response = test_utils.transportpce_api_rpc_request(
339 'tapi-topology', 'get-topology-details', self.tapi_topo)
340 self.assertEqual(response['status_code'], requests.codes.ok)
341 self.assertEqual(2, len(response["output"]["topology"]["node"][0]["owned-node-edge-point"]),
342 'Node should contain 2 owned-node-edge-points')
343 self.assertEqual("XPDR-C1-XPDR1+DSR+XPDR1-CLIENT1",
344 response["output"]["topology"]["node"][0]["owned-node-edge-point"][0]["name"][0]["value"],
345 'name of owned-node-edge-points should be XPDR-C1-XPDR1+DSR+XPDR1-CLIENT1')
346 self.assertEqual("XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1",
347 response["output"]["topology"]["node"][0]["owned-node-edge-point"][1]["name"][0]["value"],
348 'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1')
350 def test_19_check_tapi_topology_T0(self):
351 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
352 response = test_utils.transportpce_api_rpc_request(
353 'tapi-topology', 'get-topology-details', self.tapi_topo)
354 self.assertEqual(response['status_code'], requests.codes.ok)
355 nodes = response["output"]["topology"]["node"]
356 links = response["output"]["topology"]["link"]
357 self.assertEqual(5, len(nodes), 'Topology should contain 5 nodes')
358 self.assertEqual(4, len(links), 'Topology should contain 4 links')
359 self.assertEqual(3, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"),
360 'Topology should contain 3 otsi nodes')
361 self.assertEqual(2, count_object_with_double_key(nodes, "name", "value-name", "dsr/odu node name"),
362 'Topology should contain 2 dsr nodes')
363 self.assertEqual(2, count_object_with_double_key(links, "name", "value-name", "transitional link name"),
364 'Topology should contain 2 transitional links')
365 self.assertEqual(2, count_object_with_double_key(links, "name", "value-name", "OMS link name"),
366 'Topology should contain 2 oms links')
368 def test_20_connect_spdr_sa1(self):
369 response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
370 self.assertEqual(response.status_code,
371 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
373 def test_21_connect_spdr_sc1(self):
374 response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
375 self.assertEqual(response.status_code,
376 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
378 def test_22_check_tapi_topology_T100G(self):
379 self.test_18_check_tapi_topology_T100G()
381 def test_23_check_tapi_topology_T0(self):
382 self.test_19_check_tapi_topology_T0()
384 def test_24_connect_sprda_n1_to_roadma_pp2(self):
385 response = test_utils.transportpce_api_rpc_request(
386 'transportpce-networkutils', 'init-xpdr-rdm-links',
387 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
388 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
389 self.assertEqual(response['status_code'], requests.codes.ok)
390 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"],
391 CREATED_SUCCESSFULLY)
394 def test_25_connect_roadma_pp2_to_spdra_n1(self):
395 response = test_utils.transportpce_api_rpc_request(
396 'transportpce-networkutils', 'init-rdm-xpdr-links',
397 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
398 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
399 self.assertEqual(response['status_code'], requests.codes.ok)
400 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
401 CREATED_SUCCESSFULLY)
404 def test_26_connect_sprdc_n1_to_roadmc_pp2(self):
405 response = test_utils.transportpce_api_rpc_request(
406 'transportpce-networkutils', 'init-xpdr-rdm-links',
407 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
408 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
409 self.assertEqual(response['status_code'], requests.codes.ok)
410 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"],
411 CREATED_SUCCESSFULLY)
414 def test_27_connect_roadmc_pp2_to_spdrc_n1(self):
415 response = test_utils.transportpce_api_rpc_request(
416 'transportpce-networkutils', 'init-rdm-xpdr-links',
417 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
418 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
419 self.assertEqual(response['status_code'], requests.codes.ok)
420 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
421 CREATED_SUCCESSFULLY)
424 def test_28_check_tapi_topology_T100G(self):
425 self.test_18_check_tapi_topology_T100G()
427 def test_29_check_tapi_topology_T0(self):
428 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
429 response = test_utils.transportpce_api_rpc_request(
430 'tapi-topology', 'get-topology-details', self.tapi_topo)
431 self.assertEqual(response['status_code'], requests.codes.ok)
432 nodes = response["output"]["topology"]["node"]
433 links = response["output"]["topology"]["link"]
434 self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes')
435 self.assertEqual(8, len(links), 'Topology should contain 8 links')
436 self.assertEqual(5, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"),
437 'Topology should contain 5 otsi nodes')
438 self.assertEqual(4, count_object_with_double_key(nodes, "name", "value-name", "dsr/odu node name"),
439 'Topology should contain 4 dsr nodes')
440 self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "transitional link name"),
441 'Topology should contain 4 transitional links')
442 self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "OMS link name"),
443 'Topology should contain 4 oms links')
445 def test_30_add_oms_attributes(self):
446 # Config ROADMA-ROADMC oms-attributes
448 "auto-spanloss": "true",
449 "spanloss-base": 11.4,
450 "spanloss-current": 12,
451 "engineered-spanloss": 12.2,
452 "link-concatenation": [{
455 "SRLG-length": 100000,
457 response = test_utils.add_oms_attr_request(
458 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
459 self.assertEqual(response.status_code, requests.codes.created)
460 # Config ROADMC-ROADMA oms-attributes
462 "auto-spanloss": "true",
463 "spanloss-base": 11.4,
464 "spanloss-current": 12,
465 "engineered-spanloss": 12.2,
466 "link-concatenation": [{
469 "SRLG-length": 100000,
471 response = test_utils.add_oms_attr_request(
472 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
473 self.assertEqual(response.status_code, requests.codes.created)
475 def test_31_create_OCH_OTU4_service(self):
476 response = test_utils.transportpce_api_rpc_request(
477 'org-openroadm-service', 'service-create',
478 self.cr_serv_input_data)
479 self.assertEqual(response['status_code'], requests.codes.ok)
480 self.assertIn('PCE calculation in progress',
481 response['output']['configuration-response-common']['response-message'])
482 time.sleep(self.WAITING)
484 def test_32_check_tapi_topology_T0(self):
485 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
486 response = test_utils.transportpce_api_rpc_request(
487 'tapi-topology', 'get-topology-details', self.tapi_topo)
488 self.assertEqual(response['status_code'], requests.codes.ok)
489 nodes = response["output"]["topology"]["node"]
490 links = response["output"]["topology"]["link"]
491 self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes')
492 self.assertEqual(9, len(links), 'Topology should contain 9 links')
493 self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "transitional link name"),
494 'Topology should contain 4 transitional links')
495 self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "OMS link name"),
496 'Topology should contain 4 oms links')
497 self.assertEqual(1, count_object_with_double_key(links, "name", "value-name", "otn link name"),
498 'Topology should contain 1 otn link')
500 if link["name"][0]["value"] == "OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1":
501 self.assertEqual(100000, int(link["available-capacity"]["total-size"]["value"]),
502 'OTU4 link should have an available capacity of 100 000 Mbps')
503 elif link["name"][0]["value-name"] == "transitional link name":
504 self.assertEqual(100, int(link["available-capacity"]["total-size"]["value"]),
505 'link should have an available capacity of 100 Gbps')
506 self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps')
508 def test_33_create_ODU4_service(self):
509 self.cr_serv_input_data["service-name"] = "service1-ODU4"
510 self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
511 del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
512 self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
513 self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
514 del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
515 self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
517 response = test_utils.transportpce_api_rpc_request(
518 'org-openroadm-service', 'service-create',
519 self.cr_serv_input_data)
520 self.assertEqual(response['status_code'], requests.codes.ok)
521 self.assertIn('PCE calculation in progress',
522 response['output']['configuration-response-common']['response-message'])
523 time.sleep(self.WAITING)
525 def test_34_check_tapi_topology_T0(self):
526 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
527 response = test_utils.transportpce_api_rpc_request(
528 'tapi-topology', 'get-topology-details', self.tapi_topo)
529 self.assertEqual(response['status_code'], requests.codes.ok)
530 nodes = response["output"]["topology"]["node"]
531 links = response["output"]["topology"]["link"]
532 self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes')
533 self.assertEqual(10, len(links), 'Topology should contain 10 links')
534 self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "transitional link name"),
535 'Topology should contain 4 transitional links')
536 self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "OMS link name"),
537 'Topology should contain 4 oms links')
538 self.assertEqual(2, count_object_with_double_key(links, "name", "value-name", "otn link name"),
539 'Topology should contain 2 otn links')
541 if link["name"][0]["value"] == "OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1":
542 self.assertEqual(0, link["available-capacity"]["total-size"]["value"],
543 'OTU4 link should have an available capacity of 0 Mbps')
544 elif link["name"][0]["value"] == "ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1":
545 self.assertEqual(100000, int(link["available-capacity"]["total-size"]["value"]),
546 'ODU4 link should have an available capacity of 100 000 Mbps')
547 elif link["name"][0]["value-name"] == "transitional link name":
548 self.assertEqual(100, int(link["available-capacity"]["total-size"]["value"]),
549 'link should have an available capacity of 100 Gbps')
550 self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps')
552 def test_35_connect_sprda_2_n2_to_roadma_pp3(self):
553 response = test_utils.transportpce_api_rpc_request(
554 'transportpce-networkutils', 'init-xpdr-rdm-links',
555 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2',
556 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
557 self.assertEqual(response['status_code'], requests.codes.ok)
558 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"],
559 CREATED_SUCCESSFULLY)
562 def test_36_connect_roadma_pp3_to_spdra_2_n2(self):
563 response = test_utils.transportpce_api_rpc_request(
564 'transportpce-networkutils', 'init-rdm-xpdr-links',
565 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2',
566 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
567 self.assertEqual(response['status_code'], requests.codes.ok)
568 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
569 CREATED_SUCCESSFULLY)
572 def test_37_check_tapi_topology_T0(self):
573 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
574 response = test_utils.transportpce_api_rpc_request(
575 'tapi-topology', 'get-topology-details', self.tapi_topo)
576 self.assertEqual(response['status_code'], requests.codes.ok)
577 nodes = response["output"]["topology"]["node"]
578 links = response["output"]["topology"]["link"]
579 self.assertEqual(11, len(nodes), 'Topology should contain 11 nodes')
580 self.assertEqual(12, len(links), 'Topology should contain 12 links')
581 self.assertEqual(6, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"),
582 'Topology should contain 6 otsi nodes')
583 self.assertEqual(5, count_object_with_double_key(nodes, "name", "value-name", "dsr/odu node name"),
584 'Topology should contain 5 dsr nodes')
585 self.assertEqual(5, count_object_with_double_key(links, "name", "value-name", "transitional link name"),
586 'Topology should contain 5 transitional links')
587 self.assertEqual(5, count_object_with_double_key(links, "name", "value-name", "OMS link name"),
588 'Topology should contain 5 oms links')
589 self.assertEqual(2, count_object_with_double_key(links, "name", "value-name", "otn link name"),
590 'Topology should contain 2 otn links')
592 def test_38_delete_ODU4_service(self):
593 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4"
594 response = test_utils.transportpce_api_rpc_request(
595 'org-openroadm-service', 'service-delete',
596 self.del_serv_input_data)
597 self.assertEqual(response['status_code'], requests.codes.ok)
598 self.assertIn('Renderer service delete in progress',
599 response['output']['configuration-response-common']['response-message'])
600 time.sleep(self.WAITING)
602 def test_39_delete_OCH_OTU4_service(self):
603 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
604 response = test_utils.transportpce_api_rpc_request(
605 'org-openroadm-service', 'service-delete',
606 self.del_serv_input_data)
607 self.assertEqual(response['status_code'], requests.codes.ok)
608 self.assertIn('Renderer service delete in progress',
609 response['output']['configuration-response-common']['response-message'])
610 time.sleep(self.WAITING)
612 def test_40_check_tapi_topology_T0(self):
613 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
614 response = test_utils.transportpce_api_rpc_request(
615 'tapi-topology', 'get-topology-details', self.tapi_topo)
616 self.assertEqual(response['status_code'], requests.codes.ok)
617 nodes = response["output"]["topology"]["node"]
618 links = response["output"]["topology"]["link"]
619 self.assertEqual(11, len(nodes), 'Topology should contain 11 nodes')
620 self.assertEqual(10, len(links), 'Topology should contain 10 links')
621 self.assertEqual(0, count_object_with_double_key(links, "name", "value-name", "otn link name"),
622 'Topology should contain 0 otn link')
624 def test_41_disconnect_xponders_from_roadm(self):
625 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
626 self.assertEqual(response['status_code'], requests.codes.ok)
627 links = response['network'][0]['ietf-network-topology:link']
629 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
630 response = test_utils.del_ietf_network_link_request(
631 'openroadm-topology', link['link-id'], 'config')
632 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
634 def test_42_check_tapi_topology_T0(self):
635 self.tapi_topo["topology-id-or-name"] = test_utils.T0_MULTILAYER_TOPO
636 response = test_utils.transportpce_api_rpc_request(
637 'tapi-topology', 'get-topology-details', self.tapi_topo)
638 self.assertEqual(response['status_code'], requests.codes.ok)
639 self.assertEqual(1, len(response["output"]["topology"]["node"]), 'Topology should contain 1 node')
640 self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
641 self.assertEqual("ROADM-infra", response["output"]["topology"]["node"][0]["name"][0]["value"],
642 'node name should be: ROADM-infra')
644 def test_43_get_tapi_topology_T100G(self):
645 self.tapi_topo["topology-id-or-name"] = test_utils.T100GE
646 response = test_utils.transportpce_api_rpc_request(
647 'tapi-topology', 'get-topology-details', self.tapi_topo)
648 self.assertEqual(response['status_code'], requests.codes.ok)
649 self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
650 self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
651 self.assertNotIn("owned-node-edge-point", response["output"]["topology"]["node"][0],
652 'Node should contain no owned-node-edge-points')
654 def test_44_disconnect_roadma(self):
655 response = test_utils.unmount_device("ROADM-A1")
656 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
658 def test_45_disconnect_roadmc(self):
659 response = test_utils.unmount_device("ROADM-C1")
660 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
662 def test_46_check_tapi_topos(self):
663 self.test_01_get_tapi_topology_T100G()
664 self.test_02_get_tapi_topology_T0()
666 def test_47_disconnect_xpdra(self):
667 response = test_utils.unmount_device("XPDR-A1")
668 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
670 def test_48_disconnect_xpdrc(self):
671 response = test_utils.unmount_device("XPDR-C1")
672 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
674 def test_49_disconnect_spdr_sa1(self):
675 response = test_utils.unmount_device("SPDR-SA1")
676 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
678 def test_50_disconnect_spdr_sc1(self):
679 response = test_utils.unmount_device("SPDR-SC1")
680 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
683 def count_object_with_double_key(list_dicts, key1, key2, value):
685 for dictio in list_dicts:
686 if dictio[key1][0][key2] == value:
691 if __name__ == "__main__":
692 unittest.main(verbosity=2)