2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
17 from common import test_utils
20 class TransportPCEtesting(unittest.TestCase):
22 simple_topo_bi_dir_data = None
23 simple_topo_uni_dir_data = None
24 complex_topo_uni_dir_data = None
30 sample_files_parsed = False
31 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
32 "..", "..", "sample_configs", "honeynode-topo.xml")
33 with open(TOPO_BI_DIR_FILE, 'r') as topo_bi_dir:
34 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
36 TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
37 "..", "..", "sample_configs", "NW-simple-topology.xml")
39 with open(TOPO_UNI_DIR_FILE, 'r') as topo_uni_dir:
40 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
42 TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
43 "..", "..", "sample_configs", "NW-for-test-5-4.xml")
44 with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r') as topo_uni_dir_complex:
45 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
46 sample_files_parsed = True
47 except PermissionError as err:
48 print("Permission Error when trying to read sample files\n", err)
50 except FileNotFoundError as err:
51 print("File Not found Error when trying to read sample files\n", err)
54 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
57 if sample_files_parsed:
58 print("sample files content loaded")
60 cls.processes = test_utils.start_tpce()
63 def tearDownClass(cls):
64 for process in cls.processes:
65 test_utils.shutdown_process(process)
66 print("all processes killed")
68 def setUp(self): # instruction executed before each test method
71 # Load simple bidirectional topology
72 def test_01_load_simple_topology_bi(self):
73 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data)
74 self.assertEqual(response.status_code, requests.codes.ok)
78 def test_02_get_nodeId(self):
79 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
80 self.assertEqual(response.status_code, requests.codes.ok)
83 res['node'][0]['node-id'], 'ROADMA01-SRG1')
87 def test_03_get_linkId(self):
88 response = test_utils.get_ordm_topo_request("link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX")
89 self.assertEqual(response.status_code, requests.codes.ok)
92 res['ietf-network-topology:link'][0]['link-id'],
93 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
96 # Path Computation success
97 def test_04_path_computation_xpdr_bi(self):
98 response = test_utils.path_computation_request("request-1", "service-1",
99 {"node-id": "XPDRA01", "service-rate": "100",
100 "service-format": "Ethernet", "clli": "nodeA"},
101 {"node-id": "XPDRC01", "service-rate": "100", "service-format": "Ethernet", "clli": "nodeC"})
102 self.assertEqual(response.status_code, requests.codes.ok)
103 res = response.json()
104 self.assertIn('Path is calculated',
105 res['output']['configuration-response-common']['response-message'])
108 # Path Computation success
109 def test_05_path_computation_rdm_bi(self):
110 response = test_utils.path_computation_request("request-1", "service-1",
111 {"node-id": "ROADMA01", "service-rate": "100",
112 "service-format": "Ethernet", "clli": "NodeA"},
113 {"node-id": "ROADMC01", "service-rate": "100", "service-format": "Ethernet", "clli": "NodeC"})
114 self.assertEqual(response.status_code, requests.codes.ok)
115 res = response.json()
116 self.assertIn('Path is calculated',
117 res['output']['configuration-response-common']['response-message'])
121 def test_06_delete_simple_topology_bi(self):
122 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
123 self.assertEqual(response.status_code, requests.codes.ok)
126 # Test deleted topology
127 def test_07_test_topology_simple_bi_deleted(self):
128 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
129 self.assertEqual(response.status_code, requests.codes.conflict)
132 # Load simple bidirectional topology
133 def test_08_load_simple_topology_uni(self):
134 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_uni_dir_data)
135 self.assertEqual(response.status_code, 201)
138 # Get existing nodeId
139 def test_09_get_nodeId(self):
140 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
141 self.assertEqual(response.status_code, requests.codes.ok)
142 res = response.json()
144 res['node'][0]['node-id'],
148 # Get existing linkId
149 def test_10_get_linkId(self):
150 response = test_utils.get_ordm_topo_request("link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX")
151 self.assertEqual(response.status_code, requests.codes.ok)
152 res = response.json()
154 res['ietf-network-topology:link'][0]['link-id'],
155 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
158 # Path Computation success
159 def test_11_path_computation_xpdr_uni(self):
160 response = test_utils.path_computation_request("request-1", "service-1",
161 {"node-id": "XPONDER-1-2", "service-rate": "100",
162 "service-format": "Ethernet", "clli": "ORANGE1"},
163 {"node-id": "XPONDER-3-2", "service-rate": "100", "service-format": "Ethernet", "clli": "ORANGE3"})
164 self.assertEqual(response.status_code, requests.codes.ok)
165 res = response.json()
166 self.assertIn('Path is calculated',
167 res['output']['configuration-response-common']['response-message'])
170 # Path Computation success
171 def test_12_path_computation_rdm_uni(self):
172 response = test_utils.path_computation_request("request1", "service1",
173 {"service-rate": "100", "service-format": "Ethernet",
174 "clli": "cll21", "node-id": "OpenROADM-2-1"},
175 {"service-rate": "100", "service-format": "Ethernet", "clli": "ncli22", "node-id": "OpenROADM-2-2"})
176 self.assertEqual(response.status_code, requests.codes.ok)
177 res = response.json()
178 self.assertIn('Path is calculated',
179 res['output']['configuration-response-common']['response-message'])
181 atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
182 ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
183 self.assertEqual(atozList, 15)
184 self.assertEqual(ztoaList, 15)
185 for i in range(0, 15):
186 atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
187 ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
188 if (atoz['id'] == '14'):
189 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
190 if (ztoa['id'] == '0'):
191 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
195 def test_13_delete_simple_topology(self):
196 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
197 self.assertEqual(response.status_code, requests.codes.ok)
200 # Test deleted topology
201 def test_14_test_topology_simple_deleted(self):
202 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
203 self.assertEqual(response.status_code, requests.codes.conflict)
206 # Load complex topology
207 def test_15_load_complex_topology(self):
208 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.complex_topo_uni_dir_data)
209 self.assertEqual(response.status_code, 201)
212 # Get existing nodeId
213 def test_16_get_nodeId(self):
214 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
215 self.assertEqual(response.status_code, requests.codes.ok)
216 res = response.json()
218 res['node'][0]['node-id'],
222 # Test failed path computation
223 def test_17_fail_path_computation(self):
224 response = test_utils.post_request(test_utils.URL_PATH_COMPUTATION_REQUEST,
225 {"input": {"service-handler-header": {"request-id": "request-1"}}})
226 self.assertEqual(response.status_code, requests.codes.ok)
227 res = response.json()
228 self.assertIn('Service Name is not set',
229 res['output']['configuration-response-common']['response-message'])
232 # Test1 success path computation
233 def test_18_success1_path_computation(self):
234 response = test_utils.path_computation_request("request1", "service1",
235 {"service-format": "Ethernet", "service-rate": "100", "clli": "ORANGE2", "node-id": "XPONDER-2-2",
236 "tx-direction": {"port": {
237 "port-device-name": "Some port-device-name",
238 "port-type": "Some port-type",
239 "port-name": "Some port-name",
240 "port-rack": "Some port-rack",
241 "port-shelf": "Some port-shelf",
242 "port-slot": "Some port-slot",
243 "port-sub-slot": "Some port-sub-slot"
245 "rx-direction": {"port": {
246 "port-device-name": "Some port-device-name",
247 "port-type": "Some port-type",
248 "port-name": "Some port-name",
249 "port-rack": "Some port-rack",
250 "port-shelf": "Some port-shelf",
251 "port-slot": "Some port-slot",
252 "port-sub-slot": "Some port-sub-slot"
254 {"service-format": "Ethernet", "service-rate": "100", "clli": "ORANGE1", "node-id": "XPONDER-1-2",
255 "tx-direction": {"port": {
256 "port-device-name": "Some port-device-name",
257 "port-type": "Some port-type",
258 "port-name": "Some port-name",
259 "port-rack": "Some port-rack",
260 "port-shelf": "Some port-shelf",
261 "port-slot": "Some port-slot",
262 "port-sub-slot": "Some port-sub-slot"
264 "rx-direction": {"port": {
265 "port-device-name": "Some port-device-name",
266 "port-type": "Some port-type",
267 "port-name": "Some port-name",
268 "port-rack": "Some port-rack",
269 "port-shelf": "Some port-shelf",
270 "port-slot": "Some port-slot",
271 "port-sub-slot": "Some port-sub-slot"
273 {"customer-code": ["Some customer-code"],
274 "co-routing": {"existing-service": ["Some existing-service"]}},
275 {"customer-code": ["Some customer-code"],
276 "co-routing": {"existing-service": ["Some existing-service"]}},
277 "hop-count", {"locally-protected-links": "true"})
278 self.assertEqual(response.status_code, requests.codes.ok)
279 res = response.json()
280 self.assertIn('Path is calculated',
281 res['output']['configuration-response-common']['response-message'])
284 # Test2 success path computation with path description
285 def test_19_success2_path_computation(self):
286 response = test_utils.path_computation_request("request 1", "service 1",
287 {"service-rate": "100", "service-format": "Ethernet",
288 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
289 {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-3-2", "clli": "ORANGE3"})
290 self.assertEqual(response.status_code, requests.codes.ok)
291 res = response.json()
292 self.assertIn('Path is calculated',
293 res['output']['configuration-response-common']['response-message'])
294 self.assertEqual(5, res['output']['response-parameters']['path-description']
295 ['aToZ-direction']['aToZ-wavelength-number'])
296 self.assertEqual(5, res['output']['response-parameters']['path-description']
297 ['zToA-direction']['zToA-wavelength-number'])
300 # Test3 success path computation with hard-constraints exclude
301 def test_20_success3_path_computation(self):
302 response = test_utils.path_computation_request("request 1", "service 1",
303 {"service-rate": "100", "service-format": "Ethernet",
304 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
305 {"service-rate": "100", "service-format": "Ethernet",
306 "node-id": "XPONDER-3-2", "clli": "ORANGE3"},
307 {"exclude_": {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}})
308 self.assertEqual(response.status_code, requests.codes.ok)
309 res = response.json()
310 self.assertIn('Path is calculated',
311 res['output']['configuration-response-common']['response-message'])
312 self.assertEqual(9, res['output']['response-parameters']['path-description']
313 ['aToZ-direction']['aToZ-wavelength-number'])
314 self.assertEqual(9, res['output']['response-parameters']['path-description']
315 ['zToA-direction']['zToA-wavelength-number'])
318 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
319 def test_21_path_computation_before_oms_attribute_deletion(self):
320 response = test_utils.path_computation_request("request 1", "service 1",
321 {"service-rate": "100", "service-format": "Ethernet",
322 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
323 {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
324 self.assertEqual(response.status_code, requests.codes.ok)
325 res = response.json()
326 self.assertIn('Path is calculated',
327 res['output']['configuration-response-common']['response-message'])
328 nbElmPath = len(res['output']['response-parameters']['path-description']
329 ['aToZ-direction']['aToZ'])
330 self.assertEqual(31, nbElmPath)
331 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
333 for i in range(0, nbElmPath):
334 resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
335 if(resource_i == link):
337 self.assertEqual(find, True)
340 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
341 def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
342 response = test_utils.del_oms_attr_request("OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2")
343 self.assertEqual(response.status_code, requests.codes.ok)
346 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
347 def test_23_path_computation_after_oms_attribute_deletion(self):
348 response = test_utils.path_computation_request("request 1", "service 1",
349 {"service-rate": "100", "service-format": "Ethernet",
350 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
351 {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
352 self.assertEqual(response.status_code, requests.codes.ok)
353 res = response.json()
354 self.assertIn('Path is calculated',
355 res['output']['configuration-response-common']['response-message'])
356 nbElmPath = len(res['output']['response-parameters']['path-description']
357 ['aToZ-direction']['aToZ'])
358 self.assertEqual(47, nbElmPath)
359 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
361 for i in range(0, nbElmPath):
362 resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
363 if (resource_i == link):
365 self.assertNotEqual(find, True)
368 # Delete complex topology
369 def test_24_delete_complex_topology(self):
370 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
371 self.assertEqual(response.status_code, requests.codes.ok)
374 # Test deleted complex topology
375 def test_25_test_topology_complex_deleted(self):
376 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
377 self.assertEqual(response.status_code, requests.codes.conflict)
381 if __name__ == "__main__":
382 unittest.main(verbosity=2)