2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
20 from common import test_utils
23 class TransportPCEtesting(unittest.TestCase):
25 simple_topo_bi_dir_data = None
26 simple_topo_uni_dir_data = None
27 complex_topo_uni_dir_data = None
31 topo_bi_dir_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
32 "..", "..", "sample_configs", "honeynode-topo.xml")
33 if os.path.isfile(topo_bi_dir_file):
34 with open(topo_bi_dir_file, 'r') as topo_bi_dir:
35 cls.simple_topo_bi_dir_data = topo_bi_dir.read()
36 topo_uni_dir_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
37 "..", "..", "sample_configs", "NW-simple-topology.xml")
38 if os.path.isfile(topo_uni_dir_file):
39 with open(topo_uni_dir_file, 'r') as topo_uni_dir:
40 cls.simple_topo_uni_dir_data = topo_uni_dir.read()
41 topo_uni_dir_complex_file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
42 "..", "..", "sample_configs", "NW-for-test-5-4.xml")
43 if os.path.isfile(topo_uni_dir_complex_file):
44 with open(topo_uni_dir_complex_file, 'r') as topo_uni_dir_complex:
45 cls.complex_topo_uni_dir_data = topo_uni_dir_complex.read()
52 cls.processes = test_utils.start_tpce()
def tearDownClass(cls):
    # Stop every process recorded in cls.processes, then report completion.
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
60 def setUp(self): # instruction executed before each test method
63 # Load simple bidirectional topology
def test_01_load_simple_topology_bi(self):
    # PUT the XML held in simple_topo_bi_dir_data to the openroadm-topology
    # network URL and expect the controller to answer with HTTP 200.
    url_template = "{}/config/ietf-network:networks/network/openroadm-topology"
    target = url_template.format(test_utils.RESTCONF_BASE_URL)
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="PUT",
        url=target,
        data=self.simple_topo_bi_dir_data,
        headers=test_utils.TYPE_APPLICATION_XML,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.ok)
