Set port states in portMapping
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 import time
17 import requests
18 from common import test_utils
19
20
class TransportPCEtesting(unittest.TestCase):
    """End-to-end OTN rendering scenario against the 'spdra' simulator (node SPDR-SA1).

    The numbered tests form a single scenario and depend on the state left by
    the previous ones (unittest runs them in alphabetical name order): mount
    the device, create the OCH/OTU4, ODU4 and 10GE service paths while
    checking port mappings and device interfaces, delete everything again and
    finally unmount the device.

    Test methods are described with ``#`` comments rather than docstrings on
    purpose: with verbosity=2, unittest prints the first docstring line of a
    test next to its name, which would change the report output.
    """

    processes = None

    # Connection created on the device by the 10GE service ("service1").
    ODU2E_CONNECTION = 'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'

    # Port-mapping entry expected for XPDR1-NETWORK1 while no ODU4 interface
    # is attached to the port (read-only, never mutated by the tests).
    NETWORK1_MAPPING = {
        "logical-connection-point": "XPDR1-NETWORK1",
        "supporting-port": "CP1-CFP0-P1",
        "supported-interface-capability": [
            "org-openroadm-port-types:if-OCH-OTU4-ODU4"
        ],
        "port-direction": "bidirectional",
        "port-qual": "xpdr-network",
        "supporting-circuit-pack-name": "CP1-CFP0",
        "xponder-type": "mpdr",
        "lcp-hash-val": "Swfw02qXGyI=",
        "port-admin-state": "InService",
        "port-oper-state": "InService",
    }

    @classmethod
    def setUpClass(cls):
        # Launch TransportPCE (start_tpce) and then the 'spdra' simulator (start_sims).
        # NOTE(review): only the value returned by start_sims() is kept; this
        # presumably works because both helpers return the same cumulative
        # process list -- confirm against common/test_utils.py.
        cls.processes = test_utils.start_tpce()
        cls.processes = test_utils.start_sims(['spdra'])

    @classmethod
    def tearDownClass(cls):
        # pylint: disable=not-an-iterable
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):
        # Fixed pause before every test.
        time.sleep(5)

    # ------------------------------------------------------------------
    # Private helpers (names do not start with "test", so they are never
    # collected as test cases).
    # ------------------------------------------------------------------

    def _assert_contains_items(self, expected, actual):
        # Passes when every key/value pair of `expected` is present in
        # `actual`; additional keys of `actual` are ignored.
        self.assertDictEqual(dict(actual, **expected), actual)

    def _fetch_first(self, resource, list_name):
        # GET `resource` on SPDR-SA1, require HTTP 200 and return the first
        # element of the `list_name` list found in the JSON answer.
        response = test_utils.check_netconf_node_request("SPDR-SA1", resource)
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()[list_name][0]

    def _assert_absent(self, resource):
        # The tests expect HTTP 409 (conflict) when `resource` is not
        # configured on SPDR-SA1.
        response = test_utils.check_netconf_node_request("SPDR-SA1", resource)
        self.assertEqual(response.status_code, requests.codes.conflict)

    def _network1_mappings(self):
        # Return the 'mapping' list of the XPDR1-NETWORK1 port-mapping entry.
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()['mapping']

    def _assert_request_processed(self, response):
        # Common checks on the answer of a service-path delete request.
        self.assertEqual(response.status_code, requests.codes.ok)
        output = response.json()["output"]
        self.assertIn('Request processed', output["result"])
        self.assertTrue(output["success"])

    @staticmethod
    def _service1_request(action):
        # Send the 10GE OTN service-path request; create and delete use the
        # very same parameters apart from `action`.
        return test_utils.otn_service_path_request(
            action, "service1", "10G", "Ethernet",
            [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT4",
              "network-tp": "XPDR1-NETWORK1"}],
            {"ethernet-encoding": "eth encode",
             "trib-slot": ["1"], "trib-port-number": "1"})

    # ------------------------------------------------------------------
    # Scenario
    # ------------------------------------------------------------------

    def test_01_connect_SPDR_SA1(self):
        # Mount the device, then check that its NETCONF connection status
        # is reported as 'connected'.
        response = test_utils.mount_device("SPDR-SA1", 'spdra')
        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
        time.sleep(10)

        response = test_utils.get_netconf_oper_request("SPDR-SA1")
        self.assertEqual(response.status_code, requests.codes.ok)
        node = response.json()['node'][0]
        self.assertEqual(node['netconf-node-topology:connection-status'], 'connected')

    def test_02_get_portmapping_CLIENT4(self):
        # Check the individual fields of the XPDR1-CLIENT4 port mapping,
        # including its admin/oper port states.
        response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT4")
        self.assertEqual(response.status_code, requests.codes.ok)
        mapping = response.json()['mapping'][0]

        expected_fields = (
            ('supporting-port', 'CP1-SFP4-P1'),
            ('supporting-circuit-pack-name', 'CP1-SFP4'),
            ('logical-connection-point', 'XPDR1-CLIENT4'),
            ('port-direction', 'bidirectional'),
            ('port-qual', 'xpdr-client'),
            ('lcp-hash-val', 'FqlcrxV7p3g='),
            ('port-admin-state', 'InService'),
            ('port-oper-state', 'InService'),
        )
        for field, value in expected_fields:
            self.assertEqual(value, mapping[field])

        expected_capabilities = (
            'org-openroadm-port-types:if-10GE-ODU2e',
            'org-openroadm-port-types:if-10GE-ODU2',
            'org-openroadm-port-types:if-10GE',
        )
        for capability in expected_capabilities:
            self.assertIn(capability, mapping['supported-interface-capability'])

    def test_03_get_portmapping_NETWORK1(self):
        # Initial XPDR1-NETWORK1 mapping, before any service is created.
        self.assertIn(self.NETWORK1_MAPPING, self._network1_mappings())

    def test_04_service_path_create_OCH_OTU4(self):
        # Create the OCH + OTU4 interfaces on XPDR1-NETWORK1.
        response = test_utils.service_path_request(
            "create", "service_OCH_OTU4", "1",
            [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
            196.1, 40, 196.075, 196.125, 761, 768)
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        output = response.json()["output"]
        self.assertIn('Roadm-connection successfully created for nodes: ', output["result"])
        self.assertTrue(output["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
             'och-interface-id': ['XPDR1-NETWORK1-761:768']},
            output['node-interface'])

    def test_05_get_portmapping_NETWORK1(self):
        # The mapping entry is expected to be the same as in test_03 once
        # the OCH/OTU4 interfaces exist.
        self.assertIn(self.NETWORK1_MAPPING, self._network1_mappings())

    def test_06_check_interface_och(self):
        interface = self._fetch_first("interface/XPDR1-NETWORK1-761:768", 'interface')
        self._assert_contains_items(
            {'name': 'XPDR1-NETWORK1-761:768',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'type': 'org-openroadm-interfaces:opticalChannel',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        # The och container must match exactly.
        self.assertDictEqual(
            {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
             'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
            interface['org-openroadm-optical-channel-interfaces:och'])

    def test_07_check_interface_OTU(self):
        interface = self._fetch_first("interface/XPDR1-NETWORK1-OTU", 'interface')
        self._assert_contains_items(
            {'name': 'XPDR1-NETWORK1-OTU',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'supporting-interface': 'XPDR1-NETWORK1-761:768',
             'type': 'org-openroadm-interfaces:otnOtu',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        # The otu container must match exactly.
        self.assertDictEqual(
            {'tx-dapi': 'Swfw02qXGyI=',
             'expected-sapi': 'Swfw02qXGyI=',
             'tx-sapi': 'Swfw02qXGyI=',
             'expected-dapi': 'Swfw02qXGyI=',
             'rate': 'org-openroadm-otn-common-types:OTU4',
             'fec': 'scfec'},
            interface['org-openroadm-otn-otu-interfaces:otu'])

    def test_08_otn_service_path_create_ODU4(self):
        # Create the ODU4 interface on top of the OTU4 one.
        response = test_utils.otn_service_path_request(
            "create", "service_ODU4", "100G", "ODU",
            [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        output = response.json()["output"]
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', output["result"])
        self.assertTrue(output["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'odu-interface-id': ['XPDR1-NETWORK1-ODU4']},
            output['node-interface'])

    def test_09_get_portmapping_NETWORK1(self):
        # Same mapping as before plus the reference to the new ODU4 interface.
        expected = dict(self.NETWORK1_MAPPING, **{"supporting-odu4": "XPDR1-NETWORK1-ODU4"})
        self.assertIn(expected, self._network1_mappings())

    def test_10_check_interface_ODU4(self):
        interface = self._fetch_first("interface/XPDR1-NETWORK1-ODU4", 'interface')
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_contains_items(
            {'name': 'XPDR1-NETWORK1-ODU4',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'supporting-interface': 'XPDR1-NETWORK1-OTU',
             'type': 'org-openroadm-interfaces:otnOdu',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        self._assert_contains_items(
            {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
             'rate': 'org-openroadm-otn-common-types:ODU4',
             'expected-dapi': 'Swfw02qXGyI=',
             'expected-sapi': 'Swfw02qXGyI=',
             'tx-dapi': 'Swfw02qXGyI=',
             'tx-sapi': 'Swfw02qXGyI='},
            odu)
        self.assertDictEqual(
            {'payload-type': '21', 'exp-payload-type': '21'},
            odu['opu'])

    def test_11_otn_service_path_create_10GE(self):
        # Create the 10GE client service between XPDR1-CLIENT4 and XPDR1-NETWORK1.
        response = self._service1_request("create")
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        output = response.json()["output"]
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', output["result"])
        self.assertTrue(output["success"])
        node_interface = output['node-interface'][0]
        self.assertEqual('SPDR-SA1', node_interface['node-id'])
        self.assertIn(self.ODU2E_CONNECTION, node_interface['connection-id'])
        self.assertIn('XPDR1-CLIENT4-ETHERNET10G', node_interface['eth-interface-id'])
        self.assertIn('XPDR1-NETWORK1-ODU2e-service1', node_interface['odu-interface-id'])
        self.assertIn('XPDR1-CLIENT4-ODU2e-service1', node_interface['odu-interface-id'])

    def test_12_check_interface_10GE_CLIENT(self):
        interface = self._fetch_first("interface/XPDR1-CLIENT4-ETHERNET10G", 'interface')
        self._assert_contains_items(
            {'name': 'XPDR1-CLIENT4-ETHERNET10G',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-SFP4',
             'type': 'org-openroadm-interfaces:ethernetCsmacd',
             'supporting-port': 'CP1-SFP4-P1'},
            interface)
        self.assertDictEqual(
            {'speed': 10000},
            interface['org-openroadm-ethernet-interfaces:ethernet'])

    def test_13_check_interface_ODU2E_CLIENT(self):
        interface = self._fetch_first("interface/XPDR1-CLIENT4-ODU2e-service1", 'interface')
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_contains_items(
            {'name': 'XPDR1-CLIENT4-ODU2e-service1',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-SFP4',
             'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
             'type': 'org-openroadm-interfaces:otnOdu',
             'supporting-port': 'CP1-SFP4-P1'},
            interface)
        self._assert_contains_items(
            {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
             'rate': 'org-openroadm-otn-common-types:ODU2e',
             'monitoring-mode': 'terminated'},
            odu)
        self.assertDictEqual(
            {'payload-type': '03', 'exp-payload-type': '03'},
            odu['opu'])

    def test_14_check_interface_ODU2E_NETWORK(self):
        interface = self._fetch_first("interface/XPDR1-NETWORK1-ODU2e-service1", 'interface')
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_contains_items(
            {'name': 'XPDR1-NETWORK1-ODU2e-service1',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'supporting-interface': 'XPDR1-NETWORK1-ODU4',
             'type': 'org-openroadm-interfaces:otnOdu',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)
        self._assert_contains_items(
            {'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
             'rate': 'org-openroadm-otn-common-types:ODU2e',
             'monitoring-mode': 'monitored'},
            odu)
        allocation = odu['parent-odu-allocation']
        self._assert_contains_items({'trib-port-number': 1}, allocation)
        self.assertIn(1, allocation['trib-slots'])

    def test_15_check_ODU2E_connection(self):
        connection = self._fetch_first("odu-connection/" + self.ODU2E_CONNECTION, 'odu-connection')
        self._assert_contains_items(
            {'connection-name': self.ODU2E_CONNECTION,
             'direction': 'bidirectional'},
            connection)
        self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
                             connection['destination'])
        self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'},
                             connection['source'])

    def test_16_otn_service_path_delete_10GE(self):
        response = self._service1_request("delete")
        time.sleep(3)
        self._assert_request_processed(response)

    def test_17_check_no_ODU2E_connection(self):
        self._assert_absent("odu-connection/" + self.ODU2E_CONNECTION)

    def test_18_check_no_interface_ODU2E_NETWORK(self):
        self._assert_absent("interface/XPDR1-NETWORK1-ODU2e-service1")

    def test_19_check_no_interface_ODU2E_CLIENT(self):
        self._assert_absent("interface/XPDR1-CLIENT4-ODU2e-service1")

    def test_20_check_no_interface_10GE_CLIENT(self):
        self._assert_absent("interface/XPDR1-CLIENT4-ETHERNET10G")

    def test_21_otn_service_path_delete_ODU4(self):
        response = test_utils.otn_service_path_request(
            "delete", "service_ODU4", "100G", "ODU",
            [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
        time.sleep(3)
        self._assert_request_processed(response)

    def test_22_check_no_interface_ODU4(self):
        self._assert_absent("interface/XPDR1-NETWORK1-ODU4")

    def test_23_service_path_delete_OCH_OTU4(self):
        response = test_utils.service_path_request(
            "delete", "service_OCH_OTU4", "1",
            [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}],
            196.1, 40, 196.075, 196.125, 761, 768)
        time.sleep(3)
        self._assert_request_processed(response)

    def test_24_check_no_interface_OTU4(self):
        self._assert_absent("interface/XPDR1-NETWORK1-OTU")

    def test_25_check_no_interface_OCH(self):
        # Check the OCH interface that test_04 actually created
        # ('XPDR1-NETWORK1-761:768'). The previous name 'XPDR1-NETWORK1-1'
        # never existed in this scenario, so the check could not fail.
        self._assert_absent("interface/XPDR1-NETWORK1-761:768")

    def test_26_disconnect_SPDR_SA1(self):
        response = test_utils.unmount_device("SPDR-SA1")
        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
398
399
if __name__ == "__main__":
    # Executed as a script: run the tests of this module, reporting each
    # test individually (verbosity level 2).
    unittest.main(verbosity=2)