Switch functional tests to RFC8040
[transportpce.git] / tests / transportpce_tests / 7.1 / test03_renderer_or_modes.py
index ecb41ef931039479b2e7e7685f7bc8fa6e79cc0e..b8173c2aa06e36c1fc651bd323321ea86766d421 100644 (file)
@@ -21,7 +21,7 @@ import requests
 sys.path.append("transportpce_tests/common")
 # pylint: disable=wrong-import-position
 # pylint: disable=import-error
-import test_utils_rfc8040  # nopep8
+import test_utils  # nopep8
 
 
 class TransportPCE400GPortMappingTesting(unittest.TestCase):
@@ -44,15 +44,15 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
-        cls.processes = test_utils_rfc8040.start_tpce()
-        cls.processes = test_utils_rfc8040.start_sims([("xpdra2", cls.NODE_VERSION),
-                                                       ("xpdrc2", cls.NODE_VERSION)])
+        cls.processes = test_utils.start_tpce()
+        cls.processes = test_utils.start_sims([("xpdra2", cls.NODE_VERSION),
+                                               ("xpdrc2", cls.NODE_VERSION)])
 
     @classmethod
     def tearDownClass(cls):
         # pylint: disable=not-an-iterable
         for process in cls.processes:
-            test_utils_rfc8040.shutdown_process(process)
+            test_utils.shutdown_process(process)
         print("all processes killed")
 
     def setUp(self):
@@ -61,35 +61,35 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
         time.sleep(10)
 
     def test_01_xpdr_device_connection(self):
-        response = test_utils_rfc8040.mount_device("XPDR-A2",
-                                                   ("xpdra2", self.NODE_VERSION))
+        response = test_utils.mount_device("XPDR-A2",
+                                           ("xpdra2", self.NODE_VERSION))
         self.assertEqual(response.status_code, requests.codes.created,
-                         test_utils_rfc8040.CODE_SHOULD_BE_201)
+                         test_utils.CODE_SHOULD_BE_201)
 
     def test_01a_xpdr_device_connection(self):
-        response = test_utils_rfc8040.mount_device("XPDR-C2",
-                                                   ("xpdrc2", self.NODE_VERSION))
+        response = test_utils.mount_device("XPDR-C2",
+                                           ("xpdrc2", self.NODE_VERSION))
         self.assertEqual(response.status_code, requests.codes.created,
-                         test_utils_rfc8040.CODE_SHOULD_BE_201)
+                         test_utils.CODE_SHOULD_BE_201)
 
     # Check the correct capabilities for the client ports (if-100GE, if-100GE-ODU4,
     # if-OCH-OTU4-ODU4)
     def test_02_check_client_capabilities(self):
-        response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-CLIENT1")
+        response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-CLIENT1")
         self.assertEqual(response["status_code"], requests.codes.ok)
         self.assertEqual(
             self.CLIENT_CAPABILITIES,
             sorted(response["mapping"][0]["supported-interface-capability"]))
 
     def test_03_check_client_capabilities(self):
-        response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-CLIENT1")
+        response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-CLIENT1")
         self.assertEqual(response["status_code"], requests.codes.ok)
         self.assertEqual(
             self.CLIENT_CAPABILITIES,
             sorted(response["mapping"][0]["supported-interface-capability"]))
 
     def test_04_100g_ofec_service_path_create(self):
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             "transportpce-device-renderer", "service-path",
             {
                 "service-name": "service_100GE_ofec",
@@ -162,7 +162,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
              for x in response["output"]["node-interface"][1].keys()})
 
     def test_05_get_portmapping_network1(self):
-        response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1")
+        response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1")
         self.assertEqual(response["status_code"], requests.codes.ok)
         self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
         self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "FDvaQIf2Z08="
@@ -172,7 +172,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
             response["mapping"])
 
     def test_06_get_portmapping_network1(self):
-        response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1")
+        response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1")
         self.assertEqual(response["status_code"], requests.codes.ok)
         self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
         self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "AJpkaVmZKJk5"
@@ -183,7 +183,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
 
     def test_07_check_interface_otsi(self):
         # pylint: disable=line-too-long
-        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
+        response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR3-NETWORK1-265:272",
                         "administrative-state": "inService",
@@ -208,7 +208,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
 
     def test_08_check_interface_otsi(self):
         # pylint: disable=line-too-long
