2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
16 from common import test_utils
# unittest test case whose methods exercise REST endpoints through the
# test_utils helpers (put/get/post/delete requests).
19 class TransportPCEtesting(unittest.TestCase):
# Raw text of the sample topology files. Each attribute keeps the value None
# unless the matching file exists and is read by the loading code that
# assigns cls.<attribute> further down.
21 simple_topo_bi_dir_data = None
22 simple_topo_uni_dir_data = None
23 complex_topo_uni_dir_data = None
# NOTE(review): the `def` line that owns these statements is not visible in
# this view; the use of `cls` suggests a class-level fixture -- confirm.
# Each of the three stanzas below builds a path relative to this file
# (../../sample_configs/<name>) and, only if that file exists, stores its
# full text on the class. A missing file is silently skipped, leaving the
# attribute at None.
27 topo_bi_dir_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
28 "..", "..", "sample_configs", "honeynode-topo.xml")
29 if os.path.isfile(topo_bi_dir_file):
30 with open(topo_bi_dir_file, 'r') as topo_bi_dir:
31 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
32 topo_uni_dir_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
33 "..", "..", "sample_configs", "NW-simple-topology.xml")
34 if os.path.isfile(topo_uni_dir_file):
35 with open(topo_uni_dir_file, 'r') as topo_uni_dir:
36 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
37 topo_uni_dir_complex_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
38 "..", "..", "sample_configs", "NW-for-test-5-4.xml")
39 if os.path.isfile(topo_uni_dir_complex_file):
40 with open(topo_uni_dir_complex_file, 'r') as topo_uni_dir_complex:
41 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
# Keep whatever test_utils.start_tpce() returns; tearDownClass iterates over
# it and passes each item to test_utils.shutdown_process.
48 cls.processes = test_utils.start_tpce()
# Hand every entry of cls.processes to test_utils.shutdown_process and print
# a confirmation message.
# NOTE(review): indentation is lost in this view, so whether the print runs
# once after the loop or once per process cannot be confirmed from here.
51 def tearDownClass(cls):
52 for process in cls.processes:
53 test_utils.shutdown_process(process)
54 print("all processes killed")
# NOTE(review): the body of this per-test hook is not visible in this view.
56 def setUp(self): # instruction executed before each test method
59 # Load simple bidirectional topology
def test_01_load_simple_topology_bi(self):
    # PUT the text held in simple_topo_bi_dir_data onto the
    # openroadm-topology resource and require an OK status code.
    topology_url = "{}/config/ietf-network:networks/network/openroadm-topology"
    reply = test_utils.put_xmlrequest(topology_url, self.simple_topo_bi_dir_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
# GET node ROADMA01-SRG1 from openroadm-topology and require an OK status.
# The last visible line pairs res['node'][0]['node-id'] with the expected
# id 'ROADMA01-SRG1'.
# NOTE(review): the statement binding `res` and the opening of the call that
# receives that pair are not visible in this view.
68 def test_02_get_nodeId(self):
69 url = "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
70 response = test_utils.get_request(url)
71 self.assertEqual(response.status_code, requests.codes.ok)
74 res['node'][0]['node-id'], 'ROADMA01-SRG1')
# GET one link of openroadm-topology by its link-id and require an OK status.
# The trailing lines pair the returned link-id with the same id that was
# used in the URL.
# NOTE(review): the statement binding `res` and the opening of the call that
# receives that pair are not visible in this view.
78 def test_03_get_linkId(self):
79 url = "{}/config/ietf-network:networks/network/openroadm-topology/link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX"
80 response = test_utils.get_request(url)
81 self.assertEqual(response.status_code, requests.codes.ok)
84 res['ietf-network-topology:link'][0]['link-id'],
85 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
88 # Path Computation success
# POST a path-computation-request and require both an OK status and a
# response-message containing 'Path is calculated'.
# NOTE(review): only fragments of the request payload are visible in this
# view (the `data = ...` opener, the nesting braces and at least one
# node-id entry are missing), so the exact payload shape cannot be stated.
89 def test_04_path_computation_xpdr_bi(self):
90 url = "{}/operations/transportpce-pce:path-computation-request"
92 "service-name": "service-1",
93 "resource-reserve": "true",
94 "pce-metric": "hop-count",
95 "service-handler-header": {
96 "request-id": "request-1"
100 "service-rate": "100",
101 "service-format": "Ethernet",
105 "node-id": "XPDRC01",
106 "service-rate": "100",
107 "service-format": "Ethernet",
112 response = test_utils.post_request(url, data)
113 self.assertEqual(response.status_code, requests.codes.ok)
114 res = response.json()
115 self.assertIn('Path is calculated',
116 res['output']['configuration-response-common']['response-message'])
119 # Path Computation success
# POST a path-computation-request whose visible payload names nodes
# ROADMA01 and ROADMC01, then require an OK status and a response-message
# containing 'Path is calculated'.
# NOTE(review): the `data = ...` opener and the nesting braces of the
# payload are not visible in this view.
120 def test_05_path_computation_rdm_bi(self):
121 url = "{}/operations/transportpce-pce:path-computation-request"
123 "service-name": "service-1",
124 "resource-reserve": "true",
125 "pce-metric": "hop-count",
126 "service-handler-header": {
127 "request-id": "request-1"
130 "node-id": "ROADMA01",
131 "service-rate": "100",
132 "service-format": "Ethernet",
136 "node-id": "ROADMC01",
137 "service-rate": "100",
138 "service-format": "Ethernet",
143 response = test_utils.post_request(url, data)
144 self.assertEqual(response.status_code, requests.codes.ok)
145 res = response.json()
146 self.assertIn('Path is calculated',
147 res['output']['configuration-response-common']['response-message'])
def test_06_delete_simple_topology_bi(self):
    # DELETE the openroadm-topology resource and require an OK status code.
    topology_url = "{}/config/ietf-network:networks/network/openroadm-topology"
    reply = test_utils.delete_request(topology_url)
    self.assertEqual(reply.status_code, requests.codes.ok)
157 # Test deleted topology
def test_07_test_topology_simple_bi_deleted(self):
    # GET node ROADMA01-SRG1 and expect the lookup to report "not found".
    # The named constant (value 404) replaces the bare number so this check
    # reads like the requests.codes.* assertions used elsewhere in the file.
    url = "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
    response = test_utils.get_request(url)
    self.assertEqual(response.status_code, requests.codes.not_found)
164 # Load simple unidirectional topology
def test_08_load_simple_topology_uni(self):
    # PUT the text held in simple_topo_uni_dir_data onto the
    # openroadm-topology resource and expect a "created" status.
    # The named constant (value 201) replaces the bare number so this check
    # reads like the requests.codes.* assertions used elsewhere in the file.
    url = "{}/config/ietf-network:networks/network/openroadm-topology"
    data = self.simple_topo_uni_dir_data
    response = test_utils.put_xmlrequest(url, data)
    self.assertEqual(response.status_code, requests.codes.created)
172 # Get existing nodeId
# GET node XPONDER-1-2 from openroadm-topology, require an OK status and
# decode the JSON body; the last visible line reads the first node's
# node-id from it.
# NOTE(review): the call that receives that value and the expected value
# are not visible in this view.
173 def test_09_get_nodeId(self):
174 url = "{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-1-2"
175 response = test_utils.get_request(url)
176 self.assertEqual(response.status_code, requests.codes.ok)
177 res = response.json()
179 res['node'][0]['node-id'],
183 # Get existing linkId
# GET one link of openroadm-topology by its link-id, require an OK status
# and decode the JSON body. The trailing lines pair the returned link-id
# with the same id that was used in the URL.
# NOTE(review): the opening of the call that receives that pair is not
# visible in this view.
184 def test_10_get_linkId(self):
185 url = "{}/config/ietf-network:networks/network/openroadm-topology/link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX"
186 response = test_utils.get_request(url)
187 self.assertEqual(response.status_code, requests.codes.ok)
188 res = response.json()
190 res['ietf-network-topology:link'][0]['link-id'],
191 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
194 # Path Computation success
# POST a path-computation-request whose visible payload names nodes
# XPONDER-1-2 and XPONDER-3-2, then require an OK status and a
# response-message containing 'Path is calculated'.
# NOTE(review): the `data = ...` opener and the nesting braces of the
# payload are not visible in this view.
195 def test_11_path_computation_xpdr_uni(self):
196 url = "{}/operations/transportpce-pce:path-computation-request"
198 "service-name": "service-1",
199 "resource-reserve": "true",
200 "pce-metric": "hop-count",
201 "service-handler-header": {
202 "request-id": "request-1"
205 "node-id": "XPONDER-1-2",
206 "service-rate": "100",
207 "service-format": "Ethernet",
211 "node-id": "XPONDER-3-2",
212 "service-rate": "100",
213 "service-format": "Ethernet",
218 response = test_utils.post_request(url, data)
219 self.assertEqual(response.status_code, requests.codes.ok)
220 res = response.json()
221 self.assertIn('Path is calculated',
222 res['output']['configuration-response-common']['response-message'])
225 # Path Computation success
# POST a path-computation-request whose visible payload names nodes
# OpenROADM-2-1 and OpenROADM-2-2, require an OK status and a
# response-message containing 'Path is calculated', then inspect the
# returned path description.
# NOTE(review): the `data = ...` opener and the nesting braces of the
# payload are not visible in this view, and indentation is lost, so the
# nesting of the final `if` statements under the loop is presumed.
226 def test_12_path_computation_rdm_uni(self):
227 url = "{}/operations/transportpce-pce:path-computation-request"
229 "service-name": "service1",
230 "resource-reserve": "true",
231 "service-handler-header": {
232 "request-id": "request1"
235 "service-rate": "100",
236 "service-format": "Ethernet",
238 "node-id": "OpenROADM-2-1"
241 "service-rate": "100",
242 "service-format": "Ethernet",
244 "node-id": "OpenROADM-2-2"
246 "pce-metric": "hop-count"
249 response = test_utils.post_request(url, data)
250 self.assertEqual(response.status_code, requests.codes.ok)
251 res = response.json()
252 self.assertIn('Path is calculated',
253 res['output']['configuration-response-common']['response-message'])
# Despite the names, atozList/ztoaList hold the lengths of the two lists.
255 atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
256 ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
257 self.assertEqual(atozList, 15)
258 self.assertEqual(ztoaList, 15)
# Walk both directions by index; only the aToZ element with id '14' and the
# zToA element with id '0' have their tp-id checked.
259 for i in range(0, 15):
260 atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
261 ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
262 if (atoz['id'] == '14'):
263 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
264 if (ztoa['id'] == '0'):
265 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
def test_13_delete_simple_topology(self):
    # DELETE the openroadm-topology resource and require an OK status code.
    topology_url = "{}/config/ietf-network:networks/network/openroadm-topology"
    reply = test_utils.delete_request(topology_url)
    self.assertEqual(reply.status_code, requests.codes.ok)
275 # Test deleted topology
def test_14_test_topology_simple_deleted(self):
    # GET node XPONDER-1-2 and expect the lookup to report "not found".
    # The named constant (value 404) replaces the bare number so this check
    # reads like the requests.codes.* assertions used elsewhere in the file.
    url = "{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-1-2"
    response = test_utils.get_request(url)
    self.assertEqual(response.status_code, requests.codes.not_found)
282 # Load complex topology
def test_15_load_complex_topology(self):
    # PUT the text held in complex_topo_uni_dir_data onto the
    # openroadm-topology resource and expect a "created" status.
    # The named constant (value 201) replaces the bare number so this check
    # reads like the requests.codes.* assertions used elsewhere in the file.
    url = "{}/config/ietf-network:networks/network/openroadm-topology"
    data = self.complex_topo_uni_dir_data
    response = test_utils.put_xmlrequest(url, data)
    self.assertEqual(response.status_code, requests.codes.created)
290 # Get existing nodeId
# GET node XPONDER-3-2 from openroadm-topology, require an OK status and
# decode the JSON body; the last visible line reads the first node's
# node-id from it.
# NOTE(review): the call that receives that value and the expected value
# are not visible in this view.
291 def test_16_get_nodeId(self):
292 url = "{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-3-2"
293 response = test_utils.get_request(url)
294 self.assertEqual(response.status_code, requests.codes.ok)
295 res = response.json()
297 res['node'][0]['node-id'],
301 # Test failed path computation
# POST a path-computation-request and check the failure report: the HTTP
# status is still expected to be OK, while the response-message must
# contain 'Service Name is not set'. The only payload entry visible here
# is the service-handler-header with its request-id.
# NOTE(review): the `data = ...` opener and the closing braces of the
# payload are not visible in this view.
302 def test_17_fail_path_computation(self):
303 url = "{}/operations/transportpce-pce:path-computation-request"
305 "service-handler-header": {
306 "request-id": "request-1"
310 response = test_utils.post_request(url, data)
311 self.assertEqual(response.status_code, requests.codes.ok)
312 res = response.json()
313 self.assertIn('Service Name is not set',
314 res['output']['configuration-response-common']['response-message'])
317 # Test1 success path computation
# POST a path-computation-request with a verbose payload (placeholder port
# descriptions, hard- and soft-constraints naming an existing service,
# locally-protected-links) between XPONDER-2-2 and XPONDER-1-2, then
# require an OK status and a response-message containing
# 'Path is calculated'.
# NOTE(review): the `data = ...` opener, the nesting braces and the keys
# that wrap the repeated port blocks are not visible in this view, so the
# exact payload shape cannot be stated.
318 def test_18_success1_path_computation(self):
319 url = "{}/operations/transportpce-pce:path-computation-request"
321 "service-name": "service1",
322 "resource-reserve": "true",
323 "service-handler-header": {
324 "request-id": "request1"
327 "service-format": "Ethernet",
328 "service-rate": "100",
330 "node-id": "XPONDER-2-2",
333 "port-device-name": "Some port-device-name",
334 "port-type": "Some port-type",
335 "port-name": "Some port-name",
336 "port-rack": "Some port-rack",
337 "port-shelf": "Some port-shelf",
338 "port-slot": "Some port-slot",
339 "port-sub-slot": "Some port-sub-slot"
344 "port-device-name": "Some port-device-name",
345 "port-type": "Some port-type",
346 "port-name": "Some port-name",
347 "port-rack": "Some port-rack",
348 "port-shelf": "Some port-shelf",
349 "port-slot": "Some port-slot",
350 "port-sub-slot": "Some port-sub-slot"
355 "service-format": "Ethernet",
356 "service-rate": "100",
358 "node-id": "XPONDER-1-2",
361 "port-device-name": "Some port-device-name",
362 "port-type": "Some port-type",
363 "port-name": "Some port-name",
364 "port-rack": "Some port-rack",
365 "port-shelf": "Some port-shelf",
366 "port-slot": "Some port-slot",
367 "port-sub-slot": "Some port-sub-slot"
372 "port-device-name": "Some port-device-name",
373 "port-type": "Some port-type",
374 "port-name": "Some port-name",
375 "port-rack": "Some port-rack",
376 "port-shelf": "Some port-shelf",
377 "port-slot": "Some port-slot",
378 "port-sub-slot": "Some port-sub-slot"
382 "hard-constraints": {
387 "existing-service": [
388 "Some existing-service"
392 "soft-constraints": {
397 "existing-service": [
398 "Some existing-service"
402 "pce-metric": "hop-count",
403 "locally-protected-links": "true"
406 response = test_utils.post_request(url, data)
407 self.assertEqual(response.status_code, requests.codes.ok)
408 res = response.json()
409 self.assertIn('Path is calculated',
410 res['output']['configuration-response-common']['response-message'])
413 # Test2 success path computation with path description
# POST a path-computation-request whose visible payload names nodes
# XPONDER-1-2 and XPONDER-3-2, require an OK status and a response-message
# containing 'Path is calculated', then require wavelength number 5 in
# both the aToZ and zToA directions of the path description.
# NOTE(review): the `data = ...` opener and the nesting braces of the
# payload are not visible in this view.
414 def test_19_success2_path_computation(self):
415 url = "{}/operations/transportpce-pce:path-computation-request"
417 "service-name": "service 1",
418 "resource-reserve": "true",
419 "service-handler-header": {
420 "request-id": "request 1"
423 "service-rate": "100",
424 "service-format": "Ethernet",
425 "node-id": "XPONDER-1-2",
429 "service-rate": "100",
430 "service-format": "Ethernet",
431 "node-id": "XPONDER-3-2",
434 "pce-metric": "hop-count"
437 response = test_utils.post_request(url, data)
438 self.assertEqual(response.status_code, requests.codes.ok)
439 res = response.json()
440 self.assertIn('Path is calculated',
441 res['output']['configuration-response-common']['response-message'])
442 self.assertEqual(5, res['output']['response-parameters']['path-description']
443 ['aToZ-direction']['aToZ-wavelength-number'])
444 self.assertEqual(5, res['output']['response-parameters']['path-description']
445 ['zToA-direction']['zToA-wavelength-number'])
448 # Test3 success path computation with hard-constraints exclude
# POST a path-computation-request between XPONDER-1-2 and XPONDER-3-2 that
# also carries a hard-constraints entry listing node ids OpenROADM-2-1 and
# OpenROADM-2-2, require an OK status and a response-message containing
# 'Path is calculated', then require wavelength number 9 in both the aToZ
# and zToA directions of the path description.
# NOTE(review): the key that wraps the node-id list inside hard-constraints,
# the `data = ...` opener and the nesting braces are not visible in this
# view.
449 def test_20_success3_path_computation(self):
450 url = "{}/operations/transportpce-pce:path-computation-request"
452 "service-name": "service 1",
453 "resource-reserve": "true",
454 "service-handler-header": {
455 "request-id": "request 1"
458 "service-rate": "100",
459 "service-format": "Ethernet",
460 "node-id": "XPONDER-1-2",
464 "service-rate": "100",
465 "service-format": "Ethernet",
466 "node-id": "XPONDER-3-2",
469 "hard-constraints": {
471 "node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]
474 "pce-metric": "hop-count"
477 response = test_utils.post_request(url, data)
478 self.assertEqual(response.status_code, requests.codes.ok)
479 res = response.json()
480 self.assertIn('Path is calculated',
481 res['output']['configuration-response-common']['response-message'])
482 self.assertEqual(9, res['output']['response-parameters']['path-description']
483 ['aToZ-direction']['aToZ-wavelength-number'])
484 self.assertEqual(9, res['output']['response-parameters']['path-description']
485 ['zToA-direction']['zToA-wavelength-number'])
488 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
# POST a path-computation-request between XPONDER-2-2 and XPONDER-1-2,
# require an OK status and a response-message containing
# 'Path is calculated', require 31 aToZ elements, and require `find` to be
# True after scanning those elements for the link
# OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2.
# NOTE(review): the statements that assign `find` (its initial value and
# the update inside the `if`), the `data = ...` opener and the nesting
# braces are not visible in this view.
489 def test_21_path_computation_before_oms_attribute_deletion(self):
490 url = "{}/operations/transportpce-pce:path-computation-request"
492 "service-name": "service 1",
493 "resource-reserve": "true",
494 "service-handler-header": {
495 "request-id": "request 1"
498 "service-rate": "100",
499 "service-format": "Ethernet",
500 "node-id": "XPONDER-2-2",
504 "service-rate": "100",
505 "service-format": "Ethernet",
506 "node-id": "XPONDER-1-2",
509 "pce-metric": "hop-count"
512 response = test_utils.post_request(url, data)
513 self.assertEqual(response.status_code, requests.codes.ok)
514 res = response.json()
515 self.assertIn('Path is calculated',
516 res['output']['configuration-response-common']['response-message'])
517 nbElmPath = len(res['output']['response-parameters']['path-description']
518 ['aToZ-direction']['aToZ'])
519 self.assertEqual(31, nbElmPath)
# Each aToZ element's 'resource' is compared for equality with this dict.
520 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
522 for i in range(0, nbElmPath):
523 resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
524 if(resource_i == link):
526 self.assertEqual(find, True)
529 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
# DELETE the OMS-attributes/span resource of link
# OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2 and require an OK status.
# The URL is assembled from adjacent string literals inside parentheses.
# NOTE(review): the closing parenthesis of the `url` expression is not
# visible in this view.
530 def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
531 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
532 "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2/org-openroadm-network-topology:OMS-attributes/span"
534 response = test_utils.delete_request(url)
535 self.assertEqual(response.status_code, requests.codes.ok)
538 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
# POST the same XPONDER-2-2 / XPONDER-1-2 path-computation-request again,
# require an OK status and a response-message containing
# 'Path is calculated', require 47 aToZ elements, and require `find` not
# to be True after scanning those elements for the link
# OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2.
# NOTE(review): the statements that assign `find` (its initial value and
# the update inside the `if`), the `data = ...` opener and the nesting
# braces are not visible in this view.
539 def test_23_path_computation_after_oms_attribute_deletion(self):
540 url = "{}/operations/transportpce-pce:path-computation-request"
542 "service-name": "service 1",
543 "resource-reserve": "true",
544 "service-handler-header": {
545 "request-id": "request 1"
548 "service-rate": "100",
549 "service-format": "Ethernet",
550 "node-id": "XPONDER-2-2",
554 "service-rate": "100",
555 "service-format": "Ethernet",
556 "node-id": "XPONDER-1-2",
559 "pce-metric": "hop-count"
562 response = test_utils.post_request(url, data)
563 self.assertEqual(response.status_code, requests.codes.ok)
564 res = response.json()
565 self.assertIn('Path is calculated',
566 res['output']['configuration-response-common']['response-message'])
567 nbElmPath = len(res['output']['response-parameters']['path-description']
568 ['aToZ-direction']['aToZ'])
569 self.assertEqual(47, nbElmPath)
# Each aToZ element's 'resource' is compared for equality with this dict.
570 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
572 for i in range(0, nbElmPath):
573 resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
574 if (resource_i == link):
576 self.assertNotEqual(find, True)
579 # Delete complex topology
def test_24_delete_complex_topology(self):
    # DELETE the openroadm-topology resource and require an OK status code.
    topology_url = "{}/config/ietf-network:networks/network/openroadm-topology"
    reply = test_utils.delete_request(topology_url)
    self.assertEqual(reply.status_code, requests.codes.ok)
586 # Test deleted complex topology
def test_25_test_topology_complex_deleted(self):
    # GET node XPONDER-3-2 and expect the lookup to report "not found".
    # The named constant (value 404) replaces the bare number so this check
    # reads like the requests.codes.* assertions used elsewhere in the file.
    url = "{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-3-2"
    response = test_utils.get_request(url)
    self.assertEqual(response.status_code, requests.codes.not_found)
# Script entry point: run the suite with per-test (verbosity 2) reporting.
if __name__ == "__main__":
    unittest.main(verbosity=2)