2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
16 from common import test_utils
19 class TransportPCEtesting(unittest.TestCase):
# Test case exercising topology loading and path-computation requests through
# the URLs built from test_utils.RESTCONF_BASE_URL.
# NOTE(review): this excerpt has lost its indentation and several lines (for
# example the header of the class-level fixture that owns the `cls.`
# assignments below) -- confirm against the full file.

# Raw contents of the three sample topology files; each one stays None when
# the corresponding file is not found.
21 simple_topo_bi_dir_data = None
22 simple_topo_uni_dir_data = None
23 complex_topo_uni_dir_data = None
# Every sample file is looked up in ../../sample_configs relative to this test
# file and is read only if it exists.
27 topo_bi_dir_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
28 "..", "..", "sample_configs", "honeynode-topo.xml")
29 if os.path.isfile(topo_bi_dir_file):
30 with open(topo_bi_dir_file, 'r') as topo_bi_dir:
31 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
32 topo_uni_dir_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
33 "..", "..", "sample_configs", "NW-simple-topology.xml")
34 if os.path.isfile(topo_uni_dir_file):
35 with open(topo_uni_dir_file, 'r') as topo_uni_dir:
36 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
37 topo_uni_dir_complex_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
38 "..", "..", "sample_configs", "NW-for-test-5-4.xml")
39 if os.path.isfile(topo_uni_dir_complex_file):
40 with open(topo_uni_dir_complex_file, 'r') as topo_uni_dir_complex:
41 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
# Whatever start_tpce() returns is kept on the class; tearDownClass iterates
# over it and passes each item to test_utils.shutdown_process().
48 cls.processes = test_utils.start_tpce()
def tearDownClass(cls):
    # Shut down every process recorded in cls.processes, then report that
    # the clean-up is finished.
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
56 def setUp(self): # instruction executed before each test method
# NOTE(review): the body of this per-test hook is not visible in this excerpt.
59 # Load simple bidirectional topology
def test_01_load_simple_topology_bi(self):
    # PUT the simple bidirectional topology (XML held in
    # simple_topo_bi_dir_data) to the openroadm-topology config URL and
    # expect an OK status.
    topology_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology"
        .format(test_utils.RESTCONF_BASE_URL)
    )
    xml_payload = self.simple_topo_bi_dir_data
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="PUT",
        url=topology_url,
        data=xml_payload,
        headers=test_utils.TYPE_APPLICATION_XML,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.ok)
