sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(5)
def test_01_connect_SPDR_SA1(self):
- response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
time.sleep(10)
- response = test_utils.get_netconf_oper_request("SPDR-SA1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['node'][0]['netconf-node-topology:connection-status'],
- 'connected')
+ response = test_utils_rfc8040.check_device_connection("SPDR-SA1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['connection-status'], 'connected')
def test_02_get_portmapping_CLIENT4(self):
- response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res_mapping = (response.json())['mapping'][0]
+ response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-CLIENT4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ res_mapping = response['mapping'][0]
self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name'])
self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point'])
self.assertIn('org-openroadm-port-types:if-10GE', res_mapping['supported-interface-capability'])
def test_03_get_portmapping_NETWORK1(self):
- response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
self.NETWORK1_CHECK_DICT,
- res['mapping'])
+ response['mapping'])
def test_04_service_path_create_OCH_OTU4(self):
- response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
- [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
- 196.1, 40, 196.075, 196.125, 761,
- 768)
- time.sleep(3)
+ data = {
+ 'input': {
+ 'service-name': 'service_test',
+ 'wave-number': '7',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'create',
+ 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ }
+ }
+ response = test_utils_rfc8040.device_renderer_service_path_request(data)
self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Interfaces created successfully for nodes: ', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
- self.assertIn(
- {'node-id': 'SPDR-SA1',
- 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
- 'och-interface-id': ['XPDR1-NETWORK1-761:768']}, res["output"]['node-interface'])
def test_05_get_portmapping_NETWORK1(self):
- response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
self.assertIn(
self.NETWORK1_CHECK_DICT,
- res['mapping'])
+ response['mapping'])
def test_06_check_interface_och(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768")
+ self.assertEqual(response['status_code'], requests.codes.ok)
- self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
- 'administrative-state': 'inService',
- 'supporting-circuit-pack-name': 'CP1-CFP0',
- 'type': 'org-openroadm-interfaces:opticalChannel',
- 'supporting-port': 'CP1-CFP0-P1'
- }),
- res['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': 'CP1-CFP0',
+ 'type': 'org-openroadm-interfaces:opticalChannel',
+ 'supporting-port': 'CP1-CFP0-P1'
+ }),
+ response['interface'][0])
- self.assertDictEqual(
- {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
- 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
- res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
+ self.assertIn(
+ response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
+ [{'frequency': '196.1000', 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': '-5', 'modulation-format': 'dp-qpsk'},
+ {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
+ 'transmit-power': -5, 'modulation-format': 'dp-qpsk'}])
def test_07_check_interface_OTU(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'fec': 'scfec'
}
- self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
- res['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
+ response['interface'][0])
- self.assertDictEqual(input_dict_2,
- res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'], **input_dict_2),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_08_otn_service_path_create_ODU4(self):
- response = test_utils.otn_service_path_request("create", "service_ODU4", "100", "ODU",
- [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
+ data = {
+ 'input': {
+ 'service-name': 'service_ODU4',
+ 'operation': 'create',
+ 'service-rate': '100',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
+ }
+ }
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
+ self.assertTrue(response['output']['success'])
self.assertIn(
{'node-id': 'SPDR-SA1',
- 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
+ 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, response['output']['node-interface'])
def test_09_get_portmapping_NETWORK1(self):
- response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.NETWORK1_CHECK_DICT["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
self.assertIn(
self.NETWORK1_CHECK_DICT,
- res['mapping'])
+ response['mapping'])
def test_10_check_interface_ODU4(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
'type': 'org-openroadm-interfaces:otnOdu',
input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
'rate': 'org-openroadm-otn-common-types:ODU4'}
- self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2
),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
)
self.assertDictEqual(
{'payload-type': '21', 'exp-payload-type': '21'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_11_otn_service_path_create_10GE(self):
- response = test_utils.otn_service_path_request("create", "service1", "10", "Ethernet",
- [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
- "network-tp": "XPDR1-NETWORK1"}],
- {"ethernet-encoding": "eth encode",
- "trib-slot": ["1"], "trib-port-number": "1"})
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
- self.assertEqual('SPDR-SA1', res["output"]['node-interface'][0]['node-id'])
+ data = {
+ 'input': {
+ 'service-name': 'service1',
+ 'operation': 'create',
+ 'service-rate': '10',
+ 'service-format': 'Ethernet',
+ 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}],
+ 'ethernet-encoding': 'eth encode',
+ 'trib-slot': ['1'],
+ 'trib-port-number': '1'
+ }
+ }
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
+ self.assertTrue(response['output']['success'])
+ self.assertEqual('SPDR-SA1', response['output']['node-interface'][0]['node-id'])
self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
- res["output"]['node-interface'][0]['connection-id'])
- self.assertIn('XPDR1-CLIENT4-ETHERNET10G', res["output"]['node-interface'][0]['eth-interface-id'])
- self.assertIn('XPDR1-NETWORK1-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id'])
- self.assertIn('XPDR1-CLIENT4-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id'])
+ response['output']['node-interface'][0]['connection-id'])
+ self.assertIn('XPDR1-CLIENT4-ETHERNET10G', response['output']['node-interface'][0]['eth-interface-id'])
+ self.assertIn('XPDR1-NETWORK1-ODU2e-service1', response['output']['node-interface'][0]['odu-interface-id'])
+ self.assertIn('XPDR1-CLIENT4-ODU2e-service1', response['output']['node-interface'][0]['odu-interface-id'])
def test_12_check_interface_10GE_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'type': 'org-openroadm-interfaces:ethernetCsmacd',
'supporting-port': 'CP1-SFP4-P1'
}
- self.assertDictEqual(dict(res['interface'][0], **input_dict),
- res['interface'][0])
- self.assertDictEqual(
- {'speed': 10000},
- res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+ self.assertDictEqual(dict(response['interface'][0], **input_dict),
+ response['interface'][0])
+ self.assertEqual(10000, response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_13_check_interface_ODU2E_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
'administrative-state': 'inService',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'terminated'}
- self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(
{'payload-type': '03', 'exp-payload-type': '03'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_14_check_interface_ODU2E_NETWORK(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'supporting-interface': 'XPDR1-NETWORK1-ODU4',
input_dict_3 = {'trib-port-number': 1}
- self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'], **input_dict_3
),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'])
self.assertIn(1,
- res['interface'][0][
+ response['interface'][0][
'org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']['trib-slots'])
def test_15_check_ODU2E_connection(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1",
- "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
'connection-name':
'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
'direction': 'bidirectional'
}
- self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
- res['odu-connection'][0])
+ self.assertDictEqual(dict(response['odu-connection'][0], **input_dict_1),
+ response['odu-connection'][0])
self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
- res['odu-connection'][0]['destination'])
+ response['odu-connection'][0]['destination'])
self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'},
- res['odu-connection'][0]['source'])
+ response['odu-connection'][0]['source'])
def test_16_otn_service_path_delete_10GE(self):
- response = test_utils.otn_service_path_request("delete", "service1", "10", "Ethernet",
- [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
- "network-tp": "XPDR1-NETWORK1"}],
- {"ethernet-encoding": "eth encode",
- "trib-slot": ["1"], "trib-port-number": "1"})
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Request processed', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
+ data = {
+ 'input': {
+ 'service-name': 'service1',
+ 'operation': 'delete',
+ 'service-rate': '10',
+ 'service-format': 'Ethernet',
+ 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}],
+ 'ethernet-encoding': 'eth encode',
+ 'trib-slot': ['1'],
+ 'trib-port-number': '1'
+ }
+ }
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+ self.assertTrue(response['output']['success'])
def test_17_check_no_ODU2E_connection(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1",
- "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_18_check_no_interface_ODU2E_NETWORK(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_19_check_no_interface_ODU2E_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e-service1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_20_check_no_interface_10GE_CLIENT(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_21_otn_service_path_delete_ODU4(self):
- response = test_utils.otn_service_path_request("delete", "service_ODU4", "100", "ODU",
- [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
- time.sleep(3)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Request processed', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
+ data = {
+ 'input': {
+ 'service-name': 'service_ODU4',
+ 'operation': 'delete',
+ 'service-rate': '100',
+ 'service-format': 'ODU',
+ 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
+ }
+ }
+ response = test_utils_rfc8040.device_renderer_otn_service_path_request(data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Request processed', response['output']['result'])
+ self.assertTrue(response['output']['success'])
def test_22_check_no_interface_ODU4(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_23_service_path_delete_OCH_OTU4(self):
- response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
- [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
- 196.1, 40, 196.075, 196.125, 761,
- 768)
- time.sleep(3)
+ data = {
+ 'input': {
+ 'service-name': 'service_test',
+ 'wave-number': '7',
+ 'modulation-format': 'dp-qpsk',
+ 'operation': 'delete',
+ 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
+ 'center-freq': 196.1,
+ 'nmc-width': 40,
+ 'min-freq': 196.075,
+ 'max-freq': 196.125,
+ 'lower-spectral-slot-number': 761,
+ 'higher-spectral-slot-number': 768
+ }
+ }
+ response = test_utils_rfc8040.device_renderer_service_path_request(data)
self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Request processed', res["output"]["result"])
- self.assertTrue(res["output"]["success"])
def test_24_check_no_interface_OTU4(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_25_check_no_interface_OCH(self):
- response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_26_disconnect_SPDR_SA1(self):
- response = test_utils.unmount_device("SPDR-SA1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":