generalize RESTconf requests methods in func tests
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import unittest
13 import json
14 import time
15 import requests
16 from common import test_utils
17
18
19 class TransportPCEtesting(unittest.TestCase):
20
21     processes = None
22
23     @classmethod
24     def setUpClass(cls):
25         cls.processes = test_utils.start_tpce()
26         cls.processes = test_utils.start_sims(['spdra'])
27
28     @classmethod
29     def tearDownClass(cls):
30         for process in cls.processes:
31             test_utils.shutdown_process(process)
32         print("all processes killed")
33
34     def setUp(self):
35         time.sleep(5)
36
37     def test_01_connect_SPDR_SA1(self):
38         response = test_utils.mount_device("SPDR-SA1", 'spdra')
39         self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
40         time.sleep(10)
41
42         url = "{}/operational/network-topology:network-topology/topology/topology-netconf/node/SPDR-SA1"
43         response = test_utils.get_request(url)
44         self.assertEqual(response.status_code, requests.codes.ok)
45         res = response.json()
46         self.assertEqual(
47             res['node'][0]['netconf-node-topology:connection-status'],
48             'connected')
49
50     def test_02_get_portmapping_CLIENT1(self):
51         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-CLIENT1"
52         response = test_utils.get_request(url)
53         self.assertEqual(response.status_code, requests.codes.ok)
54         res = response.json()
55         self.assertIn(
56             {'supported-interface-capability': [
57                 'org-openroadm-port-types:if-10GE-ODU2e',
58                 'org-openroadm-port-types:if-10GE-ODU2',
59                 'org-openroadm-port-types:if-10GE'],
60              'supporting-port': 'CP1-SFP4-P1',
61              'supporting-circuit-pack-name': 'CP1-SFP4',
62              'logical-connection-point': 'XPDR1-CLIENT1',
63              'port-direction': 'bidirectional',
64              'port-qual': 'xpdr-client',
65              'lcp-hash-val': 'FqlcrxV7p30='},
66             res['mapping'])
67
68     def test_03_get_portmapping_NETWORK1(self):
69         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
70         response = test_utils.get_request(url)
71         self.assertEqual(response.status_code, requests.codes.ok)
72         res = response.json()
73         self.assertIn(
74             {"logical-connection-point": "XPDR1-NETWORK1",
75              "supporting-port": "CP1-CFP0-P1",
76              "supported-interface-capability": [
77                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
78              ],
79                 "port-direction": "bidirectional",
80                 "port-qual": "xpdr-network",
81                 "supporting-circuit-pack-name": "CP1-CFP0",
82                 "xponder-type": "mpdr",
83              'lcp-hash-val': 'Swfw02qXGyI='},
84             res['mapping'])
85
86     def test_04_service_path_create_OCH_OTU4(self):
87         url = "{}/operations/transportpce-device-renderer:service-path"
88         data = {"renderer:input": {
89             "service-name": "service_ODU4",
90             "wave-number": "1",
91             "modulation-format": "qpsk",
92             "operation": "create",
93             "nodes": [
94                 {"node-id": "SPDR-SA1",
95                  "dest-tp": "XPDR1-NETWORK1"}]}}
96         response = test_utils.post_request(url, data)
97         time.sleep(3)
98         self.assertEqual(response.status_code, requests.codes.ok)
99         res = response.json()
100         self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
101         self.assertTrue(res["output"]["success"])
102         self.assertIn(
103             {'node-id': 'SPDR-SA1',
104              'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
105              'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])
106
107     def test_05_get_portmapping_NETWORK1(self):
108         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
109         response = test_utils.get_request(url)
110         self.assertEqual(response.status_code, requests.codes.ok)
111         res = response.json()
112         self.assertIn(
113             {"logical-connection-point": "XPDR1-NETWORK1",
114              "supporting-port": "CP1-CFP0-P1",
115              "supported-interface-capability": [
116                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
117              ],
118                 "port-direction": "bidirectional",
119                 "port-qual": "xpdr-network",
120                 "supporting-circuit-pack-name": "CP1-CFP0",
121                 "xponder-type": "mpdr",
122                 "lcp-hash-val": "Swfw02qXGyI="},
123             res['mapping'])
124
125     def test_06_check_interface_och(self):
126         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
127                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
128                "interface/XPDR1-NETWORK1-1"
129                )
130         response = test_utils.get_request(url)
131         self.assertEqual(response.status_code, requests.codes.ok)
132         res = response.json()
133
134         self.assertDictEqual(dict(res['interface'][0], **{'name': 'XPDR1-NETWORK1-1',
135                                                           'administrative-state': 'inService',
136                                                           'supporting-circuit-pack-name': 'CP1-CFP0',
137                                                           'type': 'org-openroadm-interfaces:opticalChannel',
138                                                           'supporting-port': 'CP1-CFP0-P1'
139                                                           }),
140                              res['interface'][0])
141
142         self.assertDictEqual(
143             {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
144              u'transmit-power': -5},
145             res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
146
147     def test_07_check_interface_OTU(self):
148         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
149                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
150                "interface/XPDR1-NETWORK1-OTU"
151                )
152         response = test_utils.get_request(url)
153         self.assertEqual(response.status_code, requests.codes.ok)
154         res = response.json()
155         input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
156                         'administrative-state': 'inService',
157                         'supporting-circuit-pack-name': 'CP1-CFP0',
158                         'supporting-interface': 'XPDR1-NETWORK1-1',
159                         'type': 'org-openroadm-interfaces:otnOtu',
160                         'supporting-port': 'CP1-CFP0-P1'
161                         }
162
163         input_dict_2 = {'tx-dapi': 'Swfw02qXGyI=',
164                         'expected-sapi': 'Swfw02qXGyI=',
165                         'tx-sapi': 'Swfw02qXGyI=',
166                         'expected-dapi': 'Swfw02qXGyI=',
167                         'rate': 'org-openroadm-otn-common-types:OTU4',
168                         'fec': 'scfec'
169                         }
170
171         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
172                              res['interface'][0])
173
174         self.assertDictEqual(input_dict_2,
175                              res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
176
177     def test_08_otn_service_path_create_ODU4(self):
178         url = "{}/operations/transportpce-device-renderer:otn-service-path"
179         data = {"renderer:input": {
180             "service-name": "service_ODU4",
181             "operation": "create",
182             "service-rate": "100G",
183             "service-type": "ODU",
184             "nodes": [
185                 {"node-id": "SPDR-SA1",
186                  "network-tp": "XPDR1-NETWORK1"}]}}
187         response = test_utils.post_request(url, data)
188         time.sleep(3)
189         self.assertEqual(response.status_code, requests.codes.ok)
190         res = response.json()
191         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
192         self.assertTrue(res["output"]["success"])
193         self.assertIn(
194             {'node-id': 'SPDR-SA1',
195              'odu-interface-id': ['XPDR1-NETWORK1-ODU4']}, res["output"]['node-interface'])
196
197     def test_09_get_portmapping_NETWORK1(self):
198         url = "{}/config/transportpce-portmapping:network/nodes/SPDR-SA1/mapping/XPDR1-NETWORK1"
199         response = test_utils.get_request(url)
200         self.assertEqual(response.status_code, requests.codes.ok)
201         res = response.json()
202         self.assertIn(
203             {"logical-connection-point": "XPDR1-NETWORK1",
204              "supporting-port": "CP1-CFP0-P1",
205              "supported-interface-capability": [
206                  "org-openroadm-port-types:if-OCH-OTU4-ODU4"
207              ],
208                 "port-direction": "bidirectional",
209                 "port-qual": "xpdr-network",
210                 "supporting-circuit-pack-name": "CP1-CFP0",
211                 "xponder-type": "mpdr",
212                 "supporting-odu4": "XPDR1-NETWORK1-ODU4",
213                 "lcp-hash-val": "Swfw02qXGyI="
214              },
215             res['mapping'])
216
217     def test_10_check_interface_ODU4(self):
218         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
219                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
220                "interface/XPDR1-NETWORK1-ODU4"
221                )
222         response = test_utils.get_request(url)
223         self.assertEqual(response.status_code, requests.codes.ok)
224         res = response.json()
225         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
226                         'supporting-circuit-pack-name': 'CP1-CFP0', 'supporting-interface': 'XPDR1-NETWORK1-OTU',
227                         'type': 'org-openroadm-interfaces:otnOdu',
228                         'supporting-port': 'CP1-CFP0-P1'}
229         input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
230                         'rate': 'org-openroadm-otn-common-types:ODU4',
231                         'expected-dapi': 'Swfw02qXGyI=',
232                         'expected-sapi': 'Swfw02qXGyI=',
233                         'tx-dapi': 'Swfw02qXGyI=',
234                         'tx-sapi': 'Swfw02qXGyI='}
235
236         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
237                              res['interface'][0])
238         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
239                                   **input_dict_2
240                                   ),
241                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
242                              )
243         self.assertDictEqual(
244             {u'payload-type': u'21', u'exp-payload-type': u'21'},
245             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
246
247     def test_11_otn_service_path_create_10GE(self):
248         url = "{}/operations/transportpce-device-renderer:otn-service-path"
249         data = {"renderer:input": {
250             "service-name": "service1",
251             "operation": "create",
252             "service-rate": "10G",
253             "service-type": "Ethernet",
254             "ethernet-encoding": "eth encode",
255             "trib-slot": ["1"],
256             "trib-port-number": "1",
257             "nodes": [
258                 {"node-id": "SPDR-SA1",
259                  "client-tp": "XPDR1-CLIENT1",
260                  "network-tp": "XPDR1-NETWORK1"}]}}
261         response = test_utils.post_request(url, data)
262         time.sleep(3)
263         self.assertEqual(response.status_code, requests.codes.ok)
264         res = response.json()
265         self.assertIn('Otn Service path was set up successfully for node :SPDR-SA1', res["output"]["result"])
266         self.assertTrue(res["output"]["success"])
267         self.assertIn(
268             {'node-id': 'SPDR-SA1',
269              'connection-id': ['XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1'],
270              'odu-interface-id': ['XPDR1-NETWORK1-ODU2e-service1', 'XPDR1-CLIENT1-ODU2e-service1'],
271              'eth-interface-id': ['XPDR1-CLIENT1-ETHERNET10G']}, res["output"]['node-interface'])
272
273     def test_12_check_interface_10GE_CLIENT(self):
274         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
275                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
276                "interface/XPDR1-CLIENT1-ETHERNET10G"
277                )
278         response = test_utils.get_request(url)
279         self.assertEqual(response.status_code, requests.codes.ok)
280         res = response.json()
281         input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
282                       'administrative-state': 'inService',
283                       'supporting-circuit-pack-name': 'CP1-SFP4',
284                       'type': 'org-openroadm-interfaces:ethernetCsmacd',
285                       'supporting-port': 'CP1-SFP4-P1'
286                       }
287         self.assertDictEqual(dict(res['interface'][0], **input_dict),
288                              res['interface'][0])
289         self.assertDictEqual(
290             {u'speed': 10000},
291             res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
292
293     def test_13_check_interface_ODU2E_CLIENT(self):
294         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
295                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
296                "interface/XPDR1-CLIENT1-ODU2e-service1"
297                )
298         response = test_utils.get_request(url)
299         self.assertEqual(response.status_code, requests.codes.ok)
300         res = response.json()
301
302         input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1',
303                         'administrative-state': 'inService',
304                         'supporting-circuit-pack-name': 'CP1-SFP4',
305                         'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
306                         'type': 'org-openroadm-interfaces:otnOdu',
307                         'supporting-port': 'CP1-SFP4-P1'}
308         input_dict_2 = {
309             'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
310             'rate': 'org-openroadm-otn-common-types:ODU2e',
311             'monitoring-mode': 'terminated'}
312
313         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
314                              res['interface'][0])
315         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
316                                   **input_dict_2),
317                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
318         self.assertDictEqual(
319             {u'payload-type': u'03', u'exp-payload-type': u'03'},
320             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
321
322     def test_14_check_interface_ODU2E_NETWORK(self):
323         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
324                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
325                "interface/XPDR1-NETWORK1-ODU2e-service1"
326                )
327         response = test_utils.get_request(url)
328         self.assertEqual(response.status_code, requests.codes.ok)
329         res = response.json()
330         input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
331                         'supporting-circuit-pack-name': 'CP1-CFP0',
332                         'supporting-interface': 'XPDR1-NETWORK1-ODU4',
333                         'type': 'org-openroadm-interfaces:otnOdu',
334                         'supporting-port': 'CP1-CFP0-P1'}
335         input_dict_2 = {
336             'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
337             'rate': 'org-openroadm-otn-common-types:ODU2e',
338             'monitoring-mode': 'monitored'}
339
340         input_dict_3 = {'trib-port-number': 1}
341
342         self.assertDictEqual(dict(res['interface'][0], **input_dict_1),
343                              res['interface'][0])
344         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
345                                   **input_dict_2),
346                              res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
347         self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
348             'parent-odu-allocation'], **input_dict_3
349         ),
350             res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
351             'parent-odu-allocation'])
352         self.assertIn(1,
353                       res['interface'][0][
354                           'org-openroadm-otn-odu-interfaces:odu'][
355                           'parent-odu-allocation']['trib-slots'])
356
357     def test_15_check_ODU2E_connection(self):
358         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
359                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
360                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
361                )
362         response = test_utils.get_request(url)
363         self.assertEqual(response.status_code, requests.codes.ok)
364         res = response.json()
365         input_dict_1 = {
366             'connection-name':
367             'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
368             'direction': 'bidirectional'
369         }
370
371         self.assertDictEqual(dict(res['odu-connection'][0], **input_dict_1),
372                              res['odu-connection'][0])
373         self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
374                              res['odu-connection'][0]['destination'])
375         self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
376                              res['odu-connection'][0]['source'])
377
378     def test_16_otn_service_path_delete_10GE(self):
379         url = "{}/operations/transportpce-device-renderer:otn-service-path"
380         data = {"renderer:input": {
381             "service-name": "service1",
382             "operation": "delete",
383             "service-rate": "10G",
384             "service-type": "Ethernet",
385             "ethernet-encoding": "eth encode",
386             "trib-slot": ["1"],
387             "trib-port-number": "1",
388             "nodes": [
389                 {"node-id": "SPDR-SA1",
390                  "client-tp": "XPDR1-CLIENT1",
391                  "network-tp": "XPDR1-NETWORK1"}]}}
392         response = test_utils.post_request(url, data)
393         time.sleep(3)
394         self.assertEqual(response.status_code, requests.codes.ok)
395         res = response.json()
396         self.assertIn('Request processed', res["output"]["result"])
397         self.assertTrue(res["output"]["success"])
398
399     def test_17_check_no_ODU2E_connection(self):
400         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
401                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
402                "odu-connection/XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1"
403                )
404         response = test_utils.get_request(url)
405         self.assertEqual(response.status_code, requests.codes.not_found)
406
407     def test_18_check_no_interface_ODU2E_NETWORK(self):
408         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
409                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
410                "interface/XPDR1-NETWORK1-ODU2e-service1"
411                )
412         response = test_utils.get_request(url)
413         self.assertEqual(response.status_code, requests.codes.not_found)
414
415     def test_19_check_no_interface_ODU2E_CLIENT(self):
416         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
417                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
418                "interface/XPDR1-CLIENT1-ODU2e-service1"
419                )
420         response = test_utils.get_request(url)
421         self.assertEqual(response.status_code, requests.codes.not_found)
422
423     def test_20_check_no_interface_10GE_CLIENT(self):
424         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
425                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
426                "interface/XPDR1-CLIENT1-ETHERNET10G"
427                )
428         response = test_utils.get_request(url)
429         self.assertEqual(response.status_code, requests.codes.not_found)
430
431     def test_21_otn_service_path_delete_ODU4(self):
432         url = "{}/operations/transportpce-device-renderer:otn-service-path"
433         data = {"renderer:input": {
434             "service-name": "service_ODU4",
435             "operation": "delete",
436             "service-rate": "100G",
437             "service-type": "ODU",
438             "nodes": [
439                 {"node-id": "SPDR-SA1",
440                  "network-tp": "XPDR1-NETWORK1"}]}}
441         response = test_utils.post_request(url, data)
442         time.sleep(3)
443         self.assertEqual(response.status_code, requests.codes.ok)
444         res = response.json()
445         self.assertIn('Request processed', res["output"]["result"])
446         self.assertTrue(res["output"]["success"])
447
448     def test_22_check_no_interface_ODU4(self):
449         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
450                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
451                "interface/XPDR1-NETWORK1-ODU4"
452                )
453         response = test_utils.get_request(url)
454         self.assertEqual(response.status_code, requests.codes.not_found)
455
456     def test_23_service_path_delete_OCH_OTU4(self):
457         url = "{}/operations/transportpce-device-renderer:service-path"
458         data = {"renderer:input": {
459             "service-name": "service_OTU4",
460             "wave-number": "1",
461             "modulation-format": "qpsk",
462             "operation": "delete",
463             "nodes": [
464                 {"node-id": "SPDR-SA1",
465                  "dest-tp": "XPDR1-NETWORK1"}]}}
466         response = test_utils.post_request(url, data)
467         time.sleep(3)
468         self.assertEqual(response.status_code, requests.codes.ok)
469         res = response.json()
470         self.assertIn('Request processed', res["output"]["result"])
471         self.assertTrue(res["output"]["success"])
472
473     def test_24_check_no_interface_OTU4(self):
474         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
475                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
476                "interface/XPDR1-NETWORK1-OTU"
477                )
478         response = test_utils.get_request(url)
479         self.assertEqual(response.status_code, requests.codes.not_found)
480
481     def test_25_check_no_interface_OCH(self):
482         url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
483                "node/SPDR-SA1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
484                "interface/XPDR1-NETWORK1-1"
485                )
486         response = test_utils.get_request(url)
487         self.assertEqual(response.status_code, requests.codes.not_found)
488
489     def test_26_disconnect_SPDR_SA1(self):
490         response = test_utils.unmount_device("SPDR-SA1")
491         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
492
493
494 if __name__ == "__main__":
495     unittest.main(verbosity=2)