2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
16 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEtesting(unittest.TestCase):
28 simple_topo_bi_dir_data = None
29 simple_topo_uni_dir_data = None
30 complex_topo_uni_dir_data = None
31 port_mapping_data = None
36 # pylint: disable=bare-except
37 sample_files_parsed = False
39 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
40 "..", "..", "sample_configs", "honeynode-topo.xml")
41 with open(TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as topo_bi_dir:
42 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
44 TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
45 "..", "..", "sample_configs", "NW-simple-topology.xml")
47 with open(TOPO_UNI_DIR_FILE, 'r', encoding='utf-8') as topo_uni_dir:
48 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
50 TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
51 "..", "..", "sample_configs", "NW-for-test-5-4.xml")
52 with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r', encoding='utf-8') as topo_uni_dir_complex:
53 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
54 PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
55 "..", "..", "sample_configs", "pce_portmapping_121.json")
56 with open(PORT_MAPPING_FILE, 'r', encoding='utf-8') as port_mapping:
57 cls.port_mapping_data = port_mapping.read()
58 sample_files_parsed = True
59 except PermissionError as err:
60 print("Permission Error when trying to read sample files\n", err)
62 except FileNotFoundError as err:
63 print("File Not found Error when trying to read sample files\n", err)
66 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
69 if sample_files_parsed:
70 print("sample files content loaded")
72 cls.processes = test_utils.start_tpce()
75 def tearDownClass(cls):
76 # pylint: disable=not-an-iterable
77 for process in cls.processes:
78 test_utils.shutdown_process(process)
79 print("all processes killed")
81 def setUp(self): # instruction executed before each test method
85 def test_00_load_port_mapping(self):
86 response = test_utils.rawpost_request(test_utils.URL_FULL_PORTMAPPING, self.port_mapping_data)
87 self.assertEqual(response.status_code, requests.codes.no_content)
90 # Load simple bidirectional topology
91 def test_01_load_simple_topology_bi(self):
92 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data)
93 self.assertEqual(response.status_code, requests.codes.ok)
97 def test_02_get_nodeId(self):
98 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
99 self.assertEqual(response.status_code, requests.codes.ok)
100 res = response.json()
102 res['node'][0]['node-id'], 'ROADMA01-SRG1')
105 # Get existing linkId
106 def test_03_get_linkId(self):
107 response = test_utils.get_ordm_topo_request("link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX")
108 self.assertEqual(response.status_code, requests.codes.ok)
109 res = response.json()
111 res['ietf-network-topology:link'][0]['link-id'],
112 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
115 # Path Computation success
116 def test_04_path_computation_xpdr_bi(self):
117 response = test_utils.path_computation_request("request-1", "service-1",
118 {"node-id": "XPDRA01", "service-rate": "100",
119 "service-format": "Ethernet", "clli": "nodeA"},
120 {"node-id": "XPDRC01", "service-rate": "100",
121 "service-format": "Ethernet", "clli": "nodeC"})
122 self.assertEqual(response.status_code, requests.codes.ok)
123 res = response.json()
124 self.assertIn('Path is calculated',
125 res['output']['configuration-response-common']['response-message'])
128 # Path Computation success
129 def test_05_path_computation_rdm_bi(self):
130 response = test_utils.path_computation_request("request-1", "service-1",
131 {"node-id": "ROADMA01", "service-rate": "100",
132 "service-format": "Ethernet", "clli": "NodeA"},
133 {"node-id": "ROADMC01", "service-rate": "100",
134 "service-format": "Ethernet", "clli": "NodeC"})
135 self.assertEqual(response.status_code, requests.codes.ok)
136 res = response.json()
137 self.assertIn('Path is calculated',
138 res['output']['configuration-response-common']['response-message'])
142 def test_06_delete_simple_topology_bi(self):
143 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
144 self.assertEqual(response.status_code, requests.codes.ok)
147 # Test deleted topology
148 def test_07_test_topology_simple_bi_deleted(self):
149 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
150 self.assertEqual(response.status_code, requests.codes.conflict)
153 # Load simple bidirectional topology
154 def test_08_load_simple_topology_uni(self):
155 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_uni_dir_data)
156 self.assertEqual(response.status_code, 201)
159 # Get existing nodeId
160 def test_09_get_nodeId(self):
161 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
162 self.assertEqual(response.status_code, requests.codes.ok)
163 res = response.json()
165 res['node'][0]['node-id'],
169 # Get existing linkId
170 def test_10_get_linkId(self):
171 response = test_utils.get_ordm_topo_request("link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX")
172 self.assertEqual(response.status_code, requests.codes.ok)
173 res = response.json()
175 res['ietf-network-topology:link'][0]['link-id'],
176 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
179 # Path Computation success
180 def test_11_path_computation_xpdr_uni(self):
181 response = test_utils.path_computation_request("request-1", "service-1",
182 {"node-id": "XPONDER-1-2", "service-rate": "100",
183 "service-format": "Ethernet", "clli": "ORANGE1"},
184 {"node-id": "XPONDER-3-2", "service-rate": "100",
185 "service-format": "Ethernet", "clli": "ORANGE3"})
186 self.assertEqual(response.status_code, requests.codes.ok)
187 res = response.json()
188 self.assertIn('Path is calculated',
189 res['output']['configuration-response-common']['response-message'])
192 # Path Computation success
193 def test_12_path_computation_rdm_uni(self):
194 response = test_utils.path_computation_request("request1", "service1",
195 {"service-rate": "100", "service-format": "Ethernet",
196 "clli": "cll21", "node-id": "OpenROADM-2-1"},
197 {"service-rate": "100", "service-format": "Ethernet",
198 "clli": "ncli22", "node-id": "OpenROADM-2-2"})
199 self.assertEqual(response.status_code, requests.codes.ok)
200 res = response.json()
201 self.assertIn('Path is calculated',
202 res['output']['configuration-response-common']['response-message'])
204 atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
205 ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
206 self.assertEqual(atozList, 15)
207 self.assertEqual(ztoaList, 15)
208 for i in range(0, 15):
209 atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
210 ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
211 if atoz['id'] == '14':
212 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
213 if ztoa['id'] == '0':
214 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
218 def test_13_delete_simple_topology(self):
219 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
220 self.assertEqual(response.status_code, requests.codes.ok)
223 # Test deleted topology
224 def test_14_test_topology_simple_deleted(self):
225 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
226 self.assertEqual(response.status_code, requests.codes.conflict)
229 # Load complex topology
230 def test_15_load_complex_topology(self):
231 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.complex_topo_uni_dir_data)
232 self.assertEqual(response.status_code, 201)
235 # Get existing nodeId
236 def test_16_get_nodeId(self):
237 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
238 self.assertEqual(response.status_code, requests.codes.ok)
239 res = response.json()
241 res['node'][0]['node-id'],
245 # Test failed path computation
246 def test_17_fail_path_computation(self):
247 response = test_utils.post_request(test_utils.URL_PATH_COMPUTATION_REQUEST,
248 {"input": {"service-handler-header": {"request-id": "request-1"}}})
249 self.assertEqual(response.status_code, requests.codes.ok)
250 res = response.json()
251 self.assertIn('Service Name is not set',
252 res['output']['configuration-response-common']['response-message'])
255 # Test1 success path computation
256 def test_18_success1_path_computation(self):
257 response = test_utils.path_computation_request("request1", "service1",
258 {"service-format": "Ethernet", "service-rate": "100",
259 "clli": "ORANGE2", "node-id": "XPONDER-2-2"},
260 {"service-format": "Ethernet", "service-rate": "100",
261 "clli": "ORANGE1", "node-id": "XPONDER-1-2"},
262 {"customer-code": ["Some customer-code"],
263 "co-routing": {"existing-service": ["Some existing-service"]}
265 {"customer-code": ["Some customer-code"],
266 "co-routing": {"existing-service": ["Some existing-service"]}
268 "hop-count", {"locally-protected-links": "true"})
269 self.assertEqual(response.status_code, requests.codes.ok)
270 res = response.json()
271 self.assertIn('Path is calculated',
272 res['output']['configuration-response-common']['response-message'])
275 # Test2 success path computation with path description
276 def test_19_success2_path_computation(self):
277 response = test_utils.path_computation_request("request 1", "service 1",
278 {"service-rate": "100", "service-format": "Ethernet",
279 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
280 {"service-rate": "100", "service-format": "Ethernet",
281 "node-id": "XPONDER-3-2", "clli": "ORANGE3"})
282 self.assertEqual(response.status_code, requests.codes.ok)
283 res = response.json()
284 self.assertIn('Path is calculated',
285 res['output']['configuration-response-common']['response-message'])
286 self.assertEqual(5, res['output']['response-parameters']['path-description']
287 ['aToZ-direction']['aToZ-wavelength-number'])
288 self.assertEqual(5, res['output']['response-parameters']['path-description']
289 ['zToA-direction']['zToA-wavelength-number'])
292 # Test3 success path computation with hard-constraints exclude
293 def test_20_success3_path_computation(self):
294 response = test_utils.path_computation_request("request 1", "service 1",
295 {"service-rate": "100", "service-format": "Ethernet",
296 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
297 {"service-rate": "100", "service-format": "Ethernet",
298 "node-id": "XPONDER-3-2", "clli": "ORANGE3"},
299 {"exclude_": {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}})
300 self.assertEqual(response.status_code, requests.codes.ok)
301 res = response.json()
302 self.assertIn('Path is calculated',
303 res['output']['configuration-response-common']['response-message'])
304 self.assertEqual(9, res['output']['response-parameters']['path-description']
305 ['aToZ-direction']['aToZ-wavelength-number'])
306 self.assertEqual(9, res['output']['response-parameters']['path-description']
307 ['zToA-direction']['zToA-wavelength-number'])
310 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
311 def test_21_path_computation_before_oms_attribute_deletion(self):
312 response = test_utils.path_computation_request("request 1", "service 1",
313 {"service-rate": "100", "service-format": "Ethernet",
314 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
315 {"service-rate": "100", "service-format": "Ethernet",
316 "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
317 self.assertEqual(response.status_code, requests.codes.ok)
318 res = response.json()
319 self.assertIn('Path is calculated',
320 res['output']['configuration-response-common']['response-message'])
321 nbElmPath = len(res['output']['response-parameters']['path-description']
322 ['aToZ-direction']['aToZ'])
323 self.assertEqual(31, nbElmPath)
324 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
326 for i in range(0, nbElmPath):
327 resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
329 if resource_i == link:
331 self.assertEqual(find, True)
334 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
335 def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
336 response = test_utils.del_oms_attr_request("OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2")
337 self.assertEqual(response.status_code, requests.codes.ok)
340 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
341 def test_23_path_computation_after_oms_attribute_deletion(self):
342 response = test_utils.path_computation_request("request 1", "service 1",
343 {"service-rate": "100", "service-format": "Ethernet",
344 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
345 {"service-rate": "100", "service-format": "Ethernet",
346 "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
347 self.assertEqual(response.status_code, requests.codes.ok)
348 res = response.json()
349 self.assertIn('Path is calculated',
350 res['output']['configuration-response-common']['response-message'])
351 nbElmPath = len(res['output']['response-parameters']['path-description']
352 ['aToZ-direction']['aToZ'])
353 self.assertEqual(47, nbElmPath)
354 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
356 for i in range(0, nbElmPath):
357 resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
359 if resource_i == link:
361 self.assertNotEqual(find, True)
364 # Delete complex topology
365 def test_24_delete_complex_topology(self):
366 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
367 self.assertEqual(response.status_code, requests.codes.ok)
370 # Test deleted complex topology
371 def test_25_test_topology_complex_deleted(self):
372 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
373 self.assertEqual(response.status_code, requests.codes.conflict)
377 def test_26_delete_port_mapping(self):
378 response = test_utils.delete_request(test_utils.URL_FULL_PORTMAPPING)
379 self.assertEqual(response.status_code, requests.codes.ok)
383 if __name__ == "__main__":
384 unittest.main(verbosity=2)