3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
20 sys.path.append('transportpce_tests/common/')
21 import test_utils # nopep8
24 class TransportPCEtesting(unittest.TestCase):
27 WAITING = 20 # nominal value is 300
28 NODE_VERSION = '2.2.1'
30 cr_serv_sample_data = {
34 "layer-protocol-name": "PHOTONIC_MEDIA",
35 "service-interface-point": {
36 "service-interface-point-uuid": "b1a0d883-32b8-3b0b-93d6-7ed074f6f107"
38 "administrative-state": "UNLOCKED",
39 "operational-state": "ENABLED",
40 "direction": "BIDIRECTIONAL",
42 "protection-role": "WORK",
43 "local-id": "SPDR-SA1-XPDR1",
46 "value-name": "OpenROADM node id",
47 "value": "SPDR-SA1-XPDR1"
52 "layer-protocol-name": "PHOTONIC_MEDIA",
53 "service-interface-point": {
54 "service-interface-point-uuid": "d1d6305e-179b-346f-b02d-8260aebe1ce8"
56 "administrative-state": "UNLOCKED",
57 "operational-state": "ENABLED",
58 "direction": "BIDIRECTIONAL",
60 "protection-role": "WORK",
61 "local-id": "SPDR-SC1-XPDR1",
64 "value-name": "OpenROADM node id",
65 "value": "SPDR-SC1-XPDR1"
70 "connectivity-constraint": {
71 "service-layer": "PHOTONIC_MEDIA",
72 "service-type": "POINT_TO_POINT_CONNECTIVITY",
73 "service-level": "Some service-level",
74 "requested-capacity": {
81 "state": "Some state"}}
85 cls.init_failed = False
86 os.environ['JAVA_MIN_MEM'] = '1024M'
87 os.environ['JAVA_MAX_MEM'] = '4096M'
88 cls.processes = test_utils.start_tpce()
89 # TAPI feature is not installed by default in Karaf
90 if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
91 print("installing tapi feature...")
92 result = test_utils.install_karaf_feature("odl-transportpce-tapi")
93 if result.returncode != 0:
94 cls.init_failed = True
95 print("Restarting OpenDaylight...")
96 test_utils.shutdown_process(cls.processes[0])
97 cls.processes[0] = test_utils.start_karaf()
98 test_utils.process_list[0] = cls.processes[0]
99 cls.init_failed = not test_utils.wait_until_log_contains(
100 test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
102 print("tapi installation feature failed...")
103 test_utils.shutdown_process(cls.processes[0])
105 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
106 ('roadma', cls.NODE_VERSION),
107 ('roadmc', cls.NODE_VERSION),
108 ('spdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Stop every process recorded in cls.processes, one by one, then print a
    # confirmation line.
    # pylint: disable=not-an-iterable
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
def test_01_connect_spdrA(self):
    # Mount "SPDR-SA1" through test_utils.mount_device and require the
    # "created" HTTP status on the reply.
    sim_descriptor = ('spdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SA1", sim_descriptor)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_spdrC(self):
    # Mount "SPDR-SC1" through test_utils.mount_device and require the
    # "created" HTTP status on the reply.
    sim_descriptor = ('spdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SC1", sim_descriptor)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    # Mount "ROADM-A1" through test_utils.mount_device and require the
    # "created" HTTP status on the reply.
    sim_descriptor = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-A1", sim_descriptor)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    # Mount "ROADM-C1" through test_utils.mount_device and require the
    # "created" HTTP status on the reply.
    sim_descriptor = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-C1", sim_descriptor)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
    # Issue the xpdr-to-rdm connection request for SPDR-SA1 / ROADM-A1 and
    # check the HTTP status, then the result text of the RPC output.
    link_args = ("SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', rpc_result)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
    # Issue the rdm-to-xpdr connection request for SPDR-SA1 / ROADM-A1 and
    # check the HTTP status, then the result text of the RPC output.
    link_args = ("SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', rpc_result)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
    # Issue the xpdr-to-rdm connection request for SPDR-SC1 / ROADM-C1 and
    # check the HTTP status, then the result text of the RPC output.
    link_args = ("SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', rpc_result)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
    # Issue the rdm-to-xpdr connection request for SPDR-SC1 / ROADM-C1 and
    # check the HTTP status, then the result text of the RPC output.
    link_args = ("SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', rpc_result)
176 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
177 # Config ROADMA-ROADMC oms-attributes
179 "auto-spanloss": "true",
180 "spanloss-base": 11.4,
181 "spanloss-current": 12,
182 "engineered-spanloss": 12.2,
183 "link-concatenation": [{
186 "SRLG-length": 100000,
188 response = test_utils.add_oms_attr_request(
189 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
190 self.assertEqual(response.status_code, requests.codes.created)
193 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
194 # Config ROADMC-ROADMA oms-attributes
196 "auto-spanloss": "true",
197 "spanloss-base": 11.4,
198 "spanloss-current": 12,
199 "engineered-spanloss": 12.2,
200 "link-concatenation": [{
203 "SRLG-length": 100000,
205 response = test_utils.add_oms_attr_request(
206 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
207 self.assertEqual(response.status_code, requests.codes.created)
def test_11_check_otn_topology(self):
    # Fetch the OTN topology; the first network entry must list 6 nodes and
    # must not carry any 'ietf-network-topology:link' key.
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    node_count = len(otn_network['node'])
    self.assertEqual(node_count, 6, 'There should be 6 otn nodes')
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_12_check_openroadm_topology(self):
    # Fetch the openroadm topology; the first network entry must list
    # 13 nodes and 22 links.
    reply = test_utils.get_ordm_topo_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    ordm_network = reply.json()['network'][0]
    # Both counts are taken before either assertion, as a missing key must
    # surface before any count is compared.
    node_count = len(ordm_network['node'])
    link_count = len(ordm_network['ietf-network-topology:link'])
    self.assertEqual(node_count, 13, 'There should be 13 openroadm nodes')
    self.assertEqual(link_count, 22, 'There should be 22 openroadm links')
def test_13_get_tapi_topology_details(self):
    # Ask for the details of the "T0 - Full Multi-layer topology" TAPI
    # topology; the output must list 14 nodes and 13 links.
    topology_name = "T0 - Full Multi-layer topology"
    reply = test_utils.tapi_get_topology_details_request(topology_name)
    self.assertEqual(reply.status_code, requests.codes.ok)
    topology = reply.json()['output']['topology']
    node_count = len(topology['node'])
    link_count = len(topology['link'])
    self.assertEqual(node_count, 14, 'There should be 14 TAPI nodes')
    self.assertEqual(link_count, 13, 'There should be 13 TAPI links')
def test_14_check_sip_details(self):
    # Fetch the service interface point details; the output must list
    # 60 entries under 'sip'.
    reply = test_utils.tapi_get_sip_details_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    sip_count = len(reply.json()['output']['sip'])
    self.assertEqual(sip_count, 60, 'There should be 60 service interface point')
249 # test create connectivity service from spdrA to spdrC for Photonic_media
def test_15_create_connectivity_service_PhotonicMedia(self):
    # Send the class-level request template as is, store the uuid of the
    # created service in the module-level global service_pm_uuid, and
    # compare the reply with the expected service / end-point name fields.
    global service_pm_uuid
    reply = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data)
    time.sleep(self.WAITING)
    self.assertEqual(reply.status_code, requests.codes.ok)
    created = reply.json()['output']['service']
    service_pm_uuid = created['uuid']
    print("photonic media service uuid : {}".format(service_pm_uuid))

    expected_service = {
        'administrative-state': 'LOCKED',
        'lifecycle-state': 'PLANNED',
        'operational-state': 'DISABLED',
        'service-type': 'POINT_TO_POINT_CONNECTIVITY',
        'service-layer': 'PHOTONIC_MEDIA',
        'connectivity-direction': 'BIDIRECTIONAL',
    }
    expected_name_0 = {'value-name': 'OpenROADM node id',
                       'value': 'SPDR-SC1-XPDR1'}
    expected_name_1 = {'value-name': 'OpenROADM node id',
                       'value': 'SPDR-SA1-XPDR1'}

    # NOTE(review): dict(expected, **actual) lets the reply's values win on
    # key clashes, so each comparison below passes as soon as every expected
    # key is present in the reply; the expected values are not enforced.
    self.assertDictEqual(dict(expected_service, **created), created)
    name_0 = created['end-point'][0]['name'][0]
    self.assertDictEqual(dict(expected_name_0, **name_0), name_0)
    name_1 = created['end-point'][1]['name'][0]
    self.assertDictEqual(dict(expected_name_1, **name_1), name_1)
    time.sleep(self.WAITING)
def test_16_get_service_PhotonicMedia(self):
    # Read back the service named after service_pm_uuid from the service
    # list and compare four fields of its first entry.
    # NOTE(review): the assertion call heads were not visible in the reviewed
    # text; assertEqual is assumed for all four checks - confirm.
    reply = test_utils.get_service_list_request(
        "services/" + str(service_pm_uuid))
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    expectations = (
        ('administrative-state', 'inService'),
        ('service-name', service_pm_uuid),
        ('connection-type', 'infrastructure'),
        ('lifecycle-state', 'planned'),
    )
    for field, wanted in expectations:
        self.assertEqual(service[field], wanted)
294 # test create connectivity service from spdrA to spdrC for odu
def test_17_create_connectivity_service_ODU(self):
    # Create the ODU connectivity service, store its uuid in the module-level
    # global service_odu_uuid, and compare the reply with the expected
    # service / end-point name fields.
    #
    # The request is built on a deep copy of the class-level template:
    # editing cr_serv_sample_data in place would leak the ODU values into
    # every other test that reads the template, making the payload depend on
    # which tests ran before.
    import copy  # local import: only needed to copy the request template

    global service_odu_uuid
    request_data = copy.deepcopy(self.cr_serv_sample_data)
    request_input = request_data["input"]
    sip_uuids = ("eecbfa6e-57ab-3651-9606-c22c8ce73f18",
                 "31f83b1f-29b2-3a8e-af9b-6423dbc5aa22")
    for index, sip_uuid in enumerate(sip_uuids):
        end_point = request_input["end-point"][index]
        end_point["layer-protocol-name"] = "ODU"
        end_point["service-interface-point"]["service-interface-point-uuid"] = sip_uuid
    request_input["connectivity-constraint"]["service-layer"] = "ODU"

    reply = test_utils.tapi_create_connectivity_request(request_data)
    time.sleep(self.WAITING)
    self.assertEqual(reply.status_code, requests.codes.ok)
    created = reply.json()['output']['service']
    service_odu_uuid = created['uuid']
    print("odu service uuid : {}".format(service_odu_uuid))

    expected_service = {
        'administrative-state': 'LOCKED',
        'lifecycle-state': 'PLANNED',
        'operational-state': 'DISABLED',
        'service-type': 'POINT_TO_POINT_CONNECTIVITY',
        'service-layer': 'ODU',
        'connectivity-direction': 'BIDIRECTIONAL',
    }
    expected_name_0 = {'value-name': 'OpenROADM node id',
                       'value': 'SPDR-SC1-XPDR1'}
    expected_name_1 = {'value-name': 'OpenROADM node id',
                       'value': 'SPDR-SA1-XPDR1'}

    # NOTE(review): dict(expected, **actual) lets the reply's values win on
    # key clashes, so each comparison below passes as soon as every expected
    # key is present in the reply; the expected values are not enforced.
    self.assertDictEqual(dict(expected_service, **created), created)
    name_0 = created['end-point'][0]['name'][0]
    self.assertDictEqual(dict(expected_name_0, **name_0), name_0)
    name_1 = created['end-point'][1]['name'][0]
    self.assertDictEqual(dict(expected_name_1, **name_1), name_1)
    time.sleep(self.WAITING)
def test_18_get_service_ODU(self):
    # Read back the service named after service_odu_uuid from the service
    # list and compare four fields of its first entry.
    # NOTE(review): the assertion call heads were not visible in the reviewed
    # text; assertEqual is assumed for all four checks - confirm.
    reply = test_utils.get_service_list_request(
        "services/" + str(service_odu_uuid))
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    expectations = (
        ('administrative-state', 'inService'),
        ('service-name', service_odu_uuid),
        ('connection-type', 'infrastructure'),
        ('lifecycle-state', 'planned'),
    )
    for field, wanted in expectations:
        self.assertEqual(service[field], wanted)
345 # test create connectivity service from spdrA to spdrC for dsr
def test_19_create_connectivity_service_DSR(self):
    # Create the DSR connectivity service (requested total-size value "10"),
    # store its uuid in the module-level global service_dsr_uuid, and compare
    # the reply with the expected service / end-point name fields.
    #
    # The request is built on a deep copy of the class-level template:
    # editing cr_serv_sample_data in place would leak the DSR values into
    # every other test that reads the template, making the payload depend on
    # which tests ran before.
    import copy  # local import: only needed to copy the request template

    global service_dsr_uuid
    request_data = copy.deepcopy(self.cr_serv_sample_data)
    request_input = request_data["input"]
    sip_uuids = ("c14797a0-adcc-3875-a1fe-df8949d1a2d7",
                 "25812ef2-625d-3bf8-af55-5e93946d1c22")
    for index, sip_uuid in enumerate(sip_uuids):
        end_point = request_input["end-point"][index]
        end_point["layer-protocol-name"] = "DSR"
        end_point["service-interface-point"]["service-interface-point-uuid"] = sip_uuid
    constraint = request_input["connectivity-constraint"]
    constraint["service-layer"] = "DSR"
    constraint["requested-capacity"]["total-size"]["value"] = "10"

    reply = test_utils.tapi_create_connectivity_request(request_data)
    time.sleep(self.WAITING)
    self.assertEqual(reply.status_code, requests.codes.ok)
    created = reply.json()['output']['service']
    service_dsr_uuid = created['uuid']
    print("dsr service uuid : {}".format(service_dsr_uuid))

    expected_service = {
        'administrative-state': 'LOCKED',
        'lifecycle-state': 'PLANNED',
        'operational-state': 'DISABLED',
        'service-type': 'POINT_TO_POINT_CONNECTIVITY',
        'service-layer': 'DSR',
        'connectivity-direction': 'BIDIRECTIONAL',
    }
    expected_name_0 = {'value-name': 'OpenROADM node id',
                       'value': 'SPDR-SC1-XPDR1'}
    expected_name_1 = {'value-name': 'OpenROADM node id',
                       'value': 'SPDR-SA1-XPDR1'}

    # NOTE(review): dict(expected, **actual) lets the reply's values win on
    # key clashes, so each comparison below passes as soon as every expected
    # key is present in the reply; the expected values are not enforced.
    self.assertDictEqual(dict(expected_service, **created), created)
    name_0 = created['end-point'][0]['name'][0]
    self.assertDictEqual(dict(expected_name_0, **name_0), name_0)
    name_1 = created['end-point'][1]['name'][0]
    self.assertDictEqual(dict(expected_name_1, **name_1), name_1)
    time.sleep(self.WAITING)
def test_20_get_service_DSR(self):
    # Read back the service named after service_dsr_uuid from the service
    # list and compare four fields of its first entry.
    # NOTE(review): the assertion call heads were not visible in the reviewed
    # text; assertEqual is assumed for all four checks - confirm.
    reply = test_utils.get_service_list_request(
        "services/" + str(service_dsr_uuid))
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    expectations = (
        ('administrative-state', 'inService'),
        ('service-name', service_dsr_uuid),
        ('connection-type', 'service'),
        ('lifecycle-state', 'planned'),
    )
    for field, wanted in expectations:
        self.assertEqual(service[field], wanted)
def test_21_get_connectivity_service_list(self):
    # Walk the TAPI service list: every entry must be one of the three
    # services created earlier (matched by uuid), be ENABLED, sit on the
    # matching layer and carry the expected number of connections.
    reply = test_utils.tapi_get_service_list_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    for service in reply.json()['output']['service']:
        uuid = service['uuid']
        if uuid == service_pm_uuid:
            layer, wanted = 'PHOTONIC_MEDIA', 9
            count_msg = 'There should be 9 connections'
        elif uuid == service_odu_uuid:
            layer, wanted = 'ODU', 3
            count_msg = 'There should be 3 connections'
        elif uuid == service_dsr_uuid:
            layer, wanted = 'DSR', 1
            count_msg = 'There should be 1 connection'
        else:
            # An entry with an unknown uuid fails the test outright.
            self.fail("get connectivity service failed")
        self.assertEqual(service['operational-state'], 'ENABLED')
        self.assertEqual(service['service-layer'], layer)
        self.assertEqual(len(service['connection']), wanted, count_msg)
def test_22_delete_connectivity_service_DSR(self):
    # Delete the service whose uuid is held in service_dsr_uuid, require the
    # "no content" status, then pause for WAITING seconds.
    reply = test_utils.tapi_delete_connectivity_request(service_dsr_uuid)
    status = reply.status_code
    self.assertEqual(status, requests.codes.no_content)
    time.sleep(self.WAITING)
def test_23_delete_connectivity_service_ODU(self):
    # Delete the service whose uuid is held in service_odu_uuid, require the
    # "no content" status, then pause for WAITING seconds.
    reply = test_utils.tapi_delete_connectivity_request(service_odu_uuid)
    status = reply.status_code
    self.assertEqual(status, requests.codes.no_content)
    time.sleep(self.WAITING)
def test_24_delete_connectivity_service_PhotonicMedia(self):
    # Delete the service whose uuid is held in service_pm_uuid, require the
    # "no content" status, then pause for WAITING seconds.
    reply = test_utils.tapi_delete_connectivity_request(service_pm_uuid)
    status = reply.status_code
    self.assertEqual(status, requests.codes.no_content)
    time.sleep(self.WAITING)
def test_25_get_no_tapi_services(self):
    # With all services deleted, the TAPI service list reply must report the
    # "No services exist in datastore" error entry.
    # NOTE(review): the assertion call head was not visible in the reviewed
    # text; assertIn is assumed - confirm.
    reply = test_utils.tapi_get_service_list_request()
    reported_errors = reply.json()['errors']['error']
    expected_error = {
        "error-type": "rpc",
        "error-tag": "operation-failed",
        "error-message": "No services exist in datastore",
        "error-info": "<severity>error</severity>",
    }
    self.assertIn(expected_error, reported_errors)
def test_26_get_no_openroadm_services(self):
    # With all services deleted, the service list request must answer with
    # the "conflict" status and report the data-missing error entry.
    # NOTE(review): the assertion call head for the error check was not
    # visible in the reviewed text; assertIn is assumed - confirm.
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.conflict)
    reported_errors = reply.json()['errors']['error']
    expected_error = {
        "error-type": "application",
        "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant data model content does not exist",
    }
    self.assertIn(expected_error, reported_errors)
def test_27_disconnect_spdrA(self):
    # Unmount "SPDR-SA1" and require the "ok" HTTP status on the reply.
    unmount_reply = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_28_disconnect_spdrC(self):
    # Unmount "SPDR-SC1" and require the "ok" HTTP status on the reply.
    unmount_reply = test_utils.unmount_device("SPDR-SC1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_29_disconnect_roadmA(self):
    # Unmount "ROADM-A1" and require the "ok" HTTP status on the reply.
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_30_disconnect_roadmC(self):
    # Unmount "ROADM-C1" and require the "ok" HTTP status on the reply.
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
# Script entry point: run this module's tests through unittest with
# verbosity level 2 when the file is executed directly.
if __name__ == "__main__":
    unittest.main(verbosity=2)