20356b2976ae3dc3928205db916f017fdc35742a
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 import time
17 import requests
18 from common import test_utils
19
20
class TransportPCEtesting(unittest.TestCase):
    """Functional tests of OTN interface rendering on the SPDR-SA1 simulator.

    The test methods are numbered because they depend on each other and
    unittest runs them in alphabetical order: the device is mounted, an
    OCH/OTU4 service path, an ODU4 service path and a 10GE service path are
    created and inspected, then everything is deleted in reverse order and
    the device is unmounted.
    """

    processes = None

    # Name of the optical channel interface reported by the service-path
    # creation in test_04. Every test referring to that interface uses this
    # constant so creation, inspection and deletion checks stay in sync.
    OCH_INTERFACE = 'XPDR1-NETWORK1-761:768'

    # Port mapping expected for XPDR1-NETWORK1 before the ODU4 service path
    # exists (test_09 expects one additional 'supporting-odu4' entry).
    NETWORK1_MAPPING = {
        "logical-connection-point": "XPDR1-NETWORK1",
        "supporting-port": "CP1-CFP0-P1",
        "supported-interface-capability": [
            "org-openroadm-port-types:if-OCH-OTU4-ODU4"
        ],
        "port-direction": "bidirectional",
        "port-qual": "xpdr-network",
        "supporting-circuit-pack-name": "CP1-CFP0",
        "xponder-type": "mpdr",
        "lcp-hash-val": "Swfw02qXGyI=",
    }

    @classmethod
    def setUpClass(cls):
        # NOTE(review): the second assignment replaces the first one, so
        # tearDownClass only stops the controller if start_sims() returns a
        # list that also holds what start_tpce() started - confirm against
        # common.test_utils.
        cls.processes = test_utils.start_tpce()
        cls.processes = test_utils.start_sims(['spdra'])

    @classmethod
    def tearDownClass(cls):
        # pylint: disable=not-an-iterable
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):
        # Pause before every test; presumably this lets the controller and
        # the simulator settle between requests - TODO confirm.
        time.sleep(5)

    # ------------------------------------------------------------------
    # Private helpers (not collected by unittest: no "test" prefix)
    # ------------------------------------------------------------------

    def _assert_dict_contains(self, expected, actual):
        # Pass when every key/value pair of `expected` is found in `actual`;
        # keys that only exist in `actual` are ignored. Overlaying `expected`
        # on a copy of `actual` leaves the copy unchanged only in that case.
        self.assertDictEqual(dict(actual, **expected), actual)

    def _fetch_port_mapping(self, lcp):
        # Return the 'mapping' list of SPDR-SA1 for one logical connection
        # point, after checking that the request answered 200.
        response = test_utils.portmapping_request("SPDR-SA1/mapping/" + lcp)
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()['mapping']

    def _fetch_device_data(self, resource):
        # Return the decoded JSON body of a SPDR-SA1 device resource, after
        # checking that the request answered 200.
        response = test_utils.check_netconf_node_request("SPDR-SA1", resource)
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()

    def _assert_device_data_absent(self, resource):
        # These tests expect a 409 (conflict) status when the requested
        # resource is not present on the device any more.
        response = test_utils.check_netconf_node_request("SPDR-SA1", resource)
        self.assertEqual(response.status_code, requests.codes.conflict)

    def _assert_delete_processed(self, response):
        # Common verification of the answer to a service-path deletion.
        self.assertEqual(response.status_code, requests.codes.ok)
        output = response.json()["output"]
        self.assertIn('Request processed', output["result"])
        self.assertTrue(output["success"])

    # ------------------------------------------------------------------
    # Device connection and initial port mapping
    # ------------------------------------------------------------------

    def test_01_connect_SPDR_SA1(self):
        response = test_utils.mount_device("SPDR-SA1", 'spdra')
        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
        # Wait before polling the connection status of the mounted node.
        time.sleep(10)

        response = test_utils.get_netconf_oper_request("SPDR-SA1")
        self.assertEqual(response.status_code, requests.codes.ok)
        node = response.json()['node'][0]
        self.assertEqual(node['netconf-node-topology:connection-status'], 'connected')

    def test_02_get_portmapping_CLIENT4(self):
        mapping = self._fetch_port_mapping("XPDR1-CLIENT4")[0]
        self.assertEqual('CP1-SFP4-P1', mapping['supporting-port'])
        self.assertEqual('CP1-SFP4', mapping['supporting-circuit-pack-name'])
        self.assertEqual('XPDR1-CLIENT4', mapping['logical-connection-point'])
        self.assertEqual('bidirectional', mapping['port-direction'])
        self.assertEqual('xpdr-client', mapping['port-qual'])
        self.assertEqual('FqlcrxV7p3g=', mapping['lcp-hash-val'])
        capabilities = mapping['supported-interface-capability']
        self.assertIn('org-openroadm-port-types:if-10GE-ODU2e', capabilities)
        self.assertIn('org-openroadm-port-types:if-10GE-ODU2', capabilities)
        self.assertIn('org-openroadm-port-types:if-10GE', capabilities)

    def test_03_get_portmapping_NETWORK1(self):
        self.assertIn(self.NETWORK1_MAPPING, self._fetch_port_mapping("XPDR1-NETWORK1"))

    # ------------------------------------------------------------------
    # OCH / OTU4 service path
    # ------------------------------------------------------------------

    def test_04_service_path_create_OCH_OTU4(self):
        response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
                                                   [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
                                                   196.1, 40, 196.075, 196.125, 761,
                                                   768)
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        output = response.json()["output"]
        self.assertIn('Roadm-connection successfully created for nodes: ', output["result"])
        self.assertTrue(output["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
             'och-interface-id': [self.OCH_INTERFACE]},
            output['node-interface'])

    def test_05_get_portmapping_NETWORK1(self):
        # The mapping is expected to be the same as before the OCH/OTU4
        # service path was created (same expectation as test_03).
        self.assertIn(self.NETWORK1_MAPPING, self._fetch_port_mapping("XPDR1-NETWORK1"))

    def test_06_check_interface_och(self):
        interface = self._fetch_device_data("interface/" + self.OCH_INTERFACE)['interface'][0]
        self._assert_dict_contains(
            {'name': self.OCH_INTERFACE,
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'type': 'org-openroadm-interfaces:opticalChannel',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        # The och container must match exactly, not only as a subset.
        self.assertDictEqual(
            {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
             'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
            interface['org-openroadm-optical-channel-interfaces:och'])

    def test_07_check_interface_OTU(self):
        interface = self._fetch_device_data("interface/XPDR1-NETWORK1-OTU")['interface'][0]
        self._assert_dict_contains(
            {'name': 'XPDR1-NETWORK1-OTU',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'supporting-interface': self.OCH_INTERFACE,
             'type': 'org-openroadm-interfaces:otnOtu',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        # The otu container must match exactly, not only as a subset.
        self.assertDictEqual(
            {'tx-dapi': 'Swfw02qXGyI=',
             'expected-sapi': 'Swfw02qXGyI=',
             'tx-sapi': 'Swfw02qXGyI=',
             'expected-dapi': 'Swfw02qXGyI=',
             'rate': 'org-openroadm-otn-common-types:OTU4',
             'fec': 'scfec'},
            interface['org-openroadm-otn-otu-interfaces:otu'])

    # ------------------------------------------------------------------
    # ODU4 service path
    # ------------------------------------------------------------------

    def test_08_otn_service_path_create_ODU4(self):
        response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
                                                       [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        output = response.json()["output"]
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', output["result"])
        self.assertTrue(output["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'odu-interface-id': ['XPDR1-NETWORK1-ODU4']},
            output['node-interface'])

    def test_09_get_portmapping_NETWORK1(self):
        # Same mapping as before plus the reference to the new ODU4 interface.
        expected = dict(self.NETWORK1_MAPPING, **{"supporting-odu4": "XPDR1-NETWORK1-ODU4"})
        self.assertIn(expected, self._fetch_port_mapping("XPDR1-NETWORK1"))

    def test_10_check_interface_ODU4(self):
        interface = self._fetch_device_data("interface/XPDR1-NETWORK1-ODU4")['interface'][0]
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_dict_contains(
            {'name': 'XPDR1-NETWORK1-ODU4',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'supporting-interface': 'XPDR1-NETWORK1-OTU',
             'type': 'org-openroadm-interfaces:otnOdu',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        self._assert_dict_contains(
            {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
             'rate': 'org-openroadm-otn-common-types:ODU4',
             'expected-dapi': 'Swfw02qXGyI=',
             'expected-sapi': 'Swfw02qXGyI=',
             'tx-dapi': 'Swfw02qXGyI=',
             'tx-sapi': 'Swfw02qXGyI='},
            odu)
        self.assertDictEqual({'payload-type': '21', 'exp-payload-type': '21'}, odu['opu'])

    # ------------------------------------------------------------------
    # 10GE service path
    # ------------------------------------------------------------------

    def test_11_otn_service_path_create_10GE(self):
        response = test_utils.otn_service_path_request("create", "service1", "10G", "Ethernet",
                                                       [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                                                         "network-tp": "XPDR1-NETWORK1"}],
                                                       {"ethernet-encoding": "eth encode",
                                                        "trib-slot": ["1"], "trib-port-number": "1"})
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        output = response.json()["output"]
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', output["result"])
        self.assertTrue(output["success"])
        node_interface = output['node-interface'][0]
        self.assertEqual('SPDR-SA1', node_interface['node-id'])
        self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
                      node_interface['connection-id'])
        self.assertIn('XPDR1-CLIENT4-ETHERNET10G', node_interface['eth-interface-id'])
        self.assertIn('XPDR1-NETWORK1-ODU2e-service1', node_interface['odu-interface-id'])
        self.assertIn('XPDR1-CLIENT4-ODU2e-service1', node_interface['odu-interface-id'])

    def test_12_check_interface_10GE_CLIENT(self):
        interface = self._fetch_device_data("interface/XPDR1-CLIENT4-ETHERNET10G")['interface'][0]
        self._assert_dict_contains(
            {'name': 'XPDR1-CLIENT4-ETHERNET10G',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-SFP4',
             'type': 'org-openroadm-interfaces:ethernetCsmacd',
             'supporting-port': 'CP1-SFP4-P1'},
            interface)
        self.assertDictEqual(
            {'speed': 10000},
            interface['org-openroadm-ethernet-interfaces:ethernet'])

    def test_13_check_interface_ODU2E_CLIENT(self):
        interface = self._fetch_device_data("interface/XPDR1-CLIENT4-ODU2e-service1")['interface'][0]
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_dict_contains(
            {'name': 'XPDR1-CLIENT4-ODU2e-service1',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-SFP4',
             'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
             'type': 'org-openroadm-interfaces:otnOdu',
             'supporting-port': 'CP1-SFP4-P1'},
            interface)
        self._assert_dict_contains(
            {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
             'rate': 'org-openroadm-otn-common-types:ODU2e',
             'monitoring-mode': 'terminated'},
            odu)
        self.assertDictEqual({'payload-type': '03', 'exp-payload-type': '03'}, odu['opu'])

    def test_14_check_interface_ODU2E_NETWORK(self):
        interface = self._fetch_device_data("interface/XPDR1-NETWORK1-ODU2e-service1")['interface'][0]
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        allocation = odu['parent-odu-allocation']
        self._assert_dict_contains(
            {'name': 'XPDR1-NETWORK1-ODU2e-service1',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'supporting-interface': 'XPDR1-NETWORK1-ODU4',
             'type': 'org-openroadm-interfaces:otnOdu',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        self._assert_dict_contains(
            {'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
             'rate': 'org-openroadm-otn-common-types:ODU2e',
             'monitoring-mode': 'monitored'},
            odu)
        self._assert_dict_contains({'trib-port-number': 1}, allocation)
        self.assertIn(1, allocation['trib-slots'])

    def test_15_check_ODU2E_connection(self):
        connection = self._fetch_device_data(
            "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")['odu-connection'][0]
        self._assert_dict_contains(
            {'connection-name': 'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
             'direction': 'bidirectional'},
            connection)
        self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'}, connection['destination'])
        self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'}, connection['source'])

    # ------------------------------------------------------------------
    # Deletion, in reverse order of creation
    # ------------------------------------------------------------------

    def test_16_otn_service_path_delete_10GE(self):
        response = test_utils.otn_service_path_request("delete", "service1", "10G", "Ethernet",
                                                       [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                                                         "network-tp": "XPDR1-NETWORK1"}],
                                                       {"ethernet-encoding": "eth encode",
                                                        "trib-slot": ["1"], "trib-port-number": "1"})
        time.sleep(3)
        self._assert_delete_processed(response)

    def test_17_check_no_ODU2E_connection(self):
        self._assert_device_data_absent(
            "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")

    def test_18_check_no_interface_ODU2E_NETWORK(self):
        self._assert_device_data_absent("interface/XPDR1-NETWORK1-ODU2e-service1")

    def test_19_check_no_interface_ODU2E_CLIENT(self):
        self._assert_device_data_absent("interface/XPDR1-CLIENT4-ODU2e-service1")

    def test_20_check_no_interface_10GE_CLIENT(self):
        self._assert_device_data_absent("interface/XPDR1-CLIENT4-ETHERNET10G")

    def test_21_otn_service_path_delete_ODU4(self):
        response = test_utils.otn_service_path_request("delete", "service_ODU4", "100G", "ODU",
                                                       [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
        time.sleep(3)
        self._assert_delete_processed(response)

    def test_22_check_no_interface_ODU4(self):
        self._assert_device_data_absent("interface/XPDR1-NETWORK1-ODU4")

    def test_23_service_path_delete_OCH_OTU4(self):
        response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
                                                   [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
                                                   196.1, 40, 196.075, 196.125, 761,
                                                   768)
        time.sleep(3)
        self._assert_delete_processed(response)

    def test_24_check_no_interface_OTU4(self):
        self._assert_device_data_absent("interface/XPDR1-NETWORK1-OTU")

    def test_25_check_no_interface_OCH(self):
        # Query the very interface that test_04 created: checking any other
        # name would succeed even if the deletion left the interface behind.
        self._assert_device_data_absent("interface/" + self.OCH_INTERFACE)

    def test_26_disconnect_SPDR_SA1(self):
        response = test_utils.unmount_device("SPDR-SA1")
        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
391
# Run the whole suite with verbose (per-test) reporting when this file is
# executed directly as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)