2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
# End-to-end test case: its test_NN methods send RESTCONF requests to the
# controller reachable at restconf_baseurl.
24 class TransportPCEFulltesting(unittest.TestCase):
# Handles of the four honeynode simulator processes. They start as None and
# are signalled and waited for in tearDownClass.
27 honeynode_process1 = None
28 honeynode_process2 = None
29 honeynode_process3 = None
30 honeynode_process4 = None
# Base URL that each test formats into its request path.
31 restconf_baseurl = "http://localhost:8181/restconf"
# Seconds slept after each service-create request (see the tests that call
# time.sleep(self.WAITING)).
32 WAITING = 20 #nominal value is 300
# Starts the four honeynode simulators (xpdrA, roadmA, roadmC, xpdrC) and then
# OpenDaylight through test_utils, keeping each process handle on the class so
# that tearDownClass can signal it.
# NOTE(review): the `def` line owning these statements is not visible in this
# excerpt - presumably setUpClass, since every handle is assigned through
# `cls`; confirm before editing.
34 #START_IGNORE_XTESTING
38 print ("starting honeynode1...")
39 cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
42 print ("starting honeynode2...")
43 cls.honeynode_process2 = test_utils.start_roadma_full_honeynode()
46 print ("starting honeynode3...")
47 cls.honeynode_process3 = test_utils.start_roadmc_full_honeynode()
50 print ("starting honeynode4...")
51 cls.honeynode_process4 = test_utils.start_xpdrc_honeynode()
53 print ("all honeynodes started")
55 print ("starting opendaylight...")
56 cls.odl_process = test_utils.start_tpce()
58 print ("opendaylight started")
def tearDownClass(cls):
    """Stop OpenDaylight and the four honeynode simulators.

    The processes are handled one after the other (OpenDaylight first, then
    honeynodes 1 to 4): SIGINT is sent to each direct child, then to the
    process itself, which is then waited for.
    """
    # NOTE(review): unittest calls this hook on the class; confirm that a
    # @classmethod decorator sits directly above this def.
    process_attrs = (
        "odl_process",
        "honeynode_process1",
        "honeynode_process2",
        "honeynode_process3",
        "honeynode_process4",
    )
    for attr in process_attrs:
        # Looked up lazily so a failure on one handle happens only after the
        # preceding processes have already been stopped.
        proc = getattr(cls, attr)
        for child in psutil.Process(proc.pid).children():
            child.send_signal(signal.SIGINT)
        proc.send_signal(signal.SIGINT)
        proc.wait()
    print("all processes killed")
def setUp(self):
    """Print the name of the test method that is about to run."""
    # self.id() is a dotted path; only its last component is reported.
    method_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(method_name))
