X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tests%2Ftransportpce_tests%2Fpce%2Ftest01_pce.py;h=29bc687995e32b980a99b241d4f4490ce425f7c2;hb=refs%2Fchanges%2F20%2F102220%2F6;hp=6477c54f45355766f3f5a99a41e6ea41365c0bf2;hpb=e72024a00f020ebfe4ade77016f4add2cb238e3f;p=transportpce.git diff --git a/tests/transportpce_tests/pce/test01_pce.py b/tests/transportpce_tests/pce/test01_pce.py index 6477c54f4..29bc68799 100644 --- a/tests/transportpce_tests/pce/test01_pce.py +++ b/tests/transportpce_tests/pce/test01_pce.py @@ -20,10 +20,30 @@ import requests sys.path.append('transportpce_tests/common/') # pylint: disable=wrong-import-position # pylint: disable=import-error -import test_utils # nopep8 +import test_utils_rfc8040 # nopep8 class TransportPCEtesting(unittest.TestCase): + path_computation_input_data = { + "service-name": "service-1", + "resource-reserve": "true", + "service-handler-header": { + "request-id": "request1" + }, + "service-a-end": { + "service-rate": "100", + "clli": "NodeA", + "service-format": "Ethernet", + "node-id": "XPDRA01" + }, + "service-z-end": { + "service-rate": "100", + "clli": "NodeC", + "service-format": "Ethernet", + "node-id": "XPDRC01" + }, + "pce-routing-metric": "hop-count" + } simple_topo_bi_dir_data = None simple_topo_uni_dir_data = None @@ -37,18 +57,18 @@ class TransportPCEtesting(unittest.TestCase): sample_files_parsed = False try: TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), - "..", "..", "sample_configs", "honeynode-topo.xml") + "..", "..", "sample_configs", "honeynode-topo.json") with open(TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as topo_bi_dir: cls.simple_topo_bi_dir_data = topo_bi_dir.read() TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), - "..", "..", "sample_configs", "NW-simple-topology.xml") + "..", "..", "sample_configs", "NW-simple-topology.json") with open(TOPO_UNI_DIR_FILE, 'r', encoding='utf-8') as topo_uni_dir: cls.simple_topo_uni_dir_data = topo_uni_dir.read() TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), - "..", "..", "sample_configs", "NW-for-test-5-4.xml") + "..", "..", "sample_configs", "NW-for-test-5-4.json") with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r', encoding='utf-8') as topo_uni_dir_complex: cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read() PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), @@ -69,327 +89,272 @@ class TransportPCEtesting(unittest.TestCase): if sample_files_parsed: print("sample files content loaded") - cls.processes = test_utils.start_tpce() + cls.processes = test_utils_rfc8040.start_tpce() @classmethod def tearDownClass(cls): + # clean datastores + test_utils_rfc8040.del_portmapping() + test_utils_rfc8040.del_ietf_network('openroadm-topology') # pylint: disable=not-an-iterable for process in cls.processes: - test_utils.shutdown_process(process) + test_utils_rfc8040.shutdown_process(process) print("all processes killed") def setUp(self): # instruction executed before each test method time.sleep(1) - # Load port mapping - def test_00_load_port_mapping(self): - response = test_utils.rawpost_request(test_utils.URL_FULL_PORTMAPPING, self.port_mapping_data) - self.assertEqual(response.status_code, requests.codes.no_content) - time.sleep(2) + # Load port mapping + def test_01_load_port_mapping(self): + response = test_utils_rfc8040.post_portmapping(self.port_mapping_data) + self.assertIn(response['status_code'], (requests.codes.created, requests.codes.no_content)) + time.sleep(1) - # Load simple bidirectional topology - def test_01_load_simple_topology_bi(self): - response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data) - self.assertEqual(response.status_code, requests.codes.ok) - time.sleep(2) + # Load simple bidirectional topology + def test_02_load_simple_topology_bi(self): + response = test_utils_rfc8040.put_ietf_network('openroadm-topology', self.simple_topo_bi_dir_data) + self.assertIn(response['status_code'], (requests.codes.ok, requests.codes.no_content)) + time.sleep(1) # Get existing nodeId - def test_02_get_nodeId(self): - response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['node'][0]['node-id'], 'ROADMA01-SRG1') + def test_03_get_nodeId(self): + response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-SRG1', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['node']['node-id'], 'ROADMA01-SRG1') time.sleep(1) # Get existing linkId - def test_03_get_linkId(self): - response = test_utils.get_ordm_topo_request("link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['ietf-network-topology:link'][0]['link-id'], - 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX') + def test_04_get_linkId(self): + response = test_utils_rfc8040.get_ietf_network_link_request( + 'openroadm-topology', 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['link']['link-id'], 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX') time.sleep(1) # Path Computation success - def test_04_path_computation_xpdr_bi(self): - response = test_utils.path_computation_request("request-1", "service-1", - {"node-id": "XPDRA01", "service-rate": "100", - "service-format": "Ethernet", "clli": "nodeA"}, - {"node-id": "XPDRC01", "service-rate": "100", - "service-format": "Ethernet", "clli": "nodeC"}) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + def test_05_path_computation_xpdr_bi(self): + response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce', + 'path-computation-request', + self.path_computation_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Path is calculated', - res['output']['configuration-response-common']['response-message']) - time.sleep(5) + response['output']['configuration-response-common']['response-message']) + time.sleep(2) # Path Computation success - def test_05_path_computation_rdm_bi(self): - response = test_utils.path_computation_request("request-1", "service-1", - {"node-id": "ROADMA01", "service-rate": "100", - "service-format": "Ethernet", "clli": "NodeA"}, - {"node-id": "ROADMC01", "service-rate": "100", - "service-format": "Ethernet", "clli": "NodeC"}) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + def test_06_path_computation_rdm_bi(self): + self.path_computation_input_data["service-a-end"]["node-id"] = "ROADMA01" + self.path_computation_input_data["service-z-end"]["node-id"] = "ROADMC01" + response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce', + 'path-computation-request', + self.path_computation_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Path is calculated', - res['output']['configuration-response-common']['response-message']) - time.sleep(5) - - # Delete topology - def test_06_delete_simple_topology_bi(self): - response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO) - self.assertEqual(response.status_code, requests.codes.ok) + response['output']['configuration-response-common']['response-message']) time.sleep(2) - # Test deleted topology - def test_07_test_topology_simple_bi_deleted(self): - response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1") - self.assertEqual(response.status_code, requests.codes.conflict) - time.sleep(1) - # Load simple bidirectional topology - def test_08_load_simple_topology_uni(self): - response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_uni_dir_data) - self.assertEqual(response.status_code, 201) - time.sleep(2) + def test_07_load_simple_topology_uni(self): + response = test_utils_rfc8040.put_ietf_network('openroadm-topology', self.simple_topo_uni_dir_data) + self.assertIn(response['status_code'], (requests.codes.ok, requests.codes.no_content)) + time.sleep(1) # Get existing nodeId - def test_09_get_nodeId(self): - response = test_utils.get_ordm_topo_request("node/XPONDER-1-2") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['node'][0]['node-id'], - 'XPONDER-1-2') + def test_08_get_nodeId(self): + response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'XPONDER-1-2', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['node']['node-id'], 'XPONDER-1-2') time.sleep(1) # Get existing linkId - def test_10_get_linkId(self): - response = test_utils.get_ordm_topo_request("link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['ietf-network-topology:link'][0]['link-id'], - 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX') + def test_09_get_linkId(self): + response = test_utils_rfc8040.get_ietf_network_link_request( + 'openroadm-topology', 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['link']['link-id'], 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX') time.sleep(1) # Path Computation success - def test_11_path_computation_xpdr_uni(self): - response = test_utils.path_computation_request("request-1", "service-1", - {"node-id": "XPONDER-1-2", "service-rate": "100", - "service-format": "Ethernet", "clli": "ORANGE1"}, - {"node-id": "XPONDER-3-2", "service-rate": "100", - "service-format": "Ethernet", "clli": "ORANGE3"}) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + def test_10_path_computation_xpdr_uni(self): + self.path_computation_input_data["service-a-end"]["node-id"] = "XPONDER-1-2" + self.path_computation_input_data["service-a-end"]["clli"] = "ORANGE1" + self.path_computation_input_data["service-z-end"]["node-id"] = "XPONDER-3-2" + self.path_computation_input_data["service-z-end"]["clli"] = "ORANGE3" + response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce', + 'path-computation-request', + self.path_computation_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Path is calculated', - res['output']['configuration-response-common']['response-message']) - time.sleep(5) + response['output']['configuration-response-common']['response-message']) + time.sleep(2) # Path Computation success - def test_12_path_computation_rdm_uni(self): - response = test_utils.path_computation_request("request1", "service1", - {"service-rate": "100", "service-format": "Ethernet", - "clli": "cll21", "node-id": "OpenROADM-2-1"}, - {"service-rate": "100", "service-format": "Ethernet", - "clli": "ncli22", "node-id": "OpenROADM-2-2"}) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + def test_11_path_computation_rdm_uni(self): + self.path_computation_input_data["service-a-end"]["node-id"] = "OpenROADM-2-1" + self.path_computation_input_data["service-a-end"]["clli"] = "cll21" + self.path_computation_input_data["service-z-end"]["node-id"] = "OpenROADM-2-2" + self.path_computation_input_data["service-z-end"]["clli"] = "ncli22" + response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce', + 'path-computation-request', + self.path_computation_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Path is calculated', - res['output']['configuration-response-common']['response-message']) + response['output']['configuration-response-common']['response-message']) # ZtoA path test - atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ']) - ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA']) + atozList = len(response['output']['response-parameters']['path-description']['aToZ-direction']['aToZ']) + ztoaList = len(response['output']['response-parameters']['path-description']['zToA-direction']['zToA']) self.assertEqual(atozList, 15) self.assertEqual(ztoaList, 15) for i in range(0, 15): - atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i] - ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i] + atoz = response['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i] + ztoa = response['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i] if atoz['id'] == '14': self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX') if ztoa['id'] == '0': self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX') - time.sleep(5) - - # Delete topology - def test_13_delete_simple_topology(self): - response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO) - self.assertEqual(response.status_code, requests.codes.ok) time.sleep(2) - # Test deleted topology - def test_14_test_topology_simple_deleted(self): - response = test_utils.get_ordm_topo_request("node/XPONDER-1-2") - self.assertEqual(response.status_code, requests.codes.conflict) - time.sleep(1) - # Load complex topology - def test_15_load_complex_topology(self): - response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.complex_topo_uni_dir_data) - self.assertEqual(response.status_code, 201) - time.sleep(2) + def test_12_load_complex_topology(self): + response = test_utils_rfc8040.put_ietf_network('openroadm-topology', self.complex_topo_uni_dir_data) + self.assertIn(response['status_code'], (requests.codes.ok, requests.codes.no_content)) + time.sleep(1) # Get existing nodeId - def test_16_get_nodeId(self): - response = test_utils.get_ordm_topo_request("node/XPONDER-3-2") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['node'][0]['node-id'], - 'XPONDER-3-2') + def test_13_get_nodeId(self): + response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'XPONDER-3-2', 'config') + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['node']['node-id'], 'XPONDER-3-2') time.sleep(1) # Test failed path computation - def test_17_fail_path_computation(self): - response = test_utils.post_request(test_utils.URL_PATH_COMPUTATION_REQUEST, - {"input": {"service-handler-header": {"request-id": "request-1"}}}) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + def test_14_fail_path_computation(self): + del self.path_computation_input_data["service-name"] + del self.path_computation_input_data["service-a-end"] + del self.path_computation_input_data["service-z-end"] + response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce', + 'path-computation-request', + self.path_computation_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Service Name is not set', - res['output']['configuration-response-common']['response-message']) + response['output']['configuration-response-common']['response-message']) time.sleep(2) # Test1 success path computation - def test_18_success1_path_computation(self): - response = test_utils.path_computation_request("request1", "service1", - {"service-format": "Ethernet", "service-rate": "100", - "clli": "ORANGE2", "node-id": "XPONDER-2-2"}, - {"service-format": "Ethernet", "service-rate": "100", - "clli": "ORANGE1", "node-id": "XPONDER-1-2"}, - {"customer-code": ["Some customer-code"], - "co-routing": { - "service-identifier-list": [ - { - "service-identifier": "Some existing-service", - } - ] - } - }, - {"customer-code": ["Some customer-code"], - "co-routing": { - "service-identifier-list": [ - { - "service-identifier": "Some existing-service", - } - ] - } - }, "hop-count") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + def test_15_success1_path_computation(self): + self.path_computation_input_data["service-name"] = "service 1" + self.path_computation_input_data["service-a-end"] = {"service-format": "Ethernet", "service-rate": "100", + "clli": "ORANGE2", "node-id": "XPONDER-2-2"} + self.path_computation_input_data["service-z-end"] = {"service-format": "Ethernet", "service-rate": "100", + "clli": "ORANGE1", "node-id": "XPONDER-1-2"} + self.path_computation_input_data["hard-constraints"] = {"customer-code": ["Some customer-code"], + "co-routing": { + "service-identifier-list": [{ + "service-identifier": "Some existing-service"}] + }} + self.path_computation_input_data["soft-constraints"] = {"customer-code": ["Some customer-code"], + "co-routing": { + "service-identifier-list": [{ + "service-identifier": "Some existing-service"}] + }} + response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce', + 'path-computation-request', + self.path_computation_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Path is calculated', - res['output']['configuration-response-common']['response-message']) - time.sleep(5) + response['output']['configuration-response-common']['response-message']) + + time.sleep(4) # Test2 success path computation with path description - def test_19_success2_path_computation(self): - response = test_utils.path_computation_request("request 1", "service 1", - {"service-rate": "100", "service-format": "Ethernet", - "node-id": "XPONDER-1-2", "clli": "ORANGE1"}, - {"service-rate": "100", "service-format": "Ethernet", - "node-id": "XPONDER-3-2", "clli": "ORANGE3"}) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + def test_16_success2_path_computation(self): + self.path_computation_input_data["service-a-end"]["node-id"] = "XPONDER-1-2" + self.path_computation_input_data["service-a-end"]["clli"] = "ORANGE1" + self.path_computation_input_data["service-z-end"]["node-id"] = "XPONDER-3-2" + self.path_computation_input_data["service-z-end"]["clli"] = "ORANGE3" + del self.path_computation_input_data["hard-constraints"] + del self.path_computation_input_data["soft-constraints"] + response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce', + 'path-computation-request', + self.path_computation_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Path is calculated', - res['output']['configuration-response-common']['response-message']) - self.assertEqual(5, res['output']['response-parameters']['path-description'] + response['output']['configuration-response-common']['response-message']) + self.assertEqual(5, response['output']['response-parameters']['path-description'] ['aToZ-direction']['aToZ-wavelength-number']) - self.assertEqual(5, res['output']['response-parameters']['path-description'] + self.assertEqual(5, response['output']['response-parameters']['path-description'] ['zToA-direction']['zToA-wavelength-number']) - time.sleep(5) + time.sleep(4) # Test3 success path computation with hard-constraints exclude - def test_20_success3_path_computation(self): - response = test_utils.path_computation_request("request 1", "service 1", - {"service-rate": "100", "service-format": "Ethernet", - "node-id": "XPONDER-1-2", "clli": "ORANGE1"}, - {"service-rate": "100", "service-format": "Ethernet", - "node-id": "XPONDER-3-2", "clli": "ORANGE3"}, - {"exclude": {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}}) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + def test_17_success3_path_computation(self): + self.path_computation_input_data["hard-constraints"] = {"exclude": + {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}} + response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce', + 'path-computation-request', + self.path_computation_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Path is calculated', - res['output']['configuration-response-common']['response-message']) - self.assertEqual(9, res['output']['response-parameters']['path-description'] + response['output']['configuration-response-common']['response-message']) + self.assertEqual(9, response['output']['response-parameters']['path-description'] ['aToZ-direction']['aToZ-wavelength-number']) - self.assertEqual(9, res['output']['response-parameters']['path-description'] + self.assertEqual(9, response['output']['response-parameters']['path-description'] ['zToA-direction']['zToA-wavelength-number']) - time.sleep(5) + time.sleep(4) # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2 - def test_21_path_computation_before_oms_attribute_deletion(self): - response = test_utils.path_computation_request("request 1", "service 1", - {"service-rate": "100", "service-format": "Ethernet", - "node-id": "XPONDER-2-2", "clli": "ORANGE2"}, - {"service-rate": "100", "service-format": "Ethernet", - "node-id": "XPONDER-1-2", "clli": "ORANGE1"}) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + def test_18_path_computation_before_oms_attribute_deletion(self): + self.path_computation_input_data["service-a-end"]["node-id"] = "XPONDER-2-2" + self.path_computation_input_data["service-a-end"]["clli"] = "ORANGE2" + self.path_computation_input_data["service-z-end"]["node-id"] = "XPONDER-1-2" + self.path_computation_input_data["service-z-end"]["clli"] = "ORANGE1" + del self.path_computation_input_data["hard-constraints"] + response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce', + 'path-computation-request', + self.path_computation_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Path is calculated', - res['output']['configuration-response-common']['response-message']) - nbElmPath = len(res['output']['response-parameters']['path-description'] - ['aToZ-direction']['aToZ']) - self.assertEqual(31, nbElmPath) + response['output']['configuration-response-common']['response-message']) + path_depth = len(response['output']['response-parameters']['path-description'] + ['aToZ-direction']['aToZ']) + self.assertEqual(31, path_depth) link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"} find = False - for i in range(0, nbElmPath): - resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i] - ['resource']) + for i in range(0, path_depth): + resource_i = (response['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i] + ['resource']) if resource_i == link: find = True self.assertEqual(find, True) - time.sleep(5) + time.sleep(4) # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2 - def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self): - response = test_utils.del_oms_attr_request("OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2") - self.assertEqual(response.status_code, requests.codes.ok) + def test_19_delete_oms_attribute_in_openroadm13toopenroadm12_link(self): + response = test_utils_rfc8040.del_oms_attr_request("OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) time.sleep(2) # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2 - def test_23_path_computation_after_oms_attribute_deletion(self): - response = test_utils.path_computation_request("request 1", "service 1", - {"service-rate": "100", "service-format": "Ethernet", - "node-id": "XPONDER-2-2", "clli": "ORANGE2"}, - {"service-rate": "100", "service-format": "Ethernet", - "node-id": "XPONDER-1-2", "clli": "ORANGE1"}) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + def test_20_path_computation_after_oms_attribute_deletion(self): + response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce', + 'path-computation-request', + self.path_computation_input_data) + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn('Path is calculated', - res['output']['configuration-response-common']['response-message']) - nbElmPath = len(res['output']['response-parameters']['path-description'] - ['aToZ-direction']['aToZ']) - self.assertEqual(47, nbElmPath) + response['output']['configuration-response-common']['response-message']) + path_depth = len(response['output']['response-parameters']['path-description'] + ['aToZ-direction']['aToZ']) + self.assertEqual(47, path_depth) link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"} find = False - for i in range(0, nbElmPath): - resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i] - ['resource']) + for i in range(0, path_depth): + resource_i = (response['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i] + ['resource']) if resource_i == link: find = True self.assertNotEqual(find, True) time.sleep(5) - # Delete complex topology - def test_24_delete_complex_topology(self): - response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO) - self.assertEqual(response.status_code, requests.codes.ok) - time.sleep(2) - - # Test deleted complex topology - def test_25_test_topology_complex_deleted(self): - response = test_utils.get_ordm_topo_request("node/XPONDER-3-2") - self.assertEqual(response.status_code, requests.codes.conflict) - time.sleep(1) - - # Delete portmapping - def test_26_delete_port_mapping(self): - response = test_utils.delete_request(test_utils.URL_FULL_PORTMAPPING) - self.assertEqual(response.status_code, requests.codes.ok) - time.sleep(2) - if __name__ == "__main__": unittest.main(verbosity=2)