21aa9989d22098dcbdf1b75e63bd0dcdae20533c
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_flex_grid.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2019 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import unittest
13 import time
14 import requests
15 from common import test_utils
16
17
18 class TransportPCEPortMappingTesting(unittest.TestCase):
19
20     processes = None
21
22     @classmethod
23     def setUpClass(cls):
24         cls.processes = test_utils.start_tpce()
25         cls.processes = test_utils.start_sims(['roadmd'])
26
27     @classmethod
28     def tearDownClass(cls):
29         for process in cls.processes:
30             test_utils.shutdown_process(process)
31         print("all processes killed")
32
33     def setUp(self):
34         print("execution of {}".format(self.id().split(".")[-1]))
35         time.sleep(10)
36
37     def test_01_rdm_device_connection(self):
38         response = test_utils.mount_device("ROADM-D1", 'roadmd')
39         self.assertEqual(response.status_code, requests.codes.created,
40                          test_utils.CODE_SHOULD_BE_201)
41
42     def test_02_rdm_device_connected(self):
43         response = test_utils.get_netconf_oper_request("ROADM-D1")
44         self.assertEqual(response.status_code, requests.codes.ok)
45         res = response.json()
46         self.assertEqual(
47             res['node'][0]['netconf-node-topology:connection-status'],
48             'connected')
49         time.sleep(10)
50
51     def test_03_rdm_portmapping_info(self):
52         response = test_utils.portmapping_request("ROADM-D1/node-info")
53         self.assertEqual(response.status_code, requests.codes.ok)
54         res = response.json()
55         self.assertEqual(
56             {u'node-info': {u'node-type': u'rdm',
57                             u'node-ip-address': u'127.0.0.14',
58                             u'node-clli': u'NodeD',
59                             u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorD',
60                             u'node-model': u'model2',
61                             }},
62             res)
63         time.sleep(3)
64
65     def test_04_rdm_deg1_lcp(self):
66         response = test_utils.portmapping_request("ROADM-D1/mc-capabilities/DEG1-TTP")
67         self.assertEqual(response.status_code, requests.codes.ok)
68         res = response.json()
69         self.assertEqual(
70             {
71               "mc-capabilities": [
72                 {
73                   "mc-node-name": "DEG1-TTP",
74                   "center-freq-granularity": 6.25,
75                   "slot-width-granularity": 12.5
76                 }
77               ]
78             }, res)
79         time.sleep(3)
80
81     def test_05_rdm_deg2_lcp(self):
82         response = test_utils.portmapping_request("ROADM-D1/mc-capabilities/DEG2-TTP")
83         self.assertEqual(response.status_code, requests.codes.ok)
84         res = response.json()
85         self.assertEqual(
86             {
87               "mc-capabilities": [
88                 {
89                   "mc-node-name": "DEG2-TTP",
90                   "center-freq-granularity": 6.25,
91                   "slot-width-granularity": 12.5
92                 }
93               ]
94             }, res)
95         time.sleep(3)
96
97     def test_06_rdm_srg1_lcp(self):
98         response = test_utils.portmapping_request("ROADM-D1/mc-capabilities/SRG1-PP")
99         self.assertEqual(response.status_code, requests.codes.ok)
100         res = response.json()
101         self.assertEqual(
102             {
103               "mc-capabilities": [
104                 {
105                   "mc-node-name": "SRG1-PP",
106                   "center-freq-granularity": 6.25,
107                   "slot-width-granularity": 12.5
108                 }
109               ]
110             }, res)
111         time.sleep(3)
112
113     # Renderer interface creations
114     def test_07_device_renderer(self):
115         data = {
116             "transportpce-device-renderer:input": {
117             "transportpce-device-renderer:modulation-format": "dp-qpsk",
118             "transportpce-device-renderer:operation": "create",
119             "transportpce-device-renderer:service-name": "testNMC-MC",
120             "transportpce-device-renderer:wave-number": "0",
121             "transportpce-device-renderer:center-freq": "196.05",
122             "transportpce-device-renderer:width": "80",
123             "transportpce-device-renderer:nodes": [
124               {
125                 "transportpce-device-renderer:node-id": "ROADM-D1",
126                 "transportpce-device-renderer:src-tp": "SRG1-PP1-TXRX",
127                 "transportpce-device-renderer:dest-tp": "DEG1-TTP-TXRX"
128               }
129             ],
130             "transportpce-device-renderer:min-freq": 196.00625,
131             "transportpce-device-renderer:max-freq": 196.09375,
132             "transportpce-device-renderer:lower-spectral-slot-number": 749,
133             "transportpce-device-renderer:higher-spectral-slot-number": 763
134           }
135         }
136         url = test_utils.RESTCONF_BASE_URL + \
137               "/operations/transportpce-device-renderer:service-path"
138         response = test_utils.post_request(url, data)
139         self.assertEqual(response.status_code, requests.codes.ok)
140         res = response.json()
141         self.assertIn('Roadm-connection successfully created for nodes',
142                       res['output']['result'])
143         time.sleep(10)
144
145     # Get Degree MC interface and check
146     def test_08_degree_mc_interface(self):
147         response = test_utils.check_netconf_node_request("ROADM-D1",
148                                                          "interface/DEG1-TTP-TXRX-mc-749:763")
149         self.assertEqual(response.status_code, requests.codes.ok)
150         res = response.json()
151         self.assertDictEqual(
152             dict({"name": "DEG1-TTP-TXRX-mc-749:763",
153                   "supporting-interface": "OMS-DEG1-TTP-TXRX",
154                   "supporting-circuit-pack-name": "1/0",
155                   "circuit-id": "TBD",
156                   "description": "TBD",
157                   "supporting-port": "L1",
158                   "type": "org-openroadm-interfaces:mediaChannelTrailTerminationPoint"},
159                   **res['interface'][0]),
160             res['interface'][0])
161
162         # Check the mc-ttp max and min-freq
163         self.assertEqual({
164                           "min-freq": 196.00625,
165                           "max-freq": 196.09375
166                           },
167             res['interface'][0]['org-openroadm-media-channel-interfaces:mc-ttp'])
168         time.sleep(3)
169
170     # get DEG-NMC interface and check
171     def test_09_degree_nmc_interface(self):
172         response = test_utils.check_netconf_node_request("ROADM-D1",
173                                                          "interface/DEG1-TTP-TXRX-nmc-749:763")
174         self.assertEqual(response.status_code, requests.codes.ok)
175         res = response.json()
176         self.assertDictEqual(
177             dict({"name": "DEG1-TTP-TXRX-nmc-749:763",
178                   "supporting-interface": "DEG1-TTP-TXRX-mc-749:763",
179                   "supporting-circuit-pack-name": "1/0",
180                   "circuit-id": "TBD",
181                   "description": "TBD",
182                   "supporting-port": "L1",
183                   "type": "org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint"},
184                  **res['interface'][0]),
185             res['interface'][0])
186
187         # Check the mc-ttp max and min-freq
188         self.assertEqual({
189           "frequency": 196.05,
190           "width": 80
191         },
192         res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
193         time.sleep(3)
194
195     # get SRG-NMC interface
196     def test_10_srg_nmc_interface(self):
197         response = test_utils.check_netconf_node_request("ROADM-D1",
198                                                          "interface/SRG1-PP1-TXRX-nmc-749:763")
199         res = response.json()
200         self.assertEqual(response.status_code, requests.codes.ok)
201         self.assertEqual(
202             dict({"name": "SRG1-PP1-TXRX-nmc-749:763",
203                   "supporting-circuit-pack-name": "3/0",
204                   "circuit-id": "TBD",
205                   "description": "TBD",
206                   "supporting-port": "C1",
207                   "type": "org-openroadm-interfaces:networkMediaChannelConnectionTerminationPoint"},
208                  **res['interface'][0]), res['interface'][0])
209         self.assertEqual({"frequency": 196.05, "width": 80},
210                          res['interface'][0]['org-openroadm-network-media-channel-interfaces:nmc-ctp'])
211         time.sleep(3)
212
213     # Create ROADM-connection
214     def test_11_roadm_connection(self):
215         response = test_utils.check_netconf_node_request("ROADM-D1",
216                                           "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
217         self.assertEqual(response.status_code, requests.codes.ok)
218         res = response.json()
219         self.assertEqual("SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763",
220                          res['roadm-connections'][0]['connection-name'])
221         self.assertEqual("SRG1-PP1-TXRX-nmc-749:763",
222                          res['roadm-connections'][0]['source']['src-if'])
223         self.assertEqual("DEG1-TTP-TXRX-nmc-749:763",
224                          res['roadm-connections'][0]['destination']['dst-if'])
225         time.sleep(3)
226
227     # Delete ROADM connections and interfaces
228
229     # delete ROADM connection
230     def test_12_delete_roadm_connection(self):
231         response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
232                    "node/ROADM-D1/yang-ext:mount/" +
233                    "org-openroadm-device:org-openroadm-device/" +
234                    "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-749:763")
235         self.assertEqual(response.status_code, requests.codes.ok)
236         time.sleep(3)
237
238     # Delete NMC SRG interface
239     def test_13_delete_srg_interface(self):
240         response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
241                                            "node/ROADM-D1/yang-ext:mount/" +
242                                            "org-openroadm-device:org-openroadm-device/" +
243                                            "interface/SRG1-PP1-TXRX-nmc-749:763")
244         self.assertEqual(response.status_code, requests.codes.ok)
245         time.sleep(3)
246
247     # Delete NMC Degree interface
248     def test_14_delete_degree_interface(self):
249         response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
250                                              "node/ROADM-D1/yang-ext:mount/" +
251                                              "org-openroadm-device:org-openroadm-device/" +
252                                              "interface/DEG1-TTP-TXRX-nmc-749:763")
253         self.assertEqual(response.status_code, requests.codes.ok)
254         time.sleep(3)
255
256     # Delete MC Degree interface
257     def test_15_delete_degree_interface(self):
258         response = test_utils.delete_request(test_utils.URL_CONFIG_NETCONF_TOPO +
259                                              "node/ROADM-D1/yang-ext:mount/" +
260                                              "org-openroadm-device:org-openroadm-device/" +
261                                              "interface/DEG1-TTP-TXRX-mc-749:763")
262         self.assertEqual(response.status_code, requests.codes.ok)
263         time.sleep(3)
264
265
266 if __name__ == "__main__":
267     unittest.main(verbosity=2)