2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 # pylint: disable=wrong-import-order
19 sys.path.append('transportpce_tests/common/')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils # nopep8
24 # pylint: disable=too-few-public-methods
29 # pylint: disable=invalid-name
# Functional test case. The class-level attributes below hold request payloads and
# constants shared by the test methods.
# NOTE(review): several lines of the literals below are not visible in this extract.
36 class TransportPCEFulltesting(unittest.TestCase):
# Payload passed to the 'create-connectivity-service' RPC in test_11.
38 cr_serv_input_data = {
41 "layer-protocol-name": "DSR",
42 "service-interface-point": {
43 "service-interface-point-uuid": "b1f4bd3b-7fa9-367b-a8ab-6e80293238df"
45 "administrative-state": "UNLOCKED",
46 "operational-state": "ENABLED",
47 "direction": "BIDIRECTIONAL",
49 "protection-role": "WORK",
50 "local-id": "XPDR-C1-XPDR1",
53 "value-name": "OpenROADM node id",
54 "value": "XPDR-C1-XPDR1"
59 "layer-protocol-name": "DSR",
60 "service-interface-point": {
61 "service-interface-point-uuid": "b5964ce9-274c-3f68-b4d1-83c0b61bc74e"
63 "administrative-state": "UNLOCKED",
64 "operational-state": "ENABLED",
65 "direction": "BIDIRECTIONAL",
67 "protection-role": "WORK",
68 "local-id": "XPDR-A1-XPDR1",
71 "value-name": "OpenROADM node id",
72 "value": "XPDR-A1-XPDR1"
77 "connectivity-constraint": {
78 "service-layer": "ETH",
79 "service-type": "POINT_TO_POINT_CONNECTIVITY",
80 "service-level": "Some service-level",
81 "requested-capacity": {
91 del_serv_input_data = {"service-id-or-name": "TBD"}
# The "TBD" placeholders of tapi_topo, node_details and tapi_serv_details are
# overwritten by the tests right before each request is sent.
93 tapi_topo = {"topology-id-or-name": "TBD"}
# NOTE(review): the two entries below are read by the tests as self.node_details;
# the assignment line itself is not visible in this extract.
96 "topology-id-or-name": "TBD",
97 "node-id-or-name": "TBD"
100 tapi_serv_details = {"service-id-or-name": "TBD"}
# Holder object; test_11 stores the uuid of the created service in its 'eth' attribute.
103 uuid_services = UuidServices()
# Number of seconds passed to time.sleep() after the service creation request (test_11).
104 WAITING = 25 # nominal value is 300
# Version string paired with each simulator name in the start_sims/mount_device calls.
105 NODE_VERSION_221 = '2.2.1'
# Class-level setup body: starts the controller through test_utils.start_tpce(),
# optionally installs the Karaf TAPI feature, then starts the four simulators.
# NOTE(review): the method header and a few short lines of this body are not
# visible in this extract.
109 # pylint: disable=unsubscriptable-object
110 cls.init_failed = False
111 os.environ['JAVA_MIN_MEM'] = '1024M'
112 os.environ['JAVA_MAX_MEM'] = '4096M'
113 cls.processes = test_utils.start_tpce()
114 # TAPI feature is not installed by default in Karaf
# The condition below is false only when NO_ODL_STARTUP is set AND USE_LIGHTY == 'True';
# in every other case the feature installation is attempted.
115 if "NO_ODL_STARTUP" not in os.environ or "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
116 print("installing tapi feature...")
117 result = test_utils.install_karaf_feature("odl-transportpce-tapi")
118 if result.returncode != 0:
119 cls.init_failed = True
120 print("Restarting OpenDaylight...")
121 test_utils.shutdown_process(cls.processes[0])
122 cls.processes[0] = test_utils.start_karaf()
# Keep the process list held by test_utils in sync with the restarted process.
123 test_utils.process_list[0] = cls.processes[0]
# init_failed is (re)computed from whether the Karaf start message shows up in the log
# within the 60 passed as time_to_wait.
124 cls.init_failed = not test_utils.wait_until_log_contains(
125 test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
127 print("tapi installation feature failed...")
128 test_utils.shutdown_process(cls.processes[0])
# NOTE(review): cls.processes is re-assigned here, replacing the value returned by
# start_tpce(); presumably start_sims() returns the full process list - confirm.
130 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_221),
131 ('roadma', cls.NODE_VERSION_221),
132 ('roadmc', cls.NODE_VERSION_221),
133 ('xpdrc', cls.NODE_VERSION_221)])
def tearDownClass(cls):
    """Shut down every process stored in ``cls.processes`` and report it."""
    # pylint: disable=not-an-iterable
    for running_process in cls.processes:
        test_utils.shutdown_process(running_process)
    print("all processes killed")
def setUp(self):
    """Print the name of the test method about to run (executed before each test)."""
    # pylint: disable=consider-using-f-string
    # self.id() is a dotted path; only its last component is printed.
    method_name = self.id().rpartition(".")[2]
    print("execution of {}".format(method_name))
def test_01_connect_xpdrA(self):
    """Call mount_device for XPDR-A1 (simulator 'xpdra') and expect status 'created'."""
    simulator = ('xpdra', self.NODE_VERSION_221)
    response = test_utils.mount_device("XPDR-A1", simulator)
    self.assertEqual(response.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    """Call mount_device for XPDR-C1 (simulator 'xpdrc') and expect status 'created'."""
    simulator = ('xpdrc', self.NODE_VERSION_221)
    response = test_utils.mount_device("XPDR-C1", simulator)
    self.assertEqual(response.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    """Call mount_device for ROADM-A1 (simulator 'roadma') and expect status 'created'."""
    simulator = ('roadma', self.NODE_VERSION_221)
    response = test_utils.mount_device("ROADM-A1", simulator)
    self.assertEqual(response.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    """Call mount_device for ROADM-C1 (simulator 'roadmc') and expect status 'created'."""
    simulator = ('roadmc', self.NODE_VERSION_221)
    response = test_utils.mount_device("ROADM-C1", simulator)
    self.assertEqual(response.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xpdrA_N1_to_roadmA_PP1(self):
    """Invoke 'init-xpdr-rdm-links' for XPDR-A1 / ROADM-A1 and check the result text."""
    links_input = {
        'xpdr-node': 'XPDR-A1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    response = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_text = response["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """Invoke 'init-rdm-xpdr-links' for ROADM-A1 / XPDR-A1 and check the result text."""
    links_input = {
        'xpdr-node': 'XPDR-A1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    response = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_text = response["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_07_connect_xpdrC_N1_to_roadmC_PP1(self):
    """Invoke 'init-xpdr-rdm-links' for XPDR-C1 / ROADM-C1 and check the result text."""
    links_input = {
        'xpdr-node': 'XPDR-C1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    response = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_text = response["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    """Invoke 'init-rdm-xpdr-links' for ROADM-C1 / XPDR-C1 and check the result text."""
    links_input = {
        'xpdr-node': 'XPDR-C1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    response = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_text = response["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
# Send OMS attributes for the ROADM-A1 -> ROADM-C1 link and expect status 'created'.
# NOTE(review): the opening of the 'data' literal and some of its entries are not
# visible in this extract.
199 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
200 # Config ROADMA-ROADMC oms-attributes
202 "auto-spanloss": "true",
203 "spanloss-base": 11.4,
204 "spanloss-current": 12,
205 "engineered-spanloss": 12.2,
206 "link-concatenation": [{
209 "SRLG-length": 100000,
# The first argument is the id of the link the attributes are attached to.
211 response = test_utils.add_oms_attr_request(
212 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
213 self.assertEqual(response.status_code, requests.codes.created)
# Send OMS attributes for the ROADM-C1 -> ROADM-A1 link and expect status 'created'.
# NOTE(review): the opening of the 'data' literal and some of its entries are not
# visible in this extract.
215 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
216 # Config ROADMC-ROADMA oms-attributes
218 "auto-spanloss": "true",
219 "spanloss-base": 11.4,
220 "spanloss-current": 12,
221 "engineered-spanloss": 12.2,
222 "link-concatenation": [{
225 "SRLG-length": 100000,
# The first argument is the id of the link the attributes are attached to.
227 response = test_utils.add_oms_attr_request(
228 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
229 self.assertEqual(response.status_code, requests.codes.created)
231 # test service-create for Eth service from xpdr to xpdr
# Create the Ethernet connectivity service through the TAPI RPC, remember its uuid in
# self.uuid_services.eth and check the returned service structure.
# NOTE(review): a few short lines of this method are not visible in this extract.
232 def test_11_create_connectivity_service_Ethernet(self):
233 response = test_utils.transportpce_api_rpc_request(
234 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
# Wait self.WAITING seconds before inspecting the response.
235 time.sleep(self.WAITING)
# The uuid is reused by the later service checks (test_12, test_13, test_19, test_20).
236 self.uuid_services.eth = response['output']['service']['uuid']
237 # pylint: disable=consider-using-f-string
239 input_dict_1 = {'administrative-state': 'LOCKED',
240 'lifecycle-state': 'PLANNED',
241 'operational-state': 'DISABLED',
242 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
243 'service-layer': 'ETH',
244 'connectivity-direction': 'BIDIRECTIONAL'
246 input_dict_2 = {'value-name': 'OpenROADM node id',
247 'value': 'XPDR-C1-XPDR1'}
248 input_dict_3 = {'value-name': 'OpenROADM node id',
249 'value': 'XPDR-A1-XPDR1'}
# NOTE(review): dict(expected, **actual) lets the values of 'actual' override those of
# 'expected', so each comparison below only verifies that the expected keys are present
# in the response, not their values - confirm whether a value check was intended.
251 self.assertDictEqual(dict(input_dict_1, **response['output']['service']),
252 response['output']['service'])
253 self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]),
254 response['output']['service']['end-point'][0]['name'][0])
255 self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]),
256 response['output']['service']['end-point'][1]['name'][0])
257 # If the gate fails is because of the waiting time not being enough
258 # time.sleep(self.WAITING)
def test_12_get_service_Ethernet(self):
    """Fetch the service named after the stored uuid and check its OpenROADM attributes."""
    service_name = str(self.uuid_services.eth)
    response = test_utils.get_ordm_serv_list_attr_request("services", service_name)
    self.assertEqual(response['status_code'], requests.codes.ok)
    service = response['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], service_name)
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_13_get_connectivity_service_Ethernet(self):
    """Query the TAPI details of the stored service and expect it enabled and installed."""
    # The shared payload is updated in place with the uuid recorded at creation time.
    self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
    response = test_utils.transportpce_api_rpc_request(
        'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details)
    self.assertEqual(response['status_code'], requests.codes.ok)
    service = response['output']['service']
    self.assertEqual(service['operational-state'], 'ENABLED')
    self.assertEqual(service['name'][0]['value'], self.uuid_services.eth)
    self.assertEqual(service['administrative-state'], 'UNLOCKED')
    self.assertEqual(service['lifecycle-state'], 'INSTALLED')
# Ask the 'xpdrc' simulator to update port '1' of circuit-pack '1/0/1-PLUG-NET' with
# administrative-state 'outOfService'; the helper's return value must be truthy.
# NOTE(review): part of the payload literal is not visible in this extract.
279 def test_14_change_status_line_port_xpdrc(self):
280 self.assertTrue(test_utils.sims_update_cp_port(('xpdrc', self.NODE_VERSION_221), '1/0/1-PLUG-NET', '1',
284 "administrative-state": "outOfService",
285 "port-qual": "xpdr-network"
# Read the port-mapping of XPDR-C1 and check the states of its mappings: the
# XPDR1-NETWORK1 mapping is expected 'OutOfService'.
# NOTE(review): the line separating the two groups of assertions (presumably an
# 'else:' for all other mappings) is not visible in this extract.
289 def test_15_check_update_portmapping(self):
290 response = test_utils.get_portmapping_node_attr("XPDR-C1", None, None)
291 self.assertEqual(response['status_code'], requests.codes.ok)
292 mapping_list = response['nodes'][0]['mapping']
293 for mapping in mapping_list:
294 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
295 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
296 "Operational State should be 'OutOfService'")
297 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
298 "Administrative State should be 'OutOfService'")
300 self.assertEqual(mapping['port-oper-state'], 'InService',
301 "Operational State should be 'InService'")
302 self.assertEqual(mapping['port-admin-state'], 'InService',
303 "Administrative State should be 'InService'")
# Read the 'openroadm-topology' network and check node, termination-point and link
# states: XPDR-C1-XPDR1 / XPDR1-NETWORK1 and the two links listed in updated_links
# are expected 'outOfService'.
# NOTE(review): the counter initialisations/increments, the loop header binding 'tp'
# and the lines separating the assertion groups are not visible in this extract.
306 def test_16_check_update_openroadm_topo(self):
307 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
308 self.assertEqual(response['status_code'], requests.codes.ok)
309 node_list = response['network'][0]['node']
311 for node in node_list:
# Every node itself is expected to be 'inService'.
312 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
313 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
314 tp_list = node['ietf-network-topology:termination-point']
316 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
317 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
318 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
321 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
322 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
323 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
325 link_list = response['network'][0]['ietf-network-topology:link']
# Link ids of both directions between the transponder network port and the ROADM SRG port.
326 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
327 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
329 for link in link_list:
330 if link['link-id'] in updated_links:
331 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
332 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
335 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
336 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
337 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
# Query the TAPI node details of XPDR-C1-XPDR1+OTSi and then XPDR-C1-XPDR1+DSR and check
# the states of their owned node edge points: NEPs whose name contains 'XPDR1-NETWORK1'
# are expected DISABLED/LOCKED.
# NOTE(review): the counter handling, the loop headers binding 'nep' and the lines
# separating the assertion groups are not visible in this extract.
340 def test_17_check_update_tapi_neps(self):
# The shared node_details payload is updated in place before each request.
341 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
342 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi"
343 response = test_utils.transportpce_api_rpc_request(
344 'tapi-topology', 'get-node-details', self.node_details)
345 self.assertEqual(response['status_code'], requests.codes.ok)
346 nep_list = response['output']['node']['owned-node-edge-point']
349 if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
350 self.assertEqual(nep['operational-state'], 'DISABLED',
351 "Operational State should be 'DISABLED'")
352 self.assertEqual(nep['administrative-state'], 'LOCKED',
353 "Administrative State should be 'LOCKED'")
356 self.assertEqual(nep['operational-state'], 'ENABLED',
357 "Operational State should be 'ENABLED'")
358 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
359 "Administrative State should be 'UNLOCKED'")
# Same checks for the DSR node.
360 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR"
361 response = test_utils.transportpce_api_rpc_request(
362 'tapi-topology', 'get-node-details', self.node_details)
363 self.assertEqual(response['status_code'], requests.codes.ok)
364 nep_list = response['output']['node']['owned-node-edge-point']
366 if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
367 self.assertEqual(nep['operational-state'], 'DISABLED',
368 "Operational State should be 'DISABLED'")
369 self.assertEqual(nep['administrative-state'], 'LOCKED',
370 "Administrative State should be 'LOCKED'")
373 self.assertEqual(nep['operational-state'], 'ENABLED',
374 "Operational State should be 'ENABLED'")
375 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
376 "Administrative State should be 'UNLOCKED'")
# NOTE(review): the expected count is 4 while the failure message says "two" - one of
# them looks outdated; confirm which is intended.
377 self.assertEqual(nb_updated_neps, 4, "Only two xponder neps should have been modified")
# Query the TAPI topology details and check link states: links whose name contains both
# 'XPDR-C1-XPDR1' and 'XPDR1-NETWORK1' are expected DISABLED/LOCKED.
# NOTE(review): the counter handling and the line separating the two assertion groups
# are not visible in this extract.
380 def test_18_check_update_tapi_links(self):
381 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
382 response = test_utils.transportpce_api_rpc_request(
383 'tapi-topology', 'get-topology-details', self.tapi_topo)
385 self.assertEqual(response['status_code'], requests.codes.ok)
386 link_list = response['output']['topology']['link']
388 for link in link_list:
389 if all(x in link['name'][0]['value'] for x in ['XPDR-C1-XPDR1', 'XPDR1-NETWORK1']):
390 self.assertEqual(link['operational-state'], 'DISABLED')
391 self.assertEqual(link['administrative-state'], 'LOCKED')
394 self.assertEqual(link['operational-state'], 'ENABLED')
395 self.assertEqual(link['administrative-state'], 'UNLOCKED')
396 self.assertEqual(nb_updated_link, 2,
397 "Only two xponder-output/input & xponder-transi links should have been modified")
def test_19_check_update_service_Ethernet(self):
    """Expect the stored service to be operationally 'outOfService' but still 'inService' administratively."""
    service_name = str(self.uuid_services.eth)
    response = test_utils.get_ordm_serv_list_attr_request("services", service_name)
    self.assertEqual(response['status_code'], requests.codes.ok)
    service = response['services'][0]
    self.assertEqual(service['operational-state'], 'outOfService')
    self.assertEqual(service['administrative-state'], 'inService')
def test_20_check_update_connectivity_service_Ethernet(self):
    """Expect the TAPI view of the stored service to be DISABLED and LOCKED."""
    # The shared payload is updated in place with the uuid recorded at creation time.
    self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
    response = test_utils.transportpce_api_rpc_request(
        'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details)
    self.assertEqual(response['status_code'], requests.codes.ok)
    service = response['output']['service']
    self.assertEqual(service['operational-state'], 'DISABLED')
    self.assertEqual(service['administrative-state'], 'LOCKED')
# Ask the 'xpdrc' simulator to update port '1' of circuit-pack '1/0/1-PLUG-NET' with
# administrative-state 'inService'; the helper's return value must be truthy.
# NOTE(review): part of the payload literal is not visible in this extract.
415 def test_21_restore_status_line_port_xpdrc(self):
416 self.assertTrue(test_utils.sims_update_cp_port(('xpdrc', self.NODE_VERSION_221), '1/0/1-PLUG-NET', '1',
420 "administrative-state": "inService",
421 "port-qual": "xpdr-network"
def test_22_check_update_portmapping_ok(self):
    """Expect every port-mapping entry of XPDR-C1 to be 'InService' (oper and admin)."""
    response = test_utils.get_portmapping_node_attr("XPDR-C1", None, None)
    self.assertEqual(response['status_code'], requests.codes.ok)
    # (mapping key, failure message) pairs, checked in this order for each mapping.
    state_checks = (
        ('port-oper-state', "Operational State should be 'InService'"),
        ('port-admin-state', "Administrative State should be 'InService'"),
    )
    for mapping in response['nodes'][0]['mapping']:
        for state_key, failure_message in state_checks:
            self.assertEqual(mapping[state_key], 'InService', failure_message)
# Read the 'openroadm-topology' network and expect every node, termination point and
# link to be 'inService' (operational and administrative state).
# NOTE(review): the loop header binding 'tp' is not visible in this extract.
436 def test_23_check_update_openroadm_topo_ok(self):
437 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
438 self.assertEqual(response['status_code'], requests.codes.ok)
439 node_list = response['network'][0]['node']
440 for node in node_list:
441 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
442 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
443 tp_list = node['ietf-network-topology:termination-point']
445 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
446 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
448 link_list = response['network'][0]['ietf-network-topology:link']
449 for link in link_list:
450 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
451 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
# Query the TAPI node details of XPDR-C1-XPDR1+OTSi and then XPDR-C1-XPDR1+DSR and expect
# the checked node edge points to be ENABLED/UNLOCKED.
# NOTE(review): the loop headers binding 'nep' are not visible in this extract.
454 def test_24_check_update_tapi_neps_ok(self):
# The shared node_details payload is updated in place before each request.
455 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
456 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi"
457 response = test_utils.transportpce_api_rpc_request(
458 'tapi-topology', 'get-node-details', self.node_details)
459 self.assertEqual(response['status_code'], requests.codes.ok)
460 nep_list = response['output']['node']['owned-node-edge-point']
462 self.assertEqual(nep['operational-state'], 'ENABLED',
463 "Operational State should be 'ENABLED'")
464 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
465 "Administrative State should be 'UNLOCKED'")
# Same checks for the DSR node.
467 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR"
468 response = test_utils.transportpce_api_rpc_request(
469 'tapi-topology', 'get-node-details', self.node_details)
470 self.assertEqual(response['status_code'], requests.codes.ok)
471 nep_list = response['output']['node']['owned-node-edge-point']
473 self.assertEqual(nep['operational-state'], 'ENABLED',
474 "Operational State should be 'ENABLED'")
475 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
476 "Administrative State should be 'UNLOCKED'")
# Query the TAPI topology details and expect every link to be ENABLED/UNLOCKED.
# NOTE(review): unlike test_18, no status_code assertion is visible before the response
# body is indexed - confirm whether one should be added.
479 def test_25_check_update_tapi_links_ok(self):
480 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
481 response = test_utils.transportpce_api_rpc_request(
482 'tapi-topology', 'get-topology-details', self.tapi_topo)
484 link_list = response['output']['topology']['link']
485 for link in link_list:
486 self.assertEqual(link['operational-state'], 'ENABLED')
487 self.assertEqual(link['administrative-state'], 'UNLOCKED')
def test_26_check_update_service1_ok(self):
    """Re-run the assertions of test_12 on the OpenROADM service entry."""
    self.test_12_get_service_Ethernet()
def test_27_check_update_connectivity_service_Ethernet_ok(self):
    """Re-run the assertions of test_13 on the TAPI connectivity service."""
    self.test_13_get_connectivity_service_Ethernet()
# Ask the 'roadma' simulator to update port 'C1' of circuit-pack '3/0' (logical
# connection point SRG1-PP1) with administrative-state 'outOfService'; the helper's
# return value must be truthy.
# NOTE(review): part of the payload literal is not visible in this extract.
496 def test_28_change_status_port_roadma_srg(self):
497 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3/0', 'C1',
500 "logical-connection-point": "SRG1-PP1",
501 "port-type": "client",
502 "circuit-id": "SRG1",
503 "administrative-state": "outOfService",
504 "port-qual": "roadm-external"
# Read the port-mapping of ROADM-A1 and check the states of its mappings: the
# SRG1-PP1-TXRX mapping is expected 'OutOfService'.
# NOTE(review): the line separating the two groups of assertions (presumably an
# 'else:' for all other mappings) is not visible in this extract.
508 def test_29_check_update_portmapping(self):
509 response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
510 self.assertEqual(response['status_code'], requests.codes.ok)
511 mapping_list = response['nodes'][0]['mapping']
512 for mapping in mapping_list:
513 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
514 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
515 "Operational State should be 'OutOfService'")
516 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
517 "Administrative State should be 'OutOfService'")
519 self.assertEqual(mapping['port-oper-state'], 'InService',
520 "Operational State should be 'InService'")
521 self.assertEqual(mapping['port-admin-state'], 'InService',
522 "Administrative State should be 'InService'")
# Read the 'openroadm-topology' network and check node, termination-point and link
# states: ROADM-A1-SRG1 / SRG1-PP1-TXRX and the two links listed in updated_links are
# expected 'outOfService'.
# NOTE(review): the counter initialisations/increments, the loop header binding 'tp'
# and the lines separating the assertion groups are not visible in this extract.
525 def test_30_check_update_openroadm_topo(self):
526 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
527 self.assertEqual(response['status_code'], requests.codes.ok)
528 node_list = response['network'][0]['node']
530 for node in node_list:
# Every node itself is expected to be 'inService'.
531 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
532 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
533 tp_list = node['ietf-network-topology:termination-point']
535 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
536 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
537 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
540 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
541 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
542 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
544 link_list = response['network'][0]['ietf-network-topology:link']
# Link ids of both directions between the transponder network port and the ROADM SRG port.
545 updated_links = ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
546 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
548 for link in link_list:
549 if link['link-id'] in updated_links:
550 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
551 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
554 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
555 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
556 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
# Query the TAPI node details of ROADM-A1+PHOTONIC_MEDIA and check the states of its
# owned node edge points: NEPs whose name contains 'SRG1-PP1-TXRX' are expected
# DISABLED/LOCKED, and three such NEPs are expected in total.
# NOTE(review): the counter handling, the loop header binding 'nep' and the line
# separating the assertion groups are not visible in this extract.
559 def test_31_check_update_tapi_neps(self):
560 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
561 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
562 response = test_utils.transportpce_api_rpc_request(
563 'tapi-topology', 'get-node-details', self.node_details)
564 self.assertEqual(response['status_code'], requests.codes.ok)
565 nep_list = response['output']['node']['owned-node-edge-point']
568 if 'SRG1-PP1-TXRX' in nep['name'][0]['value']:
569 self.assertEqual(nep['operational-state'], 'DISABLED',
570 "Operational State should be 'DISABLED'")
571 self.assertEqual(nep['administrative-state'], 'LOCKED',
572 "Administrative State should be 'LOCKED'")
575 self.assertEqual(nep['operational-state'], 'ENABLED',
576 "Operational State should be 'ENABLED'")
577 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
578 "Administrative State should be 'UNLOCKED'")
579 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
# Query the TAPI topology details and check link states: links whose name contains both
# 'ROADM-A1' and 'SRG1-PP1-TXRX' are expected DISABLED/LOCKED.
# NOTE(review): the counter handling and the line separating the two assertion groups
# are not visible in this extract; unlike test_18, no status_code assertion is visible
# before the response body is indexed - confirm whether one should be added.
582 def test_32_check_update_tapi_links(self):
583 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
584 response = test_utils.transportpce_api_rpc_request(
585 'tapi-topology', 'get-topology-details', self.tapi_topo)
587 link_list = response['output']['topology']['link']
589 for link in link_list:
590 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP1-TXRX']):
591 self.assertEqual(link['operational-state'], 'DISABLED')
592 self.assertEqual(link['administrative-state'], 'LOCKED')
595 self.assertEqual(link['operational-state'], 'ENABLED')
596 self.assertEqual(link['administrative-state'], 'UNLOCKED')
597 self.assertEqual(nb_updated_link, 1,
598 "Only one xponder-output/input link should have been modified")
def test_33_check_update_service_Ethernet(self):
    """Re-run the assertions of test_19 on the OpenROADM service entry."""
    self.test_19_check_update_service_Ethernet()
def test_34_check_update_connectivity_service_Ethernet(self):
    """Re-run the assertions of test_20 on the TAPI connectivity service."""
    self.test_20_check_update_connectivity_service_Ethernet()
# Ask the 'roadma' simulator to update port 'C1' of circuit-pack '3/0' (logical
# connection point SRG1-PP1) with administrative-state 'inService'; the helper's return
# value must be truthy.
# NOTE(review): part of the payload literal is not visible in this extract.
607 def test_35_restore_status_port_roadma_srg(self):
608 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3/0', 'C1',
611 "logical-connection-point": "SRG1-PP1",
612 "port-type": "client",
613 "circuit-id": "SRG1",
614 "administrative-state": "inService",
615 "port-qual": "roadm-external"
def test_36_check_update_portmapping_ok(self):
    """Expect every port-mapping entry of ROADM-A1 to be 'InService' (oper and admin)."""
    response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
    self.assertEqual(response['status_code'], requests.codes.ok)
    # (mapping key, failure message) pairs, checked in this order for each mapping.
    state_checks = (
        ('port-oper-state', "Operational State should be 'InService'"),
        ('port-admin-state', "Administrative State should be 'InService'"),
    )
    for mapping in response['nodes'][0]['mapping']:
        for state_key, failure_message in state_checks:
            self.assertEqual(mapping[state_key], 'InService', failure_message)
def test_37_check_update_openroadm_topo_ok(self):
    """Re-run the assertions of test_23 on the openroadm-topology."""
    self.test_23_check_update_openroadm_topo_ok()
# Query the TAPI node details of ROADM-A1+PHOTONIC_MEDIA and expect the checked node
# edge points to be ENABLED/UNLOCKED.
# NOTE(review): the loop header binding 'nep' is not visible in this extract.
633 def test_38_check_update_tapi_neps_ok(self):
634 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
635 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
636 response = test_utils.transportpce_api_rpc_request(
637 'tapi-topology', 'get-node-details', self.node_details)
638 self.assertEqual(response['status_code'], requests.codes.ok)
639 nep_list = response['output']['node']['owned-node-edge-point']
641 self.assertEqual(nep['operational-state'], 'ENABLED',
642 "Operational State should be 'ENABLED'")
643 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
644 "Administrative State should be 'UNLOCKED'")
def test_39_check_update_tapi_links_ok(self):
    """Re-run the assertions of test_25 on the TAPI topology links."""
    self.test_25_check_update_tapi_links_ok()
def test_40_check_update_service1_ok(self):
    """Re-run the assertions of test_12 on the OpenROADM service entry."""
    self.test_12_get_service_Ethernet()
def test_41_check_update_connectivity_service_Ethernet_ok(self):
    """Re-run the assertions of test_13 on the TAPI connectivity service."""
    self.test_13_get_connectivity_service_Ethernet()
# Ask the 'roadma' simulator to update port 'L1' of circuit-pack '2/0' (logical
# connection point DEG2-TTP-TXRX) with administrative-state 'outOfService'; the helper's
# return value must be truthy.
# NOTE(review): part of the payload literal is not visible in this extract.
657 def test_42_change_status_line_port_roadma_deg(self):
658 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '2/0', 'L1',
661 "logical-connection-point": "DEG2-TTP-TXRX",
664 "administrative-state": "outOfService",
665 "port-qual": "roadm-external"
# Read the port-mapping of ROADM-A1 and check the states of its mappings: the
# DEG2-TTP-TXRX mapping is expected 'OutOfService'.
# NOTE(review): the line separating the two groups of assertions (presumably an
# 'else:' for all other mappings) is not visible in this extract.
669 def test_43_check_update_portmapping(self):
670 response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
671 self.assertEqual(response['status_code'], requests.codes.ok)
672 mapping_list = response['nodes'][0]['mapping']
673 for mapping in mapping_list:
674 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
675 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
676 "Operational State should be 'OutOfService'")
677 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
678 "Administrative State should be 'OutOfService'")
680 self.assertEqual(mapping['port-oper-state'], 'InService',
681 "Operational State should be 'InService'")
682 self.assertEqual(mapping['port-admin-state'], 'InService',
683 "Administrative State should be 'InService'")
# Read the 'openroadm-topology' network and check node, termination-point and link
# states: ROADM-A1-DEG2 / DEG2-TTP-TXRX and the two links listed in updated_links are
# expected 'outOfService'.
# NOTE(review): the counter initialisations/increments, the loop header binding 'tp'
# and the lines separating the assertion groups are not visible in this extract.
686 def test_44_check_update_openroadm_topo(self):
687 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
688 self.assertEqual(response['status_code'], requests.codes.ok)
689 node_list = response['network'][0]['node']
691 for node in node_list:
# Every node itself is expected to be 'inService'.
692 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
693 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
694 tp_list = node['ietf-network-topology:termination-point']
696 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
697 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
698 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
701 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
702 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
703 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
705 link_list = response['network'][0]['ietf-network-topology:link']
# Link ids of both directions between ROADM-A1 DEG2 and ROADM-C1 DEG1.
706 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
707 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
709 for link in link_list:
710 if link['link-id'] in updated_links:
711 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
712 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
715 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
716 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
# NOTE(review): the failure message mentions xponder links although updated_links holds
# ROADM-to-ROADM link ids - the message looks copy-pasted.
717 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
# Query the TAPI node details of ROADM-A1+PHOTONIC_MEDIA and check the states of its
# owned node edge points: NEPs whose name contains 'DEG2-TTP-TXRX' are expected
# DISABLED/LOCKED, and three such NEPs are expected in total.
# NOTE(review): the counter handling, the loop header binding 'nep' and the line
# separating the assertion groups are not visible in this extract.
720 def test_45_check_update_tapi_neps(self):
721 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
722 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
723 response = test_utils.transportpce_api_rpc_request(
724 'tapi-topology', 'get-node-details', self.node_details)
725 self.assertEqual(response['status_code'], requests.codes.ok)
726 nep_list = response['output']['node']['owned-node-edge-point']
729 if 'DEG2-TTP-TXRX' in nep['name'][0]['value']:
730 self.assertEqual(nep['operational-state'], 'DISABLED',
731 "Operational State should be 'DISABLED'")
732 self.assertEqual(nep['administrative-state'], 'LOCKED',
733 "Administrative State should be 'LOCKED'")
736 self.assertEqual(nep['operational-state'], 'ENABLED',
737 "Operational State should be 'ENABLED'")
738 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
739 "Administrative State should be 'UNLOCKED'")
740 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
# Query the TAPI topology details and check link states: links whose name contains both
# 'ROADM-A1' and 'DEG2-TTP-TXRX' are expected DISABLED/LOCKED.
# NOTE(review): the counter handling and the line separating the two assertion groups
# are not visible in this extract.
743 def test_46_check_update_tapi_links(self):
744 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
745 response = test_utils.transportpce_api_rpc_request(
746 'tapi-topology', 'get-topology-details', self.tapi_topo)
748 self.assertEqual(response['status_code'], requests.codes.ok)
749 link_list = response['output']['topology']['link']
751 for link in link_list:
752 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'DEG2-TTP-TXRX']):
753 self.assertEqual(link['operational-state'], 'DISABLED')
754 self.assertEqual(link['administrative-state'], 'LOCKED')
757 self.assertEqual(link['operational-state'], 'ENABLED')
758 self.assertEqual(link['administrative-state'], 'UNLOCKED')
759 self.assertEqual(nb_updated_link, 1,
760 "Only one rdm-rdm link should have been modified")
def test_47_check_update_service_Ethernet(self):
    """Re-run the assertions of test_19 on the OpenROADM service entry."""
    self.test_19_check_update_service_Ethernet()
def test_48_check_update_connectivity_service_Ethernet(self):
    """Re-run the assertions of test_20 on the TAPI connectivity service."""
    self.test_20_check_update_connectivity_service_Ethernet()
# Ask the 'roadma' simulator to update port 'L1' of circuit-pack '2/0' (logical
# connection point DEG2-TTP-TXRX) with administrative-state 'inService'; the helper's
# return value must be truthy.
# NOTE(review): part of the payload literal is not visible in this extract.
769 def test_49_restore_status_line_port_roadma_deg(self):
770 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '2/0', 'L1',
773 "logical-connection-point": "DEG2-TTP-TXRX",
776 "administrative-state": "inService",
777 "port-qual": "roadm-external"
def test_50_check_update_portmapping_ok(self):
    # Delegates to test_36, which holds the actual port-mapping assertions.
    self.test_36_check_update_portmapping_ok()
def test_51_check_update_openroadm_topo_ok(self):
    # Delegates to test_23, which holds the actual openroadm-topology assertions.
    self.test_23_check_update_openroadm_topo_ok()
def test_52_check_update_tapi_neps_ok(self):
    # Delegates to test_38, which holds the actual TAPI NEP assertions.
    self.test_38_check_update_tapi_neps_ok()
def test_53_check_update_tapi_links_ok(self):
    # Delegates to test_25, which holds the actual TAPI link assertions.
    self.test_25_check_update_tapi_links_ok()
def test_54_check_update_service1_ok(self):
    # Delegates to test_12, which retrieves and checks the Ethernet service.
    self.test_12_get_service_Ethernet()
def test_55_check_update_connectivity_service_Ethernet_ok(self):
    # Delegates to test_13, which retrieves and checks the Ethernet
    # connectivity service.
    self.test_13_get_connectivity_service_Ethernet()
799 def test_56_change_status_port_roadma_srg(self):
800 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3/0', 'C2',
803 "logical-connection-point": "SRG1-PP2",
804 "port-type": "client",
805 "circuit-id": "SRG1",
806 "administrative-state": "outOfService",
807 "port-qual": "roadm-external"
def test_57_check_update_portmapping(self):
    # Read the ROADM-A1 port mapping and check that SRG1-PP2-TXRX is the
    # only logical connection point reported out of service.
    response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
    self.assertEqual(response['status_code'], requests.codes.ok)
    mapping_list = response['nodes'][0]['mapping']
    for mapping in mapping_list:
        if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
            self.assertEqual(mapping['port-oper-state'], 'OutOfService',
                             "Operational State should be 'OutOfService'")
            self.assertEqual(mapping['port-admin-state'], 'OutOfService',
                             "Administrative State should be 'OutOfService'")
        else:
            # The in-service expectation applies only to the other mappings;
            # without this branch it would contradict the checks above.
            self.assertEqual(mapping['port-oper-state'], 'InService',
                             "Operational State should be 'InService'")
            self.assertEqual(mapping['port-admin-state'], 'InService',
                             "Administrative State should be 'InService'")
828 def test_58_check_update_openroadm_topo(self):
829 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
830 self.assertEqual(response['status_code'], requests.codes.ok)
831 node_list = response['network'][0]['node']
833 for node in node_list:
834 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
835 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
836 tp_list = node['ietf-network-topology:termination-point']
838 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
839 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
840 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
843 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
844 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
845 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
847 link_list = response['network'][0]['ietf-network-topology:link']
849 for link in link_list:
850 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
851 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
852 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
855 def test_59_check_update_tapi_neps(self):
856 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
857 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
858 response = test_utils.transportpce_api_rpc_request(
859 'tapi-topology', 'get-node-details', self.node_details)
860 self.assertEqual(response['status_code'], requests.codes.ok)
861 nep_list = response['output']['node']['owned-node-edge-point']
864 if 'SRG1-PP2-TXRX' in nep['name'][0]['value']:
865 self.assertEqual(nep['operational-state'], 'DISABLED',
866 "Operational State should be 'DISABLED'")
867 self.assertEqual(nep['administrative-state'], 'LOCKED',
868 "Administrative State should be 'LOCKED'")
871 self.assertEqual(nep['operational-state'], 'ENABLED',
872 "Operational State should be 'ENABLED'")
873 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
874 "Administrative State should be 'UNLOCKED'")
875 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
878 def test_60_check_update_tapi_links(self):
879 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
880 response = test_utils.transportpce_api_rpc_request(
881 'tapi-topology', 'get-topology-details', self.tapi_topo)
883 self.assertEqual(response['status_code'], requests.codes.ok)
884 link_list = response['output']['topology']['link']
886 for link in link_list:
887 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP2-TXRX']):
888 self.assertEqual(link['operational-state'], 'DISABLED')
889 self.assertEqual(link['administrative-state'], 'LOCKED')
892 self.assertEqual(link['operational-state'], 'ENABLED')
893 self.assertEqual(link['administrative-state'], 'UNLOCKED')
894 self.assertEqual(nb_updated_link, 0,
895 "No link should have been modified")
def test_61_check_update_service1_ok(self):
    # Delegates to test_12, which retrieves and checks the Ethernet service.
    self.test_12_get_service_Ethernet()
def test_62_check_update_connectivity_service_Ethernet_ok(self):
    # Delegates to test_13, which retrieves and checks the Ethernet
    # connectivity service.
    self.test_13_get_connectivity_service_Ethernet()
def test_63_delete_connectivity_service_Ethernet(self):
    # Ask the tapi-connectivity API to delete the service whose UUID is held
    # in self.uuid_services.eth, then wait self.WAITING seconds.
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    # del_serv_input_data is the shared request template; only its
    # identifier field is filled in here.
    self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.eth)
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
    self.assertIn(rpc_reply['status_code'], accepted_codes)
    time.sleep(self.WAITING)
def test_64_disconnect_xponders_from_roadm(self):
    # Delete every XPONDER-OUTPUT / XPONDER-INPUT link listed in the
    # openroadm-topology (requested with the 'config' argument) and check
    # that each deletion returns OK or No Content.
    response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    for link in links:
        if link["org-openroadm-common-network:link-type"] not in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
            continue
        # Dedicated name so the topology reply above is not overwritten by
        # the reply of the delete request.
        del_response = test_utils.del_ietf_network_link_request(
            'openroadm-topology', link['link-id'], 'config')
        self.assertIn(del_response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_65_disconnect_XPDRA(self):
    # Unmount XPDR-A1; an OK or No Content status counts as success.
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(test_utils.unmount_device("XPDR-A1").status_code, accepted_codes)
def test_66_disconnect_XPDRC(self):
    # Unmount XPDR-C1; an OK or No Content status counts as success.
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(test_utils.unmount_device("XPDR-C1").status_code, accepted_codes)
def test_67_disconnect_ROADMA(self):
    # Unmount ROADM-A1; an OK or No Content status counts as success.
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(test_utils.unmount_device("ROADM-A1").status_code, accepted_codes)
def test_68_disconnect_ROADMC(self):
    # Unmount ROADM-C1; an OK or No Content status counts as success.
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(test_utils.unmount_device("ROADM-C1").status_code, accepted_codes)
937 def test_69_restore_status_port_roadma_srg(self):
938 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3/0', 'C2',
941 "logical-connection-point": "SRG1-PP2",
942 "port-type": "client",
943 "circuit-id": "SRG1",
944 "administrative-state": "inService",
945 "port-qual": "roadm-external"
def test_70_clean_openroadm_topology(self):
    # Final cleanup: delete every XPONDER-OUTPUT, XPONDER-INPUT and
    # ROADM-TO-ROADM link listed in the openroadm-topology (requested with
    # the 'config' argument), checking that each deletion returns OK or
    # No Content.
    response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    removable_types = ('XPONDER-OUTPUT', 'XPONDER-INPUT', 'ROADM-TO-ROADM')
    for link in links:
        if link["org-openroadm-common-network:link-type"] not in removable_types:
            continue
        # Dedicated name so the topology reply above is not overwritten by
        # the reply of the delete request.
        del_response = test_utils.del_ietf_network_link_request(
            'openroadm-topology', link['link-id'], 'config')
        self.assertIn(del_response.status_code, (requests.codes.ok, requests.codes.no_content))
# Run the suite with per-test verbose output when executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)