Refactor func tests transportpce api rpc calls 1/2
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test05_flex_grid.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2019 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13
14 import unittest
15 import time
16 import requests
17 # pylint: disable=wrong-import-order
18 import sys
19 sys.path.append('transportpce_tests/common/')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils_rfc8040  # nopep8
23
24
25 class TransportPCEPortMappingTesting(unittest.TestCase):
26
27     processes = None
28     NODE_VERSION = '2.2.1'
29
30     @classmethod
31     def setUpClass(cls):
32         cls.processes = test_utils_rfc8040.start_tpce()
33         cls.processes = test_utils_rfc8040.start_sims([('roadmd', cls.NODE_VERSION)])
34
35     @classmethod
36     def tearDownClass(cls):
37         # pylint: disable=not-an-iterable
38         for process in cls.processes:
39             test_utils_rfc8040.shutdown_process(process)
40         print("all processes killed")
41
42     def setUp(self):
43         # pylint: disable=consider-using-f-string
44         print("execution of {}".format(self.id().split(".")[-1]))
45         time.sleep(10)
46
47     def test_01_rdm_device_connection(self):
48         response = test_utils_rfc8040.mount_device("ROADM-D1", ('roadmd', self.NODE_VERSION))
49         self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
50
51     def test_02_rdm_device_connected(self):
52         response = test_utils_rfc8040.check_device_connection("ROADM-D1")
53         self.assertEqual(response['status_code'], requests.codes.ok)
54         self.assertEqual(response['connection-status'], 'connected')
55         time.sleep(10)
56
57     def test_03_rdm_portmapping_info(self):
58         response = test_utils_rfc8040.get_portmapping_node_info("ROADM-D1")
59         self.assertEqual(response['status_code'], requests.codes.ok)
60         self.assertEqual(
61             {'node-type': 'rdm',
62              'node-ip-address': '127.0.0.14',
63              'node-clli': 'NodeD',
64              'openroadm-version': '2.2.1',
65              'node-vendor': 'vendorD',
66              'node-model': 'model2'},
67             response['node-info'])
68         time.sleep(3)
69
70     def test_04_rdm_deg1_lcp(self):
71         # pylint: disable=line-too-long
72         response = test_utils_rfc8040.portmapping_mc_capa_request("ROADM-D1", "DEG1-TTP")
73         self.assertEqual(response['status_code'], requests.codes.ok)
74         self.assertIn(response['mc-capabilities'],
75                       [[{'mc-node-name': 'DEG1-TTP', 'center-freq-granularity': '6.25', 'slot-width-granularity': '12.5'}],
76                        [{'mc-node-name': 'DEG1-TTP', 'center-freq-granularity': 6.25, 'slot-width-granularity': 12.5}]])
77         time.sleep(3)
78
79     def test_05_rdm_deg2_lcp(self):
80         # pylint: disable=line-too-long
81         response = test_utils_rfc8040.portmapping_mc_capa_request("ROADM-D1", "DEG2-TTP")
82         self.assertEqual(response['status_code'], requests.codes.ok)
83         self.assertIn(response['mc-capabilities'],
84                       [[{'mc-node-name': 'DEG2-TTP', 'center-freq-granularity': '6.25', 'slot-width-granularity': '12.5'}],
85                        [{'mc-node-name': 'DEG2-TTP', 'center-freq-granularity': 6.25, 'slot-width-granularity': 12.5}]])
86         time.sleep(3)
87
88     def test_06_rdm_srg1_lcp(self):
89         # pylint: disable=line-too-long
90         response = test_utils_rfc8040.portmapping_mc_capa_request("ROADM-D1", "SRG1-PP")
91         self.assertEqual(response['status_code'], requests.codes.ok)
92         self.assertIn(response['mc-capabilities'],
93                       [[{'mc-node-name': 'SRG1-PP', 'center-freq-granularity': '6.25', 'slot-width-granularity': '12.5'}],
94                        [{'mc-node-name': 'SRG1-PP', 'center-freq-granularity': 6.25, 'slot-width-granularity': 12.5}]])
95         time.sleep(3)
96
97     # Renderer interface creations
98     def test_07_device_renderer(self):
99         response = test_utils_rfc8040.transportpce_api_rpc_request(
100             'transportpce-device-renderer', 'service-path',
101             {
102                 'modulation-format': 'dp-qpsk',
103                 'operation': 'create',
104                 'service-name': 'testNMC-MC',
105                 'wave-number': '0',
106                 'center-freq': '196.05',
107                 'nmc-width': '80',
108                 'nodes': [
109                     {
110                         'node-id': 'ROADM-D1',
111                         'src-tp': 'SRG1-PP1-TXRX',
112                         'dest-tp': 'DEG1-TTP-TXRX'
113                     }
114                 ],
115                 'min-freq': 196.00625,
116                 'max-freq': 196.09375,
117                 'lower-spectral-slot-number': 749,
118                 'higher-spectral-slot-number': 763
119             })
120         self.assertEqual(response['status_code'], requests.codes.ok)
121         self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
122         time.sleep(10)
123
124     # Get Degree MC interface and check
125     def test_08_degree_mc_interface(self):
126         response = test_utils_rfc8040.check_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-mc-749:763")
127         self.assertEqual(response['status_code'], requests.codes.ok)
128         self.assertDictEqual(
129             dict({"name": "DEG1-TTP-TXRX-mc-749:763",
130                   "supporting-interface": "OMS-DEG1-TTP-TXRX",
131                   "supporting-circuit-pack-name": "1/0",
132                   "circuit-id": "TBD",
133                   "description": "TBD",
134                   "supporting-port": "L1",
135                   "type": "org-openroadm-interfaces:mediaChannelTrailTerminationPoint"},
136                  **response['interface'][0]), response['interface'][0])
137
138         # Check the mc-ttp max and min-freq
139         self.assertIn(response['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'],
140                       [{'min-freq': '196.00625', 'max-freq': '196.09375'},
141                        {'min-freq': 196.00625, 'max-freq': 196.09375}])
142         time.sleep(3)
143
144     # get DEG-NMC interface and check
145     def test_09_degree_nmc_interface(self):
146         response = test_utils_rfc8040.check_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-nmc-749:763")
147         self.assertEqual(response['status_code'], requests.codes.ok)
148         self.assertDictEqual(
149             dict({"name": "DEG1-TTP-TXRX-nmc-749:763",
150                   "supporting-interface": "DEG1-TTP-TXRX-mc-749:763",
151                   "supporting-circuit-pack-name": "1/0",
152                   "circuit-id": "TBD",
153                   "description": "TBD",
154                   "supporting-port": "L1",
155                   "type": "org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint"},
156                  **response['interface'][0]), response['interface'][0])
157
158         # Check the mc-ttp max and min-freq
159         self.assertIn(response['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'],
160                       [{'frequency': '196.05000', 'width': '80'},
161                        {'frequency': 196.05, 'width': 80}])
162         time.sleep(3)
163
164     # get SRG-NMC interface
165     def test_10_srg_nmc_interface(self):
166         response = test_utils_rfc8040.check_node_attribute_request("ROADM-D1", "interface", "SRG1-PP1-TXRX-nmc-749:763")
167         self.assertEqual(response['status_code'], requests.codes.ok)
168         self.assertEqual(
169             dict({"name": "SRG1-PP1-TXRX-nmc-749:763",
170                   "supporting-circuit-pack-name": "3/0",
171                   "circuit-id": "TBD",
172                   "description": "TBD",
173                   "supporting-port": "C1",
174                   "type": "org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint"},
175                  **response['interface'][0]), response['interface'][0])
176         self.assertIn(response['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'],
177                       [{'frequency': '196.05000', 'width': '80'},
178                        {'frequency': 196.05, 'width': 80}])
179         time.sleep(3)
180
181     # Create ROADM-connection
182     def test_11_roadm_connection(self):
183         response = test_utils_rfc8040.check_node_attribute_request(
184             "ROADM-D1", "roadm-connections", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
185         self.assertEqual(response['status_code'], requests.codes.ok)
186         self.assertEqual("SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763",
187                          response['roadm-connections'][0]['connection-name'])
188         self.assertEqual("SRG1-PP1-TXRX-nmc-749:763",
189                          response['roadm-connections'][0]['source']['src-if'])
190         self.assertEqual("DEG1-TTP-TXRX-nmc-749:763",
191                          response['roadm-connections'][0]['destination']['dst-if'])
192         time.sleep(3)
193
194     # Delete ROADM connections and interfaces
195
196     # delete ROADM connection
197     def test_12_delete_roadm_connection(self):
198         response = test_utils_rfc8040.del_node_attribute_request(
199             "ROADM-D1", "roadm-connections", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
200         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
201         time.sleep(3)
202
203     # Delete NMC SRG interface
204     def test_13_delete_srg_interface(self):
205         response = test_utils_rfc8040.del_node_attribute_request("ROADM-D1", "interface", "SRG1-PP1-TXRX-nmc-749:763")
206         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
207         time.sleep(3)
208
209     # Delete NMC Degree interface
210     def test_14_delete_degree_interface(self):
211         response = test_utils_rfc8040.del_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-nmc-749:763")
212         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
213         time.sleep(3)
214
215     # Delete MC Degree interface
216     def test_15_delete_degree_interface(self):
217         response = test_utils_rfc8040.del_node_attribute_request("ROADM-D1", "interface", "DEG1-TTP-TXRX-mc-749:763")
218         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
219         time.sleep(3)
220
221     def test_16_disconnect_ROADM_D1(self):
222         response = test_utils_rfc8040.unmount_device("ROADM-D1")
223         self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
224
225
226 if __name__ == "__main__":
227     unittest.main(verbosity=2)