Migrate renderer func. tests to RFC8040
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test06_renderer_service_path_nominal.py
1 #!/usr/bin/env python
2
3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
11
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14
15 import unittest
16 # from unittest.result import failfast
17 import requests
18 # pylint: disable=wrong-import-order
19 import sys
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils_rfc8040  # nopep8
24
25
26 class TransportPCERendererTesting(unittest.TestCase):
27
28     processes = None
29     NODE_VERSION = '2.2.1'
30
31     @classmethod
32     def setUpClass(cls):
33         cls.processes = test_utils_rfc8040.start_tpce()
34         cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
35
36     @classmethod
37     def tearDownClass(cls):
38         # pylint: disable=not-an-iterable
39         for process in cls.processes:
40             test_utils_rfc8040.shutdown_process(process)
41         print("all processes killed")
42
43     def test_01_rdm_device_connected(self):
44         response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
45         self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
46
47     def test_02_xpdr_device_connected(self):
48         response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
49         self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
50
51     def test_03_rdm_portmapping(self):
52         response = test_utils_rfc8040.get_portmapping("ROADM-A1")
53         self.assertEqual(response['status_code'], requests.codes.ok)
54         self.assertIn(
55             {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
56              'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional',
57              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
58             response['nodes'][0]['mapping'])
59         self.assertIn(
60             {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
61              'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional',
62              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
63             response['nodes'][0]['mapping'])
64
65     def test_04_xpdr_portmapping(self):
66         response = test_utils_rfc8040.get_portmapping("XPDR-A1")
67         self.assertEqual(response['status_code'], requests.codes.ok)
68         self.assertIn(
69             {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
70              'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
71              'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
72              'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
73              'lcp-hash-val': 'AMkDwQ7xTmRI', 'xponder-type': 'tpdr',
74              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
75             response['nodes'][0]['mapping'])
76         self.assertIn(
77             {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
78              'supporting-port': 'C1',
79              'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
80              'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
81              'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
82              'lcp-hash-val': 'AJUUr6I5fALj', 'xponder-type': 'tpdr',
83              'port-admin-state': 'InService', 'port-oper-state': 'InService'},
84             response['nodes'][0]['mapping'])
85
86     def test_05_service_path_create(self):
87         data = {
88             'input': {
89                 'service-name': 'service_test',
90                 'wave-number': '7',
91                 'modulation-format': 'dp-qpsk',
92                 'operation': 'create',
93                 'nodes': [{'node-id': 'ROADM-A1', 'src-tp': 'SRG1-PP3-TXRX', 'dest-tp': 'DEG1-TTP-TXRX'},
94                           {'node-id': 'XPDR-A1', 'src-tp': 'XPDR1-CLIENT1', 'dest-tp': 'XPDR1-NETWORK1'}],
95                 'center-freq': 195.8,
96                 'nmc-width': 40,
97                 'min-freq': 195.775,
98                 'max-freq': 195.825,
99                 'lower-spectral-slot-number': 713,
100                 'higher-spectral-slot-number': 720
101             }
102         }
103         response = test_utils_rfc8040.device_renderer_service_path_request(data)
104         self.assertEqual(response.status_code, requests.codes.ok)
105
106     def test_06_service_path_create_rdm_check(self):
107         response = test_utils_rfc8040.check_interface_request("ROADM-A1", "DEG1-TTP-TXRX-nmc-713:720")
108         self.assertEqual(response['status_code'], requests.codes.ok)
109         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
110         self.assertDictEqual(
111             dict({
112                  'name': 'DEG1-TTP-TXRX-nmc-713:720',
113                  'administrative-state': 'inService',
114                  'supporting-circuit-pack-name': '1/0',
115                  'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
116                  'supporting-port': 'L1'
117                  }, **response['interface'][0]), response['interface'][0]
118         )
119         self.assertIn(response['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'],
120                       [{'frequency': '195.8000', 'width': '40'},
121                        {'frequency': 195.8, 'width': 40}])
122
123     def test_07_service_path_create_rdm_check(self):
124         response = test_utils_rfc8040.check_interface_request("ROADM-A1", "DEG1-TTP-TXRX-mc-713:720")
125         self.assertEqual(response['status_code'], requests.codes.ok)
126         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
127         self.assertDictEqual(
128             dict({
129                  'name': 'DEG1-TTP-TXRX-mc-7',
130                  'administrative-state': 'inService',
131                  'supporting-circuit-pack-name': '1/0',
132                  'type': 'org-openroadm-interfaces:mediaChannelTrailTerminationPoint',
133                  'supporting-port': 'L1'
134                  }, **response['interface'][0]), response['interface'][0]
135         )
136         self.assertIn(response['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'],
137                       [{'min-freq': '195.7750', 'max-freq': '195.8250'},
138                        {'min-freq': 195.775, 'max-freq': 195.825}])
139
140     def test_08_service_path_create_rdm_check(self):
141         response = test_utils_rfc8040.check_interface_request("ROADM-A1", "SRG1-PP3-TXRX-nmc-713:720")
142         self.assertEqual(response['status_code'], requests.codes.ok)
143         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
144         self.assertDictEqual(
145             dict({
146                  'name': 'SRG1-PP3-TXRX-nmc-713:720',
147                  'administrative-state': 'inService',
148                  'supporting-circuit-pack-name': '3/0',
149                  'type': 'org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint',
150                  'supporting-port': 'C3'
151                  }, **response['interface'][0]), response['interface'][0]
152         )
153         self.assertIn(response['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'],
154                       [{'frequency': '195.8000', 'width': '40'},
155                        {'frequency': 195.8, 'width': 40}])
156
157     # -mc supporting interfaces must not be created for SRG, only degrees
158     def test_09_service_path_create_rdm_check(self):
159         response = test_utils_rfc8040.check_interface_request("ROADM-A1", "SRG1-PP3-TXRX-mc-713:720")
160         self.assertEqual(response['status_code'], requests.codes.conflict)
161
162     def test_10_service_path_create_rdm_check(self):
163         response = test_utils_rfc8040.check_roadm_connections_request("ROADM-A1", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
164         self.assertEqual(response['status_code'], requests.codes.ok)
165         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
166         self.assertDictEqual(
167             dict({
168                  'connection-name': 'SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720',
169                  'opticalControlMode': 'off'
170                  }, **response['roadm-connections'][0]), response['roadm-connections'][0])
171         self.assertDictEqual({'src-if': 'SRG1-PP3-TXRX-nmc-713:720'}, response['roadm-connections'][0]['source'])
172         self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-nmc-713:720'}, response['roadm-connections'][0]['destination'])
173
174     def test_11_service_path_create_xpdr_check(self):
175         response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-713:720")
176         self.assertEqual(response['status_code'], requests.codes.ok)
177         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
178         self.assertDictEqual(
179             dict({
180                  'name': 'XPDR1-NETWORK1-713:720',
181                  'administrative-state': 'inService',
182                  'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
183                  'type': 'org-openroadm-interfaces:opticalChannel',
184                  'supporting-port': '1'
185                  }, **response['interface'][0]), response['interface'][0]
186         )
187         self.assertIn(
188             response['interface'][0]['org-openroadm-optical-channel-interfaces:och'],
189             [{'rate': 'org-openroadm-common-types:R100G', 'transmit-power': '-5',
190               'modulation-format': 'dp-qpsk', 'frequency': '195.8000'},
191              {'rate': 'org-openroadm-common-types:R100G', 'transmit-power': -5,
192               'modulation-format': 'dp-qpsk', 'frequency': 195.8}])
193
194     def test_12_service_path_create_xpdr_check(self):
195         response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-OTU")
196         self.assertEqual(response['status_code'], requests.codes.ok)
197         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
198         self.assertDictEqual(
199             dict({
200                  'name': 'XPDR1-NETWORK1-OTU',
201                  'administrative-state': 'inService',
202                  'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
203                  'type': 'org-openroadm-interfaces:otnOtu',
204                  'supporting-port': '1',
205                  'supporting-interface': 'XPDR1-NETWORK1-7'
206                  }, **response['interface'][0]), response['interface'][0])
207         self.assertDictEqual(
208             dict({'rate': 'org-openroadm-otn-common-types:OTU4', 'fec': 'scfec'},
209                  **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
210             response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
211
212     def test_13_service_path_create_xpdr_check(self):
213         response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-ODU4")
214         self.assertEqual(response['status_code'], requests.codes.ok)
215         # the 2 following statements replace self.assertDictContainsSubset deprecated in python 3.2
216         self.assertDictEqual(
217             dict({
218                  'name': 'XPDR1-NETWORK1-ODU4',
219                  'administrative-state': 'inService',
220                  'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
221                  'type': 'org-openroadm-interfaces:otnOdu',
222                  'supporting-port': '1',
223                  'supporting-interface': 'XPDR1-NETWORK1-OTU'
224                  }, **response['interface'][0]), response['interface'][0])
225         self.assertDictEqual(
226             dict({'rate': 'org-openroadm-otn-common-types:ODU4', 'monitoring-mode': 'terminated'},
227                  **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
228             response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
229         self.assertDictEqual({'exp-payload-type': '07', 'payload-type': '07'},
230                              response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
231
232     def test_14_service_path_create_xpdr_check(self):
233         response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-CLIENT1-ETHERNET")
234         self.assertEqual(response['status_code'], requests.codes.ok)
235         # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
236         self.assertDictEqual(
237             dict({
238                  'name': 'XPDR1-CLIENT1-ETHERNET',
239                  'administrative-state': 'inService',
240                  'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
241                  'type': 'org-openroadm-interfaces:ethernetCsmacd',
242                  'supporting-port': 'C1'
243                  }, **response['interface'][0]), response['interface'][0]
244         )
245         self.assertDictEqual(
246             dict({'fec': 'off', 'speed': 100000},
247                  **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
248             response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
249
250     def test_15_service_path_create_xpdr_check(self):
251         response = test_utils_rfc8040.check_circuit_packs_request("XPDR-A1", "1%2F0%2F1-PLUG-NET")
252         self.assertEqual(response['status_code'], requests.codes.ok)
253         self.assertIn('not-reserved-inuse', response['circuit-packs'][0]['equipment-state'])
254         # FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
255
256     def test_16_service_path_create_xpdr_check(self):
257         response = test_utils_rfc8040.check_circuit_packs_request("XPDR-A1", "1%2F0%2F1-PLUG-CLIENT")
258         self.assertEqual(response['status_code'], requests.codes.ok)
259         self.assertIn('not-reserved-inuse', response['circuit-packs'][0]['equipment-state'])
260         # FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-591
261
262     def test_17_service_path_delete(self):
263         data = {
264             'input': {
265                 'service-name': 'service_test',
266                 'wave-number': '7',
267                 'modulation-format': 'dp-qpsk',
268                 'operation': 'delete',
269                 'nodes': [{'node-id': 'ROADM-A1', 'src-tp': 'SRG1-PP3-TXRX', 'dest-tp': 'DEG1-TTP-TXRX'},
270                           {'node-id': 'XPDR-A1', 'src-tp': 'XPDR1-CLIENT1', 'dest-tp': 'XPDR1-NETWORK1'}],
271                 'center-freq': 195.8,
272                 'nmc-width': 40,
273                 'min-freq': 195.775,
274                 'max-freq': 195.825,
275                 'lower-spectral-slot-number': 713,
276                 'higher-spectral-slot-number': 720
277             }
278         }
279         response = test_utils_rfc8040.device_renderer_service_path_request(data)
280         self.assertEqual(response.status_code, requests.codes.ok)
281         self.assertIn(response.json(),
282                       ({'output': {'result': 'Request processed', 'success': True}},
283                        {'transportpce-device-renderer:output': {'result': 'Request processed', 'success': True}}))
284
285     def test_18_service_path_delete_rdm_check(self):
286         response = test_utils_rfc8040.check_interface_request("ROADM-A1", "DEG1-TTP-TXRX-mc-713:720")
287         self.assertEqual(response['status_code'], requests.codes.conflict)
288
289     def test_19_service_path_delete_rdm_check(self):
290         response = test_utils_rfc8040.check_interface_request("ROADM-A1", "DEG1-TTP-TXRX-nmc-713:720")
291         self.assertEqual(response['status_code'], requests.codes.conflict)
292
293     def test_20_service_path_delete_rdm_check(self):
294         response = test_utils_rfc8040.check_interface_request("ROADM-A1", "SRG1-PP3-TXRX-mc-713:720")
295         self.assertEqual(response['status_code'], requests.codes.conflict)
296
297     def test_21_service_path_delete_rdm_check(self):
298         response = test_utils_rfc8040.check_interface_request("ROADM-A1", "SRG1-PP3-TXRX-nmc-713:720")
299         self.assertEqual(response['status_code'], requests.codes.conflict)
300
301     def test_22_service_path_delete_rdm_check(self):
302         response = test_utils_rfc8040.check_interface_request("ROADM-A1", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
303         self.assertEqual(response['status_code'], requests.codes.conflict)
304
305     def test_23_service_path_delete_rdm_check(self):
306         response = test_utils_rfc8040.check_roadm_connections_request("ROADM-A1", "SRG1-PP3-TXRX-DEG1-TTP-TXRX-713:720")
307         self.assertEqual(response['status_code'], requests.codes.conflict)
308
309     def test_24_service_path_delete_xpdr_check(self):
310         response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-713:720")
311         self.assertEqual(response['status_code'], requests.codes.conflict)
312
313     def test_25_service_path_delete_xpdr_check(self):
314         response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-OTU")
315         self.assertEqual(response['status_code'], requests.codes.conflict)
316
317     def test_26_service_path_delete_xpdr_check(self):
318         response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-NETWORK1-ODU")
319         self.assertEqual(response['status_code'], requests.codes.conflict)
320
321     def test_27_service_path_delete_xpdr_check(self):
322         response = test_utils_rfc8040.check_interface_request("XPDR-A1", "XPDR1-CLIENT1-ETHERNET")
323         self.assertEqual(response['status_code'], requests.codes.conflict)
324
325     def test_28_service_path_delete_xpdr_check(self):
326         response = test_utils_rfc8040.check_circuit_packs_request("XPDR-A1", "1%2F0%2F1-PLUG-NET")
327         self.assertEqual(response['status_code'], requests.codes.ok)
328         self.assertEqual('not-reserved-available', response["circuit-packs"][0]['equipment-state'])
329
330     def test_29_service_path_delete_xpdr_check(self):
331         response = test_utils_rfc8040.check_circuit_packs_request("XPDR-A1", "1%2F0%2F1-PLUG-CLIENT")
332         self.assertEqual(response['status_code'], requests.codes.ok)
333         self.assertEqual('not-reserved-available', response["circuit-packs"][0]['equipment-state'])
334
335     def test_30_rdm_device_disconnected(self):
336         response = test_utils_rfc8040.unmount_device("ROADM-A1")
337         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
338
339     def test_31_xpdr_device_disconnected(self):
340         response = test_utils_rfc8040.unmount_device("XPDR-A1")
341         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
342
343
344 if __name__ == "__main__":
345     unittest.main(verbosity=2, failfast=False)