2 ##############################################################################
3 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
25 # pylint: disable=too-few-public-methods
30 # pylint: disable=invalid-name
37 class TransportPCEFulltesting(unittest.TestCase):
39 cr_serv_sample_data = {
43 "layer-protocol-name": "DSR",
44 "service-interface-point": {
45 "service-interface-point-uuid": "b1f4bd3b-7fa9-367b-a8ab-6e80293238df"
47 "administrative-state": "UNLOCKED",
48 "operational-state": "ENABLED",
49 "direction": "BIDIRECTIONAL",
51 "protection-role": "WORK",
52 "local-id": "XPDR-C1-XPDR1",
55 "value-name": "OpenROADM node id",
56 "value": "XPDR-C1-XPDR1"
61 "layer-protocol-name": "DSR",
62 "service-interface-point": {
63 "service-interface-point-uuid": "b5964ce9-274c-3f68-b4d1-83c0b61bc74e"
65 "administrative-state": "UNLOCKED",
66 "operational-state": "ENABLED",
67 "direction": "BIDIRECTIONAL",
69 "protection-role": "WORK",
70 "local-id": "XPDR-A1-XPDR1",
73 "value-name": "OpenROADM node id",
74 "value": "XPDR-A1-XPDR1"
79 "connectivity-constraint": {
80 "service-layer": "ETH",
81 "service-type": "POINT_TO_POINT_CONNECTIVITY",
82 "service-level": "Some service-level",
83 "requested-capacity": {
95 uuid_services = UuidServices()
96 WAITING = 25 # nominal value is 300
97 NODE_VERSION_221 = '2.2.1'
101 # pylint: disable=unsubscriptable-object
102 cls.init_failed = False
103 os.environ['JAVA_MIN_MEM'] = '1024M'
104 os.environ['JAVA_MAX_MEM'] = '4096M'
105 cls.processes = test_utils.start_tpce()
106 # TAPI feature is not installed by default in Karaf
107 if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
108 print("installing tapi feature...")
109 result = test_utils.install_karaf_feature("odl-transportpce-tapi")
110 if result.returncode != 0:
111 cls.init_failed = True
112 print("Restarting OpenDaylight...")
113 test_utils.shutdown_process(cls.processes[0])
114 cls.processes[0] = test_utils.start_karaf()
115 test_utils.process_list[0] = cls.processes[0]
116 cls.init_failed = not test_utils.wait_until_log_contains(
117 test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
119 print("tapi installation feature failed...")
120 test_utils.shutdown_process(cls.processes[0])
122 cls.processes = test_utils.start_tpce()
123 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_221),
124 ('roadma', cls.NODE_VERSION_221),
125 ('roadmc', cls.NODE_VERSION_221),
126 ('xpdrc', cls.NODE_VERSION_221)])
129 def tearDownClass(cls):
130 # pylint: disable=not-an-iterable
131 for process in cls.processes:
132 test_utils.shutdown_process(process)
133 print("all processes killed")
136 def setUp(self): # instruction executed before each test method
137 # pylint: disable=consider-using-f-string
138 print("execution of {}".format(self.id().split(".")[-1]))
140 def test_01_connect_xpdrA(self):
141 response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION_221))
142 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
144 def test_02_connect_xpdrC(self):
145 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_221))
146 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
148 def test_03_connect_rdmA(self):
149 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
150 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
152 def test_04_connect_rdmC(self):
153 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
154 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
156 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
157 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
158 "ROADM-A1", "1", "SRG1-PP1-TXRX")
159 self.assertEqual(response.status_code, requests.codes.ok)
160 res = response.json()
161 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
164 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
165 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
166 "ROADM-A1", "1", "SRG1-PP1-TXRX")
167 self.assertEqual(response.status_code, requests.codes.ok)
168 res = response.json()
169 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
172 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
173 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
174 "ROADM-C1", "1", "SRG1-PP1-TXRX")
175 self.assertEqual(response.status_code, requests.codes.ok)
176 res = response.json()
177 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
180 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
181 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
182 "ROADM-C1", "1", "SRG1-PP1-TXRX")
183 self.assertEqual(response.status_code, requests.codes.ok)
184 res = response.json()
185 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
188 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
189 # Config ROADMA-ROADMC oms-attributes
191 "auto-spanloss": "true",
192 "spanloss-base": 11.4,
193 "spanloss-current": 12,
194 "engineered-spanloss": 12.2,
195 "link-concatenation": [{
198 "SRLG-length": 100000,
200 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
201 self.assertEqual(response.status_code, requests.codes.created)
203 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
204 # Config ROADMC-ROADMA oms-attributes
206 "auto-spanloss": "true",
207 "spanloss-base": 11.4,
208 "spanloss-current": 12,
209 "engineered-spanloss": 12.2,
210 "link-concatenation": [{
213 "SRLG-length": 100000,
215 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
216 self.assertEqual(response.status_code, requests.codes.created)
218 # test service-create for Eth service from xpdr to xpdr
219 def test_11_create_connectivity_service_Ethernet(self):
220 response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data)
221 time.sleep(self.WAITING)
222 self.assertEqual(response.status_code, requests.codes.ok)
223 res = response.json()
224 self.uuid_services.eth = res['output']['service']['uuid']
225 # pylint: disable=consider-using-f-string
227 input_dict_1 = {'administrative-state': 'LOCKED',
228 'lifecycle-state': 'PLANNED',
229 'operational-state': 'DISABLED',
230 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
231 'service-layer': 'ETH',
232 'connectivity-direction': 'BIDIRECTIONAL'
234 input_dict_2 = {'value-name': 'OpenROADM node id',
235 'value': 'XPDR-C1-XPDR1'}
236 input_dict_3 = {'value-name': 'OpenROADM node id',
237 'value': 'XPDR-A1-XPDR1'}
239 self.assertDictEqual(dict(input_dict_1, **res['output']['service']),
240 res['output']['service'])
241 self.assertDictEqual(dict(input_dict_2, **res['output']['service']['end-point'][0]['name'][0]),
242 res['output']['service']['end-point'][0]['name'][0])
243 self.assertDictEqual(dict(input_dict_3, **res['output']['service']['end-point'][1]['name'][0]),
244 res['output']['service']['end-point'][1]['name'][0])
245 # If the gate fails is because of the waiting time not being enough
246 time.sleep(self.WAITING)
248 def test_12_get_service_Ethernet(self):
249 response = test_utils.get_service_list_request("services/" + str(self.uuid_services.eth))
250 self.assertEqual(response.status_code, requests.codes.ok)
251 res = response.json()
253 res['services'][0]['administrative-state'], 'inService')
255 res['services'][0]['service-name'], self.uuid_services.eth)
257 res['services'][0]['connection-type'], 'service')
259 res['services'][0]['lifecycle-state'], 'planned')
262 def test_13_get_connectivity_service_Ethernet(self):
263 response = test_utils.tapi_get_connectivity_request(str(self.uuid_services.eth))
264 self.assertEqual(response.status_code, requests.codes.ok)
265 res = response.json()
267 res['output']['service']['operational-state'], 'ENABLED')
269 res['output']['service']['name'][0]['value'], self.uuid_services.eth)
271 res['output']['service']['administrative-state'], 'UNLOCKED')
273 res['output']['service']['lifecycle-state'], 'INSTALLED')
276 def test_14_change_status_line_port_xpdrc(self):
277 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
281 "administrative-state": "outOfService",
282 "port-qual": "xpdr-network"}]}
283 response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
284 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
285 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
286 self.assertEqual(response.status_code, requests.codes.ok)
289 def test_15_check_update_portmapping(self):
290 response = test_utils.portmapping_request("XPDR-C1")
291 self.assertEqual(response.status_code, requests.codes.ok)
292 res = response.json()
293 mapping_list = res['nodes'][0]['mapping']
294 for mapping in mapping_list:
295 if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
296 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
297 "Operational State should be 'OutOfService'")
298 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
299 "Administrative State should be 'OutOfService'")
301 self.assertEqual(mapping['port-oper-state'], 'InService',
302 "Operational State should be 'InService'")
303 self.assertEqual(mapping['port-admin-state'], 'InService',
304 "Administrative State should be 'InService'")
307 def test_16_check_update_openroadm_topo(self):
308 url = test_utils.URL_CONFIG_ORDM_TOPO
309 response = test_utils.get_request(url)
310 self.assertEqual(response.status_code, requests.codes.ok)
311 res = response.json()
312 node_list = res['network'][0]['node']
314 for node in node_list:
315 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
316 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
317 tp_list = node['ietf-network-topology:termination-point']
319 if node['node-id'] == 'XPDR-C1-XPDR1' and tp['tp-id'] == 'XPDR1-NETWORK1':
320 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
321 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
324 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
325 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
326 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
328 link_list = res['network'][0]['ietf-network-topology:link']
329 updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
330 'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
332 for link in link_list:
333 if link['link-id'] in updated_links:
334 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
335 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
338 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
339 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
340 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
343 def test_17_check_update_tapi_neps(self):
344 response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+OTSi")
345 self.assertEqual(response.status_code, requests.codes.ok)
346 res = response.json()
347 nep_list = res['output']['node']['owned-node-edge-point']
350 if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
351 self.assertEqual(nep['operational-state'], 'DISABLED',
352 "Operational State should be 'DISABLED'")
353 self.assertEqual(nep['administrative-state'], 'LOCKED',
354 "Administrative State should be 'LOCKED'")
357 self.assertEqual(nep['operational-state'], 'ENABLED',
358 "Operational State should be 'ENABLED'")
359 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
360 "Administrative State should be 'UNLOCKED'")
361 response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+DSR")
362 self.assertEqual(response.status_code, requests.codes.ok)
363 res = response.json()
364 nep_list = res['output']['node']['owned-node-edge-point']
366 if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
367 self.assertEqual(nep['operational-state'], 'DISABLED',
368 "Operational State should be 'DISABLED'")
369 self.assertEqual(nep['administrative-state'], 'LOCKED',
370 "Administrative State should be 'LOCKED'")
373 self.assertEqual(nep['operational-state'], 'ENABLED',
374 "Operational State should be 'ENABLED'")
375 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
376 "Administrative State should be 'UNLOCKED'")
377 self.assertEqual(nb_updated_neps, 4, "Only two xponder neps should have been modified")
380 def test_18_check_update_tapi_links(self):
381 response = test_utils.tapi_get_topology_details_request("T0 - Full Multi-layer topology")
383 self.assertEqual(response.status_code, requests.codes.ok)
384 res = response.json()
385 link_list = res['output']['topology']['link']
387 for link in link_list:
388 if all(x in link['name'][0]['value'] for x in ['XPDR-C1-XPDR1', 'XPDR1-NETWORK1']):
389 self.assertEqual(link['operational-state'], 'DISABLED')
390 self.assertEqual(link['administrative-state'], 'LOCKED')
393 self.assertEqual(link['operational-state'], 'ENABLED')
394 self.assertEqual(link['administrative-state'], 'UNLOCKED')
395 self.assertEqual(nb_updated_link, 2,
396 "Only two xponder-output/input & xponder-transi links should have been modified")
399 def test_19_check_update_service_Ethernet(self):
400 response = test_utils.get_service_list_request(
401 "services/" + str(self.uuid_services.eth))
402 self.assertEqual(response.status_code, requests.codes.ok)
403 res = response.json()
404 self.assertEqual(res['services'][0]['operational-state'], 'outOfService')
405 self.assertEqual(res['services'][0]['administrative-state'], 'inService')
408 def test_20_check_update_connectivity_service_Ethernet(self):
409 response = test_utils.tapi_get_connectivity_request(str(self.uuid_services.eth))
410 self.assertEqual(response.status_code, requests.codes.ok)
411 res = response.json()
413 res['output']['service']['operational-state'], 'DISABLED')
415 res['output']['service']['administrative-state'], 'LOCKED')
418 def test_21_restore_status_line_port_xpdrc(self):
419 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
423 "administrative-state": "inService",
424 "port-qual": "xpdr-network"}]}
425 response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
426 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
427 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
428 self.assertEqual(response.status_code, requests.codes.ok)
431 def test_22_check_update_portmapping_ok(self):
432 response = test_utils.portmapping_request("XPDR-C1")
433 self.assertEqual(response.status_code, requests.codes.ok)
434 res = response.json()
435 mapping_list = res['nodes'][0]['mapping']
436 for mapping in mapping_list:
437 self.assertEqual(mapping['port-oper-state'], 'InService',
438 "Operational State should be 'InService'")
439 self.assertEqual(mapping['port-admin-state'], 'InService',
440 "Administrative State should be 'InService'")
443 def test_23_check_update_openroadm_topo_ok(self):
444 url = test_utils.URL_CONFIG_ORDM_TOPO
445 response = test_utils.get_request(url)
446 self.assertEqual(response.status_code, requests.codes.ok)
447 res = response.json()
448 node_list = res['network'][0]['node']
449 for node in node_list:
450 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
451 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
452 tp_list = node['ietf-network-topology:termination-point']
454 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
455 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
457 link_list = res['network'][0]['ietf-network-topology:link']
458 for link in link_list:
459 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
460 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
463 def test_24_check_update_tapi_neps_ok(self):
464 response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+OTSi")
465 self.assertEqual(response.status_code, requests.codes.ok)
466 res = response.json()
467 nep_list = res['output']['node']['owned-node-edge-point']
469 self.assertEqual(nep['operational-state'], 'ENABLED',
470 "Operational State should be 'ENABLED'")
471 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
472 "Administrative State should be 'UNLOCKED'")
474 response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+DSR")
475 self.assertEqual(response.status_code, requests.codes.ok)
476 res = response.json()
477 nep_list = res['output']['node']['owned-node-edge-point']
479 self.assertEqual(nep['operational-state'], 'ENABLED',
480 "Operational State should be 'ENABLED'")
481 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
482 "Administrative State should be 'UNLOCKED'")
485 def test_25_check_update_tapi_links_ok(self):
486 response = test_utils.tapi_get_topology_details_request(
487 "T0 - Full Multi-layer topology")
489 self.assertEqual(response.status_code, requests.codes.ok)
490 res = response.json()
491 link_list = res['output']['topology']['link']
492 for link in link_list:
493 self.assertEqual(link['operational-state'], 'ENABLED')
494 self.assertEqual(link['administrative-state'], 'UNLOCKED')
498 def test_26_check_update_service1_ok(self):
499 self.test_12_get_service_Ethernet()
501 def test_27_check_update_connectivity_service_Ethernet_ok(self):
502 self.test_13_get_connectivity_service_Ethernet()
504 def test_28_change_status_port_roadma_srg(self):
505 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
508 "logical-connection-point": "SRG1-PP1",
509 "port-type": "client",
510 "circuit-id": "SRG1",
511 "administrative-state": "outOfService",
512 "port-qual": "roadm-external"}]}
513 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
514 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
515 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
516 self.assertEqual(response.status_code, requests.codes.ok)
519 def test_29_check_update_portmapping(self):
520 response = test_utils.portmapping_request("ROADM-A1")
521 self.assertEqual(response.status_code, requests.codes.ok)
522 res = response.json()
523 mapping_list = res['nodes'][0]['mapping']
524 for mapping in mapping_list:
525 if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
526 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
527 "Operational State should be 'OutOfService'")
528 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
529 "Administrative State should be 'OutOfService'")
531 self.assertEqual(mapping['port-oper-state'], 'InService',
532 "Operational State should be 'InService'")
533 self.assertEqual(mapping['port-admin-state'], 'InService',
534 "Administrative State should be 'InService'")
537 def test_30_check_update_openroadm_topo(self):
538 url = test_utils.URL_CONFIG_ORDM_TOPO
539 response = test_utils.get_request(url)
540 self.assertEqual(response.status_code, requests.codes.ok)
541 res = response.json()
542 node_list = res['network'][0]['node']
544 for node in node_list:
545 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
546 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
547 tp_list = node['ietf-network-topology:termination-point']
549 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP1-TXRX':
550 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
551 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
554 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
555 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
556 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
558 link_list = res['network'][0]['ietf-network-topology:link']
559 updated_links = ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
560 'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
562 for link in link_list:
563 if link['link-id'] in updated_links:
564 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
565 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
568 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
569 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
570 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
573 def test_31_check_update_tapi_neps(self):
574 response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
575 self.assertEqual(response.status_code, requests.codes.ok)
576 res = response.json()
577 nep_list = res['output']['node']['owned-node-edge-point']
580 if 'SRG1-PP1-TXRX' in nep['name'][0]['value']:
581 self.assertEqual(nep['operational-state'], 'DISABLED',
582 "Operational State should be 'DISABLED'")
583 self.assertEqual(nep['administrative-state'], 'LOCKED',
584 "Administrative State should be 'LOCKED'")
587 self.assertEqual(nep['operational-state'], 'ENABLED',
588 "Operational State should be 'ENABLED'")
589 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
590 "Administrative State should be 'UNLOCKED'")
591 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
594 def test_32_check_update_tapi_links(self):
595 response = test_utils.tapi_get_topology_details_request(
596 "T0 - Full Multi-layer topology")
598 self.assertEqual(response.status_code, requests.codes.ok)
599 res = response.json()
600 link_list = res['output']['topology']['link']
602 for link in link_list:
603 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP1-TXRX']):
604 self.assertEqual(link['operational-state'], 'DISABLED')
605 self.assertEqual(link['administrative-state'], 'LOCKED')
608 self.assertEqual(link['operational-state'], 'ENABLED')
609 self.assertEqual(link['administrative-state'], 'UNLOCKED')
610 self.assertEqual(nb_updated_link, 1,
611 "Only one xponder-output/input link should have been modified")
614 def test_33_check_update_service_Ethernet(self):
615 self.test_19_check_update_service_Ethernet()
617 def test_34_check_update_connectivity_service_Ethernet(self):
618 self.test_20_check_update_connectivity_service_Ethernet()
620 def test_35_restore_status_port_roadma_srg(self):
621 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
624 "logical-connection-point": "SRG1-PP1",
625 "port-type": "client",
626 "circuit-id": "SRG1",
627 "administrative-state": "inService",
628 "port-qual": "roadm-external"}]}
629 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
630 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
631 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
632 self.assertEqual(response.status_code, requests.codes.ok)
635 def test_36_check_update_portmapping_ok(self):
636 response = test_utils.portmapping_request("ROADM-A1")
637 self.assertEqual(response.status_code, requests.codes.ok)
638 res = response.json()
639 mapping_list = res['nodes'][0]['mapping']
640 for mapping in mapping_list:
641 self.assertEqual(mapping['port-oper-state'], 'InService',
642 "Operational State should be 'InService'")
643 self.assertEqual(mapping['port-admin-state'], 'InService',
644 "Administrative State should be 'InService'")
647 def test_37_check_update_openroadm_topo_ok(self):
648 self.test_23_check_update_openroadm_topo_ok()
650 def test_38_check_update_tapi_neps_ok(self):
651 response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
652 self.assertEqual(response.status_code, requests.codes.ok)
653 res = response.json()
654 nep_list = res['output']['node']['owned-node-edge-point']
656 self.assertEqual(nep['operational-state'], 'ENABLED',
657 "Operational State should be 'ENABLED'")
658 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
659 "Administrative State should be 'UNLOCKED'")
663 def test_39_check_update_tapi_links_ok(self):
664 self.test_25_check_update_tapi_links_ok()
666 def test_40_check_update_service1_ok(self):
667 self.test_12_get_service_Ethernet()
669 def test_41_check_update_connectivity_service_Ethernet_ok(self):
670 self.test_13_get_connectivity_service_Ethernet()
672 def test_42_change_status_line_port_roadma_deg(self):
673 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
676 "logical-connection-point": "DEG2-TTP-TXRX",
679 "administrative-state": "outOfService",
680 "port-qual": "roadm-external"}]}
681 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
682 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
683 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
684 self.assertEqual(response.status_code, requests.codes.ok)
687 def test_43_check_update_portmapping(self):
688 response = test_utils.portmapping_request("ROADM-A1")
689 self.assertEqual(response.status_code, requests.codes.ok)
690 res = response.json()
691 mapping_list = res['nodes'][0]['mapping']
692 for mapping in mapping_list:
693 if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
694 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
695 "Operational State should be 'OutOfService'")
696 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
697 "Administrative State should be 'OutOfService'")
699 self.assertEqual(mapping['port-oper-state'], 'InService',
700 "Operational State should be 'InService'")
701 self.assertEqual(mapping['port-admin-state'], 'InService',
702 "Administrative State should be 'InService'")
705 def test_44_check_update_openroadm_topo(self):
706 url = test_utils.URL_CONFIG_ORDM_TOPO
707 response = test_utils.get_request(url)
708 self.assertEqual(response.status_code, requests.codes.ok)
709 res = response.json()
710 node_list = res['network'][0]['node']
712 for node in node_list:
713 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
714 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
715 tp_list = node['ietf-network-topology:termination-point']
717 if node['node-id'] == 'ROADM-A1-DEG2' and tp['tp-id'] == 'DEG2-TTP-TXRX':
718 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
719 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
722 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
723 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
724 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
726 link_list = res['network'][0]['ietf-network-topology:link']
727 updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
728 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
730 for link in link_list:
731 if link['link-id'] in updated_links:
732 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'outOfService')
733 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'outOfService')
736 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
737 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
738 self.assertEqual(nb_updated_link, 2, "Only two xponder-output/input links should have been modified")
741 def test_45_check_update_tapi_neps(self):
742 response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
743 self.assertEqual(response.status_code, requests.codes.ok)
744 res = response.json()
745 nep_list = res['output']['node']['owned-node-edge-point']
748 if 'DEG2-TTP-TXRX' in nep['name'][0]['value']:
749 self.assertEqual(nep['operational-state'], 'DISABLED',
750 "Operational State should be 'DISABLED'")
751 self.assertEqual(nep['administrative-state'], 'LOCKED',
752 "Administrative State should be 'LOCKED'")
755 self.assertEqual(nep['operational-state'], 'ENABLED',
756 "Operational State should be 'ENABLED'")
757 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
758 "Administrative State should be 'UNLOCKED'")
759 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
762 def test_46_check_update_tapi_links(self):
763 response = test_utils.tapi_get_topology_details_request(
764 "T0 - Full Multi-layer topology")
766 self.assertEqual(response.status_code, requests.codes.ok)
767 res = response.json()
768 link_list = res['output']['topology']['link']
770 for link in link_list:
771 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'DEG2-TTP-TXRX']):
772 self.assertEqual(link['operational-state'], 'DISABLED')
773 self.assertEqual(link['administrative-state'], 'LOCKED')
776 self.assertEqual(link['operational-state'], 'ENABLED')
777 self.assertEqual(link['administrative-state'], 'UNLOCKED')
778 self.assertEqual(nb_updated_link, 1,
779 "Only one rdm-rdm link should have been modified")
782 def test_47_check_update_service_Ethernet(self):
783 self.test_19_check_update_service_Ethernet()
785 def test_48_check_update_connectivity_service_Ethernet(self):
786 self.test_20_check_update_connectivity_service_Ethernet()
788 def test_49_restore_status_line_port_roadma_deg(self):
789 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/2%2F0/ports/L1"
792 "logical-connection-point": "DEG2-TTP-TXRX",
795 "administrative-state": "inService",
796 "port-qual": "roadm-external"}]}
797 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
798 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
799 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
800 self.assertEqual(response.status_code, requests.codes.ok)
803 def test_50_check_update_portmapping_ok(self):
804 self.test_36_check_update_portmapping_ok()
806 def test_51_check_update_openroadm_topo_ok(self):
807 self.test_23_check_update_openroadm_topo_ok()
809 def test_52_check_update_tapi_neps_ok(self):
810 self.test_38_check_update_tapi_neps_ok()
812 def test_53_check_update_tapi_links_ok(self):
813 self.test_25_check_update_tapi_links_ok()
815 def test_54_check_update_service1_ok(self):
816 self.test_12_get_service_Ethernet()
818 def test_55_check_update_connectivity_service_Ethernet_ok(self):
819 self.test_13_get_connectivity_service_Ethernet()
821 def test_56_change_status_port_roadma_srg(self):
822 url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C2"
825 "logical-connection-point": "SRG1-PP2",
826 "port-type": "client",
827 "circuit-id": "SRG1",
828 "administrative-state": "outOfService",
829 "port-qual": "roadm-external"}]}
830 response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
831 data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
832 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
833 self.assertEqual(response.status_code, requests.codes.ok)
836 def test_57_check_update_portmapping(self):
837 response = test_utils.portmapping_request("ROADM-A1")
838 self.assertEqual(response.status_code, requests.codes.ok)
839 res = response.json()
840 mapping_list = res['nodes'][0]['mapping']
841 for mapping in mapping_list:
842 if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
843 self.assertEqual(mapping['port-oper-state'], 'OutOfService',
844 "Operational State should be 'OutOfService'")
845 self.assertEqual(mapping['port-admin-state'], 'OutOfService',
846 "Administrative State should be 'OutOfService'")
848 self.assertEqual(mapping['port-oper-state'], 'InService',
849 "Operational State should be 'InService'")
850 self.assertEqual(mapping['port-admin-state'], 'InService',
851 "Administrative State should be 'InService'")
854 def test_58_check_update_openroadm_topo(self):
855 url = test_utils.URL_CONFIG_ORDM_TOPO
856 response = test_utils.get_request(url)
857 self.assertEqual(response.status_code, requests.codes.ok)
858 res = response.json()
859 node_list = res['network'][0]['node']
861 for node in node_list:
862 self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
863 self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
864 tp_list = node['ietf-network-topology:termination-point']
866 if node['node-id'] == 'ROADM-A1-SRG1' and tp['tp-id'] == 'SRG1-PP2-TXRX':
867 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'outOfService')
868 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'outOfService')
871 self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
872 self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
873 self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
875 link_list = res['network'][0]['ietf-network-topology:link']
877 for link in link_list:
878 self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
879 self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
880 self.assertEqual(nb_updated_link, 0, "No link should have been modified")
883 def test_59_check_update_tapi_neps(self):
884 response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
885 self.assertEqual(response.status_code, requests.codes.ok)
886 res = response.json()
887 nep_list = res['output']['node']['owned-node-edge-point']
890 if 'SRG1-PP2-TXRX' in nep['name'][0]['value']:
891 self.assertEqual(nep['operational-state'], 'DISABLED',
892 "Operational State should be 'DISABLED'")
893 self.assertEqual(nep['administrative-state'], 'LOCKED',
894 "Administrative State should be 'LOCKED'")
897 self.assertEqual(nep['operational-state'], 'ENABLED',
898 "Operational State should be 'ENABLED'")
899 self.assertEqual(nep['administrative-state'], 'UNLOCKED',
900 "Administrative State should be 'UNLOCKED'")
901 self.assertEqual(nb_updated_neps, 3, "Only three roadm neps should have been modified")
904 def test_60_check_update_tapi_links(self):
905 response = test_utils.tapi_get_topology_details_request(
906 "T0 - Full Multi-layer topology")
908 self.assertEqual(response.status_code, requests.codes.ok)
909 res = response.json()
910 link_list = res['output']['topology']['link']
912 for link in link_list:
913 if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP2-TXRX']):
914 self.assertEqual(link['operational-state'], 'DISABLED')
915 self.assertEqual(link['administrative-state'], 'LOCKED')
918 self.assertEqual(link['operational-state'], 'ENABLED')
919 self.assertEqual(link['administrative-state'], 'UNLOCKED')
920 self.assertEqual(nb_updated_link, 0,
921 "No link should have been modified")
924 def test_61_check_update_service1_ok(self):
925 self.test_12_get_service_Ethernet()
927 def test_62_check_update_connectivity_service_Ethernet_ok(self):
928 self.test_13_get_connectivity_service_Ethernet()
930 def test_63_delete_connectivity_service_Ethernet(self):
931 response = test_utils.tapi_delete_connectivity_request(str(self.uuid_services.eth))
932 self.assertEqual(response.status_code, requests.codes.no_content)
933 time.sleep(self.WAITING)
935 def test_64_disconnect_xponders_from_roadm(self):
936 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
937 response = test_utils.get_ordm_topo_request("")
938 self.assertEqual(response.status_code, requests.codes.ok)
939 res = response.json()
940 links = res['network'][0]['ietf-network-topology:link']
942 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
943 link_name = link["link-id"]
944 response = test_utils.delete_request(url+link_name)
945 self.assertEqual(response.status_code, requests.codes.ok)
947 def test_65_disconnect_XPDRA(self):
948 response = test_utils.unmount_device("XPDR-A1")
949 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
951 def test_66_disconnect_XPDRC(self):
952 response = test_utils.unmount_device("XPDR-C1")
953 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
955 def test_67_disconnect_ROADMA(self):
956 response = test_utils.unmount_device("ROADM-A1")
957 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
959 def test_68_disconnect_ROADMC(self):
960 response = test_utils.unmount_device("ROADM-C1")
961 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
964 if __name__ == "__main__":
965 unittest.main(verbosity=2)