2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 from common import test_utils
22 class TransportPCEtesting(unittest.TestCase):
24 simple_topo_bi_dir_data = None
25 simple_topo_uni_dir_data = None
26 complex_topo_uni_dir_data = None
31 # pylint: disable=bare-except
33 sample_files_parsed = False
34 TOPO_BI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
35 "..", "..", "sample_configs", "honeynode-topo.xml")
36 with open(TOPO_BI_DIR_FILE, 'r') as topo_bi_dir:
37 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
39 TOPO_UNI_DIR_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
40 "..", "..", "sample_configs", "NW-simple-topology.xml")
42 with open(TOPO_UNI_DIR_FILE, 'r') as topo_uni_dir:
43 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
45 TOPO_UNI_DIR_COMPLEX_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)),
46 "..", "..", "sample_configs", "NW-for-test-5-4.xml")
47 with open(TOPO_UNI_DIR_COMPLEX_FILE, 'r') as topo_uni_dir_complex:
48 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
49 sample_files_parsed = True
50 except PermissionError as err:
51 print("Permission Error when trying to read sample files\n", err)
53 except FileNotFoundError as err:
54 print("File Not found Error when trying to read sample files\n", err)
57 print("Unexpected error when trying to read sample files\n", sys.exc_info()[0])
60 if sample_files_parsed:
61 print("sample files content loaded")
63 cls.processes = test_utils.start_tpce()
def tearDownClass(cls):
    """Shut down every process stored in ``cls.processes``.

    Each entry is handed to ``test_utils.shutdown_process`` in turn, then a
    confirmation line is printed.
    """
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
72 def setUp(self): # instruction executed before each test method
75 # Load simple bidirectional topology
def test_01_load_simple_topology_bi(self):
    """PUT the simple bi-directional sample topology and expect an OK status."""
    topology_xml = self.simple_topo_bi_dir_data
    reply = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, topology_xml)
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_02_get_nodeId(self):
    """Fetch node ROADMA01-SRG1 and check the node-id echoed in the reply."""
    response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    # The reply body must be decoded before it can be inspected; the
    # decoding step and the assertion call were missing here, so they are
    # restored following the pattern of the sibling node/link tests.
    res = response.json()
    self.assertEqual(
        res['node'][0]['node-id'], 'ROADMA01-SRG1')
