Change way to start simulators
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
#!/usr/bin/env python

##############################################################################
# Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################

# pylint: disable=no-member
# pylint: disable=too-many-public-methods

15 import unittest
16 import time
17 import requests
18 import sys
19 sys.path.append('transportpce_tests/common/')
20 import test_utils
21
22
class TransportPCEtesting(unittest.TestCase):
    """Functional tests of OTN service-path rendering on the SPDR-SA1 node.

    The scenario mounts the node, builds an OCH/OTU4 -> ODU4 -> 10GE stack of
    interfaces through the service-path requests, inspects every created
    interface, then tears the stack down in reverse order and checks that each
    interface is gone.

    The test methods are numbered because they depend on one another (later
    tests read what earlier tests created); unittest runs them in alphabetical
    order of their names.
    """

    # Handles of the background processes started in setUpClass.
    processes = None
    # Expected port-mapping entry for XPDR1-NETWORK1 before any ODU4 interface
    # is created on it. Treated as read-only: tests that need extra keys work
    # on a copy.
    NETWORK1_CHECK_DICT = {"logical-connection-point": "XPDR1-NETWORK1",
                           "supporting-port": "CP1-CFP0-P1",
                           "supported-interface-capability": [
                               "org-openroadm-port-types:if-OCH-OTU4-ODU4"
                           ],
                           "port-direction": "bidirectional",
                           "port-qual": "xpdr-network",
                           "supporting-circuit-pack-name": "CP1-CFP0",
                           "xponder-type": "mpdr",
                           'lcp-hash-val': 'Swfw02qXGyI=',
                           'port-admin-state': 'InService',
                           'port-oper-state': 'InService'}
    # Device model version passed to the simulator and mount helpers.
    NODE_VERSION = '2.2.1'

    @classmethod
    def setUpClass(cls):
        """Start the controller and the 'spdra' simulator once for the whole class."""
        cls.processes = test_utils.start_tpce()
        # NOTE(review): this assignment replaces the value returned by
        # start_tpce(). Presumably both helpers return the same shared process
        # list, otherwise the controller process would never be shut down in
        # tearDownClass - confirm against test_utils.
        cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION)])

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded in ``cls.processes``."""
        # pylint: disable=not-an-iterable
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):
        """Pause 5 seconds before each test (presumably to let the previous request settle)."""
        time.sleep(5)

    def _assert_dict_contains(self, actual, expected_subset):
        """Assert that every key/value pair of *expected_subset* is present in *actual*.

        Overlaying the expected pairs on a copy of *actual* only reproduces
        *actual* when those pairs were already there with the same values, so
        any additional keys in *actual* are tolerated.
        """
        self.assertDictEqual(dict(actual, **expected_subset), actual)

    def test_01_connect_SPDR_SA1(self):
        """Mount SPDR-SA1 and check that its NETCONF connection status is 'connected'."""
        response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
        time.sleep(10)

        response = test_utils.get_netconf_oper_request("SPDR-SA1")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(
            res['node'][0]['netconf-node-topology:connection-status'],
            'connected')

    def test_02_get_portmapping_CLIENT4(self):
        """Check the port-mapping entry published for XPDR1-CLIENT4."""
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
        self.assertEqual(response.status_code, requests.codes.ok)
        res_mapping = (response.json())['mapping'][0]
        self.assertEqual('CP1-SFP4-P1', res_mapping['supporting-port'])
        self.assertEqual('CP1-SFP4', res_mapping['supporting-circuit-pack-name'])
        self.assertEqual('XPDR1-CLIENT4', res_mapping['logical-connection-point'])
        self.assertEqual('bidirectional', res_mapping['port-direction'])
        self.assertEqual('xpdr-client', res_mapping['port-qual'])
        self.assertEqual('FqlcrxV7p3g=', res_mapping['lcp-hash-val'])
        self.assertEqual('InService', res_mapping['port-admin-state'])
        self.assertEqual('InService', res_mapping['port-oper-state'])
        self.assertIn('org-openroadm-port-types:if-10GE-ODU2e', res_mapping['supported-interface-capability'])
        self.assertIn('org-openroadm-port-types:if-10GE-ODU2', res_mapping['supported-interface-capability'])
        self.assertIn('org-openroadm-port-types:if-10GE', res_mapping['supported-interface-capability'])

    def test_03_get_portmapping_NETWORK1(self):
        """Check the initial port-mapping entry of XPDR1-NETWORK1."""
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn(
            self.NETWORK1_CHECK_DICT,
            res['mapping'])

    def test_04_service_path_create_OCH_OTU4(self):
        """Create the OCH/OTU4 service path and check the interfaces it reports."""
        response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
                                                   [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
                                                   196.1, 40, 196.075, 196.125, 761,
                                                   768)
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
        self.assertTrue(res["output"]["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
             'och-interface-id': ['XPDR1-NETWORK1-761:768']}, res["output"]['node-interface'])

    def test_05_get_portmapping_NETWORK1(self):
        """Check that the XPDR1-NETWORK1 mapping is unchanged after the OCH/OTU4 creation."""
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn(
            self.NETWORK1_CHECK_DICT,
            res['mapping'])

    def test_06_check_interface_och(self):
        """Inspect the OCH interface XPDR1-NETWORK1-761:768 on the device."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]

        self._assert_dict_contains(interface, {'name': 'XPDR1-NETWORK1-761:768',
                                               'administrative-state': 'inService',
                                               'supporting-circuit-pack-name': 'CP1-CFP0',
                                               'type': 'org-openroadm-interfaces:opticalChannel',
                                               'supporting-port': 'CP1-CFP0-P1'
                                               })

        # The och container is compared exactly, not as a subset.
        self.assertDictEqual(
            {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
             'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
            interface['org-openroadm-optical-channel-interfaces:och'])

    def test_07_check_interface_OTU(self):
        """Inspect the OTU interface XPDR1-NETWORK1-OTU on the device."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]
        input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
                        'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-CFP0',
                        'supporting-interface': 'XPDR1-NETWORK1-761:768',
                        'type': 'org-openroadm-interfaces:otnOtu',
                        'supporting-port': 'CP1-CFP0-P1'
                        }

        input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
                        'expected-sapi': 'Swfw02qXGyI=',
                        'tx-sapi': 'Swfw02qXGyI=',
                        'expected-dapi': 'Swfw02qXGyI=',
                        'rate': 'org-openroadm-otn-common-types:OTU4',
                        'fec': 'scfec'
                        }

        self._assert_dict_contains(interface, input_dict_1)

        # The otu container is compared exactly, not as a subset.
        self.assertDictEqual(input_dict_2,
                             interface['org-openroadm-otn-otu-interfaces:otu'])

    def test_08_otn_service_path_create_ODU4(self):
        """Create the ODU4 OTN service path on XPDR1-NETWORK1."""
        response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
                                                       [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
        self.assertTrue(res["output"]["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])

    def test_09_get_portmapping_NETWORK1(self):
        """Check that the XPDR1-NETWORK1 mapping now references its supporting ODU4."""
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        # Work on a copy so the class-level reference dict is not altered for
        # other tests (or for a re-run of this class in the same process).
        expected_mapping = dict(self.NETWORK1_CHECK_DICT)
        expected_mapping["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
        self.assertIn(
            expected_mapping,
            res['mapping'])

    def test_10_check_interface_ODU4(self):
        """Inspect the ODU4 interface XPDR1-NETWORK1-ODU4 on the device."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]
        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
                        'type': 'org-openroadm-interfaces:otnOdu',
                        'supporting-port': 'CP1-CFP0-P1'}
        input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
                        'rate': 'org-openroadm-otn-common-types:ODU4',
                        'expected-dapi': 'Swfw02qXGyI=',
                        'expected-sapi': 'Swfw02qXGyI=',
                        'tx-dapi': 'Swfw02qXGyI=',
                        'tx-sapi': 'Swfw02qXGyI='}

        self._assert_dict_contains(interface, input_dict_1)
        self._assert_dict_contains(interface['org-openroadm-otn-odu-interfaces:odu'], input_dict_2)
        self.assertDictEqual(
            {'payload-type': '21', 'exp-payload-type': '21'},
            interface['org-openroadm-otn-odu-interfaces:odu']['opu'])

    def test_11_otn_service_path_create_10GE(self):
        """Create the 10GE OTN service path between XPDR1-CLIENT4 and XPDR1-NETWORK1."""
        response = test_utils.otn_service_path_request("create", "service1", "10G", "Ethernet",
                                                       [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                                                         "network-tp": "XPDR1-NETWORK1"}],
                                                       {"ethernet-encoding": "eth encode",
                                                        "trib-slot": ["1"], "trib-port-number": "1"})
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
        self.assertTrue(res["output"]["success"])
        node_interface = res["output"]['node-interface'][0]
        self.assertEqual('SPDR-SA1', node_interface['node-id'])
        self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
                      node_interface['connection-id'])
        self.assertIn('XPDR1-CLIENT4-ETHERNET10G', node_interface['eth-interface-id'])
        self.assertIn('XPDR1-NETWORK1-ODU2e-service1', node_interface['odu-interface-id'])
        self.assertIn('XPDR1-CLIENT4-ODU2e-service1', node_interface['odu-interface-id'])

    def test_12_check_interface_10GE_CLIENT(self):
        """Inspect the Ethernet interface XPDR1-CLIENT4-ETHERNET10G on the device."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]
        input_dict = {'name': 'XPDR1-CLIENT4-ETHERNET10G',
                      'administrative-state': 'inService',
                      'supporting-circuit-pack-name': 'CP1-SFP4',
                      'type': 'org-openroadm-interfaces:ethernetCsmacd',
                      'supporting-port': 'CP1-SFP4-P1'
                      }
        self._assert_dict_contains(interface, input_dict)
        self.assertDictEqual(
            {'speed': 10000},
            interface['org-openroadm-ethernet-interfaces:ethernet'])

    def test_13_check_interface_ODU2E_CLIENT(self):
        """Inspect the client-side ODU2e interface XPDR1-CLIENT4-ODU2e-service1."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]

        input_dict_1 = {'name': 'XPDR1-CLIENT4-ODU2e-service1',
                        'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-SFP4',
                        'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
                        'type': 'org-openroadm-interfaces:otnOdu',
                        'supporting-port': 'CP1-SFP4-P1'}
        input_dict_2 = {
            'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
            'rate': 'org-openroadm-otn-common-types:ODU2e',
            'monitoring-mode': 'terminated'}

        self._assert_dict_contains(interface, input_dict_1)
        self._assert_dict_contains(interface['org-openroadm-otn-odu-interfaces:odu'], input_dict_2)
        self.assertDictEqual(
            {'payload-type': '03', 'exp-payload-type': '03'},
            interface['org-openroadm-otn-odu-interfaces:odu']['opu'])

    def test_14_check_interface_ODU2E_NETWORK(self):
        """Inspect the network-side ODU2e interface XPDR1-NETWORK1-ODU2e-service1."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.ok)
        interface = response.json()['interface'][0]
        input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
                        'supporting-circuit-pack-name': 'CP1-CFP0',
                        'supporting-interface': 'XPDR1-NETWORK1-ODU4',
                        'type': 'org-openroadm-interfaces:otnOdu',
                        'supporting-port': 'CP1-CFP0-P1'}
        input_dict_2 = {
            'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
            'rate': 'org-openroadm-otn-common-types:ODU2e',
            'monitoring-mode': 'monitored'}

        input_dict_3 = {'trib-port-number': 1}

        self._assert_dict_contains(interface, input_dict_1)
        self._assert_dict_contains(interface['org-openroadm-otn-odu-interfaces:odu'], input_dict_2)
        self._assert_dict_contains(
            interface['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'],
            input_dict_3)
        self.assertIn(1,
                      interface['org-openroadm-otn-odu-interfaces:odu'][
                          'parent-odu-allocation']['trib-slots'])

    def test_15_check_ODU2E_connection(self):
        """Inspect the ODU2e cross-connection between the client and network interfaces."""
        response = test_utils.check_netconf_node_request(
            "SPDR-SA1",
            "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.ok)
        connection = response.json()['odu-connection'][0]
        input_dict_1 = {
            'connection-name':
            'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
            'direction': 'bidirectional'
        }

        self._assert_dict_contains(connection, input_dict_1)
        self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
                             connection['destination'])
        self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'},
                             connection['source'])

    def test_16_otn_service_path_delete_10GE(self):
        """Delete the 10GE OTN service path created in test_11."""
        response = test_utils.otn_service_path_request("delete", "service1", "10G", "Ethernet",
                                                       [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
                                                         "network-tp": "XPDR1-NETWORK1"}],
                                                       {"ethernet-encoding": "eth encode",
                                                        "trib-slot": ["1"], "trib-port-number": "1"})
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Request processed', res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    def test_17_check_no_ODU2E_connection(self):
        """Reading the deleted ODU2e cross-connection must answer 409 Conflict."""
        response = test_utils.check_netconf_node_request(
            "SPDR-SA1",
            "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_18_check_no_interface_ODU2E_NETWORK(self):
        """Reading the deleted network-side ODU2e interface must answer 409 Conflict."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_19_check_no_interface_ODU2E_CLIENT(self):
        """Reading the deleted client-side ODU2e interface must answer 409 Conflict."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ODU2e-service1")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_20_check_no_interface_10GE_CLIENT(self):
        """Reading the deleted 10GE Ethernet interface must answer 409 Conflict."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT4-ETHERNET10G")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_21_otn_service_path_delete_ODU4(self):
        """Delete the ODU4 OTN service path created in test_08."""
        response = test_utils.otn_service_path_request("delete", "service_ODU4", "100G", "ODU",
                                                       [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Request processed', res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    def test_22_check_no_interface_ODU4(self):
        """Reading the deleted ODU4 interface must answer 409 Conflict."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_23_service_path_delete_OCH_OTU4(self):
        """Delete the OCH/OTU4 service path created in test_04."""
        response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
                                                   [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
                                                   196.1, 40, 196.075, 196.125, 761,
                                                   768)
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Request processed', res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    def test_24_check_no_interface_OTU4(self):
        """Reading the deleted OTU interface must answer 409 Conflict."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_25_check_no_interface_OCH(self):
        """Reading the deleted OCH interface must answer 409 Conflict."""
        # Query the interface name actually created in test_04 and read in
        # test_06 ('XPDR1-NETWORK1-761:768'). Checking a name that never
        # existed would pass even if the deletion had failed.
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
        self.assertEqual(response.status_code, requests.codes.conflict)

    def test_26_disconnect_SPDR_SA1(self):
        """Unmount SPDR-SA1 from the controller."""
        response = test_utils.unmount_device("SPDR-SA1")
        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
379
380
if __name__ == "__main__":
    # Run every test of this module with one result line per test method.
    unittest.main(verbosity=2)