2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 from common import test_utils
22 class TransportPCEtesting(unittest.TestCase):
24 simple_topo_bi_dir_data = None
25 simple_topo_uni_dir_data = None
26 complex_topo_uni_dir_data = None
32 sample_files_parsed = False
33 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
34 "..", "..", "sample_configs", "honeynode-topo.xml")
35 with open(TOPO_BI_DIR_FILE, 'r') as topo_bi_dir:
36 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
38 TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
39 "..", "..", "sample_configs", "NW-simple-topology.xml")
41 with open(TOPO_UNI_DIR_FILE, 'r') as topo_uni_dir:
42 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
44 TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
45 "..", "..", "sample_configs", "NW-for-test-5-4.xml")
46 with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r') as topo_uni_dir_complex:
47 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
48 sample_files_parsed = True
49 except PermissionError as err:
50 print("Permission Error when trying to read sample files\n", err)
52 except FileNotFoundError as err:
53 print("File Not found Error when trying to read sample files\n", err)
56 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
59 if sample_files_parsed:
60 print("sample files content loaded")
62 cls.processes = test_utils.start_tpce()
65 def tearDownClass(cls):
66 for process in cls.processes:
67 test_utils.shutdown_process(process)
68 print("all processes killed")
70 def setUp(self): # instruction executed before each test method
73 # Load simple bidirectional topology
74 def test_01_load_simple_topology_bi(self):
75 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data)
76 self.assertEqual(response.status_code, requests.codes.ok)
80 def test_02_get_nodeId(self):
81 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
82 self.assertEqual(response.status_code, requests.codes.ok)
85 res['node'][0]['node-id'], 'ROADMA01-SRG1')
89 def test_03_get_linkId(self):
90 response = test_utils.get_ordm_topo_request("link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX")
91 self.assertEqual(response.status_code, requests.codes.ok)
94 res['ietf-network-topology:link'][0]['link-id'],
95 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
98 # Path Computation success
99 def test_04_path_computation_xpdr_bi(self):
100 response = test_utils.path_computation_request("request-1", "service-1",
101 {"node-id": "XPDRA01", "service-rate": "100",
102 "service-format": "Ethernet", "clli": "nodeA"},
103 {"node-id": "XPDRC01", "service-rate": "100", "service-format": "Ethernet", "clli": "nodeC"})
104 self.assertEqual(response.status_code, requests.codes.ok)
105 res = response.json()
106 self.assertIn('Path is calculated',
107 res['output']['configuration-response-common']['response-message'])
110 # Path Computation success
111 def test_05_path_computation_rdm_bi(self):
112 response = test_utils.path_computation_request("request-1", "service-1",
113 {"node-id": "ROADMA01", "service-rate": "100",
114 "service-format": "Ethernet", "clli": "NodeA"},
115 {"node-id": "ROADMC01", "service-rate": "100", "service-format": "Ethernet", "clli": "NodeC"})
116 self.assertEqual(response.status_code, requests.codes.ok)
117 res = response.json()
118 self.assertIn('Path is calculated',
119 res['output']['configuration-response-common']['response-message'])
123 def test_06_delete_simple_topology_bi(self):
124 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
125 self.assertEqual(response.status_code, requests.codes.ok)
128 # Test deleted topology
129 def test_07_test_topology_simple_bi_deleted(self):
130 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
131 self.assertEqual(response.status_code, requests.codes.conflict)
134 # Load simple bidirectional topology
135 def test_08_load_simple_topology_uni(self):
136 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_uni_dir_data)
137 self.assertEqual(response.status_code, 201)
140 # Get existing nodeId
141 def test_09_get_nodeId(self):
142 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
143 self.assertEqual(response.status_code, requests.codes.ok)
144 res = response.json()
146 res['node'][0]['node-id'],
150 # Get existing linkId
151 def test_10_get_linkId(self):
152 response = test_utils.get_ordm_topo_request("link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX")
153 self.assertEqual(response.status_code, requests.codes.ok)
154 res = response.json()
156 res['ietf-network-topology:link'][0]['link-id'],
157 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
160 # Path Computation success
161 def test_11_path_computation_xpdr_uni(self):
162 response = test_utils.path_computation_request("request-1", "service-1",
163 {"node-id": "XPONDER-1-2", "service-rate": "100",
164 "service-format": "Ethernet", "clli": "ORANGE1"},
165 {"node-id": "XPONDER-3-2", "service-rate": "100", "service-format": "Ethernet", "clli": "ORANGE3"})
166 self.assertEqual(response.status_code, requests.codes.ok)
167 res = response.json()
168 self.assertIn('Path is calculated',
169 res['output']['configuration-response-common']['response-message'])
172 # Path Computation success
173 def test_12_path_computation_rdm_uni(self):
174 response = test_utils.path_computation_request("request1", "service1",
175 {"service-rate": "100", "service-format": "Ethernet",
176 "clli": "cll21", "node-id": "OpenROADM-2-1"},
177 {"service-rate": "100", "service-format": "Ethernet", "clli": "ncli22", "node-id": "OpenROADM-2-2"})
178 self.assertEqual(response.status_code, requests.codes.ok)
179 res = response.json()
180 self.assertIn('Path is calculated',
181 res['output']['configuration-response-common']['response-message'])
183 atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
184 ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
185 self.assertEqual(atozList, 15)
186 self.assertEqual(ztoaList, 15)
187 for i in range(0, 15):
188 atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
189 ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
190 if (atoz['id'] == '14'):
191 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
192 if (ztoa['id'] == '0'):
193 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
197 def test_13_delete_simple_topology(self):
198 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
199 self.assertEqual(response.status_code, requests.codes.ok)
202 # Test deleted topology
203 def test_14_test_topology_simple_deleted(self):
204 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
205 self.assertEqual(response.status_code, requests.codes.conflict)
208 # Load complex topology
209 def test_15_load_complex_topology(self):
210 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.complex_topo_uni_dir_data)
211 self.assertEqual(response.status_code, 201)
214 # Get existing nodeId
215 def test_16_get_nodeId(self):
216 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
217 self.assertEqual(response.status_code, requests.codes.ok)
218 res = response.json()
220 res['node'][0]['node-id'],
224 # Test failed path computation
225 def test_17_fail_path_computation(self):
226 response = test_utils.post_request(test_utils.URL_PATH_COMPUTATION_REQUEST,
227 {"input": {"service-handler-header": {"request-id": "request-1"}}})
228 self.assertEqual(response.status_code, requests.codes.ok)
229 res = response.json()
230 self.assertIn('Service Name is not set',
231 res['output']['configuration-response-common']['response-message'])
234 # Test1 success path computation
235 def test_18_success1_path_computation(self):
236 response = test_utils.path_computation_request("request1", "service1",
237 {"service-format": "Ethernet", "service-rate": "100", "clli": "ORANGE2", "node-id": "XPONDER-2-2",
238 "tx-direction": {"port": {
239 "port-device-name": "Some port-device-name",
240 "port-type": "Some port-type",
241 "port-name": "Some port-name",
242 "port-rack": "Some port-rack",
243 "port-shelf": "Some port-shelf",
244 "port-slot": "Some port-slot",
245 "port-sub-slot": "Some port-sub-slot"
247 "rx-direction": {"port": {
248 "port-device-name": "Some port-device-name",
249 "port-type": "Some port-type",
250 "port-name": "Some port-name",
251 "port-rack": "Some port-rack",
252 "port-shelf": "Some port-shelf",
253 "port-slot": "Some port-slot",
254 "port-sub-slot": "Some port-sub-slot"
256 {"service-format": "Ethernet", "service-rate": "100", "clli": "ORANGE1", "node-id": "XPONDER-1-2",
257 "tx-direction": {"port": {
258 "port-device-name": "Some port-device-name",
259 "port-type": "Some port-type",
260 "port-name": "Some port-name",
261 "port-rack": "Some port-rack",
262 "port-shelf": "Some port-shelf",
263 "port-slot": "Some port-slot",
264 "port-sub-slot": "Some port-sub-slot"
266 "rx-direction": {"port": {
267 "port-device-name": "Some port-device-name",
268 "port-type": "Some port-type",
269 "port-name": "Some port-name",
270 "port-rack": "Some port-rack",
271 "port-shelf": "Some port-shelf",
272 "port-slot": "Some port-slot",
273 "port-sub-slot": "Some port-sub-slot"
275 {"customer-code": ["Some customer-code"],
276 "co-routing": {"existing-service": ["Some existing-service"]}},
277 {"customer-code": ["Some customer-code"],
278 "co-routing": {"existing-service": ["Some existing-service"]}},
279 "hop-count", {"locally-protected-links": "true"})
280 self.assertEqual(response.status_code, requests.codes.ok)
281 res = response.json()
282 self.assertIn('Path is calculated',
283 res['output']['configuration-response-common']['response-message'])
286 # Test2 success path computation with path description
287 def test_19_success2_path_computation(self):
288 response = test_utils.path_computation_request("request 1", "service 1",
289 {"service-rate": "100", "service-format": "Ethernet",
290 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
291 {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-3-2", "clli": "ORANGE3"})
292 self.assertEqual(response.status_code, requests.codes.ok)
293 res = response.json()
294 self.assertIn('Path is calculated',
295 res['output']['configuration-response-common']['response-message'])
296 self.assertEqual(5, res['output']['response-parameters']['path-description']
297 ['aToZ-direction']['aToZ-wavelength-number'])
298 self.assertEqual(5, res['output']['response-parameters']['path-description']
299 ['zToA-direction']['zToA-wavelength-number'])
302 # Test3 success path computation with hard-constraints exclude
303 def test_20_success3_path_computation(self):
304 response = test_utils.path_computation_request("request 1", "service 1",
305 {"service-rate": "100", "service-format": "Ethernet",
306 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
307 {"service-rate": "100", "service-format": "Ethernet",
308 "node-id": "XPONDER-3-2", "clli": "ORANGE3"},
309 {"exclude_": {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}})
310 self.assertEqual(response.status_code, requests.codes.ok)
311 res = response.json()
312 self.assertIn('Path is calculated',
313 res['output']['configuration-response-common']['response-message'])
314 self.assertEqual(9, res['output']['response-parameters']['path-description']
315 ['aToZ-direction']['aToZ-wavelength-number'])
316 self.assertEqual(9, res['output']['response-parameters']['path-description']
317 ['zToA-direction']['zToA-wavelength-number'])
320 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
321 def test_21_path_computation_before_oms_attribute_deletion(self):
322 response = test_utils.path_computation_request("request 1", "service 1",
323 {"service-rate": "100", "service-format": "Ethernet",
324 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
325 {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
326 self.assertEqual(response.status_code, requests.codes.ok)
327 res = response.json()
328 self.assertIn('Path is calculated',
329 res['output']['configuration-response-common']['response-message'])
330 nbElmPath = len(res['output']['response-parameters']['path-description']
331 ['aToZ-direction']['aToZ'])
332 self.assertEqual(31, nbElmPath)
333 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
335 for i in range(0, nbElmPath):
336 resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
337 if(resource_i == link):
339 self.assertEqual(find, True)
342 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
343 def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
344 response = test_utils.del_oms_attr_request("OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2")
345 self.assertEqual(response.status_code, requests.codes.ok)
348 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
349 def test_23_path_computation_after_oms_attribute_deletion(self):
350 response = test_utils.path_computation_request("request 1", "service 1",
351 {"service-rate": "100", "service-format": "Ethernet",
352 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
353 {"service-rate": "100", "service-format": "Ethernet", "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
354 self.assertEqual(response.status_code, requests.codes.ok)
355 res = response.json()
356 self.assertIn('Path is calculated',
357 res['output']['configuration-response-common']['response-message'])
358 nbElmPath = len(res['output']['response-parameters']['path-description']
359 ['aToZ-direction']['aToZ'])
360 self.assertEqual(47, nbElmPath)
361 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
363 for i in range(0, nbElmPath):
364 resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
365 if (resource_i == link):
367 self.assertNotEqual(find, True)
370 # Delete complex topology
371 def test_24_delete_complex_topology(self):
372 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
373 self.assertEqual(response.status_code, requests.codes.ok)
376 # Test deleted complex topology
377 def test_25_test_topology_complex_deleted(self):
378 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
379 self.assertEqual(response.status_code, requests.codes.conflict)
383 if __name__ == "__main__":
384 unittest.main(verbosity=2)