3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
19 # pylint: disable=wrong-import-order
21 sys.path.append('transportpce_tests/common/')
22 # pylint: disable=wrong-import-position
23 # pylint: disable=import-error
24 import test_utils # nopep8
27 # pylint: disable=too-few-public-methods
30 # pylint: disable=invalid-name
36 class TransportPCEtesting(unittest.TestCase):
39 WAITING = 20 # nominal value is 300
40 NODE_VERSION = '2.2.1'
41 uuid_services = UuidServices()
43 cr_serv_input_data = {
46 "layer-protocol-name": "PHOTONIC_MEDIA",
47 "service-interface-point": {
48 "service-interface-point-uuid": "b1a0d883-32b8-3b0b-93d6-7ed074f6f107"
50 "administrative-state": "UNLOCKED",
51 "operational-state": "ENABLED",
52 "direction": "BIDIRECTIONAL",
54 "protection-role": "WORK",
55 "local-id": "SPDR-SA1-XPDR1",
58 "value-name": "OpenROADM node id",
59 "value": "SPDR-SA1-XPDR1"
64 "layer-protocol-name": "PHOTONIC_MEDIA",
65 "service-interface-point": {
66 "service-interface-point-uuid": "d1d6305e-179b-346f-b02d-8260aebe1ce8"
68 "administrative-state": "UNLOCKED",
69 "operational-state": "ENABLED",
70 "direction": "BIDIRECTIONAL",
72 "protection-role": "WORK",
73 "local-id": "SPDR-SC1-XPDR1",
76 "value-name": "OpenROADM node id",
77 "value": "SPDR-SC1-XPDR1"
82 "connectivity-constraint": {
83 "service-layer": "PHOTONIC_MEDIA",
84 "service-type": "POINT_TO_POINT_CONNECTIVITY",
85 "service-level": "Some service-level",
86 "requested-capacity": {
93 "state": "Some state"}
95 del_serv_input_data = {"service-id-or-name": "TBD"}
97 tapi_topo = {"topology-id-or-name": "TBD"}
101 # pylint: disable=unsubscriptable-object
102 cls.init_failed = False
103 os.environ['JAVA_MIN_MEM'] = '1024M'
104 os.environ['JAVA_MAX_MEM'] = '4096M'
105 cls.processes = test_utils.start_tpce()
106 # TAPI feature is not installed by default in Karaf
107 if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
108 print("installing tapi feature...")
109 result = test_utils.install_karaf_feature("odl-transportpce-tapi")
110 if result.returncode != 0:
111 cls.init_failed = True
112 print("Restarting OpenDaylight...")
113 test_utils.shutdown_process(cls.processes[0])
114 cls.processes[0] = test_utils.start_karaf()
115 test_utils.process_list[0] = cls.processes[0]
116 cls.init_failed = not test_utils.wait_until_log_contains(
117 test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
119 print("tapi installation feature failed...")
120 test_utils.shutdown_process(cls.processes[0])
122 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
123 ('roadma', cls.NODE_VERSION),
124 ('roadmc', cls.NODE_VERSION),
125 ('spdrc', cls.NODE_VERSION)])
128 def tearDownClass(cls):
129 # pylint: disable=not-an-iterable
130 for process in cls.processes:
131 test_utils.shutdown_process(process)
132 print("all processes killed")
137 def test_01_connect_spdrA(self):
138 print("Connecting SPDRA")
139 response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
140 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
143 def test_02_connect_spdrC(self):
144 print("Connecting SPDRC")
145 response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
146 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
149 def test_03_connect_rdmA(self):
150 print("Connecting ROADMA")
151 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
152 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
155 def test_04_connect_rdmC(self):
156 print("Connecting ROADMC")
157 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
158 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
161 def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
162 response = test_utils.transportpce_api_rpc_request(
163 'transportpce-networkutils', 'init-xpdr-rdm-links',
164 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
165 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
166 self.assertEqual(response['status_code'], requests.codes.ok)
167 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
170 def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
171 response = test_utils.transportpce_api_rpc_request(
172 'transportpce-networkutils', 'init-rdm-xpdr-links',
173 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
174 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
175 self.assertEqual(response['status_code'], requests.codes.ok)
176 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
179 def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
180 response = test_utils.transportpce_api_rpc_request(
181 'transportpce-networkutils', 'init-xpdr-rdm-links',
182 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
183 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
184 self.assertEqual(response['status_code'], requests.codes.ok)
185 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
188 def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
189 response = test_utils.transportpce_api_rpc_request(
190 'transportpce-networkutils', 'init-rdm-xpdr-links',
191 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
192 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
193 self.assertEqual(response['status_code'], requests.codes.ok)
194 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
197 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
198 # Config ROADMA-ROADMC oms-attributes
200 "auto-spanloss": "true",
201 "spanloss-base": 11.4,
202 "spanloss-current": 12,
203 "engineered-spanloss": 12.2,
204 "link-concatenation": [{
207 "SRLG-length": 100000,
209 response = test_utils.add_oms_attr_request(
210 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
211 self.assertEqual(response.status_code, requests.codes.created)
213 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
214 # Config ROADMC-ROADMA oms-attributes
216 "auto-spanloss": "true",
217 "spanloss-base": 11.4,
218 "spanloss-current": 12,
219 "engineered-spanloss": 12.2,
220 "link-concatenation": [{
223 "SRLG-length": 100000,
225 response = test_utils.add_oms_attr_request(
226 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
227 self.assertEqual(response.status_code, requests.codes.created)
229 def test_11_check_otn_topology(self):
230 response = test_utils.get_ietf_network_request('otn-topology', 'config')
231 self.assertEqual(response['status_code'], requests.codes.ok)
232 self.assertEqual(len(response['network'][0]['node']), 6, 'There should be 6 otn nodes')
233 self.assertNotIn('ietf-network-topology:link', response['network'][0])
235 def test_12_check_openroadm_topology(self):
236 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
237 self.assertEqual(response['status_code'], requests.codes.ok)
238 self.assertEqual(len(response['network'][0]['node']), 13, 'There should be 13 openroadm nodes')
239 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 22,
240 'There should be 22 openroadm links')
242 def test_13_get_tapi_topology_details(self):
243 self.tapi_topo["topology-id-or-name"] = test_utils.T0_FULL_MULTILAYER_TOPO
244 response = test_utils.transportpce_api_rpc_request(
245 'tapi-topology', 'get-topology-details', self.tapi_topo)
247 self.assertEqual(response['status_code'], requests.codes.ok)
248 self.assertEqual(len(response['output']['topology']['node']), 14, 'There should be 14 TAPI nodes')
249 self.assertEqual(len(response['output']['topology']['link']), 15, 'There should be 15 TAPI links')
251 def test_14_check_sip_details(self):
252 response = test_utils.transportpce_api_rpc_request(
253 'tapi-common', 'get-service-interface-point-list', None)
254 self.assertEqual(len(response['output']['sip']), 60, 'There should be 60 service interface point')
256 # test create connectivity service from spdrA to spdrC for Photonic_media
257 def test_15_create_connectivity_service_PhotonicMedia(self):
258 response = test_utils.transportpce_api_rpc_request(
259 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
260 time.sleep(self.WAITING)
261 self.assertEqual(response['status_code'], requests.codes.ok)
262 self.uuid_services.pm = response['output']['service']['uuid']
263 # pylint: disable=consider-using-f-string
264 print("photonic media service uuid : {}".format(self.uuid_services.pm))
266 input_dict_1 = {'administrative-state': 'LOCKED',
267 'lifecycle-state': 'PLANNED',
268 'operational-state': 'DISABLED',
269 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
270 'service-layer': 'PHOTONIC_MEDIA',
271 'connectivity-direction': 'BIDIRECTIONAL'
273 input_dict_2 = {'value-name': 'OpenROADM node id',
274 'value': 'SPDR-SC1-XPDR1'}
275 input_dict_3 = {'value-name': 'OpenROADM node id',
276 'value': 'SPDR-SA1-XPDR1'}
278 self.assertDictEqual(dict(input_dict_1, **response['output']['service']),
279 response['output']['service'])
280 self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]),
281 response['output']['service']['end-point'][0]['name'][0])
282 self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]),
283 response['output']['service']['end-point'][1]['name'][0])
284 # If the gate fails is because of the waiting time not being enough
285 # time.sleep(self.WAITING)
287 def test_16_get_service_PhotonicMedia(self):
288 response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.pm))
289 self.assertEqual(response['status_code'], requests.codes.ok)
290 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
291 self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.pm))
292 self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
293 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
296 # test create connectivity service from spdrA to spdrC for odu
297 def test_17_create_connectivity_service_ODU(self):
298 # pylint: disable=line-too-long
299 self.cr_serv_input_data["end-point"][0]["layer-protocol-name"] = "ODU"
300 self.cr_serv_input_data["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "5efda776-f8de-3e0b-9bbd-2c702e210946"
301 self.cr_serv_input_data["end-point"][1]["layer-protocol-name"] = "ODU"
302 self.cr_serv_input_data["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "8116d0af-39fa-3df5-bed2-dd2cd5e8217d"
303 self.cr_serv_input_data["connectivity-constraint"]["service-layer"] = "ODU"
304 self.cr_serv_input_data["connectivity-constraint"]["service-level"] = self.uuid_services.pm
306 response = test_utils.transportpce_api_rpc_request(
307 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
308 time.sleep(self.WAITING)
309 self.assertEqual(response['status_code'], requests.codes.ok)
310 self.uuid_services.odu = response['output']['service']['uuid']
311 # pylint: disable=consider-using-f-string
312 print("odu service uuid : {}".format(self.uuid_services.odu))
314 input_dict_1 = {'administrative-state': 'LOCKED',
315 'lifecycle-state': 'PLANNED',
316 'operational-state': 'DISABLED',
317 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
318 'service-layer': 'ODU',
319 'connectivity-direction': 'BIDIRECTIONAL'
321 input_dict_2 = {'value-name': 'OpenROADM node id',
322 'value': 'SPDR-SC1-XPDR1'}
323 input_dict_3 = {'value-name': 'OpenROADM node id',
324 'value': 'SPDR-SA1-XPDR1'}
326 self.assertDictEqual(dict(input_dict_1, **response['output']['service']),
327 response['output']['service'])
328 self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]),
329 response['output']['service']['end-point'][0]['name'][0])
330 self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]),
331 response['output']['service']['end-point'][1]['name'][0])
332 # If the gate fails is because of the waiting time not being enough
333 # time.sleep(self.WAITING)
335 def test_18_get_service_ODU(self):
336 response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.odu))
337 self.assertEqual(response['status_code'], requests.codes.ok)
338 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
339 self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.odu))
340 self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
341 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
344 # test create connectivity service from spdrA to spdrC for dsr
345 def test_19_create_connectivity_service_DSR(self):
346 # pylint: disable=line-too-long
347 self.cr_serv_input_data["end-point"][0]["layer-protocol-name"] = "DSR"
348 self.cr_serv_input_data["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "c14797a0-adcc-3875-a1fe-df8949d1a2d7"
349 self.cr_serv_input_data["end-point"][1]["layer-protocol-name"] = "DSR"
350 self.cr_serv_input_data["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "25812ef2-625d-3bf8-af55-5e93946d1c22"
351 self.cr_serv_input_data["connectivity-constraint"]["service-layer"] = "DSR"
352 self.cr_serv_input_data["connectivity-constraint"]["requested-capacity"]["total-size"]["value"] = "10"
353 self.cr_serv_input_data["connectivity-constraint"]["service-level"] = self.uuid_services.odu
355 response = test_utils.transportpce_api_rpc_request(
356 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
357 time.sleep(self.WAITING)
358 self.assertEqual(response['status_code'], requests.codes.ok)
359 self.uuid_services.dsr = response['output']['service']['uuid']
360 # pylint: disable=consider-using-f-string
361 print("dsr service uuid : {}".format(self.uuid_services.dsr))
363 input_dict_1 = {'administrative-state': 'LOCKED',
364 'lifecycle-state': 'PLANNED',
365 'operational-state': 'DISABLED',
366 'service-type': 'POINT_TO_POINT_CONNECTIVITY',
367 'service-layer': 'DSR',
368 'connectivity-direction': 'BIDIRECTIONAL'
370 input_dict_2 = {'value-name': 'OpenROADM node id',
371 'value': 'SPDR-SC1-XPDR1'}
372 input_dict_3 = {'value-name': 'OpenROADM node id',
373 'value': 'SPDR-SA1-XPDR1'}
375 self.assertDictEqual(dict(input_dict_1,
376 **response['output']['service']),
377 response['output']['service'])
378 self.assertDictEqual(dict(input_dict_2,
379 **response['output']['service']['end-point'][0]['name'][0]),
380 response['output']['service']['end-point'][0]['name'][0])
381 self.assertDictEqual(dict(input_dict_3,
382 **response['output']['service']['end-point'][1]['name'][0]),
383 response['output']['service']['end-point'][1]['name'][0])
384 # The sleep here is okey as the DSR service creation is very fast
385 # time.sleep(self.WAITING)
387 def test_20_get_service_DSR(self):
388 response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.dsr))
389 self.assertEqual(response['status_code'], requests.codes.ok)
390 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
391 self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.dsr))
392 self.assertEqual(response['services'][0]['connection-type'], 'service')
393 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
396 def test_21_get_connectivity_service_list(self):
397 response = test_utils.transportpce_api_rpc_request(
398 'tapi-connectivity', 'get-connectivity-service-list', None)
399 self.assertEqual(response['status_code'], requests.codes.ok)
400 liste_service = response['output']['service']
401 for ele in liste_service:
402 if ele['uuid'] == self.uuid_services.pm:
403 self.assertEqual(ele['operational-state'], 'ENABLED')
404 self.assertEqual(ele['service-layer'], 'PHOTONIC_MEDIA')
405 nbconnection = len(ele['connection'])
406 self.assertEqual(nbconnection, 3, 'There should be 3 connections')
407 elif ele['uuid'] == self.uuid_services.odu:
408 self.assertEqual(ele['operational-state'], 'ENABLED')
409 self.assertEqual(ele['service-layer'], 'ODU')
410 nbconnection = len(ele['connection'])
411 self.assertEqual(nbconnection, 1, 'There should be 1 connections')
412 elif ele['uuid'] == self.uuid_services.dsr:
413 self.assertEqual(ele['operational-state'], 'ENABLED')
414 self.assertEqual(ele['service-layer'], 'DSR')
415 nbconnection = len(ele['connection'])
416 self.assertEqual(nbconnection, 2, 'There should be 2 connections')
418 self.fail("get connectivity service failed")
421 def test_22_delete_connectivity_service_DSR(self):
422 self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.dsr)
423 response = test_utils.transportpce_api_rpc_request(
424 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
425 self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
426 time.sleep(self.WAITING)
428 def test_23_delete_connectivity_service_ODU(self):
429 self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.odu)
430 response = test_utils.transportpce_api_rpc_request(
431 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
432 self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
433 time.sleep(self.WAITING)
435 def test_24_delete_connectivity_service_PhotonicMedia(self):
436 self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.pm)
437 response = test_utils.transportpce_api_rpc_request(
438 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
439 self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
440 time.sleep(self.WAITING)
442 def test_25_get_no_tapi_services(self):
443 response = test_utils.transportpce_api_rpc_request(
444 'tapi-connectivity', 'get-connectivity-service-list', None)
445 self.assertEqual(response['status_code'], requests.codes.internal_server_error)
447 {"error-type": "rpc", "error-tag": "operation-failed",
448 "error-message": "No services exist in datastore",
449 "error-info": "<severity>error</severity>"},
450 response['output']['errors']['error'])
452 def test_26_get_no_openroadm_services(self):
453 response = test_utils.get_ordm_serv_list_request()
454 self.assertEqual(response['status_code'], requests.codes.conflict)
456 def test_27_disconnect_spdrA(self):
457 response = test_utils.unmount_device("SPDR-SA1")
458 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
460 def test_28_disconnect_spdrC(self):
461 response = test_utils.unmount_device("SPDR-SC1")
462 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
464 def test_29_disconnect_roadmA(self):
465 response = test_utils.unmount_device("ROADM-A1")
466 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
468 def test_30_disconnect_roadmC(self):
469 response = test_utils.unmount_device("ROADM-C1")
470 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
473 if __name__ == "__main__":
474 unittest.main(verbosity=2)