add a method to check nodes configs in func tests
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import unittest
13 import json
14 import time
15 import requests
16 from common import test_utils
17
18
19 class TransportPCEtesting(unittest.TestCase):
20
21     processes = None
22
23     @classmethod
24     def setUpClass(cls):
25         cls.processes = test_utils.start_tpce()
26         cls.processes = test_utils.start_sims(['spdra'])
27
28     @classmethod
29     def tearDownClass(cls):
30         for process in cls.processes:
31             test_utils.shutdown_process(process)
32         print("all processes killed")
33
34     def setUp(self):
35         time.sleep(5)
36
37     def test_01_connect_SPDR_SA1(self):
38         response = test_utils.mount_device("SPDR-SA1", 'spdra')
39         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
40         time.sleep(10)
41
42         url = "{}/operational/network-topology:network-topology/topology/topology-netconf/node/SPDR-SA1"
43         response = test_utils.get_request(url)
44         self.assertEqual(response.status_code, requests.codes.ok)
45         res = response.json()
46         self.assertEqual(
47             res['node'][0]['netconf-node-topology:connection-status'],
48             'connected')
49
50     def test_02_get_portmapping_CLIENT1(self):
51         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
52         response = test_utils.get_request(url)
53         self.assertEqual(response.status_code, requests.codes.ok)
54         res = response.json()
55         self.assertIn(
56             {'supported-interface-capability': [
57                 'org-openroadm-port-types:if-10GE-ODU2e',
58                 'org-openroadm-port-types:if-10GE-ODU2',
59                 'org-openroadm-port-types:if-10GE'],
60              'supporting-port': 'CP1-SFP4-P1',
61              'supporting-circuit-pack-name': 'CP1-SFP4',
62              'logical-connection-point': 'XPDR1-CLIENT1',
63              'port-direction': 'bidirectional',
64              'port-qual': 'xpdr-client',
65              'lcp-hash-val': 'FqlcrxV7p30='},
66             res['mapping'])
67
68     def test_03_get_portmapping_NETWORK1(self):
69         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
70         response = test_utils.get_request(url)
71         self.assertEqual(response.status_code, requests.codes.ok)
72         res = response.json()
73         self.assertIn(
74             {"logical-connection-point": "XPDR1-NETWORK1",
75              "supporting-port": "CP1-CFP0-P1",
76              "supported-interface-capability": [
77                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
78              ],
79                 "port-direction": "bidirectional",
80                 "port-qual": "xpdr-network",
81                 "supporting-circuit-pack-name": "CP1-CFP0",
82                 "xponder-type": "mpdr",
83              'lcp-hash-val': 'Swfw02qXGyI='},
84             res['mapping'])
85
86     def test_04_service_path_create_OCH_OTU4(self):
87         url = "{}/operations/transportpce-device-renderer:service-path"
88         data = {"renderer:input": {
89             "service-name": "service_ODU4",
90             "wave-number": "1",
91             "modulation-format": "qpsk",
92             "operation": "create",
93             "nodes": [
94                 {"node-id": "SPDR-SA1",
95                  "dest-tp": "XPDR1-NETWORK1"}]}}
96         response = test_utils.post_request(url, data)
97         time.sleep(3)
98         self.assertEqual(response.status_code, requests.codes.ok)
99         res = response.json()
100         self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
101         self.assertTrue(res["output"]["success"])
102         self.assertIn(
103             {'node-id': 'SPDR-SA1',
104              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
105              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
106
107     def test_05_get_portmapping_NETWORK1(self):
108         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
109         response = test_utils.get_request(url)
110         self.assertEqual(response.status_code, requests.codes.ok)
111         res = response.json()
112         self.assertIn(
113             {"logical-connection-point": "XPDR1-NETWORK1",
114              "supporting-port": "CP1-CFP0-P1",
115              "supported-interface-capability": [
116                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
117              ],
118                 "port-direction": "bidirectional",
119                 "port-qual": "xpdr-network",
120                 "supporting-circuit-pack-name": "CP1-CFP0",
121                 "xponder-type": "mpdr",
122                 "lcp-hash-val": "Swfw02qXGyI="},
123             res['mapping'])
124
125     def test_06_check_interface_och(self):
126         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
127         self.assertEqual(response.status_code, requests.codes.ok)
128         res = response.json()
129
130         self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
131                                                           'administrative-state': 'inService',
132                                                           'supporting-circuit-pack-name': 'CP1-CFP0',
133                                                           'type': 'org-openroadm-interfaces:opticalChannel',
134                                                           'supporting-port': 'CP1-CFP0-P1'
135                                                           }),
136                              res['interface'][0])
137
138         self.assertDictEqual(
139             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
140              u'transmit-power': -5},
141             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
142
143     def test_07_check_interface_OTU(self):
144         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
145         self.assertEqual(response.status_code, requests.codes.ok)
146         res = response.json()
147         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
148                         'administrative-state': 'inService',
149                         'supporting-circuit-pack-name': 'CP1-CFP0',
150                         'supporting-interface': 'XPDR1-NETWORK1-1',
151                         'type': 'org-openroadm-interfaces:otnOtu',
152                         'supporting-port': 'CP1-CFP0-P1'
153                         }
154
155         input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
156                         'expected-sapi': 'Swfw02qXGyI=',
157                         'tx-sapi': 'Swfw02qXGyI=',
158                         'expected-dapi': 'Swfw02qXGyI=',
159                         'rate': 'org-openroadm-otn-common-types:OTU4',
160                         'fec': 'scfec'
161                         }
162
163         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
164                              res['interface'][0])
165
166         self.assertDictEqual(input_dict_2,
167                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
168
169     def test_08_otn_service_path_create_ODU4(self):
170         url = "{}/operations/transportpce-device-renderer:otn-service-path"
171         data = {"renderer:input": {
172             "service-name": "service_ODU4",
173             "operation": "create",
174             "service-rate": "100G",
175             "service-type": "ODU",
176             "nodes": [
177                 {"node-id": "SPDR-SA1",
178                  "network-tp": "XPDR1-NETWORK1"}]}}
179         response = test_utils.post_request(url, data)
180         time.sleep(3)
181         self.assertEqual(response.status_code, requests.codes.ok)
182         res = response.json()
183         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
184         self.assertTrue(res["output"]["success"])
185         self.assertIn(
186             {'node-id': 'SPDR-SA1',
187              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
188
189     def test_09_get_portmapping_NETWORK1(self):
190         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
191         response = test_utils.get_request(url)
192         self.assertEqual(response.status_code, requests.codes.ok)
193         res = response.json()
194         self.assertIn(
195             {"logical-connection-point": "XPDR1-NETWORK1",
196              "supporting-port": "CP1-CFP0-P1",
197              "supported-interface-capability": [
198                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
199              ],
200                 "port-direction": "bidirectional",
201                 "port-qual": "xpdr-network",
202                 "supporting-circuit-pack-name": "CP1-CFP0",
203                 "xponder-type": "mpdr",
204                 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
205                 "lcp-hash-val": "Swfw02qXGyI="
206              },
207             res['mapping'])
208
209     def test_10_check_interface_ODU4(self):
210         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
211         self.assertEqual(response.status_code, requests.codes.ok)
212         res = response.json()
213         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
214                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
215                         'type': 'org-openroadm-interfaces:otnOdu',
216                         'supporting-port': 'CP1-CFP0-P1'}
217         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
218                         'rate': 'org-openroadm-otn-common-types:ODU4',
219                         'expected-dapi': 'Swfw02qXGyI=',
220                         'expected-sapi': 'Swfw02qXGyI=',
221                         'tx-dapi': 'Swfw02qXGyI=',
222                         'tx-sapi': 'Swfw02qXGyI='}
223
224         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
225                              res['interface'][0])
226         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
227                                   **input_dict_2
228                                   ),
229                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
230                              )
231         self.assertDictEqual(
232             {u'payload-type': u'21', u'exp-payload-type': u'21'},
233             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
234
235     def test_11_otn_service_path_create_10GE(self):
236         url = "{}/operations/transportpce-device-renderer:otn-service-path"
237         data = {"renderer:input": {
238             "service-name": "service1",
239             "operation": "create",
240             "service-rate": "10G",
241             "service-type": "Ethernet",
242             "ethernet-encoding": "eth encode",
243             "trib-slot": ["1"],
244             "trib-port-number": "1",
245             "nodes": [
246                 {"node-id": "SPDR-SA1",
247                  "client-tp": "XPDR1-CLIENT1",
248                  "network-tp": "XPDR1-NETWORK1"}]}}
249         response = test_utils.post_request(url, data)
250         time.sleep(3)
251         self.assertEqual(response.status_code, requests.codes.ok)
252         res = response.json()
253         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
254         self.assertTrue(res["output"]["success"])
255         self.assertIn(
256             {'node-id': 'SPDR-SA1',
257              'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
258              'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
259              'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
260
261     def test_12_check_interface_10GE_CLIENT(self):
262         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
263         self.assertEqual(response.status_code, requests.codes.ok)
264         res = response.json()
265         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
266                       'administrative-state': 'inService',
267                       'supporting-circuit-pack-name': 'CP1-SFP4',
268                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
269                       'supporting-port': 'CP1-SFP4-P1'
270                       }
271         self.assertDictEqual(dict(res['interface'][0], **input_dict),
272                              res['interface'][0])
273         self.assertDictEqual(
274             {u'speed': 10000},
275             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
276
277     def test_13_check_interface_ODU2E_CLIENT(self):
278         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
279         self.assertEqual(response.status_code, requests.codes.ok)
280         res = response.json()
281
282         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
283                         'administrative-state': 'inService',
284                         'supporting-circuit-pack-name': 'CP1-SFP4',
285                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
286                         'type': 'org-openroadm-interfaces:otnOdu',
287                         'supporting-port': 'CP1-SFP4-P1'}
288         input_dict_2 = {
289             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
290             'rate': 'org-openroadm-otn-common-types:ODU2e',
291             'monitoring-mode': 'terminated'}
292
293         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
294                              res['interface'][0])
295         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
296                                   **input_dict_2),
297                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
298         self.assertDictEqual(
299             {u'payload-type': u'03', u'exp-payload-type': u'03'},
300             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
301
302     def test_14_check_interface_ODU2E_NETWORK(self):
303         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
304         self.assertEqual(response.status_code, requests.codes.ok)
305         res = response.json()
306         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
307                         'supporting-circuit-pack-name': 'CP1-CFP0',
308                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
309                         'type': 'org-openroadm-interfaces:otnOdu',
310                         'supporting-port': 'CP1-CFP0-P1'}
311         input_dict_2 = {
312             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
313             'rate': 'org-openroadm-otn-common-types:ODU2e',
314             'monitoring-mode': 'monitored'}
315
316         input_dict_3 = {'trib-port-number': 1}
317
318         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
319                              res['interface'][0])
320         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
321                                   **input_dict_2),
322                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
323         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
324             'parent-odu-allocation'], **input_dict_3
325         ),
326             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
327             'parent-odu-allocation'])
328         self.assertIn(1,
329                       res['interface'][0][
330                           'org-openroadm-otn-odu-interfaces:odu'][
331                           'parent-odu-allocation']['trib-slots'])
332
333     def test_15_check_ODU2E_connection(self):
334         response = test_utils.check_netconf_node_request(
335             "SPDR-SA1",
336             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
337         self.assertEqual(response.status_code, requests.codes.ok)
338         res = response.json()
339         input_dict_1 = {
340             'connection-name':
341             'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
342             'direction': 'bidirectional'
343         }
344
345         self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
346                              res['odu-connection'][0])
347         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
348                              res['odu-connection'][0]['destination'])
349         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
350                              res['odu-connection'][0]['source'])
351
352     def test_16_otn_service_path_delete_10GE(self):
353         url = "{}/operations/transportpce-device-renderer:otn-service-path"
354         data = {"renderer:input": {
355             "service-name": "service1",
356             "operation": "delete",
357             "service-rate": "10G",
358             "service-type": "Ethernet",
359             "ethernet-encoding": "eth encode",
360             "trib-slot": ["1"],
361             "trib-port-number": "1",
362             "nodes": [
363                 {"node-id": "SPDR-SA1",
364                  "client-tp": "XPDR1-CLIENT1",
365                  "network-tp": "XPDR1-NETWORK1"}]}}
366         response = test_utils.post_request(url, data)
367         time.sleep(3)
368         self.assertEqual(response.status_code, requests.codes.ok)
369         res = response.json()
370         self.assertIn('Request processed', res["output"]["result"])
371         self.assertTrue(res["output"]["success"])
372
373     def test_17_check_no_ODU2E_connection(self):
374         response = test_utils.check_netconf_node_request(
375             "SPDR-SA1",
376             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
377         self.assertEqual(response.status_code, requests.codes.not_found)
378
379     def test_18_check_no_interface_ODU2E_NETWORK(self):
380         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
381         self.assertEqual(response.status_code, requests.codes.not_found)
382
383     def test_19_check_no_interface_ODU2E_CLIENT(self):
384         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
385         self.assertEqual(response.status_code, requests.codes.not_found)
386
387     def test_20_check_no_interface_10GE_CLIENT(self):
388         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
389         self.assertEqual(response.status_code, requests.codes.not_found)
390
391     def test_21_otn_service_path_delete_ODU4(self):
392         url = "{}/operations/transportpce-device-renderer:otn-service-path"
393         data = {"renderer:input": {
394             "service-name": "service_ODU4",
395             "operation": "delete",
396             "service-rate": "100G",
397             "service-type": "ODU",
398             "nodes": [
399                 {"node-id": "SPDR-SA1",
400                  "network-tp": "XPDR1-NETWORK1"}]}}
401         response = test_utils.post_request(url, data)
402         time.sleep(3)
403         self.assertEqual(response.status_code, requests.codes.ok)
404         res = response.json()
405         self.assertIn('Request processed', res["output"]["result"])
406         self.assertTrue(res["output"]["success"])
407
408     def test_22_check_no_interface_ODU4(self):
409         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
410         self.assertEqual(response.status_code, requests.codes.not_found)
411
412     def test_23_service_path_delete_OCH_OTU4(self):
413         url = "{}/operations/transportpce-device-renderer:service-path"
414         data = {"renderer:input": {
415             "service-name": "service_OTU4",
416             "wave-number": "1",
417             "modulation-format": "qpsk",
418             "operation": "delete",
419             "nodes": [
420                 {"node-id": "SPDR-SA1",
421                  "dest-tp": "XPDR1-NETWORK1"}]}}
422         response = test_utils.post_request(url, data)
423         time.sleep(3)
424         self.assertEqual(response.status_code, requests.codes.ok)
425         res = response.json()
426         self.assertIn('Request processed', res["output"]["result"])
427         self.assertTrue(res["output"]["success"])
428
429     def test_24_check_no_interface_OTU4(self):
430         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
431         self.assertEqual(response.status_code, requests.codes.not_found)
432
433     def test_25_check_no_interface_OCH(self):
434         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
435         self.assertEqual(response.status_code, requests.codes.not_found)
436
437     def test_26_disconnect_SPDR_SA1(self):
438         response = test_utils.unmount_device("SPDR-SA1")
439         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
440
441
442 if __name__ == "__main__":
443     unittest.main(verbosity=2)