sys.path.append('transportpce_tests/common')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
class TransportPCE400GPortMappingTesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils_rfc8040.start_tpce()
- cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self):
# pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
- time.sleep(10)
+ time.sleep(2)
def test_01_xpdr_device_connection(self):
- response = test_utils_rfc8040.mount_device("XPDR-A2",
- ('xpdra2', self.NODE_VERSION))
+ response = test_utils.mount_device("XPDR-A2",
+ ('xpdra2', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created,
- test_utils_rfc8040.CODE_SHOULD_BE_201)
+ test_utils.CODE_SHOULD_BE_201)
# Check if the node appears in the ietf-network topology
# this test has been removed, since it already exists in port-mapping
# 1a) create a OTUC2 device renderer
def test_02_service_path_create_otuc2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC2',
response['output']['node-interface'])
def test_03_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
self.assertIn(
def test_04_check_interface_otsi(self):
# pylint: disable=line-too-long
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
'administrative-state': 'inService',
response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
def test_05_check_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-200G',
response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
def test_06_check_interface_otuc2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC2',
# 1b) create a ODUC2 device renderer
def test_07_otn_service_path_create_oduc2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC2',
'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
def test_08_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
self.assertIn(
response['mapping'])
def test_09_check_interface_oduc2(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC2',
# 1c) create Ethernet device renderer
def test_10_otn_service_path_create_100ge(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_Ethernet',
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Otn Service path was set up successfully for node :XPDR-A2', response['output']['result'])
self.assertEqual('XPDR-A2', response['output']['node-interface'][0]['node-id'])
- self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
+ self.assertIn('XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
response['output']['node-interface'][0]['connection-id'])
self.assertIn('XPDR2-CLIENT1-ETHERNET-100G', response['output']['node-interface'][0]['eth-interface-id'])
- self.assertIn('XPDR2-NETWORK1-ODU4-service_Ethernet',
+ self.assertIn('XPDR2-NETWORK1-ODU4:service_Ethernet',
response['output']['node-interface'][0]['odu-interface-id'])
- self.assertIn('XPDR2-CLIENT1-ODU4-service_Ethernet',
+ self.assertIn('XPDR2-CLIENT1-ODU4:service_Ethernet',
response['output']['node-interface'][0]['odu-interface-id'])
def test_11_check_interface_100ge_client(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_12_check_interface_odu4_client(self):
- response = test_utils_rfc8040.check_node_attribute_request(
- "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
+ response = test_utils.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4:service_Ethernet")
self.assertEqual(response['status_code'], requests.codes.ok)
- input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4-service_Ethernet',
+ input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service_Ethernet',
'administrative-state': 'inService',
'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_13_check_interface_odu4_network(self):
- response = test_utils_rfc8040.check_node_attribute_request(
- "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
+ response = test_utils.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4:service_Ethernet")
self.assertEqual(response['status_code'], requests.codes.ok)
- input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4-service_Ethernet',
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service_Ethernet',
'administrative-state': 'inService',
'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
'supporting-interface-list': 'XPDR2-NETWORK1-ODUC2',
['opucn-trib-slots'])
def test_14_check_odu_connection_xpdra2(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2",
- "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
+ "odu-connection", "XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
'connection-name':
- 'XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet',
+ 'XPDR2-NETWORK1-ODU4-x-XPDR2-CLIENT1-ODU4',
'direction': 'bidirectional'
}
self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
response['odu-connection'][0])
- self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4-service_Ethernet'},
+ self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service_Ethernet'},
response['odu-connection'][0]['destination'])
- self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4-service_Ethernet'},
+ self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4:service_Ethernet'},
response['odu-connection'][0]['source'])
# 1d) Delete Ethernet device interfaces
def test_15_otn_service_path_delete_100ge(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_Ethernet',
self.assertIn('Request processed', response['output']['result'])
def test_16_check_no_odu_connection(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2",
- "odu-connection", "XPDR2-CLIENT1-ODU4-service_Ethernet-x-XPDR2-NETWORK1-ODU4-service_Ethernet")
+ "odu-connection", "XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_17_check_no_interface_odu_network(self):
- response = test_utils_rfc8040.check_node_attribute_request(
- "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4-service_Ethernet")
+ response = test_utils.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-ODU4:service_Ethernet")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_18_check_no_interface_odu_client(self):
- response = test_utils_rfc8040.check_node_attribute_request(
- "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4-service_Ethernet")
+ response = test_utils.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-CLIENT1-ODU4:service_Ethernet")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_19_check_no_interface_100ge_client(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR2-CLIENT1-ETHERNET-100G")
self.assertEqual(response['status_code'], requests.codes.conflict)
# 1e) Delete ODUC2 device interfaces
def test_20_otn_service_path_delete_oduc2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC2',
del self.NETWORK2_CHECK_DICT["supporting-oducn"]
def test_21_check_no_interface_oduc2(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC2")
self.assertEqual(response['status_code'], requests.codes.conflict)
# Check if port-mapping data is updated, where the supporting-oducn is deleted
def test_21a_check_no_oduc2(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-oducn"])
# 1f) Delete OTUC2 device interfaces
def test_22_service_path_delete_otuc2(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC2',
del self.NETWORK2_CHECK_DICT["supporting-otucn"]
def test_23_check_no_interface_otuc2(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC2")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC2")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_24_check_no_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
- "XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
+ response = test_utils.check_node_attribute_request(
+ "XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-200G")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_25_check_no_interface_otsi(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_25a_check_no_otuc2(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-otucn"])
# 2a) create a OTUC3 device renderer
def test_26_service_path_create_otuc3(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC3',
})
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
- self.assertIn(
- {'node-id': 'XPDR-A2',
- 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
- 'och-interface-id': ['XPDR2-NETWORK1-755:768',
- 'XPDR2-NETWORK1-OTSIGROUP-300G']},
- response['output']['node-interface'])
+ expected_subset_response = {
+ 'node-id': 'XPDR-A2',
+ 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3']}
+ expected_sorted_list = ['XPDR2-NETWORK1-755:768',
+ 'XPDR2-NETWORK1-OTSIGROUP-300G']
+ subset = {k: v for k, v in response['output']['node-interface'][0].items() if k in expected_subset_response}
+ self.assertDictEqual(subset, expected_subset_response)
+ self.assertEqual(sorted(response['output']['node-interface'][0]['och-interface-id']), expected_sorted_list)
def test_27_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
self.assertIn(
def test_28_check_interface_otsi(self):
# pylint: disable=line-too-long
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
def test_29_check_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-300G',
response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
def test_30_check_interface_otuc3(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC3")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC3',
# 2b) create a ODUC3 device renderer
def test_31_otn_service_path_create_oduc3(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC3',
'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
def test_32_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
self.assertIn(
response['mapping'])
def test_33_check_interface_oduc3(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC3',
# 2e) Delete ODUC3 device interfaces
def test_34_otn_service_path_delete_oduc3(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC3',
del self.NETWORK2_CHECK_DICT["supporting-oducn"]
def test_35_check_no_interface_oduc3(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC3")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_35a_check_no_oduc3(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-oducn"])
# 2f) Delete OTUC3 device interfaces
def test_36_service_path_delete_otuc3(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC3',
del self.NETWORK2_CHECK_DICT["supporting-otucn"]
def test_37_check_no_interface_otuc3(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ response = test_utils.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_38_check_no_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_39_check_no_interface_otsi(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ response = test_utils.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_39a_check_no_otuc3(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-otucn"])
# 3a) create a OTUC4 device renderer
def test_40_service_path_create_otuc4(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC4',
})
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
- self.assertIn(
- {'node-id': 'XPDR-A2',
- 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
- 'och-interface-id': ['XPDR2-NETWORK1-755:768',
- 'XPDR2-NETWORK1-OTSIGROUP-400G']},
- response['output']['node-interface'])
+ expected_subset_response = {
+ 'node-id': 'XPDR-A2',
+ 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4']}
+ expected_sorted_list = ['XPDR2-NETWORK1-755:768',
+ 'XPDR2-NETWORK1-OTSIGROUP-400G']
+ subset = {k: v for k, v in response['output']['node-interface'][0].items() if k in expected_subset_response}
+ self.assertDictEqual(subset, expected_subset_response)
+ self.assertEqual(sorted(response['output']['node-interface'][0]['och-interface-id']), expected_sorted_list)
def test_41_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
self.assertIn(
def test_42_check_interface_otsi(self):
# pylint: disable=line-too-long
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-755:768")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-755:768',
response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
def test_43_check_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
def test_44_check_interface_otuc4(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A2", "interface", "XPDR2-NETWORK1-OTUC4")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
# 3b) create a ODUC4 device renderer
def test_45_otn_service_path_create_oduc3(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC4',
'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
def test_46_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
self.assertIn(
response['mapping'])
def test_47_check_interface_oduc4(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
# 3e) Delete ODUC4 device interfaces
def test_48_otn_service_path_delete_oduc4(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'otn-service-path',
{
'service-name': 'service_ODUC4',
del self.NETWORK2_CHECK_DICT["supporting-oducn"]
def test_49_check_no_interface_oduc4(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
+ response = test_utils.check_node_attribute_request("XPDR-A2", "interface", "XPDR2-NETWORK1-ODUC4")
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_49a_check_no_oduc4(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-oducn"])
# 3f) Delete OTUC4 device interfaces
def test_50_service_path_delete_otuc4(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-device-renderer', 'service-path',
{
'service-name': 'service_OTUC4',
del self.NETWORK2_CHECK_DICT["supporting-otucn"]
def test_51_check_no_interface_otuc4(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ response = test_utils.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_52_check_no_interface_otsig(self):
- response = test_utils_rfc8040.check_node_attribute_request(
+ response = test_utils.check_node_attribute_request(
"XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_53_check_no_interface_otsi(self):
- response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ response = test_utils.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_53a_check_no_otuc4(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-otucn"])
# Disconnect the XPDR
def test_54_xpdr_device_disconnection(self):
- response = test_utils_rfc8040.unmount_device("XPDR-A2")
+ response = test_utils.unmount_device("XPDR-A2")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_55_xpdr_device_disconnected(self):
- response = test_utils_rfc8040.check_device_connection("XPDR-A2")
+ response = test_utils.check_device_connection("XPDR-A2")
self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(response['connection-status']['error-type'], ('protocol', 'application'))
self.assertEqual(response['connection-status']['error-tag'], 'data-missing')
'Request could not be completed because the relevant data model content does not exist')
def test_56_xpdr_device_not_connected(self):
- response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
+ response = test_utils.get_portmapping_node_attr("XPDR-A2", "node-info", None)
self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
self.assertEqual(response['node-info']['error-tag'], 'data-missing')