2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
16 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
# unittest.TestCase subclass holding the path-computation tests of this file.
26 class TransportPCEtesting(unittest.TestCase):
# Request payload shared by the path-computation tests; the test methods modify it in place
# (node ids, clli values, constraints) before each RPC call.
# NOTE(review): parts of this literal (nested end-point entries, closing braces) are not visible in this view.
27 path_computation_input_data = {
28 "service-name": "service-1",
29 "resource-reserve": "true",
30 "service-handler-header": {
31 "request-id": "request1"
34 "service-rate": "100",
36 "service-format": "Ethernet",
40 "service-rate": "100",
42 "service-format": "Ethernet",
45 "pce-routing-metric": "hop-count"
# Placeholders for the raw text of the sample topology / port-mapping files;
# the class-level setup code fills them in through cls.<name> = <file>.read().
48 simple_topo_bi_dir_data = None
49 simple_topo_uni_dir_data = None
50 complex_topo_uni_dir_data = None
51 port_mapping_data = None
# NOTE(review): the def line owning this body is not visible in this view; the body assigns to cls and
# is presumably the class-level setup hook (counterpart of tearDownClass) — confirm.
56 # pylint: disable=bare-except
# Becomes True only after all four sample files below have been read.
57 sample_files_parsed = False
# Each sample file lives two directories above this file, under sample_configs/, and is read as UTF-8 text.
# Bidirectional sample topology.
60 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
61 "..", "..", "sample_configs", "honeynode-topo.json")
62 with open(TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as topo_bi_dir:
63 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
# Simple unidirectional sample topology.
65 TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
66 "..", "..", "sample_configs", "NW-simple-topology.json")
68 with open(TOPO_UNI_DIR_FILE, 'r', encoding='utf-8') as topo_uni_dir:
69 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
# Complex unidirectional sample topology.
71 TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
72 "..", "..", "sample_configs", "NW-for-test-5-4.json")
73 with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r', encoding='utf-8') as topo_uni_dir_complex:
74 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
# Port-mapping payload posted by test_01.
75 PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
76 "..", "..", "sample_configs", "pce_portmapping_121.json")
77 with open(PORT_MAPPING_FILE, 'r', encoding='utf-8') as port_mapping:
78 cls.port_mapping_data = port_mapping.read()
79 sample_files_parsed = True
# Read failures are reported on stdout.
# NOTE(review): whether a failure aborts the setup is not visible in this view.
80 except PermissionError as err:
81 print("Permission Error when trying to read sample files\n", err)
83 except FileNotFoundError as err:
84 print("File Not found Error when trying to read sample files\n", err)
87 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
90 if sample_files_parsed:
91 print("sample files content loaded")
# Keep whatever test_utils.start_tpce() returns; tearDownClass iterates over it to shut each process down.
93 cls.processes = test_utils.start_tpce()
# Class-level teardown: delete the port mapping and the 'openroadm-topology' network through test_utils,
# then shut down every process stored in cls.processes and report it on stdout.
97 def tearDownClass(cls):
99 test_utils.del_portmapping()
100 test_utils.del_ietf_network('openroadm-topology')
101 # pylint: disable=not-an-iterable
# cls.processes is assigned from test_utils.start_tpce() by the class-level setup code.
102 for process in cls.processes:
103 test_utils.shutdown_process(process)
104 print("all processes killed")
# Per-test hook that unittest calls before every test method.
# NOTE(review): the body of this hook is not visible in this view — confirm what it does.
106 def setUp(self): # instruction executed before each test method
def test_01_load_port_mapping(self):
    # Post the sample port-mapping payload read during class setup;
    # either "created" or "no content" is accepted as a successful answer.
    reply = test_utils.post_portmapping(self.port_mapping_data)
    accepted_codes = (requests.codes.created, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted_codes)
