2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
16 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEtesting(unittest.TestCase):
28 simple_topo_bi_dir_data = None
29 simple_topo_uni_dir_data = None
30 complex_topo_uni_dir_data = None
31 port_mapping_data = None
36 # pylint: disable=bare-except
38 sample_files_parsed = False
39 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
40 "..", "..", "sample_configs", "honeynode-topo.xml")
41 with open(TOPO_BI_DIR_FILE, 'r', encoding='utf-8') as topo_bi_dir:
42 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
44 TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
45 "..", "..", "sample_configs", "NW-simple-topology.xml")
47 with open(TOPO_UNI_DIR_FILE, 'r', encoding='utf-8') as topo_uni_dir:
48 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
50 TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
51 "..", "..", "sample_configs", "NW-for-test-5-4.xml")
52 with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r', encoding='utf-8') as topo_uni_dir_complex:
53 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
54 PORT_MAPPING_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
55 "..", "..", "sample_configs", "pce_portmapping_121.json")
56 with open(PORT_MAPPING_FILE, 'r', encoding='utf-8') as port_mapping:
57 cls.port_mapping_data = port_mapping.read()
58 sample_files_parsed = True
59 except PermissionError as err:
60 print("Permission Error when trying to read sample files\n", err)
62 except FileNotFoundError as err:
63 print("File Not found Error when trying to read sample files\n", err)
66 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
69 if sample_files_parsed:
70 print("sample files content loaded")
72 cls.processes = test_utils.start_tpce()
75 def tearDownClass(cls):
76 # pylint: disable=not-an-iterable
77 for process in cls.processes:
78 test_utils.shutdown_process(process)
79 print("all processes killed")
81 def setUp(self): # instruction executed before each test method
85 def test_00_load_port_mapping(self):
86 response = test_utils.rawpost_request(test_utils.URL_FULL_PORTMAPPING, self.port_mapping_data)
87 self.assertEqual(response.status_code, requests.codes.no_content)
90 # Load simple bidirectional topology
91 def test_01_load_simple_topology_bi(self):
92 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data)
93 self.assertEqual(response.status_code, requests.codes.ok)
97 def test_02_get_nodeId(self):
98 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
99 self.assertEqual(response.status_code, requests.codes.ok)
100 res = response.json()
102 res['node'][0]['node-id'], 'ROADMA01-SRG1')
105 # Get existing linkId
106 def test_03_get_linkId(self):
107 response = test_utils.get_ordm_topo_request("link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX")
108 self.assertEqual(response.status_code, requests.codes.ok)
109 res = response.json()
111 res['ietf-network-topology:link'][0]['link-id'],
112 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
115 # Path Computation success
116 def test_04_path_computation_xpdr_bi(self):
117 response = test_utils.path_computation_request("request-1", "service-1",
118 {"node-id": "XPDRA01", "service-rate": "100",
119 "service-format": "Ethernet", "clli": "nodeA"},
120 {"node-id": "XPDRC01", "service-rate": "100",
121 "service-format": "Ethernet", "clli": "nodeC"})
122 self.assertEqual(response.status_code, requests.codes.ok)
123 res = response.json()
124 self.assertIn('Path is calculated',
125 res['output']['configuration-response-common']['response-message'])
128 # Path Computation success
129 def test_05_path_computation_rdm_bi(self):
130 response = test_utils.path_computation_request("request-1", "service-1",
131 {"node-id": "ROADMA01", "service-rate": "100",
132 "service-format": "Ethernet", "clli": "NodeA"},
133 {"node-id": "ROADMC01", "service-rate": "100",
134 "service-format": "Ethernet", "clli": "NodeC"})
135 self.assertEqual(response.status_code, requests.codes.ok)
136 res = response.json()
137 self.assertIn('Path is calculated',
138 res['output']['configuration-response-common']['response-message'])
142 def test_06_delete_simple_topology_bi(self):
143 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
144 self.assertEqual(response.status_code, requests.codes.ok)
147 # Test deleted topology
148 def test_07_test_topology_simple_bi_deleted(self):
149 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
150 self.assertEqual(response.status_code, requests.codes.conflict)
153 # Load simple bidirectional topology
154 def test_08_load_simple_topology_uni(self):
155 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_uni_dir_data)
156 self.assertEqual(response.status_code, 201)
159 # Get existing nodeId
160 def test_09_get_nodeId(self):
161 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
162 self.assertEqual(response.status_code, requests.codes.ok)
163 res = response.json()
165 res['node'][0]['node-id'],
169 # Get existing linkId
170 def test_10_get_linkId(self):
171 response = test_utils.get_ordm_topo_request("link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX")
172 self.assertEqual(response.status_code, requests.codes.ok)
173 res = response.json()
175 res['ietf-network-topology:link'][0]['link-id'],
176 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
179 # Path Computation success
180 def test_11_path_computation_xpdr_uni(self):
181 response = test_utils.path_computation_request("request-1", "service-1",
182 {"node-id": "XPONDER-1-2", "service-rate": "100",
183 "service-format": "Ethernet", "clli": "ORANGE1"},
184 {"node-id": "XPONDER-3-2", "service-rate": "100",
185 "service-format": "Ethernet", "clli": "ORANGE3"})
186 self.assertEqual(response.status_code, requests.codes.ok)
187 res = response.json()
188 self.assertIn('Path is calculated',
189 res['output']['configuration-response-common']['response-message'])
192 # Path Computation success
193 def test_12_path_computation_rdm_uni(self):
194 response = test_utils.path_computation_request("request1", "service1",
195 {"service-rate": "100", "service-format": "Ethernet",
196 "clli": "cll21", "node-id": "OpenROADM-2-1"},
197 {"service-rate": "100", "service-format": "Ethernet",
198 "clli": "ncli22", "node-id": "OpenROADM-2-2"})
199 self.assertEqual(response.status_code, requests.codes.ok)
200 res = response.json()
201 self.assertIn('Path is calculated',
202 res['output']['configuration-response-common']['response-message'])
204 atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
205 ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
206 self.assertEqual(atozList, 15)
207 self.assertEqual(ztoaList, 15)
208 for i in range(0, 15):
209 atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
210 ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
211 if atoz['id'] == '14':
212 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
213 if ztoa['id'] == '0':
214 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
218 def test_13_delete_simple_topology(self):
219 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
220 self.assertEqual(response.status_code, requests.codes.ok)
223 # Test deleted topology
224 def test_14_test_topology_simple_deleted(self):
225 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
226 self.assertEqual(response.status_code, requests.codes.conflict)
229 # Load complex topology
230 def test_15_load_complex_topology(self):
231 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.complex_topo_uni_dir_data)
232 self.assertEqual(response.status_code, 201)
235 # Get existing nodeId
236 def test_16_get_nodeId(self):
237 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
238 self.assertEqual(response.status_code, requests.codes.ok)
239 res = response.json()
241 res['node'][0]['node-id'],
245 # Test failed path computation
246 def test_17_fail_path_computation(self):
247 response = test_utils.post_request(test_utils.URL_PATH_COMPUTATION_REQUEST,
248 {"input": {"service-handler-header": {"request-id": "request-1"}}})
249 self.assertEqual(response.status_code, requests.codes.ok)
250 res = response.json()
251 self.assertIn('Service Name is not set',
252 res['output']['configuration-response-common']['response-message'])
255 # Test1 success path computation
256 def test_18_success1_path_computation(self):
257 response = test_utils.path_computation_request("request1", "service1",
258 {"service-format": "Ethernet", "service-rate": "100",
259 "clli": "ORANGE2", "node-id": "XPONDER-2-2",
260 "tx-direction": {"port": {
261 "port-device-name": "Some port-device-name",
262 "port-type": "Some port-type",
263 "port-name": "Some port-name",
264 "port-rack": "Some port-rack",
265 "port-shelf": "Some port-shelf",
266 "port-slot": "Some port-slot",
267 "port-sub-slot": "Some port-sub-slot"
269 "rx-direction": {"port": {
270 "port-device-name": "Some port-device-name",
271 "port-type": "Some port-type",
272 "port-name": "Some port-name",
273 "port-rack": "Some port-rack",
274 "port-shelf": "Some port-shelf",
275 "port-slot": "Some port-slot",
276 "port-sub-slot": "Some port-sub-slot"
278 {"service-format": "Ethernet", "service-rate": "100",
279 "clli": "ORANGE1", "node-id": "XPONDER-1-2",
280 "tx-direction": {"port": {
281 "port-device-name": "Some port-device-name",
282 "port-type": "Some port-type",
283 "port-name": "Some port-name",
284 "port-rack": "Some port-rack",
285 "port-shelf": "Some port-shelf",
286 "port-slot": "Some port-slot",
287 "port-sub-slot": "Some port-sub-slot"
289 "rx-direction": {"port": {
290 "port-device-name": "Some port-device-name",
291 "port-type": "Some port-type",
292 "port-name": "Some port-name",
293 "port-rack": "Some port-rack",
294 "port-shelf": "Some port-shelf",
295 "port-slot": "Some port-slot",
296 "port-sub-slot": "Some port-sub-slot"
298 {"customer-code": ["Some customer-code"],
299 "co-routing": {"existing-service": ["Some existing-service"]}
301 {"customer-code": ["Some customer-code"],
302 "co-routing": {"existing-service": ["Some existing-service"]}
304 "hop-count", {"locally-protected-links": "true"})
305 self.assertEqual(response.status_code, requests.codes.ok)
306 res = response.json()
307 self.assertIn('Path is calculated',
308 res['output']['configuration-response-common']['response-message'])
311 # Test2 success path computation with path description
312 def test_19_success2_path_computation(self):
313 response = test_utils.path_computation_request("request 1", "service 1",
314 {"service-rate": "100", "service-format": "Ethernet",
315 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
316 {"service-rate": "100", "service-format": "Ethernet",
317 "node-id": "XPONDER-3-2", "clli": "ORANGE3"})
318 self.assertEqual(response.status_code, requests.codes.ok)
319 res = response.json()
320 self.assertIn('Path is calculated',
321 res['output']['configuration-response-common']['response-message'])
322 self.assertEqual(5, res['output']['response-parameters']['path-description']
323 ['aToZ-direction']['aToZ-wavelength-number'])
324 self.assertEqual(5, res['output']['response-parameters']['path-description']
325 ['zToA-direction']['zToA-wavelength-number'])
328 # Test3 success path computation with hard-constraints exclude
329 def test_20_success3_path_computation(self):
330 response = test_utils.path_computation_request("request 1", "service 1",
331 {"service-rate": "100", "service-format": "Ethernet",
332 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
333 {"service-rate": "100", "service-format": "Ethernet",
334 "node-id": "XPONDER-3-2", "clli": "ORANGE3"},
335 {"exclude_": {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}})
336 self.assertEqual(response.status_code, requests.codes.ok)
337 res = response.json()
338 self.assertIn('Path is calculated',
339 res['output']['configuration-response-common']['response-message'])
340 self.assertEqual(9, res['output']['response-parameters']['path-description']
341 ['aToZ-direction']['aToZ-wavelength-number'])
342 self.assertEqual(9, res['output']['response-parameters']['path-description']
343 ['zToA-direction']['zToA-wavelength-number'])
346 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
347 def test_21_path_computation_before_oms_attribute_deletion(self):
348 response = test_utils.path_computation_request("request 1", "service 1",
349 {"service-rate": "100", "service-format": "Ethernet",
350 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
351 {"service-rate": "100", "service-format": "Ethernet",
352 "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
353 self.assertEqual(response.status_code, requests.codes.ok)
354 res = response.json()
355 self.assertIn('Path is calculated',
356 res['output']['configuration-response-common']['response-message'])
357 nbElmPath = len(res['output']['response-parameters']['path-description']
358 ['aToZ-direction']['aToZ'])
359 self.assertEqual(31, nbElmPath)
360 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
362 for i in range(0, nbElmPath):
363 resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
365 if resource_i == link:
367 self.assertEqual(find, True)
370 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
371 def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
372 response = test_utils.del_oms_attr_request("OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2")
373 self.assertEqual(response.status_code, requests.codes.ok)
376 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
377 def test_23_path_computation_after_oms_attribute_deletion(self):
378 response = test_utils.path_computation_request("request 1", "service 1",
379 {"service-rate": "100", "service-format": "Ethernet",
380 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
381 {"service-rate": "100", "service-format": "Ethernet",
382 "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
383 self.assertEqual(response.status_code, requests.codes.ok)
384 res = response.json()
385 self.assertIn('Path is calculated',
386 res['output']['configuration-response-common']['response-message'])
387 nbElmPath = len(res['output']['response-parameters']['path-description']
388 ['aToZ-direction']['aToZ'])
389 self.assertEqual(47, nbElmPath)
390 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2", "state": "inService"}
392 for i in range(0, nbElmPath):
393 resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
395 if resource_i == link:
397 self.assertNotEqual(find, True)
400 # Delete complex topology
401 def test_24_delete_complex_topology(self):
402 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
403 self.assertEqual(response.status_code, requests.codes.ok)
406 # Test deleted complex topology
407 def test_25_test_topology_complex_deleted(self):
408 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
409 self.assertEqual(response.status_code, requests.codes.conflict)
413 def test_26_delete_port_mapping(self):
414 response = test_utils.delete_request(test_utils.URL_FULL_PORTMAPPING)
415 self.assertEqual(response.status_code, requests.codes.ok)
419 if __name__ == "__main__":
420 unittest.main(verbosity=2)