2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 from common import test_utils
22 class TransportPCEtesting(unittest.TestCase):
24 simple_topo_bi_dir_data = None
25 simple_topo_uni_dir_data = None
26 complex_topo_uni_dir_data = None
32 sample_files_parsed = False
33 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
34 "..", "..", "sample_configs", "honeynode-topo.xml")
35 with open(TOPO_BI_DIR_FILE, 'r') as topo_bi_dir:
36 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
38 TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
39 "..", "..", "sample_configs", "NW-simple-topology.xml")
41 with open(TOPO_UNI_DIR_FILE, 'r') as topo_uni_dir:
42 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
44 TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
45 "..", "..", "sample_configs", "NW-for-test-5-4.xml")
46 with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r') as topo_uni_dir_complex:
47 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
48 sample_files_parsed = True
49 except PermissionError as err:
50 print("Permission Error when trying to read sample files\n", err)
52 except FileNotFoundError as err:
53 print("File Not found Error when trying to read sample files\n", err)
56 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
59 if sample_files_parsed:
60 print("sample files content loaded")
62 cls.processes = test_utils.start_tpce()
65 def tearDownClass(cls):
66 for process in cls.processes:
67 test_utils.shutdown_process(process)
68 print("all processes killed")
70 def setUp(self): # instruction executed before each test method
73 # Load simple bidirectional topology
74 def test_01_load_simple_topology_bi(self):
75 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_bi_dir_data)
76 self.assertEqual(response.status_code, requests.codes.ok)
80 def test_02_get_nodeId(self):
81 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
82 self.assertEqual(response.status_code, requests.codes.ok)
85 res['node'][0]['node-id'], 'ROADMA01-SRG1')
89 def test_03_get_linkId(self):
90 response = test_utils.get_ordm_topo_request("link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX")
91 self.assertEqual(response.status_code, requests.codes.ok)
94 res['ietf-network-topology:link'][0]['link-id'],
95 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
98 # Path Computation success
99 def test_04_path_computation_xpdr_bi(self):
100 response = test_utils.path_computation_request("request-1", "service-1",
101 {"node-id": "XPDRA01", "service-rate": "100",
102 "service-format": "Ethernet", "clli": "nodeA"},
103 {"node-id": "XPDRC01", "service-rate": "100",
104 "service-format": "Ethernet", "clli": "nodeC"})
105 self.assertEqual(response.status_code, requests.codes.ok)
106 res = response.json()
107 self.assertIn('Path is calculated',
108 res['output']['configuration-response-common']['response-message'])
111 # Path Computation success
112 def test_05_path_computation_rdm_bi(self):
113 response = test_utils.path_computation_request("request-1", "service-1",
114 {"node-id": "ROADMA01", "service-rate": "100",
115 "service-format": "Ethernet", "clli": "NodeA"},
116 {"node-id": "ROADMC01", "service-rate": "100",
117 "service-format": "Ethernet", "clli": "NodeC"})
118 self.assertEqual(response.status_code, requests.codes.ok)
119 res = response.json()
120 self.assertIn('Path is calculated',
121 res['output']['configuration-response-common']['response-message'])
125 def test_06_delete_simple_topology_bi(self):
126 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
127 self.assertEqual(response.status_code, requests.codes.ok)
130 # Test deleted topology
131 def test_07_test_topology_simple_bi_deleted(self):
132 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
133 self.assertEqual(response.status_code, requests.codes.conflict)
136 # Load simple bidirectional topology
137 def test_08_load_simple_topology_uni(self):
138 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.simple_topo_uni_dir_data)
139 self.assertEqual(response.status_code, 201)
142 # Get existing nodeId
143 def test_09_get_nodeId(self):
144 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
145 self.assertEqual(response.status_code, requests.codes.ok)
146 res = response.json()
148 res['node'][0]['node-id'],
152 # Get existing linkId
153 def test_10_get_linkId(self):
154 response = test_utils.get_ordm_topo_request("link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX")
155 self.assertEqual(response.status_code, requests.codes.ok)
156 res = response.json()
158 res['ietf-network-topology:link'][0]['link-id'],
159 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
162 # Path Computation success
163 def test_11_path_computation_xpdr_uni(self):
164 response = test_utils.path_computation_request("request-1", "service-1",
165 {"node-id": "XPONDER-1-2", "service-rate": "100",
166 "service-format": "Ethernet", "clli": "ORANGE1"},
167 {"node-id": "XPONDER-3-2", "service-rate": "100",
168 "service-format": "Ethernet", "clli": "ORANGE3"})
169 self.assertEqual(response.status_code, requests.codes.ok)
170 res = response.json()
171 self.assertIn('Path is calculated',
172 res['output']['configuration-response-common']['response-message'])
175 # Path Computation success
176 def test_12_path_computation_rdm_uni(self):
177 response = test_utils.path_computation_request("request1", "service1",
178 {"service-rate": "100", "service-format": "Ethernet",
179 "clli": "cll21", "node-id": "OpenROADM-2-1"},
180 {"service-rate": "100", "service-format": "Ethernet",
181 "clli": "ncli22", "node-id": "OpenROADM-2-2"})
182 self.assertEqual(response.status_code, requests.codes.ok)
183 res = response.json()
184 self.assertIn('Path is calculated',
185 res['output']['configuration-response-common']['response-message'])
187 atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
188 ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
189 self.assertEqual(atozList, 15)
190 self.assertEqual(ztoaList, 15)
191 for i in range(0, 15):
192 atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
193 ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
194 if (atoz['id'] == '14'):
195 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
196 if (ztoa['id'] == '0'):
197 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
201 def test_13_delete_simple_topology(self):
202 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
203 self.assertEqual(response.status_code, requests.codes.ok)
206 # Test deleted topology
207 def test_14_test_topology_simple_deleted(self):
208 response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
209 self.assertEqual(response.status_code, requests.codes.conflict)
212 # Load complex topology
213 def test_15_load_complex_topology(self):
214 response = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, self.complex_topo_uni_dir_data)
215 self.assertEqual(response.status_code, 201)
218 # Get existing nodeId
219 def test_16_get_nodeId(self):
220 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
221 self.assertEqual(response.status_code, requests.codes.ok)
222 res = response.json()
224 res['node'][0]['node-id'],
228 # Test failed path computation
229 def test_17_fail_path_computation(self):
230 response = test_utils.post_request(test_utils.URL_PATH_COMPUTATION_REQUEST,
231 {"input": {"service-handler-header": {"request-id": "request-1"}}})
232 self.assertEqual(response.status_code, requests.codes.ok)
233 res = response.json()
234 self.assertIn('Service Name is not set',
235 res['output']['configuration-response-common']['response-message'])
238 # Test1 success path computation
239 def test_18_success1_path_computation(self):
240 response = test_utils.path_computation_request("request1", "service1",
241 {"service-format": "Ethernet", "service-rate": "100",
242 "clli": "ORANGE2", "node-id": "XPONDER-2-2",
243 "tx-direction": {"port": {
244 "port-device-name": "Some port-device-name",
245 "port-type": "Some port-type",
246 "port-name": "Some port-name",
247 "port-rack": "Some port-rack",
248 "port-shelf": "Some port-shelf",
249 "port-slot": "Some port-slot",
250 "port-sub-slot": "Some port-sub-slot"
252 "rx-direction": {"port": {
253 "port-device-name": "Some port-device-name",
254 "port-type": "Some port-type",
255 "port-name": "Some port-name",
256 "port-rack": "Some port-rack",
257 "port-shelf": "Some port-shelf",
258 "port-slot": "Some port-slot",
259 "port-sub-slot": "Some port-sub-slot"
261 {"service-format": "Ethernet", "service-rate": "100",
262 "clli": "ORANGE1", "node-id": "XPONDER-1-2",
263 "tx-direction": {"port": {
264 "port-device-name": "Some port-device-name",
265 "port-type": "Some port-type",
266 "port-name": "Some port-name",
267 "port-rack": "Some port-rack",
268 "port-shelf": "Some port-shelf",
269 "port-slot": "Some port-slot",
270 "port-sub-slot": "Some port-sub-slot"
272 "rx-direction": {"port": {
273 "port-device-name": "Some port-device-name",
274 "port-type": "Some port-type",
275 "port-name": "Some port-name",
276 "port-rack": "Some port-rack",
277 "port-shelf": "Some port-shelf",
278 "port-slot": "Some port-slot",
279 "port-sub-slot": "Some port-sub-slot"
281 {"customer-code": ["Some customer-code"],
282 "co-routing": {"existing-service": ["Some existing-service"]}
284 {"customer-code": ["Some customer-code"],
285 "co-routing": {"existing-service": ["Some existing-service"]}
287 "hop-count", {"locally-protected-links": "true"})
288 self.assertEqual(response.status_code, requests.codes.ok)
289 res = response.json()
290 self.assertIn('Path is calculated',
291 res['output']['configuration-response-common']['response-message'])
294 # Test2 success path computation with path description
295 def test_19_success2_path_computation(self):
296 response = test_utils.path_computation_request("request 1", "service 1",
297 {"service-rate": "100", "service-format": "Ethernet",
298 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
299 {"service-rate": "100", "service-format": "Ethernet",
300 "node-id": "XPONDER-3-2", "clli": "ORANGE3"})
301 self.assertEqual(response.status_code, requests.codes.ok)
302 res = response.json()
303 self.assertIn('Path is calculated',
304 res['output']['configuration-response-common']['response-message'])
305 self.assertEqual(5, res['output']['response-parameters']['path-description']
306 ['aToZ-direction']['aToZ-wavelength-number'])
307 self.assertEqual(5, res['output']['response-parameters']['path-description']
308 ['zToA-direction']['zToA-wavelength-number'])
311 # Test3 success path computation with hard-constraints exclude
312 def test_20_success3_path_computation(self):
313 response = test_utils.path_computation_request("request 1", "service 1",
314 {"service-rate": "100", "service-format": "Ethernet",
315 "node-id": "XPONDER-1-2", "clli": "ORANGE1"},
316 {"service-rate": "100", "service-format": "Ethernet",
317 "node-id": "XPONDER-3-2", "clli": "ORANGE3"},
318 {"exclude_": {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}})
319 self.assertEqual(response.status_code, requests.codes.ok)
320 res = response.json()
321 self.assertIn('Path is calculated',
322 res['output']['configuration-response-common']['response-message'])
323 self.assertEqual(9, res['output']['response-parameters']['path-description']
324 ['aToZ-direction']['aToZ-wavelength-number'])
325 self.assertEqual(9, res['output']['response-parameters']['path-description']
326 ['zToA-direction']['zToA-wavelength-number'])
329 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
330 def test_21_path_computation_before_oms_attribute_deletion(self):
331 response = test_utils.path_computation_request("request 1", "service 1",
332 {"service-rate": "100", "service-format": "Ethernet",
333 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
334 {"service-rate": "100", "service-format": "Ethernet",
335 "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
336 self.assertEqual(response.status_code, requests.codes.ok)
337 res = response.json()
338 self.assertIn('Path is calculated',
339 res['output']['configuration-response-common']['response-message'])
340 nbElmPath = len(res['output']['response-parameters']['path-description']
341 ['aToZ-direction']['aToZ'])
342 self.assertEqual(31, nbElmPath)
343 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
345 for i in range(0, nbElmPath):
346 resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
348 if(resource_i == link):
350 self.assertEqual(find, True)
353 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
354 def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
355 response = test_utils.del_oms_attr_request("OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2")
356 self.assertEqual(response.status_code, requests.codes.ok)
359 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
360 def test_23_path_computation_after_oms_attribute_deletion(self):
361 response = test_utils.path_computation_request("request 1", "service 1",
362 {"service-rate": "100", "service-format": "Ethernet",
363 "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
364 {"service-rate": "100", "service-format": "Ethernet",
365 "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
366 self.assertEqual(response.status_code, requests.codes.ok)
367 res = response.json()
368 self.assertIn('Path is calculated',
369 res['output']['configuration-response-common']['response-message'])
370 nbElmPath = len(res['output']['response-parameters']['path-description']
371 ['aToZ-direction']['aToZ'])
372 self.assertEqual(47, nbElmPath)
373 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
375 for i in range(0, nbElmPath):
376 resource_i = (res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
378 if (resource_i == link):
380 self.assertNotEqual(find, True)
383 # Delete complex topology
384 def test_24_delete_complex_topology(self):
385 response = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
386 self.assertEqual(response.status_code, requests.codes.ok)
389 # Test deleted complex topology
390 def test_25_test_topology_complex_deleted(self):
391 response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
392 self.assertEqual(response.status_code, requests.codes.conflict)
396 if __name__ == "__main__":
397 unittest.main(verbosity=2)