Refactor func tests transportpce api rpc calls 1/2
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test07_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 import time
17 import requests
18 # pylint: disable=wrong-import-order
19 import sys
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils_rfc8040  # nopep8
24
25
26 class TransportPCEtesting(unittest.TestCase):
27
28     processes = None
29     NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR1-NETWORK1",
30                            "supporting-port": "CP1-CFP0-P1",
31                            "supported-interface-capability": [
32                                "org-openroadm-port-types:if-OCH-OTU4-ODU4"
33                            ],
34                            "port-direction": "bidirectional",
35                            "port-qual": "xpdr-network",
36                            "supporting-circuit-pack-name": "CP1-CFP0",
37                            "xponder-type": "mpdr",
38                            'lcp-hash-val': 'Swfw02qXGyI=',
39                            'port-admin-state': 'InService',
40                            'port-oper-state': 'InService'}
41     NODE_VERSION = '2.2.1'
42
43     @classmethod
44     def setUpClass(cls):
45         cls.processes = test_utils_rfc8040.start_tpce()
46         cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION)])
47
48     @classmethod
49     def tearDownClass(cls):
50         # pylint: disable=not-an-iterable
51         for process in cls.processes:
52             test_utils_rfc8040.shutdown_process(process)
53         print("all processes killed")
54
55     def setUp(self):
56         time.sleep(5)
57
58     def test_01_connect_SPDR_SA1(self):
59         response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
60         self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
61         time.sleep(10)
62
63         response = test_utils_rfc8040.check_device_connection("SPDR-SA1")
64         self.assertEqual(response['status_code'], requests.codes.ok)
65         self.assertEqual(response['connection-status'], 'connected')
66
67     def test_02_get_portmapping_CLIENT4(self):
68         response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-CLIENT4")
69         self.assertEqual(response['status_code'], requests.codes.ok)
70         res_mapping = response['mapping'][0]
71         self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
72         self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name'])
73         self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point'])
74         self.assertEqual('bidirectional', res_mapping['port-direction'])
75         self.assertEqual('xpdr-client', res_mapping['port-qual'])
76         self.assertEqual('FqlcrxV7p3g=', res_mapping['lcp-hash-val'])
77         self.assertEqual('InService', res_mapping['port-admin-state'])
78         self.assertEqual('InService', res_mapping['port-oper-state'])
79         self.assertIn('org-openroadm-port-types:if-10GE-ODU2e', res_mapping['supported-interface-capability'])
80         self.assertIn('org-openroadm-port-types:if-10GE-ODU2', res_mapping['supported-interface-capability'])
81         self.assertIn('org-openroadm-port-types:if-10GE', res_mapping['supported-interface-capability'])
82
83     def test_03_get_portmapping_NETWORK1(self):
84         response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
85         self.assertEqual(response['status_code'], requests.codes.ok)
86         self.assertIn(
87             self.NETWORK1_CHECK_DICT,
88             response['mapping'])
89
90     def test_04_service_path_create_OCH_OTU4(self):
91         response = test_utils_rfc8040.transportpce_api_rpc_request(
92             'transportpce-device-renderer', 'service-path',
93             {
94                 'service-name': 'service_test',
95                 'wave-number': '7',
96                 'modulation-format': 'dp-qpsk',
97                 'operation': 'create',
98                 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
99                 'center-freq': 196.1,
100                 'nmc-width': 40,
101                 'min-freq': 196.075,
102                 'max-freq': 196.125,
103                 'lower-spectral-slot-number': 761,
104                 'higher-spectral-slot-number': 768
105             })
106         self.assertEqual(response['status_code'], requests.codes.ok)
107         self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
108         self.assertTrue(response['output']['success'])
109         self.assertIn(
110             {'node-id': 'SPDR-SA1',
111              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
112              'och-interface-id': ['XPDR1-NETWORK1-761:768']}, response['output']['node-interface'])
113
114     def test_05_get_portmapping_NETWORK1(self):
115         response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
116         self.assertEqual(response['status_code'], requests.codes.ok)
117         self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
118         self.assertIn(
119             self.NETWORK1_CHECK_DICT,
120             response['mapping'])
121
122     def test_06_check_interface_och(self):
123         response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-761:768")
124         self.assertEqual(response['status_code'], requests.codes.ok)
125
126         self.assertDictEqual(dict(response['interface'][0], **{'name': 'XPDR1-NETWORK1-761:768',
127                                                                'administrative-state': 'inService',
128                                                                'supporting-circuit-pack-name': 'CP1-CFP0',
129                                                                'type': 'org-openroadm-interfaces:opticalChannel',
130                                                                'supporting-port': 'CP1-CFP0-P1'
131                                                                }),
132                              response['interface'][0])
133
134         self.assertIn(
135             response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
136             [{'frequency': '196.1000', 'rate': 'org-openroadm-common-types:R100G',
137               'transmit-power': '-5', 'modulation-format': 'dp-qpsk'},
138              {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
139               'transmit-power': -5, 'modulation-format': 'dp-qpsk'}])
140
141     def test_07_check_interface_OTU(self):
142         response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
143         self.assertEqual(response['status_code'], requests.codes.ok)
144         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
145                         'administrative-state': 'inService',
146                         'supporting-circuit-pack-name': 'CP1-CFP0',
147                         'supporting-interface': 'XPDR1-NETWORK1-761:768',
148                         'type': 'org-openroadm-interfaces:otnOtu',
149                         'supporting-port': 'CP1-CFP0-P1'
150                         }
151
152         input_dict_2 = {'rate': 'org-openroadm-otn-common-types:OTU4',
153                         'fec': 'scfec'
154                         }
155
156         self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
157                              response['interface'][0])
158
159         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'], **input_dict_2),
160                              response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
161
162     def test_08_otn_service_path_create_ODU4(self):
163         response = test_utils_rfc8040.transportpce_api_rpc_request(
164             'transportpce-device-renderer', 'otn-service-path',
165             {
166                 'service-name': 'service_ODU4',
167                 'operation': 'create',
168                 'service-rate': '100',
169                 'service-format': 'ODU',
170                 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
171             })
172         self.assertEqual(response['status_code'], requests.codes.ok)
173         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
174         self.assertTrue(response['output']['success'])
175         self.assertIn(
176             {'node-id': 'SPDR-SA1',
177              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, response['output']['node-interface'])
178
179     def test_09_get_portmapping_NETWORK1(self):
180         response = test_utils_rfc8040.portmapping_request("SPDR-SA1", "XPDR1-NETWORK1")
181         self.assertEqual(response['status_code'], requests.codes.ok)
182         self.NETWORK1_CHECK_DICT["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
183         self.NETWORK1_CHECK_DICT["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
184         self.assertIn(
185             self.NETWORK1_CHECK_DICT,
186             response['mapping'])
187
188     def test_10_check_interface_ODU4(self):
189         response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
190         self.assertEqual(response['status_code'], requests.codes.ok)
191         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
192                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
193                         'type': 'org-openroadm-interfaces:otnOdu',
194                         'supporting-port': 'CP1-CFP0-P1'}
195         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
196                         'rate': 'org-openroadm-otn-common-types:ODU4'}
197
198         self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
199                              response['interface'][0])
200         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
201                                   **input_dict_2
202                                   ),
203                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
204                              )
205         self.assertDictEqual(
206             {'payload-type': '21', 'exp-payload-type': '21'},
207             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
208
209     def test_11_otn_service_path_create_10GE(self):
210         response = test_utils_rfc8040.transportpce_api_rpc_request(
211             'transportpce-device-renderer', 'otn-service-path',
212             {
213                 'service-name': 'service1',
214                 'operation': 'create',
215                 'service-rate': '10',
216                 'service-format': 'Ethernet',
217                 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}],
218                 'ethernet-encoding': 'eth encode',
219                 'trib-slot': ['1'],
220                 'trib-port-number': '1'
221             })
222         self.assertEqual(response['status_code'], requests.codes.ok)
223         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', response['output']['result'])
224         self.assertTrue(response['output']['success'])
225         self.assertEqual('SPDR-SA1', response['output']['node-interface'][0]['node-id'])
226         self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
227                       response['output']['node-interface'][0]['connection-id'])
228         self.assertIn('XPDR1-CLIENT4-ETHERNET10G', response['output']['node-interface'][0]['eth-interface-id'])
229         self.assertIn('XPDR1-NETWORK1-ODU2e-service1', response['output']['node-interface'][0]['odu-interface-id'])
230         self.assertIn('XPDR1-CLIENT4-ODU2e-service1', response['output']['node-interface'][0]['odu-interface-id'])
231
232     def test_12_check_interface_10GE_CLIENT(self):
233         response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
234         self.assertEqual(response['status_code'], requests.codes.ok)
235         input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
236                       'administrative-state': 'inService',
237                       'supporting-circuit-pack-name': 'CP1-SFP4',
238                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
239                       'supporting-port': 'CP1-SFP4-P1'
240                       }
241         self.assertDictEqual(dict(response['interface'][0], **input_dict),
242                              response['interface'][0])
243         self.assertEqual(10000, response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
244
245     def test_13_check_interface_ODU2E_CLIENT(self):
246         response = test_utils_rfc8040.check_node_attribute_request(
247             "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e-service1")
248         self.assertEqual(response['status_code'], requests.codes.ok)
249
250         input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
251                         'administrative-state': 'inService',
252                         'supporting-circuit-pack-name': 'CP1-SFP4',
253                         'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
254                         'type': 'org-openroadm-interfaces:otnOdu',
255                         'supporting-port': 'CP1-SFP4-P1'}
256         input_dict_2 = {
257             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
258             'rate': 'org-openroadm-otn-common-types:ODU2e',
259             'monitoring-mode': 'terminated'}
260
261         self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
262                              response['interface'][0])
263         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
264                                   **input_dict_2),
265                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
266         self.assertDictEqual(
267             {'payload-type': '03', 'exp-payload-type': '03'},
268             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
269
270     def test_14_check_interface_ODU2E_NETWORK(self):
271         response = test_utils_rfc8040.check_node_attribute_request(
272             "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-service1")
273         self.assertEqual(response['status_code'], requests.codes.ok)
274         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
275                         'supporting-circuit-pack-name': 'CP1-CFP0',
276                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
277                         'type': 'org-openroadm-interfaces:otnOdu',
278                         'supporting-port': 'CP1-CFP0-P1'}
279         input_dict_2 = {
280             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
281             'rate': 'org-openroadm-otn-common-types:ODU2e',
282             'monitoring-mode': 'monitored'}
283
284         input_dict_3 = {'trib-port-number': 1}
285
286         self.assertDictEqual(dict(response['interface'][0], **input_dict_1),
287                              response['interface'][0])
288         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
289                                   **input_dict_2),
290                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
291         self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
292             'parent-odu-allocation'], **input_dict_3
293         ),
294             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
295             'parent-odu-allocation'])
296         self.assertIn(1,
297                       response['interface'][0][
298                           'org-openroadm-otn-odu-interfaces:odu'][
299                           'parent-odu-allocation']['trib-slots'])
300
301     def test_15_check_ODU2E_connection(self):
302         response = test_utils_rfc8040.check_node_attribute_request(
303             "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
304         self.assertEqual(response['status_code'], requests.codes.ok)
305         input_dict_1 = {
306             'connection-name':
307             'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
308             'direction': 'bidirectional'
309         }
310
311         self.assertDictEqual(dict(response['odu-connection'][0], **input_dict_1),
312                              response['odu-connection'][0])
313         self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
314                              response['odu-connection'][0]['destination'])
315         self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'},
316                              response['odu-connection'][0]['source'])
317
318     def test_16_otn_service_path_delete_10GE(self):
319         response = test_utils_rfc8040.transportpce_api_rpc_request(
320             'transportpce-device-renderer', 'otn-service-path',
321             {
322                 'service-name': 'service1',
323                 'operation': 'delete',
324                 'service-rate': '10',
325                 'service-format': 'Ethernet',
326                 'nodes': [{'node-id': 'SPDR-SA1', 'client-tp': 'XPDR1-CLIENT4', 'network-tp': 'XPDR1-NETWORK1'}],
327                 'ethernet-encoding': 'eth encode',
328                 'trib-slot': ['1'],
329                 'trib-port-number': '1'
330             })
331         self.assertEqual(response['status_code'], requests.codes.ok)
332         self.assertIn('Request processed', response['output']['result'])
333         self.assertTrue(response['output']['success'])
334
335     def test_17_check_no_ODU2E_connection(self):
336         response = test_utils_rfc8040.check_node_attribute_request(
337             "SPDR-SA1", "odu-connection", "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
338         self.assertEqual(response['status_code'], requests.codes.conflict)
339
340     def test_18_check_no_interface_ODU2E_NETWORK(self):
341         response = test_utils_rfc8040.check_node_attribute_request(
342             "SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU2e-service1")
343         self.assertEqual(response['status_code'], requests.codes.conflict)
344
345     def test_19_check_no_interface_ODU2E_CLIENT(self):
346         response = test_utils_rfc8040.check_node_attribute_request(
347             "SPDR-SA1", "interface", "XPDR1-CLIENT4-ODU2e-service1")
348         self.assertEqual(response['status_code'], requests.codes.conflict)
349
350     def test_20_check_no_interface_10GE_CLIENT(self):
351         response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-CLIENT4-ETHERNET10G")
352         self.assertEqual(response['status_code'], requests.codes.conflict)
353
354     def test_21_otn_service_path_delete_ODU4(self):
355         response = test_utils_rfc8040.transportpce_api_rpc_request(
356             'transportpce-device-renderer', 'otn-service-path',
357             {
358                 'service-name': 'service_ODU4',
359                 'operation': 'delete',
360                 'service-rate': '100',
361                 'service-format': 'ODU',
362                 'nodes': [{'node-id': 'SPDR-SA1', 'network-tp': 'XPDR1-NETWORK1'}]
363             })
364         self.assertEqual(response['status_code'], requests.codes.ok)
365         self.assertIn('Request processed', response['output']['result'])
366         self.assertTrue(response['output']['success'])
367
368     def test_22_check_no_interface_ODU4(self):
369         response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-ODU4")
370         self.assertEqual(response['status_code'], requests.codes.conflict)
371
372     def test_23_service_path_delete_OCH_OTU4(self):
373         response = test_utils_rfc8040.transportpce_api_rpc_request(
374             'transportpce-device-renderer', 'service-path',
375             {
376                 'service-name': 'service_test',
377                 'wave-number': '7',
378                 'modulation-format': 'dp-qpsk',
379                 'operation': 'delete',
380                 'nodes': [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
381                 'center-freq': 196.1,
382                 'nmc-width': 40,
383                 'min-freq': 196.075,
384                 'max-freq': 196.125,
385                 'lower-spectral-slot-number': 761,
386                 'higher-spectral-slot-number': 768
387             })
388         self.assertEqual(response['status_code'], requests.codes.ok)
389         self.assertIn('Request processed', response['output']['result'])
390         self.assertTrue(response['output']['success'])
391
392     def test_24_check_no_interface_OTU4(self):
393         response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-OTU")
394         self.assertEqual(response['status_code'], requests.codes.conflict)
395
396     def test_25_check_no_interface_OCH(self):
397         response = test_utils_rfc8040.check_node_attribute_request("SPDR-SA1", "interface", "XPDR1-NETWORK1-1")
398         self.assertEqual(response['status_code'], requests.codes.conflict)
399
400     def test_26_disconnect_SPDR_SA1(self):
401         response = test_utils_rfc8040.unmount_device("SPDR-SA1")
402         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
403
404
405 if __name__ == "__main__":
406     unittest.main(verbosity=2)