fix functional tests linelength pylint issues
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 import time
17 import requests
18 from common import test_utils
19
20
21 class TransportPCEtesting(unittest.TestCase):
22
23     processes = None
24
25     @classmethod
26     def setUpClass(cls):
27         cls.processes = test_utils.start_tpce()
28         cls.processes = test_utils.start_sims(['spdra'])
29
30     @classmethod
31     def tearDownClass(cls):
32         for process in cls.processes:
33             test_utils.shutdown_process(process)
34         print("all processes killed")
35
36     def setUp(self):
37         time.sleep(5)
38
39     def test_01_connect_SPDR_SA1(self):
40         response = test_utils.mount_device("SPDR-SA1", 'spdra')
41         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
42         time.sleep(10)
43
44         response = test_utils.get_netconf_oper_request("SPDR-SA1")
45         self.assertEqual(response.status_code, requests.codes.ok)
46         res = response.json()
47         self.assertEqual(
48             res['node'][0]['netconf-node-topology:connection-status'],
49             'connected')
50
51     def test_02_get_portmapping_CLIENT4(self):
52         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
53         self.assertEqual(response.status_code, requests.codes.ok)
54         res_mapping = (response.json())['mapping'][0]
55         self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
56         self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name'])
57         self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point'])
58         self.assertEqual('bidirectional', res_mapping['port-direction'])
59         self.assertEqual('xpdr-client', res_mapping['port-qual'])
60         self.assertEqual('FqlcrxV7p3g=', res_mapping['lcp-hash-val'])
61         self.assertIn('org-openroadm-port-types:if-10GE-ODU2e', res_mapping['supported-interface-capability'])
62         self.assertIn('org-openroadm-port-types:if-10GE-ODU2', res_mapping['supported-interface-capability'])
63         self.assertIn('org-openroadm-port-types:if-10GE', res_mapping['supported-interface-capability'])
64
65     def test_03_get_portmapping_NETWORK1(self):
66         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
67         self.assertEqual(response.status_code, requests.codes.ok)
68         res = response.json()
69         self.assertIn(
70             {"logical-connection-point": "XPDR1-NETWORK1",
71              "supporting-port": "CP1-CFP0-P1",
72              "supported-interface-capability": [
73                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
74              ],
75                 "port-direction": "bidirectional",
76                 "port-qual": "xpdr-network",
77                 "supporting-circuit-pack-name": "CP1-CFP0",
78                 "xponder-type": "mpdr",
79              'lcp-hash-val': 'Swfw02qXGyI='},
80             res['mapping'])
81
82     def test_04_service_path_create_OCH_OTU4(self):
83         response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
84                                                    [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
85         time.sleep(3)
86         self.assertEqual(response.status_code, requests.codes.ok)
87         res = response.json()
88         self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
89         self.assertTrue(res["output"]["success"])
90         self.assertIn(
91             {'node-id': 'SPDR-SA1',
92              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
93              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
94
95     def test_05_get_portmapping_NETWORK1(self):
96         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
97         self.assertEqual(response.status_code, requests.codes.ok)
98         res = response.json()
99         self.assertIn(
100             {"logical-connection-point": "XPDR1-NETWORK1",
101              "supporting-port": "CP1-CFP0-P1",
102              "supported-interface-capability": [
103                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
104              ],
105                 "port-direction": "bidirectional",
106                 "port-qual": "xpdr-network",
107                 "supporting-circuit-pack-name": "CP1-CFP0",
108                 "xponder-type": "mpdr",
109                 "lcp-hash-val": "Swfw02qXGyI="},
110             res['mapping'])
111
112     def test_06_check_interface_och(self):
113         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
114         self.assertEqual(response.status_code, requests.codes.ok)
115         res = response.json()
116
117         self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
118                                                           'administrative-state': 'inService',
119                                                           'supporting-circuit-pack-name': 'CP1-CFP0',
120                                                           'type': 'org-openroadm-interfaces:opticalChannel',
121                                                           'supporting-port': 'CP1-CFP0-P1'
122                                                           }),
123                              res['interface'][0])
124
125         self.assertDictEqual(
126             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
127              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
128             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
129
130     def test_07_check_interface_OTU(self):
131         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
132         self.assertEqual(response.status_code, requests.codes.ok)
133         res = response.json()
134         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
135                         'administrative-state': 'inService',
136                         'supporting-circuit-pack-name': 'CP1-CFP0',
137                         'supporting-interface': 'XPDR1-NETWORK1-1',
138                         'type': 'org-openroadm-interfaces:otnOtu',
139                         'supporting-port': 'CP1-CFP0-P1'
140                         }
141
142         input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
143                         'expected-sapi': 'Swfw02qXGyI=',
144                         'tx-sapi': 'Swfw02qXGyI=',
145                         'expected-dapi': 'Swfw02qXGyI=',
146                         'rate': 'org-openroadm-otn-common-types:OTU4',
147                         'fec': 'scfec'
148                         }
149
150         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
151                              res['interface'][0])
152
153         self.assertDictEqual(input_dict_2,
154                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
155
156     def test_08_otn_service_path_create_ODU4(self):
157         response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
158                                                        [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
159         time.sleep(3)
160         self.assertEqual(response.status_code, requests.codes.ok)
161         res = response.json()
162         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
163         self.assertTrue(res["output"]["success"])
164         self.assertIn(
165             {'node-id': 'SPDR-SA1',
166              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
167
168     def test_09_get_portmapping_NETWORK1(self):
169         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
170         self.assertEqual(response.status_code, requests.codes.ok)
171         res = response.json()
172         self.assertIn(
173             {"logical-connection-point": "XPDR1-NETWORK1",
174              "supporting-port": "CP1-CFP0-P1",
175              "supported-interface-capability": [
176                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
177              ],
178                 "port-direction": "bidirectional",
179                 "port-qual": "xpdr-network",
180                 "supporting-circuit-pack-name": "CP1-CFP0",
181                 "xponder-type": "mpdr",
182                 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
183                 "lcp-hash-val": "Swfw02qXGyI="
184              },
185             res['mapping'])
186
187     def test_10_check_interface_ODU4(self):
188         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
189         self.assertEqual(response.status_code, requests.codes.ok)
190         res = response.json()
191         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
192                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
193                         'type': 'org-openroadm-interfaces:otnOdu',
194                         'supporting-port': 'CP1-CFP0-P1'}
195         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
196                         'rate': 'org-openroadm-otn-common-types:ODU4',
197                         'expected-dapi': 'Swfw02qXGyI=',
198                         'expected-sapi': 'Swfw02qXGyI=',
199                         'tx-dapi': 'Swfw02qXGyI=',
200                         'tx-sapi': 'Swfw02qXGyI='}
201
202         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
203                              res['interface'][0])
204         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
205                                   **input_dict_2
206                                   ),
207                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
208                              )
209         self.assertDictEqual(
210             {u'payload-type': u'21', u'exp-payload-type': u'21'},
211             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
212
213     def test_11_otn_service_path_create_10GE(self):
214         response = test_utils.otn_service_path_request("create", "service1", "10G", "Ethernet",
215                                                        [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
216                                                            "network-tp": "XPDR1-NETWORK1"}],
217                                                        {"ethernet-encoding": "eth encode",
218                                                         "trib-slot": ["1"], "trib-port-number": "1"})
219         time.sleep(3)
220         self.assertEqual(response.status_code, requests.codes.ok)
221         res = response.json()
222         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
223         self.assertTrue(res["output"]["success"])
224         self.assertEqual('SPDR-SA1', res["output"]['node-interface'][0]['node-id'])
225         self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
226                       res["output"]['node-interface'][0]['connection-id'])
227         self.assertIn('XPDR1-CLIENT4-ETHERNET10G', res["output"]['node-interface'][0]['eth-interface-id'])
228         self.assertIn('XPDR1-NETWORK1-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id'])
229         self.assertIn('XPDR1-CLIENT4-ODU2e-service1', res["output"]['node-interface'][0]['odu-interface-id'])
230
231     def test_12_check_interface_10GE_CLIENT(self):
232         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
233         self.assertEqual(response.status_code, requests.codes.ok)
234         res = response.json()
235         input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
236                       'administrative-state': 'inService',
237                       'supporting-circuit-pack-name': 'CP1-SFP4',
238                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
239                       'supporting-port': 'CP1-SFP4-P1'
240                       }
241         self.assertDictEqual(dict(res['interface'][0], **input_dict),
242                              res['interface'][0])
243         self.assertDictEqual(
244             {u'speed': 10000},
245             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
246
247     def test_13_check_interface_ODU2E_CLIENT(self):
248         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
249         self.assertEqual(response.status_code, requests.codes.ok)
250         res = response.json()
251
252         input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
253                         'administrative-state': 'inService',
254                         'supporting-circuit-pack-name': 'CP1-SFP4',
255                         'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
256                         'type': 'org-openroadm-interfaces:otnOdu',
257                         'supporting-port': 'CP1-SFP4-P1'}
258         input_dict_2 = {
259             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
260             'rate': 'org-openroadm-otn-common-types:ODU2e',
261             'monitoring-mode': 'terminated'}
262
263         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
264                              res['interface'][0])
265         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
266                                   **input_dict_2),
267                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
268         self.assertDictEqual(
269             {u'payload-type': u'03', u'exp-payload-type': u'03'},
270             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
271
272     def test_14_check_interface_ODU2E_NETWORK(self):
273         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
274         self.assertEqual(response.status_code, requests.codes.ok)
275         res = response.json()
276         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
277                         'supporting-circuit-pack-name': 'CP1-CFP0',
278                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
279                         'type': 'org-openroadm-interfaces:otnOdu',
280                         'supporting-port': 'CP1-CFP0-P1'}
281         input_dict_2 = {
282             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
283             'rate': 'org-openroadm-otn-common-types:ODU2e',
284             'monitoring-mode': 'monitored'}
285
286         input_dict_3 = {'trib-port-number': 1}
287
288         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
289                              res['interface'][0])
290         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
291                                   **input_dict_2),
292                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
293         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
294             'parent-odu-allocation'], **input_dict_3
295         ),
296             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
297             'parent-odu-allocation'])
298         self.assertIn(1,
299                       res['interface'][0][
300                           'org-openroadm-otn-odu-interfaces:odu'][
301                           'parent-odu-allocation']['trib-slots'])
302
303     def test_15_check_ODU2E_connection(self):
304         response = test_utils.check_netconf_node_request(
305             "SPDR-SA1",
306             "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
307         self.assertEqual(response.status_code, requests.codes.ok)
308         res = response.json()
309         input_dict_1 = {
310             'connection-name':
311             'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
312             'direction': 'bidirectional'
313         }
314
315         self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
316                              res['odu-connection'][0])
317         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
318                              res['odu-connection'][0]['destination'])
319         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT4-ODU2e-service1'},
320                              res['odu-connection'][0]['source'])
321
322     def test_16_otn_service_path_delete_10GE(self):
323         response = test_utils.otn_service_path_request("delete", "service1", "10G", "Ethernet",
324                                                        [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
325                                                            "network-tp": "XPDR1-NETWORK1"}],
326                                                        {"ethernet-encoding": "eth encode",
327                                                         "trib-slot": ["1"], "trib-port-number": "1"})
328         time.sleep(3)
329         self.assertEqual(response.status_code, requests.codes.ok)
330         res = response.json()
331         self.assertIn('Request processed', res["output"]["result"])
332         self.assertTrue(res["output"]["success"])
333
334     def test_17_check_no_ODU2E_connection(self):
335         response = test_utils.check_netconf_node_request(
336             "SPDR-SA1",
337             "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
338         self.assertEqual(response.status_code, requests.codes.conflict)
339
340     def test_18_check_no_interface_ODU2E_NETWORK(self):
341         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
342         self.assertEqual(response.status_code, requests.codes.conflict)
343
344     def test_19_check_no_interface_ODU2E_CLIENT(self):
345         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
346         self.assertEqual(response.status_code, requests.codes.conflict)
347
348     def test_20_check_no_interface_10GE_CLIENT(self):
349         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
350         self.assertEqual(response.status_code, requests.codes.conflict)
351
352     def test_21_otn_service_path_delete_ODU4(self):
353         response = test_utils.otn_service_path_request("delete", "service_ODU4", "100G", "ODU",
354                                                        [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
355         time.sleep(3)
356         self.assertEqual(response.status_code, requests.codes.ok)
357         res = response.json()
358         self.assertIn('Request processed', res["output"]["result"])
359         self.assertTrue(res["output"]["success"])
360
361     def test_22_check_no_interface_ODU4(self):
362         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
363         self.assertEqual(response.status_code, requests.codes.conflict)
364
365     def test_23_service_path_delete_OCH_OTU4(self):
366         response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
367                                                    [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
368         time.sleep(3)
369         self.assertEqual(response.status_code, requests.codes.ok)
370         res = response.json()
371         self.assertIn('Request processed', res["output"]["result"])
372         self.assertTrue(res["output"]["success"])
373
374     def test_24_check_no_interface_OTU4(self):
375         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
376         self.assertEqual(response.status_code, requests.codes.conflict)
377
378     def test_25_check_no_interface_OCH(self):
379         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
380         self.assertEqual(response.status_code, requests.codes.conflict)
381
382     def test_26_disconnect_SPDR_SA1(self):
383         response = test_utils.unmount_device("SPDR-SA1")
384         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
385
386
387 if __name__ == "__main__":
388     unittest.main(verbosity=2)