X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tests%2Ftransportpce_tests%2Fhybrid%2Ftest03_autonomous_reroute.py;h=e51f290d79d1cf6ea63c46649ba7480c07134255;hb=122239adbcc8e857aa9eff4a45f6ab1900dfe885;hp=5053ae23911733c6e1eb1f04068dd5cf6825edac;hpb=e2af857602888e5347f137aac70f73ad02bb6c76;p=transportpce.git diff --git a/tests/transportpce_tests/hybrid/test03_autonomous_reroute.py b/tests/transportpce_tests/hybrid/test03_autonomous_reroute.py index 5053ae239..e51f290d7 100644 --- a/tests/transportpce_tests/hybrid/test03_autonomous_reroute.py +++ b/tests/transportpce_tests/hybrid/test03_autonomous_reroute.py @@ -14,7 +14,6 @@ # pylint: disable=too-many-public-methods # pylint: disable=too-many-lines -import json import unittest import time import requests @@ -682,6 +681,64 @@ class TransportPCEtesting(unittest.TestCase): } ] + pm_handling_degrade_body = { + "rpc-action": "set", + "pm-to-be-set-or-created": { + "current-pm-entry": [ + { + "pm-resource-instance": "/org-openroadm-device:org-openroadm-device/org-openroadm-device" + ":interface[org-openroadm-device:name='OTS-DEG2-TTP-TXRX']", + "pm-resource-type": "interface", + "pm-resource-type-extension": "", + "current-pm": [ + { + "type": "opticalPowerInput", + "extension": "", + "location": "nearEnd", + "direction": "rx", + "measurement": [ + { + "granularity": "15min", + "pmParameterValue": -30, + "pmParameterUnit": "dBm", + "validity": "complete" + }, + { + "granularity": "24Hour", + "pmParameterValue": -21.3, + "pmParameterUnit": "dBm", + "validity": "complete" + } + ] + } + ] + } + ] + } + } + + pm_handling_recover_body = { + "rpc-action": "clear", + "pm-to-get-clear-or-delete": { + "current-pm-entry": [ + { + "pm-resource-instance": "/org-openroadm-device:org-openroadm-device/org-openroadm-device" + ":interface[org-openroadm-device:name='OTS-DEG2-TTP-TXRX']", + "pm-resource-type": "interface", + "pm-resource-type-extension": "", + "current-pm": [ + { + "type": "opticalPowerInput", + "extension": "", + "location": "nearEnd", + "direction": "rx" + } + ] + } + ] + } + } + @classmethod def setUpClass(cls): cls.processes = test_utils.start_tpce() @@ -726,7 +783,7 @@ class TransportPCEtesting(unittest.TestCase): self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) - def test_06_connect_xprda2_1_N1_to_roadma_PP1(self): + def test_06_connect_xpdra2_1_N1_to_roadma_PP1(self): response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1', @@ -742,7 +799,7 @@ class TransportPCEtesting(unittest.TestCase): self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Roadm Xponder links created successfully', response["output"]["result"]) - def test_08_connect_xprdc2_1_N1_to_roadmc_PP1(self): + def test_08_connect_xpdrc2_1_N1_to_roadmc_PP1(self): response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1', @@ -811,50 +868,8 @@ class TransportPCEtesting(unittest.TestCase): # Degrade ROADM-A1-ROADM-C1 link def test_14_set_pm_ROADMA_OTS_DEG2_TTP_TXRX_OpticalPowerInput(self): - url = "{}/operations/pm-handling:pm-interact" - body = { - "input": { - "rpc-action": "set", - "pm-to-be-set-or-created": { - "current-pm-entry": [ - { - "pm-resource-instance": "/org-openroadm-device:org-openroadm-device/org-openroadm-device" - ":interface[org-openroadm-device:name='OTS-DEG2-TTP-TXRX']", - "pm-resource-type": "interface", - "pm-resource-type-extension": "", - "current-pm": [ - { - "type": "opticalPowerInput", - "extension": "", - "location": "nearEnd", - "direction": "rx", - "measurement": [ - { - "granularity": "15min", - "pmParameterValue": -30, - "pmParameterUnit": "dBm", - "validity": "complete" - }, - { - "granularity": "24Hour", - "pmParameterValue": -21.3, - "pmParameterUnit": "dBm", - "validity": "complete" - } - ] - } - ] - } - ] - } - } - } - response = requests.request("POST", url.format("http://127.0.0.1:8141/restconf"), - data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON, - auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD), - timeout=test_utils.REQUEST_TIMEOUT) - self.assertEqual(response.status_code, requests.codes.ok) - self.assertEqual(response.json()['output']['status-message'], "The PMs has been successfully set !") + self.assertTrue(test_utils.sims_update_pm_interact(('roadma', self.NODE_VERSION_221), + self.pm_handling_degrade_body)) time.sleep(2) def test_15_get_eth_service1(self): @@ -891,42 +906,14 @@ class TransportPCEtesting(unittest.TestCase): # Restore ROADM-A1-ROADM-C1 link def test_17_clear_pm_ROADMA_OTS_DEG2_TTP_TXRX_OpticalPowerInput(self): - url = "{}/operations/pm-handling:pm-interact" - body = { - "input": { - "rpc-action": "clear", - "pm-to-get-clear-or-delete": { - "current-pm-entry": [ - { - "pm-resource-instance": "/org-openroadm-device:org-openroadm-device/org-openroadm-device" - ":interface[org-openroadm-device:name='OTS-DEG2-TTP-TXRX']", - "pm-resource-type": "interface", - "pm-resource-type-extension": "", - "current-pm": [ - { - "type": "opticalPowerInput", - "extension": "", - "location": "nearEnd", - "direction": "rx" - } - ] - } - ] - } - } - } - response = requests.request("POST", url.format("http://127.0.0.1:8141/restconf"), - data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON, - auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD), - timeout=test_utils.REQUEST_TIMEOUT) - self.assertEqual(response.status_code, requests.codes.ok) - self.assertEqual(response.json()['output']['status-message'], "The PMs has been successfully released !") + self.assertTrue(test_utils.sims_update_pm_interact(('roadma', self.NODE_VERSION_221), + self.pm_handling_recover_body)) time.sleep(2) def test_18_get_eth_service1(self): self.test_13_get_eth_service1() - def test_19_connect_xprda2_3_N1_to_roadma_PP2(self): + def test_19_connect_xpdra2_3_N1_to_roadma_PP2(self): response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '3', 'network-num': '1', @@ -944,7 +931,7 @@ class TransportPCEtesting(unittest.TestCase): self.assertIn('Roadm Xponder links created successfully', response["output"]["result"]) time.sleep(2) - def test_21_connect_xprdc2_3_N1_to_roadmc_PP2(self): + def test_21_connect_xpdrc2_3_N1_to_roadmc_PP2(self): response = test_utils.transportpce_api_rpc_request( 'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '3', 'network-num': '1', @@ -1137,50 +1124,7 @@ class TransportPCEtesting(unittest.TestCase): # Degrade ROADM-A1-ROADM-C1 link def test_36_set_pm_ROADMA_OTS_DEG2_TTP_TXRX_OpticalPowerInput(self): - url = "{}/operations/pm-handling:pm-interact" - body = { - "input": { - "rpc-action": "set", - "pm-to-be-set-or-created": { - "current-pm-entry": [ - { - "pm-resource-instance": "/org-openroadm-device:org-openroadm-device/org-openroadm-device" - ":interface[org-openroadm-device:name='OTS-DEG2-TTP-TXRX']", - "pm-resource-type": "interface", - "pm-resource-type-extension": "", - "current-pm": [ - { - "type": "opticalPowerInput", - "extension": "", - "location": "nearEnd", - "direction": "rx", - "measurement": [ - { - "granularity": "15min", - "pmParameterValue": -30, - "pmParameterUnit": "dBm", - "validity": "complete" - }, - { - "granularity": "24Hour", - "pmParameterValue": -21.3, - "pmParameterUnit": "dBm", - "validity": "complete" - } - ] - } - ] - } - ] - } - } - } - response = requests.request("POST", url.format("http://127.0.0.1:8141/restconf"), - data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON, - auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD), - timeout=test_utils.REQUEST_TIMEOUT) - self.assertEqual(response.status_code, requests.codes.ok) - self.assertEqual(response.json()['output']['status-message'], "The PMs has been successfully set !") + self.test_14_set_pm_ROADMA_OTS_DEG2_TTP_TXRX_OpticalPowerInput() time.sleep(self.WAITING * 2) def test_37_get_eth_service1(self): @@ -1206,7 +1150,7 @@ class TransportPCEtesting(unittest.TestCase): def test_40_get_service_path_service_2(self): response = test_utils.get_serv_path_list_attr("service-paths", "service2") self.assertEqual(response['status_code'], requests.codes.ok) - index = self.service_path_service_2_AtoZ.index( + index1 = self.service_path_service_2_AtoZ.index( { 'id': '10', 'resource': { @@ -1216,50 +1160,35 @@ class TransportPCEtesting(unittest.TestCase): } } ) - service_path_expected = self.service_path_service_2_AtoZ[:index] + [{ + index2 = self.service_path_service_2_AtoZ.index( + { + 'id': '11', + 'resource': { + 'state': 'inService', + 'link-id': 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX' + } + } + ) + service_path_expected = self.service_path_service_2_AtoZ[:index1] + [{ 'id': '10', 'resource': { 'state': 'outOfService', 'tp-id': 'DEG2-TTP-TXRX', 'tp-node-id': 'ROADM-A1-DEG2' } - }] + self.service_path_service_2_AtoZ[index + 1:] + }] + self.service_path_service_2_AtoZ[index1 + 1:index2] + [{ + 'id': '11', + 'resource': { + 'state': 'outOfService', + 'link-id': 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX' + } + }] + self.service_path_service_2_AtoZ[index2 + 1:] self.assertCountEqual(service_path_expected, response['service-paths'][0]['path-description']['aToZ-direction']['aToZ']) # Restore ROADM-A1-ROADM-C1 link def test_41_clear_pm_ROADMA_OTS_DEG2_TTP_TXRX_OpticalPowerInput(self): - url = "{}/operations/pm-handling:pm-interact" - body = { - "input": { - "rpc-action": "clear", - "pm-to-get-clear-or-delete": { - "current-pm-entry": [ - { - "pm-resource-instance": "/org-openroadm-device:org-openroadm-device/org-openroadm-device" - ":interface[org-openroadm-device:name='OTS-DEG2-TTP-TXRX']", - "pm-resource-type": "interface", - "pm-resource-type-extension": "", - "current-pm": [ - { - "type": "opticalPowerInput", - "extension": "", - "location": "nearEnd", - "direction": "rx" - } - ] - } - ] - } - } - } - response = requests.request("POST", url.format("http://127.0.0.1:8141/restconf"), - data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON, - auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD), - timeout=test_utils.REQUEST_TIMEOUT) - self.assertEqual(response.status_code, requests.codes.ok) - self.assertEqual(response.json()['output']['status-message'], "The PMs has been successfully released !") - time.sleep(2) + self.test_17_clear_pm_ROADMA_OTS_DEG2_TTP_TXRX_OpticalPowerInput() def test_42_get_eth_service1(self): self.test_13_get_eth_service1()