2 ##############################################################################
3 #Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
22 class TransportPCEtesting(unittest.TestCase):
# unittest suite that drives a RESTCONF API over HTTP. The visible tests cover
# stub-PCE path descriptions and a service lifecycle: create, get, reconfigure,
# state-modify, restore and delete.
# NOTE(review): this listing has lost its indentation and some of its lines, so
# the nesting described in these comments is inferred from the statements shown.
#
# Base URL that every test prefixes to its RESTCONF resource path.
25 restconf_baseurl = "http://127.0.0.1:8181/restconf"
# Launch the Karaf start script through bash, send its stdout to odl.log and
# keep the Popen handle on the class (tearDownClass signals it later).
# NOTE(review): the `def` line that owns the statements below is not visible.
29 executable = "../karaf/target/assembly/bin/karaf"
30 with open('odl.log', 'w') as outfile:
31 cls.odl_process = subprocess.Popen(
32 ["bash", executable], stdout=outfile,
# NOTE(review): the os.devnull handle opened for stdin is never closed in the
# visible code.
33 stdin=open(os.devnull))
36 def setUpClass(cls): #a class method called before tests in an individual class run.
# Class-level fixture hook; its body is not visible in this listing.
# NOTE(review): presumably starts the process that tearDownClass stops - confirm.
41 def tearDownClass(cls):
# Class-level teardown: send SIGINT to every child process that psutil reports
# for the launched process, then SIGINT the launched process itself and block
# until it has exited.
42 for child in psutil.Process(cls.odl_process.pid).children():
43 child.send_signal(signal.SIGINT)
# NOTE(review): a line is missing from the listing at this point.
45 cls.odl_process.send_signal(signal.SIGINT)
46 cls.odl_process.wait()
48 def setUp(self): #instruction executed before each test method
# Per-test fixture hook; its body is not visible in this listing.
51 #Get existing PathDescription
52 def test_01_get_pathdescriptions(self):
# GET the 'NodeAToNodeZ_direct_1' path description from the stubpce
# path-description-list under /operational and require an HTTP OK status.
53 url = ("{}/operational/stubpce:path-description-list/pathDescriptions/NodeAToNodeZ_direct_1"
54 .format(self.restconf_baseurl))
55 headers = {'content-type': 'application/json',
56 "Accept": "application/json"}
# Basic-auth credentials admin/admin are sent with the request.
57 response = requests.request(
58 "GET", url, headers=headers, auth=('admin', 'admin'))
59 self.assertEqual(response.status_code, requests.codes.ok)
# Checks the 'path-name' of the first 'pathDescriptions' entry against the
# requested name.
# NOTE(review): the lines that bind `res` and open this assertion are not
# visible in this listing.
62 res['pathDescriptions'][0]['path-name'],
63 'NodeAToNodeZ_direct_1')
66 #Get non existing PathDescription
def test_02_get_pathdescriptions(self):
    # Ask the stub PCE for the path description called 'Node'; the test passes
    # only when the server answers with HTTP 404.
    target = "{}/operational/stubpce:path-description-list/pathDescriptions/Node".format(
        self.restconf_baseurl)
    request_headers = {
        'content-type': 'application/json',
        "Accept": "application/json",
    }
    reply = requests.request(
        method="GET",
        url=target,
        headers=request_headers,
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, 404)
77 #Create Service 'test' with correct parameters
78 def test_03_create_service(self):
# POST a service-create request for service 'test' and expect an HTTP OK reply
# whose response message contains 'in progress'.
79 url = ("{}/operations/org-openroadm-service:service-create"
80 .format(self.restconf_baseurl))
# Request payload. NOTE(review): the line that binds `data` and the braces that
# nest the entries below are not visible in this listing.
82 "sdnc-request-header": {
83 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
84 "rpc-action": "service-create",
85 "request-system-id": "appname",
86 "notification-url": "http://localhost:8585/NotificationServer/notify"
88 "service-name": "test",
89 "common-id": "ASATT1234567",
90 "connection-type": "infrastructure",
# First endpoint (clli SNJSCAMCJP8): a Tx port/LGX group followed by an Rx one.
92 "service-rate": "100",
94 "service-format": "Ethernet",
95 "clli": "SNJSCAMCJP8",
98 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
99 "port-type": "router",
100 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
101 "port-rack": "000000.00",
105 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
106 "lgx-port-name": "LGX Back.3",
107 "lgx-port-rack": "000000.00",
108 "lgx-port-shelf": "00"
113 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
114 "port-type": "router",
115 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
116 "port-rack": "000000.00",
120 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
121 "lgx-port-name": "LGX Back.4",
122 "lgx-port-rack": "000000.00",
123 "lgx-port-shelf": "00"
# Second endpoint (clli SNJSCAMCJT4): again a Tx group followed by an Rx group.
129 "service-rate": "100",
131 "service-format": "Ethernet",
132 "clli": "SNJSCAMCJT4",
135 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
136 "port-type": "router",
137 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
138 "port-rack": "000000.00",
142 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
143 "lgx-port-name": "LGX Back.29",
144 "lgx-port-rack": "000000.00",
145 "lgx-port-shelf": "00"
150 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
151 "port-type": "router",
152 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
153 "port-rack": "000000.00",
157 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
158 "lgx-port-name": "LGX Back.30",
159 "lgx-port-rack": "000000.00",
160 "lgx-port-shelf": "00"
165 "due-date": "2016-11-28T00:00:01Z",
166 "operator-contact": "pw1234"
# Send the payload as a JSON string with admin/admin basic auth.
169 headers = {'content-type': 'application/json',
170 "Accept": "application/json"}
171 response = requests.request(
172 "POST", url, data=json.dumps(data), headers=headers,
173 auth=('admin', 'admin'))
174 self.assertEqual(response.status_code, requests.codes.ok)
175 res = response.json()
176 self.assertIn('in progress',
177 res['output']['configuration-response-common']['response-message'])
180 #Create Service 'test' with not compliant parameter : no 'sdnc-request-header' parameter
181 def test_04_create_service(self):
# POST a service-create request whose visible payload has no
# 'sdnc-request-header' entry; the call is still expected to return HTTP OK,
# with 'Service not compliant' in the response message.
182 url = ("{}/operations/org-openroadm-service:service-create"
183 .format(self.restconf_baseurl))
# Request payload. NOTE(review): the line that binds `data` and the braces that
# nest the entries below are not visible in this listing.
185 "service-name": "test",
186 "common-id": "ASATT1234567",
187 "connection-type": "infrastructure",
# First endpoint (clli SNJSCAMCJP8): a Tx port/LGX group followed by an Rx one.
189 "service-rate": "100",
191 "service-format": "Ethernet",
192 "clli": "SNJSCAMCJP8",
195 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
196 "port-type": "router",
197 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
198 "port-rack": "000000.00",
202 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
203 "lgx-port-name": "LGX Back.3",
204 "lgx-port-rack": "000000.00",
205 "lgx-port-shelf": "00"
210 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
211 "port-type": "router",
212 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
213 "port-rack": "000000.00",
217 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
218 "lgx-port-name": "LGX Back.4",
219 "lgx-port-rack": "000000.00",
220 "lgx-port-shelf": "00"
# Second endpoint (clli SNJSCAMCJT4): again a Tx group followed by an Rx group.
226 "service-rate": "100",
228 "service-format": "Ethernet",
229 "clli": "SNJSCAMCJT4",
232 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
233 "port-type": "router",
234 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
235 "port-rack": "000000.00",
239 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
240 "lgx-port-name": "LGX Back.29",
241 "lgx-port-rack": "000000.00",
242 "lgx-port-shelf": "00"
247 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
248 "port-type": "router",
249 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
250 "port-rack": "000000.00",
254 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
255 "lgx-port-name": "LGX Back.30",
256 "lgx-port-rack": "000000.00",
257 "lgx-port-shelf": "00"
262 "due-date": "2016-11-28T00:00:01Z",
263 "operator-contact": "pw1234"
# Send the payload as a JSON string with admin/admin basic auth.
266 headers = {'content-type': 'application/json',
267 "Accept": "application/json"}
268 response = requests.request(
269 "POST", url, data=json.dumps(data), headers=headers,
270 auth=('admin', 'admin'))
# The rejection is reported in the response body, not through the HTTP status.
271 self.assertEqual(response.status_code, requests.codes.ok)
272 res = response.json()
273 self.assertIn('Service not compliant',
274 res['output']['configuration-response-common']['response-message'])
277 #Create Service 'test' with not compliant parameter : no 'tx-direction' for serviceAEnd
278 def test_05_create_service(self):
# POST a service-create request in which the SNJSCAMCJP8 endpoint shows only an
# Rx port/LGX group; the call is expected to return HTTP OK with
# 'Service not compliant' in the response message.
279 url = ("{}/operations/org-openroadm-service:service-create"
280 .format(self.restconf_baseurl))
# Request payload. NOTE(review): the line that binds `data` and the braces that
# nest the entries below are not visible in this listing.
282 "sdnc-request-header": {
283 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
284 "rpc-action": "service-create",
285 "request-system-id": "appname",
286 "notification-url": "http://localhost:8585/NotificationServer/notify"
288 "service-name": "test",
289 "common-id": "ASATT1234567",
290 "connection-type": "infrastructure",
# First endpoint (clli SNJSCAMCJP8): only the Rx port/LGX group is present.
292 "service-rate": "100",
294 "service-format": "Ethernet",
295 "clli": "SNJSCAMCJP8",
298 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
299 "port-type": "router",
300 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
301 "port-rack": "000000.00",
305 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
306 "lgx-port-name": "LGX Back.4",
307 "lgx-port-rack": "000000.00",
308 "lgx-port-shelf": "00"
# Second endpoint (clli SNJSCAMCJT4): a Tx group followed by an Rx group.
314 "service-rate": "100",
316 "service-format": "Ethernet",
317 "clli": "SNJSCAMCJT4",
320 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
321 "port-type": "router",
322 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
323 "port-rack": "000000.00",
327 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
328 "lgx-port-name": "LGX Back.29",
329 "lgx-port-rack": "000000.00",
330 "lgx-port-shelf": "00"
335 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
336 "port-type": "router",
337 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
338 "port-rack": "000000.00",
342 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
343 "lgx-port-name": "LGX Back.30",
344 "lgx-port-rack": "000000.00",
345 "lgx-port-shelf": "00"
350 "due-date": "2016-11-28T00:00:01Z",
351 "operator-contact": "pw1234"
# Send the payload as a JSON string with admin/admin basic auth.
354 headers = {'content-type': 'application/json',
355 "Accept": "application/json"}
356 response = requests.request(
357 "POST", url, data=json.dumps(data), headers=headers,
358 auth=('admin', 'admin'))
# The rejection is reported in the response body, not through the HTTP status.
359 self.assertEqual(response.status_code, requests.codes.ok)
360 res = response.json()
361 self.assertIn('Service not compliant',
362 res['output']['configuration-response-common']['response-message'])
365 #Get 'test' service created
366 def test_06_get_service(self):
# GET service 'test' from the service-list under /operational, require an HTTP
# OK status and decode the JSON body.
367 url = ("{}/operational/org-openroadm-service:service-list/services/test"
368 .format(self.restconf_baseurl))
369 headers = {'content-type': 'application/json',
370 "Accept": "application/json"}
371 response = requests.request(
372 "GET", url, headers=headers, auth=('admin', 'admin'))
373 self.assertEqual(response.status_code, requests.codes.ok)
374 res = response.json()
# Inspects the 'administrative-state' of the first 'services' entry.
# NOTE(review): the assertion call and its expected value are not visible in
# this listing.
376 res['services'][0]['administrative-state'],
380 #get non existing service
def test_07_get_service(self):
    # Look up service 'test1' in the service list; the test passes only when
    # the server answers with HTTP 404.
    endpoint = "{}/operational/org-openroadm-service:service-list/services/test1".format(
        self.restconf_baseurl)
    http_headers = {
        'content-type': 'application/json',
        "Accept": "application/json",
    }
    outcome = requests.request(
        method="GET",
        url=endpoint,
        headers=http_headers,
        auth=('admin', 'admin'),
    )
    self.assertEqual(outcome.status_code, 404)
391 #reconfigure 'test' to be 'test-new'
392 def test_08_reconfigure_service(self):
# POST a service-reconfigure request that names service 'test' and gives
# 'test-new' as its new name; expects HTTP OK and 'in progress' in the
# output's status message.
393 url = ("{}/operations/org-openroadm-service:service-reconfigure"
394 .format(self.restconf_baseurl))
# Request payload. NOTE(review): the line that binds `data` and the braces that
# nest the entries below are not visible in this listing.
396 "service-name": "test",
397 "new-service-name": "test-new",
398 "common-id": "ASATT1234567",
399 "connection-type": "infrastructure",
# First endpoint (clli SNJSCAMCJP8): a Tx port/LGX group followed by an Rx one.
401 "service-rate": "100",
403 "service-format": "Ethernet",
404 "clli": "SNJSCAMCJP8",
407 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
408 "port-type": "router",
409 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
410 "port-rack": "000000.00",
414 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
415 "lgx-port-name": "LGX Back.3",
416 "lgx-port-rack": "000000.00",
417 "lgx-port-shelf": "00"
422 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
423 "port-type": "router",
424 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
425 "port-rack": "000000.00",
429 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
430 "lgx-port-name": "LGX Back.4",
431 "lgx-port-rack": "000000.00",
432 "lgx-port-shelf": "00"
# Second endpoint (clli SNJSCAMCJT4): again a Tx group followed by an Rx group.
438 "service-rate": "100",
440 "service-format": "Ethernet",
441 "clli": "SNJSCAMCJT4",
444 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
445 "port-type": "router",
446 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
447 "port-rack": "000000.00",
451 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
452 "lgx-port-name": "LGX Back.29",
453 "lgx-port-rack": "000000.00",
454 "lgx-port-shelf": "00"
459 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
460 "port-type": "router",
461 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
462 "port-rack": "000000.00",
466 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
467 "lgx-port-name": "LGX Back.30",
468 "lgx-port-rack": "000000.00",
469 "lgx-port-shelf": "00"
# Routing constraints. NOTE(review): several lines of this section are missing
# from the listing, so its full structure cannot be read from here.
474 "hard-constraints": {
476 "existing-service": [
477 "104/GE100/SNJSCAMCJP8/SNJSCAMCJT4"
479 "existing-service-applicability": {
488 "SNJSCAMCJP8_000000.00"
# Send the payload as a JSON string with admin/admin basic auth.
497 headers = {'content-type': 'application/json',
498 "Accept": "application/json"}
499 response = requests.request(
500 "POST", url, data=json.dumps(data), headers=headers,
501 auth=('admin', 'admin'))
502 self.assertEqual(response.status_code, requests.codes.ok)
503 res = response.json()
# Unlike service-create, the message is read from output['status-message'].
504 self.assertIn('in progress',
505 res['output']['status-message'])
509 #get new service 'test-new'
510 def test_09_get_service(self):
# GET service 'test-new' from the service-list under /operational and decode
# the JSON body.
511 url = ("{}/operational/org-openroadm-service:service-list/services/test-new"
512 .format(self.restconf_baseurl))
513 headers = {'content-type': 'application/json',
514 "Accept": "application/json"}
515 response = requests.request(
516 "GET", url, headers=headers, auth=('admin', 'admin'))
# NOTE(review): no status-code assertion precedes the JSON decode here, unlike
# the earlier GET tests - confirm whether that is intentional.
517 res = response.json()
# Inspects the 'operational-state' of the first 'services' entry.
# NOTE(review): the assertion call and its expected value are not visible in
# this listing.
519 res['services'][0]['operational-state'],
524 #Modify 'test-new' state
525 def test_10_modify_service_state(self):
# POST a service-state-modify request for service 'test-new' with
# operational-state 'outOfService'; expects 'Service state modified' in the
# response message.
526 url = ("{}/operations/servicehandler:service-state-modify"
527 .format(self.restconf_baseurl))
# Request payload. NOTE(review): the line that binds `data` and the enclosing
# braces are not visible in this listing.
529 "service-name": "test-new",
530 "operational-state": "outOfService"
# Only a content-type header is sent; no Accept header is set for this call.
533 headers = {'content-type': 'application/json'}
534 response = requests.request(
535 "POST", url, data=json.dumps(data), headers=headers,
536 auth=('admin', 'admin'))
# NOTE(review): the HTTP status is not asserted before decoding the body.
537 res = response.json()
538 self.assertIn('Service state modified',
539 res['output']['configuration-response-common']['response-message'])
543 #get new service 'test-new' state
544 def test_11_get_service(self):
# GET service 'test-new' from the service-list under /operational and decode
# the JSON body.
545 url = ("{}/operational/org-openroadm-service:service-list/services/test-new"
546 .format(self.restconf_baseurl))
547 headers = {'content-type': 'application/json',
548 "Accept": "application/json"}
549 response = requests.request(
550 "GET", url, headers=headers, auth=('admin', 'admin'))
# NOTE(review): the HTTP status is not asserted before decoding the body.
551 res = response.json()
# Inspects the 'operational-state' of the first 'services' entry.
# NOTE(review): the assertion call and its expected value are not visible in
# this listing.
553 res['services'][0]['operational-state'],
558 #restore service 'test-new'
559 def test_12_restore_service(self):
# POST a service-restoration request for service 'test-new' with option
# 'permanent'; expects 'in progress' in the output's status message.
560 url = ("{}/operations/org-openroadm-service:service-restoration"
561 .format(self.restconf_baseurl))
# Request payload. NOTE(review): the line that binds `data` and the enclosing
# braces are not visible in this listing.
563 "service-name": "test-new",
564 "option": "permanent"
# Only a content-type header is sent; no Accept header is set for this call.
567 headers = {'content-type': 'application/json'}
568 response = requests.request(
569 "POST", url, data=json.dumps(data), headers=headers,
570 auth=('admin', 'admin'))
# NOTE(review): the HTTP status is not asserted before decoding the body.
571 res = response.json()
572 self.assertIn('in progress',
573 res['output']['status-message'])
577 #get new service 'test-new' state
578 def test_13_get_service(self):
# GET service 'test-new' from the service-list under /operational and decode
# the JSON body.
579 url = ("{}/operational/org-openroadm-service:service-list/services/test-new"
580 .format(self.restconf_baseurl))
581 headers = {'content-type': 'application/json',
582 "Accept": "application/json"}
583 response = requests.request(
584 "GET", url, headers=headers, auth=('admin', 'admin'))
# NOTE(review): the HTTP status is not asserted before decoding the body.
585 res = response.json()
# Inspects the 'operational-state' of the first 'services' entry.
# NOTE(review): the assertion call and its expected value are not visible in
# this listing.
587 res['services'][0]['operational-state'],
592 #Delete 'test-new' service
593 def test_14_delete_service(self):
# POST a service-delete request for service 'test-new'; expects 'in progress'
# in the response message.
594 url = ("{}/operations/org-openroadm-service:service-delete"
595 .format(self.restconf_baseurl))
# Request payload. NOTE(review): the line that binds `data` and the closing
# braces are not visible in this listing.
597 "sdnc-request-header": {
598 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
599 "rpc-action": "service-delete",
600 "request-system-id": "appname",
601 "notification-url": "http://localhost:8585/NotificationServer/notify"
603 "service-delete-req-info": {
604 "service-name": "test-new",
605 "due-date": "2016-11-28T00:00:01Z",
606 "tail-retention": "no"
# Only a content-type header is sent; no Accept header is set for this call.
610 headers = {'content-type': 'application/json'}
611 response = requests.request(
612 "POST", url, data=json.dumps(data), headers=headers,
613 auth=('admin', 'admin'))
# NOTE(review): the HTTP status is not asserted before decoding the body.
614 res = response.json()
615 self.assertIn('in progress',
616 res['output']['configuration-response-common']['response-message'])
620 #Delete non existing service
621 def test_15_delete_service(self):
# POST a service-delete request for service 'test'; the call is expected to
# return HTTP OK with 'not exists in datastore' in the response message.
622 url = ("{}/operations/org-openroadm-service:service-delete"
623 .format(self.restconf_baseurl))
# Request payload. NOTE(review): the line that binds `data` and the closing
# braces are not visible in this listing.
625 "sdnc-request-header": {
626 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
627 "rpc-action": "service-delete",
628 "request-system-id": "appname",
629 "notification-url": "http://localhost:8585/NotificationServer/notify"
631 "service-delete-req-info": {
632 "service-name": "test",
633 "due-date": "2016-11-28T00:00:01Z",
634 "tail-retention": "no"
# Only a content-type header is sent; no Accept header is set for this call.
638 headers = {'content-type': 'application/json'}
639 response = requests.request(
640 "POST", url, data=json.dumps(data), headers=headers,
641 auth=('admin', 'admin'))
# The failure is reported in the response body, not through the HTTP status.
642 self.assertEqual(response.status_code, requests.codes.ok)
643 res = response.json()
644 self.assertIn('not exists in datastore',
645 res['output']['configuration-response-common']['response-message'])
649 #Verify 'test-new' service deleted
def test_16_get_service(self):
    # Look up service 'test-new' in the service list; the test passes only
    # when the server answers with HTTP 404.
    endpoint = "{}/operational/org-openroadm-service:service-list/services/test-new".format(
        self.restconf_baseurl)
    http_headers = {
        'content-type': 'application/json',
        "Accept": "application/json",
    }
    outcome = requests.request(
        method="GET",
        url=endpoint,
        headers=http_headers,
        auth=('admin', 'admin'),
    )
    self.assertEqual(outcome.status_code, 404)
# Script entry point: when the file is executed directly, run the tests in this
# module with verbosity level 2.
660 if __name__ == "__main__":
661 unittest.main(verbosity=2)