2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
16 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
class TransportPCEtesting(unittest.TestCase):
    """Ordered functional tests of the path-computation requests.

    The class starts the controller once through ``test_utils.start_tpce()``
    and shuts the returned processes down when all tests have run.  The test
    methods are numbered because they build on each other (a topology loaded
    by one test is queried and then deleted by the following ones) and
    unittest executes test methods in alphabetical order of their names.
    """

    # Raw content of the sample files, filled in by setUpClass.
    simple_topo_bi_dir_data = None
    simple_topo_uni_dir_data = None
    complex_topo_uni_dir_data = None
    port_mapping_data = None
    # Value returned by test_utils.start_tpce(), iterated in tearDownClass.
    processes = None

    # (class attribute, file name below "sample_configs") in loading order.
    _SAMPLE_FILES = (
        ("simple_topo_bi_dir_data", "honeynode-topo.xml"),
        ("simple_topo_uni_dir_data", "NW-simple-topology.xml"),
        ("complex_topo_uni_dir_data", "NW-for-test-5-4.xml"),
        ("port_mapping_data", "pce_portmapping_121.json"),
    )

    @staticmethod
    def _read_sample_config(file_name):
        """Return the text of *file_name* located in ../../sample_configs."""
        sample_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                   "..", "..", "sample_configs", file_name)
        with open(sample_path, 'r', encoding='utf-8') as sample_file:
            return sample_file.read()

    @classmethod
    def setUpClass(cls):
        """Load the sample files into class attributes, then start the controller.

        Raises:
            Exception: whatever error prevented a sample file from being read.
                The error is reported on stdout and re-raised so that the
                tests are not run against missing payloads.
        """
        try:
            for attribute, file_name in cls._SAMPLE_FILES:
                setattr(cls, attribute, cls._read_sample_config(file_name))
        except PermissionError as err:
            print("Permission Error when trying to read sample files\n", err)
            raise
        except FileNotFoundError as err:
            print("File Not found Error when trying to read sample files\n", err)
            raise
        except Exception as err:
            print("Unexpected error when trying to read sample files\n", type(err))
            raise
        print("sample files content loaded")

        cls.processes = test_utils.start_tpce()

    @classmethod
    def tearDownClass(cls):
        """Shut down every process started by setUpClass."""
        # pylint: disable=not-an-iterable
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):
        """Hook executed by unittest before each test method."""
        # NOTE(review): no statement of this hook was visible in the reviewed
        # text; if the suite relies on per-test preparation (for instance a
        # pause between requests), it has to be restored here.

    # ------------------------------------------------------------------
    # Shared checks (names do not start with "test", so unittest does not
    # collect them).
    # ------------------------------------------------------------------
    def _assert_path_calculated(self, response):
        """Assert that *response* reports a computed path; return its JSON body."""
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Path is calculated',
                      res['output']['configuration-response-common']['response-message'])
        return res

    def _assert_node_present(self, node_id):
        """Assert that the topology returns *node_id* as first node of the answer."""
        response = test_utils.get_ordm_topo_request("node/" + node_id)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(res['node'][0]['node-id'], node_id)

    def _assert_node_absent(self, node_id):
        """Assert that requesting *node_id* is answered with a conflict status."""
        response = test_utils.get_ordm_topo_request("node/" + node_id)
        self.assertEqual(response.status_code, requests.codes.conflict)

    def _assert_link_present(self, link_id):
        """Assert that the topology returns *link_id* as first link of the answer."""
        response = test_utils.get_ordm_topo_request("link/" + link_id)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(res['ietf-network-topology:link'][0]['link-id'], link_id)

    @staticmethod
    def _path_uses_link(hops, link):
        """Return True when one element of *hops* carries *link* as its resource."""
        # NOTE(review): the subscript applied to each path element was cut off
        # in the reviewed text; 'resource' is assumed because the other tests
        # read hop['resource'] -- confirm.
        return any(hop['resource'] == link for hop in hops)

    # ------------------------------------------------------------------
    # Port mapping
    # ------------------------------------------------------------------
    def test_00_load_port_mapping(self):
        response = test_utils.rawpost_request(test_utils.URL_FULL_PORTMAPPING, self.port_mapping_data)
        self.assertEqual(response.status_code, requests.codes.no_content)

    # ------------------------------------------------------------------
    # Simple bidirectional topology
    # ------------------------------------------------------------------
    # Load simple bidirectional topology
    def test_01_load_simple_topology_bi(self):
        response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Get existing nodeId
    def test_02_get_nodeId(self):
        self._assert_node_present('ROADMA01-SRG1')

    # Get existing linkId
    def test_03_get_linkId(self):
        self._assert_link_present('XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')

    # Path Computation success
    def test_04_path_computation_xpdr_bi(self):
        response = test_utils.path_computation_request("request-1", "service-1",
                                                       {"node-id": "XPDRA01", "service-rate": "100",
                                                        "service-format": "Ethernet", "clli": "nodeA"},
                                                       {"node-id": "XPDRC01", "service-rate": "100",
                                                        "service-format": "Ethernet", "clli": "nodeC"})
        self._assert_path_calculated(response)

    # Path Computation success
    def test_05_path_computation_rdm_bi(self):
        response = test_utils.path_computation_request("request-1", "service-1",
                                                       {"node-id": "ROADMA01", "service-rate": "100",
                                                        "service-format": "Ethernet", "clli": "NodeA"},
                                                       {"node-id": "ROADMC01", "service-rate": "100",
                                                        "service-format": "Ethernet", "clli": "NodeC"})
        self._assert_path_calculated(response)

    # Delete simple bidirectional topology
    def test_06_delete_simple_topology_bi(self):
        response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Test deleted topology
    def test_07_test_topology_simple_bi_deleted(self):
        self._assert_node_absent('ROADMA01-SRG1')

    # ------------------------------------------------------------------
    # Simple unidirectional topology
    # ------------------------------------------------------------------
    # Load simple unidirectional topology
    def test_08_load_simple_topology_uni(self):
        response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_uni_dir_data)
        self.assertEqual(response.status_code, requests.codes.created)

    # Get existing nodeId
    def test_09_get_nodeId(self):
        # NOTE(review): the expected value was cut off in the reviewed text; it
        # is taken to be the requested node id, as in test_02 -- confirm.
        self._assert_node_present('XPONDER-1-2')

    # Get existing linkId
    def test_10_get_linkId(self):
        self._assert_link_present('XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')

    # Path Computation success
    def test_11_path_computation_xpdr_uni(self):
        response = test_utils.path_computation_request("request-1", "service-1",
                                                       {"node-id": "XPONDER-1-2", "service-rate": "100",
                                                        "service-format": "Ethernet", "clli": "ORANGE1"},
                                                       {"node-id": "XPONDER-3-2", "service-rate": "100",
                                                        "service-format": "Ethernet", "clli": "ORANGE3"})
        self._assert_path_calculated(response)

    # Path Computation success, with a check of both path ends
    def test_12_path_computation_rdm_uni(self):
        response = test_utils.path_computation_request("request1", "service1",
                                                       {"service-rate": "100", "service-format": "Ethernet",
                                                        "clli": "cll21", "node-id": "OpenROADM-2-1"},
                                                       {"service-rate": "100", "service-format": "Ethernet",
                                                        "clli": "ncli22", "node-id": "OpenROADM-2-2"})
        res = self._assert_path_calculated(response)
        path_description = res['output']['response-parameters']['path-description']
        atoz_hops = path_description['aToZ-direction']['aToZ']
        ztoa_hops = path_description['zToA-direction']['zToA']
        self.assertEqual(len(atoz_hops), 15)
        self.assertEqual(len(ztoa_hops), 15)
        # Both lists have the same length here, so zip visits every element.
        for atoz, ztoa in zip(atoz_hops, ztoa_hops):
            if atoz['id'] == '14':
                self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
            if ztoa['id'] == '0':
                self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')

    # Delete simple unidirectional topology
    def test_13_delete_simple_topology(self):
        response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Test deleted topology
    def test_14_test_topology_simple_deleted(self):
        self._assert_node_absent('XPONDER-1-2')

    # ------------------------------------------------------------------
    # Complex topology
    # ------------------------------------------------------------------
    # Load complex topology
    def test_15_load_complex_topology(self):
        response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.complex_topo_uni_dir_data)
        self.assertEqual(response.status_code, requests.codes.created)

    # Get existing nodeId
    def test_16_get_nodeId(self):
        # NOTE(review): the expected value was cut off in the reviewed text; it
        # is taken to be the requested node id, as in test_02 -- confirm.
        self._assert_node_present('XPONDER-3-2')

    # Test failed path computation
    def test_17_fail_path_computation(self):
        response = test_utils.post_request(test_utils.URL_PATH_COMPUTATION_REQUEST,
                                           {"input": {"service-handler-header": {"request-id": "request-1"}}})
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Service Name is not set',
                      res['output']['configuration-response-common']['response-message'])

    # Test1 success path computation
    def test_18_success1_path_computation(self):
        response = test_utils.path_computation_request("request1", "service1",
                                                       {"service-format": "Ethernet", "service-rate": "100",
                                                        "clli": "ORANGE2", "node-id": "XPONDER-2-2"},
                                                       {"service-format": "Ethernet", "service-rate": "100",
                                                        "clli": "ORANGE1", "node-id": "XPONDER-1-2"},
                                                       {"customer-code": ["Some customer-code"],
                                                        "co-routing": {"existing-service": ["Some existing-service"]}
                                                        },
                                                       {"customer-code": ["Some customer-code"],
                                                        "co-routing": {"existing-service": ["Some existing-service"]}
                                                        })
        self._assert_path_calculated(response)

    # Test2 success path computation with path description
    def test_19_success2_path_computation(self):
        response = test_utils.path_computation_request("request 1", "service 1",
                                                       {"service-rate": "100", "service-format": "Ethernet",
                                                        "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
                                                       {"service-rate": "100", "service-format": "Ethernet",
                                                        "node-id": "XPONDER-3-2", "clli": "ORANGE3"})
        res = self._assert_path_calculated(response)
        path_description = res['output']['response-parameters']['path-description']
        self.assertEqual(5, path_description['aToZ-direction']['aToZ-wavelength-number'])
        self.assertEqual(5, path_description['zToA-direction']['zToA-wavelength-number'])

    # Test3 success path computation with hard-constraints exclude
    def test_20_success3_path_computation(self):
        response = test_utils.path_computation_request("request 1", "service 1",
                                                       {"service-rate": "100", "service-format": "Ethernet",
                                                        "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
                                                       {"service-rate": "100", "service-format": "Ethernet",
                                                        "node-id": "XPONDER-3-2", "clli": "ORANGE3"},
                                                       {"exclude": {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}})
        res = self._assert_path_calculated(response)
        path_description = res['output']['response-parameters']['path-description']
        self.assertEqual(9, path_description['aToZ-direction']['aToZ-wavelength-number'])
        self.assertEqual(9, path_description['zToA-direction']['zToA-wavelength-number'])

    # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
    def test_21_path_computation_before_oms_attribute_deletion(self):
        response = test_utils.path_computation_request("request 1", "service 1",
                                                       {"service-rate": "100", "service-format": "Ethernet",
                                                        "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
                                                       {"service-rate": "100", "service-format": "Ethernet",
                                                        "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
        res = self._assert_path_calculated(response)
        hops = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ']
        self.assertEqual(31, len(hops))
        link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
        # The computed path is expected to go through the link at this stage.
        self.assertTrue(self._path_uses_link(hops, link))

    # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
    def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
        response = test_utils.del_oms_attr_request("OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2")
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
    def test_23_path_computation_after_oms_attribute_deletion(self):
        response = test_utils.path_computation_request("request 1", "service 1",
                                                       {"service-rate": "100", "service-format": "Ethernet",
                                                        "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
                                                       {"service-rate": "100", "service-format": "Ethernet",
                                                        "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
        res = self._assert_path_calculated(response)
        hops = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ']
        self.assertEqual(47, len(hops))
        link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
        # The computed path must no longer go through the link.
        self.assertFalse(self._path_uses_link(hops, link))

    # Delete complex topology
    def test_24_delete_complex_topology(self):
        response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Test deleted complex topology
    def test_25_test_topology_complex_deleted(self):
        self._assert_node_absent('XPONDER-3-2')

    # Delete portmapping
    def test_26_delete_port_mapping(self):
        response = test_utils.delete_request(test_utils.URL_FULL_PORTMAPPING)
        self.assertEqual(response.status_code, requests.codes.ok)
if __name__ == "__main__":
    # Direct execution of the file: hand control to the unittest runner,
    # asking for one result line per test method (verbosity level 2).
    unittest.main(verbosity=2)