a275c2ab4c9aaf7df399a72ad4fef3eac82c4f23
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 import time
17 import requests
18 from common import test_utils
19
20
class TransportPCEtesting(unittest.TestCase):
    """Functional scenario exercising OTN interface rendering on node "SPDR-SA1".

    The numbered tests form a single scenario and rely on unittest running
    test methods in alphabetical order: mount the device, create the
    OCH/OTU4, ODU4 and 10GE service paths (checking the interfaces reported
    after each step), delete them in reverse order (checking the interfaces
    are no longer returned), then unmount the device.

    Test methods are documented with comments rather than docstrings so the
    verbose runner output keeps showing the plain method names.
    """

    processes = None

    # Interface / connection names shared by the create, check and delete
    # steps; keeping them in one place guarantees the "no longer present"
    # checks probe exactly what the create steps reported.
    OCH_INTERFACE_NAME = "XPDR1-NETWORK1-761:768"
    ODU2E_CONNECTION_NAME = "XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"

    # Expected port-mapping entry for XPDR1-NETWORK1 before any ODU4 exists.
    # Treat as read-only: tests needing extra keys must work on a copy.
    NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR1-NETWORK1",
                           "supporting-port": "CP1-CFP0-P1",
                           "supported-interface-capability": [
                               "org-openroadm-port-types:if-OCH-OTU4-ODU4"
                           ],
                           "port-direction": "bidirectional",
                           "port-qual": "xpdr-network",
                           "supporting-circuit-pack-name": "CP1-CFP0",
                           "xponder-type": "mpdr",
                           'lcp-hash-val': 'Swfw02qXGyI=',
                           'port-admin-state': 'InService',
                           'port-oper-state': 'InService'}

    @classmethod
    def setUpClass(cls):
        """Start the controller and the 'spdra' simulator before any test runs."""
        cls.processes = test_utils.start_tpce()
        # NOTE(review): this rebinds cls.processes, so tearDownClass only sees
        # what start_sims() returns. Presumably test_utils returns one shared,
        # accumulating process list from both calls - confirm in test_utils.
        cls.processes = test_utils.start_sims(['spdra'])

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded in ``cls.processes``."""
        # pylint: disable=not-an-iterable
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):
        """Pause five seconds before each test."""
        time.sleep(5)

    # ------------------------------------------------------------------
    # Private helpers (names do not start with "test", so they are never
    # collected as tests).
    # ------------------------------------------------------------------

    def _assert_subset(self, expected, actual):
        """Assert that every key/value pair of *expected* appears in *actual*.

        Keys present only in *actual* are ignored.
        """
        # Overlaying `expected` on `actual` leaves it unchanged only when
        # `actual` already holds all the expected pairs; assertDictEqual then
        # reports a readable diff on mismatch.
        self.assertDictEqual(dict(actual, **expected), actual)

    def _get_interface(self, interface_name):
        """Fetch *interface_name* from the device and return its first entry.

        Fails the test unless the request answers with the OK status.
        """
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/" + interface_name)
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()['interface'][0]

    def _assert_absent(self, resource_path):
        """Assert that requesting *resource_path* on the device answers 'conflict'."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", resource_path)
        self.assertEqual(response.status_code, requests.codes.conflict)

    def _assert_network1_mapping(self, expected_mapping):
        """Assert *expected_mapping* is among the XPDR1-NETWORK1 port-mapping entries."""
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
        self.assertEqual(response.status_code, requests.codes.ok)
        self.assertIn(expected_mapping, response.json()['mapping'])

    # ------------------------------------------------------------------
    # Scenario
    # ------------------------------------------------------------------

    def test_01_connect_SPDR_SA1(self):
        # Mount the device, then check its reported connection status.
        response = test_utils.mount_device("SPDR-SA1", 'spdra')
        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
        time.sleep(10)

        response = test_utils.get_netconf_oper_request("SPDR-SA1")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(
            res['node'][0]['netconf-node-topology:connection-status'],
            'connected')

    def test_02_get_portmapping_CLIENT4(self):
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
        self.assertEqual(response.status_code, requests.codes.ok)
        res_mapping = response.json()['mapping'][0]
        self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
        self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name'])
        self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point'])
        self.assertEqual('bidirectional', res_mapping['port-direction'])
        self.assertEqual('xpdr-client', res_mapping['port-qual'])
        self.assertEqual('FqlcrxV7p3g=', res_mapping['lcp-hash-val'])
        self.assertEqual('InService', res_mapping['port-admin-state'])
        self.assertEqual('InService', res_mapping['port-oper-state'])
        for capability in ('org-openroadm-port-types:if-10GE-ODU2e',
                           'org-openroadm-port-types:if-10GE-ODU2',
                           'org-openroadm-port-types:if-10GE'):
            self.assertIn(capability, res_mapping['supported-interface-capability'])

    def test_03_get_portmapping_NETWORK1(self):
        self._assert_network1_mapping(self.NETWORK1_CHECK_DICT)

    def test_04_service_path_create_OCH_OTU4(self):
        response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
                                                   [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
                                                   196.1, 40, 196.075, 196.125, 761,
                                                   768)
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
        self.assertTrue(res["output"]["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
             'och-interface-id': [self.OCH_INTERFACE_NAME]}, res["output"]['node-interface'])

    def test_05_get_portmapping_NETWORK1(self):
        # The mapping entry is expected to be unchanged by the OCH/OTU4 creation.
        self._assert_network1_mapping(self.NETWORK1_CHECK_DICT)

    def test_06_check_interface_och(self):
        interface = self._get_interface(self.OCH_INTERFACE_NAME)

        self._assert_subset({'name': self.OCH_INTERFACE_NAME,
                             'administrative-state': 'inService',
                             'supporting-circuit-pack-name': 'CP1-CFP0',
                             'type': 'org-openroadm-interfaces:opticalChannel',
                             'supporting-port': 'CP1-CFP0-P1'
                             },
                            interface)

        self.assertDictEqual(
            {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
             'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
            interface['org-openroadm-optical-channel-interfaces:och'])

    def test_07_check_interface_OTU(self):
        interface = self._get_interface("XPDR1-NETWORK1-OTU")
        input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
                        'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-CFP0',
                        'supporting-interface': self.OCH_INTERFACE_NAME,
                        'type': 'org-openroadm-interfaces:otnOtu',
                        'supporting-port': 'CP1-CFP0-P1'
                        }

        input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
                        'expected-sapi': 'Swfw02qXGyI=',
                        'tx-sapi': 'Swfw02qXGyI=',
                        'expected-dapi': 'Swfw02qXGyI=',
                        'rate': 'org-openroadm-otn-common-types:OTU4',
                        'fec': 'scfec'
                        }

        self._assert_subset(input_dict_1, interface)

        # The OTU container must match exactly, not merely contain these keys.
        self.assertDictEqual(input_dict_2,
                             interface['org-openroadm-otn-otu-interfaces:otu'])

    def test_08_otn_service_path_create_ODU4(self):
        response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
                                                       [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
        self.assertTrue(res["output"]["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])

    def test_09_get_portmapping_NETWORK1(self):
        # Work on a copy: writing through self.NETWORK1_CHECK_DICT would
        # permanently alter the class-level dict shared by tests 03 and 05.
        expected_mapping = dict(self.NETWORK1_CHECK_DICT)
        expected_mapping["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
        self._assert_network1_mapping(expected_mapping)

    def test_10_check_interface_ODU4(self):
        interface = self._get_interface("XPDR1-NETWORK1-ODU4")
        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
                        'type': 'org-openroadm-interfaces:otnOdu',
                        'supporting-port': 'CP1-CFP0-P1'}
        input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
                        'rate': 'org-openroadm-otn-common-types:ODU4',
                        'expected-dapi': 'Swfw02qXGyI=',
                        'expected-sapi': 'Swfw02qXGyI=',
                        'tx-dapi': 'Swfw02qXGyI=',
                        'tx-sapi': 'Swfw02qXGyI='}

        self._assert_subset(input_dict_1, interface)
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_subset(input_dict_2, odu)
        self.assertDictEqual(
            {'payload-type': '21', 'exp-payload-type': '21'},
            odu['opu'])

    def test_11_otn_service_path_create_10GE(self):
        response = test_utils.otn_service_path_request("create", "service1", "10G", "Ethernet",
                                                       [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                                                           "network-tp": "XPDR1-NETWORK1"}],
                                                       {"ethernet-encoding": "eth encode",
                                                        "trib-slot": ["1"], "trib-port-number": "1"})
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
        self.assertTrue(res["output"]["success"])
        node_interface = res["output"]['node-interface'][0]
        self.assertEqual('SPDR-SA1', node_interface['node-id'])
        self.assertIn(self.ODU2E_CONNECTION_NAME, node_interface['connection-id'])
        self.assertIn('XPDR1-CLIENT4-ETHERNET10G', node_interface['eth-interface-id'])
        self.assertIn('XPDR1-NETWORK1-ODU2e-service1', node_interface['odu-interface-id'])
        self.assertIn('XPDR1-CLIENT4-ODU2e-service1', node_interface['odu-interface-id'])

    def test_12_check_interface_10GE_CLIENT(self):
        interface = self._get_interface("XPDR1-CLIENT4-ETHERNET10G")
        input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
                      'administrative-state': 'inService',
                      'supporting-circuit-pack-name': 'CP1-SFP4',
                      'type': 'org-openroadm-interfaces:ethernetCsmacd',
                      'supporting-port': 'CP1-SFP4-P1'
                      }
        self._assert_subset(input_dict, interface)
        self.assertDictEqual(
            {'speed': 10000},
            interface['org-openroadm-ethernet-interfaces:ethernet'])

    def test_13_check_interface_ODU2E_CLIENT(self):
        interface = self._get_interface("XPDR1-CLIENT4-ODU2e-service1")

        input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
                        'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-SFP4',
                        'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
                        'type': 'org-openroadm-interfaces:otnOdu',
                        'supporting-port': 'CP1-SFP4-P1'}
        input_dict_2 = {
            'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
            'rate': 'org-openroadm-otn-common-types:ODU2e',
            'monitoring-mode': 'terminated'}

        self._assert_subset(input_dict_1, interface)
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_subset(input_dict_2, odu)
        self.assertDictEqual(
            {'payload-type': '03', 'exp-payload-type': '03'},
            odu['opu'])

    def test_14_check_interface_ODU2E_NETWORK(self):
        interface = self._get_interface("XPDR1-NETWORK1-ODU2e-service1")
        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-CFP0',
                        'supporting-interface': 'XPDR1-NETWORK1-ODU4',
                        'type': 'org-openroadm-interfaces:otnOdu',
                        'supporting-port': 'CP1-CFP0-P1'}
        input_dict_2 = {
            'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
            'rate': 'org-openroadm-otn-common-types:ODU2e',
            'monitoring-mode': 'monitored'}

        input_dict_3 = {'trib-port-number': 1}

        self._assert_subset(input_dict_1, interface)
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_subset(input_dict_2, odu)
        self._assert_subset(input_dict_3, odu['parent-odu-allocation'])
        self.assertIn(1, odu['parent-odu-allocation']['trib-slots'])

    def test_15_check_ODU2E_connection(self):
        response = test_utils.check_netconf_node_request(
            "SPDR-SA1",
            "odu-connection/" + self.ODU2E_CONNECTION_NAME)
        self.assertEqual(response.status_code, requests.codes.ok)
        connection = response.json()['odu-connection'][0]
        input_dict_1 = {
            'connection-name': self.ODU2E_CONNECTION_NAME,
            'direction': 'bidirectional'
        }

        self._assert_subset(input_dict_1, connection)
        self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
                             connection['destination'])
        self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'},
                             connection['source'])

    def test_16_otn_service_path_delete_10GE(self):
        response = test_utils.otn_service_path_request("delete", "service1", "10G", "Ethernet",
                                                       [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                                                           "network-tp": "XPDR1-NETWORK1"}],
                                                       {"ethernet-encoding": "eth encode",
                                                        "trib-slot": ["1"], "trib-port-number": "1"})
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Request processed', res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    def test_17_check_no_ODU2E_connection(self):
        self._assert_absent("odu-connection/" + self.ODU2E_CONNECTION_NAME)

    def test_18_check_no_interface_ODU2E_NETWORK(self):
        self._assert_absent("interface/XPDR1-NETWORK1-ODU2e-service1")

    def test_19_check_no_interface_ODU2E_CLIENT(self):
        self._assert_absent("interface/XPDR1-CLIENT4-ODU2e-service1")

    def test_20_check_no_interface_10GE_CLIENT(self):
        self._assert_absent("interface/XPDR1-CLIENT4-ETHERNET10G")

    def test_21_otn_service_path_delete_ODU4(self):
        response = test_utils.otn_service_path_request("delete", "service_ODU4", "100G", "ODU",
                                                       [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Request processed', res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    def test_22_check_no_interface_ODU4(self):
        self._assert_absent("interface/XPDR1-NETWORK1-ODU4")

    def test_23_service_path_delete_OCH_OTU4(self):
        response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
                                                   [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
                                                   196.1, 40, 196.075, 196.125, 761,
                                                   768)
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Request processed', res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    def test_24_check_no_interface_OTU4(self):
        self._assert_absent("interface/XPDR1-NETWORK1-OTU")

    def test_25_check_no_interface_OCH(self):
        # Probe the OCH interface that test_04 actually reported; the former
        # hard-coded "XPDR1-NETWORK1-1" was never created in this scenario,
        # so the check could not detect a leftover interface.
        self._assert_absent("interface/" + self.OCH_INTERFACE_NAME)

    def test_26_disconnect_SPDR_SA1(self):
        response = test_utils.unmount_device("SPDR-SA1")
        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
376
377
# Script entry point: run this module's tests with per-test (verbose) output.
if __name__ == '__main__':
    unittest.main(verbosity=2)