Refactor func tests transportpce api rpc calls 1/2
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test06_renderer_service_path_nominal.py
1 #!/usr/bin/env python
2
3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 # from unittest.result import failfast
17 import requests
18 # pylint: disable=wrong-import-order
19 import sys
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils_rfc8040  # nopep8
24
25
26 class TransportPCERendererTesting(unittest.TestCase):
27
28     processes = None
29     NODE_VERSION = '2.2.1'
30
31     @classmethod
32     def setUpClass(cls):
33         cls.processes = test_utils_rfc8040.start_tpce()
34         cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
35
36     @classmethod
37     def tearDownClass(cls):
38         # pylint: disable=not-an-iterable
39         for process in cls.processes:
40             test_utils_rfc8040.shutdown_process(process)
41         print("all processes killed")
42
43     def test_01_rdm_device_connected(self):
44         response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
45         self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
46
47     def test_02_xpdr_device_connected(self):
48         response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
49         self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
50
51     def test_03_rdm_portmapping(self):
52         response = test_utils_rfc8040.get_portmapping("ROADM-A1")
53         self.assertEqual(response['status_code'], requests.codes.ok)
54         self.assertIn(
55             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
56              'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional',
57              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
58             response['nodes'][0]['mapping'])
59         self.assertIn(
60             {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
61              'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional',
62              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
63             response['nodes'][0]['mapping'])
64
65     def test_04_xpdr_portmapping(self):
66         response = test_utils_rfc8040.get_portmapping("XPDR-A1")
67         self.assertEqual(response['status_code'], requests.codes.ok)
68         self.assertIn(
69             {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
70              'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
71              'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
72              'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
73              'lcp-hash-val': 'AMkDwQ7xTmRI', 'xponder-type': 'tpdr',
74              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
75             response['nodes'][0]['mapping'])
76         self.assertIn(
77             {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
78              'supporting-port': 'C1',
79              'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
80              'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
81              'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
82              'lcp-hash-val': 'AJUUr6I5fALj', 'xponder-type': 'tpdr',
83              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
84             response['nodes'][0]['mapping'])
85
86     def test_05_service_path_create(self):
87         response = test_utils_rfc8040.transportpce_api_rpc_request(
88             'transportpce-device-renderer', 'service-path',
89             {
90                 'service-name': 'service_test',
91                 'wave-number': '7',
92                 'modulation-format': 'dp-qpsk',
93                 'operation': 'create',
94                 'nodes': [{'node-id': 'ROADM-A1', 'src-tp': 'SRG1-PP3-TXRX', 'dest-tp': 'DEG1-TTP-TXRX'},
95                           {'node-id': 'XPDR-A1', 'src-tp': 'XPDR1-CLIENT1', 'dest-tp': 'XPDR1-NETWORK1'}],
96                 'center-freq': 195.8,
97                 'nmc-width': 40,
98                 'min-freq': 195.775,
99                 'max-freq': 195.825,
100                 'lower-spectral-slot-number': 713,
101                 'higher-spectral-slot-number': 720
102             })
103         self.assertEqual(response['status_code'], requests.codes.ok)
104         self.assertIn('Interfaces created successfully for nodes: ROADM-A1', response['output']['result'])
105
106     def test_06_service_path_create_rdm_check(self):
107         response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-nmc-713:720")
108         self.assertEqual(response['status_code'], requests.codes.ok)
109         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
110         self.assertDictEqual(
111             dict({
112                  'name': 'DEG1-TTP-TXRX-nmc-713:720',
113                  'administrative-state': 'inService',
114                  'supporting-circuit-pack-name': '1/0',
115                  'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
116                  'supporting-port': 'L1'
117                  }, **response['interface'][0]), response['interface'][0]
118         )
119         self.assertIn(response['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'],
120                       [{'frequency': '195.8000', 'width': '40'},
121                        {'frequency': 195.8, 'width': 40}])
122
123     def test_07_service_path_create_rdm_check(self):
124         response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-mc-713:720")
125         self.assertEqual(response['status_code'], requests.codes.ok)
126         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
127         self.assertDictEqual(
128             dict({
129                  'name': 'DEG1-TTP-TXRX-mc-7',
130                  'administrative-state': 'inService',
131                  'supporting-circuit-pack-name': '1/0',
132                  'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
133                  'supporting-port': 'L1'
134                  }, **response['interface'][0]), response['interface'][0]
135         )
136         self.assertIn(response['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'],
137                       [{'min-freq': '195.7750', 'max-freq': '195.8250'},
138                        {'min-freq': 195.775, 'max-freq': 195.825}])
139
140     def test_08_service_path_create_rdm_check(self):
141         response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-nmc-713:720")
142         self.assertEqual(response['status_code'], requests.codes.ok)
143         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
144         self.assertDictEqual(
145             dict({
146                  'name': 'SRG1-PP3-TXRX-nmc-713:720',
147                  'administrative-state': 'inService',
148                  'supporting-circuit-pack-name': '3/0',
149                  'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
150                  'supporting-port': 'C3'
151                  }, **response['interface'][0]), response['interface'][0]
152         )
153         self.assertIn(response['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'],
154                       [{'frequency': '195.8000', 'width': '40'},
155                        {'frequency': 195.8, 'width': 40}])
156
157     # -mc supporting interfaces must not be created for SRG, only degrees
158     def test_09_service_path_create_rdm_check(self):
159         response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-mc-713:720")
160         self.assertEqual(response['status_code'], requests.codes.conflict)
161
162     def test_10_service_path_create_rdm_check(self):
163         response = test_utils_rfc8040.check_node_attribute_request(
164             "ROADM-A1", "roadm-connections", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
165         self.assertEqual(response['status_code'], requests.codes.ok)
166         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
167         self.assertDictEqual(
168             dict({
169                  'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720',
170                  'opticalControlMode': 'off'
171                  }, **response['roadm-connections'][0]), response['roadm-connections'][0])
172         self.assertDictEqual({'src-if': 'SRG1-PP3-TXRX-nmc-713:720'}, response['roadm-connections'][0]['source'])
173         self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-nmc-713:720'}, response['roadm-connections'][0]['destination'])
174
175     def test_11_service_path_create_xpdr_check(self):
176         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-713:720")
177         self.assertEqual(response['status_code'], requests.codes.ok)
178         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
179         self.assertDictEqual(
180             dict({
181                  'name': 'XPDR1-NETWORK1-713:720',
182                  'administrative-state': 'inService',
183                  'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
184                  'type': 'org-openroadm-interfaces:opticalChannel',
185                  'supporting-port': '1'
186                  }, **response['interface'][0]), response['interface'][0]
187         )
188         self.assertIn(
189             response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
190             [{'rate': 'org-openroadm-common-types:R100G', 'transmit-power': '-5',
191               'modulation-format': 'dp-qpsk', 'frequency': '195.8000'},
192              {'rate': 'org-openroadm-common-types:R100G', 'transmit-power': -5,
193               'modulation-format': 'dp-qpsk', 'frequency': 195.8}])
194
195     def test_12_service_path_create_xpdr_check(self):
196         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-OTU")
197         self.assertEqual(response['status_code'], requests.codes.ok)
198         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
199         self.assertDictEqual(
200             dict({
201                  'name': 'XPDR1-NETWORK1-OTU',
202                  'administrative-state': 'inService',
203                  'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
204                  'type': 'org-openroadm-interfaces:otnOtu',
205                  'supporting-port': '1',
206                  'supporting-interface': 'XPDR1-NETWORK1-7'
207                  }, **response['interface'][0]), response['interface'][0])
208         self.assertDictEqual(
209             dict({'rate': 'org-openroadm-otn-common-types:OTU4', 'fec': 'scfec'},
210                  **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
211             response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
212
213     def test_13_service_path_create_xpdr_check(self):
214         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-ODU4")
215         self.assertEqual(response['status_code'], requests.codes.ok)
216         # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
217         self.assertDictEqual(
218             dict({
219                  'name': 'XPDR1-NETWORK1-ODU4',
220                  'administrative-state': 'inService',
221                  'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
222                  'type': 'org-openroadm-interfaces:otnOdu',
223                  'supporting-port': '1',
224                  'supporting-interface': 'XPDR1-NETWORK1-OTU'
225                  }, **response['interface'][0]), response['interface'][0])
226         self.assertDictEqual(
227             dict({'rate': 'org-openroadm-otn-common-types:ODU4', 'monitoring-mode': 'terminated'},
228                  **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
229             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
230         self.assertDictEqual({'exp-payload-type': '07', 'payload-type': '07'},
231                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
232
233     def test_14_service_path_create_xpdr_check(self):
234         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-CLIENT1-ETHERNET")
235         self.assertEqual(response['status_code'], requests.codes.ok)
236         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
237         self.assertDictEqual(
238             dict({
239                  'name': 'XPDR1-CLIENT1-ETHERNET',
240                  'administrative-state': 'inService',
241                  'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
242                  'type': 'org-openroadm-interfaces:ethernetCsmacd',
243                  'supporting-port': 'C1'
244                  }, **response['interface'][0]), response['interface'][0]
245         )
246         self.assertDictEqual(
247             dict({'fec': 'off', 'speed': 100000},
248                  **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
249             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
250
251     def test_15_service_path_create_xpdr_check(self):
252         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-NET")
253         self.assertEqual(response['status_code'], requests.codes.ok)
254         self.assertIn('not-reserved-inuse', response['circuit-packs'][0]['equipment-state'])
255         # FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
256
257     def test_16_service_path_create_xpdr_check(self):
258         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-CLIENT")
259         self.assertEqual(response['status_code'], requests.codes.ok)
260         self.assertIn('not-reserved-inuse', response['circuit-packs'][0]['equipment-state'])
261         # FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
262
263     def test_17_service_path_delete(self):
264         response = test_utils_rfc8040.transportpce_api_rpc_request(
265             'transportpce-device-renderer', 'service-path',
266             {
267                 'service-name': 'service_test',
268                 'wave-number': '7',
269                 'modulation-format': 'dp-qpsk',
270                 'operation': 'delete',
271                 'nodes': [{'node-id': 'ROADM-A1', 'src-tp': 'SRG1-PP3-TXRX', 'dest-tp': 'DEG1-TTP-TXRX'},
272                           {'node-id': 'XPDR-A1', 'src-tp': 'XPDR1-CLIENT1', 'dest-tp': 'XPDR1-NETWORK1'}],
273                 'center-freq': 195.8,
274                 'nmc-width': 40,
275                 'min-freq': 195.775,
276                 'max-freq': 195.825,
277                 'lower-spectral-slot-number': 713,
278                 'higher-spectral-slot-number': 720
279             })
280         self.assertEqual(response['status_code'], requests.codes.ok)
281         self.assertDictEqual(response['output'], {'result': 'Request processed', 'success': True})
282
283     def test_18_service_path_delete_rdm_check(self):
284         response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-mc-713:720")
285         self.assertEqual(response['status_code'], requests.codes.conflict)
286
287     def test_19_service_path_delete_rdm_check(self):
288         response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "DEG1-TTP-TXRX-nmc-713:720")
289         self.assertEqual(response['status_code'], requests.codes.conflict)
290
291     def test_20_service_path_delete_rdm_check(self):
292         response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-mc-713:720")
293         self.assertEqual(response['status_code'], requests.codes.conflict)
294
295     def test_21_service_path_delete_rdm_check(self):
296         response = test_utils_rfc8040.check_node_attribute_request("ROADM-A1", "interface", "SRG1-PP3-TXRX-nmc-713:720")
297         self.assertEqual(response['status_code'], requests.codes.conflict)
298
299     def test_22_service_path_delete_rdm_check(self):
300         response = test_utils_rfc8040.check_node_attribute_request(
301             "ROADM-A1", "interface", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
302         self.assertEqual(response['status_code'], requests.codes.conflict)
303
304     def test_23_service_path_delete_rdm_check(self):
305         response = test_utils_rfc8040.check_node_attribute_request(
306             "ROADM-A1", "roadm-connections", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
307         self.assertEqual(response['status_code'], requests.codes.conflict)
308
309     def test_24_service_path_delete_xpdr_check(self):
310         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-713:720")
311         self.assertEqual(response['status_code'], requests.codes.conflict)
312
313     def test_25_service_path_delete_xpdr_check(self):
314         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-OTU")
315         self.assertEqual(response['status_code'], requests.codes.conflict)
316
317     def test_26_service_path_delete_xpdr_check(self):
318         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-NETWORK1-ODU")
319         self.assertEqual(response['status_code'], requests.codes.conflict)
320
321     def test_27_service_path_delete_xpdr_check(self):
322         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "interface", "XPDR1-CLIENT1-ETHERNET")
323         self.assertEqual(response['status_code'], requests.codes.conflict)
324
325     def test_28_service_path_delete_xpdr_check(self):
326         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-NET")
327         self.assertEqual(response['status_code'], requests.codes.ok)
328         self.assertEqual('not-reserved-available', response["circuit-packs"][0]['equipment-state'])
329
330     def test_29_service_path_delete_xpdr_check(self):
331         response = test_utils_rfc8040.check_node_attribute_request("XPDR-A1", "circuit-packs", "1%2F0%2F1-PLUG-CLIENT")
332         self.assertEqual(response['status_code'], requests.codes.ok)
333         self.assertEqual('not-reserved-available', response["circuit-packs"][0]['equipment-state'])
334
335     def test_30_rdm_device_disconnected(self):
336         response = test_utils_rfc8040.unmount_device("ROADM-A1")
337         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
338
339     def test_31_xpdr_device_disconnected(self):
340         response = test_utils_rfc8040.unmount_device("XPDR-A1")
341         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
342
343
344 if __name__ == "__main__":
345     unittest.main(verbosity=2, failfast=False)