2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
16 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
# unittest.TestCase driving the 'transportpce-pce' path-computation RPC through
# the test_utils helpers. The test methods are numbered and share the mutable
# class-level request dict declared below.
26 class TransportPCEtesting(unittest.TestCase):
# Request payload passed to the 'path-computation-request' RPC; the test
# methods modify it in place between calls.
# NOTE(review): this listing shows no closing braces and no
# "service-a-end" / "service-z-end" keys for this literal, although the tests
# index both keys -- some lines of the literal appear to be missing here;
# confirm against the complete file.
27 path_computation_input_data = {
28 "service-name": "service-1",
29 "resource-reserve": "true",
30 "service-handler-header": {
31 "request-id": "request1"
34 "service-rate": "100",
36 "service-format": "Ethernet",
40 "service-rate": "100",
42 "service-format": "Ethernet",
45 "pce-routing-metric": "hop-count"
# Raw text of the sample_configs files. These start as None and are assigned
# from file.read() by the class-level setup code.
48 simple_topo_bi_dir_data = None
49 simple_topo_uni_dir_data = None
50 complex_topo_uni_dir_data = None
51 port_mapping_data = None
# Class-level setup body: read the four sample files into class attributes,
# report read errors on stdout, then start the controller processes.
# NOTE(review): the enclosing `def`, the `try:` line and the header of the
# last handler are not visible in this listing -- presumably this is
# setUpClass; confirm against the complete file.
56 # pylint: disable=bare-except
# Flag flipped to True only after all four files were read.
57 sample_files_parsed = False
# Every sample path is resolved relative to this file: ../../sample_configs/.
59 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
60 "..", "..", "sample_configs", "honeynode-topo.json")
61 with open(TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as topo_bi_dir:
62 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
64 TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
65 "..", "..", "sample_configs", "NW-simple-topology.json")
67 with open(TOPO_UNI_DIR_FILE, 'r', encoding='utf-8') as topo_uni_dir:
68 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
70 TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
71 "..", "..", "sample_configs", "NW-for-test-5-4.json")
72 with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r', encoding='utf-8') as topo_uni_dir_complex:
73 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
74 PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
75 "..", "..", "sample_configs", "pce_portmapping_121.json")
76 with open(PORT_MAPPING_FILE, 'r', encoding='utf-8') as port_mapping:
77 cls.port_mapping_data = port_mapping.read()
78 sample_files_parsed = True
# Each handler prints a message describing the failure.
# NOTE(review): whatever follows each print (lines are missing from this
# listing) is not visible -- confirm whether the setup aborts on error.
79 except PermissionError as err:
80 print("Permission Error when trying to read sample files\n", err)
82 except FileNotFoundError as err:
83 print("File Not found Error when trying to read sample files\n", err)
# Prints only the exception type (sys.exc_info()[0]), not the message.
86 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
89 if sample_files_parsed:
90 print("sample files content loaded")
# Keep the value returned by start_tpce() on the class; tearDownClass iterates
# over cls.processes to shut each one down.
92 cls.processes = test_utils.start_tpce()
def tearDownClass(cls):
    # Class-level cleanup: ask test_utils to delete the port mapping and the
    # 'openroadm-topology' network, then shut down every entry of
    # cls.processes.
    test_utils.del_portmapping()
    test_utils.del_ietf_network('openroadm-topology')
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
# NOTE(review): the body of this per-test hook is not visible in this listing
# (only the signature line is present) -- confirm against the complete file.
105 def setUp(self): # instruction executed before each test method
def test_01_load_port_mapping(self):
    # Post the sample port-mapping text; both "created" and "no content"
    # status codes count as success.
    reply = test_utils.post_portmapping(self.port_mapping_data)
    accepted_codes = (requests.codes.created, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted_codes)