94 # connect netconf devices
# Registers the XPDRA device: PUTs a netconf node entry (127.0.0.1:17830,
# admin/admin) at .../topology-netconf/node/XPDRA and expects HTTP 201 Created.
# NOTE(review): the lines that open the request body and bind `data` are not
# visible in this excerpt - restore them before relying on this test.
95 def test_01_connect_xpdrA(self):
96 url = ("{}/config/network-topology:"
97 "network-topology/topology/topology-netconf/node/XPDRA"
98 .format(self.restconf_baseurl))
101 "netconf-node-topology:username": "admin",
102 "netconf-node-topology:password": "admin",
103 "netconf-node-topology:host": "127.0.0.1",
104 "netconf-node-topology:port": "17830",
105 "netconf-node-topology:tcp-only": "false",
106 "netconf-node-topology:pass-through": {}}]}
107 headers = {'content-type': 'application/json'}
108 response = requests.request(
109 "PUT", url, data=json.dumps(data), headers=headers,
110 auth=('admin', 'admin'))
111 self.assertEqual(response.status_code, requests.codes.created)
# Registers the XPDRC device: PUTs a netconf node entry (127.0.0.1:17834,
# admin/admin) at .../topology-netconf/node/XPDRC and expects HTTP 201 Created.
# NOTE(review): the lines that open the request body and bind `data` are not
# visible in this excerpt - restore them before relying on this test.
114 def test_02_connect_xpdrC(self):
115 url = ("{}/config/network-topology:"
116 "network-topology/topology/topology-netconf/node/XPDRC"
117 .format(self.restconf_baseurl))
120 "netconf-node-topology:username": "admin",
121 "netconf-node-topology:password": "admin",
122 "netconf-node-topology:host": "127.0.0.1",
123 "netconf-node-topology:port": "17834",
124 "netconf-node-topology:tcp-only": "false",
125 "netconf-node-topology:pass-through": {}}]}
126 headers = {'content-type': 'application/json'}
127 response = requests.request(
128 "PUT", url, data=json.dumps(data), headers=headers,
129 auth=('admin', 'admin'))
130 self.assertEqual(response.status_code, requests.codes.created)
# Registers the ROADMA device: PUTs a netconf node entry (127.0.0.1:17821,
# admin/admin) at .../topology-netconf/node/ROADMA and expects HTTP 201 Created.
# NOTE(review): the lines that open the request body and bind `data` are not
# visible in this excerpt - restore them before relying on this test.
133 def test_03_connect_rdmA(self):
134 url = ("{}/config/network-topology:"
135 "network-topology/topology/topology-netconf/node/ROADMA"
136 .format(self.restconf_baseurl))
139 "netconf-node-topology:username": "admin",
140 "netconf-node-topology:password": "admin",
141 "netconf-node-topology:host": "127.0.0.1",
142 "netconf-node-topology:port": "17821",
143 "netconf-node-topology:tcp-only": "false",
144 "netconf-node-topology:pass-through": {}}]}
145 headers = {'content-type': 'application/json'}
146 response = requests.request(
147 "PUT", url, data=json.dumps(data), headers=headers,
148 auth=('admin', 'admin'))
149 self.assertEqual(response.status_code, requests.codes.created)
# Registers the ROADMC device: PUTs a netconf node entry (127.0.0.1:17823,
# admin/admin) at .../topology-netconf/node/ROADMC and expects HTTP 201 Created.
# NOTE(review): the lines that open the request body and bind `data` are not
# visible in this excerpt - restore them before relying on this test.
152 def test_04_connect_rdmC(self):
153 url = ("{}/config/network-topology:"
154 "network-topology/topology/topology-netconf/node/ROADMC"
155 .format(self.restconf_baseurl))
158 "netconf-node-topology:username": "admin",
159 "netconf-node-topology:password": "admin",
160 "netconf-node-topology:host": "127.0.0.1",
161 "netconf-node-topology:port": "17823",
162 "netconf-node-topology:tcp-only": "false",
163 "netconf-node-topology:pass-through": {}}]}
164 headers = {'content-type': 'application/json'}
165 response = requests.request(
166 "PUT", url, data=json.dumps(data), headers=headers,
167 auth=('admin', 'admin'))
168 self.assertEqual(response.status_code, requests.codes.created)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    """Create the xpdr-to-roadm link from XPDRA network 1 to ROADMA SRG1-PP1.

    Posts to the init-xpdr-rdm-links RPC and expects HTTP 200 together with
    the success message in the RPC output.
    """
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    # NOTE(review): the `data = {` opener and the three closing braces were
    # restored from the visible key nesting - confirm against the RPC model.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMA",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """Create the roadm-to-xpdr link from ROADMA SRG1-PP1 to XPDRA network 1.

    Posts to the init-rdm-xpdr-links RPC and expects HTTP 200 together with
    the success message in the RPC output.
    """
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    # NOTE(review): the `data = {` opener and the three closing braces were
    # restored from the visible key nesting - confirm against the RPC model.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMA",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    """Create the xpdr-to-roadm link from XPDRC network 1 to ROADMC SRG1-PP1.

    Posts to the init-xpdr-rdm-links RPC and expects HTTP 200 together with
    the success message in the RPC output.
    """
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    # NOTE(review): the `data = {` opener and the three closing braces were
    # restored from the visible key nesting - confirm against the RPC model.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRC",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMC",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    """Create the roadm-to-xpdr link from ROADMC SRG1-PP1 to XPDRC network 1.

    Posts to the init-rdm-xpdr-links RPC and expects HTTP 200 together with
    the success message in the RPC output.
    """
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    # NOTE(review): the `data = {` opener and the three closing braces were
    # restored from the visible key nesting - confirm against the RPC model.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRC",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMC",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
# PUTs the OMS-attributes/span container of the ROADMA->ROADMC link in
# openroadm-topology (span-loss values plus one link-concatenation entry) and
# expects HTTP 201 Created.
# NOTE(review): the line binding `data`, part of the link-concatenation entry
# and the closing brackets are not visible in this excerpt.
263 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
264 # Config ROADMA-ROADMC oms-attributes
265 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
266 "link/ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
267 "OMS-attributes/span"
268 .format(self.restconf_baseurl))
271 "auto-spanloss": "true",
272 "spanloss-base": 11.4,
273 "spanloss-current": 12,
274 "engineered-spanloss": 12.2,
275 "link-concatenation": [{
278 "SRLG-length": 100000,
280 headers = {'content-type': 'application/json'}
281 response = requests.request(
282 "PUT", url, data=json.dumps(data), headers=headers,
283 auth=('admin', 'admin'))
284 self.assertEqual(response.status_code, requests.codes.created)
# PUTs the OMS-attributes/span container of the ROADMC->ROADMA link in
# openroadm-topology (span-loss values plus one link-concatenation entry) and
# expects HTTP 201 Created.
# NOTE(review): the line binding `data`, part of the link-concatenation entry
# and the closing brackets are not visible in this excerpt.
286 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
287 # Config ROADMC-ROADMA oms-attributes
288 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
289 "link/ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
290 "OMS-attributes/span"
291 .format(self.restconf_baseurl))
294 "auto-spanloss": "true",
295 "spanloss-base": 11.4,
296 "spanloss-current": 12,
297 "engineered-spanloss": 12.2,
298 "link-concatenation": [{
301 "SRLG-length": 100000,
303 headers = {'content-type': 'application/json'}
304 response = requests.request(
305 "PUT", url, data=json.dumps(data), headers=headers,
306 auth=('admin', 'admin'))
307 self.assertEqual(response.status_code, requests.codes.created)
309 #test service-create for Eth service from xpdr to xpdr
# Posts a service-create request for "service1" (100 / Ethernet on both ends),
# expects HTTP 200 and a 'PCE calculation in progress' response message, then
# sleeps self.WAITING seconds.
# NOTE(review): the line binding `data` and most of the structural lines of the
# payload (nested container openers and closers, some entries) are not visible
# in this excerpt; the payload cannot be rebuilt from what is shown.
310 def test_11_create_eth_service1(self):
311 url = ("{}/operations/org-openroadm-service:service-create"
312 .format(self.restconf_baseurl))
314 "sdnc-request-header": {
315 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
316 "rpc-action": "service-create",
317 "request-system-id": "appname",
318 "notification-url": "http://localhost:8585/NotificationServer/notify"
320 "service-name": "service1",
321 "common-id": "ASATT1234567",
322 "connection-type": "service",
324 "service-rate": "100",
326 "service-format": "Ethernet",
327 "clli": "SNJSCAMCJP8",
330 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
331 "port-type": "router",
332 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
333 "port-rack": "000000.00",
337 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
338 "lgx-port-name": "LGX Back.3",
339 "lgx-port-rack": "000000.00",
340 "lgx-port-shelf": "00"
345 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
346 "port-type": "router",
347 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
348 "port-rack": "000000.00",
352 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
353 "lgx-port-name": "LGX Back.4",
354 "lgx-port-rack": "000000.00",
355 "lgx-port-shelf": "00"
361 "service-rate": "100",
363 "service-format": "Ethernet",
364 "clli": "SNJSCAMCJT4",
367 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
368 "port-type": "router",
369 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
370 "port-rack": "000000.00",
374 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
375 "lgx-port-name": "LGX Back.29",
376 "lgx-port-rack": "000000.00",
377 "lgx-port-shelf": "00"
382 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
383 "port-type": "router",
384 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
385 "port-rack": "000000.00",
389 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
390 "lgx-port-name": "LGX Back.30",
391 "lgx-port-rack": "000000.00",
392 "lgx-port-shelf": "00"
397 "due-date": "2016-11-28T00:00:01Z",
398 "operator-contact": "pw1234"
401 headers = {'content-type': 'application/json',
402 "Accept": "application/json"}
403 response = requests.request(
404 "POST", url, data=json.dumps(data), headers=headers,
405 auth=('admin', 'admin'))
406 self.assertEqual(response.status_code, requests.codes.ok)
407 res = response.json()
408 self.assertIn('PCE calculation in progress',
409 res['output']['configuration-response-common']['response-message'])
410 time.sleep(self.WAITING)
# Reads service1 from the operational service-list, expects HTTP 200, then
# compares the administrative-state, service-name ('service1'),
# connection-type ('service') and lifecycle-state ('planned') of the first
# returned service.
# NOTE(review): the assertion call openers and the expected
# administrative-state value are not visible in this excerpt.
412 def test_12_get_eth_service1(self):
413 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
414 .format(self.restconf_baseurl))
415 headers = {'content-type': 'application/json',
416 "Accept": "application/json"}
417 response = requests.request(
418 "GET", url, headers=headers, auth=('admin', 'admin'))
419 self.assertEqual(response.status_code, requests.codes.ok)
420 res = response.json()
422 res['services'][0]['administrative-state'],
425 res['services'][0]['service-name'], 'service1')
427 res['services'][0]['connection-type'], 'service')
429 res['services'][0]['lifecycle-state'], 'planned')
def test_13_check_xc1_ROADMA(self):
    """Check roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADMA.

    Reads the connection through the ROADMA netconf mount point and verifies
    its number, wavelength, control mode and target output power, then its
    source and destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0,
    }
    # assertDictContainsSubset is deprecated since Python 3.2 and removed in
    # 3.12, so compare the expected items with the same keys of the reply.
    found = {key: connection[key] for key in expected if key in connection}
    self.assertEqual(expected, found)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-1'},
        connection['destination'])
