2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
25 # pylint: disable=too-few-public-methods
30 # pylint: disable=invalid-name
37 class TransportPCEFulltesting(unittest.TestCase):
39 cr_serv_input_data = {
42 "layer-protocol-name": "DSR",
43 "service-interface-point": {
44 "service-interface-point-uuid": "b1f4bd3b-7fa9-367b-a8ab-6e80293238df"
46 "administrative-state": "UNLOCKED",
47 "operational-state": "ENABLED",
48 "direction": "BIDIRECTIONAL",
50 "protection-role": "WORK",
51 "local-id": "XPDR-C1-XPDR1",
54 "value-name": "OpenROADM node id",
55 "value": "XPDR-C1-XPDR1"
60 "layer-protocol-name": "DSR",
61 "service-interface-point": {
62 "service-interface-point-uuid": "b5964ce9-274c-3f68-b4d1-83c0b61bc74e"
64 "administrative-state": "UNLOCKED",
65 "operational-state": "ENABLED",
66 "direction": "BIDIRECTIONAL",
68 "protection-role": "WORK",
69 "local-id": "XPDR-A1-XPDR1",
72 "value-name": "OpenROADM node id",
73 "value": "XPDR-A1-XPDR1"
78 "connectivity-constraint": {
79 "service-layer": "ETH",
80 "service-type": "POINT_TO_POINT_CONNECTIVITY",
81 "service-level": "Some service-level",
82 "requested-capacity": {
92 del_serv_input_data = {"service-id-or-name": "TBD"}
94 tapi_topo = {"topology-id-or-name": "TBD"}
97 "topology-id-or-name": "TBD",
98 "node-id-or-name": "TBD"
101 tapi_serv_details = {"service-id-or-name": "TBD"}
104 uuid_services = UuidServices()
105 WAITING = 25 # nominal value is 300
106 NODE_VERSION_221 = '2.2.1'
110 # pylint: disable=unsubscriptable-object
111 cls.init_failed = False
112 os.environ['JAVA_MIN_MEM'] = '1024M'
113 os.environ['JAVA_MAX_MEM'] = '4096M'
114 cls.processes = test_utils.start_tpce()
115 # TAPI feature is not installed by default in Karaf
116 if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
117 print("installing tapi feature...")
118 result = test_utils.install_karaf_feature("odl-transportpce-tapi")
119 if result.returncode != 0:
120 cls.init_failed = True
122 print("tapi installation feature failed...")
123 test_utils.shutdown_process(cls.processes[0])
125 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_221),
126 ('roadma', cls.NODE_VERSION_221),
127 ('roadmc', cls.NODE_VERSION_221),
128 ('xpdrc', cls.NODE_VERSION_221)])
131 def tearDownClass(cls):
132 # pylint: disable=not-an-iterable
133 for process in cls.processes:
134 test_utils.shutdown_process(process)
135 print("all processes killed")
137 def setUp(self): # instruction executed before each test method
138 # pylint: disable=consider-using-f-string
139 print("execution of {}".format(self.id().split(".")[-1]))
142 def test_01_connect_xpdrA(self):
143 response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION_221))
144 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
146 def test_02_connect_xpdrC(self):
147 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_221))
148 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
150 def test_03_connect_rdmA(self):
151 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
152 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
154 def test_04_connect_rdmC(self):
155 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
156 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
158 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
159 response = test_utils.transportpce_api_rpc_request(
160 'transportpce-networkutils', 'init-xpdr-rdm-links',
161 {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
162 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
163 self.assertEqual(response['status_code'], requests.codes.ok)
164 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
167 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
168 response = test_utils.transportpce_api_rpc_request(
169 'transportpce-networkutils', 'init-rdm-xpdr-links',
170 {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
171 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
172 self.assertEqual(response['status_code'], requests.codes.ok)
173 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
176 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
177 response = test_utils.transportpce_api_rpc_request(
178 'transportpce-networkutils', 'init-xpdr-rdm-links',
179 {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
180 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
181 self.assertEqual(response['status_code'], requests.codes.ok)
182 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
185 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
186 response = test_utils.transportpce_api_rpc_request(
187 'transportpce-networkutils', 'init-rdm-xpdr-links',
188 {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
189 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
190 self.assertEqual(response['status_code'], requests.codes.ok)
191 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
194 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
195 # Config ROADMA-ROADMC oms-attributes
197 "auto-spanloss": "true",
198 "spanloss-base": 11.4,
199 "spanloss-current": 12,
200 "engineered-spanloss": 12.2,
201 "link-concatenation": [{
204 "SRLG-length": 100000,
206 response = test_utils.add_oms_attr_request(
207 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
208 self.assertEqual(response.status_code, requests.codes.created)
210 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
211 # Config ROADMC-ROADMA oms-attributes
213 "auto-spanloss": "true",
214 "spanloss-base": 11.4,
215 "spanloss-current": 12,
216 "engineered-spanloss": 12.2,
217 "link-concatenation": [{
220 "SRLG-length": 100000,
222 response = test_utils.add_oms_attr_request(
223 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
224 self.assertEqual(response.status_code, requests.codes.created)
226 # test service-create for Eth service from xpdr to xpdr
227 def test_11_create_connectivity_service_Ethernet(self):
228 response = test_utils.transportpce_api_rpc_request(
229 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
230 time.sleep(self.WAITING)
231 self.uuid_services.eth = response['output']['service']['uuid']
232 # pylint: disable=consider-using-f-string
234 input_dict_1 = {'administrative-state': 'LOCKED',
235 'lifecycle-state': 'PLANNED',
236 'operational-state': 'DISABLED',
237 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
238 'service-layer': 'ETH',
239 'connectivity-direction': 'BIDIRECTIONAL'
241 input_dict_2 = {'value-name': 'OpenROADM node id',
242 'value': 'XPDR-C1-XPDR1'}
243 input_dict_3 = {'value-name': 'OpenROADM node id',
244 'value': 'XPDR-A1-XPDR1'}
246 self.assertDictEqual(dict(input_dict_1, **response['output']['service']),
247 response['output']['service'])
248 self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]),
249 response['output']['service']['end-point'][0]['name'][0])
250 self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]),
251 response['output']['service']['end-point'][1]['name'][0])
252 # If the gate fails is because of the waiting time not being enough
253 # time.sleep(self.WAITING)
255 def test_12_get_service_Ethernet(self):
256 response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth))
257 self.assertEqual(response['status_code'], requests.codes.ok)
258 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
259 self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.eth))
260 self.assertEqual(response['services'][0]['connection-type'], 'service')
261 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
264 def test_13_get_connectivity_service_Ethernet(self):
265 self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
266 response = test_utils.transportpce_api_rpc_request(
267 'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details)
268 self.assertEqual(response['status_code'], requests.codes.ok)
269 self.assertEqual(response['output']['service']['operational-state'], 'ENABLED')
270 self.assertEqual(response['output']['service']['name'][0]['value'], self.uuid_services.eth)
271 self.assertEqual(response['output']['service']['administrative-state'], 'UNLOCKED')
272 self.assertEqual(response['output']['service']['lifecycle-state'], 'INSTALLED')
274 def test_14_change_status_line_port_xpdrc(self):
275 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
279 "administrative-state": "outOfService",
280 "port-qual": "xpdr-network"}]}
281 response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
282 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
283 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
284 timeout=test_utils.REQUEST_TIMEOUT)
285 self.assertEqual(response.status_code, requests.codes.ok)
288 def test_15_check_update_portmapping(self):
289 response = test_utils.get_portmapping_node_attr("XPDR-C1", None, None)
290 self.assertEqual(response['status_code'], requests.codes.ok)
291 mapping_list = response['nodes'][0]['mapping']
292 for mapping in mapping_list:
293 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
294 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
295 "Operational State should be 'OutOfService'")
296 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
297 "Administrative State should be 'OutOfService'")
299 self.assertEqual(mapping['port-oper-state'], 'InService',
300 "Operational State should be 'InService'")
301 self.assertEqual(mapping['port-admin-state'], 'InService',
302 "Administrative State should be 'InService'")
305 def test_16_check_update_openroadm_topo(self):
306 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
307 self.assertEqual(response['status_code'], requests.codes.ok)
308 node_list = response['network'][0]['node']
310 for node in node_list:
311 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
312 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
313 tp_list = node['ietf-network-topology:termination-point']
315 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
316 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
317 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
320 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
321 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
322 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
324 link_list = response['network'][0]['ietf-network-topology:link']
325 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
326 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
328 for link in link_list:
329 if link['link-id'] in updated_links:
330 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
331 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
334 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
335 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
336 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
339 def test_17_check_update_tapi_neps(self):
340 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
341 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi"
342 response = test_utils.transportpce_api_rpc_request(
343 'tapi-topology', 'get-node-details', self.node_details)
344 self.assertEqual(response['status_code'], requests.codes.ok)
345 nep_list = response['output']['node']['owned-node-edge-point']
348 if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
349 self.assertEqual(nep['operational-state'], 'DISABLED',
350 "Operational State should be 'DISABLED'")
351 self.assertEqual(nep['administrative-state'], 'LOCKED',
352 "Administrative State should be 'LOCKED'")
355 self.assertEqual(nep['operational-state'], 'ENABLED',
356 "Operational State should be 'ENABLED'")
357 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
358 "Administrative State should be 'UNLOCKED'")
359 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR"
360 response = test_utils.transportpce_api_rpc_request(
361 'tapi-topology', 'get-node-details', self.node_details)
362 self.assertEqual(response['status_code'], requests.codes.ok)
363 nep_list = response['output']['node']['owned-node-edge-point']
365 if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
366 self.assertEqual(nep['operational-state'], 'DISABLED',
367 "Operational State should be 'DISABLED'")
368 self.assertEqual(nep['administrative-state'], 'LOCKED',
369 "Administrative State should be 'LOCKED'")
372 self.assertEqual(nep['operational-state'], 'ENABLED',
373 "Operational State should be 'ENABLED'")
374 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
375 "Administrative State should be 'UNLOCKED'")
376 self.assertEqual(nb_updated_neps, 4, "Only two xponder neps should have been modified")
379 def test_18_check_update_tapi_links(self):
380 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
381 response = test_utils.transportpce_api_rpc_request(
382 'tapi-topology', 'get-topology-details', self.tapi_topo)
384 self.assertEqual(response['status_code'], requests.codes.ok)
385 link_list = response['output']['topology']['link']
387 for link in link_list:
388 if all(x in link['name'][0]['value'] for x in ['XPDR-C1-XPDR1', 'XPDR1-NETWORK1']):
389 self.assertEqual(link['operational-state'], 'DISABLED')
390 self.assertEqual(link['administrative-state'], 'LOCKED')
393 self.assertEqual(link['operational-state'], 'ENABLED')
394 self.assertEqual(link['administrative-state'], 'UNLOCKED')
395 self.assertEqual(nb_updated_link, 2,
396 "Only two xponder-output/input & xponder-transi links should have been modified")
399 def test_19_check_update_service_Ethernet(self):
400 response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth))
401 self.assertEqual(response['status_code'], requests.codes.ok)
402 self.assertEqual(response['services'][0]['operational-state'], 'outOfService')
403 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
405 def test_20_check_update_connectivity_service_Ethernet(self):
406 self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
407 response = test_utils.transportpce_api_rpc_request(
408 'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details)
409 self.assertEqual(response['status_code'], requests.codes.ok)
410 self.assertEqual(response['output']['service']['operational-state'], 'DISABLED')
411 self.assertEqual(response['output']['service']['administrative-state'], 'LOCKED')
414 def test_21_restore_status_line_port_xpdrc(self):
415 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
419 "administrative-state": "inService",
420 "port-qual": "xpdr-network"}]}
421 response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
422 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
423 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
424 timeout=test_utils.REQUEST_TIMEOUT)
425 self.assertEqual(response.status_code, requests.codes.ok)
428 def test_22_check_update_portmapping_ok(self):
429 response = test_utils.get_portmapping_node_attr("XPDR-C1", None, None)
430 self.assertEqual(response['status_code'], requests.codes.ok)
431 mapping_list = response['nodes'][0]['mapping']
432 for mapping in mapping_list:
433 self.assertEqual(mapping['port-oper-state'], 'InService',
434 "Operational State should be 'InService'")
435 self.assertEqual(mapping['port-admin-state'], 'InService',
436 "Administrative State should be 'InService'")
439 def test_23_check_update_openroadm_topo_ok(self):
440 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
441 self.assertEqual(response['status_code'], requests.codes.ok)
442 node_list = response['network'][0]['node']
443 for node in node_list:
444 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
445 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
446 tp_list = node['ietf-network-topology:termination-point']
448 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
449 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
451 link_list = response['network'][0]['ietf-network-topology:link']
452 for link in link_list:
453 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
454 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
457 def test_24_check_update_tapi_neps_ok(self):
458 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
459 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi"
460 response = test_utils.transportpce_api_rpc_request(
461 'tapi-topology', 'get-node-details', self.node_details)
462 self.assertEqual(response['status_code'], requests.codes.ok)
463 nep_list = response['output']['node']['owned-node-edge-point']
465 self.assertEqual(nep['operational-state'], 'ENABLED',
466 "Operational State should be 'ENABLED'")
467 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
468 "Administrative State should be 'UNLOCKED'")
470 self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR"
471 response = test_utils.transportpce_api_rpc_request(
472 'tapi-topology', 'get-node-details', self.node_details)
473 self.assertEqual(response['status_code'], requests.codes.ok)
474 nep_list = response['output']['node']['owned-node-edge-point']
476 self.assertEqual(nep['operational-state'], 'ENABLED',
477 "Operational State should be 'ENABLED'")
478 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
479 "Administrative State should be 'UNLOCKED'")
482 def test_25_check_update_tapi_links_ok(self):
483 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
484 response = test_utils.transportpce_api_rpc_request(
485 'tapi-topology', 'get-topology-details', self.tapi_topo)
487 link_list = response['output']['topology']['link']
488 for link in link_list:
489 self.assertEqual(link['operational-state'], 'ENABLED')
490 self.assertEqual(link['administrative-state'], 'UNLOCKED')
493 def test_26_check_update_service1_ok(self):
494 self.test_12_get_service_Ethernet()
496 def test_27_check_update_connectivity_service_Ethernet_ok(self):
497 self.test_13_get_connectivity_service_Ethernet()
499 def test_28_change_status_port_roadma_srg(self):
500 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
503 "logical-connection-point": "SRG1-PP1",
504 "port-type": "client",
505 "circuit-id": "SRG1",
506 "administrative-state": "outOfService",
507 "port-qual": "roadm-external"}]}
508 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
509 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
510 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
511 timeout=test_utils.REQUEST_TIMEOUT)
512 self.assertEqual(response.status_code, requests.codes.ok)
515 def test_29_check_update_portmapping(self):
516 response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
517 self.assertEqual(response['status_code'], requests.codes.ok)
518 mapping_list = response['nodes'][0]['mapping']
519 for mapping in mapping_list:
520 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
521 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
522 "Operational State should be 'OutOfService'")
523 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
524 "Administrative State should be 'OutOfService'")
526 self.assertEqual(mapping['port-oper-state'], 'InService',
527 "Operational State should be 'InService'")
528 self.assertEqual(mapping['port-admin-state'], 'InService',
529 "Administrative State should be 'InService'")
532 def test_30_check_update_openroadm_topo(self):
533 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
534 self.assertEqual(response['status_code'], requests.codes.ok)
535 node_list = response['network'][0]['node']
537 for node in node_list:
538 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
539 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
540 tp_list = node['ietf-network-topology:termination-point']
542 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
543 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
544 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
547 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
548 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
549 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
551 link_list = response['network'][0]['ietf-network-topology:link']
552 updated_links = ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
553 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
555 for link in link_list:
556 if link['link-id'] in updated_links:
557 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
558 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
561 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
562 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
563 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
566 def test_31_check_update_tapi_neps(self):
567 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
568 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
569 response = test_utils.transportpce_api_rpc_request(
570 'tapi-topology', 'get-node-details', self.node_details)
571 self.assertEqual(response['status_code'], requests.codes.ok)
572 nep_list = response['output']['node']['owned-node-edge-point']
575 if 'SRG1-PP1-TXRX' in nep['name'][0]['value']:
576 self.assertEqual(nep['operational-state'], 'DISABLED',
577 "Operational State should be 'DISABLED'")
578 self.assertEqual(nep['administrative-state'], 'LOCKED',
579 "Administrative State should be 'LOCKED'")
582 self.assertEqual(nep['operational-state'], 'ENABLED',
583 "Operational State should be 'ENABLED'")
584 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
585 "Administrative State should be 'UNLOCKED'")
586 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
589 def test_32_check_update_tapi_links(self):
590 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
591 response = test_utils.transportpce_api_rpc_request(
592 'tapi-topology', 'get-topology-details', self.tapi_topo)
594 link_list = response['output']['topology']['link']
596 for link in link_list:
597 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP1-TXRX']):
598 self.assertEqual(link['operational-state'], 'DISABLED')
599 self.assertEqual(link['administrative-state'], 'LOCKED')
602 self.assertEqual(link['operational-state'], 'ENABLED')
603 self.assertEqual(link['administrative-state'], 'UNLOCKED')
604 self.assertEqual(nb_updated_link, 1,
605 "Only one xponder-output/input link should have been modified")
608 def test_33_check_update_service_Ethernet(self):
609 self.test_19_check_update_service_Ethernet()
611 def test_34_check_update_connectivity_service_Ethernet(self):
612 self.test_20_check_update_connectivity_service_Ethernet()
614 def test_35_restore_status_port_roadma_srg(self):
615 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
618 "logical-connection-point": "SRG1-PP1",
619 "port-type": "client",
620 "circuit-id": "SRG1",
621 "administrative-state": "inService",
622 "port-qual": "roadm-external"}]}
623 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
624 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
625 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
626 timeout=test_utils.REQUEST_TIMEOUT)
627 self.assertEqual(response.status_code, requests.codes.ok)
630 def test_36_check_update_portmapping_ok(self):
631 response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
632 self.assertEqual(response['status_code'], requests.codes.ok)
633 mapping_list = response['nodes'][0]['mapping']
634 for mapping in mapping_list:
635 self.assertEqual(mapping['port-oper-state'], 'InService',
636 "Operational State should be 'InService'")
637 self.assertEqual(mapping['port-admin-state'], 'InService',
638 "Administrative State should be 'InService'")
641 def test_37_check_update_openroadm_topo_ok(self):
642 self.test_23_check_update_openroadm_topo_ok()
644 def test_38_check_update_tapi_neps_ok(self):
645 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
646 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
647 response = test_utils.transportpce_api_rpc_request(
648 'tapi-topology', 'get-node-details', self.node_details)
649 self.assertEqual(response['status_code'], requests.codes.ok)
650 nep_list = response['output']['node']['owned-node-edge-point']
652 self.assertEqual(nep['operational-state'], 'ENABLED',
653 "Operational State should be 'ENABLED'")
654 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
655 "Administrative State should be 'UNLOCKED'")
659 def test_39_check_update_tapi_links_ok(self):
660 self.test_25_check_update_tapi_links_ok()
662 def test_40_check_update_service1_ok(self):
663 self.test_12_get_service_Ethernet()
665 def test_41_check_update_connectivity_service_Ethernet_ok(self):
666 self.test_13_get_connectivity_service_Ethernet()
668 def test_42_change_status_line_port_roadma_deg(self):
669 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
672 "logical-connection-point": "DEG2-TTP-TXRX",
675 "administrative-state": "outOfService",
676 "port-qual": "roadm-external"}]}
677 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
678 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
679 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
680 timeout=test_utils.REQUEST_TIMEOUT)
681 self.assertEqual(response.status_code, requests.codes.ok)
684 def test_43_check_update_portmapping(self):
685 response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
686 self.assertEqual(response['status_code'], requests.codes.ok)
687 mapping_list = response['nodes'][0]['mapping']
688 for mapping in mapping_list:
689 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
690 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
691 "Operational State should be 'OutOfService'")
692 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
693 "Administrative State should be 'OutOfService'")
695 self.assertEqual(mapping['port-oper-state'], 'InService',
696 "Operational State should be 'InService'")
697 self.assertEqual(mapping['port-admin-state'], 'InService',
698 "Administrative State should be 'InService'")
701 def test_44_check_update_openroadm_topo(self):
702 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
703 self.assertEqual(response['status_code'], requests.codes.ok)
704 node_list = response['network'][0]['node']
706 for node in node_list:
707 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
708 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
709 tp_list = node['ietf-network-topology:termination-point']
711 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
712 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
713 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
716 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
717 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
718 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
720 link_list = response['network'][0]['ietf-network-topology:link']
721 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
722 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
724 for link in link_list:
725 if link['link-id'] in updated_links:
726 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
727 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
730 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
731 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
732 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
735 def test_45_check_update_tapi_neps(self):
736 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
737 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
738 response = test_utils.transportpce_api_rpc_request(
739 'tapi-topology', 'get-node-details', self.node_details)
740 self.assertEqual(response['status_code'], requests.codes.ok)
741 nep_list = response['output']['node']['owned-node-edge-point']
744 if 'DEG2-TTP-TXRX' in nep['name'][0]['value']:
745 self.assertEqual(nep['operational-state'], 'DISABLED',
746 "Operational State should be 'DISABLED'")
747 self.assertEqual(nep['administrative-state'], 'LOCKED',
748 "Administrative State should be 'LOCKED'")
751 self.assertEqual(nep['operational-state'], 'ENABLED',
752 "Operational State should be 'ENABLED'")
753 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
754 "Administrative State should be 'UNLOCKED'")
755 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
758 def test_46_check_update_tapi_links(self):
759 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
760 response = test_utils.transportpce_api_rpc_request(
761 'tapi-topology', 'get-topology-details', self.tapi_topo)
763 self.assertEqual(response['status_code'], requests.codes.ok)
764 link_list = response['output']['topology']['link']
766 for link in link_list:
767 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'DEG2-TTP-TXRX']):
768 self.assertEqual(link['operational-state'], 'DISABLED')
769 self.assertEqual(link['administrative-state'], 'LOCKED')
772 self.assertEqual(link['operational-state'], 'ENABLED')
773 self.assertEqual(link['administrative-state'], 'UNLOCKED')
774 self.assertEqual(nb_updated_link, 1,
775 "Only one rdm-rdm link should have been modified")
778 def test_47_check_update_service_Ethernet(self):
779 self.test_19_check_update_service_Ethernet()
781 def test_48_check_update_connectivity_service_Ethernet(self):
782 self.test_20_check_update_connectivity_service_Ethernet()
784 def test_49_restore_status_line_port_roadma_deg(self):
785 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
788 "logical-connection-point": "DEG2-TTP-TXRX",
791 "administrative-state": "inService",
792 "port-qual": "roadm-external"}]}
793 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
794 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
795 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
796 timeout=test_utils.REQUEST_TIMEOUT)
797 self.assertEqual(response.status_code, requests.codes.ok)
800 def test_50_check_update_portmapping_ok(self):
801 self.test_36_check_update_portmapping_ok()
803 def test_51_check_update_openroadm_topo_ok(self):
804 self.test_23_check_update_openroadm_topo_ok()
806 def test_52_check_update_tapi_neps_ok(self):
807 self.test_38_check_update_tapi_neps_ok()
809 def test_53_check_update_tapi_links_ok(self):
810 self.test_25_check_update_tapi_links_ok()
812 def test_54_check_update_service1_ok(self):
813 self.test_12_get_service_Ethernet()
815 def test_55_check_update_connectivity_service_Ethernet_ok(self):
816 self.test_13_get_connectivity_service_Ethernet()
818 def test_56_change_status_port_roadma_srg(self):
819 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
822 "logical-connection-point": "SRG1-PP2",
823 "port-type": "client",
824 "circuit-id": "SRG1",
825 "administrative-state": "outOfService",
826 "port-qual": "roadm-external"}]}
827 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
828 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
829 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
830 timeout=test_utils.REQUEST_TIMEOUT)
831 self.assertEqual(response.status_code, requests.codes.ok)
834 def test_57_check_update_portmapping(self):
835 response = test_utils.get_portmapping_node_attr("ROADM-A1", None, None)
836 self.assertEqual(response['status_code'], requests.codes.ok)
837 mapping_list = response['nodes'][0]['mapping']
838 for mapping in mapping_list:
839 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
840 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
841 "Operational State should be 'OutOfService'")
842 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
843 "Administrative State should be 'OutOfService'")
845 self.assertEqual(mapping['port-oper-state'], 'InService',
846 "Operational State should be 'InService'")
847 self.assertEqual(mapping['port-admin-state'], 'InService',
848 "Administrative State should be 'InService'")
851 def test_58_check_update_openroadm_topo(self):
852 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
853 self.assertEqual(response['status_code'], requests.codes.ok)
854 node_list = response['network'][0]['node']
856 for node in node_list:
857 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
858 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
859 tp_list = node['ietf-network-topology:termination-point']
861 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
862 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
863 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
866 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
867 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
868 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
870 link_list = response['network'][0]['ietf-network-topology:link']
872 for link in link_list:
873 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
874 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
875 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
878 def test_59_check_update_tapi_neps(self):
879 self.node_details["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
880 self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
881 response = test_utils.transportpce_api_rpc_request(
882 'tapi-topology', 'get-node-details', self.node_details)
883 self.assertEqual(response['status_code'], requests.codes.ok)
884 nep_list = response['output']['node']['owned-node-edge-point']
887 if 'SRG1-PP2-TXRX' in nep['name'][0]['value']:
888 self.assertEqual(nep['operational-state'], 'DISABLED',
889 "Operational State should be 'DISABLED'")
890 self.assertEqual(nep['administrative-state'], 'LOCKED',
891 "Administrative State should be 'LOCKED'")
894 self.assertEqual(nep['operational-state'], 'ENABLED',
895 "Operational State should be 'ENABLED'")
896 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
897 "Administrative State should be 'UNLOCKED'")
898 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
901 def test_60_check_update_tapi_links(self):
902 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
903 response = test_utils.transportpce_api_rpc_request(
904 'tapi-topology', 'get-topology-details', self.tapi_topo)
906 self.assertEqual(response['status_code'], requests.codes.ok)
907 link_list = response['output']['topology']['link']
909 for link in link_list:
910 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP2-TXRX']):
911 self.assertEqual(link['operational-state'], 'DISABLED')
912 self.assertEqual(link['administrative-state'], 'LOCKED')
915 self.assertEqual(link['operational-state'], 'ENABLED')
916 self.assertEqual(link['administrative-state'], 'UNLOCKED')
917 self.assertEqual(nb_updated_link, 0,
918 "No link should have been modified")
921 def test_61_check_update_service1_ok(self):
922 self.test_12_get_service_Ethernet()
924 def test_62_check_update_connectivity_service_Ethernet_ok(self):
925 self.test_13_get_connectivity_service_Ethernet()
927 def test_63_delete_connectivity_service_Ethernet(self):
928 self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.eth)
929 response = test_utils.transportpce_api_rpc_request(
930 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
931 self.assertIn(response['status_code'], (requests.codes.ok, requests.codes.no_content))
932 time.sleep(self.WAITING)
934 def test_64_disconnect_xponders_from_roadm(self):
935 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
936 self.assertEqual(response['status_code'], requests.codes.ok)
937 links = response['network'][0]['ietf-network-topology:link']
939 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
940 response = test_utils.del_ietf_network_link_request(
941 'openroadm-topology', link['link-id'], 'config')
942 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
944 def test_65_disconnect_XPDRA(self):
945 response = test_utils.unmount_device("XPDR-A1")
946 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
948 def test_66_disconnect_XPDRC(self):
949 response = test_utils.unmount_device("XPDR-C1")
950 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
952 def test_67_disconnect_ROADMA(self):
953 response = test_utils.unmount_device("ROADM-A1")
954 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
956 def test_68_disconnect_ROADMC(self):
957 response = test_utils.unmount_device("ROADM-C1")
958 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
961 if __name__ == "__main__":
962 unittest.main(verbosity=2)