2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
25 # pylint: disable=too-few-public-methods
30 # pylint: disable=invalid-name
37 class TransportPCEFulltesting(unittest.TestCase):
# Request payload handed to test_utils.tapi_create_connectivity_request() by test_11.
# NOTE(review): several structural lines of this literal (enclosing keys and closing
# brackets) are not visible in this excerpt - confirm against the complete file.
39 cr_serv_sample_data = {
43 "layer-protocol-name": "DSR",
44 "service-interface-point": {
45 "service-interface-point-uuid": "b1f4bd3b-7fa9-367b-a8ab-6e80293238df"
47 "administrative-state": "UNLOCKED",
48 "operational-state": "ENABLED",
49 "direction": "BIDIRECTIONAL",
51 "protection-role": "WORK",
52 "local-id": "XPDR-C1-XPDR1",
55 "value-name": "OpenROADM node id",
56 "value": "XPDR-C1-XPDR1"
61 "layer-protocol-name": "DSR",
62 "service-interface-point": {
63 "service-interface-point-uuid": "b5964ce9-274c-3f68-b4d1-83c0b61bc74e"
65 "administrative-state": "UNLOCKED",
66 "operational-state": "ENABLED",
67 "direction": "BIDIRECTIONAL",
69 "protection-role": "WORK",
70 "local-id": "XPDR-A1-XPDR1",
73 "value-name": "OpenROADM node id",
74 "value": "XPDR-A1-XPDR1"
79 "connectivity-constraint": {
80 "service-layer": "ETH",
81 "service-type": "POINT_TO_POINT_CONNECTIVITY",
82 "service-level": "Some service-level",
83 "requested-capacity": {
# Shared holder whose `eth` attribute receives the created service UUID in test_11.
95 uuid_services = UuidServices()
# Number of seconds test_11 sleeps (via time.sleep) around the service creation.
96 WAITING = 25 # nominal value is 300
# Version string paired with each simulator name in start_sims() and mount_device().
97 NODE_VERSION_221 = '2.2.1'
# Class-level fixture body: starts the controller, optionally installs the TAPI Karaf
# feature, then starts the four device simulators.
# NOTE(review): the `def` header of this fixture (presumably setUpClass) and a few of its
# statements are not visible in this excerpt - confirm against the complete file.
101 # pylint: disable=unsubscriptable-object
102 cls.init_failed = False
# JVM memory bounds exported through the environment before the controller is started.
103 os.environ['JAVA_MIN_MEM'] = '1024M'
104 os.environ['JAVA_MAX_MEM'] = '4096M'
105 cls.processes = test_utils.start_tpce()
106 # TAPI feature is not installed by default in Karaf
# The feature install/restart path is taken unless USE_LIGHTY is set to exactly 'True'.
107 if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
108 print("installing tapi feature...")
109 result = test_utils.install_karaf_feature("odl-transportpce-tapi")
110 if result.returncode != 0:
111 cls.init_failed = True
112 print("Restarting OpenDaylight...")
113 test_utils.shutdown_process(cls.processes[0])
# The restarted Karaf process replaces entry 0 both here and in test_utils.process_list.
114 cls.processes[0] = test_utils.start_karaf()
115 test_utils.process_list[0] = cls.processes[0]
# init_failed is overwritten here: it is True when the start message is not found in the log.
116 cls.init_failed = not test_utils.wait_until_log_contains(
117 test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
119 print("tapi installation feature failed...")
120 test_utils.shutdown_process(cls.processes[0])
# NOTE(review): cls.processes is rebound twice below; as shown, the value returned by
# start_tpce() is immediately replaced by the one from start_sims() - confirm the
# control flow around these lines in the complete file.
122 cls.processes = test_utils.start_tpce()
123 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_221),
124 ('roadma', cls.NODE_VERSION_221),
125 ('roadmc', cls.NODE_VERSION_221),
126 ('xpdrc', cls.NODE_VERSION_221)])
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes`` and report completion."""
    # pylint: disable=not-an-iterable
    started = cls.processes
    for proc in started:
        test_utils.shutdown_process(proc)
    print("all processes killed")
def setUp(self):
    """Print the name of the test method that is about to run.

    Executed before each test method.
    """
    # pylint: disable=consider-using-f-string
    method_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(method_name))
def test_01_connect_xpdrA(self):
    """Mount the 'xpdra' simulator as node XPDR-A1 and expect HTTP 201."""
    simulator = ('xpdra', self.NODE_VERSION_221)
    mount_reply = test_utils.mount_device("XPDR-A1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    """Mount the 'xpdrc' simulator as node XPDR-C1 and expect HTTP 201."""
    simulator = ('xpdrc', self.NODE_VERSION_221)
    mount_reply = test_utils.mount_device("XPDR-C1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    """Mount the 'roadma' simulator as node ROADM-A1 and expect HTTP 201."""
    simulator = ('roadma', self.NODE_VERSION_221)
    mount_reply = test_utils.mount_device("ROADM-A1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    """Mount the 'roadmc' simulator as node ROADM-C1 and expect HTTP 201."""
    simulator = ('roadmc', self.NODE_VERSION_221)
    mount_reply = test_utils.mount_device("ROADM-C1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    """Request the xponder-to-ROADM link XPDR-A1 -> ROADM-A1 (SRG1-PP1-TXRX) and check the result text."""
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    self.assertIn('Xponder Roadm Link created successfully', payload["output"]["result"])
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """Request the ROADM-to-xponder link ROADM-A1 (SRG1-PP1-TXRX) -> XPDR-A1 and check the result text."""
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    self.assertIn('Roadm Xponder links created successfully', payload["output"]["result"])
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    """Request the xponder-to-ROADM link XPDR-C1 -> ROADM-C1 (SRG1-PP1-TXRX) and check the result text."""
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    self.assertIn('Xponder Roadm Link created successfully', payload["output"]["result"])
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    """Request the ROADM-to-xponder link ROADM-C1 (SRG1-PP1-TXRX) -> XPDR-C1 and check the result text."""
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    self.assertIn('Roadm Xponder links created successfully', payload["output"]["result"])
# Sends OMS attributes for the ROADM-A1 DEG2 -> ROADM-C1 DEG1 link through
# test_utils.add_oms_attr_request() and expects the `created` status code.
# NOTE(review): the line that opens the `data` literal and some of its entries are not
# visible in this excerpt - confirm against the complete file.
188 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
189 # Config ROADMA-ROADMC oms-attributes
191 "auto-spanloss": "true",
192 "spanloss-base": 11.4,
193 "spanloss-current": 12,
194 "engineered-spanloss": 12.2,
195 "link-concatenation": [{
198 "SRLG-length": 100000,
# The first argument is the link-id of the A-to-C direction.
200 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
201 self.assertEqual(response.status_code, requests.codes.created)
# Sends OMS attributes for the ROADM-C1 DEG1 -> ROADM-A1 DEG2 link through
# test_utils.add_oms_attr_request() and expects the `created` status code.
# NOTE(review): the line that opens the `data` literal and some of its entries are not
# visible in this excerpt - confirm against the complete file.
203 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
204 # Config ROADMC-ROADMA oms-attributes
206 "auto-spanloss": "true",
207 "spanloss-base": 11.4,
208 "spanloss-current": 12,
209 "engineered-spanloss": 12.2,
210 "link-concatenation": [{
213 "SRLG-length": 100000,
# The first argument is the link-id of the C-to-A direction.
215 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
216 self.assertEqual(response.status_code, requests.codes.created)
218 # test service-create for Eth service from xpdr to xpdr
def test_11_create_connectivity_service_Ethernet(self):
    """Create the Ethernet connectivity service through TAPI and check the reply.

    The UUID returned in the reply is stored in ``self.uuid_services.eth`` so
    that later tests can look the service up again.
    """
    response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data)
    time.sleep(self.WAITING)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    service = res['output']['service']
    self.uuid_services.eth = service['uuid']
    # pylint: disable=consider-using-f-string

    input_dict_1 = {'administrative-state': 'LOCKED',
                    'lifecycle-state': 'PLANNED',
                    'operational-state': 'DISABLED',
                    'service-type': 'POINT_TO_POINT_CONNECTIVITY',
                    'service-layer': 'ETH',
                    'connectivity-direction': 'BIDIRECTIONAL'
                    }
    input_dict_2 = {'value-name': 'OpenROADM node id',
                    'value': 'XPDR-C1-XPDR1'}
    input_dict_3 = {'value-name': 'OpenROADM node id',
                    'value': 'XPDR-A1-XPDR1'}

    # NOTE(review): dict(expected, **actual) lets the reply override the expected
    # values before the comparison, so these three assertions only verify that the
    # expected keys are present in the reply, not their values. Behaviour is kept
    # as-is; tightening it could change which runs pass - confirm the intent.
    self.assertDictEqual(dict(input_dict_1, **service), service)
    first_name = service['end-point'][0]['name'][0]
    self.assertDictEqual(dict(input_dict_2, **first_name), first_name)
    second_name = service['end-point'][1]['name'][0]
    self.assertDictEqual(dict(input_dict_3, **second_name), second_name)
    # If the gate fails is because of the waiting time not being enough
    time.sleep(self.WAITING)
def test_12_get_service_Ethernet(self):
    """Fetch the Ethernet service from the service list and check its reported state."""
    response = test_utils.get_service_list_request("services/" + str(self.uuid_services.eth))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    service = res['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], self.uuid_services.eth)
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_13_get_connectivity_service_Ethernet(self):
    """Fetch the TAPI connectivity service by UUID and check its reported state."""
    response = test_utils.tapi_get_connectivity_request(str(self.uuid_services.eth))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    service = res['output']['service']
    self.assertEqual(service['operational-state'], 'ENABLED')
    self.assertEqual(service['name'][0]['value'], self.uuid_services.eth)
    self.assertEqual(service['administrative-state'], 'UNLOCKED')
    self.assertEqual(service['lifecycle-state'], 'INSTALLED')
# Sets port 1 of circuit-pack 1/0/1-PLUG-NET to administrative-state outOfService with a
# direct RESTCONF PUT to 127.0.0.1:8144, and expects an OK status code.
# NOTE(review): presumably port 8144 is the XPDR-C1 simulator (per the test name) - confirm.
# NOTE(review): the line that opens the `body` literal and some of its entries are not
# visible in this excerpt - confirm against the complete file.
276 def test_14_change_status_line_port_xpdrc(self):
# "{}" is filled with the RESTCONF base URL below; %2F is the URL-encoded "/".
277 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
281 "administrative-state": "outOfService",
282 "port-qual": "xpdr-network"}]}
283 response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
284 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
285 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
286 timeout=test_utils.REQUEST_TIMEOUT)
287 self.assertEqual(response.status_code, requests.codes.ok)
def test_15_check_update_portmapping(self):
    """Check XPDR-C1 port mapping: only XPDR1-NETWORK1 must be OutOfService."""
    response = test_utils.portmapping_request("XPDR-C1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    mapping_list = res['nodes'][0]['mapping']
    for mapping in mapping_list:
        if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
            self.assertEqual(mapping['port-oper-state'], 'OutOfService',
                             "Operational State should be 'OutOfService'")
            self.assertEqual(mapping['port-admin-state'], 'OutOfService',
                             "Administrative State should be 'OutOfService'")
        else:
            # Every other mapping must be unaffected by the port change.
            self.assertEqual(mapping['port-oper-state'], 'InService',
                             "Operational State should be 'InService'")
            self.assertEqual(mapping['port-admin-state'], 'InService',
                             "Administrative State should be 'InService'")
def test_16_check_update_openroadm_topo(self):
    """Check the OpenROADM topology after the XPDR-C1 line port was put out of service.

    Exactly one termination point (XPDR1-NETWORK1 on XPDR-C1-XPDR1) and the two
    links listed in ``updated_links`` must be outOfService; all nodes, other
    termination points and other links must be inService.
    """
    url = test_utils.URL_CONFIG_ORDM_TOPO
    response = test_utils.get_request(url)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    node_list = res['network'][0]['node']
    nb_updated_tp = 0
    for node in node_list:
        self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
        self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
        tp_list = node['ietf-network-topology:termination-point']
        for tp in tp_list:
            if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
                self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
                self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
                nb_updated_tp += 1
            else:
                self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
                self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
    self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")

    link_list = res['network'][0]['ietf-network-topology:link']
    updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
                     'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
    nb_updated_link = 0
    for link in link_list:
        if link['link-id'] in updated_links:
            self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
            self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
            nb_updated_link += 1
        else:
            self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
            self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
    self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
def test_17_check_update_tapi_neps(self):
    """Check the TAPI node edge points of XPDR-C1-XPDR1 on its OTSi and DSR nodes.

    NEPs whose first name value contains 'XPDR1-NETWORK1' must be
    DISABLED/LOCKED; all others ENABLED/UNLOCKED. Four NEPs in total, counted
    across both nodes, are expected to be affected.
    """
    nb_updated_neps = 0
    # Same verification on both TAPI nodes of the xponder; the counter is shared.
    for node_name in ("XPDR-C1-XPDR1+OTSi", "XPDR-C1-XPDR1+DSR"):
        response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", node_name)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        nep_list = res['output']['node']['owned-node-edge-point']
        for nep in nep_list:
            if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
                self.assertEqual(nep['operational-state'], 'DISABLED',
                                 "Operational State should be 'DISABLED'")
                self.assertEqual(nep['administrative-state'], 'LOCKED',
                                 "Administrative State should be 'LOCKED'")
                nb_updated_neps += 1
            else:
                self.assertEqual(nep['operational-state'], 'ENABLED',
                                 "Operational State should be 'ENABLED'")
                self.assertEqual(nep['administrative-state'], 'UNLOCKED',
                                 "Administrative State should be 'UNLOCKED'")
    # Message aligned with the asserted count (it previously said "two").
    self.assertEqual(nb_updated_neps, 4, "Only four xponder neps should have been modified")
def test_18_check_update_tapi_links(self):
    """Check TAPI links: the two naming both XPDR-C1-XPDR1 and XPDR1-NETWORK1 must be DISABLED/LOCKED."""
    response = test_utils.tapi_get_topology_details_request("T0 - Full Multi-layer topology")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    link_list = res['output']['topology']['link']
    nb_updated_link = 0
    for link in link_list:
        if all(x in link['name'][0]['value'] for x in ['XPDR-C1-XPDR1', 'XPDR1-NETWORK1']):
            self.assertEqual(link['operational-state'], 'DISABLED')
            self.assertEqual(link['administrative-state'], 'LOCKED')
            nb_updated_link += 1
        else:
            self.assertEqual(link['operational-state'], 'ENABLED')
            self.assertEqual(link['administrative-state'], 'UNLOCKED')
    self.assertEqual(nb_updated_link, 2,
                     "Only two xponder-output/input & xponder-transi links should have been modified")
def test_19_check_update_service_Ethernet(self):
    """Check that the Ethernet service is operationally outOfService but administratively inService."""
    service_path = "services/" + str(self.uuid_services.eth)
    reply = test_utils.get_service_list_request(service_path)
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    self.assertEqual(service['operational-state'], 'outOfService')
    self.assertEqual(service['administrative-state'], 'inService')
def test_20_check_update_connectivity_service_Ethernet(self):
    """Check that the TAPI connectivity service is reported DISABLED and LOCKED."""
    response = test_utils.tapi_get_connectivity_request(str(self.uuid_services.eth))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    service = res['output']['service']
    self.assertEqual(service['operational-state'], 'DISABLED')
    self.assertEqual(service['administrative-state'], 'LOCKED')
# Sets port 1 of circuit-pack 1/0/1-PLUG-NET back to administrative-state inService with a
# direct RESTCONF PUT to 127.0.0.1:8144, and expects an OK status code.
# NOTE(review): presumably port 8144 is the XPDR-C1 simulator (per the test name) - confirm.
# NOTE(review): the line that opens the `body` literal and some of its entries are not
# visible in this excerpt - confirm against the complete file.
419 def test_21_restore_status_line_port_xpdrc(self):
# "{}" is filled with the RESTCONF base URL below; %2F is the URL-encoded "/".
420 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
424 "administrative-state": "inService",
425 "port-qual": "xpdr-network"}]}
426 response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
427 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
428 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
429 timeout=test_utils.REQUEST_TIMEOUT)
430 self.assertEqual(response.status_code, requests.codes.ok)
def test_22_check_update_portmapping_ok(self):
    """Check that every XPDR-C1 port mapping is InService for both states."""
    reply = test_utils.portmapping_request("XPDR-C1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    for entry in payload['nodes'][0]['mapping']:
        self.assertEqual(entry['port-oper-state'], 'InService',
                         "Operational State should be 'InService'")
        self.assertEqual(entry['port-admin-state'], 'InService',
                         "Administrative State should be 'InService'")
def test_23_check_update_openroadm_topo_ok(self):
    """Check that every node, termination point and link of the topology is inService."""
    url = test_utils.URL_CONFIG_ORDM_TOPO
    response = test_utils.get_request(url)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    node_list = res['network'][0]['node']
    for node in node_list:
        self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
        self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
        tp_list = node['ietf-network-topology:termination-point']
        for tp in tp_list:
            self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
            self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')

    link_list = res['network'][0]['ietf-network-topology:link']
    for link in link_list:
        self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
        self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
def test_24_check_update_tapi_neps_ok(self):
    """Check that every NEP of the XPDR-C1-XPDR1 OTSi and DSR nodes is ENABLED/UNLOCKED."""
    # Same verification on both TAPI nodes of the xponder.
    for node_name in ("XPDR-C1-XPDR1+OTSi", "XPDR-C1-XPDR1+DSR"):
        response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", node_name)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        nep_list = res['output']['node']['owned-node-edge-point']
        for nep in nep_list:
            self.assertEqual(nep['operational-state'], 'ENABLED',
                             "Operational State should be 'ENABLED'")
            self.assertEqual(nep['administrative-state'], 'UNLOCKED',
                             "Administrative State should be 'UNLOCKED'")
def test_25_check_update_tapi_links_ok(self):
    """Check that every link of the TAPI topology is ENABLED and UNLOCKED."""
    topology_name = "T0 - Full Multi-layer topology"
    reply = test_utils.tapi_get_topology_details_request(topology_name)
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    for tapi_link in payload['output']['topology']['link']:
        self.assertEqual(tapi_link['operational-state'], 'ENABLED')
        self.assertEqual(tapi_link['administrative-state'], 'UNLOCKED')
def test_26_check_update_service1_ok(self):
    """Re-run the test_12 assertions on the Ethernet service."""
    self.test_12_get_service_Ethernet()
def test_27_check_update_connectivity_service_Ethernet_ok(self):
    """Re-run the test_13 assertions on the TAPI connectivity service."""
    self.test_13_get_connectivity_service_Ethernet()
# Sets port C1 of circuit-pack 3/0 (logical connection point SRG1-PP1) to
# administrative-state outOfService with a direct RESTCONF PUT to 127.0.0.1:8141,
# and expects an OK status code.
# NOTE(review): presumably port 8141 is the ROADM-A1 simulator (per the test name) - confirm.
# NOTE(review): the line that opens the `body` literal and some of its entries are not
# visible in this excerpt - confirm against the complete file.
506 def test_28_change_status_port_roadma_srg(self):
# "{}" is filled with the RESTCONF base URL below; %2F is the URL-encoded "/".
507 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
510 "logical-connection-point": "SRG1-PP1",
511 "port-type": "client",
512 "circuit-id": "SRG1",
513 "administrative-state": "outOfService",
514 "port-qual": "roadm-external"}]}
515 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
516 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
517 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
518 timeout=test_utils.REQUEST_TIMEOUT)
519 self.assertEqual(response.status_code, requests.codes.ok)
def test_29_check_update_portmapping(self):
    """Check ROADM-A1 port mapping: only SRG1-PP1-TXRX must be OutOfService."""
    response = test_utils.portmapping_request("ROADM-A1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    mapping_list = res['nodes'][0]['mapping']
    for mapping in mapping_list:
        if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
            self.assertEqual(mapping['port-oper-state'], 'OutOfService',
                             "Operational State should be 'OutOfService'")
            self.assertEqual(mapping['port-admin-state'], 'OutOfService',
                             "Administrative State should be 'OutOfService'")
        else:
            # Every other mapping must be unaffected by the port change.
            self.assertEqual(mapping['port-oper-state'], 'InService',
                             "Operational State should be 'InService'")
            self.assertEqual(mapping['port-admin-state'], 'InService',
                             "Administrative State should be 'InService'")
def test_30_check_update_openroadm_topo(self):
    """Check the OpenROADM topology after ROADM-A1 port SRG1-PP1 was put out of service.

    Exactly one termination point (SRG1-PP1-TXRX on ROADM-A1-SRG1) and the two
    links listed in ``updated_links`` must be outOfService; all nodes, other
    termination points and other links must be inService.
    """
    url = test_utils.URL_CONFIG_ORDM_TOPO
    response = test_utils.get_request(url)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    node_list = res['network'][0]['node']
    nb_updated_tp = 0
    for node in node_list:
        self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
        self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
        tp_list = node['ietf-network-topology:termination-point']
        for tp in tp_list:
            if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
                self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
                self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
                nb_updated_tp += 1
            else:
                self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
                self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
    self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")

    link_list = res['network'][0]['ietf-network-topology:link']
    updated_links = ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
                     'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
    nb_updated_link = 0
    for link in link_list:
        if link['link-id'] in updated_links:
            self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
            self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
            nb_updated_link += 1
        else:
            self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
            self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
    self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
def test_31_check_update_tapi_neps(self):
    """Check ROADM-A1 photonic-media NEPs: the three naming SRG1-PP1-TXRX must be DISABLED/LOCKED."""
    response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nep_list = res['output']['node']['owned-node-edge-point']
    nb_updated_neps = 0
    for nep in nep_list:
        if 'SRG1-PP1-TXRX' in nep['name'][0]['value']:
            self.assertEqual(nep['operational-state'], 'DISABLED',
                             "Operational State should be 'DISABLED'")
            self.assertEqual(nep['administrative-state'], 'LOCKED',
                             "Administrative State should be 'LOCKED'")
            nb_updated_neps += 1
        else:
            self.assertEqual(nep['operational-state'], 'ENABLED',
                             "Operational State should be 'ENABLED'")
            self.assertEqual(nep['administrative-state'], 'UNLOCKED',
                             "Administrative State should be 'UNLOCKED'")
    self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
def test_32_check_update_tapi_links(self):
    """Check TAPI links: the one naming both ROADM-A1 and SRG1-PP1-TXRX must be DISABLED/LOCKED."""
    response = test_utils.tapi_get_topology_details_request(
        "T0 - Full Multi-layer topology")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    link_list = res['output']['topology']['link']
    nb_updated_link = 0
    for link in link_list:
        if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP1-TXRX']):
            self.assertEqual(link['operational-state'], 'DISABLED')
            self.assertEqual(link['administrative-state'], 'LOCKED')
            nb_updated_link += 1
        else:
            self.assertEqual(link['operational-state'], 'ENABLED')
            self.assertEqual(link['administrative-state'], 'UNLOCKED')
    self.assertEqual(nb_updated_link, 1,
                     "Only one xponder-output/input link should have been modified")
def test_33_check_update_service_Ethernet(self):
    """Re-run the test_19 assertions on the Ethernet service."""
    self.test_19_check_update_service_Ethernet()
def test_34_check_update_connectivity_service_Ethernet(self):
    """Re-run the test_20 assertions on the TAPI connectivity service."""
    self.test_20_check_update_connectivity_service_Ethernet()
# Sets port C1 of circuit-pack 3/0 (logical connection point SRG1-PP1) back to
# administrative-state inService with a direct RESTCONF PUT to 127.0.0.1:8141,
# and expects an OK status code.
# NOTE(review): presumably port 8141 is the ROADM-A1 simulator (per the test name) - confirm.
# NOTE(review): the line that opens the `body` literal and some of its entries are not
# visible in this excerpt - confirm against the complete file.
623 def test_35_restore_status_port_roadma_srg(self):
# "{}" is filled with the RESTCONF base URL below; %2F is the URL-encoded "/".
624 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
627 "logical-connection-point": "SRG1-PP1",
628 "port-type": "client",
629 "circuit-id": "SRG1",
630 "administrative-state": "inService",
631 "port-qual": "roadm-external"}]}
632 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
633 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
634 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
635 timeout=test_utils.REQUEST_TIMEOUT)
636 self.assertEqual(response.status_code, requests.codes.ok)
def test_36_check_update_portmapping_ok(self):
    """Check that every ROADM-A1 port mapping is InService for both states."""
    reply = test_utils.portmapping_request("ROADM-A1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    for entry in payload['nodes'][0]['mapping']:
        self.assertEqual(entry['port-oper-state'], 'InService',
                         "Operational State should be 'InService'")
        self.assertEqual(entry['port-admin-state'], 'InService',
                         "Administrative State should be 'InService'")
def test_37_check_update_openroadm_topo_ok(self):
    """Re-run the test_23 assertions on the OpenROADM topology."""
    self.test_23_check_update_openroadm_topo_ok()
def test_38_check_update_tapi_neps_ok(self):
    """Check that every NEP of the ROADM-A1 photonic-media node is ENABLED/UNLOCKED."""
    response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nep_list = res['output']['node']['owned-node-edge-point']
    for nep in nep_list:
        self.assertEqual(nep['operational-state'], 'ENABLED',
                         "Operational State should be 'ENABLED'")
        self.assertEqual(nep['administrative-state'], 'UNLOCKED',
                         "Administrative State should be 'UNLOCKED'")
def test_39_check_update_tapi_links_ok(self):
    """Re-run the test_25 assertions on the TAPI topology links."""
    self.test_25_check_update_tapi_links_ok()
def test_40_check_update_service1_ok(self):
    """Re-run the test_12 assertions on the Ethernet service."""
    self.test_12_get_service_Ethernet()
def test_41_check_update_connectivity_service_Ethernet_ok(self):
    """Re-run the test_13 assertions on the TAPI connectivity service."""
    self.test_13_get_connectivity_service_Ethernet()
# Sets port L1 of circuit-pack 2/0 (logical connection point DEG2-TTP-TXRX) to
# administrative-state outOfService with a direct RESTCONF PUT to 127.0.0.1:8141,
# and expects an OK status code.
# NOTE(review): presumably port 8141 is the ROADM-A1 simulator (per the test name) - confirm.
# NOTE(review): the line that opens the `body` literal and some of its entries are not
# visible in this excerpt - confirm against the complete file.
676 def test_42_change_status_line_port_roadma_deg(self):
# "{}" is filled with the RESTCONF base URL below; %2F is the URL-encoded "/".
677 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
680 "logical-connection-point": "DEG2-TTP-TXRX",
683 "administrative-state": "outOfService",
684 "port-qual": "roadm-external"}]}
685 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
686 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
687 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
688 timeout=test_utils.REQUEST_TIMEOUT)
689 self.assertEqual(response.status_code, requests.codes.ok)
def test_43_check_update_portmapping(self):
    """Check ROADM-A1 port mapping: only DEG2-TTP-TXRX must be OutOfService."""
    response = test_utils.portmapping_request("ROADM-A1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    mapping_list = res['nodes'][0]['mapping']
    for mapping in mapping_list:
        if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
            self.assertEqual(mapping['port-oper-state'], 'OutOfService',
                             "Operational State should be 'OutOfService'")
            self.assertEqual(mapping['port-admin-state'], 'OutOfService',
                             "Administrative State should be 'OutOfService'")
        else:
            # Every other mapping must be unaffected by the port change.
            self.assertEqual(mapping['port-oper-state'], 'InService',
                             "Operational State should be 'InService'")
            self.assertEqual(mapping['port-admin-state'], 'InService',
                             "Administrative State should be 'InService'")
def test_44_check_update_openroadm_topo(self):
    """Check the OpenROADM topology after ROADM-A1 line port DEG2-TTP-TXRX was put out of service.

    Exactly one termination point (DEG2-TTP-TXRX on ROADM-A1-DEG2) and the two
    ROADM-to-ROADM links listed in ``updated_links`` must be outOfService; all
    nodes, other termination points and other links must be inService.
    """
    url = test_utils.URL_CONFIG_ORDM_TOPO
    response = test_utils.get_request(url)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    node_list = res['network'][0]['node']
    nb_updated_tp = 0
    for node in node_list:
        self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
        self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
        tp_list = node['ietf-network-topology:termination-point']
        for tp in tp_list:
            if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
                self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
                self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
                nb_updated_tp += 1
            else:
                self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
                self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
    self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")

    link_list = res['network'][0]['ietf-network-topology:link']
    updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
                     'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
    nb_updated_link = 0
    for link in link_list:
        if link['link-id'] in updated_links:
            self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
            self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
            nb_updated_link += 1
        else:
            self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
            self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
    # Message names the link kind actually checked here (ROADM-to-ROADM), matching
    # the wording used by the TAPI link check for the same port.
    self.assertEqual(nb_updated_link, 2, "Only two rdm-rdm links should have been modified")
def test_45_check_update_tapi_neps(self):
    """Check ROADM-A1 photonic-media NEPs: the three naming DEG2-TTP-TXRX must be DISABLED/LOCKED."""
    response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nep_list = res['output']['node']['owned-node-edge-point']
    nb_updated_neps = 0
    for nep in nep_list:
        if 'DEG2-TTP-TXRX' in nep['name'][0]['value']:
            self.assertEqual(nep['operational-state'], 'DISABLED',
                             "Operational State should be 'DISABLED'")
            self.assertEqual(nep['administrative-state'], 'LOCKED',
                             "Administrative State should be 'LOCKED'")
            nb_updated_neps += 1
        else:
            self.assertEqual(nep['operational-state'], 'ENABLED',
                             "Operational State should be 'ENABLED'")
            self.assertEqual(nep['administrative-state'], 'UNLOCKED',
                             "Administrative State should be 'UNLOCKED'")
    self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
def test_46_check_update_tapi_links(self):
    """Check TAPI links: the one naming both ROADM-A1 and DEG2-TTP-TXRX must be DISABLED/LOCKED."""
    response = test_utils.tapi_get_topology_details_request(
        "T0 - Full Multi-layer topology")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    link_list = res['output']['topology']['link']
    nb_updated_link = 0
    for link in link_list:
        if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'DEG2-TTP-TXRX']):
            self.assertEqual(link['operational-state'], 'DISABLED')
            self.assertEqual(link['administrative-state'], 'LOCKED')
            nb_updated_link += 1
        else:
            self.assertEqual(link['operational-state'], 'ENABLED')
            self.assertEqual(link['administrative-state'], 'UNLOCKED')
    self.assertEqual(nb_updated_link, 1,
                     "Only one rdm-rdm link should have been modified")
def test_47_check_update_service_Ethernet(self):
    """Re-run the test_19 assertions on the Ethernet service."""
    self.test_19_check_update_service_Ethernet()
def test_48_check_update_connectivity_service_Ethernet(self):
    """Re-run the test_20 assertions on the TAPI connectivity service."""
    self.test_20_check_update_connectivity_service_Ethernet()
# Sets port L1 of circuit-pack 2/0 (logical connection point DEG2-TTP-TXRX) back to
# administrative-state inService with a direct RESTCONF PUT to 127.0.0.1:8141,
# and expects an OK status code.
# NOTE(review): presumably port 8141 is the ROADM-A1 simulator (per the test name) - confirm.
# NOTE(review): the line that opens the `body` literal and some of its entries are not
# visible in this excerpt - confirm against the complete file.
793 def test_49_restore_status_line_port_roadma_deg(self):
# "{}" is filled with the RESTCONF base URL below; %2F is the URL-encoded "/".
794 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
797 "logical-connection-point": "DEG2-TTP-TXRX",
800 "administrative-state": "inService",
801 "port-qual": "roadm-external"}]}
802 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
803 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
804 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
805 timeout=test_utils.REQUEST_TIMEOUT)
806 self.assertEqual(response.status_code, requests.codes.ok)
def test_50_check_update_portmapping_ok(self):
    """Re-run the test_36 assertions on the ROADM-A1 port mapping."""
    self.test_36_check_update_portmapping_ok()
def test_51_check_update_openroadm_topo_ok(self):
    """Re-run the test_23 assertions on the OpenROADM topology."""
    self.test_23_check_update_openroadm_topo_ok()
def test_52_check_update_tapi_neps_ok(self):
    """Re-run the test_38 assertions on the ROADM-A1 TAPI node edge points."""
    self.test_38_check_update_tapi_neps_ok()
def test_53_check_update_tapi_links_ok(self):
    """Repeat the checks of test_25_check_update_tapi_links_ok at this step."""
    self.test_25_check_update_tapi_links_ok()
def test_54_check_update_service1_ok(self):
    """Repeat the checks of test_12_get_service_Ethernet at this step."""
    self.test_12_get_service_Ethernet()
def test_55_check_update_connectivity_service_Ethernet_ok(self):
    """Repeat the checks of test_13_get_connectivity_service_Ethernet at this step."""
    self.test_13_get_connectivity_service_Ethernet()
def test_56_change_status_port_roadma_srg(self):
    """PUT port C2 of circuit-pack 3/0 with administrative-state 'outOfService'.

    The request goes straight to the RESTCONF endpoint at 127.0.0.1:8141
    (presumably the ROADM-A1 device simulator - confirm) and must answer
    with requests.codes.ok.
    """
    url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
    # The payload has to be bound to `body` before it is serialised below.
    # "port-name" mirrors the list key at the end of the URL.
    body = {"ports": [{
        "port-name": "C2",
        "logical-connection-point": "SRG1-PP2",
        "port-type": "client",
        "circuit-id": "SRG1",
        "administrative-state": "outOfService",
        "port-qual": "roadm-external"}]}
    response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
                                data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
                                auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
                                timeout=test_utils.REQUEST_TIMEOUT)
    self.assertEqual(response.status_code, requests.codes.ok)
def test_57_check_update_portmapping(self):
    """Check the ROADM-A1 port mapping states.

    The 'SRG1-PP2-TXRX' mapping must be OutOfService for both its
    operational and administrative state; every other mapping must be
    InService.
    """
    response = test_utils.portmapping_request("ROADM-A1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    mapping_list = res['nodes'][0]['mapping']
    for mapping in mapping_list:
        if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
            self.assertEqual(mapping['port-oper-state'], 'OutOfService',
                             "Operational State should be 'OutOfService'")
            self.assertEqual(mapping['port-admin-state'], 'OutOfService',
                             "Administrative State should be 'OutOfService'")
        else:
            # The InService expectation only applies to the other mappings;
            # without this branch the SRG1-PP2-TXRX mapping could never pass.
            self.assertEqual(mapping['port-oper-state'], 'InService',
                             "Operational State should be 'InService'")
            self.assertEqual(mapping['port-admin-state'], 'InService',
                             "Administrative State should be 'InService'")
def test_58_check_update_openroadm_topo(self):
    """Check node, termination-point and link states of the OpenROADM topology.

    Every node and every link must be inService. Every termination point
    must be inService except 'SRG1-PP2-TXRX' on node 'ROADM-A1-SRG1', which
    must be outOfService; exactly one such termination point is expected.
    """
    url = test_utils.URL_CONFIG_ORDM_TOPO
    response = test_utils.get_request(url)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    node_list = res['network'][0]['node']
    # Counter must exist before the loops: it is compared after them.
    nb_updated_tp = 0
    for node in node_list:
        self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
        self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
        tp_list = node['ietf-network-topology:termination-point']
        for tp in tp_list:
            if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
                self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
                self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
                nb_updated_tp += 1
            else:
                self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
                self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
    self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")

    link_list = res['network'][0]['ietf-network-topology:link']
    # No branch increments this counter: the loop below already fails on any
    # link that is not inService, so the final check restates that expectation.
    nb_updated_link = 0
    for link in link_list:
        self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
        self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
    self.assertEqual(nb_updated_link, 0, "No link should have been modified")
def test_59_check_update_tapi_neps(self):
    """Check the owned node edge points of the ROADM-A1+PHOTONIC_MEDIA TAPI node.

    Edge points whose name contains 'SRG1-PP2-TXRX' must be DISABLED/LOCKED,
    all others must be ENABLED/UNLOCKED, and exactly three edge points must
    have been found in the DISABLED/LOCKED state.
    """
    response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nep_list = res['output']['node']['owned-node-edge-point']
    # Counter must exist before the loop: it is compared after it.
    nb_updated_neps = 0
    for nep in nep_list:
        if 'SRG1-PP2-TXRX' in nep['name'][0]['value']:
            self.assertEqual(nep['operational-state'], 'DISABLED',
                             "Operational State should be 'DISABLED'")
            self.assertEqual(nep['administrative-state'], 'LOCKED',
                             "Administrative State should be 'LOCKED'")
            nb_updated_neps += 1
        else:
            self.assertEqual(nep['operational-state'], 'ENABLED',
                             "Operational State should be 'ENABLED'")
            self.assertEqual(nep['administrative-state'], 'UNLOCKED',
                             "Administrative State should be 'UNLOCKED'")
    self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
def test_60_check_update_tapi_links(self):
    """Check that no TAPI link of the full multi-layer topology changed state.

    A link whose name contains both 'ROADM-A1' and 'SRG1-PP2-TXRX' would have
    to be DISABLED/LOCKED and is counted; every other link must be
    ENABLED/UNLOCKED. The count of such links is expected to be zero.
    """
    response = test_utils.tapi_get_topology_details_request(
        "T0 - Full Multi-layer topology")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    link_list = res['output']['topology']['link']
    # Counter must exist before the loop: it is compared after it.
    nb_updated_link = 0
    for link in link_list:
        link_name = link['name'][0]['value']
        if 'ROADM-A1' in link_name and 'SRG1-PP2-TXRX' in link_name:
            self.assertEqual(link['operational-state'], 'DISABLED')
            self.assertEqual(link['administrative-state'], 'LOCKED')
            nb_updated_link += 1
        else:
            self.assertEqual(link['operational-state'], 'ENABLED')
            self.assertEqual(link['administrative-state'], 'UNLOCKED')
    self.assertEqual(nb_updated_link, 0,
                     "No link should have been modified")
def test_61_check_update_service1_ok(self):
    """Repeat the checks of test_12_get_service_Ethernet at this step."""
    self.test_12_get_service_Ethernet()
def test_62_check_update_connectivity_service_Ethernet_ok(self):
    """Repeat the checks of test_13_get_connectivity_service_Ethernet at this step."""
    self.test_13_get_connectivity_service_Ethernet()
def test_63_delete_connectivity_service_Ethernet(self):
    """Delete the Ethernet connectivity service through TAPI, then pause.

    The request must answer with requests.codes.no_content; the method then
    sleeps for self.WAITING seconds.
    """
    eth_service_uuid = str(self.uuid_services.eth)
    delete_response = test_utils.tapi_delete_connectivity_request(eth_service_uuid)
    self.assertEqual(delete_response.status_code, requests.codes.no_content)
    time.sleep(self.WAITING)
def test_64_disconnect_xponders_from_roadm(self):
    """Delete every XPONDER-OUTPUT / XPONDER-INPUT link of the OpenROADM topology.

    The topology is read once; each link of one of those two types is then
    deleted by its link-id, and each deletion must answer requests.codes.ok.
    """
    # The "{}" placeholder is passed through unformatted; presumably
    # test_utils.delete_request fills in the RESTCONF base URL - verify.
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    response = test_utils.get_ordm_topo_request("")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    # `link` has to be bound by iterating over the retrieved links.
    for link in links:
        if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
            link_name = link["link-id"]
            response = test_utils.delete_request(url+link_name)
            self.assertEqual(response.status_code, requests.codes.ok)
def test_65_disconnect_XPDRA(self):
    """Unmount device XPDR-A1 and require a requests.codes.ok answer."""
    unmount_response = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(
        unmount_response.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_66_disconnect_XPDRC(self):
    """Unmount device XPDR-C1 and require a requests.codes.ok answer."""
    unmount_response = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(
        unmount_response.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_67_disconnect_ROADMA(self):
    """Unmount device ROADM-A1 and require a requests.codes.ok answer."""
    unmount_response = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        unmount_response.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_68_disconnect_ROADMC(self):
    """Unmount device ROADM-C1 and require a requests.codes.ok answer."""
    unmount_response = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(
        unmount_response.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
if __name__ == "__main__":
    # Direct execution: hand control to unittest with verbosity level 2.
    unittest.main(verbosity=2)