def test_14_check_xc1_ROADMC(self):
    """Check roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADMC.

    Reads the connection through the ROADMC netconf mount point and verifies
    its number, wavelength, control mode and target output power, then its
    source and destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMC/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss',
        'target-output-power': 2.0,
    }
    # assertDictContainsSubset is deprecated since Python 3.2 and removed in
    # 3.12, so compare the expected items with the same keys of the reply.
    found = {key: connection[key] for key in expected if key in connection}
    self.assertEqual(expected, found)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-1'},
        connection['destination'])
def test_15_check_topo_XPDRA(self):
    """Check wavelength data on the XPDRA-XPDR1 termination points.

    XPDR1-NETWORK1 must carry wavelength 196.1/40; XPDR1-CLIENT1,
    XPDR1-CLIENT3 and XPDR1-NETWORK2 must carry no 'wavelength' entry.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDRA-XPDR1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    # NOTE(review): loop header restored - `ele` was otherwise never bound and
    # `liste_tp` never used; confirm against the upstream file.
    for ele in liste_tp:
        tp_id = ele['tp-id']
        if tp_id == 'XPDR1-NETWORK1':
            self.assertEqual({u'frequency': 196.1, u'width': 40},
                             ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
        if tp_id in ('XPDR1-CLIENT1', 'XPDR1-CLIENT3'):
            # Membership on the dict itself tests its keys.
            self.assertNotIn('wavelength', ele['org-openroadm-network-topology:xpdr-client-attributes'])
        if tp_id == 'XPDR1-NETWORK2':
            self.assertNotIn('wavelength', ele['org-openroadm-network-topology:xpdr-network-attributes'])
def test_16_check_topo_ROADMA_SRG1(self):
    """Check wavelength bookkeeping on node ROADMA-SRG1.

    Index 1 must no longer be listed in the SRG available wavelengths,
    SRG1-PP1-TXRX must list wavelength 1 as used, and SRG1-PP2-TXRX must
    have no 'used-wavelength' key.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-SRG1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertNotIn({u'index': 1},
                     res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    # NOTE(review): loop header restored - `ele` was otherwise never bound and
    # `liste_tp` never used; confirm against the upstream file.
    for ele in liste_tp:
        tp_id = ele['tp-id']
        if tp_id == 'SRG1-PP1-TXRX':
            self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
                          ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
        if tp_id == 'SRG1-PP2-TXRX':
            # Membership on the dict itself tests its keys.
            self.assertNotIn('used-wavelength', ele)
def test_17_check_topo_ROADMA_DEG1(self):
    """Check wavelength bookkeeping on node ROADMA-DEG1.

    Index 1 must no longer be listed in the degree available wavelengths,
    and both DEG1-CTP-TXRX and DEG1-TTP-TXRX must list wavelength 1 as used.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-DEG1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertNotIn({u'index': 1},
                     res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    # NOTE(review): loop header restored - `ele` was otherwise never bound and
    # `liste_tp` never used; confirm against the upstream file.
    for ele in liste_tp:
        tp_id = ele['tp-id']
        if tp_id == 'DEG1-CTP-TXRX':
            self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
                          ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
        if tp_id == 'DEG1-TTP-TXRX':
            self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
                          ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    """Create the xpdr-to-roadm link from XPDRA network 2 to ROADMA SRG1-PP2.

    Posts to the init-xpdr-rdm-links RPC and expects HTTP 200 together with
    the success message in the RPC output.
    """
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    # NOTE(review): the `data = {` opener and the three closing braces were
    # restored from the visible key nesting - confirm against the RPC model.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADMA",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    """Create the roadm-to-xpdr link from ROADMA SRG1-PP2 to XPDRA network 2.

    Posts to the init-rdm-xpdr-links RPC and expects HTTP 200 together with
    the success message in the RPC output.
    """
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    # NOTE(review): the `data = {` opener and the three closing braces were
    # restored from the visible key nesting - confirm against the RPC model.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADMA",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    """Create the xpdr-to-roadm link from XPDRC network 2 to ROADMC SRG1-PP2.

    Posts to the init-xpdr-rdm-links RPC and expects HTTP 200 together with
    the success message in the RPC output.
    """
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    # NOTE(review): the `data = {` opener and the three closing braces were
    # restored from the visible key nesting - confirm against the RPC model.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRC",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADMC",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    """Create the roadm-to-xpdr link from ROADMC SRG1-PP2 to XPDRC network 2.

    Posts to the init-rdm-xpdr-links RPC and expects HTTP 200 together with
    the success message in the RPC output.
    """
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    # NOTE(review): the `data = {` opener and the three closing braces were
    # restored from the visible key nesting - confirm against the RPC model.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRC",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADMC",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
# Posts a service-create request for "service2" (100 / Ethernet on both ends),
# expects HTTP 200 and a 'PCE calculation in progress' response message, then
# sleeps self.WAITING seconds.
# NOTE(review): the line binding `data` and most of the structural lines of the
# payload (nested container openers and closers, some entries) are not visible
# in this excerpt; the payload cannot be rebuilt from what is shown.
630 def test_22_create_eth_service2(self):
631 url = ("{}/operations/org-openroadm-service:service-create"
632 .format(self.restconf_baseurl))
634 "sdnc-request-header": {
635 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
636 "rpc-action": "service-create",
637 "request-system-id": "appname",
638 "notification-url": "http://localhost:8585/NotificationServer/notify"
640 "service-name": "service2",
641 "common-id": "ASATT1234567",
642 "connection-type": "service",
644 "service-rate": "100",
646 "service-format": "Ethernet",
647 "clli": "SNJSCAMCJP8",
650 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
651 "port-type": "router",
652 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
653 "port-rack": "000000.00",
657 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
658 "lgx-port-name": "LGX Back.3",
659 "lgx-port-rack": "000000.00",
660 "lgx-port-shelf": "00"
665 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
666 "port-type": "router",
667 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
668 "port-rack": "000000.00",
672 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
673 "lgx-port-name": "LGX Back.4",
674 "lgx-port-rack": "000000.00",
675 "lgx-port-shelf": "00"
681 "service-rate": "100",
683 "service-format": "Ethernet",
684 "clli": "SNJSCAMCJT4",
687 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
688 "port-type": "router",
689 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
690 "port-rack": "000000.00",
694 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
695 "lgx-port-name": "LGX Back.29",
696 "lgx-port-rack": "000000.00",
697 "lgx-port-shelf": "00"
702 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
703 "port-type": "router",
704 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
705 "port-rack": "000000.00",
709 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
710 "lgx-port-name": "LGX Back.30",
711 "lgx-port-rack": "000000.00",
712 "lgx-port-shelf": "00"
717 "due-date": "2016-11-28T00:00:01Z",
718 "operator-contact": "pw1234"
721 headers = {'content-type': 'application/json',
722 "Accept": "application/json"}
723 response = requests.request(
724 "POST", url, data=json.dumps(data), headers=headers,
725 auth=('admin', 'admin'))
726 self.assertEqual(response.status_code, requests.codes.ok)
727 res = response.json()
728 self.assertIn('PCE calculation in progress',
729 res['output']['configuration-response-common']['response-message'])
730 time.sleep(self.WAITING)
# Reads service2 from the operational service-list, expects HTTP 200, then
# compares the administrative-state, service-name ('service2'),
# connection-type ('service') and lifecycle-state ('planned') of the first
# returned service.
# NOTE(review): the assertion call openers and the expected
# administrative-state value are not visible in this excerpt.
732 def test_23_get_eth_service2(self):
733 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
734 .format(self.restconf_baseurl))
735 headers = {'content-type': 'application/json',
736 "Accept": "application/json"}
737 response = requests.request(
738 "GET", url, headers=headers, auth=('admin', 'admin'))
739 self.assertEqual(response.status_code, requests.codes.ok)
740 res = response.json()
742 res['services'][0]['administrative-state'],
745 res['services'][0]['service-name'], 'service2')
747 res['services'][0]['connection-type'], 'service')
749 res['services'][0]['lifecycle-state'], 'planned')
def test_24_check_xc2_ROADMA(self):
    """Check roadm-connection DEG1-TTP-TXRX-SRG1-PP2-TXRX-2 on ROADMA.

    Reads the connection through the ROADMA netconf mount point and verifies
    its number, wavelength and control mode, then its source and destination
    interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
        'wavelength-number': 2,
        'opticalControlMode': 'power',
    }
    # assertDictContainsSubset is deprecated since Python 3.2 and removed in
    # 3.12, so compare the expected items with the same keys of the reply.
    found = {key: connection[key] for key in expected if key in connection}
    self.assertEqual(expected, found)
    self.assertDictEqual(
        {'src-if': 'DEG1-TTP-TXRX-2'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'SRG1-PP2-TXRX-2'},
        connection['destination'])
def test_25_check_topo_XPDRA(self):
    """Check wavelength data on the XPDRA-XPDR1 termination points.

    XPDR1-NETWORK1 must carry wavelength 196.1/40 and XPDR1-NETWORK2
    wavelength 196.05/40; XPDR1-CLIENT1 and XPDR1-CLIENT3 must carry no
    'wavelength' entry.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDRA-XPDR1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    # NOTE(review): loop header restored - `ele` was otherwise never bound and
    # `liste_tp` never used; confirm against the upstream file.
    for ele in liste_tp:
        tp_id = ele['tp-id']
        if tp_id == 'XPDR1-NETWORK1':
            self.assertEqual({u'frequency': 196.1, u'width': 40},
                             ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
        if tp_id == 'XPDR1-NETWORK2':
            self.assertEqual({u'frequency': 196.05, u'width': 40},
                             ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
        if tp_id in ('XPDR1-CLIENT1', 'XPDR1-CLIENT3'):
            # Membership on the dict itself tests its keys.
            self.assertNotIn('wavelength', ele['org-openroadm-network-topology:xpdr-client-attributes'])
def test_26_check_topo_ROADMA_SRG1(self):
    """Check wavelength bookkeeping on node ROADMA-SRG1.

    Indexes 1 and 2 must no longer be listed in the SRG available
    wavelengths. SRG1-PP1-TXRX must use wavelength 1 only, SRG1-PP2-TXRX
    wavelength 2 only, and SRG1-PP3-TXRX must have no pp-attributes.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-SRG1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    available = res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths']
    self.assertNotIn({u'index': 1}, available)
    self.assertNotIn({u'index': 2}, available)
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    # NOTE(review): loop header restored - `ele` was otherwise never bound and
    # `liste_tp` never used; confirm against the upstream file.
    for ele in liste_tp:
        tp_id = ele['tp-id']
        if tp_id == 'SRG1-PP1-TXRX':
            self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
                          ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
            self.assertNotIn({u'index': 2, u'frequency': 196.05, u'width': 40},
                             ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
        if tp_id == 'SRG1-PP2-TXRX':
            self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
                          ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
            self.assertNotIn({u'index': 1, u'frequency': 196.1, u'width': 40},
                             ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
        if tp_id == 'SRG1-PP3-TXRX':
            # Membership on the dict itself tests its keys.
            self.assertNotIn('org-openroadm-network-topology:pp-attributes', ele)
def test_27_check_topo_ROADMA_DEG1(self):
    """Check wavelength bookkeeping on node ROADMA-DEG1.

    Indexes 1 and 2 must no longer be listed in the degree available
    wavelengths, and both DEG1-CTP-TXRX and DEG1-TTP-TXRX must list
    wavelengths 1 and 2 as used.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-DEG1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    available = res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths']
    self.assertNotIn({u'index': 1}, available)
    self.assertNotIn({u'index': 2}, available)
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    # NOTE(review): loop header restored - `ele` was otherwise never bound and
    # `liste_tp` never used; confirm against the upstream file.
    for ele in liste_tp:
        tp_id = ele['tp-id']
        if tp_id == 'DEG1-CTP-TXRX':
            self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
                          ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
            self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
                          ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
        if tp_id == 'DEG1-TTP-TXRX':
            self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
                          ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
            self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
                          ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
842 # creation service test on a non-available resource
# Posts a service-create request for "service3" (100 / Ethernet on both ends),
# expects HTTP 200, a 'PCE calculation in progress' response message and a
# response-code containing '200', then sleeps self.WAITING seconds.
# NOTE(review): the line binding `data` and most of the structural lines of the
# payload (nested container openers and closers, some entries) are not visible
# in this excerpt; the payload cannot be rebuilt from what is shown.
843 def test_28_create_eth_service3(self):
844 url = ("{}/operations/org-openroadm-service:service-create"
845 .format(self.restconf_baseurl))
847 "sdnc-request-header": {
848 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
849 "rpc-action": "service-create",
850 "request-system-id": "appname",
851 "notification-url": "http://localhost:8585/NotificationServer/notify"
853 "service-name": "service3",
854 "common-id": "ASATT1234567",
855 "connection-type": "service",
857 "service-rate": "100",
859 "service-format": "Ethernet",
860 "clli": "SNJSCAMCJP8",
863 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
864 "port-type": "router",
865 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
866 "port-rack": "000000.00",
870 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
871 "lgx-port-name": "LGX Back.3",
872 "lgx-port-rack": "000000.00",
873 "lgx-port-shelf": "00"
878 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
879 "port-type": "router",
880 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
881 "port-rack": "000000.00",
885 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
886 "lgx-port-name": "LGX Back.4",
887 "lgx-port-rack": "000000.00",
888 "lgx-port-shelf": "00"
894 "service-rate": "100",
896 "service-format": "Ethernet",
897 "clli": "SNJSCAMCJT4",
900 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
901 "port-type": "router",
902 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
903 "port-rack": "000000.00",
907 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
908 "lgx-port-name": "LGX Back.29",
909 "lgx-port-rack": "000000.00",
910 "lgx-port-shelf": "00"
915 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
916 "port-type": "router",
917 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
918 "port-rack": "000000.00",
922 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
923 "lgx-port-name": "LGX Back.30",
924 "lgx-port-rack": "000000.00",
925 "lgx-port-shelf": "00"
930 "due-date": "2016-11-28T00:00:01Z",
931 "operator-contact": "pw1234"
934 headers = {'content-type': 'application/json',
935 "Accept": "application/json"}
936 response = requests.request(
937 "POST", url, data=json.dumps(data), headers=headers,
938 auth=('admin', 'admin'))
939 self.assertEqual(response.status_code, requests.codes.ok)
940 res = response.json()
941 self.assertIn('PCE calculation in progress',
942 res['output']['configuration-response-common']['response-message'])
943 self.assertIn('200', res['output']['configuration-response-common']['response-code'])
944 time.sleep(self.WAITING)
946 # add a test that check the openroadm-service-list still only contains 2 elements
# Posts a service-delete request for "service3" and expects HTTP 200 with a
# response message saying the service does not exist in the datastore and a
# response-code containing '500'.
# NOTE(review): the line binding `data` and the closing braces of the payload
# are not visible in this excerpt.
948 def test_29_delete_eth_service3(self):
949 url = ("{}/operations/org-openroadm-service:service-delete"
950 .format(self.restconf_baseurl))
952 "sdnc-request-header": {
953 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
954 "rpc-action": "service-delete",
955 "request-system-id": "appname",
956 "notification-url": "http://localhost:8585/NotificationServer/notify"
958 "service-delete-req-info": {
959 "service-name": "service3",
960 "tail-retention": "no"
964 headers = {'content-type': 'application/json'}
965 response = requests.request(
966 "POST", url, data=json.dumps(data), headers=headers,
967 auth=('admin', 'admin'))
968 self.assertEqual(response.status_code, requests.codes.ok)
969 res = response.json()
970 self.assertIn('Service \'service3\' does not exist in datastore',
971 res['output']['configuration-response-common']['response-message'])
972 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
def test_30_delete_eth_service1(self):
    """POST a service-delete RPC for 'service1' and check it is accepted.

    Passes when the HTTP status is OK and the RPC response message contains
    'Renderer service delete in progress'.
    """
    # NOTE(review): the listing this was rebuilt from had lost its
    # indentation, the `data = {"input": {` opener and the closing braces.
    # The "input" wrapper is assumed (the reply is read from res['output'])
    # -- confirm against the original file.
    url = "{}/operations/org-openroadm-service:service-delete".format(
        self.restconf_baseurl)
    data = {
        "input": {
            "sdnc-request-header": {
                "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
                "rpc-action": "service-delete",
                "request-system-id": "appname",
                "notification-url": "http://localhost:8585/NotificationServer/notify"
            },
            "service-delete-req-info": {
                "service-name": "service1",
                "tail-retention": "no"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    common = response.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
def test_31_delete_eth_service2(self):
    """POST a service-delete RPC for 'service2' and check it is accepted.

    Passes when the HTTP status is OK and the RPC response message contains
    'Renderer service delete in progress'.
    """
    # NOTE(review): the listing this was rebuilt from had lost its
    # indentation, the `data = {"input": {` opener and the closing braces.
    # The "input" wrapper is assumed (the reply is read from res['output'])
    # -- confirm against the original file.
    url = "{}/operations/org-openroadm-service:service-delete".format(
        self.restconf_baseurl)
    data = {
        "input": {
            "sdnc-request-header": {
                "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
                "rpc-action": "service-delete",
                "request-system-id": "appname",
                "notification-url": "http://localhost:8585/NotificationServer/notify"
            },
            "service-delete-req-info": {
                "service-name": "service2",
                "tail-retention": "no"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    common = response.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
def test_32_check_no_xc_ROADMA(self):
    """Check that the mounted ROADMA device reports no 'roadm-connections'.

    Reads the org-openroadm-device container of node ROADMA through the
    NETCONF mount point and asserts the 'roadm-connections' key is absent.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           .format(self.restconf_baseurl))
    response = requests.request("GET", url, auth=('admin', 'admin'))
    # Check the status before decoding the body, so that an error reply is
    # reported as a status mismatch rather than as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    device = response.json()['org-openroadm-device']
    self.assertNotIn('roadm-connections', device)
def test_33_check_topo_XPDRA(self):
    """Check that XPDRA-XPDR1 termination points carry no 'wavelength'.

    Client termination points XPDR1-CLIENT1 and XPDR1-CLIENT3 must not have
    a 'wavelength' entry in their xpdr-client-attributes, and network
    termination points must not have one in their xpdr-network-attributes.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDRA-XPDR1"
            .format(self.restconf_baseurl))
    response = requests.request("GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        tp_type = ele[u'org-openroadm-network-topology:tp-type']
        if (tp_type == 'XPONDER-CLIENT'
                and ele['tp-id'] in ('XPDR1-CLIENT1', 'XPDR1-CLIENT3')):
            self.assertNotIn(
                'wavelength',
                ele['org-openroadm-network-topology:xpdr-client-attributes'])
        elif tp_type == 'XPONDER-NETWORK':
            # The previous version also required a CLIENT tp-id here, which a
            # network termination point never has, so this check never ran.
            # NOTE(review): it now applies to every network termination
            # point; a missing attributes container is treated as "no
            # wavelength" -- confirm this matches the intended check.
            self.assertNotIn(
                'wavelength',
                ele.get('org-openroadm-network-topology:xpdr-network-attributes', {}))
def test_34_check_topo_ROADMA_SRG1(self):
    """Check ROADMA-SRG1: wavelengths 1 and 2 available, no pp-attributes.

    Asserts that {'index': 1} and {'index': 2} are listed in the SRG
    'available-wavelengths' and that no termination point of the node has
    a 'pp-attributes' entry.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-SRG1"
            .format(self.restconf_baseurl))
    response = requests.request("GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    available = node[u'org-openroadm-network-topology:srg-attributes']['available-wavelengths']
    self.assertIn({u'index': 1}, available)
    self.assertIn({u'index': 2}, available)
    # NOTE(review): the earlier code tested `tp-id == 'SRG1-PP1-TXRX'` twice
    # in one `or` and then made the same assertion in both outcomes (read
    # here as an if/else whose `else:` line was lost from the listing), so
    # the check is applied to every termination point unconditionally.
    for ele in node['ietf-network-topology:termination-point']:
        self.assertNotIn('org-openroadm-network-topology:pp-attributes', ele)
def test_35_check_topo_ROADMA_DEG1(self):
    """Check ROADMA-DEG1: wavelengths 1 and 2 available, CTP/TTP unused.

    Asserts that {'index': 1} and {'index': 2} are listed in the degree
    'available-wavelengths', that DEG1-CTP-TXRX has no 'ctp-attributes' and
    that DEG1-TTP-TXRX has no 'tx-ttp-attributes'.
    """
    url1 = "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-DEG1".format(
        self.restconf_baseurl)
    response = requests.request("GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    for index in (1, 2):
        self.assertIn(
            {u'index': index},
            res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
    # (tp-id, attribute key that must be absent on that termination point)
    absent_on = (
        ('DEG1-CTP-TXRX', 'org-openroadm-network-topology:ctp-attributes'),
        ('DEG1-TTP-TXRX', 'org-openroadm-network-topology:tx-ttp-attributes'),
    )
    for tp in res['node'][0]['ietf-network-topology:termination-point']:
        for tp_id, attribute in absent_on:
            if tp['tp-id'] == tp_id:
                self.assertNotIn(attribute, dict.keys(tp))
1090 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
1091 def test_36_create_oc_service1(self):
# POSTs a service-create RPC for "service1" (connection-type "roadm-line",
# service-format "OC", node-ids ROADMA and ROADMC), checks the HTTP status is
# OK and that the reply message contains 'PCE calculation in progress', then
# sleeps for self.WAITING seconds.
# NOTE(review): this listing has lost its indentation and several short lines
# (the statement that opens the `data` payload, the keys that open the nested
# containers, and the closing braces), so the payload below is not a complete
# literal -- restore it from the original file before running.
1092 url = ("{}/operations/org-openroadm-service:service-create"
1093 .format(self.restconf_baseurl))
1095 "sdnc-request-header": {
1096 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1097 "rpc-action": "service-create",
1098 "request-system-id": "appname",
1099 "notification-url": "http://localhost:8585/NotificationServer/notify"
1101 "service-name": "service1",
1102 "common-id": "ASATT1234567",
1103 "connection-type": "roadm-line",
# First end-point description (node ROADMA); its opening key is missing here.
1105 "service-rate": "100",
1106 "node-id": "ROADMA",
1107 "service-format": "OC",
1108 "clli": "SNJSCAMCJP8",
1111 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1112 "port-type": "router",
1113 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1114 "port-rack": "000000.00",
1118 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1119 "lgx-port-name": "LGX Back.3",
1120 "lgx-port-rack": "000000.00",
1121 "lgx-port-shelf": "00"
1126 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1127 "port-type": "router",
1128 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1129 "port-rack": "000000.00",
1133 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1134 "lgx-port-name": "LGX Back.4",
1135 "lgx-port-rack": "000000.00",
1136 "lgx-port-shelf": "00"
1139 "optic-type": "gray"
# Second end-point description (node ROADMC); its opening key is missing here.
1142 "service-rate": "100",
1143 "node-id": "ROADMC",
1144 "service-format": "OC",
1145 "clli": "SNJSCAMCJT4",
1148 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1149 "port-type": "router",
1150 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1151 "port-rack": "000000.00",
1155 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1156 "lgx-port-name": "LGX Back.29",
1157 "lgx-port-rack": "000000.00",
1158 "lgx-port-shelf": "00"
1163 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1164 "port-type": "router",
1165 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1166 "port-rack": "000000.00",
1170 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1171 "lgx-port-name": "LGX Back.30",
1172 "lgx-port-rack": "000000.00",
1173 "lgx-port-shelf": "00"
1176 "optic-type": "gray"
1178 "due-date": "2016-11-28T00:00:01Z",
1179 "operator-contact": "pw1234"
1182 headers = {'content-type': 'application/json',
1183 "Accept": "application/json"}
1184 response = requests.request(
1185 "POST", url, data=json.dumps(data), headers=headers,
1186 auth=('admin', 'admin'))
1187 self.assertEqual(response.status_code, requests.codes.ok)
1188 res = response.json()
1189 self.assertIn('PCE calculation in progress',
1190 res['output']['configuration-response-common']['response-message'])
# Pause for the class-level WAITING delay before the next test runs.
1191 time.sleep(self.WAITING)
1193 def test_37_get_oc_service1(self):
# Reads "service1" from the operational service-list and checks its
# service-name ('service1'), connection-type ('roadm-line') and
# lifecycle-state ('planned'); administrative-state is checked as well.
# NOTE(review): the lines opening each assertion call and the expected value
# for 'administrative-state' are missing from this listing -- restore them
# from the original file.
1194 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1195 .format(self.restconf_baseurl))
1196 headers = {'content-type': 'application/json',
1197 "Accept": "application/json"}
1198 response = requests.request(
1199 "GET", url, headers=headers, auth=('admin', 'admin'))
1200 self.assertEqual(response.status_code, requests.codes.ok)
1201 res = response.json()
1203 res['services'][0]['administrative-state'],
1206 res['services'][0]['service-name'], 'service1')
1208 res['services'][0]['connection-type'], 'roadm-line')
1210 res['services'][0]['lifecycle-state'], 'planned')
def test_38_check_xc1_ROADMA(self):
    """Check cross-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADMA.

    Reads the roadm-connection through the NETCONF mount point and checks
    its number, wavelength, optical control mode, target output power and
    its source / destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    xc = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0,
    }
    # assertDictContainsSubset is deprecated and removed in Python 3.12;
    # compare the expected pairs with the same keys taken from the reply
    # (a missing key shows up as a difference between the two dicts).
    observed = {key: xc[key] for key in expected if key in xc}
    self.assertEqual(expected, observed)
    self.assertDictEqual({'src-if': 'SRG1-PP1-TXRX-1'}, xc['source'])
    self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-1'}, xc['destination'])
def test_39_check_xc1_ROADMC(self):
    """Check cross-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADMC.

    Reads the roadm-connection through the NETCONF mount point and checks
    its number, wavelength, optical control mode, target output power and
    its source / destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMC/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    xc = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss',
        'target-output-power': 2.0,
    }
    # assertDictContainsSubset is deprecated and removed in Python 3.12;
    # compare the expected pairs with the same keys taken from the reply
    # (a missing key shows up as a difference between the two dicts).
    observed = {key: xc[key] for key in expected if key in xc}
    self.assertEqual(expected, observed)
    self.assertDictEqual({'src-if': 'SRG1-PP1-TXRX-1'}, xc['source'])
    self.assertDictEqual({'dst-if': 'DEG2-TTP-TXRX-1'}, xc['destination'])
