sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils_rfc8040.start_tpce()
- cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION)])
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(5)
def test_01_connect_SPDR_SA1(self):
- response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(10)
- response = test_utils_rfc8040.check_device_connection("SPDR-SA1")
+ response = test_utils.check_device_connection("SPDR-SA1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['connection-status'], 'connected')
def test_02_get_portmapping_CLIENT4(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-CLIENT4")
+ response = test_utils.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-CLIENT4")
self.assertEqual(response['status_code'], requests.codes.ok)
res_mapping = response['mapping'][0]
self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
self.assertIn('org-openroadm-port-types:if-10GE', res_mapping['supported-interface-capability'])
def test_03_get_portmapping_NETWORK1(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
self.NETWORK1_CHECK_DICT,
response['mapping'])
def test_04_service_path_create_OCH_OTU4(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_test',
'och-interface-id': ['XPDR1-NETWORK1-761:768']}, response['output']['node-interface'])
def test_05_get_portmapping_NETWORK1(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
self.assertIn(
response['mapping'])
def test_06_check_interface_och(self):
- response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768")
+ response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768")
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict(response['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
self.assertEqual(float(intf['transmit-power']), -5)
def test_07_check_interface_OTU(self):
- response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
+ response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
'administrative-state': 'inService',
response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_08_otn_service_path_create_ODU4(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODU4',
'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, response['output']['node-interface'])
def test_09_get_portmapping_NETWORK1(self):
- response = test_utils_rfc8040.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK1_CHECK_DICT["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
response['mapping'])
def test_10_check_interface_ODU4(self):
- response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
+ response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_11_otn_service_path_create_10GE(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service1',
self.assertIn('XPDR1-CLIENT4-ODU2e', response['output']['node-interface'][0]['odu-interface-id'])
def test_12_check_interface_10GE_CLIENT(self):
- response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
+ response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
'administrative-state': 'inService',
self.assertEqual(10000, response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_13_check_interface_ODU2E_CLIENT(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e")
self.assertEqual(response['status_code'], requests.codes.ok)
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_14_check_interface_ODU2E_NETWORK(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e', 'administrative-state': 'inService',
'parent-odu-allocation']['trib-slots'])
def test_15_check_ODU2E_connection(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-x-XPDR1-NETWORK1-ODU2e")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
response['odu-connection'][0]['source'])
def test_16_otn_service_path_delete_10GE(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service1',
self.assertTrue(response['output']['success'])
def test_17_check_no_ODU2E_connection(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-x-XPDR1-NETWORK1-ODU2e")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_18_check_no_interface_ODU2E_NETWORK(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_19_check_no_interface_ODU2E_CLIENT(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_20_check_no_interface_10GE_CLIENT(self):
- response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
+ response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_21_otn_service_path_delete_ODU4(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODU4',
self.assertTrue(response['output']['success'])
def test_22_check_no_interface_ODU4(self):
- response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
+ response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_23_service_path_delete_OCH_OTU4(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_test',
self.assertTrue(response['output']['success'])
def test_24_check_no_interface_OTU4(self):
- response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
+ response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_25_check_no_interface_OCH(self):
- response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-1")
+ response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-1")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_26_disconnect_SPDR_SA1(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ response = test_utils.unmount_device("SPDR-SA1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))