75 def test_02_get_nodeId(self):
76 url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
77 .format(test_utils.RESTCONF_BASE_URL))
78 response = requests.request(
79 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
80 self.assertEqual(response.status_code, requests.codes.ok)
83 res['node'][0]['node-id'], 'ROADMA01-SRG1')
87 def test_03_get_linkId(self):
88 url = ("{}/config/ietf-network:networks/network/openroadm-topology/link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX"
89 .format(test_utils.RESTCONF_BASE_URL))
90 response = requests.request(
91 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
92 self.assertEqual(response.status_code, requests.codes.ok)
95 res['ietf-network-topology:link'][0]['link-id'],
96 'XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX')
99 # Path Computation success
100 def test_04_path_computation_xpdr_bi(self):
101 url = ("{}/operations/transportpce-pce:path-computation-request"
102 .format(test_utils.RESTCONF_BASE_URL))
104 "service-name": "service-1",
105 "resource-reserve": "true",
106 "pce-metric": "hop-count",
107 "service-handler-header": {
108 "request-id": "request-1"
111 "node-id": "XPDRA01",
112 "service-rate": "100",
113 "service-format": "Ethernet",
117 "node-id": "XPDRC01",
118 "service-rate": "100",
119 "service-format": "Ethernet",
124 response = requests.request(
125 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
126 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
127 self.assertEqual(response.status_code, requests.codes.ok)
128 res = response.json()
129 self.assertIn('Path is calculated',
130 res['output']['configuration-response-common']['response-message'])
133 # Path Computation success
134 def test_05_path_computation_rdm_bi(self):
135 url = ("{}/operations/transportpce-pce:path-computation-request"
136 .format(test_utils.RESTCONF_BASE_URL))
138 "service-name": "service-1",
139 "resource-reserve": "true",
140 "pce-metric": "hop-count",
141 "service-handler-header": {
142 "request-id": "request-1"
145 "node-id": "ROADMA01",
146 "service-rate": "100",
147 "service-format": "Ethernet",
151 "node-id": "ROADMC01",
152 "service-rate": "100",
153 "service-format": "Ethernet",
158 response = requests.request(
159 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
160 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
161 self.assertEqual(response.status_code, requests.codes.ok)
162 res = response.json()
163 self.assertIn('Path is calculated',
164 res['output']['configuration-response-common']['response-message'])
def test_06_delete_simple_topology_bi(self):
    # DELETE the openroadm-topology network and expect HTTP 200.
    url_template = "{}/config/ietf-network:networks/network/openroadm-topology"
    target = url_template.format(test_utils.RESTCONF_BASE_URL)
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="DELETE",
        url=target,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.ok)
176 # Test deleted topology
def test_07_test_topology_simple_bi_deleted(self):
    # GET node ROADMA01-SRG1 of openroadm-topology and expect HTTP 404.
    url_template = "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA01-SRG1"
    target = url_template.format(test_utils.RESTCONF_BASE_URL)
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="GET",
        url=target,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    # requests.codes.not_found is the named constant for 404.
    self.assertEqual(response.status_code, requests.codes.not_found)
185 # Load simple unidirectional topology
def test_08_load_simple_topology_uni(self):
    # PUT the XML held in simple_topo_uni_dir_data to the openroadm-topology
    # network URL and expect HTTP 201.
    url_template = "{}/config/ietf-network:networks/network/openroadm-topology"
    target = url_template.format(test_utils.RESTCONF_BASE_URL)
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="PUT",
        url=target,
        data=self.simple_topo_uni_dir_data,
        headers=test_utils.TYPE_APPLICATION_XML,
        auth=credentials,
    )
    # requests.codes.created is the named constant for 201.
    self.assertEqual(response.status_code, requests.codes.created)
196 # Get existing nodeId
197 def test_09_get_nodeId(self):
198 url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-1-2"
199 .format(test_utils.RESTCONF_BASE_URL))
200 response = requests.request(
201 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
202 self.assertEqual(response.status_code, requests.codes.ok)
203 res = response.json()
205 res['node'][0]['node-id'],
209 # Get existing linkId
210 def test_10_get_linkId(self):
211 url = ("{}/config/ietf-network:networks/network/openroadm-topology/link/XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX"
212 .format(test_utils.RESTCONF_BASE_URL))
213 response = requests.request(
214 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
215 self.assertEqual(response.status_code, requests.codes.ok)
216 res = response.json()
218 res['ietf-network-topology:link'][0]['link-id'],
219 'XPONDER-1-2XPDR-NW1-TX-toOpenROADM-1-2-SRG1-SRG1-PP1-RX')
222 # Path Computation success
223 def test_11_path_computation_xpdr_uni(self):
224 url = ("{}/operations/transportpce-pce:path-computation-request"
225 .format(test_utils.RESTCONF_BASE_URL))
227 "service-name": "service-1",
228 "resource-reserve": "true",
229 "pce-metric": "hop-count",
230 "service-handler-header": {
231 "request-id": "request-1"
234 "node-id": "XPONDER-1-2",
235 "service-rate": "100",
236 "service-format": "Ethernet",
240 "node-id": "XPONDER-3-2",
241 "service-rate": "100",
242 "service-format": "Ethernet",
247 response = requests.request(
248 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
249 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
250 self.assertEqual(response.status_code, requests.codes.ok)
251 res = response.json()
252 self.assertIn('Path is calculated',
253 res['output']['configuration-response-common']['response-message'])
256 # Path Computation success
257 def test_12_path_computation_rdm_uni(self):
258 url = ("{}/operations/transportpce-pce:path-computation-request"
259 .format(test_utils.RESTCONF_BASE_URL))
261 "service-name": "service1",
262 "resource-reserve": "true",
263 "service-handler-header": {
264 "request-id": "request1"
267 "service-rate": "100",
268 "service-format": "Ethernet",
270 "node-id": "OpenROADM-2-1"
273 "service-rate": "100",
274 "service-format": "Ethernet",
276 "node-id": "OpenROADM-2-2"
278 "pce-metric": "hop-count"
281 response = requests.request(
282 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
283 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
284 self.assertEqual(response.status_code, requests.codes.ok)
285 res = response.json()
286 self.assertIn('Path is calculated',
287 res['output']['configuration-response-common']['response-message'])
289 atozList = len(res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'])
290 ztoaList = len(res['output']['response-parameters']['path-description']['zToA-direction']['zToA'])
291 self.assertEqual(atozList, 15)
292 self.assertEqual(ztoaList, 15)
293 for i in range(0, 15):
294 atoz = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]
295 ztoa = res['output']['response-parameters']['path-description']['zToA-direction']['zToA'][i]
296 if (atoz['id'] == '14'):
297 self.assertEqual(atoz['resource']['tp-id'], 'SRG1-PP1-TX')
298 if (ztoa['id'] == '0'):
299 self.assertEqual(ztoa['resource']['tp-id'], 'SRG1-PP1-RX')
def test_13_delete_simple_topology(self):
    # DELETE the openroadm-topology network and expect HTTP 200.
    url_template = "{}/config/ietf-network:networks/network/openroadm-topology"
    target = url_template.format(test_utils.RESTCONF_BASE_URL)
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="DELETE",
        url=target,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.ok)
311 # Test deleted topology
def test_14_test_topology_simple_deleted(self):
    # GET node XPONDER-1-2 of openroadm-topology and expect HTTP 404.
    url_template = "{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-1-2"
    target = url_template.format(test_utils.RESTCONF_BASE_URL)
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="GET",
        url=target,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    # requests.codes.not_found is the named constant for 404.
    self.assertEqual(response.status_code, requests.codes.not_found)
320 # Load complex topology
def test_15_load_complex_topology(self):
    # PUT the XML held in complex_topo_uni_dir_data to the openroadm-topology
    # network URL and expect HTTP 201.
    url_template = "{}/config/ietf-network:networks/network/openroadm-topology"
    target = url_template.format(test_utils.RESTCONF_BASE_URL)
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="PUT",
        url=target,
        data=self.complex_topo_uni_dir_data,
        headers=test_utils.TYPE_APPLICATION_XML,
        auth=credentials,
    )
    # requests.codes.created is the named constant for 201.
    self.assertEqual(response.status_code, requests.codes.created)
331 # Get existing nodeId
332 def test_16_get_nodeId(self):
333 url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-3-2"
334 .format(test_utils.RESTCONF_BASE_URL))
335 response = requests.request(
336 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
337 self.assertEqual(response.status_code, requests.codes.ok)
338 res = response.json()
340 res['node'][0]['node-id'],
344 # Test failed path computation
345 def test_17_fail_path_computation(self):
346 url = ("{}/operations/transportpce-pce:path-computation-request"
347 .format(test_utils.RESTCONF_BASE_URL))
349 "service-handler-header": {
350 "request-id": "request-1"
354 response = requests.request(
355 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
356 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
357 self.assertEqual(response.status_code, requests.codes.ok)
358 res = response.json()
359 self.assertIn('Service Name is not set',
360 res['output']['configuration-response-common']['response-message'])
363 # Test1 success path computation
364 def test_18_success1_path_computation(self):
365 url = ("{}/operations/transportpce-pce:path-computation-request"
366 .format(test_utils.RESTCONF_BASE_URL))
368 "service-name": "service1",
369 "resource-reserve": "true",
370 "service-handler-header": {
371 "request-id": "request1"
374 "service-format": "Ethernet",
375 "service-rate": "100",
377 "node-id": "XPONDER-2-2",
380 "port-device-name": "Some port-device-name",
381 "port-type": "Some port-type",
382 "port-name": "Some port-name",
383 "port-rack": "Some port-rack",
384 "port-shelf": "Some port-shelf",
385 "port-slot": "Some port-slot",
386 "port-sub-slot": "Some port-sub-slot"
391 "port-device-name": "Some port-device-name",
392 "port-type": "Some port-type",
393 "port-name": "Some port-name",
394 "port-rack": "Some port-rack",
395 "port-shelf": "Some port-shelf",
396 "port-slot": "Some port-slot",
397 "port-sub-slot": "Some port-sub-slot"
402 "service-format": "Ethernet",
403 "service-rate": "100",
405 "node-id": "XPONDER-1-2",
408 "port-device-name": "Some port-device-name",
409 "port-type": "Some port-type",
410 "port-name": "Some port-name",
411 "port-rack": "Some port-rack",
412 "port-shelf": "Some port-shelf",
413 "port-slot": "Some port-slot",
414 "port-sub-slot": "Some port-sub-slot"
419 "port-device-name": "Some port-device-name",
420 "port-type": "Some port-type",
421 "port-name": "Some port-name",
422 "port-rack": "Some port-rack",
423 "port-shelf": "Some port-shelf",
424 "port-slot": "Some port-slot",
425 "port-sub-slot": "Some port-sub-slot"
429 "hard-constraints": {
434 "existing-service": [
435 "Some existing-service"
439 "soft-constraints": {
444 "existing-service": [
445 "Some existing-service"
449 "pce-metric": "hop-count",
450 "locally-protected-links": "true"
453 response = requests.request(
454 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
455 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
456 self.assertEqual(response.status_code, requests.codes.ok)
457 res = response.json()
458 self.assertIn('Path is calculated',
459 res['output']['configuration-response-common']['response-message'])
462 # Test2 success path computation with path description
463 def test_19_success2_path_computation(self):
464 url = ("{}/operations/transportpce-pce:path-computation-request"
465 .format(test_utils.RESTCONF_BASE_URL))
467 "service-name": "service 1",
468 "resource-reserve": "true",
469 "service-handler-header": {
470 "request-id": "request 1"
473 "service-rate": "100",
474 "service-format": "Ethernet",
475 "node-id": "XPONDER-1-2",
479 "service-rate": "100",
480 "service-format": "Ethernet",
481 "node-id": "XPONDER-3-2",
484 "pce-metric": "hop-count"
487 response = requests.request(
488 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
489 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
490 self.assertEqual(response.status_code, requests.codes.ok)
491 res = response.json()
492 self.assertIn('Path is calculated',
493 res['output']['configuration-response-common']['response-message'])
494 self.assertEqual(5, res['output']['response-parameters']['path-description']
495 ['aToZ-direction']['aToZ-wavelength-number'])
496 self.assertEqual(5, res['output']['response-parameters']['path-description']
497 ['zToA-direction']['zToA-wavelength-number'])
500 # Test3 success path computation with hard-constraints exclude
501 def test_20_success3_path_computation(self):
502 url = ("{}/operations/transportpce-pce:path-computation-request"
503 .format(test_utils.RESTCONF_BASE_URL))
505 "service-name": "service 1",
506 "resource-reserve": "true",
507 "service-handler-header": {
508 "request-id": "request 1"
511 "service-rate": "100",
512 "service-format": "Ethernet",
513 "node-id": "XPONDER-1-2",
517 "service-rate": "100",
518 "service-format": "Ethernet",
519 "node-id": "XPONDER-3-2",
522 "hard-constraints": {
524 "node-id": ["OpenROADM-2-1", "OpenROADM-2-2"]
527 "pce-metric": "hop-count"
530 response = requests.request(
531 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
532 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
533 self.assertEqual(response.status_code, requests.codes.ok)
534 res = response.json()
535 self.assertIn('Path is calculated',
536 res['output']['configuration-response-common']['response-message'])
537 self.assertEqual(9, res['output']['response-parameters']['path-description']
538 ['aToZ-direction']['aToZ-wavelength-number'])
539 self.assertEqual(9, res['output']['response-parameters']['path-description']
540 ['zToA-direction']['zToA-wavelength-number'])
543 # Path computation before deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
544 def test_21_path_computation_before_oms_attribute_deletion(self):
545 url = ("{}/operations/transportpce-pce:path-computation-request"
546 .format(test_utils.RESTCONF_BASE_URL))
548 "service-name": "service 1",
549 "resource-reserve": "true",
550 "service-handler-header": {
551 "request-id": "request 1"
554 "service-rate": "100",
555 "service-format": "Ethernet",
556 "node-id": "XPONDER-2-2",
560 "service-rate": "100",
561 "service-format": "Ethernet",
562 "node-id": "XPONDER-1-2",
565 "pce-metric": "hop-count"
568 response = requests.request(
569 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
570 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
571 self.assertEqual(response.status_code, requests.codes.ok)
572 res = response.json()
573 self.assertIn('Path is calculated',
574 res['output']['configuration-response-common']['response-message'])
575 nbElmPath = len(res['output']['response-parameters']['path-description']
576 ['aToZ-direction']['aToZ'])
577 self.assertEqual(31, nbElmPath)
578 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
580 for i in range(0, nbElmPath):
581 resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
582 if(resource_i == link):
584 self.assertEqual(find, True)
587 # Delete oms-attribute in the link :openroadm1-3 to openroadm1-2
def test_22_delete_oms_attribute_in_openroadm13toopenroadm12_link(self):
    # DELETE the OMS-attributes/span subtree of link
    # OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2 and expect HTTP 200.
    topology_link_path = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    oms_span_path = "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2/org-openroadm-network-topology:OMS-attributes/span"
    # Concatenate first, then substitute the base URL into the single "{}".
    target = (topology_link_path + oms_span_path).format(test_utils.RESTCONF_BASE_URL)
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="DELETE",
        url=target,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.ok)
597 # Path computation after deleting oms-attribute of the link :openroadm1-3 to openroadm1-2
598 def test_23_path_computation_after_oms_attribute_deletion(self):
599 url = ("{}/operations/transportpce-pce:path-computation-request"
600 .format(test_utils.RESTCONF_BASE_URL))
602 "service-name": "service 1",
603 "resource-reserve": "true",
604 "service-handler-header": {
605 "request-id": "request 1"
608 "service-rate": "100",
609 "service-format": "Ethernet",
610 "node-id": "XPONDER-2-2",
614 "service-rate": "100",
615 "service-format": "Ethernet",
616 "node-id": "XPONDER-1-2",
619 "pce-metric": "hop-count"
622 response = requests.request(
623 "POST", url, data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
624 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
625 self.assertEqual(response.status_code, requests.codes.ok)
626 res = response.json()
627 self.assertIn('Path is calculated',
628 res['output']['configuration-response-common']['response-message'])
629 nbElmPath = len(res['output']['response-parameters']['path-description']
630 ['aToZ-direction']['aToZ'])
631 self.assertEqual(47, nbElmPath)
632 link = {"link-id": "OpenROADM-1-3-DEG2-to-OpenROADM-1-2-DEG2"}
634 for i in range(0, nbElmPath):
635 resource_i = res['output']['response-parameters']['path-description']['aToZ-direction']['aToZ'][i]['resource']
636 if (resource_i == link):
638 self.assertNotEqual(find, True)
641 # Delete complex topology
def test_24_delete_complex_topology(self):
    # DELETE the openroadm-topology network and expect HTTP 200.
    url_template = "{}/config/ietf-network:networks/network/openroadm-topology"
    target = url_template.format(test_utils.RESTCONF_BASE_URL)
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="DELETE",
        url=target,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    self.assertEqual(response.status_code, requests.codes.ok)
650 # Test deleted complex topology
def test_25_test_topology_complex_deleted(self):
    # GET node XPONDER-3-2 of openroadm-topology and expect HTTP 404.
    url_template = "{}/config/ietf-network:networks/network/openroadm-topology/node/XPONDER-3-2"
    target = url_template.format(test_utils.RESTCONF_BASE_URL)
    credentials = (test_utils.ODL_LOGIN, test_utils.ODL_PWD)
    response = requests.request(
        method="GET",
        url=target,
        headers=test_utils.TYPE_APPLICATION_JSON,
        auth=credentials,
    )
    # requests.codes.not_found is the named constant for 404.
    self.assertEqual(response.status_code, requests.codes.not_found)
# When executed as a script, run the suite with per-test verbose reporting.
if __name__ == "__main__":
    unittest.main(verbosity=2)