3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
19 # pylint: disable=wrong-import-order
21 sys.path.append('transportpce_tests/common/')
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils # nopep8
# Test case class. The attributes visible here are a wait time, the simulator
# version, and the request body handed to tapi_create_connectivity_request().
# NOTE(review): several structural lines of the dict literal below are not
# visible in this excerpt; check nesting against the complete file.
27 class TransportPCEtesting(unittest.TestCase):
# Value passed to time.sleep() by the service create/delete tests.
30 WAITING = 20 # nominal value is 300
# Version string paired with each simulator name in start_sims()/mount_device().
31 NODE_VERSION = '2.2.1'
# Shared request payload. test_17 and test_19 mutate it in place (layer names,
# SIP uuids, service-layer, requested capacity) before sending it again.
33 cr_serv_sample_data = {
37 "layer-protocol-name": "PHOTONIC_MEDIA",
38 "service-interface-point": {
39 "service-interface-point-uuid": "b1a0d883-32b8-3b0b-93d6-7ed074f6f107"
41 "administrative-state": "UNLOCKED",
42 "operational-state": "ENABLED",
43 "direction": "BIDIRECTIONAL",
45 "protection-role": "WORK",
46 "local-id": "SPDR-SA1-XPDR1",
49 "value-name": "OpenROADM node id",
50 "value": "SPDR-SA1-XPDR1"
55 "layer-protocol-name": "PHOTONIC_MEDIA",
56 "service-interface-point": {
57 "service-interface-point-uuid": "d1d6305e-179b-346f-b02d-8260aebe1ce8"
59 "administrative-state": "UNLOCKED",
60 "operational-state": "ENABLED",
61 "direction": "BIDIRECTIONAL",
63 "protection-role": "WORK",
64 "local-id": "SPDR-SC1-XPDR1",
67 "value-name": "OpenROADM node id",
68 "value": "SPDR-SC1-XPDR1"
73 "connectivity-constraint": {
74 "service-layer": "PHOTONIC_MEDIA",
75 "service-type": "POINT_TO_POINT_CONNECTIVITY",
76 "service-level": "Some service-level",
77 "requested-capacity": {
84 "state": "Some state"}}
# NOTE(review): the enclosing `def` line is not visible in this excerpt; the
# statements use `cls`, so this is presumably the body of setUpClass - confirm.
88 # pylint: disable=unsubscriptable-object
89 cls.init_failed = False
# Java memory settings are exported to the environment before start_tpce().
90 os.environ['JAVA_MIN_MEM'] = '1024M'
91 os.environ['JAVA_MAX_MEM'] = '4096M'
92 cls.processes = test_utils.start_tpce()
93 # TAPI feature is not installed by default in Karaf
# The feature install runs unless USE_LIGHTY is set to exactly 'True'.
94 if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
95 print("installing tapi feature...")
96 result = test_utils.install_karaf_feature("odl-transportpce-tapi")
97 if result.returncode != 0:
98 cls.init_failed = True
99 print("Restarting OpenDaylight...")
# The first tracked process is stopped, restarted via start_karaf(), and the
# new handle is mirrored into test_utils.process_list.
100 test_utils.shutdown_process(cls.processes[0])
101 cls.processes[0] = test_utils.start_karaf()
102 test_utils.process_list[0] = cls.processes[0]
# init_failed is the negation of wait_until_log_contains(..., time_to_wait=60).
# NOTE(review): this overwrites a True set at line 98 above - confirm intended.
103 cls.init_failed = not test_utils.wait_until_log_contains(
104 test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
# NOTE(review): the condition guarding the two lines below is not visible here.
106 print("tapi installation feature failed...")
107 test_utils.shutdown_process(cls.processes[0])
# NOTE(review): this rebinds cls.processes (set from start_tpce() above);
# confirm start_sims() returns every process the teardown has to stop.
109 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
110 ('roadma', cls.NODE_VERSION),
111 ('roadmc', cls.NODE_VERSION),
112 ('spdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes``, then report it."""
    # pylint: disable=not-an-iterable
    stop = test_utils.shutdown_process
    for running in cls.processes:
        stop(running)
    print("all processes killed")
def test_01_connect_spdrA(self):
    """Call mount_device for 'SPDR-SA1' and expect a 'created' status code."""
    simulator = ('spdra', self.NODE_VERSION)
    mounted = test_utils.mount_device("SPDR-SA1", simulator)
    self.assertEqual(
        mounted.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_spdrC(self):
    """Call mount_device for 'SPDR-SC1' and expect a 'created' status code."""
    simulator = ('spdrc', self.NODE_VERSION)
    mounted = test_utils.mount_device("SPDR-SC1", simulator)
    self.assertEqual(
        mounted.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    """Call mount_device for 'ROADM-A1' and expect a 'created' status code."""
    simulator = ('roadma', self.NODE_VERSION)
    mounted = test_utils.mount_device("ROADM-A1", simulator)
    self.assertEqual(
        mounted.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    """Call mount_device for 'ROADM-C1' and expect a 'created' status code."""
    simulator = ('roadmc', self.NODE_VERSION)
    mounted = test_utils.mount_device("ROADM-C1", simulator)
    self.assertEqual(
        mounted.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
    """Send the xpdr-to-rdm link request (SPDR-SA1 / ROADM-A1) and check the reply."""
    link_args = ("SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The body is decoded only after the status check, as before.
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
    """Send the rdm-to-xpdr link request (SPDR-SA1 / ROADM-A1) and check the reply."""
    link_args = ("SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The body is decoded only after the status check, as before.
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
    """Send the xpdr-to-rdm link request (SPDR-SC1 / ROADM-C1) and check the reply."""
    link_args = ("SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The body is decoded only after the status check, as before.
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
    """Send the rdm-to-xpdr link request (SPDR-SC1 / ROADM-C1) and check the reply."""
    link_args = ("SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The body is decoded only after the status check, as before.
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
# Sends OMS attributes for the ROADM-A1 -> ROADM-C1 link through
# add_oms_attr_request() and expects a 'created' status code.
# NOTE(review): the assignment of `data` and the closing brackets of its
# literal are not visible in this excerpt - check against the complete file.
180 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
181 # Config ROADMA-ROADMC oms-attributes
183 "auto-spanloss": "true",
184 "spanloss-base": 11.4,
185 "spanloss-current": 12,
186 "engineered-spanloss": 12.2,
187 "link-concatenation": [{
190 "SRLG-length": 100000,
192 response = test_utils.add_oms_attr_request(
193 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
194 self.assertEqual(response.status_code, requests.codes.created)
# Sends OMS attributes for the ROADM-C1 -> ROADM-A1 link through
# add_oms_attr_request() and expects a 'created' status code.
# NOTE(review): the assignment of `data` and the closing brackets of its
# literal are not visible in this excerpt - check against the complete file.
197 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
198 # Config ROADMC-ROADMA oms-attributes
200 "auto-spanloss": "true",
201 "spanloss-base": 11.4,
202 "spanloss-current": 12,
203 "engineered-spanloss": 12.2,
204 "link-concatenation": [{
207 "SRLG-length": 100000,
209 response = test_utils.add_oms_attr_request(
210 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
211 self.assertEqual(response.status_code, requests.codes.created)
def test_11_check_otn_topology(self):
    """Expect 6 nodes and no link list in the first network of the OTN topology reply."""
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    self.assertEqual(len(otn_network['node']), 6, 'There should be 6 otn nodes')
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_12_check_openroadm_topology(self):
    """Expect 13 nodes and 22 links in the first network of the openroadm topology reply."""
    reply = test_utils.get_ordm_topo_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()['network'][0]
    # Both sizes are read before either assertion runs, as in the original.
    node_total, link_total = (
        len(topology[key]) for key in ('node', 'ietf-network-topology:link')
    )
    self.assertEqual(node_total, 13, 'There should be 13 openroadm nodes')
    self.assertEqual(link_total, 22, 'There should be 22 openroadm links')
def test_13_get_tapi_topology_details(self):
    """Expect 14 nodes and 13 links in the 'T0 - Full Multi-layer topology' details."""
    topology_name = "T0 - Full Multi-layer topology"
    reply = test_utils.tapi_get_topology_details_request(topology_name)
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()['output']['topology']
    # Both sizes are read before either assertion runs, as in the original.
    node_total, link_total = (len(topology[key]) for key in ('node', 'link'))
    self.assertEqual(node_total, 14, 'There should be 14 TAPI nodes')
    self.assertEqual(link_total, 13, 'There should be 13 TAPI links')
def test_14_check_sip_details(self):
    """Expect 60 entries under output/sip in the SIP details reply."""
    reply = test_utils.tapi_get_sip_details_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    sip_total = len(reply.json()['output']['sip'])
    self.assertEqual(sip_total, 60, 'There should be 60 service interface point')
253 # test create connectivity service from spdrA to spdrC for Photonic_media
254 def test_15_create_connectivity_service_PhotonicMedia(self):
# The shared class-level payload is sent unmodified for this first service.
255 response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data)
256 time.sleep(self.WAITING)
257 self.assertEqual(response.status_code, requests.codes.ok)
258 res = response.json()
# The uuid is kept in a module global; test_16, test_21 and test_24 read it.
259 global SERVICE_PM_UUID
260 SERVICE_PM_UUID = res['output']['service']['uuid']
261 # pylint: disable=consider-using-f-string
262 print("photonic media service uuid : {}".format(res['output']['service']['uuid']))
# NOTE(review): the closing brace of input_dict_1 is not visible in this excerpt.
264 input_dict_1 = {'administrative-state': 'LOCKED',
265 'lifecycle-state': 'PLANNED',
266 'operational-state': 'DISABLED',
267 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
268 'service-layer': 'PHOTONIC_MEDIA',
269 'connectivity-direction': 'BIDIRECTIONAL'
271 input_dict_2 = {'value-name': 'OpenROADM node id',
272 'value': 'SPDR-SC1-XPDR1'}
273 input_dict_3 = {'value-name': 'OpenROADM node id',
274 'value': 'SPDR-SA1-XPDR1'}
# NOTE(review): dict(expected, **actual) lets the response values override the
# expected ones, so each comparison below only proves that the expected KEYS
# are present in the response; the expected values are never compared.
276 self.assertDictEqual(dict(input_dict_1, **res['output']['service']),
277 res['output']['service'])
278 self.assertDictEqual(dict(input_dict_2, **res['output']['service']['end-point'][0]['name'][0]),
279 res['output']['service']['end-point'][0]['name'][0])
280 self.assertDictEqual(dict(input_dict_3, **res['output']['service']['end-point'][1]['name'][0]),
281 res['output']['service']['end-point'][1]['name'][0])
282 time.sleep(self.WAITING)
# Reads the service back by the uuid stored in module global SERVICE_PM_UUID
# (assigned in test_15) and expects an 'ok' status code.
284 def test_16_get_service_PhotonicMedia(self):
285 response = test_utils.get_service_list_request(
286 "services/" + str(SERVICE_PM_UUID))
287 self.assertEqual(response.status_code, requests.codes.ok)
288 res = response.json()
# NOTE(review): each line below is an (actual, expected) argument pair; the
# call that opens each one is not visible in this excerpt - presumably an
# equality assertion, confirm against the complete file.
290 res['services'][0]['administrative-state'], 'inService')
292 res['services'][0]['service-name'], SERVICE_PM_UUID)
294 res['services'][0]['connection-type'], 'infrastructure')
296 res['services'][0]['lifecycle-state'], 'planned')
299 # test create connectivity service from spdrA to spdrC for odu
300 def test_17_create_connectivity_service_ODU(self):
301 # pylint: disable=line-too-long
# The shared class-level payload is edited in place: both end-points and the
# connectivity constraint switch to ODU, with different SIP uuids.
302 self.cr_serv_sample_data["input"]["end-point"][0]["layer-protocol-name"] = "ODU"
303 self.cr_serv_sample_data["input"]["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "eecbfa6e-57ab-3651-9606-c22c8ce73f18"
304 self.cr_serv_sample_data["input"]["end-point"][1]["layer-protocol-name"] = "ODU"
305 self.cr_serv_sample_data["input"]["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "31f83b1f-29b2-3a8e-af9b-6423dbc5aa22"
306 self.cr_serv_sample_data["input"]["connectivity-constraint"]["service-layer"] = "ODU"
308 response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data)
309 time.sleep(self.WAITING)
310 self.assertEqual(response.status_code, requests.codes.ok)
311 res = response.json()
# The uuid is kept in a module global; test_18, test_21 and test_23 read it.
312 global SERVICE_ODU_UUID
313 SERVICE_ODU_UUID = res['output']['service']['uuid']
314 # pylint: disable=consider-using-f-string
315 print("odu service uuid : {}".format(res['output']['service']['uuid']))
# NOTE(review): the closing brace of input_dict_1 is not visible in this excerpt.
317 input_dict_1 = {'administrative-state': 'LOCKED',
318 'lifecycle-state': 'PLANNED',
319 'operational-state': 'DISABLED',
320 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
321 'service-layer': 'ODU',
322 'connectivity-direction': 'BIDIRECTIONAL'
324 input_dict_2 = {'value-name': 'OpenROADM node id',
325 'value': 'SPDR-SC1-XPDR1'}
326 input_dict_3 = {'value-name': 'OpenROADM node id',
327 'value': 'SPDR-SA1-XPDR1'}
# NOTE(review): dict(expected, **actual) lets the response values override the
# expected ones, so each comparison below only proves that the expected KEYS
# are present in the response; the expected values are never compared.
329 self.assertDictEqual(dict(input_dict_1, **res['output']['service']),
330 res['output']['service'])
331 self.assertDictEqual(dict(input_dict_2, **res['output']['service']['end-point'][0]['name'][0]),
332 res['output']['service']['end-point'][0]['name'][0])
333 self.assertDictEqual(dict(input_dict_3, **res['output']['service']['end-point'][1]['name'][0]),
334 res['output']['service']['end-point'][1]['name'][0])
335 time.sleep(self.WAITING)
# Reads the service back by the uuid stored in module global SERVICE_ODU_UUID
# (assigned in test_17) and expects an 'ok' status code.
337 def test_18_get_service_ODU(self):
338 response = test_utils.get_service_list_request(
339 "services/" + str(SERVICE_ODU_UUID))
340 self.assertEqual(response.status_code, requests.codes.ok)
341 res = response.json()
# NOTE(review): each line below is an (actual, expected) argument pair; the
# call that opens each one is not visible in this excerpt - presumably an
# equality assertion, confirm against the complete file.
343 res['services'][0]['administrative-state'], 'inService')
345 res['services'][0]['service-name'], SERVICE_ODU_UUID)
347 res['services'][0]['connection-type'], 'infrastructure')
349 res['services'][0]['lifecycle-state'], 'planned')
352 # test create connectivity service from spdrA to spdrC for dsr
353 def test_19_create_connectivity_service_DSR(self):
354 # pylint: disable=line-too-long
# The shared class-level payload is edited in place: both end-points and the
# connectivity constraint switch to DSR, the SIP uuids change, and the
# requested capacity value is set to "10".
355 self.cr_serv_sample_data["input"]["end-point"][0]["layer-protocol-name"] = "DSR"
356 self.cr_serv_sample_data["input"]["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "c14797a0-adcc-3875-a1fe-df8949d1a2d7"
357 self.cr_serv_sample_data["input"]["end-point"][1]["layer-protocol-name"] = "DSR"
358 self.cr_serv_sample_data["input"]["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "25812ef2-625d-3bf8-af55-5e93946d1c22"
359 self.cr_serv_sample_data["input"]["connectivity-constraint"]["service-layer"] = "DSR"
360 self.cr_serv_sample_data["input"]["connectivity-constraint"]["requested-capacity"]["total-size"]["value"] = "10"
362 response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data)
363 time.sleep(self.WAITING)
364 self.assertEqual(response.status_code, requests.codes.ok)
365 res = response.json()
# The uuid is kept in a module global; test_20, test_21 and test_22 read it.
366 global SERVICE_DSR_UUID
367 SERVICE_DSR_UUID = res['output']['service']['uuid']
368 # pylint: disable=consider-using-f-string
369 print("dsr service uuid : {}".format(res['output']['service']['uuid']))
# NOTE(review): the closing brace of input_dict_1 is not visible in this excerpt.
371 input_dict_1 = {'administrative-state': 'LOCKED',
372 'lifecycle-state': 'PLANNED',
373 'operational-state': 'DISABLED',
374 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
375 'service-layer': 'DSR',
376 'connectivity-direction': 'BIDIRECTIONAL'
378 input_dict_2 = {'value-name': 'OpenROADM node id',
379 'value': 'SPDR-SC1-XPDR1'}
380 input_dict_3 = {'value-name': 'OpenROADM node id',
381 'value': 'SPDR-SA1-XPDR1'}
# NOTE(review): dict(expected, **actual) lets the response values override the
# expected ones, so each comparison below only proves that the expected KEYS
# are present in the response; the expected values are never compared.
383 self.assertDictEqual(dict(input_dict_1,
384 **res['output']['service']),
385 res['output']['service'])
386 self.assertDictEqual(dict(input_dict_2,
387 **res['output']['service']['end-point'][0]['name'][0]),
388 res['output']['service']['end-point'][0]['name'][0])
389 self.assertDictEqual(dict(input_dict_3,
390 **res['output']['service']['end-point'][1]['name'][0]),
391 res['output']['service']['end-point'][1]['name'][0])
392 time.sleep(self.WAITING)
# Reads the service back by the uuid stored in module global SERVICE_DSR_UUID
# (assigned in test_19) and expects an 'ok' status code.
394 def test_20_get_service_DSR(self):
395 response = test_utils.get_service_list_request(
396 "services/" + str(SERVICE_DSR_UUID))
397 self.assertEqual(response.status_code, requests.codes.ok)
398 res = response.json()
# NOTE(review): each line below is an (actual, expected) argument pair; the
# call that opens each one is not visible in this excerpt - presumably an
# equality assertion, confirm against the complete file.
400 res['services'][0]['administrative-state'], 'inService')
402 res['services'][0]['service-name'], SERVICE_DSR_UUID)
# Unlike test_16 and test_18, the expected connection-type here is 'service'.
404 res['services'][0]['connection-type'], 'service')
406 res['services'][0]['lifecycle-state'], 'planned')
# Lists the TAPI services and, for each entry whose uuid matches one of the
# three module globals, checks operational-state, service-layer and the number
# of connections (9 for photonic media, 3 for ODU, 1 for DSR).
409 def test_21_get_connectivity_service_list(self):
410 response = test_utils.tapi_get_service_list_request()
411 self.assertEqual(response.status_code, requests.codes.ok)
412 res = response.json()
413 liste_service = res['output']['service']
# NOTE(review): nothing here checks that the list is non-empty or that all
# three uuids were seen; an empty list would pass the loop without asserting.
414 for ele in liste_service:
415 if ele['uuid'] == SERVICE_PM_UUID:
416 self.assertEqual(ele['operational-state'], 'ENABLED')
417 self.assertEqual(ele['service-layer'], 'PHOTONIC_MEDIA')
418 nbconnection = len(ele['connection'])
419 self.assertEqual(nbconnection, 9, 'There should be 9 connections')
420 elif ele['uuid'] == SERVICE_ODU_UUID:
421 self.assertEqual(ele['operational-state'], 'ENABLED')
422 self.assertEqual(ele['service-layer'], 'ODU')
423 nbconnection = len(ele['connection'])
424 self.assertEqual(nbconnection, 3, 'There should be 3 connections')
425 elif ele['uuid'] == SERVICE_DSR_UUID:
426 self.assertEqual(ele['operational-state'], 'ENABLED')
427 self.assertEqual(ele['service-layer'], 'DSR')
428 nbconnection = len(ele['connection'])
429 self.assertEqual(nbconnection, 1, 'There should be 1 connection')
# NOTE(review): the line that introduces the branch below is not visible in
# this excerpt - presumably the fallback for an unrecognised uuid, confirm.
431 self.fail("get connectivity service failed")
def test_22_delete_connectivity_service_DSR(self):
    """Delete the service whose uuid is in SERVICE_DSR_UUID; expect 'no_content', then wait."""
    deletion = test_utils.tapi_delete_connectivity_request(SERVICE_DSR_UUID)
    self.assertEqual(
        deletion.status_code,
        requests.codes.no_content,
    )
    # The pause only happens when the assertion above passed.
    time.sleep(self.WAITING)
def test_23_delete_connectivity_service_ODU(self):
    """Delete the service whose uuid is in SERVICE_ODU_UUID; expect 'no_content', then wait."""
    deletion = test_utils.tapi_delete_connectivity_request(SERVICE_ODU_UUID)
    self.assertEqual(
        deletion.status_code,
        requests.codes.no_content,
    )
    # The pause only happens when the assertion above passed.
    time.sleep(self.WAITING)
def test_24_delete_connectivity_service_PhotonicMedia(self):
    """Delete the service whose uuid is in SERVICE_PM_UUID; expect 'no_content', then wait."""
    deletion = test_utils.tapi_delete_connectivity_request(SERVICE_PM_UUID)
    self.assertEqual(
        deletion.status_code,
        requests.codes.no_content,
    )
    # The pause only happens when the assertion above passed.
    time.sleep(self.WAITING)
# Requests the TAPI service list after the deletions and inspects the
# 'errors'/'error' part of the reply. No status code is checked here.
449 def test_25_get_no_tapi_services(self):
450 response = test_utils.tapi_get_service_list_request()
451 res = response.json()
# NOTE(review): the call that takes the dict below and res['errors']['error']
# as arguments is not visible in this excerpt - confirm which assertion it is.
453 {"error-type": "rpc", "error-tag": "operation-failed",
454 "error-message": "No services exist in datastore",
455 "error-info": "<severity>error</severity>"},
456 res['errors']['error'])
# Requests the openroadm service list with an empty path suffix, expects a
# 'conflict' status code, and inspects the 'errors'/'error' part of the reply.
459 def test_26_get_no_openroadm_services(self):
460 response = test_utils.get_service_list_request("")
461 self.assertEqual(response.status_code, requests.codes.conflict)
462 res = response.json()
# NOTE(review): the call that takes the dict below and res['errors']['error']
# as arguments is not visible in this excerpt - confirm which assertion it is.
464 {"error-type": "application", "error-tag": "data-missing",
465 "error-message": "Request could not be completed because the relevant data model content does not exist"},
466 res['errors']['error'])
def test_27_disconnect_spdrA(self):
    """Call unmount_device for 'SPDR-SA1' and expect an 'ok' status code."""
    unmounted = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(
        unmounted.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_28_disconnect_spdrC(self):
    """Call unmount_device for 'SPDR-SC1' and expect an 'ok' status code."""
    unmounted = test_utils.unmount_device("SPDR-SC1")
    self.assertEqual(
        unmounted.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_29_disconnect_roadmA(self):
    """Call unmount_device for 'ROADM-A1' and expect an 'ok' status code."""
    unmounted = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        unmounted.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_30_disconnect_roadmC(self):
    """Call unmount_device for 'ROADM-C1' and expect an 'ok' status code."""
    unmounted = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(
        unmounted.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
# Script entry point: run the suite through unittest with verbosity level 2.
if __name__ == "__main__":
    unittest.main(
        verbosity=2,
    )