1261 def test_40_create_oc_service2(self):
# POSTs a service-create RPC for "service2" (connection-type "roadm-line",
# service-format "OC", node-ids ROADMA and ROADMC), checks the HTTP status is
# OK and that the reply message contains 'PCE calculation in progress', then
# sleeps for self.WAITING seconds.
# NOTE(review): this listing has lost its indentation and several short lines
# (the statement that opens the `data` payload, the keys that open the nested
# containers, and the closing braces), so the payload below is not a complete
# literal -- restore it from the original file before running.
1262 url = ("{}/operations/org-openroadm-service:service-create"
1263 .format(self.restconf_baseurl))
1265 "sdnc-request-header": {
1266 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1267 "rpc-action": "service-create",
1268 "request-system-id": "appname",
1269 "notification-url": "http://localhost:8585/NotificationServer/notify"
1271 "service-name": "service2",
1272 "common-id": "ASATT1234567",
1273 "connection-type": "roadm-line",
# First end-point description (node ROADMA); its opening key is missing here.
1275 "service-rate": "100",
1276 "node-id": "ROADMA",
1277 "service-format": "OC",
1278 "clli": "SNJSCAMCJP8",
1281 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1282 "port-type": "router",
1283 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1284 "port-rack": "000000.00",
1288 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1289 "lgx-port-name": "LGX Back.3",
1290 "lgx-port-rack": "000000.00",
1291 "lgx-port-shelf": "00"
1296 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1297 "port-type": "router",
1298 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1299 "port-rack": "000000.00",
1303 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1304 "lgx-port-name": "LGX Back.4",
1305 "lgx-port-rack": "000000.00",
1306 "lgx-port-shelf": "00"
1309 "optic-type": "gray"
# Second end-point description (node ROADMC); its opening key is missing here.
1312 "service-rate": "100",
1313 "node-id": "ROADMC",
1314 "service-format": "OC",
1315 "clli": "SNJSCAMCJT4",
1318 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1319 "port-type": "router",
1320 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1321 "port-rack": "000000.00",
1325 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1326 "lgx-port-name": "LGX Back.29",
1327 "lgx-port-rack": "000000.00",
1328 "lgx-port-shelf": "00"
1333 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1334 "port-type": "router",
1335 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1336 "port-rack": "000000.00",
1340 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1341 "lgx-port-name": "LGX Back.30",
1342 "lgx-port-rack": "000000.00",
1343 "lgx-port-shelf": "00"
1346 "optic-type": "gray"
1348 "due-date": "2016-11-28T00:00:01Z",
1349 "operator-contact": "pw1234"
1352 headers = {'content-type': 'application/json',
1353 "Accept": "application/json"}
1354 response = requests.request(
1355 "POST", url, data=json.dumps(data), headers=headers,
1356 auth=('admin', 'admin'))
1357 self.assertEqual(response.status_code, requests.codes.ok)
1358 res = response.json()
1359 self.assertIn('PCE calculation in progress',
1360 res['output']['configuration-response-common']['response-message'])
# Pause for the class-level WAITING delay before the next test runs.
1361 time.sleep(self.WAITING)
1363 def test_41_get_oc_service2(self):
# Reads "service2" from the operational service-list and checks its
# service-name ('service2'), connection-type ('roadm-line') and
# lifecycle-state ('planned'); administrative-state is checked as well.
# NOTE(review): the lines opening each assertion call and the expected value
# for 'administrative-state' are missing from this listing -- restore them
# from the original file.
1364 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
1365 .format(self.restconf_baseurl))
1366 headers = {'content-type': 'application/json',
1367 "Accept": "application/json"}
1368 response = requests.request(
1369 "GET", url, headers=headers, auth=('admin', 'admin'))
1370 self.assertEqual(response.status_code, requests.codes.ok)
1371 res = response.json()
1373 res['services'][0]['administrative-state'],
1376 res['services'][0]['service-name'], 'service2')
1378 res['services'][0]['connection-type'], 'roadm-line')
1380 res['services'][0]['lifecycle-state'], 'planned')
def test_42_check_xc2_ROADMA(self):
    """Check cross-connection SRG1-PP2-TXRX-DEG1-TTP-TXRX-2 on ROADMA.

    Reads the roadm-connection through the NETCONF mount point and checks
    its number, wavelength, optical control mode, target output power and
    its source / destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    xc = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
        'wavelength-number': 2,
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0,
    }
    # assertDictContainsSubset is deprecated and removed in Python 3.12;
    # compare the expected pairs with the same keys taken from the reply
    # (a missing key shows up as a difference between the two dicts).
    observed = {key: xc[key] for key in expected if key in xc}
    self.assertEqual(expected, observed)
    self.assertDictEqual({'src-if': 'SRG1-PP2-TXRX-2'}, xc['source'])
    self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-2'}, xc['destination'])
def test_43_check_topo_ROADMA(self):
    """Re-run the ROADMA SRG1 and DEG1 topology checks of tests 26 and 27."""
    # Both checks are sibling test methods of this class, run in this order.
    self.test_26_check_topo_ROADMA_SRG1()
    self.test_27_check_topo_ROADMA_DEG1()
def test_44_delete_oc_service1(self):
    """POST a service-delete RPC for 'service1' and check it is accepted.

    Passes when the HTTP status is OK and the RPC response message contains
    'Renderer service delete in progress'.
    """
    # NOTE(review): the listing this was rebuilt from had lost its
    # indentation, the `data = {"input": {` opener and the closing braces.
    # The "input" wrapper is assumed (the reply is read from res['output'])
    # -- confirm against the original file.
    url = "{}/operations/org-openroadm-service:service-delete".format(
        self.restconf_baseurl)
    data = {
        "input": {
            "sdnc-request-header": {
                "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
                "rpc-action": "service-delete",
                "request-system-id": "appname",
                "notification-url": "http://localhost:8585/NotificationServer/notify"
            },
            "service-delete-req-info": {
                "service-name": "service1",
                "tail-retention": "no"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    common = response.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
def test_45_delete_oc_service2(self):
    """POST a service-delete RPC for 'service2' and check it is accepted.

    Passes when the HTTP status is OK and the RPC response message contains
    'Renderer service delete in progress'.
    """
    # NOTE(review): the listing this was rebuilt from had lost its
    # indentation, the `data = {"input": {` opener and the closing braces.
    # The "input" wrapper is assumed (the reply is read from res['output'])
    # -- confirm against the original file.
    url = "{}/operations/org-openroadm-service:service-delete".format(
        self.restconf_baseurl)
    data = {
        "input": {
            "sdnc-request-header": {
                "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
                "rpc-action": "service-delete",
                "request-system-id": "appname",
                "notification-url": "http://localhost:8585/NotificationServer/notify"
            },
            "service-delete-req-info": {
                "service-name": "service2",
                "tail-retention": "no"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    common = response.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
def test_46_get_no_oc_services(self):
    """Check that the operational service-list no longer exists.

    The GET must answer 'not found' and the error list of the reply must
    contain the RESTCONF 'data-missing' application error.
    """
    print("start test")
    url = "{}/operational/org-openroadm-service:service-list".format(
        self.restconf_baseurl)
    headers = {'content-type': 'application/json',
               "Accept": "application/json"}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.not_found)
    res = response.json()
    # NOTE(review): the line opening this assertion was missing from the
    # listing this was rebuilt from; a membership check of the error dict in
    # res['errors']['error'] is assumed -- confirm against the original file.
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    """Check that the mounted ROADMA device reports no 'roadm-connections'.

    Reads the org-openroadm-device container of node ROADMA through the
    NETCONF mount point and asserts the 'roadm-connections' key is absent.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf"
           "/node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json',
               "Accept": "application/json"}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The key was previously spelled ['roadm-connections'][0], which is just
    # the string 'roadm-connections'; it is written directly here.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
