2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
25 # pylint: disable=too-few-public-methods
30 # pylint: disable=invalid-name
37 class TransportPCEFulltesting(unittest.TestCase):
39 cr_serv_input_data = {
42 "layer-protocol-name": "DSR",
43 "service-interface-point": {
44 "service-interface-point-uuid": "b1f4bd3b-7fa9-367b-a8ab-6e80293238df"
46 "administrative-state": "UNLOCKED",
47 "operational-state": "ENABLED",
48 "direction": "BIDIRECTIONAL",
50 "protection-role": "WORK",
51 "local-id": "XPDR-C1-XPDR1",
54 "value-name": "OpenROADM node id",
55 "value": "XPDR-C1-XPDR1"
60 "layer-protocol-name": "DSR",
61 "service-interface-point": {
62 "service-interface-point-uuid": "b5964ce9-274c-3f68-b4d1-83c0b61bc74e"
64 "administrative-state": "UNLOCKED",
65 "operational-state": "ENABLED",
66 "direction": "BIDIRECTIONAL",
68 "protection-role": "WORK",
69 "local-id": "XPDR-A1-XPDR1",
72 "value-name": "OpenROADM node id",
73 "value": "XPDR-A1-XPDR1"
78 "connectivity-constraint": {
79 "service-layer": "ETH",
80 "service-type": "POINT_TO_POINT_CONNECTIVITY",
81 "service-level": "Some service-level",
82 "requested-capacity": {
92 del_serv_input_data = {"service-id-or-name": "TBD"}
94 tapi_topo = {"topology-id-or-name": "TBD"}
97 "topology-id-or-name": "TBD",
98 "node-id-or-name": "TBD"
101 tapi_serv_details = {"service-id-or-name": "TBD"}
104 uuid_services = UuidServices()
105 WAITING = 25 # nominal value is 300
106 NODE_VERSION_221 = '2.2.1'
110 # pylint: disable=unsubscriptable-object
111 cls.init_failed = False
112 os.environ['JAVA_MIN_MEM'] = '1024M'
113 os.environ['JAVA_MAX_MEM'] = '4096M'
114 cls.processes = test_utils.start_tpce()
115 # TAPI feature is not installed by default in Karaf
116 if "NO_ODL_STARTUP" not in os.environ or "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
117 print("installing tapi feature...")
118 result = test_utils.install_karaf_feature("odl-transportpce-tapi")
119 if result.returncode != 0:
120 cls.init_failed = True
121 print("Restarting OpenDaylight...")
122 test_utils.shutdown_process(cls.processes[0])
123 cls.processes[0] = test_utils.start_karaf()
124 test_utils.process_list[0] = cls.processes[0]
125 cls.init_failed = not test_utils.wait_until_log_contains(
126 test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
128 print("tapi installation feature failed...")
129 test_utils.shutdown_process(cls.processes[0])
131 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_221),
132 ('roadma', cls.NODE_VERSION_221),
133 ('roadmc', cls.NODE_VERSION_221),
134 ('xpdrc', cls.NODE_VERSION_221)])
137 def tearDownClass(cls):
138 # pylint: disable=not-an-iterable
139 for process in cls.processes:
140 test_utils.shutdown_process(process)
141 print("all processes killed")
143 def setUp(self): # instruction executed before each test method
144 # pylint: disable=consider-using-f-string
145 print("execution of {}".format(self.id().split(".")[-1]))
148 def test_01_connect_xpdrA(self):
149 response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION_221))
150 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
152 def test_02_connect_xpdrC(self):
153 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_221))
154 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
156 def test_03_connect_rdmA(self):
157 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
158 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160 def test_04_connect_rdmC(self):
161 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
162 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
164 def test_05_connect_xpdrA_N1_to_roadmA_PP1(self):
165 response = test_utils.transportpce_api_rpc_request(
166 'transportpce-networkutils', 'init-xpdr-rdm-links',
167 {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
168 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
169 self.assertEqual(response['status_code'], requests.codes.ok)
170 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
173 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
174 response = test_utils.transportpce_api_rpc_request(
175 'transportpce-networkutils', 'init-rdm-xpdr-links',
176 {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
177 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
178 self.assertEqual(response['status_code'], requests.codes.ok)
179 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
182 def test_07_connect_xpdrC_N1_to_roadmC_PP1(self):
183 response = test_utils.transportpce_api_rpc_request(
184 'transportpce-networkutils', 'init-xpdr-rdm-links',
185 {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
186 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
187 self.assertEqual(response['status_code'], requests.codes.ok)
188 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
191 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
192 response = test_utils.transportpce_api_rpc_request(
193 'transportpce-networkutils', 'init-rdm-xpdr-links',
194 {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
195 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
196 self.assertEqual(response['status_code'], requests.codes.ok)
197 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
200 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
201 # Config ROADMA-ROADMC oms-attributes
203 "auto-spanloss": "true",
204 "spanloss-base": 11.4,
205 "spanloss-current": 12,
206 "engineered-spanloss": 12.2,
207 "link-concatenation": [{
210 "SRLG-length": 100000,
212 response = test_utils.add_oms_attr_request(
213 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
214 self.assertEqual(response.status_code, requests.codes.created)
216 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
217 # Config ROADMC-ROADMA oms-attributes
219 "auto-spanloss": "true",
220 "spanloss-base": 11.4,
221 "spanloss-current": 12,
222 "engineered-spanloss": 12.2,
223 "link-concatenation": [{
226 "SRLG-length": 100000,
228 response = test_utils.add_oms_attr_request(
229 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
230 self.assertEqual(response.status_code, requests.codes.created)
232 # test service-create for Eth service from xpdr to xpdr
233 def test_11_create_connectivity_service_Ethernet(self):
234 response = test_utils.transportpce_api_rpc_request(
235 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
236 time.sleep(self.WAITING)
237 self.uuid_services.eth = response['output']['service']['uuid']
238 # pylint: disable=consider-using-f-string
240 input_dict_1 = {'administrative-state': 'LOCKED',
241 'lifecycle-state': 'PLANNED',
242 'operational-state': 'DISABLED',
243 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
244 'service-layer': 'ETH',
245 'connectivity-direction': 'BIDIRECTIONAL'
247 input_dict_2 = {'value-name': 'OpenROADM node id',
248 'value': 'XPDR-C1-XPDR1'}
249 input_dict_3 = {'value-name': 'OpenROADM node id',
250 'value': 'XPDR-A1-XPDR1'}
252 self.assertDictEqual(dict(input_dict_1, **response['output']['service']),
253 response['output']['service'])
254 self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]),
255 response['output']['service']['end-point'][0]['name'][0])
256 self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]),
257 response['output']['service']['end-point'][1]['name'][0])
258 # If the gate fails is because of the waiting time not being enough
259 # time.sleep(self.WAITING)
261 def test_12_get_service_Ethernet(self):
262 response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth))
263 self.assertEqual(response['status_code'], requests.codes.ok)
264 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
265 self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.eth))
266 self.assertEqual(response['services'][0]['connection-type'], 'service')
267 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
270 def test_13_get_connectivity_service_Ethernet(self):
271 self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
272 response = test_utils.transportpce_api_rpc_request(
273 'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details)
274 self.assertEqual(response['status_code'], requests.codes.ok)
275 self.assertEqual(response['output']['service']['operational-state'], 'ENABLED')
276 self.assertEqual(response['output']['service']['name'][0]['value'], self.uuid_services.eth)
277 self.assertEqual(response['output']['service']['administrative-state'], 'UNLOCKED')
278 self.assertEqual(response['output']['service']['lifecycle-state'], 'INSTALLED')
280 def test_14_change_status_line_port_xpdrc(self):
281 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
285 "administrative-state": "outOfService",
286 "port-qual": "xpdr-network"}]}
287 response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
288 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
289 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
290 timeout=test_utils.REQUEST_TIMEOUT)
291 self.assertEqual(response.status_code, requests.codes.ok)
294 def test_15_check_update_portmapping(self):
295 response = test_utils.get_portmapping_node_attr("XPDR-C1", None, None)
296 self.assertEqual(response['status_code'], requests.codes.ok)
297 mapping_list = response['nodes'][0]['mapping']
298 for mapping in mapping_list:
299 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
300 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
301 "Operational State should be 'OutOfService'")
302 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
303 "Administrative State should be 'OutOfService'")
305 self.assertEqual(mapping['port-oper-state'], 'InService',
306 "Operational State should be 'InService'")
307 self.assertEqual(mapping['port-admin-state'], 'InService',
308 "Administrative State should be 'InService'")
311 def test_16_check_update_openroadm_topo(self):
312 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
313 self.assertEqual(response['status_code'], requests.codes.ok)
314 node_list = response['network'][0]['node']
316 for node in node_list:
317 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
318 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
319 tp_list = node['ietf-network-topology:termination-point']
321 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
322 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
323 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
326 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
327 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
328 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
330 link_list = response['network'][0]['ietf-network-topology:link']
331 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
332 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
334 for link in link_list:
335 if link['link-id'] in updated_links:
336 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
337 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
340 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
341 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
342 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
345 def test_17_check_update_tapi_neps(self):
346 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
347 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi"
348 response = test_utils.transportpce_api_rpc_request(
349 'tapi-topology', 'get-node-details', self.node_details)
350 self.assertEqual(response['status_code'], requests.codes.ok)
351 nep_list = response['output']['node']['owned-node-edge-point']
354 if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
355 self.assertEqual(nep['operational-state'], 'DISABLED',
356 "Operational State should be 'DISABLED'")
357 self.assertEqual(nep['administrative-state'], 'LOCKED',
358 "Administrative State should be 'LOCKED'")
361 self.assertEqual(nep['operational-state'], 'ENABLED',
362 "Operational State should be 'ENABLED'")
363 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
364 "Administrative State should be 'UNLOCKED'")
365 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR"
366 response = test_utils.transportpce_api_rpc_request(
367 'tapi-topology', 'get-node-details', self.node_details)
368 self.assertEqual(response['status_code'], requests.codes.ok)
369 nep_list = response['output']['node']['owned-node-edge-point']
371 if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
372 self.assertEqual(nep['operational-state'], 'DISABLED',
373 "Operational State should be 'DISABLED'")
374 self.assertEqual(nep['administrative-state'], 'LOCKED',
375 "Administrative State should be 'LOCKED'")
378 self.assertEqual(nep['operational-state'], 'ENABLED',
379 "Operational State should be 'ENABLED'")
380 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
381 "Administrative State should be 'UNLOCKED'")
382 self.assertEqual(nb_updated_neps, 4, "Only two xponder neps should have been modified")
385 def test_18_check_update_tapi_links(self):
386 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
387 response = test_utils.transportpce_api_rpc_request(
388 'tapi-topology', 'get-topology-details', self.tapi_topo)
390 self.assertEqual(response['status_code'], requests.codes.ok)
391 link_list = response['output']['topology']['link']
393 for link in link_list:
394 if all(x in link['name'][0]['value'] for x in ['XPDR-C1-XPDR1', 'XPDR1-NETWORK1']):
395 self.assertEqual(link['operational-state'], 'DISABLED')
396 self.assertEqual(link['administrative-state'], 'LOCKED')
399 self.assertEqual(link['operational-state'], 'ENABLED')
400 self.assertEqual(link['administrative-state'], 'UNLOCKED')
401 self.assertEqual(nb_updated_link, 2,
402 "Only two xponder-output/input & xponder-transi links should have been modified")
405 def test_19_check_update_service_Ethernet(self):
406 response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth))
407 self.assertEqual(response['status_code'], requests.codes.ok)
408 self.assertEqual(response['services'][0]['operational-state'], 'outOfService')
409 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
411 def test_20_check_update_connectivity_service_Ethernet(self):
412 self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
413 response = test_utils.transportpce_api_rpc_request(
414 'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details)
415 self.assertEqual(response['status_code'], requests.codes.ok)
416 self.assertEqual(response['output']['service']['operational-state'], 'DISABLED')
417 self.assertEqual(response['output']['service']['administrative-state'], 'LOCKED')
420 def test_21_restore_status_line_port_xpdrc(self):
421 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
425 "administrative-state": "inService",
426 "port-qual": "xpdr-network"}]}
427 response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
428 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
429 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
430 timeout=test_utils.REQUEST_TIMEOUT)
431 self.assertEqual(response.status_code, requests.codes.ok)
434 def test_22_check_update_portmapping_ok(self):
435 response = test_utils.get_portmapping_node_attr("XPDR-C1", None, None)
436 self.assertEqual(response['status_code'], requests.codes.ok)
437 mapping_list = response['nodes'][0]['mapping']
438 for mapping in mapping_list:
439 self.assertEqual(mapping['port-oper-state'], 'InService',
440 "Operational State should be 'InService'")
441 self.assertEqual(mapping['port-admin-state'], 'InService',
442 "Administrative State should be 'InService'")
445 def test_23_check_update_openroadm_topo_ok(self):
446 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
447 self.assertEqual(response['status_code'], requests.codes.ok)
448 node_list = response['network'][0]['node']
449 for node in node_list:
450 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
451 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
452 tp_list = node['ietf-network-topology:termination-point']
454 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
455 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
457 link_list = response['network'][0]['ietf-network-topology:link']
458 for link in link_list:
459 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
460 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
463 def test_24_check_update_tapi_neps_ok(self):
464 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
465 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi"
466 response = test_utils.transportpce_api_rpc_request(
467 'tapi-topology', 'get-node-details', self.node_details)
468 self.assertEqual(response['status_code'], requests.codes.ok)
469 nep_list = response['output']['node']['owned-node-edge-point']
471 self.assertEqual(nep['operational-state'], 'ENABLED',
472 "Operational State should be 'ENABLED'")
473 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
474 "Administrative State should be 'UNLOCKED'")
476 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR"
477 response = test_utils.transportpce_api_rpc_request(
478 'tapi-topology', 'get-node-details', self.node_details)
479 self.assertEqual(response['status_code'], requests.codes.ok)
480 nep_list = response['output']['node']['owned-node-edge-point']
482 self.assertEqual(nep['operational-state'], 'ENABLED',
483 "Operational State should be 'ENABLED'")
484 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
485 "Administrative State should be 'UNLOCKED'")
488 def test_25_check_update_tapi_links_ok(self):
489 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
490 response = test_utils.transportpce_api_rpc_request(
491 'tapi-topology', 'get-topology-details', self.tapi_topo)
493 link_list = response['output']['topology']['link']
494 for link in link_list:
495 self.assertEqual(link['operational-state'], 'ENABLED')
496 self.assertEqual(link['administrative-state'], 'UNLOCKED')
499 def test_26_check_update_service1_ok(self):
500 self.test_12_get_service_Ethernet()
502 def test_27_check_update_connectivity_service_Ethernet_ok(self):
503 self.test_13_get_connectivity_service_Ethernet()
505 def test_28_change_status_port_roadma_srg(self):
506 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3/0', 'C1',
509 "logical-connection-point": "SRG1-PP1",
510 "port-type": "client",
511 "circuit-id": "SRG1",
512 "administrative-state": "outOfService",
513 "port-qual": "roadm-external"
517 def test_29_check_update_portmapping(self):
518 response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
519 self.assertEqual(response['status_code'], requests.codes.ok)
520 mapping_list = response['nodes'][0]['mapping']
521 for mapping in mapping_list:
522 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
523 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
524 "Operational State should be 'OutOfService'")
525 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
526 "Administrative State should be 'OutOfService'")
528 self.assertEqual(mapping['port-oper-state'], 'InService',
529 "Operational State should be 'InService'")
530 self.assertEqual(mapping['port-admin-state'], 'InService',
531 "Administrative State should be 'InService'")
534 def test_30_check_update_openroadm_topo(self):
535 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
536 self.assertEqual(response['status_code'], requests.codes.ok)
537 node_list = response['network'][0]['node']
539 for node in node_list:
540 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
541 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
542 tp_list = node['ietf-network-topology:termination-point']
544 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
545 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
546 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
549 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
550 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
551 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
553 link_list = response['network'][0]['ietf-network-topology:link']
554 updated_links = ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
555 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
557 for link in link_list:
558 if link['link-id'] in updated_links:
559 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
560 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
563 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
564 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
565 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
568 def test_31_check_update_tapi_neps(self):
569 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
570 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
571 response = test_utils.transportpce_api_rpc_request(
572 'tapi-topology', 'get-node-details', self.node_details)
573 self.assertEqual(response['status_code'], requests.codes.ok)
574 nep_list = response['output']['node']['owned-node-edge-point']
577 if 'SRG1-PP1-TXRX' in nep['name'][0]['value']:
578 self.assertEqual(nep['operational-state'], 'DISABLED',
579 "Operational State should be 'DISABLED'")
580 self.assertEqual(nep['administrative-state'], 'LOCKED',
581 "Administrative State should be 'LOCKED'")
584 self.assertEqual(nep['operational-state'], 'ENABLED',
585 "Operational State should be 'ENABLED'")
586 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
587 "Administrative State should be 'UNLOCKED'")
588 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
591 def test_32_check_update_tapi_links(self):
592 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
593 response = test_utils.transportpce_api_rpc_request(
594 'tapi-topology', 'get-topology-details', self.tapi_topo)
596 link_list = response['output']['topology']['link']
598 for link in link_list:
599 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP1-TXRX']):
600 self.assertEqual(link['operational-state'], 'DISABLED')
601 self.assertEqual(link['administrative-state'], 'LOCKED')
604 self.assertEqual(link['operational-state'], 'ENABLED')
605 self.assertEqual(link['administrative-state'], 'UNLOCKED')
606 self.assertEqual(nb_updated_link, 1,
607 "Only one xponder-output/input link should have been modified")
610 def test_33_check_update_service_Ethernet(self):
611 self.test_19_check_update_service_Ethernet()
613 def test_34_check_update_connectivity_service_Ethernet(self):
614 self.test_20_check_update_connectivity_service_Ethernet()
616 def test_35_restore_status_port_roadma_srg(self):
617 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3/0', 'C1',
620 "logical-connection-point": "SRG1-PP1",
621 "port-type": "client",
622 "circuit-id": "SRG1",
623 "administrative-state": "inService",
624 "port-qual": "roadm-external"
628 def test_36_check_update_portmapping_ok(self):
629 response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
630 self.assertEqual(response['status_code'], requests.codes.ok)
631 mapping_list = response['nodes'][0]['mapping']
632 for mapping in mapping_list:
633 self.assertEqual(mapping['port-oper-state'], 'InService',
634 "Operational State should be 'InService'")
635 self.assertEqual(mapping['port-admin-state'], 'InService',
636 "Administrative State should be 'InService'")
639 def test_37_check_update_openroadm_topo_ok(self):
640 self.test_23_check_update_openroadm_topo_ok()
642 def test_38_check_update_tapi_neps_ok(self):
643 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
644 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
645 response = test_utils.transportpce_api_rpc_request(
646 'tapi-topology', 'get-node-details', self.node_details)
647 self.assertEqual(response['status_code'], requests.codes.ok)
648 nep_list = response['output']['node']['owned-node-edge-point']
650 self.assertEqual(nep['operational-state'], 'ENABLED',
651 "Operational State should be 'ENABLED'")
652 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
653 "Administrative State should be 'UNLOCKED'")
657 def test_39_check_update_tapi_links_ok(self):
658 self.test_25_check_update_tapi_links_ok()
660 def test_40_check_update_service1_ok(self):
661 self.test_12_get_service_Ethernet()
663 def test_41_check_update_connectivity_service_Ethernet_ok(self):
664 self.test_13_get_connectivity_service_Ethernet()
666 def test_42_change_status_line_port_roadma_deg(self):
667 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '2/0', 'L1',
670 "logical-connection-point": "DEG2-TTP-TXRX",
673 "administrative-state": "outOfService",
674 "port-qual": "roadm-external"
678 def test_43_check_update_portmapping(self):
679 response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
680 self.assertEqual(response['status_code'], requests.codes.ok)
681 mapping_list = response['nodes'][0]['mapping']
682 for mapping in mapping_list:
683 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
684 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
685 "Operational State should be 'OutOfService'")
686 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
687 "Administrative State should be 'OutOfService'")
689 self.assertEqual(mapping['port-oper-state'], 'InService',
690 "Operational State should be 'InService'")
691 self.assertEqual(mapping['port-admin-state'], 'InService',
692 "Administrative State should be 'InService'")
695 def test_44_check_update_openroadm_topo(self):
696 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
697 self.assertEqual(response['status_code'], requests.codes.ok)
698 node_list = response['network'][0]['node']
700 for node in node_list:
701 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
702 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
703 tp_list = node['ietf-network-topology:termination-point']
705 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
706 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
707 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
710 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
711 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
712 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
714 link_list = response['network'][0]['ietf-network-topology:link']
715 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
716 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
718 for link in link_list:
719 if link['link-id'] in updated_links:
720 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
721 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
724 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
725 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
726 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
729 def test_45_check_update_tapi_neps(self):
730 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
731 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
732 response = test_utils.transportpce_api_rpc_request(
733 'tapi-topology', 'get-node-details', self.node_details)
734 self.assertEqual(response['status_code'], requests.codes.ok)
735 nep_list = response['output']['node']['owned-node-edge-point']
738 if 'DEG2-TTP-TXRX' in nep['name'][0]['value']:
739 self.assertEqual(nep['operational-state'], 'DISABLED',
740 "Operational State should be 'DISABLED'")
741 self.assertEqual(nep['administrative-state'], 'LOCKED',
742 "Administrative State should be 'LOCKED'")
745 self.assertEqual(nep['operational-state'], 'ENABLED',
746 "Operational State should be 'ENABLED'")
747 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
748 "Administrative State should be 'UNLOCKED'")
749 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
752 def test_46_check_update_tapi_links(self):
753 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
754 response = test_utils.transportpce_api_rpc_request(
755 'tapi-topology', 'get-topology-details', self.tapi_topo)
757 self.assertEqual(response['status_code'], requests.codes.ok)
758 link_list = response['output']['topology']['link']
760 for link in link_list:
761 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'DEG2-TTP-TXRX']):
762 self.assertEqual(link['operational-state'], 'DISABLED')
763 self.assertEqual(link['administrative-state'], 'LOCKED')
766 self.assertEqual(link['operational-state'], 'ENABLED')
767 self.assertEqual(link['administrative-state'], 'UNLOCKED')
768 self.assertEqual(nb_updated_link, 1,
769 "Only one rdm-rdm link should have been modified")
772 def test_47_check_update_service_Ethernet(self):
773 self.test_19_check_update_service_Ethernet()
775 def test_48_check_update_connectivity_service_Ethernet(self):
776 self.test_20_check_update_connectivity_service_Ethernet()
778 def test_49_restore_status_line_port_roadma_deg(self):
779 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '2/0', 'L1',
782 "logical-connection-point": "DEG2-TTP-TXRX",
785 "administrative-state": "inService",
786 "port-qual": "roadm-external"
790 def test_50_check_update_portmapping_ok(self):
791 self.test_36_check_update_portmapping_ok()
793 def test_51_check_update_openroadm_topo_ok(self):
794 self.test_23_check_update_openroadm_topo_ok()
796 def test_52_check_update_tapi_neps_ok(self):
797 self.test_38_check_update_tapi_neps_ok()
799 def test_53_check_update_tapi_links_ok(self):
800 self.test_25_check_update_tapi_links_ok()
802 def test_54_check_update_service1_ok(self):
803 self.test_12_get_service_Ethernet()
805 def test_55_check_update_connectivity_service_Ethernet_ok(self):
806 self.test_13_get_connectivity_service_Ethernet()
808 def test_56_change_status_port_roadma_srg(self):
809 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3/0', 'C2',
812 "logical-connection-point": "SRG1-PP2",
813 "port-type": "client",
814 "circuit-id": "SRG1",
815 "administrative-state": "outOfService",
816 "port-qual": "roadm-external"
820 def test_57_check_update_portmapping(self):
821 response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
822 self.assertEqual(response['status_code'], requests.codes.ok)
823 mapping_list = response['nodes'][0]['mapping']
824 for mapping in mapping_list:
825 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
826 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
827 "Operational State should be 'OutOfService'")
828 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
829 "Administrative State should be 'OutOfService'")
831 self.assertEqual(mapping['port-oper-state'], 'InService',
832 "Operational State should be 'InService'")
833 self.assertEqual(mapping['port-admin-state'], 'InService',
834 "Administrative State should be 'InService'")
837 def test_58_check_update_openroadm_topo(self):
838 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
839 self.assertEqual(response['status_code'], requests.codes.ok)
840 node_list = response['network'][0]['node']
842 for node in node_list:
843 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
844 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
845 tp_list = node['ietf-network-topology:termination-point']
847 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
848 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
849 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
852 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
853 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
854 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
856 link_list = response['network'][0]['ietf-network-topology:link']
858 for link in link_list:
859 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
860 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
861 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
864 def test_59_check_update_tapi_neps(self):
865 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
866 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
867 response = test_utils.transportpce_api_rpc_request(
868 'tapi-topology', 'get-node-details', self.node_details)
869 self.assertEqual(response['status_code'], requests.codes.ok)
870 nep_list = response['output']['node']['owned-node-edge-point']
873 if 'SRG1-PP2-TXRX' in nep['name'][0]['value']:
874 self.assertEqual(nep['operational-state'], 'DISABLED',
875 "Operational State should be 'DISABLED'")
876 self.assertEqual(nep['administrative-state'], 'LOCKED',
877 "Administrative State should be 'LOCKED'")
880 self.assertEqual(nep['operational-state'], 'ENABLED',
881 "Operational State should be 'ENABLED'")
882 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
883 "Administrative State should be 'UNLOCKED'")
884 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
887 def test_60_check_update_tapi_links(self):
888 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
889 response = test_utils.transportpce_api_rpc_request(
890 'tapi-topology', 'get-topology-details', self.tapi_topo)
892 self.assertEqual(response['status_code'], requests.codes.ok)
893 link_list = response['output']['topology']['link']
895 for link in link_list:
896 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP2-TXRX']):
897 self.assertEqual(link['operational-state'], 'DISABLED')
898 self.assertEqual(link['administrative-state'], 'LOCKED')
901 self.assertEqual(link['operational-state'], 'ENABLED')
902 self.assertEqual(link['administrative-state'], 'UNLOCKED')
903 self.assertEqual(nb_updated_link, 0,
904 "No link should have been modified")
907 def test_61_check_update_service1_ok(self):
908 self.test_12_get_service_Ethernet()
910 def test_62_check_update_connectivity_service_Ethernet_ok(self):
911 self.test_13_get_connectivity_service_Ethernet()
913 def test_63_delete_connectivity_service_Ethernet(self):
914 self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.eth)
915 response = test_utils.transportpce_api_rpc_request(
916 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
917 self.assertIn(response['status_code'], (requests.codes.ok, requests.codes.no_content))
918 time.sleep(self.WAITING)
920 def test_64_disconnect_xponders_from_roadm(self):
921 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
922 self.assertEqual(response['status_code'], requests.codes.ok)
923 links = response['network'][0]['ietf-network-topology:link']
925 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
926 response = test_utils.del_ietf_network_link_request(
927 'openroadm-topology', link['link-id'], 'config')
928 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
930 def test_65_disconnect_XPDRA(self):
931 response = test_utils.unmount_device("XPDR-A1")
932 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
934 def test_66_disconnect_XPDRC(self):
935 response = test_utils.unmount_device("XPDR-C1")
936 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
938 def test_67_disconnect_ROADMA(self):
939 response = test_utils.unmount_device("ROADM-A1")
940 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
942 def test_68_disconnect_ROADMC(self):
943 response = test_utils.unmount_device("ROADM-C1")
944 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
946 def test_69_restore_status_port_roadma_srg(self):
947 self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3/0', 'C2',
950 "logical-connection-point": "SRG1-PP2",
951 "port-type": "client",
952 "circuit-id": "SRG1",
953 "administrative-state": "inService",
954 "port-qual": "roadm-external"
958 def test_70_clean_openroadm_topology(self):
959 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
960 self.assertEqual(response['status_code'], requests.codes.ok)
961 links = response['network'][0]['ietf-network-topology:link']
963 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT', 'ROADM-TO-ROADM'):
964 response = test_utils.del_ietf_network_link_request('openroadm-topology', link['link-id'], 'config')
965 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
968 if __name__ == "__main__":
969 unittest.main(verbosity=2)