2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
17 from common import test_utils
class TransportPCEtesting(unittest.TestCase):
    """Functional tests for path computation against OpenROADM sample topologies.

    The scenario is stateful: a topology is loaded, queried, used for path
    computation requests, then deleted, three times in a row (simple
    bidirectional, simple unidirectional, complex unidirectional). The
    numeric prefixes of the test names keep unittest's alphabetical
    ordering aligned with that scenario.

    Test methods are documented with ``#`` comments rather than docstrings
    so that the ``verbosity=2`` report keeps showing the method names only.
    """

    # Raw XML of the sample topologies; populated once by setUpClass().
    simple_topo_bi_dir_data = None
    simple_topo_uni_dir_data = None
    complex_topo_uni_dir_data = None

    # Link whose OMS attributes are removed between test_21 and test_23.
    _OMS_TEST_LINK = "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"

    # Placeholder port description used for every direction of both service
    # ends in test_18.
    _SAMPLE_PORT = {
        "port-device-name": "Some port-device-name",
        "port-type": "Some port-type",
        "port-name": "Some port-name",
        "port-rack": "Some port-rack",
        "port-shelf": "Some port-shelf",
        "port-slot": "Some port-slot",
        "port-sub-slot": "Some port-sub-slot",
    }

    @staticmethod
    def _read_sample_config(filename):
        """Return the text of ``filename`` from the ``sample_configs`` directory.

        The directory is resolved two levels above the directory holding
        this file. Any OSError raised by ``open`` propagates to the caller.
        """
        sample_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                   "..", "..", "sample_configs", filename)
        with open(sample_path, 'r') as sample_file:
            return sample_file.read()

    @classmethod
    def _port_direction(cls):
        """Return a fresh ``{"port": {...}}`` dict built from _SAMPLE_PORT."""
        # Copy so that no two request sections share one mutable dict.
        return {"port": dict(cls._SAMPLE_PORT)}

    @staticmethod
    def _path_hops(res, direction):
        """Return the hop list of a path description.

        ``direction`` is ``'aToZ'`` or ``'zToA'``; ``res`` is the decoded
        JSON body of a path computation response.
        """
        path_description = res['output']['response-parameters']['path-description']
        return path_description[direction + '-direction'][direction]

    def _assert_path_calculated(self, response):
        """Assert that ``response`` reports a computed path; return its JSON."""
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Path is calculated',
                      res['output']['configuration-response-common']['response-message'])
        return res

    @classmethod
    def setUpClass(cls):
        """Read the three sample topologies, then call test_utils.start_tpce().

        NOTE(review): the listing this class was recovered from dropped the
        line following each ``print`` in the handlers. They re-raise here so
        that a failed load aborts the class instead of letting every test
        run with ``None`` payloads - confirm against the original.
        """
        try:
            cls.simple_topo_bi_dir_data = cls._read_sample_config("honeynode-topo.xml")
            cls.simple_topo_uni_dir_data = cls._read_sample_config("NW-simple-topology.xml")
            cls.complex_topo_uni_dir_data = cls._read_sample_config("NW-for-test-5-4.xml")
        except PermissionError as err:
            print("Permission Error when trying to read sample files\n", err)
            raise
        except FileNotFoundError as err:
            print("File Not found Error when trying to read sample files\n", err)
            raise
        except Exception as err:
            # Same output as sys.exc_info()[0]: the exception class.
            print("Unexpected error when trying to read sample files\n", type(err))
            raise
        else:
            print("sample files content loaded")

        cls.processes = test_utils.start_tpce()

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded by setUpClass()."""
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    def setUp(self):  # instruction executed before each test method
        """Per-test hook, currently a no-op.

        NOTE(review): the body of this hook is missing from the recovered
        listing; confirm whether the original waited or reset state here.
        """

    # Load simple bidirectional topology
    def test_01_load_simple_topology_bi(self):
        response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Get existing nodeId
    def test_02_get_nodeId(self):
        response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(
            res['node'][0]['node-id'], 'ROADMA01-SRG1')

    # Get existing linkId
    def test_03_get_linkId(self):
        response = test_utils.get_ordm_topo_request("link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(
            res['ietf-network-topology:link'][0]['link-id'],
            'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')

    # Path Computation success
    def test_04_path_computation_xpdr_bi(self):
        response = test_utils.path_computation_request(
            "request-1", "service-1",
            {"node-id": "XPDRA01", "service-rate": "100", "service-format": "Ethernet", "clli": "nodeA"},
            {"node-id": "XPDRC01", "service-rate": "100", "service-format": "Ethernet", "clli": "nodeC"})
        self._assert_path_calculated(response)

    # Path Computation success
    def test_05_path_computation_rdm_bi(self):
        response = test_utils.path_computation_request(
            "request-1", "service-1",
            {"node-id": "ROADMA01", "service-rate": "100", "service-format": "Ethernet", "clli": "NodeA"},
            {"node-id": "ROADMC01", "service-rate": "100", "service-format": "Ethernet", "clli": "NodeC"})
        self._assert_path_calculated(response)

    # Delete simple bidirectional topology
    def test_06_delete_simple_topology_bi(self):
        response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Test deleted topology
    def test_07_test_topology_simple_bi_deleted(self):
        response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
        self.assertEqual(response.status_code, requests.codes.conflict)

    # Load simple unidirectional topology
    def test_08_load_simple_topology_uni(self):
        response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_uni_dir_data)
        self.assertEqual(response.status_code, requests.codes.created)

    # Get existing nodeId
    def test_09_get_nodeId(self):
        response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        # NOTE(review): expected value restored from the requested node id.
        self.assertEqual(
            res['node'][0]['node-id'],
            'XPONDER-1-2')

    # Get existing linkId
    def test_10_get_linkId(self):
        response = test_utils.get_ordm_topo_request("link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertEqual(
            res['ietf-network-topology:link'][0]['link-id'],
            'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')

    # Path Computation success
    def test_11_path_computation_xpdr_uni(self):
        response = test_utils.path_computation_request(
            "request-1", "service-1",
            {"node-id": "XPONDER-1-2", "service-rate": "100", "service-format": "Ethernet", "clli": "ORANGE1"},
            {"node-id": "XPONDER-3-2", "service-rate": "100", "service-format": "Ethernet", "clli": "ORANGE3"})
        self._assert_path_calculated(response)

    # Path Computation success
    def test_12_path_computation_rdm_uni(self):
        response = test_utils.path_computation_request(
            "request1", "service1",
            {"service-rate": "100", "service-format": "Ethernet", "clli": "cll21", "node-id": "OpenROADM-2-1"},
            {"service-rate": "100", "service-format": "Ethernet", "clli": "ncli22", "node-id": "OpenROADM-2-2"})
        res = self._assert_path_calculated(response)
        # Both directions are expected to contain 15 elements.
        atoz_hops = self._path_hops(res, 'aToZ')
        ztoa_hops = self._path_hops(res, 'zToA')
        self.assertEqual(len(atoz_hops), 15)
        self.assertEqual(len(ztoa_hops), 15)
        # Check the termination points of the elements with id '14' (A to Z)
        # and id '0' (Z to A).
        for atoz, ztoa in zip(atoz_hops, ztoa_hops):
            if atoz['id'] == '14':
                self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
            if ztoa['id'] == '0':
                self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')

    # Delete simple unidirectional topology
    def test_13_delete_simple_topology(self):
        response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Test deleted topology
    def test_14_test_topology_simple_deleted(self):
        response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
        self.assertEqual(response.status_code, requests.codes.conflict)

    # Load complex topology
    def test_15_load_complex_topology(self):
        response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.complex_topo_uni_dir_data)
        self.assertEqual(response.status_code, requests.codes.created)

    # Get existing nodeId
    def test_16_get_nodeId(self):
        response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        # NOTE(review): expected value restored from the requested node id.
        self.assertEqual(
            res['node'][0]['node-id'],
            'XPONDER-3-2')

    # Test failed path computation
    def test_17_fail_path_computation(self):
        response = test_utils.post_request(
            test_utils.URL_PATH_COMPUTATION_REQUEST,
            {"input": {"service-handler-header": {"request-id": "request-1"}}})
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        self.assertIn('Service Name is not set',
                      res['output']['configuration-response-common']['response-message'])

    # Test1 success path computation
    def test_18_success1_path_computation(self):
        response = test_utils.path_computation_request(
            "request1", "service1",
            {"service-format": "Ethernet", "service-rate": "100", "clli": "ORANGE2", "node-id": "XPONDER-2-2",
             "tx-direction": self._port_direction(),
             "rx-direction": self._port_direction()},
            {"service-format": "Ethernet", "service-rate": "100", "clli": "ORANGE1", "node-id": "XPONDER-1-2",
             "tx-direction": self._port_direction(),
             "rx-direction": self._port_direction()},
            {"customer-code": ["Some customer-code"], "co-routing": {"existing-service": ["Some existing-service"]}},
            {"customer-code": ["Some customer-code"], "co-routing": {"existing-service": ["Some existing-service"]}},
            "hop-count", {"locally-protected-links": "true"})
        self._assert_path_calculated(response)

    # Test2 success path computation with path description
    def test_19_success2_path_computation(self):
        response = test_utils.path_computation_request(
            "request 1", "service 1",
            {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
            {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-3-2", "clli": "ORANGE3"})
        res = self._assert_path_calculated(response)
        path_description = res['output']['response-parameters']['path-description']
        self.assertEqual(5, path_description['aToZ-direction']['aToZ-wavelength-number'])
        self.assertEqual(5, path_description['zToA-direction']['zToA-wavelength-number'])

    # Test3 success path computation with hard-constraints exclude
    def test_20_success3_path_computation(self):
        response = test_utils.path_computation_request(
            "request 1", "service 1",
            {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
            {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-3-2", "clli": "ORANGE3"},
            {"exclude_": {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}})
        res = self._assert_path_calculated(response)
        path_description = res['output']['response-parameters']['path-description']
        self.assertEqual(9, path_description['aToZ-direction']['aToZ-wavelength-number'])
        self.assertEqual(9, path_description['zToA-direction']['zToA-wavelength-number'])

    # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
    def test_21_path_computation_before_oms_attribute_deletion(self):
        response = test_utils.path_computation_request(
            "request 1", "service 1",
            {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
            {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
        res = self._assert_path_calculated(response)
        atoz_hops = self._path_hops(res, 'aToZ')
        self.assertEqual(31, len(atoz_hops))
        # The computed path must go through the link under test.
        link = {"link-id": self._OMS_TEST_LINK}
        self.assertTrue(any(hop['resource'] == link for hop in atoz_hops))

    # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
    def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
        response = test_utils.del_oms_attr_request(self._OMS_TEST_LINK)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
    def test_23_path_computation_after_oms_attribute_deletion(self):
        response = test_utils.path_computation_request(
            "request 1", "service 1",
            {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
            {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
        res = self._assert_path_calculated(response)
        atoz_hops = self._path_hops(res, 'aToZ')
        self.assertEqual(47, len(atoz_hops))
        # The recomputed path must no longer go through the link under test.
        link = {"link-id": self._OMS_TEST_LINK}
        self.assertFalse(any(hop['resource'] == link for hop in atoz_hops))

    # Delete complex topology
    def test_24_delete_complex_topology(self):
        response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
        self.assertEqual(response.status_code, requests.codes.ok)

    # Test deleted complex topology
    def test_25_test_topology_complex_deleted(self):
        response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
        self.assertEqual(response.status_code, requests.codes.conflict)
if __name__ == "__main__":
    # Run this module's tests directly, reporting one line per test.
    unittest.main(verbosity=2)