def test_48_check_topo_ROADMA(self):
    """Re-run the ROADMA SRG1 and DEG1 topology checks of tests 34 and 35."""
    # Both checks are sibling test methods of this class, run in this order.
    self.test_34_check_topo_ROADMA_SRG1()
    self.test_35_check_topo_ROADMA_DEG1()
def test_49_loop_create_eth_service(self):
    """Create, verify and delete the Ethernet service five times in a row.

    Each trial chains sibling tests 11 (create), 13 and 14 (cross-connection
    checks on ROADMA and ROADMC) and 30 (delete), printing progress markers.
    """
    for trial in range(1, 6):
        print("trial number {}".format(trial))
        print("eth service creation")
        self.test_11_create_eth_service1()
        print("check xc in ROADMA")
        self.test_13_check_xc1_ROADMA()
        print("check xc in ROADMC")
        self.test_14_check_xc1_ROADMC()
        print("eth service deletion\n")
        self.test_30_delete_eth_service1()
def test_50_loop_create_oc_service(self):
    """Create, verify and delete the OC service five times in a row.

    If 'service1' is still present in the operational service-list, a
    service-delete RPC is sent first (its reply is not checked). Each trial
    then chains sibling tests 36 (create), 38 and 39 (cross-connection
    checks on ROADMA and ROADMC) and 44 (delete), printing progress markers.
    """
    url = ("{}/operational/org-openroadm-service:service-list/services/service1"
           .format(self.restconf_baseurl))
    response = requests.request("GET", url, auth=('admin', 'admin'))
    if response.status_code != requests.codes.not_found:
        # NOTE(review): the listing this was rebuilt from had lost the
        # `data = {"input": {` opener and the closing braces; the "input"
        # wrapper is assumed. No pause is visible after this clean-up delete
        # either -- confirm whether one is needed before the first create.
        url = ("{}/operations/org-openroadm-service:service-delete"
               .format(self.restconf_baseurl))
        data = {
            "input": {
                "sdnc-request-header": {
                    "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
                    "rpc-action": "service-delete",
                    "request-system-id": "appname",
                    "notification-url": "http://localhost:8585/NotificationServer/notify"
                },
                "service-delete-req-info": {
                    "service-name": "service1",
                    "tail-retention": "no"
                }
            }
        }
        headers = {'content-type': 'application/json'}
        requests.request("POST", url, data=json.dumps(data),
                         headers=headers, auth=('admin', 'admin'))

    for trial in range(1, 6):
        print("trial number {}".format(trial))
        print("oc service creation")
        self.test_36_create_oc_service1()
        print("check xc in ROADMA")
        self.test_38_check_xc1_ROADMA()
        print("check xc in ROADMC")
        self.test_39_check_xc1_ROADMC()
        print("oc service deletion\n")
        self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    """DELETE node XPDRA from topology-netconf and expect an OK status."""
    node_url = ("{}/config/network-topology:network-topology/topology/"
                "topology-netconf/node/XPDRA").format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_52_disconnect_XPDRC(self):
    """DELETE node XPDRC from topology-netconf and expect an OK status."""
    node_url = ("{}/config/network-topology:network-topology/topology/"
                "topology-netconf/node/XPDRC").format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_53_disconnect_ROADMA(self):
    """DELETE node ROADMA from topology-netconf and expect an OK status."""
    node_url = ("{}/config/network-topology:network-topology/topology/"
                "topology-netconf/node/ROADMA").format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_54_disconnect_ROADMC(self):
    """DELETE node ROADMC from topology-netconf and expect an OK status."""
    node_url = ("{}/config/network-topology:network-topology/topology/"
                "topology-netconf/node/ROADMC").format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# Run the whole suite with per-test output when the file is executed directly.
if __name__ == "__main__":
    unittest.main(verbosity=2)