Merge "Add INFO.yaml for transportPCE"
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import unittest
13 import json
14 import time
15 import requests
16 from common import test_utils
17
18
19 class TransportPCEtesting(unittest.TestCase):
20
21     processes = None
22
23     @classmethod
24     def setUpClass(cls):
25         cls.processes = test_utils.start_tpce()
26         cls.processes = test_utils.start_sims(['spdra'])
27
28     @classmethod
29     def tearDownClass(cls):
30         for process in cls.processes:
31             test_utils.shutdown_process(process)
32         print("all processes killed")
33
34     def setUp(self):
35         time.sleep(5)
36
37     def test_01_connect_SPDR_SA1(self):
38         response = test_utils.mount_device("SPDR-SA1", 'spdra')
39         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
40         time.sleep(10)
41
42         response = test_utils.get_netconf_oper_request("SPDR-SA1")
43         self.assertEqual(response.status_code, requests.codes.ok)
44         res = response.json()
45         self.assertEqual(
46             res['node'][0]['netconf-node-topology:connection-status'],
47             'connected')
48
49     def test_02_get_portmapping_CLIENT1(self):
50         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-CLIENT1")
51         self.assertEqual(response.status_code, requests.codes.ok)
52         res = response.json()
53         self.assertIn(
54             {'supported-interface-capability': [
55                 'org-openroadm-port-types:if-10GE-ODU2e',
56                 'org-openroadm-port-types:if-10GE-ODU2',
57                 'org-openroadm-port-types:if-10GE'],
58              'supporting-port': 'CP1-SFP4-P1',
59              'supporting-circuit-pack-name': 'CP1-SFP4',
60              'logical-connection-point': 'XPDR1-CLIENT1',
61              'port-direction': 'bidirectional',
62              'port-qual': 'xpdr-client',
63              'lcp-hash-val': 'FqlcrxV7p30='},
64             res['mapping'])
65
66     def test_03_get_portmapping_NETWORK1(self):
67         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
68         self.assertEqual(response.status_code, requests.codes.ok)
69         res = response.json()
70         self.assertIn(
71             {"logical-connection-point": "XPDR1-NETWORK1",
72              "supporting-port": "CP1-CFP0-P1",
73              "supported-interface-capability": [
74                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
75              ],
76                 "port-direction": "bidirectional",
77                 "port-qual": "xpdr-network",
78                 "supporting-circuit-pack-name": "CP1-CFP0",
79                 "xponder-type": "mpdr",
80              'lcp-hash-val': 'Swfw02qXGyI='},
81             res['mapping'])
82
83     def test_04_service_path_create_OCH_OTU4(self):
84         url = "{}/operations/transportpce-device-renderer:service-path"
85         data = {"renderer:input": {
86             "service-name": "service_OCH_OTU4",
87             "wave-number": "1",
88             "modulation-format": "qpsk",
89             "operation": "create",
90             "nodes": [
91                 {"node-id": "SPDR-SA1",
92                  "dest-tp": "XPDR1-NETWORK1"}]}}
93         response = test_utils.post_request(url, data)
94         time.sleep(3)
95         self.assertEqual(response.status_code, requests.codes.ok)
96         res = response.json()
97         self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
98         self.assertTrue(res["output"]["success"])
99         self.assertIn(
100             {'node-id': 'SPDR-SA1',
101              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
102              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
103
104     def test_05_get_portmapping_NETWORK1(self):
105         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
106         self.assertEqual(response.status_code, requests.codes.ok)
107         res = response.json()
108         self.assertIn(
109             {"logical-connection-point": "XPDR1-NETWORK1",
110              "supporting-port": "CP1-CFP0-P1",
111              "supported-interface-capability": [
112                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
113              ],
114                 "port-direction": "bidirectional",
115                 "port-qual": "xpdr-network",
116                 "supporting-circuit-pack-name": "CP1-CFP0",
117                 "xponder-type": "mpdr",
118                 "lcp-hash-val": "Swfw02qXGyI="},
119             res['mapping'])
120
121     def test_06_check_interface_och(self):
122         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
123         self.assertEqual(response.status_code, requests.codes.ok)
124         res = response.json()
125
126         self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
127                                                           'administrative-state': 'inService',
128                                                           'supporting-circuit-pack-name': 'CP1-CFP0',
129                                                           'type': 'org-openroadm-interfaces:opticalChannel',
130                                                           'supporting-port': 'CP1-CFP0-P1'
131                                                           }),
132                              res['interface'][0])
133
134         self.assertDictEqual(
135             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
136              u'transmit-power': -5, u'modulation-format': 'dp-qpsk'},
137             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
138
139     def test_07_check_interface_OTU(self):
140         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
141         self.assertEqual(response.status_code, requests.codes.ok)
142         res = response.json()
143         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
144                         'administrative-state': 'inService',
145                         'supporting-circuit-pack-name': 'CP1-CFP0',
146                         'supporting-interface': 'XPDR1-NETWORK1-1',
147                         'type': 'org-openroadm-interfaces:otnOtu',
148                         'supporting-port': 'CP1-CFP0-P1'
149                         }
150
151         input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
152                         'expected-sapi': 'Swfw02qXGyI=',
153                         'tx-sapi': 'Swfw02qXGyI=',
154                         'expected-dapi': 'Swfw02qXGyI=',
155                         'rate': 'org-openroadm-otn-common-types:OTU4',
156                         'fec': 'scfec'
157                         }
158
159         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
160                              res['interface'][0])
161
162         self.assertDictEqual(input_dict_2,
163                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
164
165     def test_08_otn_service_path_create_ODU4(self):
166         url = "{}/operations/transportpce-device-renderer:otn-service-path"
167         data = {"renderer:input": {
168             "service-name": "service_ODU4",
169             "operation": "create",
170             "service-rate": "100G",
171             "service-type": "ODU",
172             "nodes": [
173                 {"node-id": "SPDR-SA1",
174                  "network-tp": "XPDR1-NETWORK1"}]}}
175         response = test_utils.post_request(url, data)
176         time.sleep(3)
177         self.assertEqual(response.status_code, requests.codes.ok)
178         res = response.json()
179         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
180         self.assertTrue(res["output"]["success"])
181         self.assertIn(
182             {'node-id': 'SPDR-SA1',
183              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
184
185     def test_09_get_portmapping_NETWORK1(self):
186         response = test_utils.portmapping_request("SPDR-SA1/mapping/XPDR1-NETWORK1")
187         self.assertEqual(response.status_code, requests.codes.ok)
188         res = response.json()
189         self.assertIn(
190             {"logical-connection-point": "XPDR1-NETWORK1",
191              "supporting-port": "CP1-CFP0-P1",
192              "supported-interface-capability": [
193                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
194              ],
195                 "port-direction": "bidirectional",
196                 "port-qual": "xpdr-network",
197                 "supporting-circuit-pack-name": "CP1-CFP0",
198                 "xponder-type": "mpdr",
199                 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
200                 "lcp-hash-val": "Swfw02qXGyI="
201              },
202             res['mapping'])
203
204     def test_10_check_interface_ODU4(self):
205         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
206         self.assertEqual(response.status_code, requests.codes.ok)
207         res = response.json()
208         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
209                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
210                         'type': 'org-openroadm-interfaces:otnOdu',
211                         'supporting-port': 'CP1-CFP0-P1'}
212         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
213                         'rate': 'org-openroadm-otn-common-types:ODU4',
214                         'expected-dapi': 'Swfw02qXGyI=',
215                         'expected-sapi': 'Swfw02qXGyI=',
216                         'tx-dapi': 'Swfw02qXGyI=',
217                         'tx-sapi': 'Swfw02qXGyI='}
218
219         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
220                              res['interface'][0])
221         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
222                                   **input_dict_2
223                                   ),
224                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
225                              )
226         self.assertDictEqual(
227             {u'payload-type': u'21', u'exp-payload-type': u'21'},
228             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
229
230     def test_11_otn_service_path_create_10GE(self):
231         url = "{}/operations/transportpce-device-renderer:otn-service-path"
232         data = {"renderer:input": {
233             "service-name": "service1",
234             "operation": "create",
235             "service-rate": "10G",
236             "service-type": "Ethernet",
237             "ethernet-encoding": "eth encode",
238             "trib-slot": ["1"],
239             "trib-port-number": "1",
240             "nodes": [
241                 {"node-id": "SPDR-SA1",
242                  "client-tp": "XPDR1-CLIENT1",
243                  "network-tp": "XPDR1-NETWORK1"}]}}
244         response = test_utils.post_request(url, data)
245         time.sleep(3)
246         self.assertEqual(response.status_code, requests.codes.ok)
247         res = response.json()
248         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
249         self.assertTrue(res["output"]["success"])
250         self.assertIn(
251             {'node-id': 'SPDR-SA1',
252              'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
253              'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
254              'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
255
256     def test_12_check_interface_10GE_CLIENT(self):
257         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
258         self.assertEqual(response.status_code, requests.codes.ok)
259         res = response.json()
260         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
261                       'administrative-state': 'inService',
262                       'supporting-circuit-pack-name': 'CP1-SFP4',
263                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
264                       'supporting-port': 'CP1-SFP4-P1'
265                       }
266         self.assertDictEqual(dict(res['interface'][0], **input_dict),
267                              res['interface'][0])
268         self.assertDictEqual(
269             {u'speed': 10000},
270             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
271
272     def test_13_check_interface_ODU2E_CLIENT(self):
273         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
274         self.assertEqual(response.status_code, requests.codes.ok)
275         res = response.json()
276
277         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
278                         'administrative-state': 'inService',
279                         'supporting-circuit-pack-name': 'CP1-SFP4',
280                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
281                         'type': 'org-openroadm-interfaces:otnOdu',
282                         'supporting-port': 'CP1-SFP4-P1'}
283         input_dict_2 = {
284             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
285             'rate': 'org-openroadm-otn-common-types:ODU2e',
286             'monitoring-mode': 'terminated'}
287
288         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
289                              res['interface'][0])
290         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
291                                   **input_dict_2),
292                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
293         self.assertDictEqual(
294             {u'payload-type': u'03', u'exp-payload-type': u'03'},
295             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
296
297     def test_14_check_interface_ODU2E_NETWORK(self):
298         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
299         self.assertEqual(response.status_code, requests.codes.ok)
300         res = response.json()
301         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
302                         'supporting-circuit-pack-name': 'CP1-CFP0',
303                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
304                         'type': 'org-openroadm-interfaces:otnOdu',
305                         'supporting-port': 'CP1-CFP0-P1'}
306         input_dict_2 = {
307             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
308             'rate': 'org-openroadm-otn-common-types:ODU2e',
309             'monitoring-mode': 'monitored'}
310
311         input_dict_3 = {'trib-port-number': 1}
312
313         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
314                              res['interface'][0])
315         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
316                                   **input_dict_2),
317                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
318         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
319             'parent-odu-allocation'], **input_dict_3
320         ),
321             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
322             'parent-odu-allocation'])
323         self.assertIn(1,
324                       res['interface'][0][
325                           'org-openroadm-otn-odu-interfaces:odu'][
326                           'parent-odu-allocation']['trib-slots'])
327
328     def test_15_check_ODU2E_connection(self):
329         response = test_utils.check_netconf_node_request(
330             "SPDR-SA1",
331             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
332         self.assertEqual(response.status_code, requests.codes.ok)
333         res = response.json()
334         input_dict_1 = {
335             'connection-name':
336             'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
337             'direction': 'bidirectional'
338         }
339
340         self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
341                              res['odu-connection'][0])
342         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
343                              res['odu-connection'][0]['destination'])
344         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
345                              res['odu-connection'][0]['source'])
346
347     def test_16_otn_service_path_delete_10GE(self):
348         url = "{}/operations/transportpce-device-renderer:otn-service-path"
349         data = {"renderer:input": {
350             "service-name": "service1",
351             "operation": "delete",
352             "service-rate": "10G",
353             "service-type": "Ethernet",
354             "ethernet-encoding": "eth encode",
355             "trib-slot": ["1"],
356             "trib-port-number": "1",
357             "nodes": [
358                 {"node-id": "SPDR-SA1",
359                  "client-tp": "XPDR1-CLIENT1",
360                  "network-tp": "XPDR1-NETWORK1"}]}}
361         response = test_utils.post_request(url, data)
362         time.sleep(3)
363         self.assertEqual(response.status_code, requests.codes.ok)
364         res = response.json()
365         self.assertIn('Request processed', res["output"]["result"])
366         self.assertTrue(res["output"]["success"])
367
368     def test_17_check_no_ODU2E_connection(self):
369         response = test_utils.check_netconf_node_request(
370             "SPDR-SA1",
371             "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1")
372         self.assertEqual(response.status_code, requests.codes.not_found)
373
374     def test_18_check_no_interface_ODU2E_NETWORK(self):
375         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
376         self.assertEqual(response.status_code, requests.codes.not_found)
377
378     def test_19_check_no_interface_ODU2E_CLIENT(self):
379         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
380         self.assertEqual(response.status_code, requests.codes.not_found)
381
382     def test_20_check_no_interface_10GE_CLIENT(self):
383         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
384         self.assertEqual(response.status_code, requests.codes.not_found)
385
386     def test_21_otn_service_path_delete_ODU4(self):
387         url = "{}/operations/transportpce-device-renderer:otn-service-path"
388         data = {"renderer:input": {
389             "service-name": "service_ODU4",
390             "operation": "delete",
391             "service-rate": "100G",
392             "service-type": "ODU",
393             "nodes": [
394                 {"node-id": "SPDR-SA1",
395                  "network-tp": "XPDR1-NETWORK1"}]}}
396         response = test_utils.post_request(url, data)
397         time.sleep(3)
398         self.assertEqual(response.status_code, requests.codes.ok)
399         res = response.json()
400         self.assertIn('Request processed', res["output"]["result"])
401         self.assertTrue(res["output"]["success"])
402
403     def test_22_check_no_interface_ODU4(self):
404         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
405         self.assertEqual(response.status_code, requests.codes.not_found)
406
407     def test_23_service_path_delete_OCH_OTU4(self):
408         url = "{}/operations/transportpce-device-renderer:service-path"
409         data = {"renderer:input": {
410             "service-name": "service_OTU4",
411             "wave-number": "1",
412             "modulation-format": "qpsk",
413             "operation": "delete",
414             "nodes": [
415                 {"node-id": "SPDR-SA1",
416                  "dest-tp": "XPDR1-NETWORK1"}]}}
417         response = test_utils.post_request(url, data)
418         time.sleep(3)
419         self.assertEqual(response.status_code, requests.codes.ok)
420         res = response.json()
421         self.assertIn('Request processed', res["output"]["result"])
422         self.assertTrue(res["output"]["success"])
423
424     def test_24_check_no_interface_OTU4(self):
425         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
426         self.assertEqual(response.status_code, requests.codes.not_found)
427
428     def test_25_check_no_interface_OCH(self):
429         response = test_utils.check_netconf_node_request("SPDR-SA1", "interface/XPDR1-NETWORK1-1")
430         self.assertEqual(response.status_code, requests.codes.not_found)
431
432     def test_26_disconnect_SPDR_SA1(self):
433         response = test_utils.unmount_device("SPDR-SA1")
434         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
435
436
437 if __name__ == "__main__":
438     unittest.main(verbosity=2)