X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=tests%2Ftransportpce_tests%2F2.2.1%2Ftest07_otn_renderer.py;h=20f1d04142cc33b113a26dfd53cbceb0f9edfe2a;hb=8ec42577fc9bb12bed1a7689cd4a17487e3c1ff6;hp=d6f47f7d74bf5f423b003ec4f3e582069a486fa7;hpb=3e33c429ca86add1881d7586a2d0c3c60ddac3c7;p=transportpce.git diff --git a/tests/transportpce_tests/2.2.1/test07_otn_renderer.py b/tests/transportpce_tests/2.2.1/test07_otn_renderer.py index d6f47f7d7..20f1d0414 100644 --- a/tests/transportpce_tests/2.2.1/test07_otn_renderer.py +++ b/tests/transportpce_tests/2.2.1/test07_otn_renderer.py @@ -15,9 +15,12 @@ import unittest import time import requests +# pylint: disable=wrong-import-order import sys sys.path.append('transportpce_tests/common/') -import test_utils +# pylint: disable=wrong-import-position +# pylint: disable=import-error +import test_utils_rfc8040 # nopep8 class TransportPCEtesting(unittest.TestCase): @@ -39,35 +42,32 @@ class TransportPCEtesting(unittest.TestCase): @classmethod def setUpClass(cls): - cls.processes = test_utils.start_tpce() - cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION)]) + cls.processes = test_utils_rfc8040.start_tpce() + cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION)]) @classmethod def tearDownClass(cls): # pylint: disable=not-an-iterable for process in cls.processes: - test_utils.shutdown_process(process) + test_utils_rfc8040.shutdown_process(process) print("all processes killed") def setUp(self): time.sleep(5) def test_01_connect_SPDR_SA1(self): - response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION)) - self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201) + response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION)) + self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201) time.sleep(10) - response = test_utils.get_netconf_oper_request("SPDR-SA1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertEqual( - res['node'][0]['netconf-node-topology:connection-status'], - 'connected') + response = test_utils_rfc8040.check_device_connection("SPDR-SA1") + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertEqual(response['connection-status'], 'connected') def test_02_get_portmapping_CLIENT4(self): - response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4") - self.assertEqual(response.status_code, requests.codes.ok) - res_mapping = (response.json())['mapping'][0] + response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-CLIENT4") + self.assertEqual(response['status_code'], requests.codes.ok) + res_mapping = response['mapping'][0] self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port']) self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name']) self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point']) @@ -81,58 +81,66 @@ class TransportPCEtesting(unittest.TestCase): self.assertIn('org-openroadm-port-types:if-10GE', res_mapping['supported-interface-capability']) def test_03_get_portmapping_NETWORK1(self): - response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1") + self.assertEqual(response['status_code'], requests.codes.ok) self.assertIn( self.NETWORK1_CHECK_DICT, - res['mapping']) + response['mapping']) def test_04_service_path_create_OCH_OTU4(self): - response = test_utils.service_path_request("create", "service_OCH_OTU4", "1", - [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}], - 196.1, 40, 196.075, 196.125, 761, - 768) - time.sleep(3) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"]) - self.assertTrue(res["output"]["success"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-device-renderer', 'service-path', + { + 'service-name': 'service_test', + 'wave-number': '7', + 'modulation-format': 'dp-qpsk', + 'operation': 'create', + 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}], + 'center-freq': 196.1, + 'nmc-width': 40, + 'min-freq': 196.075, + 'max-freq': 196.125, + 'lower-spectral-slot-number': 761, + 'higher-spectral-slot-number': 768 + }) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Interfaces created successfully for nodes: ', response['output']['result']) + self.assertTrue(response['output']['success']) self.assertIn( {'node-id': 'SPDR-SA1', 'otu-interface-id': ['XPDR1-NETWORK1-OTU'], - 'och-interface-id': ['XPDR1-NETWORK1-761:768']}, res["output"]['node-interface']) + 'och-interface-id': ['XPDR1-NETWORK1-761:768']}, response['output']['node-interface']) def test_05_get_portmapping_NETWORK1(self): - response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1") + self.assertEqual(response['status_code'], requests.codes.ok) + self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU" self.assertIn( self.NETWORK1_CHECK_DICT, - res['mapping']) + response['mapping']) def test_06_check_interface_och(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - - self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768', - 'administrative-state': 'inService', - 'supporting-circuit-pack-name': 'CP1-CFP0', - 'type': 'org-openroadm-interfaces:opticalChannel', - 'supporting-port': 'CP1-CFP0-P1' - }), - res['interface'][0]) + response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768") + self.assertEqual(response['status_code'], requests.codes.ok) - self.assertDictEqual( - {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G', - u'transmit-power': -5, u'modulation-format': 'dp-qpsk'}, - res['interface'][0]['org-openroadm-optical-channel-interfaces:och']) + self.assertDictEqual(dict(response['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768', + 'administrative-state': 'inService', + 'supporting-circuit-pack-name': 'CP1-CFP0', + 'type': 'org-openroadm-interfaces:opticalChannel', + 'supporting-port': 'CP1-CFP0-P1' + }), + response['interface'][0]) + + self.assertIn( + response['interface'][0]['org-openroadm-optical-channel-interfaces:och'], + [{'frequency': '196.1000', 'rate': 'org-openroadm-common-types:R100G', + 'transmit-power': '-5', 'modulation-format': 'dp-qpsk'}, + {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G', + 'transmit-power': -5, 'modulation-format': 'dp-qpsk'}]) def test_07_check_interface_OTU(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU") + self.assertEqual(response['status_code'], requests.codes.ok) input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService', 'supporting-circuit-pack-name': 'CP1-CFP0', @@ -141,105 +149,103 @@ class TransportPCEtesting(unittest.TestCase): 'supporting-port': 'CP1-CFP0-P1' } - input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=', - 'expected-sapi': 'Swfw02qXGyI=', - 'tx-sapi': 'Swfw02qXGyI=', - 'expected-dapi': 'Swfw02qXGyI=', - 'rate': 'org-openroadm-otn-common-types:OTU4', + input_dict_2 = {'rate': 'org-openroadm-otn-common-types:OTU4', 'fec': 'scfec' } - self.assertDictEqual(dict(res['interface'][0], **input_dict_1), - res['interface'][0]) + self.assertDictEqual(dict(response['interface'][0], **input_dict_1), + response['interface'][0]) - self.assertDictEqual(input_dict_2, - res['interface'][0]['org-openroadm-otn-otu-interfaces:otu']) + self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'], **input_dict_2), + response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']) def test_08_otn_service_path_create_ODU4(self): - response = test_utils.otn_service_path_request("create", "service_ODU4", "100", "ODU", - [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}]) - time.sleep(3) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"]) - self.assertTrue(res["output"]["success"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-device-renderer', 'otn-service-path', + { + 'service-name': 'service_ODU4', + 'operation': 'create', + 'service-rate': '100', + 'service-format': 'ODU', + 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}] + }) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result']) + self.assertTrue(response['output']['success']) self.assertIn( {'node-id': 'SPDR-SA1', - 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface']) + 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, response['output']['node-interface']) def test_09_get_portmapping_NETWORK1(self): - response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1") + self.assertEqual(response['status_code'], requests.codes.ok) self.NETWORK1_CHECK_DICT["supporting-odu4"] = "XPDR1-NETWORK1-ODU4" + self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU" self.assertIn( self.NETWORK1_CHECK_DICT, - res['mapping']) + response['mapping']) def test_10_check_interface_ODU4(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4") + self.assertEqual(response['status_code'], requests.codes.ok) input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService', 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU', 'type': 'org-openroadm-interfaces:otnOdu', 'supporting-port': 'CP1-CFP0-P1'} input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP', - 'rate': 'org-openroadm-otn-common-types:ODU4', - 'expected-dapi': 'Swfw02qXGyI=', - 'expected-sapi': 'Swfw02qXGyI=', - 'tx-dapi': 'Swfw02qXGyI=', - 'tx-sapi': 'Swfw02qXGyI='} - - self.assertDictEqual(dict(res['interface'][0], **input_dict_1), - res['interface'][0]) - self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], + 'rate': 'org-openroadm-otn-common-types:ODU4'} + + self.assertDictEqual(dict(response['interface'][0], **input_dict_1), + response['interface'][0]) + self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], **input_dict_2 ), - res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'] ) self.assertDictEqual( - {u'payload-type': u'21', u'exp-payload-type': u'21'}, - res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + {'payload-type': '21', 'exp-payload-type': '21'}, + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) def test_11_otn_service_path_create_10GE(self): - response = test_utils.otn_service_path_request("create", "service1", "10", "Ethernet", - [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4", - "network-tp": "XPDR1-NETWORK1"}], - {"ethernet-encoding": "eth encode", - "trib-slot": ["1"], "trib-port-number": "1"}) - time.sleep(3) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"]) - self.assertTrue(res["output"]["success"]) - self.assertEqual('SPDR-SA1', res["output"]['node-interface'][0]['node-id']) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-device-renderer', 'otn-service-path', + { + 'service-name': 'service1', + 'operation': 'create', + 'service-rate': '10', + 'service-format': 'Ethernet', + 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}], + 'ethernet-encoding': 'eth encode', + 'trib-slot': ['1'], + 'trib-port-number': '1' + }) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result']) + self.assertTrue(response['output']['success']) + self.assertEqual('SPDR-SA1', response['output']['node-interface'][0]['node-id']) self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1', - res["output"]['node-interface'][0]['connection-id']) - self.assertIn('XPDR1-CLIENT4-ETHERNET10G', res["output"]['node-interface'][0]['eth-interface-id']) - self.assertIn('XPDR1-NETWORK1-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id']) - self.assertIn('XPDR1-CLIENT4-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id']) + response['output']['node-interface'][0]['connection-id']) + self.assertIn('XPDR1-CLIENT4-ETHERNET10G', response['output']['node-interface'][0]['eth-interface-id']) + self.assertIn('XPDR1-NETWORK1-ODU2e-service1', response['output']['node-interface'][0]['odu-interface-id']) + self.assertIn('XPDR1-CLIENT4-ODU2e-service1', response['output']['node-interface'][0]['odu-interface-id']) def test_12_check_interface_10GE_CLIENT(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G") + self.assertEqual(response['status_code'], requests.codes.ok) input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G', 'administrative-state': 'inService', 'supporting-circuit-pack-name': 'CP1-SFP4', 'type': 'org-openroadm-interfaces:ethernetCsmacd', 'supporting-port': 'CP1-SFP4-P1' } - self.assertDictEqual(dict(res['interface'][0], **input_dict), - res['interface'][0]) - self.assertDictEqual( - {u'speed': 10000}, - res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']) + self.assertDictEqual(dict(response['interface'][0], **input_dict), + response['interface'][0]) + self.assertEqual(10000, response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed']) def test_13_check_interface_ODU2E_CLIENT(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + response = test_utils_rfc8040.check_node_attribute_request( + "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e-service1") + self.assertEqual(response['status_code'], requests.codes.ok) input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1', 'administrative-state': 'inService', @@ -252,19 +258,19 @@ class TransportPCEtesting(unittest.TestCase): 'rate': 'org-openroadm-otn-common-types:ODU2e', 'monitoring-mode': 'terminated'} - self.assertDictEqual(dict(res['interface'][0], **input_dict_1), - res['interface'][0]) - self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], + self.assertDictEqual(dict(response['interface'][0], **input_dict_1), + response['interface'][0]) + self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], **input_dict_2), - res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) self.assertDictEqual( - {u'payload-type': u'03', u'exp-payload-type': u'03'}, - res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) + {'payload-type': '03', 'exp-payload-type': '03'}, + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']) def test_14_check_interface_ODU2E_NETWORK(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + response = test_utils_rfc8040.check_node_attribute_request( + "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-service1") + self.assertEqual(response['status_code'], requests.codes.ok) input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService', 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-ODU4', @@ -277,105 +283,123 @@ class TransportPCEtesting(unittest.TestCase): input_dict_3 = {'trib-port-number': 1} - self.assertDictEqual(dict(res['interface'][0], **input_dict_1), - res['interface'][0]) - self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], + self.assertDictEqual(dict(response['interface'][0], **input_dict_1), + response['interface'][0]) + self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'], **input_dict_2), - res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) - self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][ + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']) + self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][ 'parent-odu-allocation'], **input_dict_3 ), - res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][ + response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][ 'parent-odu-allocation']) self.assertIn(1, - res['interface'][0][ + response['interface'][0][ 'org-openroadm-otn-odu-interfaces:odu'][ 'parent-odu-allocation']['trib-slots']) def test_15_check_ODU2E_connection(self): - response = test_utils.check_netconf_node_request( - "SPDR-SA1", - "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1") - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() + response = test_utils_rfc8040.check_node_attribute_request( + "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1") + self.assertEqual(response['status_code'], requests.codes.ok) input_dict_1 = { 'connection-name': 'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1', 'direction': 'bidirectional' } - self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1), - res['odu-connection'][0]) - self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'}, - res['odu-connection'][0]['destination']) - self.assertDictEqual({u'src-if': u'XPDR1-CLIENT4-ODU2e-service1'}, - res['odu-connection'][0]['source']) + self.assertDictEqual(dict(response['odu-connection'][0], **input_dict_1), + response['odu-connection'][0]) + self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'}, + response['odu-connection'][0]['destination']) + self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'}, + response['odu-connection'][0]['source']) def test_16_otn_service_path_delete_10GE(self): - response = test_utils.otn_service_path_request("delete", "service1", "10", "Ethernet", - [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4", - "network-tp": "XPDR1-NETWORK1"}], - {"ethernet-encoding": "eth encode", - "trib-slot": ["1"], "trib-port-number": "1"}) - time.sleep(3) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Request processed', res["output"]["result"]) - self.assertTrue(res["output"]["success"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-device-renderer', 'otn-service-path', + { + 'service-name': 'service1', + 'operation': 'delete', + 'service-rate': '10', + 'service-format': 'Ethernet', + 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}], + 'ethernet-encoding': 'eth encode', + 'trib-slot': ['1'], + 'trib-port-number': '1' + }) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Request processed', response['output']['result']) + self.assertTrue(response['output']['success']) def test_17_check_no_ODU2E_connection(self): - response = test_utils.check_netconf_node_request( - "SPDR-SA1", - "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1") - self.assertEqual(response.status_code, requests.codes.conflict) + response = test_utils_rfc8040.check_node_attribute_request( + "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1") + self.assertEqual(response['status_code'], requests.codes.conflict) def test_18_check_no_interface_ODU2E_NETWORK(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1") - self.assertEqual(response.status_code, requests.codes.conflict) + response = test_utils_rfc8040.check_node_attribute_request( + "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-service1") + self.assertEqual(response['status_code'], requests.codes.conflict) def test_19_check_no_interface_ODU2E_CLIENT(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1") - self.assertEqual(response.status_code, requests.codes.conflict) + response = test_utils_rfc8040.check_node_attribute_request( + "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e-service1") + self.assertEqual(response['status_code'], requests.codes.conflict) def test_20_check_no_interface_10GE_CLIENT(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G") - self.assertEqual(response.status_code, requests.codes.conflict) + response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G") + self.assertEqual(response['status_code'], requests.codes.conflict) def test_21_otn_service_path_delete_ODU4(self): - response = test_utils.otn_service_path_request("delete", "service_ODU4", "100", "ODU", - [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}]) - time.sleep(3) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Request processed', res["output"]["result"]) - self.assertTrue(res["output"]["success"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-device-renderer', 'otn-service-path', + { + 'service-name': 'service_ODU4', + 'operation': 'delete', + 'service-rate': '100', + 'service-format': 'ODU', + 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}] + }) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Request processed', response['output']['result']) + self.assertTrue(response['output']['success']) def test_22_check_no_interface_ODU4(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4") - self.assertEqual(response.status_code, requests.codes.conflict) + response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4") + self.assertEqual(response['status_code'], requests.codes.conflict) def test_23_service_path_delete_OCH_OTU4(self): - response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1", - [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}], - 196.1, 40, 196.075, 196.125, 761, - 768) - time.sleep(3) - self.assertEqual(response.status_code, requests.codes.ok) - res = response.json() - self.assertIn('Request processed', res["output"]["result"]) - self.assertTrue(res["output"]["success"]) + response = test_utils_rfc8040.transportpce_api_rpc_request( + 'transportpce-device-renderer', 'service-path', + { + 'service-name': 'service_test', + 'wave-number': '7', + 'modulation-format': 'dp-qpsk', + 'operation': 'delete', + 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}], + 'center-freq': 196.1, + 'nmc-width': 40, + 'min-freq': 196.075, + 'max-freq': 196.125, + 'lower-spectral-slot-number': 761, + 'higher-spectral-slot-number': 768 + }) + self.assertEqual(response['status_code'], requests.codes.ok) + self.assertIn('Request processed', response['output']['result']) + self.assertTrue(response['output']['success']) def test_24_check_no_interface_OTU4(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU") - self.assertEqual(response.status_code, requests.codes.conflict) + response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU") + self.assertEqual(response['status_code'], requests.codes.conflict) def test_25_check_no_interface_OCH(self): - response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1") - self.assertEqual(response.status_code, requests.codes.conflict) + response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-1") + self.assertEqual(response['status_code'], requests.codes.conflict) def test_26_disconnect_SPDR_SA1(self): - response = test_utils.unmount_device("SPDR-SA1") - self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200) + response = test_utils_rfc8040.unmount_device("SPDR-SA1") + self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content)) if __name__ == "__main__":