Refactor func tests transportpce api rpc calls 1/2
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test04_renderer_service_path_nominal.py
1 #!/usr/bin/env python
2
3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 import time
17 import requests
18 # pylint: disable=wrong-import-order
19 import sys
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils_rfc8040  # nopep8
24
25
26 class TransportPCERendererTesting(unittest.TestCase):
27
28     processes = None
29     NODE_VERSION = '1.2.1'
30
31     @classmethod
32     def setUpClass(cls):
33         cls.processes = test_utils_rfc8040.start_tpce()
34         cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
35
36     @classmethod
37     def tearDownClass(cls):
38         # pylint: disable=not-an-iterable
39         for process in cls.processes:
40             test_utils_rfc8040.shutdown_process(process)
41         print("all processes killed")
42
43     def setUp(self):
44         # pylint: disable=consider-using-f-string
45         print("execution of {}".format(self.id().split(".")[-1]))
46         time.sleep(10)
47
48     def test_01_rdm_device_connected(self):
49         response = test_utils_rfc8040.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
50         self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
51
52     def test_02_xpdr_device_connected(self):
53         response = test_utils_rfc8040.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
54         self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
55
56     def test_03_rdm_portmapping(self):
57         response = test_utils_rfc8040.get_portmapping("ROADMA01")
58         self.assertEqual(response['status_code'], requests.codes.ok)
59         self.assertIn(
60             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
61              'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional',
62              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
63             response['nodes'][0]['mapping'])
64         self.assertIn(
65             {'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
66              'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional',
67              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
68             response['nodes'][0]['mapping'])
69
70     def test_04_xpdr_portmapping(self):
71         response = test_utils_rfc8040.get_portmapping("XPDRA01")
72         self.assertEqual(response['status_code'], requests.codes.ok)
73         self.assertIn(
74             {'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
75              'logical-connection-point': 'XPDR1-NETWORK1', 'port-direction': 'bidirectional',
76              'connection-map-lcp': 'XPDR1-CLIENT1', 'port-qual': 'xpdr-network',
77              'lcp-hash-val': 'OSvMgUyP+mE=', 'xponder-type': 'tpdr',
78              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
79             response['nodes'][0]['mapping'])
80         self.assertIn(
81             {'supporting-port': 'C1',
82              'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
83              'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
84              'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
85              'lcp-hash-val': 'AO9UFkY/TLYw', 'xponder-type': 'tpdr',
86              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
87             response['nodes'][0]['mapping'])
88
89     def test_05_service_path_create(self):
90         response = test_utils_rfc8040.transportpce_api_rpc_request(
91             'transportpce-device-renderer', 'service-path',
92             {
93                 'service-name': 'service_test',
94                 'wave-number': '7',
95                 'modulation-format': 'dp-qpsk',
96                 'operation': 'create',
97                 'nodes': [{'node-id': 'ROADMA01', 'src-tp': 'SRG1-PP7-TXRX', 'dest-tp': 'DEG1-TTP-TXRX'},
98                           {'node-id': 'XPDRA01', 'src-tp': 'XPDR1-CLIENT1', 'dest-tp': 'XPDR1-NETWORK1'}],
99                 'center-freq': 195.8,
100                 'nmc-width': 40,
101                 'min-freq': 195.775,
102                 'max-freq': 195.825,
103                 'lower-spectral-slot-number': 713,
104                 'higher-spectral-slot-number': 720
105             })
106         self.assertEqual(response['status_code'], requests.codes.ok)
107         self.assertIn('Interfaces created successfully for nodes: ROADMA01', response['output']['result'])
108
109     def test_06_service_path_create_rdm_check(self):
110         response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "DEG1-TTP-TXRX-713:720")
111         self.assertEqual(response['status_code'], requests.codes.ok)
112         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
113         self.assertDictEqual(
114             dict({
115                 'name': 'DEG1-TTP-TXRX-713:720',
116                 'administrative-state': 'inService',
117                 'supporting-circuit-pack-name': '2/0',
118                 'type': 'org-openroadm-interfaces:opticalChannel',
119                 'supporting-port': 'L1'
120             }, **response['interface'][0]), response['interface'][0])
121         self.assertDictEqual(
122             {'wavelength-number': 7},
123             response['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
124
125     def test_07_service_path_create_rdm_check(self):
126         response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "SRG1-PP7-TXRX-713:720")
127         self.assertEqual(response['status_code'], requests.codes.ok)
128         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
129         self.assertDictEqual(
130             dict({
131                 'name': 'SRG1-PP7-TXRX-713:720',
132                 'administrative-state': 'inService',
133                 'supporting-circuit-pack-name': '4/0',
134                 'type': 'org-openroadm-interfaces:opticalChannel',
135                 'supporting-port': 'C7'
136             }, **response['interface'][0]), response['interface'][0])
137         self.assertDictEqual(
138             {'wavelength-number': 7},
139             response['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
140
141     def test_08_service_path_create_rdm_check(self):
142         response = test_utils_rfc8040.check_node_attribute_request(
143             "ROADMA01", "roadm-connections", "SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720")
144         self.assertEqual(response['status_code'], requests.codes.ok)
145         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
146         self.assertDictEqual(
147             dict({
148                 'connection-number': 'SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720',
149                 'wavelength-number': 7,
150                 'opticalControlMode': 'off'
151             }, **response['roadm-connections'][0]), response['roadm-connections'][0])
152         self.assertDictEqual({'src-if': 'SRG1-PP7-TXRX-713:720'}, response['roadm-connections'][0]['source'])
153         self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-713:720'}, response['roadm-connections'][0]['destination'])
154
155     def test_09_service_path_create_xpdr_check(self):
156         response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-713:720")
157         self.assertEqual(response['status_code'], requests.codes.ok)
158         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
159         self.assertDictEqual(
160             dict({
161                 'name': 'XPDR1-NETWORK1-713:720',
162                 'administrative-state': 'inService',
163                 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
164                 'type': 'org-openroadm-interfaces:opticalChannel',
165                 'supporting-port': '1'
166             }, **response['interface'][0]), response['interface'][0])
167         self.assertIn(
168             response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
169             [{'rate': 'org-openroadm-optical-channel-interfaces:R100G',
170               'transmit-power': '-5',
171               'wavelength-number': 7,
172               'modulation-format': 'dp-qpsk'},
173              {'rate': 'org-openroadm-optical-channel-interfaces:R100G',
174               'transmit-power': -5,
175               'wavelength-number': 7,
176               'modulation-format': 'dp-qpsk'}])
177
178     def test_10_service_path_create_xpdr_check(self):
179         response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-OTU")
180         self.assertEqual(response['status_code'], requests.codes.ok)
181         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
182         self.assertDictEqual(
183             dict({
184                 'name': 'XPDR1-NETWORK1-OTU',
185                 'administrative-state': 'inService',
186                 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
187                 'type': 'org-openroadm-interfaces:otnOtu',
188                 'supporting-port': '1',
189                 'supporting-interface': 'XPDR1-NETWORK1-713:720'
190             }, **response['interface'][0]), response['interface'][0])
191         self.assertDictEqual(
192             {'rate': 'org-openroadm-otn-otu-interfaces:OTU4', 'fec': 'scfec'},
193             response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
194
195     def test_11_service_path_create_xpdr_check(self):
196         response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-ODU")
197         self.assertEqual(response['status_code'], requests.codes.ok)
198         # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
199         self.assertDictEqual(
200             dict({
201                 'name': 'XPDR1-NETWORK1-ODU',
202                 'administrative-state': 'inService',
203                 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
204                 'type': 'org-openroadm-interfaces:otnOdu',
205                 'supporting-port': '1',
206                 'supporting-interface': 'XPDR1-NETWORK1-OTU'
207             }, **response['interface'][0]), response['interface'][0])
208         self.assertDictEqual(
209             dict({
210                 'rate': 'org-openroadm-otn-odu-interfaces:ODU4',
211                 'monitoring-mode': 'terminated'
212             }, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
213             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
214         )
215         self.assertDictEqual({'exp-payload-type': '07', 'payload-type': '07'},
216                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
217
218     def test_12_service_path_create_xpdr_check(self):
219         response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-CLIENT1-ETHERNET")
220         self.assertEqual(response['status_code'], requests.codes.ok)
221         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
222         self.assertDictEqual(
223             dict({
224                 'name': 'XPDR1-CLIENT1-ETHERNET',
225                 'administrative-state': 'inService',
226                 'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
227                 'type': 'org-openroadm-interfaces:ethernetCsmacd',
228                 'supporting-port': 'C1'
229             }, **response['interface'][0]), response['interface'][0])
230         self.assertDictEqual(
231             {'speed': 100000,
232              'mtu': 9000,
233              'auto-negotiation': 'enabled',
234              'duplex': 'full',
235              'fec': 'off'},
236             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
237
238     def test_13_service_path_create_xpdr_check(self):
239         response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "circuit-packs", "1%2F0%2F1-PLUG-NET")
240         # FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
241         self.assertEqual(response['status_code'], requests.codes.ok)
242         self.assertIn('not-reserved-inuse', response['circuit-packs'][0]["equipment-state"])
243
244     def test_14_service_path_delete(self):
245         response = test_utils_rfc8040.transportpce_api_rpc_request(
246             'transportpce-device-renderer', 'service-path',
247             {
248                 'service-name': 'service_test',
249                 'wave-number': '7',
250                 'modulation-format': 'dp-qpsk',
251                 'operation': 'delete',
252                 'nodes': [{'node-id': 'ROADMA01', 'src-tp': 'SRG1-PP7-TXRX', 'dest-tp': 'DEG1-TTP-TXRX'},
253                           {'node-id': 'XPDRA01', 'src-tp': 'XPDR1-CLIENT1', 'dest-tp': 'XPDR1-NETWORK1'}],
254                 'center-freq': 195.8,
255                 'nmc-width': 40,
256                 'min-freq': 195.775,
257                 'max-freq': 195.825,
258                 'lower-spectral-slot-number': 713,
259                 'higher-spectral-slot-number': 720
260             })
261         self.assertEqual(response['status_code'], requests.codes.ok)
262         self.assertDictEqual(response['output'], {'result': 'Request processed', 'success': True})
263
264     def test_15_service_path_delete_rdm_check(self):
265         response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "DEG1-TTP-TXRX-713:720")
266         self.assertEqual(response['status_code'], requests.codes.conflict)
267
268     def test_16_service_path_delete_rdm_check(self):
269         response = test_utils_rfc8040.check_node_attribute_request("ROADMA01", "interface", "SRG1-PP7-TXRX-713:720")
270         self.assertEqual(response['status_code'], requests.codes.conflict)
271
272     def test_17_service_path_delete_rdm_check(self):
273         response = test_utils_rfc8040.check_node_attribute_request(
274             "ROADMA01", "roadm-connections", "SRG1-PP7-TXRX-DEG1-TTP-TXRX-713:720")
275         self.assertEqual(response['status_code'], requests.codes.conflict)
276
277     def test_18_service_path_delete_xpdr_check(self):
278         response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-713:720")
279         self.assertEqual(response['status_code'], requests.codes.conflict)
280
281     def test_19_service_path_delete_xpdr_check(self):
282         response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-OTU")
283         self.assertEqual(response['status_code'], requests.codes.conflict)
284
285     def test_20_service_path_delete_xpdr_check(self):
286         response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-NETWORK1-ODU")
287         self.assertEqual(response['status_code'], requests.codes.conflict)
288
289     def test_21_service_path_delete_xpdr_check(self):
290         response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "interface", "XPDR1-CLIENT1-ETHERNET")
291         self.assertEqual(response['status_code'], requests.codes.conflict)
292
293     def test_22_service_path_delete_xpdr_check(self):
294         response = test_utils_rfc8040.check_node_attribute_request("XPDRA01", "circuit-packs", "1%2F0%2F1-PLUG-NET")
295         self.assertEqual(response['status_code'], requests.codes.ok)
296         self.assertEqual('not-reserved-available', response["circuit-packs"][0]['equipment-state'])
297
298     def test_23_rdm_device_disconnected(self):
299         response = test_utils_rfc8040.unmount_device("ROADMA01")
300         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
301
302     def test_24_xpdr_device_disconnected(self):
303         response = test_utils_rfc8040.unmount_device("XPDRA01")
304         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
305
306
307 if __name__ == "__main__":
308     unittest.main(verbosity=2, failfast=False)