run autopep8 on functional tests
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import unittest
13 import json
14 import time
15 import requests
16 from common import test_utils
17
18
class TransportPCEtesting(unittest.TestCase):
    """Functional test sequence for OTN rendering on the SPDR-SA1 node.

    The tests mount the device, create an OCH/OTU4 service path, an ODU4
    OTN service path and a 10GE OTN service path, inspect the resulting
    port mappings / interfaces / ODU connection, then delete everything in
    reverse order and unmount the device.

    unittest runs test methods in alphabetical order, hence the numeric
    prefixes: later tests rely on what earlier ones created.
    """

    processes = None

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------
    @classmethod
    def setUpClass(cls):
        """Start the controller and the 'spdra' simulator once for the class."""
        # NOTE(review): the second assignment replaces the value returned by
        # start_tpce(). Presumably both helpers hand back the same process
        # collection -- confirm in test_utils, otherwise the process started
        # by start_tpce() is never passed to shutdown_process().
        cls.processes = test_utils.start_tpce()
        cls.processes = test_utils.start_sims(['spdra'])

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded by setUpClass."""
        for proc in cls.processes:
            test_utils.shutdown_process(proc)
        print("all processes killed")

    def setUp(self):
        """Pause five seconds before each test."""
        # Presumably lets the controller/simulator settle between requests.
        time.sleep(5)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _json_ok(self, response):
        """Assert *response* has status 200 and return its decoded JSON body."""
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()

    def _assert_has_items(self, expected, actual):
        """Assert every key/value pair of *expected* is present in *actual*.

        Overlaying *expected* on a copy of *actual* yields *actual* again only
        if no expected key is missing or carries a different value.
        """
        self.assertDictEqual(dict(actual, **expected), actual)

    def _fetch_mappings(self, lcp):
        """Return the 'mapping' list of SPDR-SA1 for logical connection point *lcp*."""
        response = test_utils.portmapping_request("SPDR-SA1/mapping/" + lcp)
        return self._json_ok(response)['mapping']

    def _fetch_interface(self, name):
        """Return the first 'interface' entry of SPDR-SA1 named *name*."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/" + name)
        return self._json_ok(response)['interface'][0]

    def _assert_conflict(self, resource):
        """Assert that reading *resource* on SPDR-SA1 answers HTTP 409 (conflict)."""
        response = test_utils.check_netconf_node_request("SPDR-SA1", resource)
        self.assertEqual(response.status_code, requests.codes.conflict)

    def _assert_request_processed(self, response):
        """Assert a delete request returned 200, 'Request processed' and success."""
        output = self._json_ok(response)["output"]
        self.assertIn('Request processed', output["result"])
        self.assertTrue(output["success"])

    @staticmethod
    def _network1_mapping(extra=None):
        """Build the expected XPDR1-NETWORK1 mapping entry, plus optional *extra* keys."""
        entry = {
            "logical-connection-point": "XPDR1-NETWORK1",
            "supporting-port": "CP1-CFP0-P1",
            "supported-interface-capability": [
                "org-openroadm-port-types:if-OCH-OTU4-ODU4"
            ],
            "port-direction": "bidirectional",
            "port-qual": "xpdr-network",
            "supporting-circuit-pack-name": "CP1-CFP0",
            "xponder-type": "mpdr",
            "lcp-hash-val": "Swfw02qXGyI=",
        }
        if extra:
            entry.update(extra)
        return entry

    @staticmethod
    def _odu4_service_request(operation):
        """Send the 100G ODU 'service_ODU4' OTN service-path request for *operation*."""
        nodes = [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}]
        return test_utils.otn_service_path_request(operation, "service_ODU4", "100G", "ODU", nodes)

    @staticmethod
    def _ten_ge_service_request(operation):
        """Send the 10G Ethernet 'service1' OTN service-path request for *operation*."""
        nodes = [{"node-id": "SPDR-SA1",
                  "client-tp": "XPDR1-CLIENT4",
                  "network-tp": "XPDR1-NETWORK1"}]
        settings = {"ethernet-encoding": "eth encode",
                    "trib-slot": ["1"],
                    "trib-port-number": "1"}
        return test_utils.otn_service_path_request(operation, "service1", "10G", "Ethernet",
                                                   nodes, settings)

    @staticmethod
    def _och_otu4_service_request(operation):
        """Send the 'service_OCH_OTU4' service-path request for *operation*."""
        nodes = [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}]
        return test_utils.service_path_request(operation, "service_OCH_OTU4", "1", nodes)

    # ------------------------------------------------------------------
    # Device connection and initial port mapping
    # ------------------------------------------------------------------
    def test_01_connect_SPDR_SA1(self):
        """Mount SPDR-SA1 and check its NETCONF connection status is 'connected'."""
        mounted = test_utils.mount_device("SPDR-SA1", 'spdra')
        self.assertEqual(mounted.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
        time.sleep(10)

        body = self._json_ok(test_utils.get_netconf_oper_request("SPDR-SA1"))
        status = body['node'][0]['netconf-node-topology:connection-status']
        self.assertEqual(status, 'connected')

    def test_02_get_portmapping_CLIENT4(self):
        """Check the port-mapping entry of XPDR1-CLIENT4."""
        mappings = self._fetch_mappings("XPDR1-CLIENT4")

        scalar_fields = (
            ('CP1-SFP4-P1', 'supporting-port'),
            ('CP1-SFP4', 'supporting-circuit-pack-name'),
            ('XPDR1-CLIENT4', 'logical-connection-point'),
            ('bidirectional', 'port-direction'),
            ('xpdr-client', 'port-qual'),
            ('FqlcrxV7p3g=', 'lcp-hash-val'),
        )
        for wanted, field in scalar_fields:
            self.assertEqual(wanted, mappings[0][field])

        for capability in ('org-openroadm-port-types:if-10GE-ODU2e',
                           'org-openroadm-port-types:if-10GE-ODU2',
                           'org-openroadm-port-types:if-10GE'):
            self.assertIn(capability, mappings[0]['supported-interface-capability'])

    def test_03_get_portmapping_NETWORK1(self):
        """Check the port-mapping entry of XPDR1-NETWORK1 before any service exists."""
        self.assertIn(self._network1_mapping(), self._fetch_mappings("XPDR1-NETWORK1"))

    # ------------------------------------------------------------------
    # OCH / OTU4 service path
    # ------------------------------------------------------------------
    def test_04_service_path_create_OCH_OTU4(self):
        """Create the OCH/OTU4 service path and check the reported interfaces."""
        response = self._och_otu4_service_request("create")
        time.sleep(3)
        output = self._json_ok(response)["output"]
        self.assertIn('Roadm-connection successfully created for nodes: ', output["result"])
        self.assertTrue(output["success"])
        expected_entry = {'node-id': 'SPDR-SA1',
                          'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
                          'och-interface-id': ['XPDR1-NETWORK1-1']}
        self.assertIn(expected_entry, output['node-interface'])

    def test_05_get_portmapping_NETWORK1(self):
        """Check the XPDR1-NETWORK1 mapping is unchanged after the OCH/OTU4 creation."""
        self.assertIn(self._network1_mapping(), self._fetch_mappings("XPDR1-NETWORK1"))

    def test_06_check_interface_och(self):
        """Check the optical-channel interface XPDR1-NETWORK1-1."""
        interface = self._fetch_interface("XPDR1-NETWORK1-1")

        self._assert_has_items(
            {'name': 'XPDR1-NETWORK1-1',
             'administrative-state': 'inService',
             'supporting-circuit-pack-name': 'CP1-CFP0',
             'type': 'org-openroadm-interfaces:opticalChannel',
             'supporting-port': 'CP1-CFP0-P1'},
            interface)

        # The och container must match exactly, not merely contain these keys.
        self.assertDictEqual(
            {'frequency': 196.1,
             'rate': 'org-openroadm-common-types:R100G',
             'transmit-power': -5,
             'modulation-format': 'dp-qpsk'},
            interface['org-openroadm-optical-channel-interfaces:och'])

    def test_07_check_interface_OTU(self):
        """Check the OTU interface XPDR1-NETWORK1-OTU."""
        interface = self._fetch_interface("XPDR1-NETWORK1-OTU")

        general = {
            'name': 'XPDR1-NETWORK1-OTU',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': 'CP1-CFP0',
            'supporting-interface': 'XPDR1-NETWORK1-1',
            'type': 'org-openroadm-interfaces:otnOtu',
            'supporting-port': 'CP1-CFP0-P1',
        }
        otu = {
            'tx-dapi': 'Swfw02qXGyI=',
            'expected-sapi': 'Swfw02qXGyI=',
            'tx-sapi': 'Swfw02qXGyI=',
            'expected-dapi': 'Swfw02qXGyI=',
            'rate': 'org-openroadm-otn-common-types:OTU4',
            'fec': 'scfec',
        }

        self._assert_has_items(general, interface)
        self.assertDictEqual(otu, interface['org-openroadm-otn-otu-interfaces:otu'])

    # ------------------------------------------------------------------
    # ODU4 OTN service path
    # ------------------------------------------------------------------
    def test_08_otn_service_path_create_ODU4(self):
        """Create the ODU4 OTN service path and check the reported interface."""
        response = self._odu4_service_request("create")
        time.sleep(3)
        output = self._json_ok(response)["output"]
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', output["result"])
        self.assertTrue(output["success"])
        expected_entry = {'node-id': 'SPDR-SA1',
                          'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}
        self.assertIn(expected_entry, output['node-interface'])

    def test_09_get_portmapping_NETWORK1(self):
        """Check the XPDR1-NETWORK1 mapping now carries the supporting ODU4."""
        expected = self._network1_mapping({"supporting-odu4": "XPDR1-NETWORK1-ODU4"})
        self.assertIn(expected, self._fetch_mappings("XPDR1-NETWORK1"))

    def test_10_check_interface_ODU4(self):
        """Check the ODU4 interface XPDR1-NETWORK1-ODU4."""
        interface = self._fetch_interface("XPDR1-NETWORK1-ODU4")

        general = {
            'name': 'XPDR1-NETWORK1-ODU4',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': 'CP1-CFP0',
            'supporting-interface': 'XPDR1-NETWORK1-OTU',
            'type': 'org-openroadm-interfaces:otnOdu',
            'supporting-port': 'CP1-CFP0-P1',
        }
        odu = {
            'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
            'rate': 'org-openroadm-otn-common-types:ODU4',
            'expected-dapi': 'Swfw02qXGyI=',
            'expected-sapi': 'Swfw02qXGyI=',
            'tx-dapi': 'Swfw02qXGyI=',
            'tx-sapi': 'Swfw02qXGyI=',
        }

        self._assert_has_items(general, interface)
        self._assert_has_items(odu, interface['org-openroadm-otn-odu-interfaces:odu'])
        self.assertDictEqual(
            {'payload-type': '21', 'exp-payload-type': '21'},
            interface['org-openroadm-otn-odu-interfaces:odu']['opu'])

    # ------------------------------------------------------------------
    # 10GE OTN service path
    # ------------------------------------------------------------------
    def test_11_otn_service_path_create_10GE(self):
        """Create the 10GE OTN service path and check the reported interfaces."""
        response = self._ten_ge_service_request("create")
        time.sleep(3)
        output = self._json_ok(response)["output"]
        self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', output["result"])
        self.assertTrue(output["success"])

        self.assertEqual('SPDR-SA1', output['node-interface'][0]['node-id'])
        self.assertIn('XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
                      output['node-interface'][0]['connection-id'])
        self.assertIn('XPDR1-CLIENT4-ETHERNET10G', output['node-interface'][0]['eth-interface-id'])
        self.assertIn('XPDR1-NETWORK1-ODU2e-service1', output['node-interface'][0]['odu-interface-id'])
        self.assertIn('XPDR1-CLIENT4-ODU2e-service1', output['node-interface'][0]['odu-interface-id'])

    def test_12_check_interface_10GE_CLIENT(self):
        """Check the client Ethernet interface XPDR1-CLIENT4-ETHERNET10G."""
        interface = self._fetch_interface("XPDR1-CLIENT4-ETHERNET10G")

        general = {
            'name': 'XPDR1-CLIENT4-ETHERNET10G',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': 'CP1-SFP4',
            'type': 'org-openroadm-interfaces:ethernetCsmacd',
            'supporting-port': 'CP1-SFP4-P1',
        }
        self._assert_has_items(general, interface)
        self.assertDictEqual(
            {'speed': 10000},
            interface['org-openroadm-ethernet-interfaces:ethernet'])

    def test_13_check_interface_ODU2E_CLIENT(self):
        """Check the client-side ODU2e interface XPDR1-CLIENT4-ODU2e-service1."""
        interface = self._fetch_interface("XPDR1-CLIENT4-ODU2e-service1")

        general = {
            'name': 'XPDR1-CLIENT4-ODU2e-service1',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': 'CP1-SFP4',
            'supporting-interface': 'XPDR1-CLIENT4-ETHERNET10G',
            'type': 'org-openroadm-interfaces:otnOdu',
            'supporting-port': 'CP1-SFP4-P1',
        }
        odu = {
            'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
            'rate': 'org-openroadm-otn-common-types:ODU2e',
            'monitoring-mode': 'terminated',
        }

        self._assert_has_items(general, interface)
        self._assert_has_items(odu, interface['org-openroadm-otn-odu-interfaces:odu'])
        self.assertDictEqual(
            {'payload-type': '03', 'exp-payload-type': '03'},
            interface['org-openroadm-otn-odu-interfaces:odu']['opu'])

    def test_14_check_interface_ODU2E_NETWORK(self):
        """Check the network-side ODU2e interface XPDR1-NETWORK1-ODU2e-service1."""
        interface = self._fetch_interface("XPDR1-NETWORK1-ODU2e-service1")

        general = {
            'name': 'XPDR1-NETWORK1-ODU2e-service1',
            'administrative-state': 'inService',
            'supporting-circuit-pack-name': 'CP1-CFP0',
            'supporting-interface': 'XPDR1-NETWORK1-ODU4',
            'type': 'org-openroadm-interfaces:otnOdu',
            'supporting-port': 'CP1-CFP0-P1',
        }
        odu = {
            'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
            'rate': 'org-openroadm-otn-common-types:ODU2e',
            'monitoring-mode': 'monitored',
        }
        allocation = {'trib-port-number': 1}

        self._assert_has_items(general, interface)
        self._assert_has_items(odu, interface['org-openroadm-otn-odu-interfaces:odu'])
        self._assert_has_items(
            allocation,
            interface['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
        self.assertIn(
            1,
            interface['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']['trib-slots'])

    def test_15_check_ODU2E_connection(self):
        """Check the ODU connection between the client and network ODU2e interfaces."""
        response = test_utils.check_netconf_node_request(
            "SPDR-SA1",
            "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
        body = self._json_ok(response)

        general = {
            'connection-name': 'XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
            'direction': 'bidirectional',
        }
        self._assert_has_items(general, body['odu-connection'][0])
        self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1'},
                             body['odu-connection'][0]['destination'])
        self.assertDictEqual({'src-if': 'XPDR1-CLIENT4-ODU2e-service1'},
                             body['odu-connection'][0]['source'])

    # ------------------------------------------------------------------
    # Teardown of the services, in reverse creation order
    # ------------------------------------------------------------------
    def test_16_otn_service_path_delete_10GE(self):
        """Delete the 10GE OTN service path."""
        response = self._ten_ge_service_request("delete")
        time.sleep(3)
        self._assert_request_processed(response)

    def test_17_check_no_ODU2E_connection(self):
        """The ODU2e connection lookup must now answer 409."""
        self._assert_conflict(
            "odu-connection/XPDR1-CLIENT4-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")

    def test_18_check_no_interface_ODU2E_NETWORK(self):
        """The network ODU2e interface lookup must now answer 409."""
        self._assert_conflict("interface/XPDR1-NETWORK1-ODU2e-service1")

    def test_19_check_no_interface_ODU2E_CLIENT(self):
        """The client ODU2e interface lookup must now answer 409."""
        self._assert_conflict("interface/XPDR1-CLIENT4-ODU2e-service1")

    def test_20_check_no_interface_10GE_CLIENT(self):
        """The client 10GE Ethernet interface lookup must now answer 409."""
        self._assert_conflict("interface/XPDR1-CLIENT4-ETHERNET10G")

    def test_21_otn_service_path_delete_ODU4(self):
        """Delete the ODU4 OTN service path."""
        response = self._odu4_service_request("delete")
        time.sleep(3)
        self._assert_request_processed(response)

    def test_22_check_no_interface_ODU4(self):
        """The ODU4 interface lookup must now answer 409."""
        self._assert_conflict("interface/XPDR1-NETWORK1-ODU4")

    def test_23_service_path_delete_OCH_OTU4(self):
        """Delete the OCH/OTU4 service path."""
        response = self._och_otu4_service_request("delete")
        time.sleep(3)
        self._assert_request_processed(response)

    def test_24_check_no_interface_OTU4(self):
        """The OTU interface lookup must now answer 409."""
        self._assert_conflict("interface/XPDR1-NETWORK1-OTU")

    def test_25_check_no_interface_OCH(self):
        """The optical-channel interface lookup must now answer 409."""
        self._assert_conflict("interface/XPDR1-NETWORK1-1")

    def test_26_disconnect_SPDR_SA1(self):
        """Unmount SPDR-SA1 and expect HTTP 200."""
        unmounted = test_utils.unmount_device("SPDR-SA1")
        self.assertEqual(unmounted.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
381
382
# Run the test case with verbose per-test output when executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)