def test_03_get_linkId(self):
    """Fetch one link of the bi-directional topology and check its link-id."""
    response = test_utils.get_ordm_topo_request("link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decoding step and assertion call restored (they were missing), using
    # the same shape as the sibling link test further down.
    res = response.json()
    self.assertEqual(
        res['ietf-network-topology:link'][0]['link-id'],
        'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
100 # Path Computation success
def test_04_path_computation_xpdr_bi(self):
    """Request a path from XPDRA01 to XPDRC01 on the bi-directional topology.

    Passes when the status is OK and the response message contains
    'Path is calculated'.
    """
    a_end = {"node-id": "XPDRA01", "service-rate": "100",
             "service-format": "Ethernet", "clli": "nodeA"}
    z_end = {"node-id": "XPDRC01", "service-rate": "100",
             "service-format": "Ethernet", "clli": "nodeC"}
    reply = test_utils.path_computation_request("request-1", "service-1", a_end, z_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    common = body['output']['configuration-response-common']
    self.assertIn('Path is calculated', common['response-message'])
113 # Path Computation success
def test_05_path_computation_rdm_bi(self):
    """Request a path from ROADMA01 to ROADMC01 on the bi-directional topology.

    Passes when the status is OK and the response message contains
    'Path is calculated'.
    """
    a_end = {"node-id": "ROADMA01", "service-rate": "100",
             "service-format": "Ethernet", "clli": "NodeA"}
    z_end = {"node-id": "ROADMC01", "service-rate": "100",
             "service-format": "Ethernet", "clli": "NodeC"}
    reply = test_utils.path_computation_request("request-1", "service-1", a_end, z_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    common = body['output']['configuration-response-common']
    self.assertIn('Path is calculated', common['response-message'])
def test_06_delete_simple_topology_bi(self):
    """Delete the topology at URL_CONFIG_ORDM_TOPO and expect an OK status."""
    reply = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
    expected_status = requests.codes.ok
    self.assertEqual(reply.status_code, expected_status)
132 # Test deleted topology
def test_07_test_topology_simple_bi_deleted(self):
    """Query node ROADMA01-SRG1 after the deletion and expect a conflict status."""
    reply = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
    expected_status = requests.codes.conflict
    self.assertEqual(reply.status_code, expected_status)
# Load simple unidirectional topology
def test_08_load_simple_topology_uni(self):
    """PUT the simple uni-directional sample topology and expect status 201."""
    topology_xml = self.simple_topo_uni_dir_data
    reply = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, topology_xml)
    # requests.codes.created is the named form of the literal 201.
    self.assertEqual(reply.status_code, requests.codes.created)
144 # Get existing nodeId
def test_09_get_nodeId(self):
    """Fetch node XPONDER-1-2 and check the node-id echoed in the reply."""
    response = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The assertion call was incomplete here (no opener, no expected value).
    # NOTE(review): 'XPONDER-1-2' is assumed to mirror the requested node,
    # as the first node-id test in this class does - confirm.
    self.assertEqual(
        res['node'][0]['node-id'],
        'XPONDER-1-2')
154 # Get existing linkId
def test_10_get_linkId(self):
    """Fetch one link of the uni-directional topology and check its link-id."""
    response = test_utils.get_ordm_topo_request("link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The assertion call opener was missing; the two visible arguments are
    # wrapped in the equality check they were evidently written for.
    self.assertEqual(
        res['ietf-network-topology:link'][0]['link-id'],
        'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
164 # Path Computation success
def test_11_path_computation_xpdr_uni(self):
    """Request a path from XPONDER-1-2 to XPONDER-3-2 on the uni-directional topology.

    Passes when the status is OK and the response message contains
    'Path is calculated'.
    """
    a_end = {"node-id": "XPONDER-1-2", "service-rate": "100",
             "service-format": "Ethernet", "clli": "ORANGE1"}
    z_end = {"node-id": "XPONDER-3-2", "service-rate": "100",
             "service-format": "Ethernet", "clli": "ORANGE3"}
    reply = test_utils.path_computation_request("request-1", "service-1", a_end, z_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    common = body['output']['configuration-response-common']
    self.assertIn('Path is calculated', common['response-message'])
177 # Path Computation success
def test_12_path_computation_rdm_uni(self):
    """Request a path from OpenROADM-2-1 to OpenROADM-2-2 and inspect both directions.

    Besides the status and the 'Path is calculated' message, this checks
    that each direction lists 15 elements, that the aToZ element with id
    '14' has tp-id 'SRG1-PP1-TX', and that the zToA element with id '0'
    has tp-id 'SRG1-PP1-RX'.
    """
    a_end = {"service-rate": "100", "service-format": "Ethernet",
             "clli": "cll21", "node-id": "OpenROADM-2-1"}
    z_end = {"service-rate": "100", "service-format": "Ethernet",
             "clli": "ncli22", "node-id": "OpenROADM-2-2"}
    reply = test_utils.path_computation_request("request1", "service1", a_end, z_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Path is calculated',
                  body['output']['configuration-response-common']['response-message'])

    description = body['output']['response-parameters']['path-description']
    atoz_elements = description['aToZ-direction']['aToZ']
    ztoa_elements = description['zToA-direction']['zToA']
    self.assertEqual(len(atoz_elements), 15)
    self.assertEqual(len(ztoa_elements), 15)
    # Both lists have just been asserted to hold 15 entries, so walking
    # them pairwise visits the same indices 0..14 as an index loop.
    for atoz, ztoa in zip(atoz_elements, ztoa_elements):
        if atoz['id'] == '14':
            self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
        if ztoa['id'] == '0':
            self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
def test_13_delete_simple_topology(self):
    """Delete the topology at URL_CONFIG_ORDM_TOPO and expect an OK status."""
    reply = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
    expected_status = requests.codes.ok
    self.assertEqual(reply.status_code, expected_status)
208 # Test deleted topology
def test_14_test_topology_simple_deleted(self):
    """Query node XPONDER-1-2 after the deletion and expect a conflict status."""
    reply = test_utils.get_ordm_topo_request("node/XPONDER-1-2")
    expected_status = requests.codes.conflict
    self.assertEqual(reply.status_code, expected_status)
214 # Load complex topology
def test_15_load_complex_topology(self):
    """PUT the complex uni-directional sample topology and expect status 201."""
    topology_xml = self.complex_topo_uni_dir_data
    reply = test_utils.put_xmlrequest(test_utils.URL_CONFIG_ORDM_TOPO, topology_xml)
    # requests.codes.created is the named form of the literal 201.
    self.assertEqual(reply.status_code, requests.codes.created)
220 # Get existing nodeId
def test_16_get_nodeId(self):
    """Fetch node XPONDER-3-2 and check the node-id echoed in the reply."""
    response = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The assertion call was incomplete here (no opener, no expected value).
    # NOTE(review): 'XPONDER-3-2' is assumed to mirror the requested node,
    # as the first node-id test in this class does - confirm.
    self.assertEqual(
        res['node'][0]['node-id'],
        'XPONDER-3-2')
230 # Test failed path computation
def test_17_fail_path_computation(self):
    """POST a path-computation request that carries only a request-id.

    The call is still expected to return an OK status, with a response
    message containing 'Service Name is not set'.
    """
    payload = {"input": {"service-handler-header": {"request-id": "request-1"}}}
    reply = test_utils.post_request(test_utils.URL_PATH_COMPUTATION_REQUEST, payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    common = body['output']['configuration-response-common']
    self.assertIn('Service Name is not set', common['response-message'])
240 # Test1 success path computation
def test_18_success1_path_computation(self):
    """Request a path from XPONDER-2-2 to XPONDER-1-2 with every optional argument set.

    Both service ends carry tx/rx port descriptions, two constraint
    dictionaries are supplied, and "hop-count" plus a
    locally-protected-links flag are passed as the last two arguments.
    Passes when the status is OK and the response message contains
    'Path is calculated'.
    """
    def port():
        # A fresh dict per call, so the four port entries never alias.
        return {
            "port-device-name": "Some port-device-name",
            "port-type": "Some port-type",
            "port-name": "Some port-name",
            "port-rack": "Some port-rack",
            "port-shelf": "Some port-shelf",
            "port-slot": "Some port-slot",
            "port-sub-slot": "Some port-sub-slot",
        }

    def service_end(clli, node_id):
        # Key order follows the original literal.
        return {"service-format": "Ethernet", "service-rate": "100",
                "clli": clli, "node-id": node_id,
                "tx-direction": {"port": port()},
                "rx-direction": {"port": port()}}

    def constraints():
        return {"customer-code": ["Some customer-code"],
                "co-routing": {"existing-service": ["Some existing-service"]}}

    # NOTE(review): the fifth and sixth positional arguments are presumably
    # the hard and soft constraints - confirm against test_utils.
    response = test_utils.path_computation_request(
        "request1", "service1",
        service_end("ORANGE2", "XPONDER-2-2"),
        service_end("ORANGE1", "XPONDER-1-2"),
        constraints(),
        constraints(),
        "hop-count", {"locally-protected-links": "true"})
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Path is calculated',
                  res['output']['configuration-response-common']['response-message'])
296 # Test2 success path computation with path description
def test_19_success2_path_computation(self):
    """Request a path from XPONDER-1-2 to XPONDER-3-2 and check the wavelength.

    Besides the status and the 'Path is calculated' message, both
    directions of the path description must report wavelength number 5.
    """
    a_end = {"service-rate": "100", "service-format": "Ethernet",
             "node-id": "XPONDER-1-2", "clli": "ORANGE1"}
    z_end = {"service-rate": "100", "service-format": "Ethernet",
             "node-id": "XPONDER-3-2", "clli": "ORANGE3"}
    reply = test_utils.path_computation_request("request 1", "service 1", a_end, z_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Path is calculated',
                  body['output']['configuration-response-common']['response-message'])
    description = body['output']['response-parameters']['path-description']
    self.assertEqual(5, description['aToZ-direction']['aToZ-wavelength-number'])
    self.assertEqual(5, description['zToA-direction']['zToA-wavelength-number'])
313 # Test3 success path computation with hard-constraints exclude
def test_20_success3_path_computation(self):
    """Request a path from XPONDER-1-2 to XPONDER-3-2 with two nodes excluded.

    OpenROADM-2-1 and OpenROADM-2-2 are listed under "exclude_" in the
    fifth argument. Besides the status and the 'Path is calculated'
    message, both directions must report wavelength number 9.
    """
    a_end = {"service-rate": "100", "service-format": "Ethernet",
             "node-id": "XPONDER-1-2", "clli": "ORANGE1"}
    z_end = {"service-rate": "100", "service-format": "Ethernet",
             "node-id": "XPONDER-3-2", "clli": "ORANGE3"}
    exclusions = {"exclude_": {"node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]}}
    reply = test_utils.path_computation_request("request 1", "service 1",
                                                a_end, z_end, exclusions)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Path is calculated',
                  body['output']['configuration-response-common']['response-message'])
    description = body['output']['response-parameters']['path-description']
    self.assertEqual(9, description['aToZ-direction']['aToZ-wavelength-number'])
    self.assertEqual(9, description['zToA-direction']['zToA-wavelength-number'])
331 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
def test_21_path_computation_before_oms_attribute_deletion(self):
    """Compute XPONDER-2-2 -> XPONDER-1-2 and require the watched link on the path.

    Passes when the status is OK, the message contains 'Path is
    calculated', the aToZ list holds 31 elements, and one of them matches
    link OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2.
    """
    response = test_utils.path_computation_request("request 1", "service 1",
                                                   {"service-rate": "100", "service-format": "Ethernet",
                                                    "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
                                                   {"service-rate": "100", "service-format": "Ethernet",
                                                    "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Path is calculated',
                  res['output']['configuration-response-common']['response-message'])
    atoz_elements = (res['output']['response-parameters']['path-description']
                     ['aToZ-direction']['aToZ'])
    self.assertEqual(31, len(atoz_elements))
    link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
    # The flag previously used here was never initialised or set, and the
    # per-element lookup was left unterminated; any() replaces both.
    # NOTE(review): the key applied to each element was cut off in the
    # reviewed source; ['resource'] is assumed, matching how path elements
    # are read in test_12 - confirm.
    find = any(element['resource'] == link for element in atoz_elements)
    self.assertTrue(find)
355 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
    """Delete the OMS attribute of link OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2."""
    link_id = "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"
    reply = test_utils.del_oms_attr_request(link_id)
    self.assertEqual(reply.status_code, requests.codes.ok)
361 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
def test_23_path_computation_after_oms_attribute_deletion(self):
    """Compute XPONDER-2-2 -> XPONDER-1-2 and require the watched link to be avoided.

    Passes when the status is OK, the message contains 'Path is
    calculated', the aToZ list holds 47 elements, and none of them matches
    link OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2.
    """
    response = test_utils.path_computation_request("request 1", "service 1",
                                                   {"service-rate": "100", "service-format": "Ethernet",
                                                    "node-id": "XPONDER-2-2", "clli": "ORANGE2"},
                                                   {"service-rate": "100", "service-format": "Ethernet",
                                                    "node-id": "XPONDER-1-2", "clli": "ORANGE1"})
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Path is calculated',
                  res['output']['configuration-response-common']['response-message'])
    atoz_elements = (res['output']['response-parameters']['path-description']
                     ['aToZ-direction']['aToZ'])
    self.assertEqual(47, len(atoz_elements))
    link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
    # The flag previously used here was never initialised or set, and the
    # per-element lookup was left unterminated; any() replaces both.
    # NOTE(review): the key applied to each element was cut off in the
    # reviewed source; ['resource'] is assumed, matching how path elements
    # are read in test_12 - confirm.
    find = any(element['resource'] == link for element in atoz_elements)
    self.assertFalse(find)
385 # Delete complex topology
def test_24_delete_complex_topology(self):
    """Delete the topology at URL_CONFIG_ORDM_TOPO and expect an OK status."""
    reply = test_utils.delete_request(test_utils.URL_CONFIG_ORDM_TOPO)
    expected_status = requests.codes.ok
    self.assertEqual(reply.status_code, expected_status)
391 # Test deleted complex topology
def test_25_test_topology_complex_deleted(self):
    """Query node XPONDER-3-2 after the deletion and expect a conflict status."""
    reply = test_utils.get_ordm_topo_request("node/XPONDER-3-2")
    expected_status = requests.codes.conflict
    self.assertEqual(reply.status_code, expected_status)
# Script entry point: run this module's tests through unittest, passing
# verbosity=2, when the file is executed directly rather than imported.
if __name__ == "__main__":
    unittest.main(verbosity=2)