114 # Load simple bidirectional topology
def test_02_load_simple_topology_bi(self):
    # Send the honeynode-topo.json text as the 'openroadm-topology' network;
    # both "ok" and "no content" status codes count as success.
    reply = test_utils.put_ietf_network('openroadm-topology', self.simple_topo_bi_dir_data)
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted_codes)
120 # Get existing nodeId
def test_03_get_nodeId(self):
    # Fetch node ROADMA01-SRG1 from 'openroadm-topology' (requested with the
    # 'config' argument) and check the node-id echoed in the answer.
    expected_node = 'ROADMA01-SRG1'
    reply = test_utils.get_ietf_network_node_request('openroadm-topology', expected_node, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    self.assertEqual(reply['node']['node-id'], expected_node)
127 # Get existing linkId
def test_04_get_linkId(self):
    # Fetch one XPDR-to-SRG link from 'openroadm-topology' (requested with the
    # 'config' argument) and check the link-id echoed in the answer.
    expected_link = 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX'
    reply = test_utils.get_ietf_network_link_request('openroadm-topology', expected_link, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    self.assertEqual(reply['link']['link-id'], expected_link)
135 # Path Computation success
def test_05_path_computation_xpdr_bi(self):
    # Run the path-computation RPC with the class-level request as it
    # currently stands and expect the "Path is calculated" message.
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', self.path_computation_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
145 # Path Computation success
def test_06_path_computation_rdm_bi(self):
    # Point both service ends of the shared request at ROADM nodes, then run
    # the path-computation RPC and expect the "Path is calculated" message.
    request = self.path_computation_input_data
    for end_key, node_id in (("service-a-end", "ROADMA01"), ("service-z-end", "ROADMC01")):
        request[end_key]["node-id"] = node_id
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', request)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
157 # Load simple unidirectional topology
def test_07_load_simple_topology_uni(self):
    # Send the NW-simple-topology.json text as the 'openroadm-topology'
    # network; both "ok" and "no content" status codes count as success.
    reply = test_utils.put_ietf_network('openroadm-topology', self.simple_topo_uni_dir_data)
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted_codes)
163 # Get existing nodeId
def test_08_get_nodeId(self):
    # Fetch node XPONDER-1-2 from 'openroadm-topology' (requested with the
    # 'config' argument) and check the node-id echoed in the answer.
    expected_node = 'XPONDER-1-2'
    reply = test_utils.get_ietf_network_node_request('openroadm-topology', expected_node, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    self.assertEqual(reply['node']['node-id'], expected_node)
170 # Get existing linkId
def test_09_get_linkId(self):
    # Fetch one XPONDER-to-SRG link from 'openroadm-topology' (requested with
    # the 'config' argument) and check the link-id echoed in the answer.
    expected_link = 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX'
    reply = test_utils.get_ietf_network_link_request('openroadm-topology', expected_link, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    self.assertEqual(reply['link']['link-id'], expected_link)
178 # Path Computation success
def test_10_path_computation_xpdr_uni(self):
    # Re-target both service ends of the shared request (node-id and clli),
    # then run the path-computation RPC and expect "Path is calculated".
    request = self.path_computation_input_data
    endpoints = (
        ("service-a-end", "XPONDER-1-2", "ORANGE1"),
        ("service-z-end", "XPONDER-3-2", "ORANGE3"),
    )
    for end_key, node_id, clli in endpoints:
        request[end_key]["node-id"] = node_id
        request[end_key]["clli"] = clli
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', request)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
192 # Path Computation success
def test_11_path_computation_rdm_uni(self):
    # Re-target both service ends of the shared request at ROADM nodes, run
    # the path-computation RPC, then inspect the returned path description.
    request = self.path_computation_input_data
    endpoints = (
        ("service-a-end", "OpenROADM-2-1", "cll21"),
        ("service-z-end", "OpenROADM-2-2", "ncli22"),
    )
    for end_key, node_id, clli in endpoints:
        request[end_key]["node-id"] = node_id
        request[end_key]["clli"] = clli
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', request)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)

    # Both directions must hold exactly 15 elements.
    description = reply['output']['response-parameters']['path-description']
    a_to_z = description['aToZ-direction']['aToZ']
    z_to_a = description['zToA-direction']['zToA']
    self.assertEqual(len(a_to_z), 15)
    self.assertEqual(len(z_to_a), 15)
    # Only the element with id '14' (A->Z) and the one with id '0' (Z->A)
    # are checked for their termination-point id.
    for forward, backward in zip(a_to_z, z_to_a):
        if forward['id'] == '14':
            self.assertEqual(forward['resource']['tp-id'], 'SRG1-PP1-TX')
        if backward['id'] == '0':
            self.assertEqual(backward['resource']['tp-id'], 'SRG1-PP1-RX')
218 # Load complex topology
def test_12_load_complex_topology(self):
    # Send the NW-for-test-5-4.json text as the 'openroadm-topology' network;
    # both "ok" and "no content" status codes count as success.
    reply = test_utils.put_ietf_network('openroadm-topology', self.complex_topo_uni_dir_data)
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted_codes)
224 # Get existing nodeId
def test_13_get_nodeId(self):
    # Fetch node XPONDER-3-2 from 'openroadm-topology' (requested with the
    # 'config' argument) and check the node-id echoed in the answer.
    expected_node = 'XPONDER-3-2'
    reply = test_utils.get_ietf_network_node_request('openroadm-topology', expected_node, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    self.assertEqual(reply['node']['node-id'], expected_node)
231 # Test failed path computation
def test_14_fail_path_computation(self):
    # Strip the service name and both service ends from the shared request;
    # the RPC is still expected to answer "ok" but with the
    # 'Service Name is not set' message.
    request = self.path_computation_input_data
    for removed_key in ("service-name", "service-a-end", "service-z-end"):
        del request[removed_key]
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', request)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Service Name is not set', message)
244 # Test1 success path computation
245 def test_15_success1_path_computation(self):
# Restore the service name and both service ends that the previous test
# deleted from the shared request.
246 self.path_computation_input_data["service-name"] = "service 1"
247 self.path_computation_input_data["service-a-end"] = {"service-format": "Ethernet", "service-rate": "100",
248 "clli": "ORANGE2", "node-id": "XPONDER-2-2"}
249 self.path_computation_input_data["service-z-end"] = {"service-format": "Ethernet", "service-rate": "100",
250 "clli": "ORANGE1", "node-id": "XPONDER-1-2"}
# Add hard and soft constraints to the request.
# NOTE(review): both constraint literals below are left unterminated in this
# listing (closing braces, and possibly an enclosing key, are missing) --
# confirm the exact payload against the complete file.
251 self.path_computation_input_data["hard-constraints"] = {"customer-code": ["Some customer-code"],
253 "service-identifier-list": [{
254 "service-identifier": "Some existing-service"}]
256 self.path_computation_input_data["soft-constraints"] = {"customer-code": ["Some customer-code"],
258 "service-identifier-list": [{
259 "service-identifier": "Some existing-service"}]
# Run the RPC and expect the "Path is calculated" message.
261 response = test_utils.transportpce_api_rpc_request('transportpce-pce',
262 'path-computation-request',
263 self.path_computation_input_data)
264 self.assertEqual(response['status_code'], requests.codes.ok)
265 self.assertIn('Path is calculated',
266 response['output']['configuration-response-common']['response-message'])
270 # Test2 success path computation with path description
def test_16_success2_path_computation(self):
    # Re-target both service ends, drop the hard and soft constraints from
    # the shared request, then expect a computed path whose wavelength
    # number is 5 in both directions.
    request = self.path_computation_input_data
    endpoints = (
        ("service-a-end", "XPONDER-1-2", "ORANGE1"),
        ("service-z-end", "XPONDER-3-2", "ORANGE3"),
    )
    for end_key, node_id, clli in endpoints:
        request[end_key]["node-id"] = node_id
        request[end_key]["clli"] = clli
    for constraint_key in ("hard-constraints", "soft-constraints"):
        del request[constraint_key]
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', request)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
    description = reply['output']['response-parameters']['path-description']
    self.assertEqual(5, description['aToZ-direction']['aToZ-wavelength-number'])
    self.assertEqual(5, description['zToA-direction']['zToA-wavelength-number'])
290 # Test3 success path computation with hard-constraints exclude
def test_17_success3_path_computation(self):
    # Add a hard constraint excluding two ROADM nodes, then expect a computed
    # path whose wavelength number is 9 in both directions.
    excluded_nodes = ["OpenROADM-2-1", "OpenROADM-2-2"]
    self.path_computation_input_data["hard-constraints"] = {"exclude": {"node-id": excluded_nodes}}
    reply = test_utils.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', self.path_computation_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
    description = reply['output']['response-parameters']['path-description']
    self.assertEqual(9, description['aToZ-direction']['aToZ-wavelength-number'])
    self.assertEqual(9, description['zToA-direction']['zToA-wavelength-number'])
306 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
def test_18_path_computation_before_oms_attribute_deletion(self):
    # Re-target both service ends, drop the hard constraints, and run the
    # path-computation RPC. Before the oms-attribute is deleted, the A->Z
    # path is expected to have 31 elements and to include the
    # OpenROADM-1-3 -> OpenROADM-1-2 link.
    self.path_computation_input_data["service-a-end"]["node-id"] = "XPONDER-2-2"
    self.path_computation_input_data["service-a-end"]["clli"] = "ORANGE2"
    self.path_computation_input_data["service-z-end"]["node-id"] = "XPONDER-1-2"
    self.path_computation_input_data["service-z-end"]["clli"] = "ORANGE1"
    del self.path_computation_input_data["hard-constraints"]
    response = test_utils.transportpce_api_rpc_request('transportpce-pce',
                                                       'path-computation-request',
                                                       self.path_computation_input_data)
    self.assertEqual(response['status_code'], requests.codes.ok)
    self.assertIn('Path is calculated',
                  response['output']['configuration-response-common']['response-message'])
    a_to_z = response['output']['response-parameters']['path-description']['aToZ-direction']['aToZ']
    self.assertEqual(31, len(a_to_z))
    link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
    # NOTE(review): in this listing the flag-based search was incomplete (the
    # flag was never initialised and the element lookup was cut off). The
    # ['resource'] key is reconstructed from how path elements are read
    # elsewhere in this class -- confirm against the complete file.
    link_on_path = any(element['resource'] == link for element in a_to_z)
    self.assertTrue(link_on_path)
332 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
def test_19_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
    # Delete the oms-attribute of the OpenROADM-1-3 -> OpenROADM-1-2 link;
    # both "ok" and "no content" status codes count as success.
    # NOTE(review): the result is read through the .status_code attribute,
    # not a 'status_code' key as with the other helpers -- presumably
    # del_oms_attr_request returns a raw response object; confirm.
    link_id = "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"
    reply = test_utils.del_oms_attr_request(link_id)
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply.status_code, accepted_codes)
338 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
def test_20_path_computation_after_oms_attribute_deletion(self):
    # Re-run the path-computation RPC with the unchanged shared request.
    # After the oms-attribute is deleted, the A->Z path is expected to have
    # 47 elements and to no longer include the
    # OpenROADM-1-3 -> OpenROADM-1-2 link.
    response = test_utils.transportpce_api_rpc_request('transportpce-pce',
                                                       'path-computation-request',
                                                       self.path_computation_input_data)
    self.assertEqual(response['status_code'], requests.codes.ok)
    self.assertIn('Path is calculated',
                  response['output']['configuration-response-common']['response-message'])
    a_to_z = response['output']['response-parameters']['path-description']['aToZ-direction']['aToZ']
    self.assertEqual(47, len(a_to_z))
    link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
    # NOTE(review): in this listing the flag-based search was incomplete (the
    # flag was never initialised and the element lookup was cut off). The
    # ['resource'] key is reconstructed from how path elements are read
    # elsewhere in this class -- confirm against the complete file.
    link_on_path = any(element['resource'] == link for element in a_to_z)
    self.assertFalse(link_on_path)
# Script entry point: run every test of this module with verbose reporting.
if __name__ == "__main__":
    unittest.main(verbosity=2)