Merge "Fix service-name in otn-renderer FuncTest"
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import unittest
13 import json
14 import time
15 import requests
16 from common import test_utils
17
18
19 class TransportPCEtesting(unittest.TestCase):
20
21     processes = None
22
23     @classmethod
24     def setUpClass(cls):
25         cls.processes = test_utils.start_tpce()
26         cls.processes = test_utils.start_sims(['spdra'])
27
28     @classmethod
29     def tearDownClass(cls):
30         for process in cls.processes:
31             test_utils.shutdown_process(process)
32         print("all processes killed")
33
34     def setUp(self):
35         time.sleep(5)
36
37     def test_01_connect_SPDR_SA1(self):
38         response = test_utils.mount_device("SPDR-SA1", 'spdra')
39         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
40         time.sleep(10)
41
42         response = test_utils.get_netconf_oper_request("SPDR-SA1")
43         self.assertEqual(response.status_code, requests.codes.ok)
44         res = response.json()
45         self.assertEqual(
46             res['node'][0]['netconf-node-topology:connection-status'],
47             'connected')
48
49     def test_02_get_portmapping_CLIENT1(self):
50         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
51         response = test_utils.get_request(url)
52         self.assertEqual(response.status_code, requests.codes.ok)
53         res = response.json()
54         self.assertIn(
55             {'supported-interface-capability': [
56                 'org-openroadm-port-types:if-10GE-ODU2e',
57                 'org-openroadm-port-types:if-10GE-ODU2',
58                 'org-openroadm-port-types:if-10GE'],
59              'supporting-port': 'CP1-SFP4-P1',
60              'supporting-circuit-pack-name': 'CP1-SFP4',
61              'logical-connection-point': 'XPDR1-CLIENT1',
62              'port-direction': 'bidirectional',
63              'port-qual': 'xpdr-client',
64              'lcp-hash-val': 'FqlcrxV7p30='},
65             res['mapping'])
66
67     def test_03_get_portmapping_NETWORK1(self):
68         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
69         response = test_utils.get_request(url)
70         self.assertEqual(response.status_code, requests.codes.ok)
71         res = response.json()
72         self.assertIn(
73             {"logical-connection-point": "XPDR1-NETWORK1",
74              "supporting-port": "CP1-CFP0-P1",
75              "supported-interface-capability": [
76                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
77              ],
78                 "port-direction": "bidirectional",
79                 "port-qual": "xpdr-network",
80                 "supporting-circuit-pack-name": "CP1-CFP0",
81                 "xponder-type": "mpdr",
82              'lcp-hash-val': 'Swfw02qXGyI='},
83             res['mapping'])
84
85     def test_04_service_path_create_OCH_OTU4(self):
86         url = "{}/operations/transportpce-device-renderer:service-path"
87         data = {"renderer:input": {
88             "service-name": "service_OCH_OTU4",
89             "wave-number": "1",
90             "modulation-format": "qpsk",
91             "operation": "create",
92             "nodes": [
93                 {"node-id": "SPDR-SA1",
94                  "dest-tp": "XPDR1-NETWORK1"}]}}
95         response = test_utils.post_request(url, data)
96         time.sleep(3)
97         self.assertEqual(response.status_code, requests.codes.ok)
98         res = response.json()
99         self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
100         self.assertTrue(res["output"]["success"])
101         self.assertIn(
102             {'node-id': 'SPDR-SA1',
103              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
104              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
105
106     def test_05_get_portmapping_NETWORK1(self):
107         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
108         response = test_utils.get_request(url)
109         self.assertEqual(response.status_code, requests.codes.ok)
110         res = response.json()
111         self.assertIn(
112             {"logical-connection-point": "XPDR1-NETWORK1",
113              "supporting-port": "CP1-CFP0-P1",
114              "supported-interface-capability": [
115                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
116              ],
117                 "port-direction": "bidirectional",
118                 "port-qual": "xpdr-network",
119                 "supporting-circuit-pack-name": "CP1-CFP0",
120                 "xponder-type": "mpdr",
121                 "lcp-hash-val": "Swfw02qXGyI="},
122             res['mapping'])
123
124     def test_06_check_interface_och(self):
125         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
126         self.assertEqual(response.status_code, requests.codes.ok)
127         res = response.json()
128
129         self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
130                                                           'administrative-state': 'inService',
131                                                           'supporting-circuit-pack-name': 'CP1-CFP0',
132                                                           'type': 'org-openroadm-interfaces:opticalChannel',
133                                                           'supporting-port': 'CP1-CFP0-P1'
134                                                           }),
135                              res['interface'][0])
136
137         self.assertDictEqual(
138             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
139              u'transmit-power': -5},
140             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
141
142     def test_07_check_interface_OTU(self):
143         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
144         self.assertEqual(response.status_code, requests.codes.ok)
145         res = response.json()
146         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
147                         'administrative-state': 'inService',
148                         'supporting-circuit-pack-name': 'CP1-CFP0',
149                         'supporting-interface': 'XPDR1-NETWORK1-1',
150                         'type': 'org-openroadm-interfaces:otnOtu',
151                         'supporting-port': 'CP1-CFP0-P1'
152                         }
153
154         input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
155                         'expected-sapi': 'Swfw02qXGyI=',
156                         'tx-sapi': 'Swfw02qXGyI=',
157                         'expected-dapi': 'Swfw02qXGyI=',
158                         'rate': 'org-openroadm-otn-common-types:OTU4',
159                         'fec': 'scfec'
160                         }
161
162         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
163                              res['interface'][0])
164
165         self.assertDictEqual(input_dict_2,
166                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
167
168     def test_08_otn_service_path_create_ODU4(self):
169         url = "{}/operations/transportpce-device-renderer:otn-service-path"
170         data = {"renderer:input": {
171             "service-name": "service_ODU4",
172             "operation": "create",
173             "service-rate": "100G",
174             "service-type": "ODU",
175             "nodes": [
176                 {"node-id": "SPDR-SA1",
177                  "network-tp": "XPDR1-NETWORK1"}]}}
178         response = test_utils.post_request(url, data)
179         time.sleep(3)
180         self.assertEqual(response.status_code, requests.codes.ok)
181         res = response.json()
182         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
183         self.assertTrue(res["output"]["success"])
184         self.assertIn(
185             {'node-id': 'SPDR-SA1',
186              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
187
188     def test_09_get_portmapping_NETWORK1(self):
189         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
190         response = test_utils.get_request(url)
191         self.assertEqual(response.status_code, requests.codes.ok)
192         res = response.json()
193         self.assertIn(
194             {"logical-connection-point": "XPDR1-NETWORK1",
195              "supporting-port": "CP1-CFP0-P1",
196              "supported-interface-capability": [
197                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
198              ],
199                 "port-direction": "bidirectional",
200                 "port-qual": "xpdr-network",
201                 "supporting-circuit-pack-name": "CP1-CFP0",
202                 "xponder-type": "mpdr",
203                 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
204                 "lcp-hash-val": "Swfw02qXGyI="
205              },
206             res['mapping'])
207
208     def test_10_check_interface_ODU4(self):
209         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
210         self.assertEqual(response.status_code, requests.codes.ok)
211         res = response.json()
212         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
213                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
214                         'type': 'org-openroadm-interfaces:otnOdu',
215                         'supporting-port': 'CP1-CFP0-P1'}
216         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
217                         'rate': 'org-openroadm-otn-common-types:ODU4',
218                         'expected-dapi': 'Swfw02qXGyI=',
219                         'expected-sapi': 'Swfw02qXGyI=',
220                         'tx-dapi': 'Swfw02qXGyI=',
221                         'tx-sapi': 'Swfw02qXGyI='}
222
223         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
224                              res['interface'][0])
225         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
226                                   **input_dict_2
227                                   ),
228                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
229                              )
230         self.assertDictEqual(
231             {u'payload-type': u'21', u'exp-payload-type': u'21'},
232             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
233
234     def test_11_otn_service_path_create_10GE(self):
235         url = "{}/operations/transportpce-device-renderer:otn-service-path"
236         data = {"renderer:input": {
237             "service-name": "service1",
238             "operation": "create",
239             "service-rate": "10G",
240             "service-type": "Ethernet",
241             "ethernet-encoding": "eth encode",
242             "trib-slot": ["1"],
243             "trib-port-number": "1",
244             "nodes": [
245                 {"node-id": "SPDR-SA1",
246                  "client-tp": "XPDR1-CLIENT1",
247                  "network-tp": "XPDR1-NETWORK1"}]}}
248         response = test_utils.post_request(url, data)
249         time.sleep(3)
250         self.assertEqual(response.status_code, requests.codes.ok)
251         res = response.json()
252         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
253         self.assertTrue(res["output"]["success"])
254         self.assertIn(
255             {'node-id': 'SPDR-SA1',
256              'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
257              'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
258              'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
259
260     def test_12_check_interface_10GE_CLIENT(self):
261         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
262         self.assertEqual(response.status_code, requests.codes.ok)
263         res = response.json()
264         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
265                       'administrative-state': 'inService',
266                       'supporting-circuit-pack-name': 'CP1-SFP4',
267                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
268                       'supporting-port': 'CP1-SFP4-P1'
269                       }
270         self.assertDictEqual(dict(res['interface'][0], **input_dict),
271                              res['interface'][0])
272         self.assertDictEqual(
273             {u'speed': 10000},
274             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
275
276     def test_13_check_interface_ODU2E_CLIENT(self):
277         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
278         self.assertEqual(response.status_code, requests.codes.ok)
279         res = response.json()
280
281         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
282                         'administrative-state': 'inService',
283                         'supporting-circuit-pack-name': 'CP1-SFP4',
284                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
285                         'type': 'org-openroadm-interfaces:otnOdu',
286                         'supporting-port': 'CP1-SFP4-P1'}
287         input_dict_2 = {
288             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
289             'rate': 'org-openroadm-otn-common-types:ODU2e',
290             'monitoring-mode': 'terminated'}
291
292         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
293                              res['interface'][0])
294         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
295                                   **input_dict_2),
296                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
297         self.assertDictEqual(
298             {u'payload-type': u'03', u'exp-payload-type': u'03'},
299             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
300
301     def test_14_check_interface_ODU2E_NETWORK(self):
302         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
303         self.assertEqual(response.status_code, requests.codes.ok)
304         res = response.json()
305         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
306                         'supporting-circuit-pack-name': 'CP1-CFP0',
307                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
308                         'type': 'org-openroadm-interfaces:otnOdu',
309                         'supporting-port': 'CP1-CFP0-P1'}
310         input_dict_2 = {
311             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
312             'rate': 'org-openroadm-otn-common-types:ODU2e',
313             'monitoring-mode': 'monitored'}
314
315         input_dict_3 = {'trib-port-number': 1}
316
317         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
318                              res['interface'][0])
319         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
320                                   **input_dict_2),
321                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
322         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
323             'parent-odu-allocation'], **input_dict_3
324         ),
325             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
326             'parent-odu-allocation'])
327         self.assertIn(1,
328                       res['interface'][0][
329                           'org-openroadm-otn-odu-interfaces:odu'][
330                           'parent-odu-allocation']['trib-slots'])
331
332     def test_15_check_ODU2E_connection(self):
333         response = test_utils.check_netconf_node_request(
334             "SPDR-SA1",
335             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
336         self.assertEqual(response.status_code, requests.codes.ok)
337         res = response.json()
338         input_dict_1 = {
339             'connection-name':
340             'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
341             'direction': 'bidirectional'
342         }
343
344         self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
345                              res['odu-connection'][0])
346         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
347                              res['odu-connection'][0]['destination'])
348         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
349                              res['odu-connection'][0]['source'])
350
351     def test_16_otn_service_path_delete_10GE(self):
352         url = "{}/operations/transportpce-device-renderer:otn-service-path"
353         data = {"renderer:input": {
354             "service-name": "service1",
355             "operation": "delete",
356             "service-rate": "10G",
357             "service-type": "Ethernet",
358             "ethernet-encoding": "eth encode",
359             "trib-slot": ["1"],
360             "trib-port-number": "1",
361             "nodes": [
362                 {"node-id": "SPDR-SA1",
363                  "client-tp": "XPDR1-CLIENT1",
364                  "network-tp": "XPDR1-NETWORK1"}]}}
365         response = test_utils.post_request(url, data)
366         time.sleep(3)
367         self.assertEqual(response.status_code, requests.codes.ok)
368         res = response.json()
369         self.assertIn('Request processed', res["output"]["result"])
370         self.assertTrue(res["output"]["success"])
371
372     def test_17_check_no_ODU2E_connection(self):
373         response = test_utils.check_netconf_node_request(
374             "SPDR-SA1",
375             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
376         self.assertEqual(response.status_code, requests.codes.not_found)
377
378     def test_18_check_no_interface_ODU2E_NETWORK(self):
379         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
380         self.assertEqual(response.status_code, requests.codes.not_found)
381
382     def test_19_check_no_interface_ODU2E_CLIENT(self):
383         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
384         self.assertEqual(response.status_code, requests.codes.not_found)
385
386     def test_20_check_no_interface_10GE_CLIENT(self):
387         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
388         self.assertEqual(response.status_code, requests.codes.not_found)
389
390     def test_21_otn_service_path_delete_ODU4(self):
391         url = "{}/operations/transportpce-device-renderer:otn-service-path"
392         data = {"renderer:input": {
393             "service-name": "service_ODU4",
394             "operation": "delete",
395             "service-rate": "100G",
396             "service-type": "ODU",
397             "nodes": [
398                 {"node-id": "SPDR-SA1",
399                  "network-tp": "XPDR1-NETWORK1"}]}}
400         response = test_utils.post_request(url, data)
401         time.sleep(3)
402         self.assertEqual(response.status_code, requests.codes.ok)
403         res = response.json()
404         self.assertIn('Request processed', res["output"]["result"])
405         self.assertTrue(res["output"]["success"])
406
407     def test_22_check_no_interface_ODU4(self):
408         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
409         self.assertEqual(response.status_code, requests.codes.not_found)
410
411     def test_23_service_path_delete_OCH_OTU4(self):
412         url = "{}/operations/transportpce-device-renderer:service-path"
413         data = {"renderer:input": {
414             "service-name": "service_OTU4",
415             "wave-number": "1",
416             "modulation-format": "qpsk",
417             "operation": "delete",
418             "nodes": [
419                 {"node-id": "SPDR-SA1",
420                  "dest-tp": "XPDR1-NETWORK1"}]}}
421         response = test_utils.post_request(url, data)
422         time.sleep(3)
423         self.assertEqual(response.status_code, requests.codes.ok)
424         res = response.json()
425         self.assertIn('Request processed', res["output"]["result"])
426         self.assertTrue(res["output"]["success"])
427
428     def test_24_check_no_interface_OTU4(self):
429         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
430         self.assertEqual(response.status_code, requests.codes.not_found)
431
432     def test_25_check_no_interface_OCH(self):
433         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
434         self.assertEqual(response.status_code, requests.codes.not_found)
435
436     def test_26_disconnect_SPDR_SA1(self):
437         response = test_utils.unmount_device("SPDR-SA1")
438         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
439
440
441 if __name__ == "__main__":
442     unittest.main(verbosity=2)