import test_utils # nopep8
+# pylint: disable=too-few-public-methods
+class UuidServices:
+ def __init__(self):
+ # pylint: disable=invalid-name
+ self.pm = None
+ self.odu = None
+ self.dsr = None
+
+
class TransportPCEtesting(unittest.TestCase):
- processes = None
+ processes = []
WAITING = 20 # nominal value is 300
NODE_VERSION = '2.2.1'
+ uuid_services = UuidServices()
cr_serv_sample_data = {
"input": {
@classmethod
def setUpClass(cls):
+ # pylint: disable=unsubscriptable-object
cls.init_failed = False
os.environ['JAVA_MIN_MEM'] = '1024M'
os.environ['JAVA_MAX_MEM'] = '4096M'
time.sleep(5)
def test_01_connect_spdrA(self):
- response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ print("Connecting SPDRA")
+ response = test_utils.mount_tapi_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code,
requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_spdrC(self):
- response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+ print("Connecting SPDRC")
+ response = test_utils.mount_tapi_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
self.assertEqual(response.status_code,
requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+ print("Connecting ROADMA")
+ response = test_utils.mount_tapi_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code,
requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ time.sleep(2)
def test_04_connect_rdmC(self):
- response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
+ print("Connecting ROADMC")
+ response = test_utils.mount_tapi_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code,
requests.codes.created, test_utils.CODE_SHOULD_BE_201)
nbNode = len(res['output']['topology']['node'])
nbLink = len(res['output']['topology']['link'])
self.assertEqual(nbNode, 14, 'There should be 14 TAPI nodes')
- self.assertEqual(nbLink, 13, 'There should be 13 TAPI links')
+ self.assertEqual(nbLink, 15, 'There should be 15 TAPI links')
time.sleep(2)
def test_14_check_sip_details(self):
time.sleep(self.WAITING)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- global service_pm_uuid
- service_pm_uuid = res['output']['service']['uuid']
- print("photonic media service uuid : {}".format(res['output']['service']['uuid']))
+ self.uuid_services.pm = res['output']['service']['uuid']
+ # pylint: disable=consider-using-f-string
+ print("photonic media service uuid : {}".format(self.uuid_services.pm))
input_dict_1 = {'administrative-state': 'LOCKED',
'lifecycle-state': 'PLANNED',
res['output']['service']['end-point'][0]['name'][0])
self.assertDictEqual(dict(input_dict_3, **res['output']['service']['end-point'][1]['name'][0]),
res['output']['service']['end-point'][1]['name'][0])
+ # If the gate fails is because of the waiting time not being enough
time.sleep(self.WAITING)
def test_16_get_service_PhotonicMedia(self):
response = test_utils.get_service_list_request(
- "services/" + str(service_pm_uuid))
+ "services/" + str(self.uuid_services.pm))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
res['services'][0]['administrative-state'], 'inService')
self.assertEqual(
- res['services'][0]['service-name'], service_pm_uuid)
+ res['services'][0]['service-name'], self.uuid_services.pm)
self.assertEqual(
res['services'][0]['connection-type'], 'infrastructure')
self.assertEqual(
def test_17_create_connectivity_service_ODU(self):
# pylint: disable=line-too-long
self.cr_serv_sample_data["input"]["end-point"][0]["layer-protocol-name"] = "ODU"
- self.cr_serv_sample_data["input"]["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "eecbfa6e-57ab-3651-9606-c22c8ce73f18"
+ self.cr_serv_sample_data["input"]["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "5efda776-f8de-3e0b-9bbd-2c702e210946"
self.cr_serv_sample_data["input"]["end-point"][1]["layer-protocol-name"] = "ODU"
- self.cr_serv_sample_data["input"]["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "31f83b1f-29b2-3a8e-af9b-6423dbc5aa22"
+ self.cr_serv_sample_data["input"]["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "8116d0af-39fa-3df5-bed2-dd2cd5e8217d"
self.cr_serv_sample_data["input"]["connectivity-constraint"]["service-layer"] = "ODU"
response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data)
time.sleep(self.WAITING)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- global service_odu_uuid
- service_odu_uuid = res['output']['service']['uuid']
- print("odu service uuid : {}".format(res['output']['service']['uuid']))
+ self.uuid_services.odu = res['output']['service']['uuid']
+ # pylint: disable=consider-using-f-string
+ print("odu service uuid : {}".format(self.uuid_services.odu))
input_dict_1 = {'administrative-state': 'LOCKED',
'lifecycle-state': 'PLANNED',
res['output']['service']['end-point'][0]['name'][0])
self.assertDictEqual(dict(input_dict_3, **res['output']['service']['end-point'][1]['name'][0]),
res['output']['service']['end-point'][1]['name'][0])
+ # If the gate fails is because of the waiting time not being enough
time.sleep(self.WAITING)
def test_18_get_service_ODU(self):
response = test_utils.get_service_list_request(
- "services/" + str(service_odu_uuid))
+ "services/" + str(self.uuid_services.odu))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
res['services'][0]['administrative-state'], 'inService')
self.assertEqual(
- res['services'][0]['service-name'], service_odu_uuid)
+ res['services'][0]['service-name'], self.uuid_services.odu)
self.assertEqual(
res['services'][0]['connection-type'], 'infrastructure')
self.assertEqual(
time.sleep(self.WAITING)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- global service_dsr_uuid
- service_dsr_uuid = res['output']['service']['uuid']
- print("dsr service uuid : {}".format(res['output']['service']['uuid']))
+ self.uuid_services.dsr = res['output']['service']['uuid']
+ # pylint: disable=consider-using-f-string
+ print("dsr service uuid : {}".format(self.uuid_services.dsr))
input_dict_1 = {'administrative-state': 'LOCKED',
'lifecycle-state': 'PLANNED',
self.assertDictEqual(dict(input_dict_3,
**res['output']['service']['end-point'][1]['name'][0]),
res['output']['service']['end-point'][1]['name'][0])
+ # The sleep here is okey as the DSR service creation is very fast
time.sleep(self.WAITING)
def test_20_get_service_DSR(self):
response = test_utils.get_service_list_request(
- "services/" + str(service_dsr_uuid))
+ "services/" + str(self.uuid_services.dsr))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
res['services'][0]['administrative-state'], 'inService')
self.assertEqual(
- res['services'][0]['service-name'], service_dsr_uuid)
+ res['services'][0]['service-name'], self.uuid_services.dsr)
self.assertEqual(
res['services'][0]['connection-type'], 'service')
self.assertEqual(
res = response.json()
liste_service = res['output']['service']
for ele in liste_service:
- if ele['uuid'] == service_pm_uuid:
+ if ele['uuid'] == self.uuid_services.pm:
self.assertEqual(ele['operational-state'], 'ENABLED')
self.assertEqual(ele['service-layer'], 'PHOTONIC_MEDIA')
nbconnection = len(ele['connection'])
- self.assertEqual(nbconnection, 9, 'There should be 9 connections')
- elif ele['uuid'] == service_odu_uuid:
+ self.assertEqual(nbconnection, 3, 'There should be 3 connections')
+ elif ele['uuid'] == self.uuid_services.odu:
self.assertEqual(ele['operational-state'], 'ENABLED')
self.assertEqual(ele['service-layer'], 'ODU')
nbconnection = len(ele['connection'])
- self.assertEqual(nbconnection, 3, 'There should be 3 connections')
- elif ele['uuid'] == service_dsr_uuid:
+ self.assertEqual(nbconnection, 1, 'There should be 1 connections')
+ elif ele['uuid'] == self.uuid_services.dsr:
self.assertEqual(ele['operational-state'], 'ENABLED')
self.assertEqual(ele['service-layer'], 'DSR')
nbconnection = len(ele['connection'])
- self.assertEqual(nbconnection, 1, 'There should be 1 connection')
+ self.assertEqual(nbconnection, 2, 'There should be 2 connections')
else:
self.fail("get connectivity service failed")
time.sleep(2)
def test_22_delete_connectivity_service_DSR(self):
- response = test_utils.tapi_delete_connectivity_request(service_dsr_uuid)
+ response = test_utils.tapi_delete_connectivity_request(self.uuid_services.dsr)
self.assertEqual(response.status_code, requests.codes.no_content)
time.sleep(self.WAITING)
def test_23_delete_connectivity_service_ODU(self):
- response = test_utils.tapi_delete_connectivity_request(service_odu_uuid)
+ response = test_utils.tapi_delete_connectivity_request(self.uuid_services.odu)
self.assertEqual(response.status_code, requests.codes.no_content)
time.sleep(self.WAITING)
def test_24_delete_connectivity_service_PhotonicMedia(self):
- response = test_utils.tapi_delete_connectivity_request(service_pm_uuid)
+ response = test_utils.tapi_delete_connectivity_request(self.uuid_services.pm)
self.assertEqual(response.status_code, requests.codes.no_content)
time.sleep(self.WAITING)