5bb9200eee148332cd97598d160b512b7e840eba
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test07_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 import time
17 import requests
18 # pylint: disable=wrong-import-order
19 import sys
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils  # nopep8
24
25
class TransportPCEtesting(unittest.TestCase):
    """Functional tests of the OTN renderer against one simulated SPDR-SA1 node.

    The methods are numbered because they form one scenario and rely on
    unittest running them in alphabetical order: mount the device, create the
    OCH/OTU4, ODU4 and 10GE service paths while checking the interfaces that
    appear on the device, then delete everything in reverse order and unmount.

    Test methods are described with ``#`` comments rather than docstrings so
    that the verbose runner output keeps showing the bare test names.
    """

    processes = None
    # Port mapping expected for XPDR1-NETWORK1 as long as no ODU4 has been
    # created on it. Treat it as read-only: tests needing extra keys must copy it.
    NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR1-NETWORK1",
                           "supporting-port": "CP1-CFP0-P1",
                           "supported-interface-capability": [
                               "org-openroadm-port-types:if-OCH-OTU4-ODU4"
                           ],
                           "port-direction": "bidirectional",
                           "port-qual": "xpdr-network",
                           "supporting-circuit-pack-name": "CP1-CFP0",
                           "xponder-type": "mpdr",
                           'lcp-hash-val': 'Swfw02qXGyI=',
                           'port-admin-state': 'InService',
                           'port-oper-state': 'InService'}
    NODE_VERSION = '2.2.1'

    @classmethod
    def setUpClass(cls):
        # Start the controller first, then the device simulator it will mount.
        cls.processes = test_utils.start_tpce()
        # NOTE(review): this second assignment replaces the value returned by
        # start_tpce(). Presumably both helpers return the same shared process
        # list - confirm in test_utils, otherwise the controller process is
        # never shut down by tearDownClass.
        cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION)])

    @classmethod
    def tearDownClass(cls):
        # pylint: disable=not-an-iterable
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):
        # Pause before every test so the previous request has time to settle.
        time.sleep(5)

    def _assert_dict_contains(self, expected, actual):
        """Assert that every key/value pair of ``expected`` is present in ``actual``.

        Keys of ``actual`` that are absent from ``expected`` are ignored.
        Overlaying ``expected`` on a copy of ``actual`` leaves the copy equal to
        ``actual`` only when all expected pairs already match, and a failure
        shows a readable dict diff.
        """
        self.assertDictEqual(dict(actual, **expected), actual)

    def test_01_connect_SPDR_SA1(self):
        # Mount the simulated device and wait for the NETCONF session to come up.
        response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
        time.sleep(10)

        response = test_utils.get_netconf_oper_request("SPDR-SA1")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(
            res['node'][0]['netconf-node-topology:connection-status'],
            'connected')

    def test_02_get_portmapping_CLIENT4(self):
        # Check the port mapping computed for the 10GE client port.
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
        self.assertEqual(response.status_code, requests.codes.ok)
        res_mapping = (response.json())['mapping'][0]
        self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
        self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name'])
        self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point'])
        self.assertEqual('bidirectional', res_mapping['port-direction'])
        self.assertEqual('xpdr-client', res_mapping['port-qual'])
        self.assertEqual('FqlcrxV7p3g=', res_mapping['lcp-hash-val'])
        self.assertEqual('InService', res_mapping['port-admin-state'])
        self.assertEqual('InService', res_mapping['port-oper-state'])
        capabilities = res_mapping['supported-interface-capability']
        self.assertIn('org-openroadm-port-types:if-10GE-ODU2e', capabilities)
        self.assertIn('org-openroadm-port-types:if-10GE-ODU2', capabilities)
        self.assertIn('org-openroadm-port-types:if-10GE', capabilities)

    def test_03_get_portmapping_NETWORK1(self):
        # Network port mapping right after mounting.
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn(
            self.NETWORK1_CHECK_DICT,
            res['mapping'])

    def test_04_service_path_create_OCH_OTU4(self):
        # Create the OCH and OTU4 interfaces on the network port.
        response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
                                                   [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
                                                   196.1, 40, 196.075, 196.125, 761,
                                                   768)
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
        self.assertTrue(res["output"]["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
             'och-interface-id': ['XPDR1-NETWORK1-761:768']}, res["output"]['node-interface'])

    def test_05_get_portmapping_NETWORK1(self):
        # The OCH/OTU4 creation must leave the network port mapping unchanged.
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn(
            self.NETWORK1_CHECK_DICT,
            res['mapping'])

    def test_06_check_interface_och(self):
        # Read back the OCH interface created by test_04.
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]

        self._assert_dict_contains({'name': 'XPDR1-NETWORK1-761:768',
                                    'administrative-state': 'inService',
                                    'supporting-circuit-pack-name': 'CP1-CFP0',
                                    'type': 'org-openroadm-interfaces:opticalChannel',
                                    'supporting-port': 'CP1-CFP0-P1'
                                    },
                                   interface)

        # The och container is compared exactly, not as a subset.
        self.assertDictEqual(
            {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
             'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
            interface['org-openroadm-optical-channel-interfaces:och'])

    def test_07_check_interface_OTU(self):
        # Read back the OTU4 interface stacked on the OCH interface.
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]
        input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
                        'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-CFP0',
                        'supporting-interface': 'XPDR1-NETWORK1-761:768',
                        'type': 'org-openroadm-interfaces:otnOtu',
                        'supporting-port': 'CP1-CFP0-P1'
                        }

        input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
                        'expected-sapi': 'Swfw02qXGyI=',
                        'tx-sapi': 'Swfw02qXGyI=',
                        'expected-dapi': 'Swfw02qXGyI=',
                        'rate': 'org-openroadm-otn-common-types:OTU4',
                        'fec': 'scfec'
                        }

        self._assert_dict_contains(input_dict_1, interface)

        self.assertDictEqual(input_dict_2,
                             interface['org-openroadm-otn-otu-interfaces:otu'])

    def test_08_otn_service_path_create_ODU4(self):
        # Create the ODU4 interface on top of the OTU4.
        response = test_utils.otn_service_path_request("create", "service_ODU4", "100", "ODU",
                                                       [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
        self.assertTrue(res["output"]["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])

    def test_09_get_portmapping_NETWORK1(self):
        # After the ODU4 creation the mapping also references the OTU4/ODU4.
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        # Work on a copy: writing into the class-level dict would leak these
        # two keys into every other use of NETWORK1_CHECK_DICT.
        expected_mapping = dict(self.NETWORK1_CHECK_DICT)
        expected_mapping["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
        expected_mapping["supporting-otu4"] = "XPDR1-NETWORK1-OTU"
        self.assertIn(
            expected_mapping,
            res['mapping'])

    def test_10_check_interface_ODU4(self):
        # Read back the ODU4 interface created by test_08.
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]
        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
                        'type': 'org-openroadm-interfaces:otnOdu',
                        'supporting-port': 'CP1-CFP0-P1'}
        input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
                        'rate': 'org-openroadm-otn-common-types:ODU4',
                        'expected-dapi': 'Swfw02qXGyI=',
                        'expected-sapi': 'Swfw02qXGyI=',
                        'tx-dapi': 'Swfw02qXGyI=',
                        'tx-sapi': 'Swfw02qXGyI='}

        self._assert_dict_contains(input_dict_1, interface)
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_dict_contains(input_dict_2, odu)
        self.assertDictEqual(
            {'payload-type': '21', 'exp-payload-type': '21'},
            odu['opu'])

    def test_11_otn_service_path_create_10GE(self):
        # Create the 10GE service between CLIENT4 and NETWORK1.
        response = test_utils.otn_service_path_request("create", "service1", "10", "Ethernet",
                                                       [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                                                           "network-tp": "XPDR1-NETWORK1"}],
                                                       {"ethernet-encoding": "eth encode",
                                                        "trib-slot": ["1"], "trib-port-number": "1"})
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
        self.assertTrue(res["output"]["success"])
        node_interface = res["output"]['node-interface'][0]
        self.assertEqual('SPDR-SA1', node_interface['node-id'])
        self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
                      node_interface['connection-id'])
        self.assertIn('XPDR1-CLIENT4-ETHERNET10G', node_interface['eth-interface-id'])
        self.assertIn('XPDR1-NETWORK1-ODU2e-service1', node_interface['odu-interface-id'])
        self.assertIn('XPDR1-CLIENT4-ODU2e-service1', node_interface['odu-interface-id'])

    def test_12_check_interface_10GE_CLIENT(self):
        # Read back the client Ethernet interface.
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]
        input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
                      'administrative-state': 'inService',
                      'supporting-circuit-pack-name': 'CP1-SFP4',
                      'type': 'org-openroadm-interfaces:ethernetCsmacd',
                      'supporting-port': 'CP1-SFP4-P1'
                      }
        self._assert_dict_contains(input_dict, interface)
        self.assertDictEqual(
            {'speed': 10000},
            interface['org-openroadm-ethernet-interfaces:ethernet'])

    def test_13_check_interface_ODU2E_CLIENT(self):
        # Read back the client-side ODU2e interface.
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]

        input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
                        'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-SFP4',
                        'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
                        'type': 'org-openroadm-interfaces:otnOdu',
                        'supporting-port': 'CP1-SFP4-P1'}
        input_dict_2 = {
            'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
            'rate': 'org-openroadm-otn-common-types:ODU2e',
            'monitoring-mode': 'terminated'}

        self._assert_dict_contains(input_dict_1, interface)
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_dict_contains(input_dict_2, odu)
        self.assertDictEqual(
            {'payload-type': '03', 'exp-payload-type': '03'},
            odu['opu'])

    def test_14_check_interface_ODU2E_NETWORK(self):
        # Read back the network-side ODU2e interface and its tributary allocation.
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]
        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-CFP0',
                        'supporting-interface': 'XPDR1-NETWORK1-ODU4',
                        'type': 'org-openroadm-interfaces:otnOdu',
                        'supporting-port': 'CP1-CFP0-P1'}
        input_dict_2 = {
            'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
            'rate': 'org-openroadm-otn-common-types:ODU2e',
            'monitoring-mode': 'monitored'}

        input_dict_3 = {'trib-port-number': 1}

        self._assert_dict_contains(input_dict_1, interface)
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_dict_contains(input_dict_2, odu)
        self._assert_dict_contains(input_dict_3, odu['parent-odu-allocation'])
        self.assertIn(1, odu['parent-odu-allocation']['trib-slots'])

    def test_15_check_ODU2E_connection(self):
        # Read back the ODU2e cross-connection between client and network sides.
        response = test_utils.check_netconf_node_request(
            "SPDR-SA1",
            "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.ok)
        connection = response.json()['odu-connection'][0]
        input_dict_1 = {
            'connection-name':
            'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
            'direction': 'bidirectional'
        }

        self._assert_dict_contains(input_dict_1, connection)
        self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
                             connection['destination'])
        self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'},
                             connection['source'])

    def test_16_otn_service_path_delete_10GE(self):
        # Delete the 10GE service created by test_11.
        response = test_utils.otn_service_path_request("delete", "service1", "10", "Ethernet",
                                                       [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                                                           "network-tp": "XPDR1-NETWORK1"}],
                                                       {"ethernet-encoding": "eth encode",
                                                        "trib-slot": ["1"], "trib-port-number": "1"})
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Request processed', res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    # In tests 17 to 25 a 409 (conflict) status on a read is what this suite
    # expects once the requested resource has been removed from the device.

    def test_17_check_no_ODU2E_connection(self):
        response = test_utils.check_netconf_node_request(
            "SPDR-SA1",
            "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_18_check_no_interface_ODU2E_NETWORK(self):
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_19_check_no_interface_ODU2E_CLIENT(self):
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_20_check_no_interface_10GE_CLIENT(self):
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_21_otn_service_path_delete_ODU4(self):
        # Delete the ODU4 interface created by test_08.
        response = test_utils.otn_service_path_request("delete", "service_ODU4", "100", "ODU",
                                                       [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Request processed', res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    def test_22_check_no_interface_ODU4(self):
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_23_service_path_delete_OCH_OTU4(self):
        # Delete the OCH and OTU4 interfaces created by test_04.
        response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
                                                   [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
                                                   196.1, 40, 196.075, 196.125, 761,
                                                   768)
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Request processed', res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    def test_24_check_no_interface_OTU4(self):
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_25_check_no_interface_OCH(self):
        # Query the interface name actually created by test_04 and read back by
        # test_06. The former 'XPDR1-NETWORK1-1' name is never created in this
        # scenario, so checking it could not detect a failed deletion.
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_26_disconnect_SPDR_SA1(self):
        # Unmount the device from the controller.
        response = test_utils.unmount_device("SPDR-SA1")
        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
382         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
383
384
# Run the whole scenario with verbose output when this file is executed
# directly as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)