3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEtesting(unittest.TestCase):
29 NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR1-NETWORK1",
30 "supporting-port": "CP1-CFP0-P1",
31 "supported-interface-capability": [
32 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
34 "port-direction": "bidirectional",
35 "port-qual": "xpdr-network",
36 "supporting-circuit-pack-name": "CP1-CFP0",
38 'lcp-hash-val': 'Swfw02qXGyI=',
39 'port-admin-state': 'InService',
40 'port-oper-state': 'InService'}
41 NODE_VERSION = '2.2.1'
45 cls.processes = test_utils.start_tpce()
46 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION)])
49 def tearDownClass(cls):
50 # pylint: disable=not-an-iterable
51 for process in cls.processes:
52 test_utils.shutdown_process(process)
53 print("all processes killed")
58 def test_01_connect_SPDR_SA1(self):
59 response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
60 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
63 response = test_utils.check_device_connection("SPDR-SA1")
64 self.assertEqual(response['status_code'], requests.codes.ok)
65 self.assertEqual(response['connection-status'], 'connected')
67 def test_02_get_portmapping_CLIENT4(self):
68 response = test_utils.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-CLIENT4")
69 self.assertEqual(response['status_code'], requests.codes.ok)
70 res_mapping = response['mapping'][0]
71 self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
72 self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name'])
73 self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point'])
74 self.assertEqual('bidirectional', res_mapping['port-direction'])
75 self.assertEqual('xpdr-client', res_mapping['port-qual'])
76 self.assertEqual('FqlcrxV7p3g=', res_mapping['lcp-hash-val'])
77 self.assertEqual('InService', res_mapping['port-admin-state'])
78 self.assertEqual('InService', res_mapping['port-oper-state'])
79 self.assertIn('org-openroadm-port-types:if-10GE-ODU2e', res_mapping['supported-interface-capability'])
80 self.assertIn('org-openroadm-port-types:if-10GE-ODU2', res_mapping['supported-interface-capability'])
81 self.assertIn('org-openroadm-port-types:if-10GE', res_mapping['supported-interface-capability'])
83 def test_03_get_portmapping_NETWORK1(self):
84 response = test_utils.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-NETWORK1")
85 self.assertEqual(response['status_code'], requests.codes.ok)
87 self.NETWORK1_CHECK_DICT,
90 def test_04_service_path_create_OCH_OTU4(self):
91 response = test_utils.transportpce_api_rpc_request(
92 'transportpce-device-renderer', 'service-path',
94 'service-name': 'service_test',
96 'modulation-format': 'dp-qpsk',
97 'operation': 'create',
98 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
103 'lower-spectral-slot-number': 761,
104 'higher-spectral-slot-number': 768
106 self.assertEqual(response['status_code'], requests.codes.ok)
107 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
108 self.assertTrue(response['output']['success'])
110 {'node-id': 'SPDR-SA1',
111 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
112 'och-interface-id': ['XPDR1-NETWORK1-761:768']}, response['output']['node-interface'])
114 def test_05_get_portmapping_NETWORK1(self):
115 response = test_utils.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-NETWORK1")
116 self.assertEqual(response['status_code'], requests.codes.ok)
117 self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
119 self.NETWORK1_CHECK_DICT,
122 def test_06_check_interface_och(self):
123 response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768")
124 self.assertEqual(response['status_code'], requests.codes.ok)
126 self.assertDictEqual(dict(response['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
127 'administrative-state': 'inService',
128 'supporting-circuit-pack-name': 'CP1-CFP0',
129 'type': 'org-openroadm-interfaces:opticalChannel',
130 'supporting-port': 'CP1-CFP0-P1'
132 response['interface'][0])
133 intf = response['interface'][0]['org-openroadm-optical-channel-interfaces:och']
134 self.assertEqual(intf['rate'], 'org-openroadm-common-types:R100G')
135 self.assertEqual(intf['modulation-format'], 'dp-qpsk')
136 self.assertEqual(float(intf['frequency']), 196.1)
137 self.assertEqual(float(intf['transmit-power']), -5)
139 def test_07_check_interface_OTU(self):
140 response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
141 self.assertEqual(response['status_code'], requests.codes.ok)
142 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
143 'administrative-state': 'inService',
144 'supporting-circuit-pack-name': 'CP1-CFP0',
145 'supporting-interface': 'XPDR1-NETWORK1-761:768',
146 'type': 'org-openroadm-interfaces:otnOtu',
147 'supporting-port': 'CP1-CFP0-P1'
150 input_dict_2 = {'rate': 'org-openroadm-otn-common-types:OTU4',
154 self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
155 response['interface'][0])
157 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'], **input_dict_2),
158 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
160 def test_08_otn_service_path_create_ODU4(self):
161 response = test_utils.transportpce_api_rpc_request(
162 'transportpce-device-renderer', 'otn-service-path',
164 'service-name': 'service_ODU4',
165 'operation': 'create',
166 'service-rate': '100',
167 'service-format': 'ODU',
168 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
170 self.assertEqual(response['status_code'], requests.codes.ok)
171 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
172 self.assertTrue(response['output']['success'])
174 {'node-id': 'SPDR-SA1',
175 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, response['output']['node-interface'])
177 def test_09_get_portmapping_NETWORK1(self):
178 response = test_utils.get_portmapping_node_attr("SPDR-SA1", "mapping", "XPDR1-NETWORK1")
179 self.assertEqual(response['status_code'], requests.codes.ok)
180 self.NETWORK1_CHECK_DICT["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
181 self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
183 self.NETWORK1_CHECK_DICT,
186 def test_10_check_interface_ODU4(self):
187 response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
188 self.assertEqual(response['status_code'], requests.codes.ok)
189 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
190 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
191 'type': 'org-openroadm-interfaces:otnOdu',
192 'supporting-port': 'CP1-CFP0-P1'}
193 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
194 'rate': 'org-openroadm-otn-common-types:ODU4'}
196 self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
197 response['interface'][0])
198 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
201 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
203 self.assertDictEqual(
204 {'payload-type': '21', 'exp-payload-type': '21'},
205 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
207 def test_11_otn_service_path_create_10GE(self):
208 response = test_utils.transportpce_api_rpc_request(
209 'transportpce-device-renderer', 'otn-service-path',
211 'service-name': 'service1',
212 'operation': 'create',
213 'service-rate': '10',
214 'service-format': 'Ethernet',
215 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}],
216 'ethernet-encoding': 'eth encode',
218 'trib-port-number': '1'
220 self.assertEqual(response['status_code'], requests.codes.ok)
221 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
222 self.assertTrue(response['output']['success'])
223 self.assertEqual('SPDR-SA1', response['output']['node-interface'][0]['node-id'])
224 self.assertIn('XPDR1-CLIENT4-ODU2e-x-XPDR1-NETWORK1-ODU2e',
225 response['output']['node-interface'][0]['connection-id'])
226 self.assertIn('XPDR1-CLIENT4-ETHERNET10G', response['output']['node-interface'][0]['eth-interface-id'])
227 self.assertIn('XPDR1-NETWORK1-ODU2e:service1', response['output']['node-interface'][0]['odu-interface-id'])
228 self.assertIn('XPDR1-CLIENT4-ODU2e:service1', response['output']['node-interface'][0]['odu-interface-id'])
230 def test_12_check_interface_10GE_CLIENT(self):
231 response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
232 self.assertEqual(response['status_code'], requests.codes.ok)
233 input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
234 'administrative-state': 'inService',
235 'supporting-circuit-pack-name': 'CP1-SFP4',
236 'type': 'org-openroadm-interfaces:ethernetCsmacd',
237 'supporting-port': 'CP1-SFP4-P1'
239 self.assertDictEqual(dict(response['interface'][0], **input_dict),
240 response['interface'][0])
241 self.assertEqual(10000, response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
243 def test_13_check_interface_ODU2E_CLIENT(self):
244 response = test_utils.check_node_attribute_request(
245 "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e:service1")
246 self.assertEqual(response['status_code'], requests.codes.ok)
248 input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e:service1',
249 'administrative-state': 'inService',
250 'supporting-circuit-pack-name': 'CP1-SFP4',
251 'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
252 'type': 'org-openroadm-interfaces:otnOdu',
253 'supporting-port': 'CP1-SFP4-P1'}
255 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
256 'rate': 'org-openroadm-otn-common-types:ODU2e',
257 'monitoring-mode': 'terminated'}
259 self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
260 response['interface'][0])
261 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
263 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
264 self.assertDictEqual(
265 {'payload-type': '03', 'exp-payload-type': '03'},
266 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
268 def test_14_check_interface_ODU2E_NETWORK(self):
269 response = test_utils.check_node_attribute_request(
270 "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e:service1")
271 self.assertEqual(response['status_code'], requests.codes.ok)
272 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e:service1', 'administrative-state': 'inService',
273 'supporting-circuit-pack-name': 'CP1-CFP0',
274 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
275 'type': 'org-openroadm-interfaces:otnOdu',
276 'supporting-port': 'CP1-CFP0-P1'}
278 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
279 'rate': 'org-openroadm-otn-common-types:ODU2e',
280 'monitoring-mode': 'monitored'}
282 input_dict_3 = {'trib-port-number': 1}
284 self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
285 response['interface'][0])
286 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
288 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
289 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
290 'parent-odu-allocation'], **input_dict_3
292 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
293 'parent-odu-allocation'])
295 response['interface'][0][
296 'org-openroadm-otn-odu-interfaces:odu'][
297 'parent-odu-allocation']['trib-slots'])
299 def test_15_check_ODU2E_connection(self):
300 response = test_utils.check_node_attribute_request(
301 "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-x-XPDR1-NETWORK1-ODU2e")
302 self.assertEqual(response['status_code'], requests.codes.ok)
305 'XPDR1-CLIENT4-ODU2e-x-XPDR1-NETWORK1-ODU2e',
306 'direction': 'bidirectional'
309 self.assertDictEqual(dict(response['odu-connection'][0], **input_dict_1),
310 response['odu-connection'][0])
311 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e:service1'},
312 response['odu-connection'][0]['destination'])
313 self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e:service1'},
314 response['odu-connection'][0]['source'])
316 def test_16_otn_service_path_delete_10GE(self):
317 response = test_utils.transportpce_api_rpc_request(
318 'transportpce-device-renderer', 'otn-service-path',
320 'service-name': 'service1',
321 'operation': 'delete',
322 'service-rate': '10',
323 'service-format': 'Ethernet',
324 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}],
325 'ethernet-encoding': 'eth encode',
327 'trib-port-number': '1'
329 self.assertEqual(response['status_code'], requests.codes.ok)
330 self.assertIn('Request processed', response['output']['result'])
331 self.assertTrue(response['output']['success'])
333 def test_17_check_no_ODU2E_connection(self):
334 response = test_utils.check_node_attribute_request(
335 "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-x-XPDR1-NETWORK1-ODU2e")
336 self.assertEqual(response['status_code'], requests.codes.conflict)
338 def test_18_check_no_interface_ODU2E_NETWORK(self):
339 response = test_utils.check_node_attribute_request(
340 "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e:service1")
341 self.assertEqual(response['status_code'], requests.codes.conflict)
343 def test_19_check_no_interface_ODU2E_CLIENT(self):
344 response = test_utils.check_node_attribute_request(
345 "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e:service1")
346 self.assertEqual(response['status_code'], requests.codes.conflict)
348 def test_20_check_no_interface_10GE_CLIENT(self):
349 response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
350 self.assertEqual(response['status_code'], requests.codes.conflict)
352 def test_21_otn_service_path_delete_ODU4(self):
353 response = test_utils.transportpce_api_rpc_request(
354 'transportpce-device-renderer', 'otn-service-path',
356 'service-name': 'service_ODU4',
357 'operation': 'delete',
358 'service-rate': '100',
359 'service-format': 'ODU',
360 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
362 self.assertEqual(response['status_code'], requests.codes.ok)
363 self.assertIn('Request processed', response['output']['result'])
364 self.assertTrue(response['output']['success'])
366 def test_22_check_no_interface_ODU4(self):
367 response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
368 self.assertEqual(response['status_code'], requests.codes.conflict)
370 def test_23_service_path_delete_OCH_OTU4(self):
371 response = test_utils.transportpce_api_rpc_request(
372 'transportpce-device-renderer', 'service-path',
374 'service-name': 'service_test',
376 'modulation-format': 'dp-qpsk',
377 'operation': 'delete',
378 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
379 'center-freq': 196.1,
383 'lower-spectral-slot-number': 761,
384 'higher-spectral-slot-number': 768
386 self.assertEqual(response['status_code'], requests.codes.ok)
387 self.assertIn('Request processed', response['output']['result'])
388 self.assertTrue(response['output']['success'])
390 def test_24_check_no_interface_OTU4(self):
391 response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
392 self.assertEqual(response['status_code'], requests.codes.conflict)
394 def test_25_check_no_interface_OCH(self):
395 response = test_utils.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-1")
396 self.assertEqual(response['status_code'], requests.codes.conflict)
398 def test_26_disconnect_SPDR_SA1(self):
399 response = test_utils.unmount_device("SPDR-SA1")
400 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
403 if __name__ == "__main__":
404 unittest.main(verbosity=2)