improve functional tests pylint score
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_otn_renderer.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import logging
22 import test_utils
23
24
class TransportPCEtesting(unittest.TestCase):
    """Functional tests of the transportpce device renderer on node SPDR-SA1.

    The test methods are numbered because they form one scenario: unittest
    runs them in alphabetical order and later tests rely on the state left
    by earlier ones (e.g. test_05 expects the ODU4 interface created by
    test_04, and tests 15-22 expect the deletions of tests 14 and 19).

    All RESTCONF calls go to ``restconf_baseurl`` with the admin/admin
    credentials and a JSON content type.
    """

    honeynode_process1 = None
    odl_process = None
    restconf_baseurl = "http://localhost:8181/restconf"

    # Request settings shared by every RESTCONF call of this class.
    _headers = {'content-type': 'application/json'}
    _auth = ('admin', 'admin')

    # Port-mapping entry expected for XPDR1-NETWORK1 before the ODU4
    # service path is created (test_05 extends it with "supporting-odu4").
    _network1_mapping = {
        "logical-connection-point": "XPDR1-NETWORK1",
        "supporting-port": "CP1-CFP0-P1",
        "supported-interface-capability": [
            "org-openroadm-port-types:if-OCH-OTU4-ODU4"
        ],
        "port-direction": "bidirectional",
        "port-qual": "xpdr-network",
        "supporting-circuit-pack-name": "CP1-CFP0",
        "xponder-type": "mpdr",
    }

    # Name of the ODU connection checked after the 10GE service creation.
    _odu2e_connection = ("XPDR1-CLIENT1-ODU2e-service1"
                         "-x-XPDR1-NETWORK1-ODU2e-service1")

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------

    @classmethod
    def setUpClass(cls):
        """Start the honeynode, then the controller, via ``test_utils``."""
        print("starting honeynode1...")
        cls.honeynode_process1 = test_utils.start_spdra_honeynode()
        # Fixed delay; presumably the time the honeynode needs to boot.
        time.sleep(30)

        print("starting opendaylight...")
        cls.odl_process = test_utils.start_tpce()
        # Fixed delay; presumably the time the controller needs to boot.
        time.sleep(60)
        print("opendaylight started")

    @staticmethod
    def _stop_process(process):
        """Send SIGINT to the direct children of *process*, then to it.

        Each signalled process is waited for before moving on. A ``None``
        process (never started) is ignored.
        """
        if process is None:
            return
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
            child.wait()
        process.send_signal(signal.SIGINT)
        process.wait()

    @classmethod
    def tearDownClass(cls):
        """Stop the controller first, then the honeynode."""
        try:
            cls._stop_process(cls.odl_process)
        finally:
            # Always attempt to stop the honeynode, even if stopping the
            # controller raised, so no process is left running behind.
            cls._stop_process(cls.honeynode_process1)

    def setUp(self):
        """Pause five seconds before every test."""
        time.sleep(5)

    # ------------------------------------------------------------------
    # URL builders
    # ------------------------------------------------------------------

    def _netconf_node_url(self, datastore):
        """Return the topology-netconf URL of SPDR-SA1 in *datastore*.

        *datastore* is the RESTCONF path segment, "config" or "operational".
        """
        return ("{}/{}/network-topology:"
                "network-topology/topology/topology-netconf/node/SPDR-SA1"
                .format(self.restconf_baseurl, datastore))

    def _portmapping_url(self, logical_connection_point):
        """Return the port-mapping URL of one SPDR-SA1 connection point."""
        return ("{}/config/transportpce-portmapping:network/"
                "nodes/SPDR-SA1/mapping/{}"
                .format(self.restconf_baseurl, logical_connection_point))

    def _device_url(self, resource):
        """Return the URL of *resource* under the mounted device of SPDR-SA1.

        *resource* is the path below ``org-openroadm-device``, for instance
        ``"interface/XPDR1-NETWORK1-OTU"``.
        """
        return ("{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}"
                .format(self._netconf_node_url("config"), resource))

    def _renderer_rpc_url(self, rpc_name):
        """Return the URL of a ``transportpce-device-renderer`` RPC."""
        return ("{}/operations/transportpce-device-renderer:{}"
                .format(self.restconf_baseurl, rpc_name))

    # ------------------------------------------------------------------
    # Request and assertion helpers
    # ------------------------------------------------------------------

    def _request(self, method, url, data=None):
        """Send an authenticated JSON request and return the response.

        When *data* is not ``None`` it is serialized with ``json.dumps`` and
        sent as the request body; otherwise no body is sent.
        """
        kwargs = {'headers': dict(self._headers), 'auth': self._auth}
        if data is not None:
            kwargs['data'] = json.dumps(data)
        return requests.request(method, url, **kwargs)

    def _get_json(self, url):
        """GET *url*, assert a 200 status and return the decoded JSON body."""
        response = self._request("GET", url)
        self.assertEqual(response.status_code, requests.codes.ok)
        return response.json()

    def _get_interface(self, name):
        """Return the first entry of the device interface called *name*."""
        return self._get_json(self._device_url("interface/" + name))['interface'][0]

    def _assert_not_found(self, url):
        """GET *url* and assert that the answer is a 404."""
        response = self._request("GET", url)
        self.assertEqual(response.status_code, requests.codes.not_found)

    def _assert_subset(self, expected, actual):
        """Assert that every key/value pair of *expected* is in *actual*.

        Stands in for ``assertDictContainsSubset``, which is deprecated
        since Python 3.2 and no longer exists in Python 3.12.
        """
        present = {key: actual[key] for key in expected if key in actual}
        self.assertEqual(expected, present)

    @staticmethod
    def _service_path_input(operation):
        """Build the ``service-path`` RPC input for service_ODU4."""
        return {"renderer:input": {
            "service-name": "service_ODU4",
            "wave-number": "1",
            "modulation-format": "qpsk",
            "operation": operation,
            "nodes": [
                {"node-id": "SPDR-SA1",
                 "dest-tp": "XPDR1-NETWORK1"}]}}

    @staticmethod
    def _otn_service_path_input(operation):
        """Build the ``otn-service-path`` RPC input for the 10GE service1."""
        return {"renderer:input": {
            "service-name": "service1",
            "operation": operation,
            "service-rate": "10G",
            "service-type": "Ethernet",
            "ethernet-encoding": "eth encode",
            "trib-slot": ["1"],
            "trib-port-number": "1",
            "opucn-trib-slots": ["1"],
            "nodes": [
                {"node-id": "SPDR-SA1",
                 "client-tp": "XPDR1-CLIENT1",
                 "network-tp": "XPDR1-NETWORK1"}]}}

    # ------------------------------------------------------------------
    # Scenario
    # ------------------------------------------------------------------

    def test_01_connect_SPDR_SA1(self):
        """Declare SPDR-SA1 in topology-netconf and check it gets connected."""
        data = {"node": [{
            "node-id": "SPDR-SA1",
            "netconf-node-topology:username": "admin",
            "netconf-node-topology:password": "admin",
            "netconf-node-topology:host": "127.0.0.1",
            "netconf-node-topology:port": "17845",
            "netconf-node-topology:tcp-only": "false",
            "netconf-node-topology:pass-through": {}}]}
        response = self._request("PUT", self._netconf_node_url("config"), data)
        self.assertEqual(response.status_code, requests.codes.created)
        # Wait before reading the connection status from the operational
        # datastore.
        time.sleep(10)
        res = self._get_json(self._netconf_node_url("operational"))
        self.assertEqual(
            res['node'][0]['netconf-node-topology:connection-status'],
            'connected')

    def test_02_get_portmapping_CLIENT1(self):
        """Check the port-mapping entry of XPDR1-CLIENT1."""
        res = self._get_json(self._portmapping_url("XPDR1-CLIENT1"))
        self.assertIn(
            {'supported-interface-capability': [
                'org-openroadm-port-types:if-10GE-ODU2e',
                'org-openroadm-port-types:if-10GE-ODU2',
                'org-openroadm-port-types:if-10GE'],
             'supporting-port': 'CP1-SFP4-P1',
             'supporting-circuit-pack-name': 'CP1-SFP4',
             'logical-connection-point': 'XPDR1-CLIENT1',
             'port-direction': 'bidirectional',
             'port-qual': 'xpdr-client'},
            res['mapping'])

    def test_03_get_portmapping_NETWORK1(self):
        """Check the port-mapping entry of XPDR1-NETWORK1 before any service."""
        res = self._get_json(self._portmapping_url("XPDR1-NETWORK1"))
        self.assertIn(dict(self._network1_mapping), res['mapping'])

    def test_04_service_path_create_ODU4(self):
        """Create service_ODU4 through the service-path RPC."""
        response = self._request("POST", self._renderer_rpc_url("service-path"),
                                 self._service_path_input("create"))
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Roadm-connection successfully created for nodes: ', res["output"]["result"])
        self.assertTrue(res["output"]["success"])
        self.assertIn(
            {'node-id': 'SPDR-SA1',
             'otu-interface-id': ['XPDR1-NETWORK1-OTU'],
             'odu-interface-id': ['XPDR1-NETWORK1-ODU4'],
             'och-interface-id': ['XPDR1-NETWORK1-1']}, res["output"]['node-interface'])

    def test_05_get_portmapping_NETWORK1(self):
        """Check that the XPDR1-NETWORK1 mapping now references the ODU4."""
        expected = dict(self._network1_mapping)
        expected["supporting-odu4"] = "XPDR1-NETWORK1-ODU4"
        res = self._get_json(self._portmapping_url("XPDR1-NETWORK1"))
        self.assertIn(expected, res['mapping'])

    def test_06_check_interface_och(self):
        """Check the OCH interface created on XPDR1-NETWORK1."""
        interface = self._get_interface("XPDR1-NETWORK1-1")
        self._assert_subset({'name': 'XPDR1-NETWORK1-1', 'administrative-state': 'inService',
                             'supporting-circuit-pack-name': 'CP1-CFP0',
                             'type': 'org-openroadm-interfaces:opticalChannel',
                             'supporting-port': 'CP1-CFP0-P1'}, interface)
        self.assertDictEqual(
            {u'frequency': 196.1, u'rate': u'org-openroadm-common-types:R100G',
             u'transmit-power': -5},
            interface['org-openroadm-optical-channel-interfaces:och'])

    def test_07_check_interface_OTU(self):
        """Check the OTU interface created on XPDR1-NETWORK1."""
        interface = self._get_interface("XPDR1-NETWORK1-OTU")
        self._assert_subset({'name': 'XPDR1-NETWORK1-OTU', 'administrative-state': 'inService',
                             'supporting-circuit-pack-name': 'CP1-CFP0',
                             'supporting-interface': 'XPDR1-NETWORK1-1',
                             'type': 'org-openroadm-interfaces:otnOtu',
                             'supporting-port': 'CP1-CFP0-P1'}, interface)
        self.assertDictEqual(
            {u'rate': u'org-openroadm-otn-common-types:OTU4',
             u'fec': u'scfec'},
            interface['org-openroadm-otn-otu-interfaces:otu'])

    def test_08_check_interface_ODU4(self):
        """Check the ODU4 interface created on XPDR1-NETWORK1."""
        interface = self._get_interface("XPDR1-NETWORK1-ODU4")
        self._assert_subset({'name': 'XPDR1-NETWORK1-ODU4', 'administrative-state': 'inService',
                             'supporting-circuit-pack-name': 'CP1-CFP0',
                             'supporting-interface': 'XPDR1-NETWORK1-OTU',
                             'type': 'org-openroadm-interfaces:otnOdu',
                             'supporting-port': 'CP1-CFP0-P1'}, interface)
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_subset(
            {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
             'rate': 'org-openroadm-otn-common-types:ODU4'},
            odu)
        self.assertDictEqual(
            {u'payload-type': u'21', u'exp-payload-type': u'21'},
            odu['opu'])

    def test_09_otn_service_path_create_10GE(self):
        """Create the 10GE service1 through the otn-service-path RPC."""
        response = self._request("POST", self._renderer_rpc_url("otn-service-path"),
                                 self._otn_service_path_input("create"))
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Otn Service path was set up successfully for node :service1-SPDR-SA1',
                      res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    def test_10_check_interface_10GE_CLIENT(self):
        """Check the Ethernet interface created on XPDR1-CLIENT1."""
        interface = self._get_interface("XPDR1-CLIENT1-ETHERNET10G")
        self._assert_subset({'name': 'XPDR1-CLIENT1-ETHERNET10G', 'administrative-state': 'inService',
                             'supporting-circuit-pack-name': 'CP1-SFP4',
                             'type': 'org-openroadm-interfaces:ethernetCsmacd',
                             'supporting-port': 'CP1-SFP4-P1'}, interface)
        self.assertDictEqual(
            {u'speed': 10000},
            interface['org-openroadm-ethernet-interfaces:ethernet'])

    def test_11_check_interface_ODU2E_CLIENT(self):
        """Check the ODU2e interface created on XPDR1-CLIENT1."""
        interface = self._get_interface("XPDR1-CLIENT1-ODU2e-service1")
        self._assert_subset({'name': 'XPDR1-CLIENT1-ODU2e-service1', 'administrative-state': 'inService',
                             'supporting-circuit-pack-name': 'CP1-SFP4',
                             'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
                             'type': 'org-openroadm-interfaces:otnOdu',
                             'supporting-port': 'CP1-SFP4-P1'}, interface)
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_subset({
            'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
            'rate': 'org-openroadm-otn-common-types:ODU2e',
            'monitoring-mode': 'terminated'}, odu)
        self.assertDictEqual(
            {u'payload-type': u'03', u'exp-payload-type': u'03'},
            odu['opu'])

    def test_12_check_interface_ODU2E_NETWORK(self):
        """Check the ODU2e interface created on XPDR1-NETWORK1."""
        interface = self._get_interface("XPDR1-NETWORK1-ODU2e-service1")
        self._assert_subset({'name': 'XPDR1-NETWORK1-ODU2e-service1', 'administrative-state': 'inService',
                             'supporting-circuit-pack-name': 'CP1-CFP0',
                             'supporting-interface': 'XPDR1-NETWORK1-ODU4',
                             'type': 'org-openroadm-interfaces:otnOdu',
                             'supporting-port': 'CP1-CFP0-P1'}, interface)
        odu = interface['org-openroadm-otn-odu-interfaces:odu']
        self._assert_subset({
            'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
            'rate': 'org-openroadm-otn-common-types:ODU2e',
            'monitoring-mode': 'monitored'}, odu)
        self._assert_subset(
            {'trib-port-number': 1},
            odu['parent-odu-allocation'])
        self.assertIn(1, odu['parent-odu-allocation']['trib-slots'])

    def test_13_check_ODU2E_connection(self):
        """Check the ODU connection between the client and network ODU2e."""
        res = self._get_json(
            self._device_url("odu-connection/" + self._odu2e_connection))
        connection = res['odu-connection'][0]
        self._assert_subset({
            'connection-name': 'XPDR1-CLIENT1-ODU2e-service1-x-XPDR1-NETWORK1-ODU2e-service1',
            'direction': 'bidirectional'},
            connection)
        self.assertDictEqual({u'dst-if': u'XPDR1-NETWORK1-ODU2e-service1'},
                             connection['destination'])
        self.assertDictEqual({u'src-if': u'XPDR1-CLIENT1-ODU2e-service1'},
                             connection['source'])

    def test_14_otn_service_path_delete_10GE(self):
        """Delete the 10GE service1 through the otn-service-path RPC."""
        response = self._request("POST", self._renderer_rpc_url("otn-service-path"),
                                 self._otn_service_path_input("delete"))
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Request processed', res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    def test_15_check_no_ODU2E_connection(self):
        """The ODU connection must be gone after the 10GE deletion."""
        self._assert_not_found(
            self._device_url("odu-connection/" + self._odu2e_connection))

    def test_16_check_no_interface_ODU2E_NETWORK(self):
        """The network-side ODU2e interface must be gone."""
        self._assert_not_found(
            self._device_url("interface/XPDR1-NETWORK1-ODU2e-service1"))

    def test_17_check_no_interface_ODU2E_CLIENT(self):
        """The client-side ODU2e interface must be gone."""
        self._assert_not_found(
            self._device_url("interface/XPDR1-CLIENT1-ODU2e-service1"))

    def test_18_check_no_interface_10GE_CLIENT(self):
        """The client Ethernet interface must be gone."""
        self._assert_not_found(
            self._device_url("interface/XPDR1-CLIENT1-ETHERNET10G"))

    def test_19_service_path_delete_ODU4(self):
        """Delete service_ODU4 through the service-path RPC."""
        response = self._request("POST", self._renderer_rpc_url("service-path"),
                                 self._service_path_input("delete"))
        time.sleep(3)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Request processed', res["output"]["result"])
        self.assertTrue(res["output"]["success"])

    def test_20_check_no_interface_ODU4(self):
        """The ODU4 interface must be gone after the ODU4 deletion."""
        self._assert_not_found(self._device_url("interface/XPDR1-NETWORK1-ODU4"))

    def test_21_check_no_interface_OTU(self):
        """The OTU interface must be gone after the ODU4 deletion."""
        self._assert_not_found(self._device_url("interface/XPDR1-NETWORK1-OTU"))

    def test_22_check_no_interface_och(self):
        """The OCH interface must be gone after the ODU4 deletion."""
        self._assert_not_found(self._device_url("interface/XPDR1-NETWORK1-1"))

    def test_23_disconnect_SPDR_SA1(self):
        """Remove SPDR-SA1 from topology-netconf."""
        # An empty JSON object is sent as the body of the DELETE request.
        response = self._request("DELETE", self._netconf_node_url("config"), {})
        self.assertEqual(response.status_code, requests.codes.ok)
468
469
if __name__ == "__main__":
    # Direct execution of this file: hand over to the unittest runner,
    # asking for verbosity level 2.
    unittest.main(verbosity=2)