sys.path.append("transportpce_tests/common")
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
class TransportPCE400GPortMappingTesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils_rfc8040.start_tpce()
- cls.processes = test_utils_rfc8040.start_sims([("xpdra2", cls.NODE_VERSION),
- ("xpdrc2", cls.NODE_VERSION)])
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims([("xpdra2", cls.NODE_VERSION),
+ ("xpdrc2", cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(10)
def test_01_xpdr_device_connection(self):
- response = test_utils_rfc8040.mount_device("XPDR-A2",
- ("xpdra2", self.NODE_VERSION))
+ response = test_utils.mount_device("XPDR-A2",
+ ("xpdra2", self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created,
- test_utils_rfc8040.CODE_SHOULD_BE_201)
+ test_utils.CODE_SHOULD_BE_201)
def test_01a_xpdr_device_connection(self):
- response = test_utils_rfc8040.mount_device("XPDR-C2",
- ("xpdrc2", self.NODE_VERSION))
+ response = test_utils.mount_device("XPDR-C2",
+ ("xpdrc2", self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created,
- test_utils_rfc8040.CODE_SHOULD_BE_201)
+ test_utils.CODE_SHOULD_BE_201)
# Check the correct capabilities for the client ports (if-100GE, if-100GE-ODU4,
# if-OCH-OTU4-ODU4)
def test_02_check_client_capabilities(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-CLIENT1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-CLIENT1")
self.assertEqual(response["status_code"], requests.codes.ok)
self.assertEqual(
self.CLIENT_CAPABILITIES,
sorted(response["mapping"][0]["supported-interface-capability"]))
def test_03_check_client_capabilities(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-CLIENT1")
+ response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-CLIENT1")
self.assertEqual(response["status_code"], requests.codes.ok)
self.assertEqual(
self.CLIENT_CAPABILITIES,
sorted(response["mapping"][0]["supported-interface-capability"]))
def test_04_100g_ofec_service_path_create(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
"transportpce-device-renderer", "service-path",
{
"service-name": "service_100GE_ofec",
for x in response["output"]["node-interface"][1].keys()})
def test_05_get_portmapping_network1(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1")
self.assertEqual(response["status_code"], requests.codes.ok)
self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "FDvaQIf2Z08="
response["mapping"])
def test_06_get_portmapping_network1(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1")
self.assertEqual(response["status_code"], requests.codes.ok)
self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR3-NETWORK1-OTUC1"
self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "AJpkaVmZKJk5"
def test_07_check_interface_otsi(self):
# pylint: disable=line-too-long
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR3-NETWORK1-265:272",
"administrative-state": "inService",
def test_08_check_interface_otsi(self):
# pylint: disable=line-too-long
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
+ response = test_utils.check_node_attribute_request("XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR3-NETWORK1-265:272",
"administrative-state": "inService",
response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
def test_09_check_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G",
response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
def test_10_check_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR3-NETWORK1-OTSIGROUP-100G",
response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
def test_11_check_interface_otuc1(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR3-NETWORK1-OTUC1")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
def test_12_check_interface_otuc1(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-C2", "interface", "XPDR3-NETWORK1-OTUC1")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR3-NETWORK1-OTUC1",
response["interface"][0]["org-openroadm-otn-otu-interfaces:otu"])
def test_13_check_interface_oduc1(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR3-NETWORK1-ODUC1",
response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
def test_14_check_interface_oduc1(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
self.assertEqual(response["status_code"], requests.codes.ok)
response["interface"][0]["org-openroadm-otn-odu-interfaces:odu"]["opu"])
def test_15_check_interface_odu4(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
self.assertEqual(response["status_code"], requests.codes.ok)
# TODO: Check the trib-port numbering
def test_16_check_interface_odu4(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
self.assertEqual(response["status_code"], requests.codes.ok)
# TODO: Check the trib-port numbering
def test_17_check_interface_100ge_client(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G",
response["interface"][0]["org-openroadm-ethernet-interfaces:ethernet"])
def test_18_check_interface_100ge_client(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR3-CLIENT1-ETHERNET-100G",
# Delete the service path
def test_19_service_path_delete_100ge(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
"transportpce-device-renderer", "service-path",
{
"service-name": "service_100GE_ofec",
self.assertIn("Request processed", response["output"]["result"])
def test_20_check_no_interface_100ge_client(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR3-CLIENT1-ETHERNET")
self.assertEqual(response["status_code"], requests.codes.conflict)
def test_21_check_no_interface_100ge_client(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-C2", "interface", "XPDR3-CLIENT1-ETHERNET")
self.assertEqual(response["status_code"], requests.codes.conflict)
def test_22_check_no_interface_odu4(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR3-NETWORK1-ODU4")
self.assertEqual(response["status_code"], requests.codes.conflict)
def test_23_check_no_interface_odu4(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-C2", "interface", "XPDR3-NETWORK1-ODU4")
self.assertEqual(response["status_code"], requests.codes.conflict)
def test_24_check_no_interface_otuc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR3-NETWORK1-ODUC1")
self.assertEqual(response["status_code"], requests.codes.conflict)
def test_25_check_no_interface_otuc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-C2", "interface", "XPDR3-NETWORK1-ODUC1")
self.assertEqual(response["status_code"], requests.codes.conflict)
# Check if port-mapping data is updated, where the supporting-otucn is deleted
def test_26_check_no_otuc1(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR3-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-otucn"])
def test_27_check_no_otuc1(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-C2", "mapping", "XPDR3-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-otucn"])
def test_28_check_no_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
self.assertEqual(response["status_code"], requests.codes.conflict)
def test_29_check_no_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-C2", "interface", "XPDR3-NETWORK1-OTSIGROUP-100G")
self.assertEqual(response["status_code"], requests.codes.conflict)
def test_30_check_no_interface_otsi(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR3-NETWORK1-265:272")
self.assertEqual(response["status_code"], requests.codes.conflict)
def test_31_check_no_interface_otsi(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-C2", "interface", "XPDR3-NETWORK1-265:272")
self.assertEqual(response["status_code"], requests.codes.conflict)
# 200G 31.6 Gbaud mode for muxponder, with qam16
def test_32_service_path_create_otuc2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
"transportpce-device-renderer", "service-path",
{
"service-name": "service_OTUC2",
self.NETWORK1_CHECK_DICT["lcp-hash-val"] = "LY9PxYJqUbw="
def test_33_get_portmapping_network1(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response["status_code"], requests.codes.ok)
self.NETWORK1_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
del self.NETWORK1_CHECK_DICT["connection-map-lcp"]
def test_34_check_interface_otsi(self):
# pylint: disable=line-too-long
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-265:272")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-265:272")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR2-NETWORK1-265:272",
"administrative-state": "inService",
response["interface"][0]["org-openroadm-optical-tributary-signal-interfaces:otsi"]["flexo"])
def test_35_check_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR2-NETWORK1-OTSIGROUP-200G",
response["interface"][0]["org-openroadm-otsi-group-interfaces:otsi-group"])
def test_36_check_interface_otuc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
self.assertEqual(response["status_code"], requests.codes.ok)
input_dict_1 = {"name": "XPDR2-NETWORK1-OTUC2",
# as there is no change in code and are covered in test02_otn_renderer
def test_37_service_path_delete_otuc2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
"transportpce-device-renderer", "service-path",
{
"modulation-format": "dp-qam16",
del self.NETWORK1_CHECK_DICT["supporting-otucn"]
def test_38_check_no_interface_otuc2(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
self.assertEqual(response["status_code"], requests.codes.conflict)
def test_39_check_no_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
self.assertEqual(response["status_code"], requests.codes.conflict)
def test_40_check_no_interface_otsi(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
self.assertEqual(response["status_code"], requests.codes.conflict)
def test_41_check_no_otuc2(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-otucn"])
# Disconnect the XPDR
def test_42_xpdr_device_disconnection(self):
- response = test_utils_rfc8040.unmount_device("XPDR-A2")
+ response = test_utils.unmount_device("XPDR-A2")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_43_xpdr_device_disconnected(self):
- response = test_utils_rfc8040.check_device_connection("XPDR-A2")
+ response = test_utils.check_device_connection("XPDR-A2")
self.assertEqual(response["status_code"], requests.codes.conflict)
self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
"Request could not be completed because the relevant data model content does not exist")
def test_44_xpdr_device_not_connected(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "node-info", None)
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "node-info", None)
self.assertEqual(response["status_code"], requests.codes.conflict)
self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
self.assertEqual(response["node-info"]["error-tag"], "data-missing")
"Request could not be completed because the relevant data model content does not exist")
def test_45_xpdr_device_disconnection(self):
- response = test_utils_rfc8040.unmount_device("XPDR-C2")
+ response = test_utils.unmount_device("XPDR-C2")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_46_xpdr_device_disconnected(self):
- response = test_utils_rfc8040.check_device_connection("XPDR-C2")
+ response = test_utils.check_device_connection("XPDR-C2")
self.assertEqual(response["status_code"], requests.codes.conflict)
self.assertIn(response["connection-status"]["error-type"], ("protocol", "application"))
self.assertEqual(response["connection-status"]["error-tag"], "data-missing")
"Request could not be completed because the relevant data model content does not exist")
def test_47_xpdr_device_not_connected(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C2", "node-info", None)
+ response = test_utils.get_portmapping_node_attr("XPDR-C2", "node-info", None)
self.assertEqual(response["status_code"], requests.codes.conflict)
self.assertIn(response["node-info"]["error-type"], ("protocol", "application"))
self.assertEqual(response["node-info"]["error-tag"], "data-missing")