115 # Load simple bidirectional topology
def test_02_load_simple_topology_bi(self):
    # Send the bidirectional sample topology to 'openroadm-topology';
    # "ok" and "no content" both count as success.
    topology_payload = self.simple_topo_bi_dir_data
    reply = test_utils.put_ietf_network('openroadm-topology', topology_payload)
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted_codes)
121 # Get existing nodeId
def test_03_get_nodeId(self):
    # Request one node of 'openroadm-topology' and check that the reply
    # carries the very same node-id.
    wanted_node = 'ROADMA01-SRG1'
    reply = test_utils.get_ietf_network_node_request('openroadm-topology', wanted_node, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    returned_node = reply['node']['node-id']
    self.assertEqual(returned_node, wanted_node)
128 # Get existing linkId
def test_04_get_linkId(self):
    # Request one link of 'openroadm-topology' and check that the reply
    # carries the very same link-id.
    wanted_link = 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX'
    reply = test_utils.get_ietf_network_link_request('openroadm-topology', wanted_link, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    returned_link = reply['link']['link-id']
    self.assertEqual(returned_link, wanted_link)
136 # Path Computation success
def test_05_path_computation_xpdr_bi(self):
    # Submit the shared request payload as it currently stands and expect
    # the PCE to report a calculated path.
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', self.path_computation_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
146 # Path Computation success
def test_06_path_computation_rdm_bi(self):
    # Re-target both end points of the shared request at ROADM nodes
    # (the dict is modified in place), then expect a calculated path.
    payload = self.path_computation_input_data
    for end_point, node_id in (("service-a-end", "ROADMA01"),
                               ("service-z-end", "ROADMC01")):
        payload[end_point]["node-id"] = node_id
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', payload)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
# Load simple unidirectional topology
def test_07_load_simple_topology_uni(self):
    # Send the simple unidirectional sample topology to 'openroadm-topology';
    # "ok" and "no content" both count as success.
    topology_payload = self.simple_topo_uni_dir_data
    reply = test_utils.put_ietf_network('openroadm-topology', topology_payload)
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted_codes)
164 # Get existing nodeId
def test_08_get_nodeId(self):
    # Request node XPONDER-1-2 of 'openroadm-topology' and check that the
    # reply carries the very same node-id.
    wanted_node = 'XPONDER-1-2'
    reply = test_utils.get_ietf_network_node_request('openroadm-topology', wanted_node, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    returned_node = reply['node']['node-id']
    self.assertEqual(returned_node, wanted_node)
171 # Get existing linkId
def test_09_get_linkId(self):
    # Request one link of 'openroadm-topology' and check that the reply
    # carries the very same link-id.
    wanted_link = 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX'
    reply = test_utils.get_ietf_network_link_request('openroadm-topology', wanted_link, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    returned_link = reply['link']['link-id']
    self.assertEqual(returned_link, wanted_link)
179 # Path Computation success
def test_10_path_computation_xpdr_uni(self):
    # Re-target the shared request (modified in place) at XPONDER-1-2 and
    # XPONDER-3-2, then expect a calculated path.
    payload = self.path_computation_input_data
    payload["service-a-end"].update({"node-id": "XPONDER-1-2", "clli": "ORANGE1"})
    payload["service-z-end"].update({"node-id": "XPONDER-3-2", "clli": "ORANGE3"})
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', payload)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
193 # Path Computation success
def test_11_path_computation_rdm_uni(self):
    # Re-target the shared request (modified in place) at OpenROADM-2-1 and
    # OpenROADM-2-2, expect a calculated path, then inspect both directions
    # of the returned path description.
    request_data = self.path_computation_input_data
    request_data["service-a-end"]["node-id"] = "OpenROADM-2-1"
    request_data["service-a-end"]["clli"] = "cll21"
    request_data["service-z-end"]["node-id"] = "OpenROADM-2-2"
    request_data["service-z-end"]["clli"] = "ncli22"
    response = test_utils.transportpce_api_rpc_request('transportpce-pce',
                                                       'path-computation-request',
                                                       request_data)
    self.assertEqual(response['status_code'], requests.codes.ok)
    self.assertIn('Path is calculated',
                  response['output']['configuration-response-common']['response-message'])

    # Resolve the nested path description once instead of on every loop turn.
    path_description = response['output']['response-parameters']['path-description']
    atoz_hops = path_description['aToZ-direction']['aToZ']
    ztoa_hops = path_description['zToA-direction']['zToA']
    self.assertEqual(len(atoz_hops), 15)
    self.assertEqual(len(ztoa_hops), 15)

    # Both lists were just asserted to hold 15 entries, so pairing them with
    # zip visits exactly the positions the former range(0, 15) loop visited.
    for atoz, ztoa in zip(atoz_hops, ztoa_hops):
        # The aToZ element with id '14' must be the SRG1-PP1-TX termination point.
        if atoz['id'] == '14':
            self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
        # The zToA element with id '0' must be the SRG1-PP1-RX termination point.
        if ztoa['id'] == '0':
            self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
219 # Load complex topology
def test_12_load_complex_topology(self):
    # Send the complex unidirectional sample topology to 'openroadm-topology';
    # "ok" and "no content" both count as success.
    topology_payload = self.complex_topo_uni_dir_data
    reply = test_utils.put_ietf_network('openroadm-topology', topology_payload)
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted_codes)
225 # Get existing nodeId
def test_13_get_nodeId(self):
    # Request node XPONDER-3-2 of 'openroadm-topology' and check that the
    # reply carries the very same node-id.
    wanted_node = 'XPONDER-3-2'
    reply = test_utils.get_ietf_network_node_request('openroadm-topology', wanted_node, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    returned_node = reply['node']['node-id']
    self.assertEqual(returned_node, wanted_node)
232 # Test failed path computation
def test_14_fail_path_computation(self):
    # Strip the service name and both end points from the shared request
    # (in place); the RPC must still answer "ok" but report the missing name.
    payload = self.path_computation_input_data
    for removed_key in ("service-name", "service-a-end", "service-z-end"):
        del payload[removed_key]
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', payload)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Service Name is not set', message)
245 # Test1 success path computation
# Puts back the three keys deleted by test_14 (service name and both end points), adds hard and soft
# constraints to the shared request, and expects the RPC to report a calculated path.
246 def test_15_success1_path_computation(self):
247 self.path_computation_input_data["service-name"] = "service 1"
248 self.path_computation_input_data["service-a-end"] = {"service-format": "Ethernet", "service-rate": "100",
249 "clli": "ORANGE2", "node-id": "XPONDER-2-2"}
250 self.path_computation_input_data["service-z-end"] = {"service-format": "Ethernet", "service-rate": "100",
251 "clli": "ORANGE1", "node-id": "XPONDER-1-2"}
# NOTE(review): parts of the two constraint literals below are not visible in this view.
252 self.path_computation_input_data["hard-constraints"] = {"customer-code": ["Some customer-code"],
254 "service-identifier-list": [{
255 "service-identifier": "Some existing-service"}]
257 self.path_computation_input_data["soft-constraints"] = {"customer-code": ["Some customer-code"],
259 "service-identifier-list": [{
260 "service-identifier": "Some existing-service"}]
262 response = test_utils.transportpce_api_rpc_request('transportpce-pce',
263 'path-computation-request',
264 self.path_computation_input_data)
265 self.assertEqual(response['status_code'], requests.codes.ok)
266 self.assertIn('Path is calculated',
267 response['output']['configuration-response-common']['response-message'])
271 # Test2 success path computation with path description
def test_16_success2_path_computation(self):
    # Re-target the shared request (in place) at XPONDER-1-2 / XPONDER-3-2,
    # drop both constraint sections, and expect a calculated path whose
    # wavelength number is 5 in each direction.
    payload = self.path_computation_input_data
    payload["service-a-end"].update({"node-id": "XPONDER-1-2", "clli": "ORANGE1"})
    payload["service-z-end"].update({"node-id": "XPONDER-3-2", "clli": "ORANGE3"})
    for constraint_key in ("hard-constraints", "soft-constraints"):
        del payload[constraint_key]
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', payload)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
    path_description = reply['output']['response-parameters']['path-description']
    self.assertEqual(5, path_description['aToZ-direction']['aToZ-wavelength-number'])
    self.assertEqual(5, path_description['zToA-direction']['zToA-wavelength-number'])
291 # Test3 success path computation with hard-constraints exclude
def test_17_success3_path_computation(self):
    # Add a hard constraint excluding OpenROADM-2-1 and OpenROADM-2-2 to the
    # shared request (in place) and expect a calculated path whose wavelength
    # number is 9 in each direction.
    payload = self.path_computation_input_data
    excluded_nodes = ["OpenROADM-2-1", "OpenROADM-2-2"]
    payload["hard-constraints"] = {"exclude": {"node-id": excluded_nodes}}
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', payload)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
    path_description = reply['output']['response-parameters']['path-description']
    self.assertEqual(9, path_description['aToZ-direction']['aToZ-wavelength-number'])
    self.assertEqual(9, path_description['zToA-direction']['zToA-wavelength-number'])
307 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
# Re-targets the shared request at XPONDER-2-2 / XPONDER-1-2, removes the hard constraints, and expects
# a calculated path whose aToZ list holds 31 elements.
308 def test_18_path_computation_before_oms_attribute_deletion(self):
309 self.path_computation_input_data["service-a-end"]["node-id"] = "XPONDER-2-2"
310 self.path_computation_input_data["service-a-end"]["clli"] = "ORANGE2"
311 self.path_computation_input_data["service-z-end"]["node-id"] = "XPONDER-1-2"
312 self.path_computation_input_data["service-z-end"]["clli"] = "ORANGE1"
313 del self.path_computation_input_data["hard-constraints"]
314 response = test_utils.transportpce_api_rpc_request('transportpce-pce',
315 'path-computation-request',
316 self.path_computation_input_data)
317 self.assertEqual(response['status_code'], requests.codes.ok)
318 self.assertIn('Path is calculated',
319 response['output']['configuration-response-common']['response-message'])
320 path_depth = len(response['output']['response-parameters']['path-description']
321 ['aToZ-direction']['aToZ'])
322 self.assertEqual(31, path_depth)
# Link description that each aToZ element is compared against in the loop below.
323 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
# NOTE(review): the lines that initialise/set `find` and that complete the resource_i expression are
# not visible in this view; the final assertion requires `find` to be True.
325 for i in range(0, path_depth):
326 resource_i = (response['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
328 if resource_i == link:
330 self.assertEqual(find, True)
333 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
def test_19_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
    # Ask test_utils to delete the OMS attributes of the link; "ok" and
    # "no content" both count as success. The status code is read as an
    # attribute of the reply here, not by key.
    link_id = "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"
    reply = test_utils.del_oms_attr_request(link_id)
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply.status_code, accepted_codes)
339 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
# Sends the shared request again without modifying it and expects a calculated path whose aToZ list
# now holds 47 elements.
340 def test_20_path_computation_after_oms_attribute_deletion(self):
341 response = test_utils.transportpce_api_rpc_request('transportpce-pce',
342 'path-computation-request',
343 self.path_computation_input_data)
344 self.assertEqual(response['status_code'], requests.codes.ok)
345 self.assertIn('Path is calculated',
346 response['output']['configuration-response-common']['response-message'])
347 path_depth = len(response['output']['response-parameters']['path-description']
348 ['aToZ-direction']['aToZ'])
349 self.assertEqual(47, path_depth)
# Link description that each aToZ element is compared against in the loop below.
350 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
# NOTE(review): the lines that initialise/set `find` and that complete the resource_i expression are
# not visible in this view; the final assertion requires `find` not to be True.
352 for i in range(0, path_depth):
353 resource_i = (response['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
355 if resource_i == link:
357 self.assertNotEqual(find, True)
# Script entry point: when the file is executed directly, hand control to
# unittest's command-line runner with verbosity level 2.
if __name__ == "__main__":
    unittest.main(
        verbosity=2,
    )