Merge changes I018b69e8,Ia99c607c,Idc241d22
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test05_flex_grid.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2019 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13
14 import unittest
15 import time
16 import requests
17 # pylint: disable=wrong-import-order
18 import sys
19 sys.path.append('transportpce_tests/common/')
20 # pylint: disable=wrong-import-position
21 # pylint: disable=import-error
22 import test_utils  # nopep8
23
24
25 class TransportPCEPortMappingTesting(unittest.TestCase):
26
27     processes = None
28     NODE_VERSION = '2.2.1'
29
30     @classmethod
31     def setUpClass(cls):
32         cls.processes = test_utils.start_tpce()
33         cls.processes = test_utils.start_sims([('roadmd', cls.NODE_VERSION)])
34
35     @classmethod
36     def tearDownClass(cls):
37         # pylint: disable=not-an-iterable
38         for process in cls.processes:
39             test_utils.shutdown_process(process)
40         print("all processes killed")
41
42     def setUp(self):
43         # pylint: disable=consider-using-f-string
44         print("execution of {}".format(self.id().split(".")[-1]))
45         time.sleep(10)
46
47     def test_01_rdm_device_connection(self):
48         response = test_utils.mount_device("ROADM-D1", ('roadmd', self.NODE_VERSION))
49         self.assertEqual(response.status_code, requests.codes.created,
50                          test_utils.CODE_SHOULD_BE_201)
51
52     def test_02_rdm_device_connected(self):
53         response = test_utils.get_netconf_oper_request("ROADM-D1")
54         self.assertEqual(response.status_code, requests.codes.ok)
55         res = response.json()
56         self.assertEqual(
57             res['node'][0]['netconf-node-topology:connection-status'],
58             'connected')
59         time.sleep(10)
60
61     def test_03_rdm_portmapping_info(self):
62         response = test_utils.portmapping_request("ROADM-D1/node-info")
63         self.assertEqual(response.status_code, requests.codes.ok)
64         res = response.json()
65         self.assertEqual(
66             {'node-info': {'node-type': 'rdm',
67                            'node-ip-address': '127.0.0.14',
68                            'node-clli': 'NodeD',
69                            'openroadm-version': '2.2.1',
70                            'node-vendor': 'vendorD',
71                            'node-model': 'model2',
72                            }},
73             res)
74         time.sleep(3)
75
76     def test_04_rdm_deg1_lcp(self):
77         response = test_utils.portmapping_request("ROADM-D1/mc-capabilities/DEG1-TTP")
78         self.assertEqual(response.status_code, requests.codes.ok)
79         res = response.json()
80         self.assertEqual(
81             {
82                 "mc-capabilities": [
83                     {
84                         "mc-node-name": "DEG1-TTP",
85                         "center-freq-granularity": 6.25,
86                         "slot-width-granularity": 12.5
87                     }
88                 ]
89             }, res)
90         time.sleep(3)
91
92     def test_05_rdm_deg2_lcp(self):
93         response = test_utils.portmapping_request("ROADM-D1/mc-capabilities/DEG2-TTP")
94         self.assertEqual(response.status_code, requests.codes.ok)
95         res = response.json()
96         self.assertEqual(
97             {
98                 "mc-capabilities": [
99                     {
100                         "mc-node-name": "DEG2-TTP",
101                         "center-freq-granularity": 6.25,
102                         "slot-width-granularity": 12.5
103                     }
104                 ]
105             }, res)
106         time.sleep(3)
107
108     def test_06_rdm_srg1_lcp(self):
109         response = test_utils.portmapping_request("ROADM-D1/mc-capabilities/SRG1-PP")
110         self.assertEqual(response.status_code, requests.codes.ok)
111         res = response.json()
112         self.assertEqual(
113             {
114                 "mc-capabilities": [
115                     {
116                         "mc-node-name": "SRG1-PP",
117                         "center-freq-granularity": 6.25,
118                         "slot-width-granularity": 12.5
119                     }
120                 ]
121             }, res)
122         time.sleep(3)
123
124     # Renderer interface creations
125     def test_07_device_renderer(self):
126         data = {
127             "transportpce-device-renderer:input": {
128                 "transportpce-device-renderer:modulation-format": "dp-qpsk",
129                 "transportpce-device-renderer:operation": "create",
130                 "transportpce-device-renderer:service-name": "testNMC-MC",
131                 "transportpce-device-renderer:wave-number": "0",
132                 "transportpce-device-renderer:center-freq": "196.05",
133                 "transportpce-device-renderer:width": "80",
134                 "transportpce-device-renderer:nodes": [
135                     {
136                         "transportpce-device-renderer:node-id": "ROADM-D1",
137                         "transportpce-device-renderer:src-tp": "SRG1-PP1-TXRX",
138                         "transportpce-device-renderer:dest-tp": "DEG1-TTP-TXRX"
139                     }
140                 ],
141                 "transportpce-device-renderer:min-freq": 196.00625,
142                 "transportpce-device-renderer:max-freq": 196.09375,
143                 "transportpce-device-renderer:lower-spectral-slot-number": 749,
144                 "transportpce-device-renderer:higher-spectral-slot-number": 763
145             }
146         }
147         url = test_utils.RESTCONF_BASE_URL + \
148             "/operations/transportpce-device-renderer:service-path"
149         response = test_utils.post_request(url, data)
150         self.assertEqual(response.status_code, requests.codes.ok)
151         res = response.json()
152         self.assertIn('Roadm-connection successfully created for nodes',
153                       res['output']['result'])
154         time.sleep(10)
155
156     # Get Degree MC interface and check
157     def test_08_degree_mc_interface(self):
158         response = test_utils.check_netconf_node_request("ROADM-D1",
159                                                          "interface/DEG1-TTP-TXRX-mc-749:763")
160         self.assertEqual(response.status_code, requests.codes.ok)
161         res = response.json()
162         self.assertDictEqual(
163             dict({"name": "DEG1-TTP-TXRX-mc-749:763",
164                   "supporting-interface": "OMS-DEG1-TTP-TXRX",
165                   "supporting-circuit-pack-name": "1/0",
166                   "circuit-id": "TBD",
167                   "description": "TBD",
168                   "supporting-port": "L1",
169                   "type": "org-openroadm-interfaces:mediaChannelTrailTerminationPoint"},
170                  **res['interface'][0]),
171             res['interface'][0])
172
173         # Check the mc-ttp max and min-freq
174         self.assertEqual({
175             "min-freq": 196.00625,
176             "max-freq": 196.09375
177         },
178             res['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'])
179         time.sleep(3)
180
181     # get DEG-NMC interface and check
182     def test_09_degree_nmc_interface(self):
183         response = test_utils.check_netconf_node_request("ROADM-D1",
184                                                          "interface/DEG1-TTP-TXRX-nmc-749:763")
185         self.assertEqual(response.status_code, requests.codes.ok)
186         res = response.json()
187         self.assertDictEqual(
188             dict({"name": "DEG1-TTP-TXRX-nmc-749:763",
189                   "supporting-interface": "DEG1-TTP-TXRX-mc-749:763",
190                   "supporting-circuit-pack-name": "1/0",
191                   "circuit-id": "TBD",
192                   "description": "TBD",
193                   "supporting-port": "L1",
194                   "type": "org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint"},
195                  **res['interface'][0]),
196             res['interface'][0])
197
198         # Check the mc-ttp max and min-freq
199         self.assertEqual({
200             "frequency": 196.05,
201             "width": 80
202         },
203             res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
204         time.sleep(3)
205
206     # get SRG-NMC interface
207     def test_10_srg_nmc_interface(self):
208         response = test_utils.check_netconf_node_request("ROADM-D1",
209                                                          "interface/SRG1-PP1-TXRX-nmc-749:763")
210         res = response.json()
211         self.assertEqual(response.status_code, requests.codes.ok)
212         self.assertEqual(
213             dict({"name": "SRG1-PP1-TXRX-nmc-749:763",
214                   "supporting-circuit-pack-name": "3/0",
215                   "circuit-id": "TBD",
216                   "description": "TBD",
217                   "supporting-port": "C1",
218                   "type": "org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint"},
219                  **res['interface'][0]), res['interface'][0])
220         self.assertEqual({"frequency": 196.05, "width": 80},
221                          res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
222         time.sleep(3)
223
224     # Create ROADM-connection
225     def test_11_roadm_connection(self):
226         response = test_utils.check_netconf_node_request("ROADM-D1",
227                                                          "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
228         self.assertEqual(response.status_code, requests.codes.ok)
229         res = response.json()
230         self.assertEqual("SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763",
231                          res['roadm-connections'][0]['connection-name'])
232         self.assertEqual("SRG1-PP1-TXRX-nmc-749:763",
233                          res['roadm-connections'][0]['source']['src-if'])
234         self.assertEqual("DEG1-TTP-TXRX-nmc-749:763",
235                          res['roadm-connections'][0]['destination']['dst-if'])
236         time.sleep(3)
237
238     # Delete ROADM connections and interfaces
239
240     # delete ROADM connection
241     def test_12_delete_roadm_connection(self):
242         response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
243                                              "node/ROADM-D1/yang-ext:mount/" +
244                                              "org-openroadm-device:org-openroadm-device/" +
245                                              "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
246         self.assertEqual(response.status_code, requests.codes.ok)
247         time.sleep(3)
248
249     # Delete NMC SRG interface
250     def test_13_delete_srg_interface(self):
251         response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
252                                              "node/ROADM-D1/yang-ext:mount/" +
253                                              "org-openroadm-device:org-openroadm-device/" +
254                                              "interface/SRG1-PP1-TXRX-nmc-749:763")
255         self.assertEqual(response.status_code, requests.codes.ok)
256         time.sleep(3)
257
258     # Delete NMC Degree interface
259     def test_14_delete_degree_interface(self):
260         response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
261                                              "node/ROADM-D1/yang-ext:mount/" +
262                                              "org-openroadm-device:org-openroadm-device/" +
263                                              "interface/DEG1-TTP-TXRX-nmc-749:763")
264         self.assertEqual(response.status_code, requests.codes.ok)
265         time.sleep(3)
266
267     # Delete MC Degree interface
268     def test_15_delete_degree_interface(self):
269         response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
270                                              "node/ROADM-D1/yang-ext:mount/" +
271                                              "org-openroadm-device:org-openroadm-device/" +
272                                              "interface/DEG1-TTP-TXRX-mc-749:763")
273         self.assertEqual(response.status_code, requests.codes.ok)
274         time.sleep(3)
275
276
277 if __name__ == "__main__":
278     unittest.main(verbosity=2)