sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils_rfc8040 # nopep8
+import test_utils # nopep8
# pylint: disable=too-few-public-methods
WAITING = 20 # nominal value is 300
NODE_VERSION = '2.2.1'
uuid_services = UuidServices()
+ uuidTpAiOTSI = "f4c370be-e307-380d-a31b-7edc2b431e5d"
+ uuidTpZiOTSI = "febb4502-6b27-3701-990b-3aa4941f48c4"
+ uuidTpADSR = "3aca9b37-bc46-335d-b147-2423690dee18"
+ uuidTpZDSR = "709d3595-6d56-3ad6-a3cd-5a4a09ce6f9d"
+# SIP uuids
+ # SIP+SPDR-SA1-XPDR1+PHOTONIC_MEDIA_OTS+XPDR1-NETWORK1 UUID IS
+ sAOTS = "38d81f55-1798-3520-ba16-08efa56630c4"
+ # SIP+SPDR-SC1-XPDR1+PHOTONIC_MEDIA_OTS+XPDR1-NETWORK1 UUID IS
+ sZOTS = "97d9ba27-0efa-3010-8b98-b4d73240120c"
+ # SIP+SPDR-SA1-XPDR1+eOTSi+XPDR1-NETWORK1 UUID IS
+ sAeOTS = "f4dbce65-6191-3c84-b351-29ccb0629221"
+ # SIP+SPDR-SC1-XPDR1+eOTSi+XPDR1-NETWORK1 UUID IS
+ sZeOTS = "faabebd3-d7af-3389-96a7-261672744591"
+ # SIP+SPDR-SA1-XPDR1+DSR+XPDR1-CLIENT1 UUID IS
+ sADSR = "c14797a0-adcc-3875-a1fe-df8949d1a2d7"
+ # SIP+SPDR-SC1-XPDR1+DSR+XPDR1-CLIENT1 UUID IS
+ sZDSR = "25812ef2-625d-3bf8-af55-5e93946d1c22"
+ # SIP+SPDR-SA1-XPDR1+eODU+XPDR1-CLIENT1 UUID IS
+ sAeODU = "b6421484-531f-3444-adfc-e11c503f1fab"
+ # SIP+SPDR-SC1-XPDR1+eODU+XPDR1-CLIENT1 UUID IS
+ sZeODU = "4511fca7-0cf7-3b27-a128-3b372d5e1fa8"
+ # uuid_A = uuid.UUID(bytes("SPDR-SA1-XPDR1+DSR+eOTSI+XPDR1-NETWORK1", 'utf-8'))
+ # uuid_C = uuid.UUID(bytes("SPDR-SC1-XPDR1+DSR+eOTSI+XPDR1-NETWORK1", 'utf-8'))
cr_serv_input_data = {
"end-point": [
{
"layer-protocol-name": "PHOTONIC_MEDIA",
"service-interface-point": {
- "service-interface-point-uuid": "b1a0d883-32b8-3b0b-93d6-7ed074f6f107"
+ "service-interface-point-uuid": "c14797a0-adcc-3875-a1fe-df8949d1a2d7"
},
"administrative-state": "UNLOCKED",
"operational-state": "ENABLED",
{
"layer-protocol-name": "PHOTONIC_MEDIA",
"service-interface-point": {
- "service-interface-point-uuid": "d1d6305e-179b-346f-b02d-8260aebe1ce8"
+ "service-interface-point-uuid": "25812ef2-625d-3bf8-af55-5e93946d1c22"
},
"administrative-state": "UNLOCKED",
"operational-state": "ENABLED",
}
],
"connectivity-constraint": {
- "service-layer": "PHOTONIC_MEDIA",
"service-type": "POINT_TO_POINT_CONNECTIVITY",
"service-level": "Some service-level",
"requested-capacity": {
"total-size": {
"value": "100",
- "unit": "GB"
+ "unit": "tapi-common:CAPACITY_UNIT_GBPS"
}
}
},
- "state": "Some state"}
+ "topology-constraint": [
+ {
+ "local-id": "localIdTopoConstraint",
+ "name": [
+ {
+ "value-name": "Dumb constraint",
+ "value": "for debug1"
+ }
+ ]
+ }
+ ],
+ "state": "LOCKED",
+ "layer-protocol-name": "PHOTONIC_MEDIA"}
- del_serv_input_data = {"service-id-or-name": "TBD"}
+ del_serv_input_data = {"uuid": "TBD"}
- tapi_topo = {"topology-id-or-name": "TBD"}
+ tapi_topo = {"topology-id": "TBD"}
@classmethod
def setUpClass(cls):
cls.init_failed = False
os.environ['JAVA_MIN_MEM'] = '1024M'
os.environ['JAVA_MAX_MEM'] = '4096M'
- cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils.start_tpce()
# TAPI feature is not installed by default in Karaf
- if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
+ if "NO_ODL_STARTUP" not in os.environ or "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
print("installing tapi feature...")
- result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-tapi")
+ result = test_utils.install_karaf_feature("odl-transportpce-tapi")
if result.returncode != 0:
cls.init_failed = True
print("Restarting OpenDaylight...")
- test_utils_rfc8040.shutdown_process(cls.processes[0])
- cls.processes[0] = test_utils_rfc8040.start_karaf()
- test_utils_rfc8040.process_list[0] = cls.processes[0]
- cls.init_failed = not test_utils_rfc8040.wait_until_log_contains(
- test_utils_rfc8040.KARAF_LOG, test_utils_rfc8040.KARAF_OK_START_MSG, time_to_wait=60)
+ test_utils.shutdown_process(cls.processes[0])
+ cls.processes[0] = test_utils.start_karaf()
+ test_utils.process_list[0] = cls.processes[0]
+ cls.init_failed = not test_utils.wait_until_log_contains(
+ test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
if cls.init_failed:
print("tapi installation feature failed...")
- test_utils_rfc8040.shutdown_process(cls.processes[0])
+ test_utils.shutdown_process(cls.processes[0])
sys.exit(2)
- cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
- ('roadma', cls.NODE_VERSION),
- ('roadmc', cls.NODE_VERSION),
- ('spdrc', cls.NODE_VERSION)])
+ cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils_rfc8040.shutdown_process(process)
+ test_utils.shutdown_process(process)
print("all processes killed")
def setUp(self):
- time.sleep(5)
+ time.sleep(2)
def test_01_connect_spdrA(self):
print("Connecting SPDRA")
- response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(2)
def test_02_connect_spdrC(self):
print("Connecting SPDRC")
- response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(2)
def test_03_connect_rdmA(self):
print("Connecting ROADMA")
- response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(2)
def test_04_connect_rdmC(self):
print("Connecting ROADMC")
- response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
time.sleep(2)
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
time.sleep(2)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-rdm-xpdr-links',
{'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils_rfc8040.add_oms_attr_request(
+ response = test_utils.add_oms_attr_request(
"ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_11_check_otn_topology(self):
- response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['node']), 6, 'There should be 6 otn nodes')
self.assertNotIn('ietf-network-topology:link', response['network'][0])
def test_12_check_openroadm_topology(self):
- response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(len(response['network'][0]['node']), 13, 'There should be 13 openroadm nodes')
self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 22,
'There should be 22 openroadm links')
def test_13_get_tapi_topology_details(self):
- self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.tapi_topo["topology-id"] = test_utils.T0_FULL_MULTILAYER_TOPO_UUID
+ response = test_utils.transportpce_api_rpc_request(
'tapi-topology', 'get-topology-details', self.tapi_topo)
time.sleep(2)
self.assertEqual(response['status_code'], requests.codes.ok)
- self.assertEqual(len(response['output']['topology']['node']), 14, 'There should be 14 TAPI nodes')
- self.assertEqual(len(response['output']['topology']['link']), 15, 'There should be 15 TAPI links')
+ self.assertEqual(len(response['output']['topology']['node']), 8, 'There should be 8 TAPI nodes')
+ self.assertEqual(len(response['output']['topology']['link']), 3, 'There should be 3 TAPI links')
+ print(response['output']['topology']['node'][0])
+ print(response['output']['topology']['node'][1])
+ print(response['output']['topology']['node'][2])
def test_14_check_sip_details(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-common', 'get-service-interface-point-list', None)
- self.assertEqual(len(response['output']['sip']), 60, 'There should be 60 service interface point')
+ self.assertEqual(len(response['output']['sip']), 72, 'There should be 72 service interface point')
# test create connectivity service from spdrA to spdrC for Photonic_media
def test_15_create_connectivity_service_PhotonicMedia(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.cr_serv_input_data["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = self.sAOTS
+ self.cr_serv_input_data["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = self.sZOTS
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
time.sleep(self.WAITING)
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'administrative-state': 'LOCKED',
'lifecycle-state': 'PLANNED',
'operational-state': 'DISABLED',
- 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
- 'service-layer': 'PHOTONIC_MEDIA',
- 'connectivity-direction': 'BIDIRECTIONAL'
+ # 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
+ # 'service-layer': 'PHOTONIC_MEDIA',
+ 'layer-protocol-name': 'PHOTONIC_MEDIA',
+ # 'connectivity-direction': 'BIDIRECTIONAL'
+ 'direction': 'BIDIRECTIONAL'
}
input_dict_2 = {'value-name': 'OpenROADM node id',
'value': 'SPDR-SC1-XPDR1'}
# time.sleep(self.WAITING)
def test_16_get_service_PhotonicMedia(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.pm))
+ response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.pm))
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.pm))
# test create connectivity service from spdrA to spdrC for odu
def test_17_create_connectivity_service_ODU(self):
# pylint: disable=line-too-long
+ self.cr_serv_input_data["layer-protocol-name"] = "ODU"
self.cr_serv_input_data["end-point"][0]["layer-protocol-name"] = "ODU"
- self.cr_serv_input_data["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "5efda776-f8de-3e0b-9bbd-2c702e210946"
+ self.cr_serv_input_data["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = self.sAeODU
self.cr_serv_input_data["end-point"][1]["layer-protocol-name"] = "ODU"
- self.cr_serv_input_data["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "8116d0af-39fa-3df5-bed2-dd2cd5e8217d"
- self.cr_serv_input_data["connectivity-constraint"]["service-layer"] = "ODU"
+ self.cr_serv_input_data["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = self.sZeODU
+# self.cr_serv_input_data["connectivity-constraint"]["service-layer"] = "ODU"
self.cr_serv_input_data["connectivity-constraint"]["service-level"] = self.uuid_services.pm
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
time.sleep(self.WAITING)
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'administrative-state': 'LOCKED',
'lifecycle-state': 'PLANNED',
'operational-state': 'DISABLED',
- 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
- 'service-layer': 'ODU',
- 'connectivity-direction': 'BIDIRECTIONAL'
+ # 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
+ 'layer-protocol-name': 'ODU',
+ 'direction': 'BIDIRECTIONAL'
}
input_dict_2 = {'value-name': 'OpenROADM node id',
'value': 'SPDR-SC1-XPDR1'}
# time.sleep(self.WAITING)
def test_18_get_service_ODU(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.odu))
+ response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.odu))
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.odu))
# test create connectivity service from spdrA to spdrC for dsr
def test_19_create_connectivity_service_DSR(self):
# pylint: disable=line-too-long
+ self.cr_serv_input_data["layer-protocol-name"] = "DSR"
self.cr_serv_input_data["end-point"][0]["layer-protocol-name"] = "DSR"
- self.cr_serv_input_data["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "c14797a0-adcc-3875-a1fe-df8949d1a2d7"
+ self.cr_serv_input_data["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = self.sADSR
self.cr_serv_input_data["end-point"][1]["layer-protocol-name"] = "DSR"
- self.cr_serv_input_data["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "25812ef2-625d-3bf8-af55-5e93946d1c22"
- self.cr_serv_input_data["connectivity-constraint"]["service-layer"] = "DSR"
+ self.cr_serv_input_data["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = self.sZDSR
+# self.cr_serv_input_data["connectivity-constraint"]["service-layer"] = "DSR"
self.cr_serv_input_data["connectivity-constraint"]["requested-capacity"]["total-size"]["value"] = "10"
self.cr_serv_input_data["connectivity-constraint"]["service-level"] = self.uuid_services.odu
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
time.sleep(self.WAITING)
self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'administrative-state': 'LOCKED',
'lifecycle-state': 'PLANNED',
'operational-state': 'DISABLED',
- 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
- 'service-layer': 'DSR',
- 'connectivity-direction': 'BIDIRECTIONAL'
+ # 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
+ 'layer-protocol-name': 'DSR',
+ 'direction': 'BIDIRECTIONAL'
}
input_dict_2 = {'value-name': 'OpenROADM node id',
'value': 'SPDR-SC1-XPDR1'}
# time.sleep(self.WAITING)
def test_20_get_service_DSR(self):
- response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.dsr))
+ response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.dsr))
self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(response['services'][0]['administrative-state'], 'inService')
self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.dsr))
time.sleep(1)
def test_21_get_connectivity_service_list(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'get-connectivity-service-list', None)
self.assertEqual(response['status_code'], requests.codes.ok)
liste_service = response['output']['service']
for ele in liste_service:
if ele['uuid'] == self.uuid_services.pm:
self.assertEqual(ele['operational-state'], 'ENABLED')
- self.assertEqual(ele['service-layer'], 'PHOTONIC_MEDIA')
+# self.assertEqual(ele['service-layer'], 'PHOTONIC_MEDIA')
+ self.assertEqual(ele['layer-protocol-name'], 'PHOTONIC_MEDIA')
nbconnection = len(ele['connection'])
self.assertEqual(nbconnection, 3, 'There should be 3 connections')
elif ele['uuid'] == self.uuid_services.odu:
self.assertEqual(ele['operational-state'], 'ENABLED')
- self.assertEqual(ele['service-layer'], 'ODU')
+# self.assertEqual(ele['service-layer'], 'ODU')
+ self.assertEqual(ele['layer-protocol-name'], 'ODU')
nbconnection = len(ele['connection'])
self.assertEqual(nbconnection, 1, 'There should be 1 connections')
elif ele['uuid'] == self.uuid_services.dsr:
self.assertEqual(ele['operational-state'], 'ENABLED')
- self.assertEqual(ele['service-layer'], 'DSR')
+# self.assertEqual(ele['service-layer'], 'DSR')
+ self.assertEqual(ele['layer-protocol-name'], 'DSR')
nbconnection = len(ele['connection'])
self.assertEqual(nbconnection, 2, 'There should be 2 connections')
else:
time.sleep(2)
def test_22_delete_connectivity_service_DSR(self):
- self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.dsr)
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.del_serv_input_data["uuid"] = str(self.uuid_services.dsr)
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
time.sleep(self.WAITING)
def test_23_delete_connectivity_service_ODU(self):
- self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.odu)
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.del_serv_input_data["uuid"] = str(self.uuid_services.odu)
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
time.sleep(self.WAITING)
def test_24_delete_connectivity_service_PhotonicMedia(self):
- self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.pm)
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ self.del_serv_input_data["uuid"] = str(self.uuid_services.pm)
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
time.sleep(self.WAITING)
def test_25_get_no_tapi_services(self):
- response = test_utils_rfc8040.transportpce_api_rpc_request(
+ response = test_utils.transportpce_api_rpc_request(
'tapi-connectivity', 'get-connectivity-service-list', None)
self.assertEqual(response['status_code'], requests.codes.internal_server_error)
self.assertIn(
response['output']['errors']['error'])
def test_26_get_no_openroadm_services(self):
- response = test_utils_rfc8040.get_ordm_serv_list_request()
+ response = test_utils.get_ordm_serv_list_request()
self.assertEqual(response['status_code'], requests.codes.conflict)
def test_27_disconnect_spdrA(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ response = test_utils.unmount_device("SPDR-SA1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_28_disconnect_spdrC(self):
- response = test_utils_rfc8040.unmount_device("SPDR-SC1")
+ response = test_utils.unmount_device("SPDR-SC1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_29_disconnect_roadmA(self):
- response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ response = test_utils.unmount_device("ROADM-A1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_30_disconnect_roadmC(self):
- response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ response = test_utils.unmount_device("ROADM-C1")
self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))