-        response = test_utils_rfc8040.check_node_attribute_request("XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
+        response = test_utils.check_node_attribute_request("XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR3-NETWORK1-265:272",
                         "administrative-state": "inService",
@@ -232,7 +232,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
                              response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
 
     def test_09_check_interface_otsig(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G",
@@ -251,7 +251,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
                              response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
 
     def test_10_check_interface_otsig(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G",
@@ -270,7 +270,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
                              response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
 
     def test_11_check_interface_otuc1(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR3-NETWORK1-OTUC1")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
@@ -292,7 +292,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
                              response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
 
     def test_12_check_interface_otuc1(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-C2", "interface", "XPDR3-NETWORK1-OTUC1")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
@@ -314,7 +314,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
                              response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
 
     def test_13_check_interface_oduc1(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR3-NETWORK1-ODUC1",
@@ -340,7 +340,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
             response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
 
     def test_14_check_interface_oduc1(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
         self.assertEqual(response["status_code"], requests.codes.ok)
 
@@ -367,7 +367,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
             response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
 
     def test_15_check_interface_odu4(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
         self.assertEqual(response["status_code"], requests.codes.ok)
 
@@ -395,7 +395,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
         # TODO: Check the trib-port numbering
 
     def test_16_check_interface_odu4(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
         self.assertEqual(response["status_code"], requests.codes.ok)
 
@@ -423,7 +423,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
         # TODO: Check the trib-port numbering
 
     def test_17_check_interface_100ge_client(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G",
@@ -441,7 +441,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
                              response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"])
 
     def test_18_check_interface_100ge_client(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G",
@@ -460,7 +460,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
 
     # Delete the service path
     def test_19_service_path_delete_100ge(self):
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             "transportpce-device-renderer", "service-path",
             {
                 "service-name": "service_100GE_ofec",
@@ -488,67 +488,67 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
         self.assertIn("Request processed", response["output"]["result"])
 
     def test_20_check_no_interface_100ge_client(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     def test_21_check_no_interface_100ge_client(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     def test_22_check_no_interface_odu4(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     def test_23_check_no_interface_odu4(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     def test_24_check_no_interface_otuc2(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     def test_25_check_no_interface_otuc2(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     # Check if port-mapping data is updated, where the supporting-otucn is deleted
     def test_26_check_no_otuc1(self):
-        response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1")
+        response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1")
         self.assertRaises(KeyError, lambda: response["supporting-otucn"])
 
     def test_27_check_no_otuc1(self):
-        response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1")
+        response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1")
         self.assertRaises(KeyError, lambda: response["supporting-otucn"])
 
     def test_28_check_no_interface_otsig(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     def test_29_check_no_interface_otsig(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     def test_30_check_no_interface_otsi(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     def test_31_check_no_interface_otsi(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     # 200G 31.6 Gbaud mode for muxponder, with qam16
     def test_32_service_path_create_otuc2(self):
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             "transportpce-device-renderer", "service-path",
             {
                 "service-name": "service_OTUC2",
@@ -605,7 +605,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
         self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "LY9PxYJqUbw="
 
     def test_33_get_portmapping_network1(self):
-        response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
+        response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
         self.assertEqual(response["status_code"], requests.codes.ok)
         self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
         del self.NETWORK1_CHECK_DICT["connection-map-lcp"]
@@ -615,7 +615,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
 
     def test_34_check_interface_otsi(self):
         # pylint: disable=line-too-long
-        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-265:272")
+        response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-265:272")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR2-NETWORK1-265:272",
                         "administrative-state": "inService",
@@ -639,7 +639,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
                              response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
 
     def test_35_check_interface_otsig(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR2-NETWORK1-OTSIGROUP-200G",
@@ -658,7 +658,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
                              response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
 
     def test_36_check_interface_otuc2(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
         self.assertEqual(response["status_code"], requests.codes.ok)
         input_dict_1 = {"name": "XPDR2-NETWORK1-OTUC2",
@@ -683,7 +683,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
     # as there is no change in code and are covered in test02_otn_renderer
 
     def test_37_service_path_delete_otuc2(self):
-        response = test_utils_rfc8040.transportpce_api_rpc_request(
+        response = test_utils.transportpce_api_rpc_request(
             "transportpce-device-renderer", "service-path",
             {
                 "modulation-format": "dp-qam16",
@@ -712,29 +712,29 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
         del self.NETWORK1_CHECK_DICT["supporting-otucn"]
 
     def test_38_check_no_interface_otuc2(self):
-        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
+        response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     def test_39_check_no_interface_otsig(self):
-        response = test_utils_rfc8040.check_node_attribute_request(
+        response = test_utils.check_node_attribute_request(
             "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     def test_40_check_no_interface_otsi(self):
-        response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+        response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
         self.assertEqual(response["status_code"], requests.codes.conflict)
 
     def test_41_check_no_otuc2(self):
-        response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
+        response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
         self.assertRaises(KeyError, lambda: response["supporting-otucn"])
 
     # Disconnect the XPDR
     def test_42_xpdr_device_disconnection(self):
-        response = test_utils_rfc8040.unmount_device("XPDR-A2")
+        response = test_utils.unmount_device("XPDR-A2")
         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
 
     def test_43_xpdr_device_disconnected(self):
-        response = test_utils_rfc8040.check_device_connection("XPDR-A2")
+        response = test_utils.check_device_connection("XPDR-A2")
         self.assertEqual(response["status_code"], requests.codes.conflict)
         self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
         self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
@@ -742,7 +742,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
                          "Request could not be completed because the relevant data model content does not exist")
 
     def test_44_xpdr_device_not_connected(self):
-        response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "node-info", None)
+        response = test_utils.get_portmapping_node_attr("XPDR-A2", "node-info", None)
         self.assertEqual(response["status_code"], requests.codes.conflict)
         self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
         self.assertEqual(response["node-info"]["error-tag"], "data-missing")
@@ -750,11 +750,11 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
                          "Request could not be completed because the relevant data model content does not exist")
 
     def test_45_xpdr_device_disconnection(self):
-        response = test_utils_rfc8040.unmount_device("XPDR-C2")
+        response = test_utils.unmount_device("XPDR-C2")
         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
 
     def test_46_xpdr_device_disconnected(self):
-        response = test_utils_rfc8040.check_device_connection("XPDR-C2")
+        response = test_utils.check_device_connection("XPDR-C2")
         self.assertEqual(response["status_code"], requests.codes.conflict)
         self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
         self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
@@ -762,7 +762,7 @@ class TransportPCE400GPortMappingTesting(unittest.TestCase):
                          "Request could not be completed because the relevant data model content does not exist")
 
     def test_47_xpdr_device_not_connected(self):
-        response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "node-info", None)
+        response = test_utils.get_portmapping_node_attr("XPDR-C2", "node-info", None)
         self.assertEqual(response["status_code"], requests.codes.conflict)
         self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
         self.assertEqual(response["node-info"]["error-tag"], "data-missing")