71 def test_02_get_nodeId(self):
# GET node ROADMA01-SRG1 from the openroadm-topology config URL and expect an
# OK status.
72 url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
73 .format(test_utils.RESTCONF_BASE_URL))
74 response = requests.request(
75 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
76 self.assertEqual(response.status_code, requests.codes.ok)
# NOTE(review): the lines that build `res` and open the next call are not
# visible here; the fragment below looks like the tail of a check that the
# returned node-id is 'ROADMA01-SRG1' -- confirm against the full file.
79 res['node'][0]['node-id'], 'ROADMA01-SRG1')
83 def test_03_get_linkId(self):
# GET the link XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX from
# the openroadm-topology config URL and expect an OK status.
84 url = ("{}/config/ietf-network:networks/network/openroadm-topology/link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX"
85 .format(test_utils.RESTCONF_BASE_URL))
86 response = requests.request(
87 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
88 self.assertEqual(response.status_code, requests.codes.ok)
# NOTE(review): the lines that build `res` and open the next call are not
# visible here; the fragment below looks like the tail of a check on the
# returned link-id -- confirm against the full file.
91 res['ietf-network-topology:link'][0]['link-id'],
92 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
95 # Path Computation success
96 def test_04_path_computation_xpdr_bi(self):
# POST a path-computation-request whose body names nodes XPDRA01 and XPDRC01,
# then expect an OK status and a response message containing
# 'Path is calculated'.
97 url = ("{}/operations/transportpce-pce:path-computation-request"
98 .format(test_utils.RESTCONF_BASE_URL))
# NOTE(review): the opening and closing lines of the `body` literal are not
# visible in this excerpt; only some of its entries appear below -- confirm
# the full structure against the original file.
100 "service-name": "service-1",
101 "resource-reserve": "true",
102 "pce-metric": "hop-count",
103 "service-handler-header": {
104 "request-id": "request-1"
107 "node-id": "XPDRA01",
108 "service-rate": "100",
109 "service-format": "Ethernet",
113 "node-id": "XPDRC01",
114 "service-rate": "100",
115 "service-format": "Ethernet",
# The body is serialised with json.dumps before being sent.
120 response = requests.request(
121 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
122 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
123 self.assertEqual(response.status_code, requests.codes.ok)
124 res = response.json()
125 self.assertIn('Path is calculated',
126 res['output']['configuration-response-common']['response-message'])
129 # Path Computation success
130 def test_05_path_computation_rdm_bi(self):
# POST a path-computation-request whose body names nodes ROADMA01 and
# ROADMC01, then expect an OK status and a response message containing
# 'Path is calculated'.
131 url = ("{}/operations/transportpce-pce:path-computation-request"
132 .format(test_utils.RESTCONF_BASE_URL))
# NOTE(review): the opening and closing lines of the `body` literal are not
# visible in this excerpt; only some of its entries appear below -- confirm
# the full structure against the original file.
134 "service-name": "service-1",
135 "resource-reserve": "true",
136 "pce-metric": "hop-count",
137 "service-handler-header": {
138 "request-id": "request-1"
141 "node-id": "ROADMA01",
142 "service-rate": "100",
143 "service-format": "Ethernet",
147 "node-id": "ROADMC01",
148 "service-rate": "100",
149 "service-format": "Ethernet",
# The body is serialised with json.dumps before being sent.
154 response = requests.request(
155 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
156 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
157 self.assertEqual(response.status_code, requests.codes.ok)
158 res = response.json()
159 self.assertIn('Path is calculated',
160 res['output']['configuration-response-common']['response-message'])
def test_06_delete_simple_topology_bi(self):
    # DELETE the openroadm-topology config resource and expect an OK status.
    topology_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology"
        .format(test_utils.RESTCONF_BASE_URL)
    )
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="DELETE",
        url=topology_url,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.ok)
172 # Test deleted topology
def test_07_test_topology_simple_bi_deleted(self):
    # GET node ROADMA01-SRG1 from the openroadm-topology config URL and
    # expect a 404 (not found) status.
    node_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
        .format(test_utils.RESTCONF_BASE_URL)
    )
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="GET",
        url=node_url,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.not_found)
181 # Load simple unidirectional topology
def test_08_load_simple_topology_uni(self):
    # PUT the simple unidirectional topology (XML held in
    # simple_topo_uni_dir_data) to the openroadm-topology config URL and
    # expect a 201 (created) status.
    topology_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology"
        .format(test_utils.RESTCONF_BASE_URL)
    )
    xml_payload = self.simple_topo_uni_dir_data
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="PUT",
        url=topology_url,
        data=xml_payload,
        headers=test_utils.TYPE_APPLICATION_XML,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.created)
