2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
16 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEtesting(unittest.TestCase):
28 simple_topo_bi_dir_data = None
29 simple_topo_uni_dir_data = None
30 complex_topo_uni_dir_data = None
31 port_mapping_data = None
36 # pylint: disable=bare-except
37 sample_files_parsed = False
39 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
40 "..", "..", "sample_configs", "honeynode-topo.xml")
41 with open(TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as topo_bi_dir:
42 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
44 TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
45 "..", "..", "sample_configs", "NW-simple-topology.xml")
47 with open(TOPO_UNI_DIR_FILE, 'r', encoding='utf-8') as topo_uni_dir:
48 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
50 TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
51 "..", "..", "sample_configs", "NW-for-test-5-4.xml")
52 with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r', encoding='utf-8') as topo_uni_dir_complex:
53 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
54 PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
55 "..", "..", "sample_configs", "pce_portmapping_121.json")
56 with open(PORT_MAPPING_FILE, 'r', encoding='utf-8') as port_mapping:
57 cls.port_mapping_data = port_mapping.read()
58 sample_files_parsed = True
59 except PermissionError as err:
60 print("Permission Error when trying to read sample files\n", err)
62 except FileNotFoundError as err:
63 print("File Not found Error when trying to read sample files\n", err)
66 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
69 if sample_files_parsed:
70 print("sample files content loaded")
72 cls.processes = test_utils.start_tpce()
75 def tearDownClass(cls):
76 # pylint: disable=not-an-iterable
77 for process in cls.processes:
78 test_utils.shutdown_process(process)
79 print("all processes killed")
81 def setUp(self): # instruction executed before each test method
85 def test_00_load_port_mapping(self):
86 response = test_utils.rawpost_request(test_utils.URL_FULL_PORTMAPPING, self.port_mapping_data)
87 self.assertEqual(response.status_code, requests.codes.no_content)
90 # Load simple bidirectional topology
91 def test_01_load_simple_topology_bi(self):
92 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data)
93 self.assertEqual(response.status_code, requests.codes.ok)
97 def test_02_get_nodeId(self):
98 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
99 self.assertEqual(response.status_code, requests.codes.ok)
100 res = response.json()
102 res['node'][0]['node-id'], 'ROADMA01-SRG1')
105 # Get existing linkId
106 def test_03_get_linkId(self):
107 response = test_utils.get_ordm_topo_request("link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX")
108 self.assertEqual(response.status_code, requests.codes.ok)
109 res = response.json()
111 res['ietf-network-topology:link'][0]['link-id'],
112 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
115 # Path Computation success
116 def test_04_path_computation_xpdr_bi(self):
117 response = test_utils.path_computation_request("request-1", "service-1",
118 {"node-id": "XPDRA01", "service-rate": "100",
119 "service-format": "Ethernet", "clli": "nodeA"},
120 {"node-id": "XPDRC01", "service-rate": "100",
121 "service-format": "Ethernet", "clli": "nodeC"})
122 self.assertEqual(response.status_code, requests.codes.ok)
123 res = response.json()
124 self.assertIn('Path is calculated',
125 res['output']['configuration-response-common']['response-message'])
128 # Path Computation success
129 def test_05_path_computation_rdm_bi(self):
130 response = test_utils.path_computation_request("request-1", "service-1",
131 {"node-id": "ROADMA01", "service-rate": "100",
132 "service-format": "Ethernet", "clli": "NodeA"},
133 {"node-id": "ROADMC01", "service-rate": "100",
134 "service-format": "Ethernet", "clli": "NodeC"})
135 self.assertEqual(response.status_code, requests.codes.ok)
136 res = response.json()
137 self.assertIn('Path is calculated',
138 res['output']['configuration-response-common']['response-message'])
142 def test_06_delete_simple_topology_bi(self):
143 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
144 self.assertEqual(response.status_code, requests.codes.ok)
147 # Test deleted topology
148 def test_07_test_topology_simple_bi_deleted(self):
149 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
150 self.assertEqual(response.status_code, requests.codes.conflict)
153 # Load simple bidirectional topology
154 def test_08_load_simple_topology_uni(self):
155 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_uni_dir_data)
156 self.assertEqual(response.status_code, 201)
159 # Get existing nodeId
160 def test_09_get_nodeId(self):
161 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
162 self.assertEqual(response.status_code, requests.codes.ok)
163 res = response.json()
165 res['node'][0]['node-id'],
169 # Get existing linkId
170 def test_10_get_linkId(self):
171 response = test_utils.get_ordm_topo_request("link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX")
172 self.assertEqual(response.status_code, requests.codes.ok)
173 res = response.json()
175 res['ietf-network-topology:link'][0]['link-id'],
176 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
179 # Path Computation success
180 def test_11_path_computation_xpdr_uni(self):
181 response = test_utils.path_computation_request("request-1", "service-1",
182 {"node-id": "XPONDER-1-2", "service-rate": "100",
183 "service-format": "Ethernet", "clli": "ORANGE1"},
184 {"node-id": "XPONDER-3-2", "service-rate": "100",
185 "service-format": "Ethernet", "clli": "ORANGE3"})
186 self.assertEqual(response.status_code, requests.codes.ok)
187 res = response.json()
188 self.assertIn('Path is calculated',
189 res['output']['configuration-response-common']['response-message'])
192 # Path Computation success
193 def test_12_path_computation_rdm_uni(self):
194 response = test_utils.path_computation_request("request1", "service1",
195 {"service-rate": "100", "service-format": "Ethernet",
196 "clli": "cll21", "node-id": "OpenROADM-2-1"},
197 {"service-rate": "100", "service-format": "Ethernet",
198 "clli": "ncli22", "node-id": "OpenROADM-2-2"})
199 self.assertEqual(response.status_code, requests.codes.ok)
200 res = response.json()
201 self.assertIn('Path is calculated',
202 res['output']['configuration-response-common']['response-message'])
204 atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
205 ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
206 self.assertEqual(atozList, 15)
207 self.assertEqual(ztoaList, 15)
208 for i in range(0, 15):
209 atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
210 ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
211 if atoz['id'] == '14':
212 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
213 if ztoa['id'] == '0':
214 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
218 def test_13_delete_simple_topology(self):
219 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
220 self.assertEqual(response.status_code, requests.codes.ok)
223 # Test deleted topology
224 def test_14_test_topology_simple_deleted(self):
225 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
226 self.assertEqual(response.status_code, requests.codes.conflict)
229 # Load complex topology
230 def test_15_load_complex_topology(self):
231 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.complex_topo_uni_dir_data)
232 self.assertEqual(response.status_code, 201)
235 # Get existing nodeId
236 def test_16_get_nodeId(self):
237 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
238 self.assertEqual(response.status_code, requests.codes.ok)
239 res = response.json()
241 res['node'][0]['node-id'],
245 # Test failed path computation
246 def test_17_fail_path_computation(self):
247 response = test_utils.post_request(test_utils.URL_PATH_COMPUTATION_REQUEST,
248 {"input": {"service-handler-header": {"request-id": "request-1"}}})
249 self.assertEqual(response.status_code, requests.codes.ok)
250 res = response.json()
251 self.assertIn('Service Name is not set',
252 res['output']['configuration-response-common']['response-message'])
255 # Test1 success path computation
256 def test_18_success1_path_computation(self):
257 response = test_utils.path_computation_request("request1", "service1",
258 {"service-format": "Ethernet", "service-rate": "100",
259 "clli": "ORANGE2", "node-id": "XPONDER-2-2"},
260 {"service-format": "Ethernet", "service-rate": "100",
261 "clli": "ORANGE1", "node-id": "XPONDER-1-2"},
262 {"customer-code": ["Some customer-code"],
264 "service-identifier-list": [
266 "service-identifier": "Some existing-service",
271 {"customer-code": ["Some customer-code"],
273 "service-identifier-list": [
275 "service-identifier": "Some existing-service",
280 self.assertEqual(response.status_code, requests.codes.ok)
281 res = response.json()
282 self.assertIn('Path is calculated',
283 res['output']['configuration-response-common']['response-message'])
286 # Test2 success path computation with path description
287 def test_19_success2_path_computation(self):
288 response = test_utils.path_computation_request("request 1", "service 1",
289 {"service-rate": "100", "service-format": "Ethernet",
290 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
291 {"service-rate": "100", "service-format": "Ethernet",
292 "node-id": "XPONDER-3-2", "clli": "ORANGE3"})
293 self.assertEqual(response.status_code, requests.codes.ok)
294 res = response.json()
295 self.assertIn('Path is calculated',
296 res['output']['configuration-response-common']['response-message'])
297 self.assertEqual(5, res['output']['response-parameters']['path-description']
298 ['aToZ-direction']['aToZ-wavelength-number'])
299 self.assertEqual(5, res['output']['response-parameters']['path-description']
300 ['zToA-direction']['zToA-wavelength-number'])
303 # Test3 success path computation with hard-constraints exclude
304 def test_20_success3_path_computation(self):
305 response = test_utils.path_computation_request("request 1", "service 1",
306 {"service-rate": "100", "service-format": "Ethernet",
307 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
308 {"service-rate": "100", "service-format": "Ethernet",
309 "node-id": "XPONDER-3-2", "clli": "ORANGE3"},
310 {"exclude": {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}})
311 self.assertEqual(response.status_code, requests.codes.ok)
312 res = response.json()
313 self.assertIn('Path is calculated',
314 res['output']['configuration-response-common']['response-message'])
315 self.assertEqual(9, res['output']['response-parameters']['path-description']
316 ['aToZ-direction']['aToZ-wavelength-number'])
317 self.assertEqual(9, res['output']['response-parameters']['path-description']
318 ['zToA-direction']['zToA-wavelength-number'])
321 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
322 def test_21_path_computation_before_oms_attribute_deletion(self):
323 response = test_utils.path_computation_request("request 1", "service 1",
324 {"service-rate": "100", "service-format": "Ethernet",
325 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
326 {"service-rate": "100", "service-format": "Ethernet",
327 "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
328 self.assertEqual(response.status_code, requests.codes.ok)
329 res = response.json()
330 self.assertIn('Path is calculated',
331 res['output']['configuration-response-common']['response-message'])
332 nbElmPath = len(res['output']['response-parameters']['path-description']
333 ['aToZ-direction']['aToZ'])
334 self.assertEqual(31, nbElmPath)
335 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
337 for i in range(0, nbElmPath):
338 resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
340 if resource_i == link:
342 self.assertEqual(find, True)
345 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
346 def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
347 response = test_utils.del_oms_attr_request("OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2")
348 self.assertEqual(response.status_code, requests.codes.ok)
351 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
352 def test_23_path_computation_after_oms_attribute_deletion(self):
353 response = test_utils.path_computation_request("request 1", "service 1",
354 {"service-rate": "100", "service-format": "Ethernet",
355 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
356 {"service-rate": "100", "service-format": "Ethernet",
357 "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
358 self.assertEqual(response.status_code, requests.codes.ok)
359 res = response.json()
360 self.assertIn('Path is calculated',
361 res['output']['configuration-response-common']['response-message'])
362 nbElmPath = len(res['output']['response-parameters']['path-description']
363 ['aToZ-direction']['aToZ'])
364 self.assertEqual(47, nbElmPath)
365 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
367 for i in range(0, nbElmPath):
368 resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
370 if resource_i == link:
372 self.assertNotEqual(find, True)
375 # Delete complex topology
376 def test_24_delete_complex_topology(self):
377 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
378 self.assertEqual(response.status_code, requests.codes.ok)
381 # Test deleted complex topology
382 def test_25_test_topology_complex_deleted(self):
383 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
384 self.assertEqual(response.status_code, requests.codes.conflict)
388 def test_26_delete_port_mapping(self):
389 response = test_utils.delete_request(test_utils.URL_FULL_PORTMAPPING)
390 self.assertEqual(response.status_code, requests.codes.ok)
394 if __name__ == "__main__":
395 unittest.main(verbosity=2)