3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils_rfc8040 # nopep8
26 class TransportPCEtesting(unittest.TestCase):
29 NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR1-NETWORK1",
30 "supporting-port": "CP1-CFP0-P1",
31 "supported-interface-capability": [
32 "org-openroadm-port-types:if-OCH-OTU4-ODU4"
34 "port-direction": "bidirectional",
35 "port-qual": "xpdr-network",
36 "supporting-circuit-pack-name": "CP1-CFP0",
37 "xponder-type": "mpdr",
38 'lcp-hash-val': 'Swfw02qXGyI=',
39 'port-admin-state': 'InService',
40 'port-oper-state': 'InService'}
41 NODE_VERSION = '2.2.1'
45 cls.processes = test_utils_rfc8040.start_tpce()
46 cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION)])
49 def tearDownClass(cls):
50 # pylint: disable=not-an-iterable
51 for process in cls.processes:
52 test_utils_rfc8040.shutdown_process(process)
53 print("all processes killed")
58 def test_01_connect_SPDR_SA1(self):
59 response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
60 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
63 response = test_utils_rfc8040.check_device_connection("SPDR-SA1")
64 self.assertEqual(response['status_code'], requests.codes.ok)
65 self.assertEqual(response['connection-status'], 'connected')
67 def test_02_get_portmapping_CLIENT4(self):
68 response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-CLIENT4")
69 self.assertEqual(response['status_code'], requests.codes.ok)
70 res_mapping = response['mapping'][0]
71 self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
72 self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name'])
73 self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point'])
74 self.assertEqual('bidirectional', res_mapping['port-direction'])
75 self.assertEqual('xpdr-client', res_mapping['port-qual'])
76 self.assertEqual('FqlcrxV7p3g=', res_mapping['lcp-hash-val'])
77 self.assertEqual('InService', res_mapping['port-admin-state'])
78 self.assertEqual('InService', res_mapping['port-oper-state'])
79 self.assertIn('org-openroadm-port-types:if-10GE-ODU2e', res_mapping['supported-interface-capability'])
80 self.assertIn('org-openroadm-port-types:if-10GE-ODU2', res_mapping['supported-interface-capability'])
81 self.assertIn('org-openroadm-port-types:if-10GE', res_mapping['supported-interface-capability'])
83 def test_03_get_portmapping_NETWORK1(self):
84 response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
85 self.assertEqual(response['status_code'], requests.codes.ok)
87 self.NETWORK1_CHECK_DICT,
90 def test_04_service_path_create_OCH_OTU4(self):
91 response = test_utils_rfc8040.transportpce_api_rpc_request(
92 'transportpce-device-renderer', 'service-path',
94 'service-name': 'service_test',
96 'modulation-format': 'dp-qpsk',
97 'operation': 'create',
98 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
103 'lower-spectral-slot-number': 761,
104 'higher-spectral-slot-number': 768
106 self.assertEqual(response['status_code'], requests.codes.ok)
107 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
108 self.assertTrue(response['output']['success'])
110 {'node-id': 'SPDR-SA1',
111 'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
112 'och-interface-id': ['XPDR1-NETWORK1-761:768']}, response['output']['node-interface'])
114 def test_05_get_portmapping_NETWORK1(self):
115 response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
116 self.assertEqual(response['status_code'], requests.codes.ok)
117 self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
119 self.NETWORK1_CHECK_DICT,
122 def test_06_check_interface_och(self):
123 response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768")
124 self.assertEqual(response['status_code'], requests.codes.ok)
126 self.assertDictEqual(dict(response['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
127 'administrative-state': 'inService',
128 'supporting-circuit-pack-name': 'CP1-CFP0',
129 'type': 'org-openroadm-interfaces:opticalChannel',
130 'supporting-port': 'CP1-CFP0-P1'
132 response['interface'][0])
135 response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
136 [{'frequency': '196.1000', 'rate': 'org-openroadm-common-types:R100G',
137 'transmit-power': '-5', 'modulation-format': 'dp-qpsk'},
138 {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
139 'transmit-power': -5, 'modulation-format': 'dp-qpsk'}])
141 def test_07_check_interface_OTU(self):
142 response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
143 self.assertEqual(response['status_code'], requests.codes.ok)
144 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
145 'administrative-state': 'inService',
146 'supporting-circuit-pack-name': 'CP1-CFP0',
147 'supporting-interface': 'XPDR1-NETWORK1-761:768',
148 'type': 'org-openroadm-interfaces:otnOtu',
149 'supporting-port': 'CP1-CFP0-P1'
152 input_dict_2 = {'rate': 'org-openroadm-otn-common-types:OTU4',
156 self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
157 response['interface'][0])
159 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'], **input_dict_2),
160 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
162 def test_08_otn_service_path_create_ODU4(self):
163 response = test_utils_rfc8040.transportpce_api_rpc_request(
164 'transportpce-device-renderer', 'otn-service-path',
166 'service-name': 'service_ODU4',
167 'operation': 'create',
168 'service-rate': '100',
169 'service-format': 'ODU',
170 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
172 self.assertEqual(response['status_code'], requests.codes.ok)
173 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
174 self.assertTrue(response['output']['success'])
176 {'node-id': 'SPDR-SA1',
177 'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, response['output']['node-interface'])
179 def test_09_get_portmapping_NETWORK1(self):
180 response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
181 self.assertEqual(response['status_code'], requests.codes.ok)
182 self.NETWORK1_CHECK_DICT["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
183 self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
185 self.NETWORK1_CHECK_DICT,
188 def test_10_check_interface_ODU4(self):
189 response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
190 self.assertEqual(response['status_code'], requests.codes.ok)
191 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
192 'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
193 'type': 'org-openroadm-interfaces:otnOdu',
194 'supporting-port': 'CP1-CFP0-P1'}
195 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
196 'rate': 'org-openroadm-otn-common-types:ODU4'}
198 self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
199 response['interface'][0])
200 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
203 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
205 self.assertDictEqual(
206 {'payload-type': '21', 'exp-payload-type': '21'},
207 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
209 def test_11_otn_service_path_create_10GE(self):
210 response = test_utils_rfc8040.transportpce_api_rpc_request(
211 'transportpce-device-renderer', 'otn-service-path',
213 'service-name': 'service1',
214 'operation': 'create',
215 'service-rate': '10',
216 'service-format': 'Ethernet',
217 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}],
218 'ethernet-encoding': 'eth encode',
220 'trib-port-number': '1'
222 self.assertEqual(response['status_code'], requests.codes.ok)
223 self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
224 self.assertTrue(response['output']['success'])
225 self.assertEqual('SPDR-SA1', response['output']['node-interface'][0]['node-id'])
226 self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
227 response['output']['node-interface'][0]['connection-id'])
228 self.assertIn('XPDR1-CLIENT4-ETHERNET10G', response['output']['node-interface'][0]['eth-interface-id'])
229 self.assertIn('XPDR1-NETWORK1-ODU2e-service1', response['output']['node-interface'][0]['odu-interface-id'])
230 self.assertIn('XPDR1-CLIENT4-ODU2e-service1', response['output']['node-interface'][0]['odu-interface-id'])
232 def test_12_check_interface_10GE_CLIENT(self):
233 response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
234 self.assertEqual(response['status_code'], requests.codes.ok)
235 input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
236 'administrative-state': 'inService',
237 'supporting-circuit-pack-name': 'CP1-SFP4',
238 'type': 'org-openroadm-interfaces:ethernetCsmacd',
239 'supporting-port': 'CP1-SFP4-P1'
241 self.assertDictEqual(dict(response['interface'][0], **input_dict),
242 response['interface'][0])
243 self.assertEqual(10000, response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
245 def test_13_check_interface_ODU2E_CLIENT(self):
246 response = test_utils_rfc8040.check_node_attribute_request(
247 "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e-service1")
248 self.assertEqual(response['status_code'], requests.codes.ok)
250 input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
251 'administrative-state': 'inService',
252 'supporting-circuit-pack-name': 'CP1-SFP4',
253 'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
254 'type': 'org-openroadm-interfaces:otnOdu',
255 'supporting-port': 'CP1-SFP4-P1'}
257 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
258 'rate': 'org-openroadm-otn-common-types:ODU2e',
259 'monitoring-mode': 'terminated'}
261 self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
262 response['interface'][0])
263 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
265 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
266 self.assertDictEqual(
267 {'payload-type': '03', 'exp-payload-type': '03'},
268 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
270 def test_14_check_interface_ODU2E_NETWORK(self):
271 response = test_utils_rfc8040.check_node_attribute_request(
272 "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-service1")
273 self.assertEqual(response['status_code'], requests.codes.ok)
274 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
275 'supporting-circuit-pack-name': 'CP1-CFP0',
276 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
277 'type': 'org-openroadm-interfaces:otnOdu',
278 'supporting-port': 'CP1-CFP0-P1'}
280 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
281 'rate': 'org-openroadm-otn-common-types:ODU2e',
282 'monitoring-mode': 'monitored'}
284 input_dict_3 = {'trib-port-number': 1}
286 self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
287 response['interface'][0])
288 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
290 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
291 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
292 'parent-odu-allocation'], **input_dict_3
294 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
295 'parent-odu-allocation'])
297 response['interface'][0][
298 'org-openroadm-otn-odu-interfaces:odu'][
299 'parent-odu-allocation']['trib-slots'])
301 def test_15_check_ODU2E_connection(self):
302 response = test_utils_rfc8040.check_node_attribute_request(
303 "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
304 self.assertEqual(response['status_code'], requests.codes.ok)
307 'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
308 'direction': 'bidirectional'
311 self.assertDictEqual(dict(response['odu-connection'][0], **input_dict_1),
312 response['odu-connection'][0])
313 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
314 response['odu-connection'][0]['destination'])
315 self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'},
316 response['odu-connection'][0]['source'])
318 def test_16_otn_service_path_delete_10GE(self):
319 response = test_utils_rfc8040.transportpce_api_rpc_request(
320 'transportpce-device-renderer', 'otn-service-path',
322 'service-name': 'service1',
323 'operation': 'delete',
324 'service-rate': '10',
325 'service-format': 'Ethernet',
326 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}],
327 'ethernet-encoding': 'eth encode',
329 'trib-port-number': '1'
331 self.assertEqual(response['status_code'], requests.codes.ok)
332 self.assertIn('Request processed', response['output']['result'])
333 self.assertTrue(response['output']['success'])
335 def test_17_check_no_ODU2E_connection(self):
336 response = test_utils_rfc8040.check_node_attribute_request(
337 "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
338 self.assertEqual(response['status_code'], requests.codes.conflict)
340 def test_18_check_no_interface_ODU2E_NETWORK(self):
341 response = test_utils_rfc8040.check_node_attribute_request(
342 "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-service1")
343 self.assertEqual(response['status_code'], requests.codes.conflict)
345 def test_19_check_no_interface_ODU2E_CLIENT(self):
346 response = test_utils_rfc8040.check_node_attribute_request(
347 "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e-service1")
348 self.assertEqual(response['status_code'], requests.codes.conflict)
350 def test_20_check_no_interface_10GE_CLIENT(self):
351 response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
352 self.assertEqual(response['status_code'], requests.codes.conflict)
354 def test_21_otn_service_path_delete_ODU4(self):
355 response = test_utils_rfc8040.transportpce_api_rpc_request(
356 'transportpce-device-renderer', 'otn-service-path',
358 'service-name': 'service_ODU4',
359 'operation': 'delete',
360 'service-rate': '100',
361 'service-format': 'ODU',
362 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
364 self.assertEqual(response['status_code'], requests.codes.ok)
365 self.assertIn('Request processed', response['output']['result'])
366 self.assertTrue(response['output']['success'])
368 def test_22_check_no_interface_ODU4(self):
369 response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
370 self.assertEqual(response['status_code'], requests.codes.conflict)
372 def test_23_service_path_delete_OCH_OTU4(self):
373 response = test_utils_rfc8040.transportpce_api_rpc_request(
374 'transportpce-device-renderer', 'service-path',
376 'service-name': 'service_test',
378 'modulation-format': 'dp-qpsk',
379 'operation': 'delete',
380 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
381 'center-freq': 196.1,
385 'lower-spectral-slot-number': 761,
386 'higher-spectral-slot-number': 768
388 self.assertEqual(response['status_code'], requests.codes.ok)
389 self.assertIn('Request processed', response['output']['result'])
390 self.assertTrue(response['output']['success'])
392 def test_24_check_no_interface_OTU4(self):
393 response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
394 self.assertEqual(response['status_code'], requests.codes.conflict)
396 def test_25_check_no_interface_OCH(self):
397 response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-1")
398 self.assertEqual(response['status_code'], requests.codes.conflict)
400 def test_26_disconnect_SPDR_SA1(self):
401 response = test_utils_rfc8040.unmount_device("SPDR-SA1")
402 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
405 if __name__ == "__main__":
406 unittest.main(verbosity=2)