192 # Get existing nodeId
193 def test_09_get_nodeId(self):
# GET node XPONDER-1-2 from the openroadm-topology config URL and expect an
# OK status.
194 url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-1-2"
195 .format(test_utils.RESTCONF_BASE_URL))
196 response = requests.request(
197 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
198 self.assertEqual(response.status_code, requests.codes.ok)
199 res = response.json()
# NOTE(review): the call that wraps the fragment below, and the value the
# node-id is compared with, are not visible in this excerpt -- confirm against
# the full file.
201 res['node'][0]['node-id'],
205 # Get existing linkId
206 def test_10_get_linkId(self):
# GET the link XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX from the
# openroadm-topology config URL and expect an OK status.
207 url = ("{}/config/ietf-network:networks/network/openroadm-topology/link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX"
208 .format(test_utils.RESTCONF_BASE_URL))
209 response = requests.request(
210 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
211 self.assertEqual(response.status_code, requests.codes.ok)
212 res = response.json()
# NOTE(review): the line that opens the call around the two arguments below is
# not visible here; it looks like a check on the returned link-id -- confirm
# against the full file.
214 res['ietf-network-topology:link'][0]['link-id'],
215 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
218 # Path Computation success
219 def test_11_path_computation_xpdr_uni(self):
# POST a path-computation-request whose body names nodes XPONDER-1-2 and
# XPONDER-3-2, then expect an OK status and a response message containing
# 'Path is calculated'.
220 url = ("{}/operations/transportpce-pce:path-computation-request"
221 .format(test_utils.RESTCONF_BASE_URL))
# NOTE(review): the opening and closing lines of the `body` literal are not
# visible in this excerpt; only some of its entries appear below -- confirm
# the full structure against the original file.
223 "service-name": "service-1",
224 "resource-reserve": "true",
225 "pce-metric": "hop-count",
226 "service-handler-header": {
227 "request-id": "request-1"
230 "node-id": "XPONDER-1-2",
231 "service-rate": "100",
232 "service-format": "Ethernet",
236 "node-id": "XPONDER-3-2",
237 "service-rate": "100",
238 "service-format": "Ethernet",
# The body is serialised with json.dumps before being sent.
243 response = requests.request(
244 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
245 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
246 self.assertEqual(response.status_code, requests.codes.ok)
247 res = response.json()
248 self.assertIn('Path is calculated',
249 res['output']['configuration-response-common']['response-message'])
252 # Path Computation success
253 def test_12_path_computation_rdm_uni(self):
# POST a path-computation-request whose body names nodes OpenROADM-2-1 and
# OpenROADM-2-2, expect an OK status and a 'Path is calculated' message, then
# inspect the returned path description in both directions.
254 url = ("{}/operations/transportpce-pce:path-computation-request"
255 .format(test_utils.RESTCONF_BASE_URL))
# NOTE(review): the opening and closing lines of the `body` literal are not
# visible in this excerpt; only some of its entries appear below -- confirm
# the full structure against the original file.
257 "service-name": "service1",
258 "resource-reserve": "true",
259 "service-handler-header": {
260 "request-id": "request1"
263 "service-rate": "100",
264 "service-format": "Ethernet",
266 "node-id": "OpenROADM-2-1"
269 "service-rate": "100",
270 "service-format": "Ethernet",
272 "node-id": "OpenROADM-2-2"
274 "pce-metric": "hop-count"
277 response = requests.request(
278 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
279 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
280 self.assertEqual(response.status_code, requests.codes.ok)
281 res = response.json()
282 self.assertIn('Path is calculated',
283 res['output']['configuration-response-common']['response-message'])
# Despite the names, atozList / ztoaList hold the *lengths* of the aToZ and
# zToA lists; both are expected to be 15.
285 atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
286 ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
287 self.assertEqual(atozList, 15)
288 self.assertEqual(ztoaList, 15)
# Walk both lists by index; the entry with id '14' (aToZ) must carry tp-id
# 'SRG1-PP1-TX' and the entry with id '0' (zToA) must carry 'SRG1-PP1-RX'.
# NOTE(review): indentation is lost in this excerpt, so the nesting of the
# statements below is presumed from their order -- confirm.
289 for i in range(0, 15):
290 atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
291 ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
292 if (atoz['id'] == '14'):
293 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
294 if (ztoa['id'] == '0'):
295 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
def test_13_delete_simple_topology(self):
    # DELETE the openroadm-topology config resource and expect an OK status.
    topology_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology"
        .format(test_utils.RESTCONF_BASE_URL)
    )
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="DELETE",
        url=topology_url,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.ok)
307 # Test deleted topology
def test_14_test_topology_simple_deleted(self):
    # GET node XPONDER-1-2 from the openroadm-topology config URL and expect
    # a 404 (not found) status.
    node_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-1-2"
        .format(test_utils.RESTCONF_BASE_URL)
    )
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="GET",
        url=node_url,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.not_found)
316 # Load complex topology
def test_15_load_complex_topology(self):
    # PUT the complex topology (XML held in complex_topo_uni_dir_data) to the
    # openroadm-topology config URL and expect a 201 (created) status.
    topology_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology"
        .format(test_utils.RESTCONF_BASE_URL)
    )
    xml_payload = self.complex_topo_uni_dir_data
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="PUT",
        url=topology_url,
        data=xml_payload,
        headers=test_utils.TYPE_APPLICATION_XML,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.created)
