X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tests%2Ftransportpce_tests%2F7.1%2Ftest03_renderer_or_modes.py;fp=tests%2Ftransportpce_tests%2F7.1%2Ftest03_renderer_or_modes.py;h=b8173c2aa06e36c1fc651bd323321ea86766d421;hb=654a9fb915061ac8eb5ec52cacbc3c76a8455aa8;hp=ecb41ef931039479b2e7e7685f7bc8fa6e79cc0e;hpb=6233872e577c6abc704af61bc26362202017c103;p=transportpce.git diff --git a/tests/transportpce_tests/7.1/test03_renderer_or_modes.py b/tests/transportpce_tests/7.1/test03_renderer_or_modes.py index ecb41ef93..b8173c2aa 100644 --- a/tests/transportpce_tests/7.1/test03_renderer_or_modes.py +++ b/tests/transportpce_tests/7.1/test03_renderer_or_modes.py @@ -21,7 +21,7 @@ import requests sys.path.append("transportpce_tests/common") # pylint: disable=wrong-import-position # pylint: disable=import-error -import test_utils_rfc8040 # nopep8 +import test_utils # nopep8 class TransportPCE400GPortMappingTesting(unittest.TestCase): @@ -44,15 +44,15 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): @classmethod def setUpClass(cls): - cls.processes = test_utils_rfc8040.start_tpce() - cls.processes = test_utils_rfc8040.start_sims([("xpdra2", cls.NODE_VERSION), - ("xpdrc2", cls.NODE_VERSION)]) + cls.processes = test_utils.start_tpce() + cls.processes = test_utils.start_sims([("xpdra2", cls.NODE_VERSION), + ("xpdrc2", cls.NODE_VERSION)]) @classmethod def tearDownClass(cls): # pylint: disable=not-an-iterable for process in cls.processes: - test_utils_rfc8040.shutdown_process(process) + test_utils.shutdown_process(process) print("all processes killed") def setUp(self): @@ -61,35 +61,35 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): time.sleep(10) def test_01_xpdr_device_connection(self): - response = test_utils_rfc8040.mount_device("XPDR-A2", - ("xpdra2", self.NODE_VERSION)) + response = test_utils.mount_device("XPDR-A2", + ("xpdra2", self.NODE_VERSION)) self.assertEqual(response.status_code, requests.codes.created, - test_utils_rfc8040.CODE_SHOULD_BE_201) + test_utils.CODE_SHOULD_BE_201) def test_01a_xpdr_device_connection(self): - response = test_utils_rfc8040.mount_device("XPDR-C2", - ("xpdrc2", self.NODE_VERSION)) + response = test_utils.mount_device("XPDR-C2", + ("xpdrc2", self.NODE_VERSION)) self.assertEqual(response.status_code, requests.codes.created, - test_utils_rfc8040.CODE_SHOULD_BE_201) + test_utils.CODE_SHOULD_BE_201) # Check the correct capabilities for the client ports (if-100GE, if-100GE-ODU4, # if-OCH-OTU4-ODU4) def test_02_check_client_capabilities(self): - response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-CLIENT1") + response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-CLIENT1") self.assertEqual(response["status_code"], requests.codes.ok) self.assertEqual( self.CLIENT_CAPABILITIES, sorted(response["mapping"][0]["supported-interface-capability"])) def test_03_check_client_capabilities(self): - response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-CLIENT1") + response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-CLIENT1") self.assertEqual(response["status_code"], requests.codes.ok) self.assertEqual( self.CLIENT_CAPABILITIES, sorted(response["mapping"][0]["supported-interface-capability"])) def test_04_100g_ofec_service_path_create(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( "transportpce-device-renderer", "service-path", { "service-name": "service_100GE_ofec", @@ -162,7 +162,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): for x in response["output"]["node-interface"][1].keys()}) def test_05_get_portmapping_network1(self): - response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1") + response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1") self.assertEqual(response["status_code"], requests.codes.ok) self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1" self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "FDvaQIf2Z08=" @@ -172,7 +172,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): response["mapping"]) def test_06_get_portmapping_network1(self): - response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1") + response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1") self.assertEqual(response["status_code"], requests.codes.ok) self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1" self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "AJpkaVmZKJk5" @@ -183,7 +183,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): def test_07_check_interface_otsi(self): # pylint: disable=line-too-long - response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR3-NETWORK1-265:272") + response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR3-NETWORK1-265:272") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR3-NETWORK1-265:272", "administrative-state": "inService", @@ -208,7 +208,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): def test_08_check_interface_otsi(self): # pylint: disable=line-too-long - response = test_utils_rfc8040.check_node_attribute_request("XPDR-C2", "interface", "XPDR3-NETWORK1-265:272") + response = test_utils.check_node_attribute_request("XPDR-C2", "interface", "XPDR3-NETWORK1-265:272") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR3-NETWORK1-265:272", "administrative-state": "inService", @@ -232,7 +232,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"]) def test_09_check_interface_otsig(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G", @@ -251,7 +251,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]) def test_10_check_interface_otsig(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G", @@ -270,7 +270,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]) def test_11_check_interface_otuc1(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR3-NETWORK1-OTUC1") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1", @@ -292,7 +292,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]) def test_12_check_interface_otuc1(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-C2", "interface", "XPDR3-NETWORK1-OTUC1") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1", @@ -314,7 +314,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"]) def test_13_check_interface_oduc1(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR3-NETWORK1-ODUC1", @@ -340,7 +340,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"]) def test_14_check_interface_oduc1(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1") self.assertEqual(response["status_code"], requests.codes.ok) @@ -367,7 +367,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"]) def test_15_check_interface_odu4(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4") self.assertEqual(response["status_code"], requests.codes.ok) @@ -395,7 +395,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): # TODO: Check the trib-port numbering def test_16_check_interface_odu4(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4") self.assertEqual(response["status_code"], requests.codes.ok) @@ -423,7 +423,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): # TODO: Check the trib-port numbering def test_17_check_interface_100ge_client(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G", @@ -441,7 +441,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"]) def test_18_check_interface_100ge_client(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G", @@ -460,7 +460,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): # Delete the service path def test_19_service_path_delete_100ge(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( "transportpce-device-renderer", "service-path", { "service-name": "service_100GE_ofec", @@ -488,67 +488,67 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): self.assertIn("Request processed", response["output"]["result"]) def test_20_check_no_interface_100ge_client(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET") self.assertEqual(response["status_code"], requests.codes.conflict) def test_21_check_no_interface_100ge_client(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET") self.assertEqual(response["status_code"], requests.codes.conflict) def test_22_check_no_interface_odu4(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4") self.assertEqual(response["status_code"], requests.codes.conflict) def test_23_check_no_interface_odu4(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4") self.assertEqual(response["status_code"], requests.codes.conflict) def test_24_check_no_interface_otuc2(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1") self.assertEqual(response["status_code"], requests.codes.conflict) def test_25_check_no_interface_otuc2(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1") self.assertEqual(response["status_code"], requests.codes.conflict) # Check if port-mapping data is updated, where the supporting-otucn is deleted def test_26_check_no_otuc1(self): - response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1") + response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1") self.assertRaises(KeyError, lambda: response["supporting-otucn"]) def test_27_check_no_otuc1(self): - response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1") + response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1") self.assertRaises(KeyError, lambda: response["supporting-otucn"]) def test_28_check_no_interface_otsig(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G") self.assertEqual(response["status_code"], requests.codes.conflict) def test_29_check_no_interface_otsig(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G") self.assertEqual(response["status_code"], requests.codes.conflict) def test_30_check_no_interface_otsi(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR3-NETWORK1-265:272") self.assertEqual(response["status_code"], requests.codes.conflict) def test_31_check_no_interface_otsi(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-C2", "interface", "XPDR3-NETWORK1-265:272") self.assertEqual(response["status_code"], requests.codes.conflict) # 200G 31.6 Gbaud mode for muxponder, with qam16 def test_32_service_path_create_otuc2(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( "transportpce-device-renderer", "service-path", { "service-name": "service_OTUC2", @@ -605,7 +605,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "LY9PxYJqUbw=" def test_33_get_portmapping_network1(self): - response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1") + response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1") self.assertEqual(response["status_code"], requests.codes.ok) self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2" del self.NETWORK1_CHECK_DICT["connection-map-lcp"] @@ -615,7 +615,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): def test_34_check_interface_otsi(self): # pylint: disable=line-too-long - response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-265:272") + response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-265:272") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR2-NETWORK1-265:272", "administrative-state": "inService", @@ -639,7 +639,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"]) def test_35_check_interface_otsig(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR2-NETWORK1-OTSIGROUP-200G", @@ -658,7 +658,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"]) def test_36_check_interface_otuc2(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2") self.assertEqual(response["status_code"], requests.codes.ok) input_dict_1 = {"name": "XPDR2-NETWORK1-OTUC2", @@ -683,7 +683,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): # as there is no change in code and are covered in test02_otn_renderer def test_37_service_path_delete_otuc2(self): - response = test_utils_rfc8040.transportpce_api_rpc_request( + response = test_utils.transportpce_api_rpc_request( "transportpce-device-renderer", "service-path", { "modulation-format": "dp-qam16", @@ -712,29 +712,29 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): del self.NETWORK1_CHECK_DICT["supporting-otucn"] def test_38_check_no_interface_otuc2(self): - response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2") + response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2") self.assertEqual(response["status_code"], requests.codes.conflict) def test_39_check_no_interface_otsig(self): - response = test_utils_rfc8040.check_node_attribute_request( + response = test_utils.check_node_attribute_request( "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G") self.assertEqual(response["status_code"], requests.codes.conflict) def test_40_check_no_interface_otsi(self): - response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768") + response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768") self.assertEqual(response["status_code"], requests.codes.conflict) def test_41_check_no_otuc2(self): - response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1") + response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1") self.assertRaises(KeyError, lambda: response["supporting-otucn"]) # Disconnect the XPDR def test_42_xpdr_device_disconnection(self): - response = test_utils_rfc8040.unmount_device("XPDR-A2") + response = test_utils.unmount_device("XPDR-A2") self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_43_xpdr_device_disconnected(self): - response = test_utils_rfc8040.check_device_connection("XPDR-A2") + response = test_utils.check_device_connection("XPDR-A2") self.assertEqual(response["status_code"], requests.codes.conflict) self.assertIn(response["connection-status"]["error-type"], ("protocol", "application")) self.assertEqual(response["connection-status"]["error-tag"], "data-missing") @@ -742,7 +742,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): "Request could not be completed because the relevant data model content does not exist") def test_44_xpdr_device_not_connected(self): - response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "node-info", None) + response = test_utils.get_portmapping_node_attr("XPDR-A2", "node-info", None) self.assertEqual(response["status_code"], requests.codes.conflict) self.assertIn(response["node-info"]["error-type"], ("protocol", "application")) self.assertEqual(response["node-info"]["error-tag"], "data-missing") @@ -750,11 +750,11 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): "Request could not be completed because the relevant data model content does not exist") def test_45_xpdr_device_disconnection(self): - response = test_utils_rfc8040.unmount_device("XPDR-C2") + response = test_utils.unmount_device("XPDR-C2") self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) def test_46_xpdr_device_disconnected(self): - response = test_utils_rfc8040.check_device_connection("XPDR-C2") + response = test_utils.check_device_connection("XPDR-C2") self.assertEqual(response["status_code"], requests.codes.conflict) self.assertIn(response["connection-status"]["error-type"], ("protocol", "application")) self.assertEqual(response["connection-status"]["error-tag"], "data-missing") @@ -762,7 +762,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase): "Request could not be completed because the relevant data model content does not exist") def test_47_xpdr_device_not_connected(self): - response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "node-info", None) + response = test_utils.get_portmapping_node_attr("XPDR-C2", "node-info", None) self.assertEqual(response["status_code"], requests.codes.conflict) self.assertIn(response["node-info"]["error-type"], ("protocol", "application")) self.assertEqual(response["node-info"]["error-tag"], "data-missing")