2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
16 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils_rfc8040 # nopep8
# Test case exercising the 'transportpce-pce' path-computation RPC against
# topologies loaded from sample files.
# NOTE(review): some lines of this class are not visible in this view (for
# instance the closing braces of the dictionary below); confirm any detail
# against the complete file.
26 class TransportPCEtesting(unittest.TestCase):
# Request payload passed to every 'path-computation-request' call. It is a
# class attribute that the test methods modify in place, so later tests see
# the changes made by earlier ones.
27 path_computation_input_data = {
28 "service-name": "service-1",
29 "resource-reserve": "true",
30 "service-handler-header": {
31 "request-id": "request1"
34 "service-rate": "100",
36 "service-format": "Ethernet",
40 "service-rate": "100",
42 "service-format": "Ethernet",
45 "pce-routing-metric": "hop-count"
# Text content of the sample files; None until the sample files are read.
48 simple_topo_bi_dir_data = None
49 simple_topo_uni_dir_data = None
50 complex_topo_uni_dir_data = None
51 port_mapping_data = None
# NOTE(review): this is the visible part of a class-level setup routine
# (presumably setUpClass). Its def line, the opening `try:` and parts of the
# exception handlers are not visible in this view; confirm against the full file.
# It reads four sample files into class attributes, then calls
# test_utils_rfc8040.start_tpce() and stores the result in cls.processes.
56 # pylint: disable=bare-except
# Flag switched to True only after all four sample files have been read.
57 sample_files_parsed = False
# Every sample path is built relative to this file's directory:
# two levels up, then sample_configs/<file name>.
59 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
60 "..", "..", "sample_configs", "honeynode-topo.json")
61 with open(TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as topo_bi_dir:
62 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
64 TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
65 "..", "..", "sample_configs", "NW-simple-topology.json")
67 with open(TOPO_UNI_DIR_FILE, 'r', encoding='utf-8') as topo_uni_dir:
68 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
70 TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
71 "..", "..", "sample_configs", "NW-for-test-5-4.json")
72 with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r', encoding='utf-8') as topo_uni_dir_complex:
73 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
74 PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
75 "..", "..", "sample_configs", "pce_portmapping_121.json")
76 with open(PORT_MAPPING_FILE, 'r', encoding='utf-8') as port_mapping:
77 cls.port_mapping_data = port_mapping.read()
78 sample_files_parsed = True
# The handlers below report the problem on standard output.
79 except PermissionError as err:
80 print("Permission Error when trying to read sample files\n", err)
82 except FileNotFoundError as err:
83 print("File Not found Error when trying to read sample files\n", err)
# Only the exception type (first item of sys.exc_info()) is printed here.
86 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
89 if sample_files_parsed:
90 print("sample files content loaded")
# The value kept here is what tearDownClass iterates over to shut processes down.
92 cls.processes = test_utils_rfc8040.start_tpce()
def tearDownClass(cls):
    # Class-level cleanup: delete the port mapping and the
    # 'openroadm-topology' network through the test helpers, then shut down
    # every entry of cls.processes.
    test_utils_rfc8040.del_portmapping()
    test_utils_rfc8040.del_ietf_network('openroadm-topology')
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils_rfc8040.shutdown_process(running)
    print("all processes killed")
# Per-test fixture hook.
# NOTE(review): the body of this method is not visible in this view.
104 def setUp(self): # instruction executed before each test method
def test_01_load_port_mapping(self):
    # Post the port-mapping sample data; either 'created' or 'no_content'
    # counts as success.
    reply = test_utils_rfc8040.post_portmapping(self.port_mapping_data)
    accepted = (requests.codes.created, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted)
