response['output']['node-interface'])
def test_03_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC2"
self.assertIn(
'odu-interface-id': ['XPDR2-NETWORK1-ODUC2']}, response['output']['node-interface'])
def test_08_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC2"
self.assertIn(
# Check if port-mapping data is updated, where the supporting-oducn is deleted
def test_21a_check_no_oduc2(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-oducn"])
# 1f) Delete OTUC2 device interfaces
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_25a_check_no_otuc2(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-otucn"])
# 2a) create a OTUC3 device renderer
})
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
- self.assertIn(
- {'node-id': 'XPDR-A2',
- 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3'],
- 'och-interface-id': ['XPDR2-NETWORK1-755:768',
- 'XPDR2-NETWORK1-OTSIGROUP-300G']},
- response['output']['node-interface'])
+ expected_subset_response = {
+ 'node-id': 'XPDR-A2',
+ 'otu-interface-id': ['XPDR2-NETWORK1-OTUC3']}
+ expected_sorted_list = ['XPDR2-NETWORK1-755:768',
+ 'XPDR2-NETWORK1-OTSIGROUP-300G']
+ subset = {k: v for k, v in response['output']['node-interface'][0].items() if k in expected_subset_response}
+ self.assertDictEqual(subset, expected_subset_response)
+ self.assertEqual(sorted(response['output']['node-interface'][0]['och-interface-id']), expected_sorted_list)
def test_27_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC3"
self.assertIn(
'odu-interface-id': ['XPDR2-NETWORK1-ODUC3']}, response['output']['node-interface'])
def test_32_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC3"
self.assertIn(
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_35a_check_no_oduc3(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-oducn"])
# 2f) Delete OTUC3 device interfaces
def test_37_check_no_interface_otuc3(self):
response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC3")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_38_check_no_interface_otsig(self):
response = test_utils_rfc8040.check_node_attribute_request(
"XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-300G")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_39_check_no_interface_otsi(self):
response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_39a_check_no_otuc3(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-otucn"])
# 3a) create a OTUC4 device renderer
})
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
- self.assertIn(
- {'node-id': 'XPDR-A2',
- 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4'],
- 'och-interface-id': ['XPDR2-NETWORK1-755:768',
- 'XPDR2-NETWORK1-OTSIGROUP-400G']},
- response['output']['node-interface'])
+ expected_subset_response = {
+ 'node-id': 'XPDR-A2',
+ 'otu-interface-id': ['XPDR2-NETWORK1-OTUC4']}
+ expected_sorted_list = ['XPDR2-NETWORK1-755:768',
+ 'XPDR2-NETWORK1-OTSIGROUP-400G']
+ subset = {k: v for k, v in response['output']['node-interface'][0].items() if k in expected_subset_response}
+ self.assertDictEqual(subset, expected_subset_response)
+ self.assertEqual(sorted(response['output']['node-interface'][0]['och-interface-id']), expected_sorted_list)
def test_41_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-otucn"] = "XPDR2-NETWORK1-OTUC4"
self.assertIn(
'odu-interface-id': ['XPDR2-NETWORK1-ODUC4']}, response['output']['node-interface'])
def test_46_get_portmapping_network1(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK2_CHECK_DICT["supporting-oducn"] = "XPDR2-NETWORK1-ODUC4"
self.assertIn(
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_49a_check_no_oduc4(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-oducn"])
# 3f) Delete OTUC4 device interfaces
def test_51_check_no_interface_otuc4(self):
response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-OTUC4")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_52_check_no_interface_otsig(self):
response = test_utils_rfc8040.check_node_attribute_request(
"XPDR-A1", "interface", "XPDR2-NETWORK1-OTSIGROUP-400G")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_53_check_no_interface_otsi(self):
response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR2-NETWORK1-755:768")
- self.assertEqual(response['status_code'], requests.codes.conflict)
+ self.assertIn(response['status_code'], (requests.codes.conflict, requests.codes.service_unavailable))
def test_53a_check_no_otuc4(self):
- response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "mapping", "XPDR2-NETWORK1")
self.assertRaises(KeyError, lambda: response["supporting-otucn"])
# Disconnect the XPDR
'Request could not be completed because the relevant data model content does not exist')
def test_56_xpdr_device_not_connected(self):
- response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-A2", "node-info", None)
self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(response['node-info']['error-type'], ('protocol', 'application'))
self.assertEqual(response['node-info']['error-tag'], 'data-missing')