rationalize functional tests calls to service-path
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import unittest
13 import json
14 import time
15 import requests
16 from common import test_utils
17
18
19 class TransportPCEtesting(unittest.TestCase):
20
21     processes = None
22
23     @classmethod
24     def setUpClass(cls):
25         cls.processes = test_utils.start_tpce()
26         cls.processes = test_utils.start_sims(['spdra'])
27
28     @classmethod
29     def tearDownClass(cls):
30         for process in cls.processes:
31             test_utils.shutdown_process(process)
32         print("all processes killed")
33
34     def setUp(self):
35         time.sleep(5)
36
37     def test_01_connect_SPDR_SA1(self):
38         response = test_utils.mount_device("SPDR-SA1", 'spdra')
39         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
40         time.sleep(10)
41
42         response = test_utils.get_netconf_oper_request("SPDR-SA1")
43         self.assertEqual(response.status_code, requests.codes.ok)
44         res = response.json()
45         self.assertEqual(
46             res['node'][0]['netconf-node-topology:connection-status'],
47             'connected')
48
49     def test_02_get_portmapping_CLIENT1(self):
50         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT1")
51         self.assertEqual(response.status_code, requests.codes.ok)
52         res = response.json()
53         self.assertIn(
54             {'supported-interface-capability': [
55                 'org-openroadm-port-types:if-10GE-ODU2e',
56                 'org-openroadm-port-types:if-10GE-ODU2',
57                 'org-openroadm-port-types:if-10GE'],
58              'supporting-port': 'CP1-SFP4-P1',
59              'supporting-circuit-pack-name': 'CP1-SFP4',
60              'logical-connection-point': 'XPDR1-CLIENT1',
61              'port-direction': 'bidirectional',
62              'port-qual': 'xpdr-client',
63              'lcp-hash-val': 'FqlcrxV7p30='},
64             res['mapping'])
65
66     def test_03_get_portmapping_NETWORK1(self):
67         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
68         self.assertEqual(response.status_code, requests.codes.ok)
69         res = response.json()
70         self.assertIn(
71             {"logical-connection-point": "XPDR1-NETWORK1",
72              "supporting-port": "CP1-CFP0-P1",
73              "supported-interface-capability": [
74                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
75              ],
76                 "port-direction": "bidirectional",
77                 "port-qual": "xpdr-network",
78                 "supporting-circuit-pack-name": "CP1-CFP0",
79                 "xponder-type": "mpdr",
80              'lcp-hash-val': 'Swfw02qXGyI='},
81             res['mapping'])
82
83     def test_04_service_path_create_OCH_OTU4(self):
84         response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
85                                                    [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
86         time.sleep(3)
87         self.assertEqual(response.status_code, requests.codes.ok)
88         res = response.json()
89         self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
90         self.assertTrue(res["output"]["success"])
91         self.assertIn(
92             {'node-id': 'SPDR-SA1',
93              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
94              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
95
96     def test_05_get_portmapping_NETWORK1(self):
97         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
98         self.assertEqual(response.status_code, requests.codes.ok)
99         res = response.json()
100         self.assertIn(
101             {"logical-connection-point": "XPDR1-NETWORK1",
102              "supporting-port": "CP1-CFP0-P1",
103              "supported-interface-capability": [
104                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
105              ],
106                 "port-direction": "bidirectional",
107                 "port-qual": "xpdr-network",
108                 "supporting-circuit-pack-name": "CP1-CFP0",
109                 "xponder-type": "mpdr",
110                 "lcp-hash-val": "Swfw02qXGyI="},
111             res['mapping'])
112
113     def test_06_check_interface_och(self):
114         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
115         self.assertEqual(response.status_code, requests.codes.ok)
116         res = response.json()
117
118         self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
119                                                           'administrative-state': 'inService',
120                                                           'supporting-circuit-pack-name': 'CP1-CFP0',
121                                                           'type': 'org-openroadm-interfaces:opticalChannel',
122                                                           'supporting-port': 'CP1-CFP0-P1'
123                                                           }),
124                              res['interface'][0])
125
126         self.assertDictEqual(
127             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
128              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
129             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
130
131     def test_07_check_interface_OTU(self):
132         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
133         self.assertEqual(response.status_code, requests.codes.ok)
134         res = response.json()
135         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
136                         'administrative-state': 'inService',
137                         'supporting-circuit-pack-name': 'CP1-CFP0',
138                         'supporting-interface': 'XPDR1-NETWORK1-1',
139                         'type': 'org-openroadm-interfaces:otnOtu',
140                         'supporting-port': 'CP1-CFP0-P1'
141                         }
142
143         input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
144                         'expected-sapi': 'Swfw02qXGyI=',
145                         'tx-sapi': 'Swfw02qXGyI=',
146                         'expected-dapi': 'Swfw02qXGyI=',
147                         'rate': 'org-openroadm-otn-common-types:OTU4',
148                         'fec': 'scfec'
149                         }
150
151         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
152                              res['interface'][0])
153
154         self.assertDictEqual(input_dict_2,
155                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
156
157     def test_08_otn_service_path_create_ODU4(self):
158         response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
159                                                        [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
160         time.sleep(3)
161         self.assertEqual(response.status_code, requests.codes.ok)
162         res = response.json()
163         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
164         self.assertTrue(res["output"]["success"])
165         self.assertIn(
166             {'node-id': 'SPDR-SA1',
167              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
168
169     def test_09_get_portmapping_NETWORK1(self):
170         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
171         self.assertEqual(response.status_code, requests.codes.ok)
172         res = response.json()
173         self.assertIn(
174             {"logical-connection-point": "XPDR1-NETWORK1",
175              "supporting-port": "CP1-CFP0-P1",
176              "supported-interface-capability": [
177                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
178              ],
179                 "port-direction": "bidirectional",
180                 "port-qual": "xpdr-network",
181                 "supporting-circuit-pack-name": "CP1-CFP0",
182                 "xponder-type": "mpdr",
183                 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
184                 "lcp-hash-val": "Swfw02qXGyI="
185              },
186             res['mapping'])
187
188     def test_10_check_interface_ODU4(self):
189         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
190         self.assertEqual(response.status_code, requests.codes.ok)
191         res = response.json()
192         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
193                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
194                         'type': 'org-openroadm-interfaces:otnOdu',
195                         'supporting-port': 'CP1-CFP0-P1'}
196         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
197                         'rate': 'org-openroadm-otn-common-types:ODU4',
198                         'expected-dapi': 'Swfw02qXGyI=',
199                         'expected-sapi': 'Swfw02qXGyI=',
200                         'tx-dapi': 'Swfw02qXGyI=',
201                         'tx-sapi': 'Swfw02qXGyI='}
202
203         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
204                              res['interface'][0])
205         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
206                                   **input_dict_2
207                                   ),
208                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
209                              )
210         self.assertDictEqual(
211             {u'payload-type': u'21', u'exp-payload-type': u'21'},
212             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
213
214     def test_11_otn_service_path_create_10GE(self):
215         response = test_utils.otn_service_path_request("create", "service1", "10G", "Ethernet",
216                                                        [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT1",
217                                                            "network-tp": "XPDR1-NETWORK1"}],
218                                                        {"ethernet-encoding": "eth encode", "trib-slot": ["1"], "trib-port-number": "1"})
219         time.sleep(3)
220         self.assertEqual(response.status_code, requests.codes.ok)
221         res = response.json()
222         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
223         self.assertTrue(res["output"]["success"])
224         self.assertIn(
225             {'node-id': 'SPDR-SA1',
226              'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
227              'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
228              'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
229
230     def test_12_check_interface_10GE_CLIENT(self):
231         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
232         self.assertEqual(response.status_code, requests.codes.ok)
233         res = response.json()
234         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
235                       'administrative-state': 'inService',
236                       'supporting-circuit-pack-name': 'CP1-SFP4',
237                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
238                       'supporting-port': 'CP1-SFP4-P1'
239                       }
240         self.assertDictEqual(dict(res['interface'][0], **input_dict),
241                              res['interface'][0])
242         self.assertDictEqual(
243             {u'speed': 10000},
244             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
245
246     def test_13_check_interface_ODU2E_CLIENT(self):
247         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
248         self.assertEqual(response.status_code, requests.codes.ok)
249         res = response.json()
250
251         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
252                         'administrative-state': 'inService',
253                         'supporting-circuit-pack-name': 'CP1-SFP4',
254                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
255                         'type': 'org-openroadm-interfaces:otnOdu',
256                         'supporting-port': 'CP1-SFP4-P1'}
257         input_dict_2 = {
258             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
259             'rate': 'org-openroadm-otn-common-types:ODU2e',
260             'monitoring-mode': 'terminated'}
261
262         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
263                              res['interface'][0])
264         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
265                                   **input_dict_2),
266                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
267         self.assertDictEqual(
268             {u'payload-type': u'03', u'exp-payload-type': u'03'},
269             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
270
271     def test_14_check_interface_ODU2E_NETWORK(self):
272         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
273         self.assertEqual(response.status_code, requests.codes.ok)
274         res = response.json()
275         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
276                         'supporting-circuit-pack-name': 'CP1-CFP0',
277                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
278                         'type': 'org-openroadm-interfaces:otnOdu',
279                         'supporting-port': 'CP1-CFP0-P1'}
280         input_dict_2 = {
281             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
282             'rate': 'org-openroadm-otn-common-types:ODU2e',
283             'monitoring-mode': 'monitored'}
284
285         input_dict_3 = {'trib-port-number': 1}
286
287         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
288                              res['interface'][0])
289         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
290                                   **input_dict_2),
291                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
292         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
293             'parent-odu-allocation'], **input_dict_3
294         ),
295             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
296             'parent-odu-allocation'])
297         self.assertIn(1,
298                       res['interface'][0][
299                           'org-openroadm-otn-odu-interfaces:odu'][
300                           'parent-odu-allocation']['trib-slots'])
301
302     def test_15_check_ODU2E_connection(self):
303         response = test_utils.check_netconf_node_request(
304             "SPDR-SA1",
305             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
306         self.assertEqual(response.status_code, requests.codes.ok)
307         res = response.json()
308         input_dict_1 = {
309             'connection-name':
310             'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
311             'direction': 'bidirectional'
312         }
313
314         self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
315                              res['odu-connection'][0])
316         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
317                              res['odu-connection'][0]['destination'])
318         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
319                              res['odu-connection'][0]['source'])
320
321     def test_16_otn_service_path_delete_10GE(self):
322         response = test_utils.otn_service_path_request("delete", "service1", "10G", "Ethernet",
323                                                        [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT1",
324                                                            "network-tp": "XPDR1-NETWORK1"}],
325                                                        {"ethernet-encoding": "eth encode", "trib-slot": ["1"], "trib-port-number": "1"})
326         time.sleep(3)
327         self.assertEqual(response.status_code, requests.codes.ok)
328         res = response.json()
329         self.assertIn('Request processed', res["output"]["result"])
330         self.assertTrue(res["output"]["success"])
331
332     def test_17_check_no_ODU2E_connection(self):
333         response = test_utils.check_netconf_node_request(
334             "SPDR-SA1",
335             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
336         self.assertEqual(response.status_code, requests.codes.not_found)
337
338     def test_18_check_no_interface_ODU2E_NETWORK(self):
339         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
340         self.assertEqual(response.status_code, requests.codes.not_found)
341
342     def test_19_check_no_interface_ODU2E_CLIENT(self):
343         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
344         self.assertEqual(response.status_code, requests.codes.not_found)
345
346     def test_20_check_no_interface_10GE_CLIENT(self):
347         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
348         self.assertEqual(response.status_code, requests.codes.not_found)
349
350     def test_21_otn_service_path_delete_ODU4(self):
351         response = test_utils.otn_service_path_request("delete", "service_ODU4", "100G", "ODU",
352                                                        [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
353         time.sleep(3)
354         self.assertEqual(response.status_code, requests.codes.ok)
355         res = response.json()
356         self.assertIn('Request processed', res["output"]["result"])
357         self.assertTrue(res["output"]["success"])
358
359     def test_22_check_no_interface_ODU4(self):
360         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
361         self.assertEqual(response.status_code, requests.codes.not_found)
362
363     def test_23_service_path_delete_OCH_OTU4(self):
364         response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
365                                                    [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
366         time.sleep(3)
367         self.assertEqual(response.status_code, requests.codes.ok)
368         res = response.json()
369         self.assertIn('Request processed', res["output"]["result"])
370         self.assertTrue(res["output"]["success"])
371
372     def test_24_check_no_interface_OTU4(self):
373         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
374         self.assertEqual(response.status_code, requests.codes.not_found)
375
376     def test_25_check_no_interface_OCH(self):
377         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
378         self.assertEqual(response.status_code, requests.codes.not_found)
379
380     def test_26_disconnect_SPDR_SA1(self):
381         response = test_utils.unmount_device("SPDR-SA1")
382         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
383
384
385 if __name__ == "__main__":
386     unittest.main(verbosity=2)