327 # Get existing nodeId
328 def test_16_get_nodeId(self):
# GET node XPONDER-3-2 from the openroadm-topology config URL and expect an
# OK status.
329 url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-3-2"
330 .format(test_utils.RESTCONF_BASE_URL))
331 response = requests.request(
332 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
333 self.assertEqual(response.status_code, requests.codes.ok)
334 res = response.json()
# NOTE(review): the call that wraps the fragment below, and the value the
# node-id is compared with, are not visible in this excerpt -- confirm against
# the full file.
336 res['node'][0]['node-id'],
340 # Test failed path computation
341 def test_17_fail_path_computation(self):
# POST a path-computation-request and expect the HTTP status to be OK while
# the response message contains 'Service Name is not set'.
342 url = ("{}/operations/transportpce-pce:path-computation-request"
343 .format(test_utils.RESTCONF_BASE_URL))
# NOTE(review): the opening and closing lines of the `body` literal are not
# visible in this excerpt; the only entry shown is the service-handler-header
# -- confirm the full structure against the original file.
345 "service-handler-header": {
346 "request-id": "request-1"
350 response = requests.request(
351 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
352 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
353 self.assertEqual(response.status_code, requests.codes.ok)
354 res = response.json()
355 self.assertIn('Service Name is not set',
356 res['output']['configuration-response-common']['response-message'])
359 # Test1 success path computation
360 def test_18_success1_path_computation(self):
# POST a path-computation-request whose body names nodes XPONDER-2-2 and
# XPONDER-1-2 and carries placeholder port descriptions plus hard/soft
# constraint entries, then expect an OK status and a response message
# containing 'Path is calculated'.
361 url = ("{}/operations/transportpce-pce:path-computation-request"
362 .format(test_utils.RESTCONF_BASE_URL))
# NOTE(review): many structural lines of the `body` literal (its opening, the
# keys that own the nested port and constraint entries, and the closing
# brackets) are not visible in this excerpt -- confirm the full structure
# against the original file.
364 "service-name": "service1",
365 "resource-reserve": "true",
366 "service-handler-header": {
367 "request-id": "request1"
370 "service-format": "Ethernet",
371 "service-rate": "100",
373 "node-id": "XPONDER-2-2",
376 "port-device-name": "Some port-device-name",
377 "port-type": "Some port-type",
378 "port-name": "Some port-name",
379 "port-rack": "Some port-rack",
380 "port-shelf": "Some port-shelf",
381 "port-slot": "Some port-slot",
382 "port-sub-slot": "Some port-sub-slot"
387 "port-device-name": "Some port-device-name",
388 "port-type": "Some port-type",
389 "port-name": "Some port-name",
390 "port-rack": "Some port-rack",
391 "port-shelf": "Some port-shelf",
392 "port-slot": "Some port-slot",
393 "port-sub-slot": "Some port-sub-slot"
398 "service-format": "Ethernet",
399 "service-rate": "100",
401 "node-id": "XPONDER-1-2",
404 "port-device-name": "Some port-device-name",
405 "port-type": "Some port-type",
406 "port-name": "Some port-name",
407 "port-rack": "Some port-rack",
408 "port-shelf": "Some port-shelf",
409 "port-slot": "Some port-slot",
410 "port-sub-slot": "Some port-sub-slot"
415 "port-device-name": "Some port-device-name",
416 "port-type": "Some port-type",
417 "port-name": "Some port-name",
418 "port-rack": "Some port-rack",
419 "port-shelf": "Some port-shelf",
420 "port-slot": "Some port-slot",
421 "port-sub-slot": "Some port-sub-slot"
425 "hard-constraints": {
430 "existing-service": [
431 "Some existing-service"
435 "soft-constraints": {
440 "existing-service": [
441 "Some existing-service"
445 "pce-metric": "hop-count",
446 "locally-protected-links": "true"
# The body is serialised with json.dumps before being sent.
449 response = requests.request(
450 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
451 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
452 self.assertEqual(response.status_code, requests.codes.ok)
453 res = response.json()
454 self.assertIn('Path is calculated',
455 res['output']['configuration-response-common']['response-message'])
458 # Test2 success path computation with path description
459 def test_19_success2_path_computation(self):
# POST a path-computation-request whose body names nodes XPONDER-1-2 and
# XPONDER-3-2, expect an OK status and a 'Path is calculated' message, and
# check that the wavelength number is 5 in both directions.
460 url = ("{}/operations/transportpce-pce:path-computation-request"
461 .format(test_utils.RESTCONF_BASE_URL))
# NOTE(review): the opening and closing lines of the `body` literal are not
# visible in this excerpt; only some of its entries appear below -- confirm
# the full structure against the original file.
463 "service-name": "service 1",
464 "resource-reserve": "true",
465 "service-handler-header": {
466 "request-id": "request 1"
469 "service-rate": "100",
470 "service-format": "Ethernet",
471 "node-id": "XPONDER-1-2",
475 "service-rate": "100",
476 "service-format": "Ethernet",
477 "node-id": "XPONDER-3-2",
480 "pce-metric": "hop-count"
483 response = requests.request(
484 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
485 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
486 self.assertEqual(response.status_code, requests.codes.ok)
487 res = response.json()
488 self.assertIn('Path is calculated',
489 res['output']['configuration-response-common']['response-message'])
# Expected value comes first in these two assertions.
490 self.assertEqual(5, res['output']['response-parameters']['path-description']
491 ['aToZ-direction']['aToZ-wavelength-number'])
492 self.assertEqual(5, res['output']['response-parameters']['path-description']
493 ['zToA-direction']['zToA-wavelength-number'])
496 # Test3 success path computation with hard-constraints exclude
497 def test_20_success3_path_computation(self):
# POST a path-computation-request whose body names nodes XPONDER-1-2 and
# XPONDER-3-2 and includes a hard-constraints entry listing OpenROADM-2-1 and
# OpenROADM-2-2, expect an OK status and a 'Path is calculated' message, and
# check that the wavelength number is 9 in both directions.
498 url = ("{}/operations/transportpce-pce:path-computation-request"
499 .format(test_utils.RESTCONF_BASE_URL))
# NOTE(review): the opening and closing lines of the `body` literal, and the
# key that owns the node-id list inside hard-constraints, are not visible in
# this excerpt -- confirm the full structure against the original file.
501 "service-name": "service 1",
502 "resource-reserve": "true",
503 "service-handler-header": {
504 "request-id": "request 1"
507 "service-rate": "100",
508 "service-format": "Ethernet",
509 "node-id": "XPONDER-1-2",
513 "service-rate": "100",
514 "service-format": "Ethernet",
515 "node-id": "XPONDER-3-2",
518 "hard-constraints": {
520 "node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]
523 "pce-metric": "hop-count"
526 response = requests.request(
527 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
528 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
529 self.assertEqual(response.status_code, requests.codes.ok)
530 res = response.json()
531 self.assertIn('Path is calculated',
532 res['output']['configuration-response-common']['response-message'])
# Expected value comes first in these two assertions.
533 self.assertEqual(9, res['output']['response-parameters']['path-description']
534 ['aToZ-direction']['aToZ-wavelength-number'])
535 self.assertEqual(9, res['output']['response-parameters']['path-description']
536 ['zToA-direction']['zToA-wavelength-number'])
539 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
540 def test_21_path_computation_before_oms_attribute_deletion(self):
# POST a path-computation-request whose body names nodes XPONDER-2-2 and
# XPONDER-1-2, expect an OK status and a 'Path is calculated' message, check
# that the aToZ list has 31 elements, and scan it for the link
# OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2; `find` must end up True.
541 url = ("{}/operations/transportpce-pce:path-computation-request"
542 .format(test_utils.RESTCONF_BASE_URL))
# NOTE(review): the opening and closing lines of the `body` literal are not
# visible in this excerpt; only some of its entries appear below -- confirm
# the full structure against the original file.
544 "service-name": "service 1",
545 "resource-reserve": "true",
546 "service-handler-header": {
547 "request-id": "request 1"
550 "service-rate": "100",
551 "service-format": "Ethernet",
552 "node-id": "XPONDER-2-2",
556 "service-rate": "100",
557 "service-format": "Ethernet",
558 "node-id": "XPONDER-1-2",
561 "pce-metric": "hop-count"
564 response = requests.request(
565 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
566 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
567 self.assertEqual(response.status_code, requests.codes.ok)
568 res = response.json()
569 self.assertIn('Path is calculated',
570 res['output']['configuration-response-common']['response-message'])
571 nbElmPath = len(res['output']['response-parameters']['path-description']
572 ['aToZ-direction']['aToZ'])
573 self.assertEqual(31, nbElmPath)
# A path element matches when its whole 'resource' dict equals this dict.
574 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
# NOTE(review): the lines that assign `find` (before the loop and inside the
# `if`) are not visible in this excerpt -- confirm against the full file.
576 for i in range(0, nbElmPath):
577 resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
578 if(resource_i == link):
580 self.assertEqual(find, True)
583 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
    # DELETE the OMS-attributes/span resource of link
    # OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2 and expect an OK status.
    span_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
        "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2/org-openroadm-network-topology:OMS-attributes/span"
        .format(test_utils.RESTCONF_BASE_URL)
    )
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="DELETE",
        url=span_url,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.ok)
593 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
594 def test_23_path_computation_after_oms_attribute_deletion(self):
# POST a path-computation-request whose body names nodes XPONDER-2-2 and
# XPONDER-1-2, expect an OK status and a 'Path is calculated' message, check
# that the aToZ list has 47 elements, and scan it for the link
# OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2; `find` must not be True.
595 url = ("{}/operations/transportpce-pce:path-computation-request"
596 .format(test_utils.RESTCONF_BASE_URL))
# NOTE(review): the opening and closing lines of the `body` literal are not
# visible in this excerpt; only some of its entries appear below -- confirm
# the full structure against the original file.
598 "service-name": "service 1",
599 "resource-reserve": "true",
600 "service-handler-header": {
601 "request-id": "request 1"
604 "service-rate": "100",
605 "service-format": "Ethernet",
606 "node-id": "XPONDER-2-2",
610 "service-rate": "100",
611 "service-format": "Ethernet",
612 "node-id": "XPONDER-1-2",
615 "pce-metric": "hop-count"
618 response = requests.request(
619 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
620 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
621 self.assertEqual(response.status_code, requests.codes.ok)
622 res = response.json()
623 self.assertIn('Path is calculated',
624 res['output']['configuration-response-common']['response-message'])
625 nbElmPath = len(res['output']['response-parameters']['path-description']
626 ['aToZ-direction']['aToZ'])
627 self.assertEqual(47, nbElmPath)
# A path element matches when its whole 'resource' dict equals this dict.
628 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
# NOTE(review): the lines that assign `find` (before the loop and inside the
# `if`) are not visible in this excerpt -- confirm against the full file.
630 for i in range(0, nbElmPath):
631 resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
632 if (resource_i == link):
634 self.assertNotEqual(find, True)
637 # Delete complex topology
def test_24_delete_complex_topology(self):
    # DELETE the openroadm-topology config resource and expect an OK status.
    topology_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology"
        .format(test_utils.RESTCONF_BASE_URL)
    )
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="DELETE",
        url=topology_url,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.ok)
646 # Test deleted complex topology
def test_25_test_topology_complex_deleted(self):
    # GET node XPONDER-3-2 from the openroadm-topology config URL and expect
    # a 404 (not found) status.
    node_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-3-2"
        .format(test_utils.RESTCONF_BASE_URL)
    )
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="GET",
        url=node_url,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.not_found)
# Run the test suite with verbosity level 2 when this file is executed as a
# script rather than imported.
if __name__ == "__main__":
    unittest.main(verbosity=2)