sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
# pylint: disable=too-few-public-methods
cls.init_failed = False
os.environ['JAVA_MIN_MEM'] = '1024M'
os.environ['JAVA_MAX_MEM'] = '4096M'
- cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils.start_tpce()
# TAPI feature is not installed by default in Karaf
if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
print("installing tapi feature...")
- result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-tapi")
+ result = test_utils.install_karaf_feature("odl-transportpce-tapi")
if result.returncode != 0:
cls.init_failed = True
print("Restarting OpenDaylight...")
- test_utils_rfc8040.shutdown_process(cls.processes[0])
- cls.processes[0] = test_utils_rfc8040.start_karaf()
- test_utils_rfc8040.process_list[0] = cls.processes[0]
- cls.init_failed = not test_utils_rfc8040.wait_until_log_contains(
- test_utils_rfc8040.KARAF_LOG, test_utils_rfc8040.KARAF_OK_START_MSG, time_to_wait=60)
+ test_utils.shutdown_process(cls.processes[0])
+ cls.processes[0] = test_utils.start_karaf()
+ test_utils.process_list[0] = cls.processes[0]
+ cls.init_failed = not test_utils.wait_until_log_contains(
+ test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
if cls.init_failed:
print("tapi installation feature failed...")
- test_utils_rfc8040.shutdown_process(cls.processes[0])
+ test_utils.shutdown_process(cls.processes[0])
sys.exit(2)
- cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
- ('roadma', cls.NODE_VERSION),
- ('roadmc', cls.NODE_VERSION),
- ('spdrc', cls.NODE_VERSION)])
+ cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self):
def test_01_connect_spdrA(self):
print("Connecting SPDRA")
- response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(2)
def test_02_connect_spdrC(self):
print("Connecting SPDRC")
- response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(2)
def test_03_connect_rdmA(self):
print("Connecting ROADMA")
- response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(2)
def test_04_connect_rdmC(self):
print("Connecting ROADMC")
- response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(2)
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_11_check_otn_topology(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['node']), 6, 'There should be 6 otn nodes')
self.assertNotIn('ietf-network-topology:link', response['network'][0])
def test_12_check_openroadm_topology(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['node']), 13, 'There should be 13 openroadm nodes')
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 22,
'There should be 22 openroadm links')
def test_13_get_tapi_topology_details(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
time.sleep(2)
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['output']['topology']['link']), 15, 'There should be 15 TAPI links')
def test_14_check_sip_details(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-common', 'get-service-interface-point-list', None)
self.assertEqual(len(response['output']['sip']), 60, 'There should be 60 service interface point')
# test create connectivity service from spdrA to spdrC for Photonic_media
def test_15_create_connectivity_service_PhotonicMedia(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
time.sleep(self.WAITING)
self.assertEqual(response['status_code'], requests.codes.ok)
# time.sleep(self.WAITING)
def test_16_get_service_PhotonicMedia(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.pm))
+ response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.pm))
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.pm))
self.cr_serv_input_data["connectivity-constraint"]["service-layer"] = "ODU"
self.cr_serv_input_data["connectivity-constraint"]["service-level"] = self.uuid_services.pm
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
time.sleep(self.WAITING)
self.assertEqual(response['status_code'], requests.codes.ok)
# time.sleep(self.WAITING)
def test_18_get_service_ODU(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.odu))
+ response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.odu))
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.odu))
self.cr_serv_input_data["connectivity-constraint"]["requested-capacity"]["total-size"]["value"] = "10"
self.cr_serv_input_data["connectivity-constraint"]["service-level"] = self.uuid_services.odu
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
time.sleep(self.WAITING)
self.assertEqual(response['status_code'], requests.codes.ok)
# time.sleep(self.WAITING)
def test_20_get_service_DSR(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.dsr))
+ response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.dsr))
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.dsr))
time.sleep(1)
def test_21_get_connectivity_service_list(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'get-connectivity-service-list', None)
self.assertEqual(response['status_code'], requests.codes.ok)
liste_service = response['output']['service']
def test_22_delete_connectivity_service_DSR(self):
self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.dsr)
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
time.sleep(self.WAITING)
def test_23_delete_connectivity_service_ODU(self):
self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.odu)
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
time.sleep(self.WAITING)
def test_24_delete_connectivity_service_PhotonicMedia(self):
self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.pm)
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
time.sleep(self.WAITING)
def test_25_get_no_tapi_services(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'get-connectivity-service-list', None)
self.assertEqual(response['status_code'], requests.codes.internal_server_error)
self.assertIn(
response['output']['errors']['error'])
def test_26_get_no_openroadm_services(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_27_disconnect_spdrA(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ response = test_utils.unmount_device("SPDR-SA1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_28_disconnect_spdrC(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SC1")
+ response = test_utils.unmount_device("SPDR-SC1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_29_disconnect_roadmA(self):
- response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ response = test_utils.unmount_device("ROADM-A1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_30_disconnect_roadmC(self):
- response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ response = test_utils.unmount_device("ROADM-C1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))