113 # Load simple bidirectional topology
def test_02_load_simple_topology_bi(self):
    # Put the content of simple_topo_bi_dir_data as network
    # 'openroadm-topology'; 'ok' or 'no_content' counts as success.
    reply = test_utils_rfc8040.put_ietf_network(
        'openroadm-topology', self.simple_topo_bi_dir_data)
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted)
119 # Get existing nodeId
def test_03_get_nodeId(self):
    # Request one node of 'openroadm-topology' and check that the answer
    # carries the same node-id.
    wanted_node = 'ROADMA01-SRG1'
    reply = test_utils_rfc8040.get_ietf_network_node_request(
        'openroadm-topology', wanted_node, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    self.assertEqual(reply['node']['node-id'], wanted_node)
126 # Get existing linkId
def test_04_get_linkId(self):
    # Request one link of 'openroadm-topology' and check that the answer
    # carries the same link-id.
    wanted_link = 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX'
    reply = test_utils_rfc8040.get_ietf_network_link_request(
        'openroadm-topology', wanted_link, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    self.assertEqual(reply['link']['link-id'], wanted_link)
134 # Path Computation success
def test_05_path_computation_xpdr_bi(self):
    # Send the request data unchanged to the path-computation RPC and
    # expect the response message to contain 'Path is calculated'.
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', self.path_computation_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
144 # Path Computation success
def test_06_path_computation_rdm_bi(self):
    # Switch both endpoints of the shared request to ROADM node ids, then
    # expect the RPC to answer 'Path is calculated'.
    for end_key, node_id in (("service-a-end", "ROADMA01"),
                             ("service-z-end", "ROADMC01")):
        self.path_computation_input_data[end_key]["node-id"] = node_id
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', self.path_computation_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
156 # Load simple unidirectional topology
def test_07_load_simple_topology_uni(self):
    # Put the content of simple_topo_uni_dir_data as network
    # 'openroadm-topology'; 'ok' or 'no_content' counts as success.
    reply = test_utils_rfc8040.put_ietf_network(
        'openroadm-topology', self.simple_topo_uni_dir_data)
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted)
162 # Get existing nodeId
def test_08_get_nodeId(self):
    # Request one node of 'openroadm-topology' and check that the answer
    # carries the same node-id.
    wanted_node = 'XPONDER-1-2'
    reply = test_utils_rfc8040.get_ietf_network_node_request(
        'openroadm-topology', wanted_node, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    self.assertEqual(reply['node']['node-id'], wanted_node)
169 # Get existing linkId
def test_09_get_linkId(self):
    # Request one link of 'openroadm-topology' and check that the answer
    # carries the same link-id.
    wanted_link = 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX'
    reply = test_utils_rfc8040.get_ietf_network_link_request(
        'openroadm-topology', wanted_link, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    self.assertEqual(reply['link']['link-id'], wanted_link)
177 # Path Computation success
def test_10_path_computation_xpdr_uni(self):
    # Set node-id and clli on both endpoints of the shared request (A end
    # first, then Z end), then expect the RPC to answer 'Path is calculated'.
    endpoints = (
        ("service-a-end", "XPONDER-1-2", "ORANGE1"),
        ("service-z-end", "XPONDER-3-2", "ORANGE3"),
    )
    for end_key, node_id, clli in endpoints:
        self.path_computation_input_data[end_key]["node-id"] = node_id
        self.path_computation_input_data[end_key]["clli"] = clli
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', self.path_computation_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
191 # Path Computation success
def test_11_path_computation_rdm_uni(self):
    # Set node-id and clli on both endpoints of the shared request, run the
    # RPC, then inspect the returned path description: both directions must
    # hold 15 entries, and the entries with id '14' (A to Z) and id '0'
    # (Z to A) must carry the expected tp-id.
    endpoints = (
        ("service-a-end", "OpenROADM-2-1", "cll21"),
        ("service-z-end", "OpenROADM-2-2", "ncli22"),
    )
    for end_key, node_id, clli in endpoints:
        self.path_computation_input_data[end_key]["node-id"] = node_id
        self.path_computation_input_data[end_key]["clli"] = clli
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', self.path_computation_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)

    a_to_z = reply['output']['response-parameters']['path-description']['aToZ-direction']['aToZ']
    a_to_z_count = len(a_to_z)
    z_to_a = reply['output']['response-parameters']['path-description']['zToA-direction']['zToA']
    z_to_a_count = len(z_to_a)
    self.assertEqual(a_to_z_count, 15)
    self.assertEqual(z_to_a_count, 15)
    # Both lists were just checked to have 15 entries, so walking them in
    # lockstep visits the same index pairs 0..14.
    for forward, backward in zip(a_to_z, z_to_a):
        if forward['id'] == '14':
            self.assertEqual(forward['resource']['tp-id'], 'SRG1-PP1-TX')
        if backward['id'] == '0':
            self.assertEqual(backward['resource']['tp-id'], 'SRG1-PP1-RX')
217 # Load complex topology
def test_12_load_complex_topology(self):
    # Put the content of complex_topo_uni_dir_data as network
    # 'openroadm-topology'; 'ok' or 'no_content' counts as success.
    reply = test_utils_rfc8040.put_ietf_network(
        'openroadm-topology', self.complex_topo_uni_dir_data)
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply['status_code'], accepted)
223 # Get existing nodeId
def test_13_get_nodeId(self):
    # Request one node of 'openroadm-topology' and check that the answer
    # carries the same node-id.
    wanted_node = 'XPONDER-3-2'
    reply = test_utils_rfc8040.get_ietf_network_node_request(
        'openroadm-topology', wanted_node, 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    self.assertEqual(reply['node']['node-id'], wanted_node)
230 # Test failed path computation
def test_14_fail_path_computation(self):
    # Strip the service name and both endpoints from the shared request;
    # the RPC must still answer 'ok' but report 'Service Name is not set'.
    for removed_key in ("service-name", "service-a-end", "service-z-end"):
        del self.path_computation_input_data[removed_key]
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', self.path_computation_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Service Name is not set', message)
243 # Test1 success path computation
244 def test_15_success1_path_computation(self):
# Sets the service name and both endpoint dictionaries on the shared request,
# adds hard and soft constraints, then expects the RPC to answer
# 'Path is calculated'.
# NOTE(review): parts of the two constraint dictionaries are not visible in
# this view; confirm their full content against the complete file.
245 self.path_computation_input_data["service-name"] = "service 1"
246 self.path_computation_input_data["service-a-end"] = {"service-format": "Ethernet", "service-rate": "100",
247 "clli": "ORANGE2", "node-id": "XPONDER-2-2"}
248 self.path_computation_input_data["service-z-end"] = {"service-format": "Ethernet", "service-rate": "100",
249 "clli": "ORANGE1", "node-id": "XPONDER-1-2"}
250 self.path_computation_input_data["hard-constraints"] = {"customer-code": ["Some customer-code"],
252 "service-identifier-list": [{
253 "service-identifier": "Some existing-service"}]
255 self.path_computation_input_data["soft-constraints"] = {"customer-code": ["Some customer-code"],
257 "service-identifier-list": [{
258 "service-identifier": "Some existing-service"}]
260 response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce',
261 'path-computation-request',
262 self.path_computation_input_data)
263 self.assertEqual(response['status_code'], requests.codes.ok)
264 self.assertIn('Path is calculated',
265 response['output']['configuration-response-common']['response-message'])
269 # Test2 success path computation with path description
def test_16_success2_path_computation(self):
    # Point the shared request at XPONDER-1-2 / XPONDER-3-2, drop both
    # constraint entries, then expect a calculated path whose wavelength
    # number is 5 in each direction.
    endpoints = (
        ("service-a-end", "XPONDER-1-2", "ORANGE1"),
        ("service-z-end", "XPONDER-3-2", "ORANGE3"),
    )
    for end_key, node_id, clli in endpoints:
        self.path_computation_input_data[end_key]["node-id"] = node_id
        self.path_computation_input_data[end_key]["clli"] = clli
    for constraint_key in ("hard-constraints", "soft-constraints"):
        del self.path_computation_input_data[constraint_key]
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', self.path_computation_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
    a_to_z = reply['output']['response-parameters']['path-description']['aToZ-direction']
    self.assertEqual(5, a_to_z['aToZ-wavelength-number'])
    z_to_a = reply['output']['response-parameters']['path-description']['zToA-direction']
    self.assertEqual(5, z_to_a['zToA-wavelength-number'])
289 # Test3 success path computation with hard-constraints exclude
def test_17_success3_path_computation(self):
    # Add a hard constraint that excludes two node ids, then expect a
    # calculated path whose wavelength number is 9 in each direction.
    excluded_nodes = {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}
    self.path_computation_input_data["hard-constraints"] = {"exclude": excluded_nodes}
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-pce', 'path-computation-request', self.path_computation_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    message = reply['output']['configuration-response-common']['response-message']
    self.assertIn('Path is calculated', message)
    a_to_z = reply['output']['response-parameters']['path-description']['aToZ-direction']
    self.assertEqual(9, a_to_z['aToZ-wavelength-number'])
    z_to_a = reply['output']['response-parameters']['path-description']['zToA-direction']
    self.assertEqual(9, z_to_a['zToA-wavelength-number'])
305 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
306 def test_18_path_computation_before_oms_attribute_deletion(self):
# Points the shared request at XPONDER-2-2 / XPONDER-1-2, removes the hard
# constraints, and checks the computed A-to-Z path: it must hold 31 entries
# and is searched for the in-service link
# OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2.
# NOTE(review): the lines that initialise and set `find`, and the end of the
# `resource_i` expression, are not visible in this view.
307 self.path_computation_input_data["service-a-end"]["node-id"] = "XPONDER-2-2"
308 self.path_computation_input_data["service-a-end"]["clli"] = "ORANGE2"
309 self.path_computation_input_data["service-z-end"]["node-id"] = "XPONDER-1-2"
310 self.path_computation_input_data["service-z-end"]["clli"] = "ORANGE1"
311 del self.path_computation_input_data["hard-constraints"]
312 response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce',
313 'path-computation-request',
314 self.path_computation_input_data)
315 self.assertEqual(response['status_code'], requests.codes.ok)
316 self.assertIn('Path is calculated',
317 response['output']['configuration-response-common']['response-message'])
318 path_depth = len(response['output']['response-parameters']['path-description']
319 ['aToZ-direction']['aToZ'])
320 self.assertEqual(31, path_depth)
321 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
323 for i in range(0, path_depth):
324 resource_i = (response['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
326 if resource_i == link:
# Presumably `find` is set when an entry equals `link`, so this expects the
# link to be part of the path (TODO confirm against the full file).
328 self.assertEqual(find, True)
331 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
def test_19_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
    # Ask the helper to delete the OMS attributes of one link; 'ok' or
    # 'no_content' counts as success. Unlike the other helpers used in
    # this class, the status is read as an attribute of the result.
    target_link = "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"
    reply = test_utils_rfc8040.del_oms_attr_request(target_link)
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(reply.status_code, accepted)
337 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
338 def test_20_path_computation_after_oms_attribute_deletion(self):
# Sends the shared request again without changing it and checks the computed
# A-to-Z path: it must now hold 47 entries and is searched for the in-service
# link OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2.
# NOTE(review): the lines that initialise and set `find`, and the end of the
# `resource_i` expression, are not visible in this view.
339 response = test_utils_rfc8040.transportpce_api_rpc_request('transportpce-pce',
340 'path-computation-request',
341 self.path_computation_input_data)
342 self.assertEqual(response['status_code'], requests.codes.ok)
343 self.assertIn('Path is calculated',
344 response['output']['configuration-response-common']['response-message'])
345 path_depth = len(response['output']['response-parameters']['path-description']
346 ['aToZ-direction']['aToZ'])
347 self.assertEqual(47, path_depth)
348 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
350 for i in range(0, path_depth):
351 resource_i = (response['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
353 if resource_i == link:
# Presumably `find` is set when an entry equals `link`, so this expects the
# link to be absent from the path (TODO confirm against the full file).
355 self.assertNotEqual(find, True)
# Run the test cases with verbose output when this file is executed directly.
359 if __name__ == "__main__":
360 unittest.main(verbosity=2)