2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
23 class TransportPCEFulltesting(unittest.TestCase):
26 honeynode_process1 = None
27 honeynode_process2 = None
28 honeynode_process3 = None
29 honeynode_process4 = None
30 restconf_baseurl = "http://localhost:8181/restconf"
32 #START_IGNORE_XTESTING
35 def __start_honeynode1(cls):
36 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
37 "/honeynode-distribution-1.18.01/honeycomb-tpce")
38 if os.path.isfile(executable):
39 with open('honeynode1.log', 'w') as outfile:
40 cls.honeynode_process1 = subprocess.Popen(
41 [executable, "17840", "sample_configs/openroadm/2.1/oper-ROADMA-full.xml"],
45 def __start_honeynode2(cls):
46 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
47 "/honeynode-distribution-1.18.01/honeycomb-tpce")
48 if os.path.isfile(executable):
49 with open('honeynode2.log', 'w') as outfile:
50 cls.honeynode_process2 = subprocess.Popen(
51 [executable, "17831", "sample_configs/openroadm/2.1/oper-XPDRA.xml"],
55 def __start_honeynode3(cls):
56 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
57 "/honeynode-distribution-1.18.01/honeycomb-tpce")
58 if os.path.isfile(executable):
59 with open('honeynode3.log', 'w') as outfile:
60 cls.honeynode_process3 = subprocess.Popen(
61 [executable, "17843", "sample_configs/openroadm/2.1/oper-ROADMC-full.xml"],
65 def __start_honeynode4(cls):
66 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
67 "/honeynode-distribution-1.18.01/honeycomb-tpce")
68 if os.path.isfile(executable):
69 with open('honeynode4.log', 'w') as outfile:
70 cls.honeynode_process4 = subprocess.Popen(
71 [executable, "17834", "sample_configs/openroadm/2.1/oper-XPDRC.xml"],
76 executable = "../karaf/target/assembly/bin/karaf"
77 with open('odl.log', 'w') as outfile:
78 cls.odl_process = subprocess.Popen(
79 ["bash", executable, "server"], stdout=outfile,
80 stdin=open(os.devnull))
84 print ("starting honeynode1")
85 cls.__start_honeynode1()
87 print ("starting honeynode2")
88 cls.__start_honeynode2()
90 print ("starting honeynode3")
91 cls.__start_honeynode3()
93 print ("starting honeynode4")
94 cls.__start_honeynode4()
96 print ("starting opendaylight")
101 def tearDownClass(cls):
102 for child in psutil.Process(cls.odl_process.pid).children():
103 child.send_signal(signal.SIGINT)
105 cls.odl_process.send_signal(signal.SIGINT)
106 cls.odl_process.wait()
107 for child in psutil.Process(cls.honeynode_process1.pid).children():
108 child.send_signal(signal.SIGINT)
110 cls.honeynode_process1.send_signal(signal.SIGINT)
111 cls.honeynode_process1.wait()
112 for child in psutil.Process(cls.honeynode_process2.pid).children():
113 child.send_signal(signal.SIGINT)
115 cls.honeynode_process2.send_signal(signal.SIGINT)
116 cls.honeynode_process2.wait()
117 for child in psutil.Process(cls.honeynode_process3.pid).children():
118 child.send_signal(signal.SIGINT)
120 cls.honeynode_process3.send_signal(signal.SIGINT)
121 cls.honeynode_process3.wait()
122 for child in psutil.Process(cls.honeynode_process4.pid).children():
123 child.send_signal(signal.SIGINT)
125 cls.honeynode_process4.send_signal(signal.SIGINT)
126 cls.honeynode_process4.wait()
127 print ("all processes killed")
129 def setUp(self): # instruction executed before each test method
130 print ("execution of {}".format(self.id().split(".")[-1]))
134 # connect netconf devices
135 def test_01_connect_xpdrA(self):
136 url = ("{}/config/network-topology:"
137 "network-topology/topology/topology-netconf/node/XPDRA"
138 .format(self.restconf_baseurl))
141 "netconf-node-topology:username": "admin",
142 "netconf-node-topology:password": "admin",
143 "netconf-node-topology:host": "127.0.0.1",
144 "netconf-node-topology:port": "17831",
145 "netconf-node-topology:tcp-only": "false",
146 "netconf-node-topology:pass-through": {}}]}
147 headers = {'content-type': 'application/json'}
148 response = requests.request(
149 "PUT", url, data=json.dumps(data), headers=headers,
150 auth=('admin', 'admin'))
151 self.assertEqual(response.status_code, requests.codes.created)
154 def test_02_connect_xpdrC(self):
155 url = ("{}/config/network-topology:"
156 "network-topology/topology/topology-netconf/node/XPDRC"
157 .format(self.restconf_baseurl))
160 "netconf-node-topology:username": "admin",
161 "netconf-node-topology:password": "admin",
162 "netconf-node-topology:host": "127.0.0.1",
163 "netconf-node-topology:port": "17834",
164 "netconf-node-topology:tcp-only": "false",
165 "netconf-node-topology:pass-through": {}}]}
166 headers = {'content-type': 'application/json'}
167 response = requests.request(
168 "PUT", url, data=json.dumps(data), headers=headers,
169 auth=('admin', 'admin'))
170 self.assertEqual(response.status_code, requests.codes.created)
173 def test_03_connect_rdmA(self):
174 url = ("{}/config/network-topology:"
175 "network-topology/topology/topology-netconf/node/ROADMA"
176 .format(self.restconf_baseurl))
179 "netconf-node-topology:username": "admin",
180 "netconf-node-topology:password": "admin",
181 "netconf-node-topology:host": "127.0.0.1",
182 "netconf-node-topology:port": "17840",
183 "netconf-node-topology:tcp-only": "false",
184 "netconf-node-topology:pass-through": {}}]}
185 headers = {'content-type': 'application/json'}
186 response = requests.request(
187 "PUT", url, data=json.dumps(data), headers=headers,
188 auth=('admin', 'admin'))
189 self.assertEqual(response.status_code, requests.codes.created)
192 def test_04_connect_rdmC(self):
193 url = ("{}/config/network-topology:"
194 "network-topology/topology/topology-netconf/node/ROADMC"
195 .format(self.restconf_baseurl))
198 "netconf-node-topology:username": "admin",
199 "netconf-node-topology:password": "admin",
200 "netconf-node-topology:host": "127.0.0.1",
201 "netconf-node-topology:port": "17843",
202 "netconf-node-topology:tcp-only": "false",
203 "netconf-node-topology:pass-through": {}}]}
204 headers = {'content-type': 'application/json'}
205 response = requests.request(
206 "PUT", url, data=json.dumps(data), headers=headers,
207 auth=('admin', 'admin'))
208 self.assertEqual(response.status_code, requests.codes.created)
211 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
212 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
214 "networkutils:input": {
215 "networkutils:links-input": {
216 "networkutils:xpdr-node": "XPDRA",
217 "networkutils:xpdr-num": "1",
218 "networkutils:network-num": "1",
219 "networkutils:rdm-node": "ROADMA",
220 "networkutils:srg-num": "1",
221 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
225 headers = {'content-type': 'application/json'}
226 response = requests.request(
227 "POST", url, data=json.dumps(data),
228 headers=headers, auth=('admin', 'admin'))
229 self.assertEqual(response.status_code, requests.codes.ok)
230 res = response.json()
231 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
234 def test_06_connect_xprdA_N2_to_roadmA_PP2(self):
235 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
237 "networkutils:input": {
238 "networkutils:links-input": {
239 "networkutils:xpdr-node": "XPDRA",
240 "networkutils:xpdr-num": "1",
241 "networkutils:network-num": "2",
242 "networkutils:rdm-node": "ROADMA",
243 "networkutils:srg-num": "1",
244 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
248 headers = {'content-type': 'application/json'}
249 response = requests.request(
250 "POST", url, data=json.dumps(data),
251 headers=headers, auth=('admin', 'admin'))
252 self.assertEqual(response.status_code, requests.codes.ok)
253 res = response.json()
254 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
257 def test_07_connect_roadmA_PP1_to_xpdrA_N1(self):
258 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
260 "networkutils:input": {
261 "networkutils:links-input": {
262 "networkutils:xpdr-node": "XPDRA",
263 "networkutils:xpdr-num": "1",
264 "networkutils:network-num": "1",
265 "networkutils:rdm-node": "ROADMA",
266 "networkutils:srg-num": "1",
267 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
271 headers = {'content-type': 'application/json'}
272 response = requests.request(
273 "POST", url, data=json.dumps(data),
274 headers=headers, auth=('admin', 'admin'))
275 self.assertEqual(response.status_code, requests.codes.ok)
276 res = response.json()
277 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
280 def test_08_connect_roadmA_PP2_to_xpdrA_N2(self):
281 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
283 "networkutils:input": {
284 "networkutils:links-input": {
285 "networkutils:xpdr-node": "XPDRA",
286 "networkutils:xpdr-num": "1",
287 "networkutils:network-num": "2",
288 "networkutils:rdm-node": "ROADMA",
289 "networkutils:srg-num": "1",
290 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
294 headers = {'content-type': 'application/json'}
295 response = requests.request(
296 "POST", url, data=json.dumps(data),
297 headers=headers, auth=('admin', 'admin'))
298 self.assertEqual(response.status_code, requests.codes.ok)
299 res = response.json()
300 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
303 def test_09_connect_xprdC_N1_to_roadmC_PP1(self):
304 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
306 "networkutils:input": {
307 "networkutils:links-input": {
308 "networkutils:xpdr-node": "XPDRC",
309 "networkutils:xpdr-num": "1",
310 "networkutils:network-num": "1",
311 "networkutils:rdm-node": "ROADMC",
312 "networkutils:srg-num": "1",
313 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
317 headers = {'content-type': 'application/json'}
318 response = requests.request(
319 "POST", url, data=json.dumps(data),
320 headers=headers, auth=('admin', 'admin'))
321 self.assertEqual(response.status_code, requests.codes.ok)
322 res = response.json()
323 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
326 def test_10_connect_xprdC_N2_to_roadmC_PP2(self):
327 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
329 "networkutils:input": {
330 "networkutils:links-input": {
331 "networkutils:xpdr-node": "XPDRC",
332 "networkutils:xpdr-num": "1",
333 "networkutils:network-num": "2",
334 "networkutils:rdm-node": "ROADMC",
335 "networkutils:srg-num": "1",
336 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
340 headers = {'content-type': 'application/json'}
341 response = requests.request(
342 "POST", url, data=json.dumps(data),
343 headers=headers, auth=('admin', 'admin'))
344 self.assertEqual(response.status_code, requests.codes.ok)
345 res = response.json()
346 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
349 def test_11_connect_roadmC_PP1_to_xpdrC_N1(self):
350 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
352 "networkutils:input": {
353 "networkutils:links-input": {
354 "networkutils:xpdr-node": "XPDRC",
355 "networkutils:xpdr-num": "1",
356 "networkutils:network-num": "1",
357 "networkutils:rdm-node": "ROADMC",
358 "networkutils:srg-num": "1",
359 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
363 headers = {'content-type': 'application/json'}
364 response = requests.request(
365 "POST", url, data=json.dumps(data),
366 headers=headers, auth=('admin', 'admin'))
367 self.assertEqual(response.status_code, requests.codes.ok)
368 res = response.json()
369 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
372 def test_12_connect_roadmC_PP2_to_xpdrC_N2(self):
373 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
375 "networkutils:input": {
376 "networkutils:links-input": {
377 "networkutils:xpdr-node": "XPDRC",
378 "networkutils:xpdr-num": "1",
379 "networkutils:network-num": "2",
380 "networkutils:rdm-node": "ROADMC",
381 "networkutils:srg-num": "1",
382 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
386 headers = {'content-type': 'application/json'}
387 response = requests.request(
388 "POST", url, data=json.dumps(data),
389 headers=headers, auth=('admin', 'admin'))
390 self.assertEqual(response.status_code, requests.codes.ok)
391 res = response.json()
392 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
395 #test service-create for Eth service from xpdr to xpdr
396 def test_13_create_eth_service1(self):
397 url = ("{}/operations/org-openroadm-service:service-create"
398 .format(self.restconf_baseurl))
400 "sdnc-request-header": {
401 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
402 "rpc-action": "service-create",
403 "request-system-id": "appname",
404 "notification-url": "http://localhost:8585/NotificationServer/notify"
406 "service-name": "service1",
407 "common-id": "ASATT1234567",
408 "connection-type": "service",
410 "service-rate": "100",
412 "service-format": "Ethernet",
413 "clli": "SNJSCAMCJP8",
416 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
417 "port-type": "router",
418 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
419 "port-rack": "000000.00",
423 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
424 "lgx-port-name": "LGX Back.3",
425 "lgx-port-rack": "000000.00",
426 "lgx-port-shelf": "00"
431 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
432 "port-type": "router",
433 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
434 "port-rack": "000000.00",
438 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
439 "lgx-port-name": "LGX Back.4",
440 "lgx-port-rack": "000000.00",
441 "lgx-port-shelf": "00"
447 "service-rate": "100",
449 "service-format": "Ethernet",
450 "clli": "SNJSCAMCJT4",
453 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
454 "port-type": "router",
455 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
456 "port-rack": "000000.00",
460 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
461 "lgx-port-name": "LGX Back.29",
462 "lgx-port-rack": "000000.00",
463 "lgx-port-shelf": "00"
468 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
469 "port-type": "router",
470 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
471 "port-rack": "000000.00",
475 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
476 "lgx-port-name": "LGX Back.30",
477 "lgx-port-rack": "000000.00",
478 "lgx-port-shelf": "00"
483 "due-date": "2016-11-28T00:00:01Z",
484 "operator-contact": "pw1234"
487 headers = {'content-type': 'application/json',
488 "Accept": "application/json"}
489 response = requests.request(
490 "POST", url, data=json.dumps(data), headers=headers,
491 auth=('admin', 'admin'))
492 self.assertEqual(response.status_code, requests.codes.ok)
493 res = response.json()
494 self.assertIn('PCE calculation in progress',
495 res['output']['configuration-response-common']['response-message'])
498 def test_14_get_eth_service1(self):
499 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
500 .format(self.restconf_baseurl))
501 headers = {'content-type': 'application/json',
502 "Accept": "application/json"}
503 response = requests.request(
504 "GET", url, headers=headers, auth=('admin', 'admin'))
505 self.assertEqual(response.status_code, requests.codes.ok)
506 res = response.json()
508 res['services'][0]['administrative-state'],
511 res['services'][0]['service-name'], 'service1')
513 res['services'][0]['connection-type'], 'service')
515 res['services'][0]['lifecycle-state'], 'planned')
518 def test_15_check_xc1_ROADMA(self):
519 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
520 "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
521 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
522 .format(self.restconf_baseurl))
523 headers = {'content-type': 'application/json'}
524 response = requests.request(
525 "GET", url, headers=headers, auth=('admin', 'admin'))
526 self.assertEqual(response.status_code, requests.codes.ok)
527 res = response.json()
528 self.assertDictContainsSubset(
529 {'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
530 'wavelength-number': 1,
531 'opticalControlMode': 'gainLoss',
532 'target-output-power': -3.0},
533 res['roadm-connections'][0])
534 self.assertDictEqual(
535 {'src-if': 'SRG1-PP1-TXRX-1'},
536 res['roadm-connections'][0]['source'])
537 self.assertDictEqual(
538 {'dst-if': 'DEG1-TTP-TXRX-1'},
539 res['roadm-connections'][0]['destination'])
542 def test_16_check_xc1_ROADMC(self):
543 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
544 "node/ROADMC/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
545 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
546 .format(self.restconf_baseurl))
547 headers = {'content-type': 'application/json'}
548 response = requests.request(
549 "GET", url, headers=headers, auth=('admin', 'admin'))
550 self.assertEqual(response.status_code, requests.codes.ok)
551 res = response.json()
552 self.assertDictContainsSubset(
553 {'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
554 'wavelength-number': 1,
555 'opticalControlMode': 'gainLoss',
556 'target-output-power': 2.0},
557 res['roadm-connections'][0])
558 self.assertDictEqual(
559 {'src-if': 'SRG1-PP1-TXRX-1'},
560 res['roadm-connections'][0]['source'])
561 self.assertDictEqual(
562 {'dst-if': 'DEG2-TTP-TXRX-1'},
563 res['roadm-connections'][0]['destination'])
566 def test_17_check_topo_XPDRA(self):
567 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDRA-XPDR1"
568 .format(self.restconf_baseurl))
569 response = requests.request(
570 "GET", url1, auth=('admin', 'admin'))
571 self.assertEqual(response.status_code, requests.codes.ok)
572 res = response.json()
573 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
575 if ele['tp-id'] == 'XPDR1-CLIENT1':
576 self.assertEqual({u'index': 1}, ele['org-openroadm-network-topology:xpdr-client-attributes']['wavelength'])
577 if ele['tp-id'] == 'XPDR1-NETWORK1':
578 self.assertEqual({u'index': 1}, ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
579 if ele['tp-id'] == 'XPDR1-CLIENT2':
580 self.assertNotIn('wavelength', dict.keys(ele['org-openroadm-network-topology:xpdr-client-attributes']))
581 if ele['tp-id'] == 'XPDR1-NETWORK2':
582 self.assertNotIn('wavelength', dict.keys(ele['org-openroadm-network-topology:xpdr-network-attributes']))
585 def test_18_check_topo_ROADMA_SRG1(self):
586 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-SRG1"
587 .format(self.restconf_baseurl))
588 response = requests.request(
589 "GET", url1, auth=('admin', 'admin'))
590 self.assertEqual(response.status_code, requests.codes.ok)
591 res = response.json()
592 self.assertNotIn({u'index': 1}, res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
593 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
595 if ele['tp-id'] == 'SRG1-PP1-TXRX':
596 self.assertIn({u'index': 1}, ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
597 if ele['tp-id'] == 'SRG1-PP2-TXRX':
598 self.assertNotIn('used-wavelength', dict.keys(ele))
601 def test_19_check_topo_ROADMA_DEG1(self):
602 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-DEG1"
603 .format(self.restconf_baseurl))
604 response = requests.request(
605 "GET", url1, auth=('admin', 'admin'))
606 self.assertEqual(response.status_code, requests.codes.ok)
607 res = response.json()
608 self.assertNotIn({u'index': 1}, res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
609 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
611 if ele['tp-id'] == 'DEG1-CTP-TXRX':
612 self.assertIn({u'index': 1}, ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
613 if ele['tp-id'] == 'DEG1-TTP-TXRX':
614 self.assertIn({u'index': 1}, ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
617 def test_20_create_eth_service2(self):
618 url = ("{}/operations/org-openroadm-service:service-create"
619 .format(self.restconf_baseurl))
621 "sdnc-request-header": {
622 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
623 "rpc-action": "service-create",
624 "request-system-id": "appname",
625 "notification-url": "http://localhost:8585/NotificationServer/notify"
627 "service-name": "service2",
628 "common-id": "ASATT1234567",
629 "connection-type": "service",
631 "service-rate": "100",
633 "service-format": "Ethernet",
634 "clli": "SNJSCAMCJP8",
637 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
638 "port-type": "router",
639 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
640 "port-rack": "000000.00",
644 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
645 "lgx-port-name": "LGX Back.3",
646 "lgx-port-rack": "000000.00",
647 "lgx-port-shelf": "00"
652 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
653 "port-type": "router",
654 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
655 "port-rack": "000000.00",
659 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
660 "lgx-port-name": "LGX Back.4",
661 "lgx-port-rack": "000000.00",
662 "lgx-port-shelf": "00"
668 "service-rate": "100",
670 "service-format": "Ethernet",
671 "clli": "SNJSCAMCJT4",
674 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
675 "port-type": "router",
676 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
677 "port-rack": "000000.00",
681 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
682 "lgx-port-name": "LGX Back.29",
683 "lgx-port-rack": "000000.00",
684 "lgx-port-shelf": "00"
689 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
690 "port-type": "router",
691 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
692 "port-rack": "000000.00",
696 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
697 "lgx-port-name": "LGX Back.30",
698 "lgx-port-rack": "000000.00",
699 "lgx-port-shelf": "00"
704 "due-date": "2016-11-28T00:00:01Z",
705 "operator-contact": "pw1234"
708 headers = {'content-type': 'application/json',
709 "Accept": "application/json"}
710 response = requests.request(
711 "POST", url, data=json.dumps(data), headers=headers,
712 auth=('admin', 'admin'))
713 self.assertEqual(response.status_code, requests.codes.ok)
714 res = response.json()
715 self.assertIn('PCE calculation in progress',
716 res['output']['configuration-response-common']['response-message'])
719 def test_21_get_eth_service2(self):
720 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
721 .format(self.restconf_baseurl))
722 headers = {'content-type': 'application/json',
723 "Accept": "application/json"}
724 response = requests.request(
725 "GET", url, headers=headers, auth=('admin', 'admin'))
726 self.assertEqual(response.status_code, requests.codes.ok)
727 res = response.json()
729 res['services'][0]['administrative-state'],
732 res['services'][0]['service-name'], 'service2')
734 res['services'][0]['connection-type'], 'service')
736 res['services'][0]['lifecycle-state'], 'planned')
739 def test_22_check_xc2_ROADMA(self):
740 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
741 "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
742 "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2"
743 .format(self.restconf_baseurl))
744 headers = {'content-type': 'application/json'}
745 response = requests.request(
746 "GET", url, headers=headers, auth=('admin', 'admin'))
747 self.assertEqual(response.status_code, requests.codes.ok)
748 res = response.json()
749 self.assertDictContainsSubset(
750 {'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
751 'wavelength-number': 2,
752 'opticalControlMode': 'power'},
753 res['roadm-connections'][0])
754 self.assertDictEqual(
755 {'src-if': 'DEG1-TTP-TXRX-2'},
756 res['roadm-connections'][0]['source'])
757 self.assertDictEqual(
758 {'dst-if': 'SRG1-PP2-TXRX-2'},
759 res['roadm-connections'][0]['destination'])
761 def test_23_check_topo_XPDRA(self):
762 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDRA-XPDR1"
763 .format(self.restconf_baseurl))
764 response = requests.request(
765 "GET", url1, auth=('admin', 'admin'))
766 self.assertEqual(response.status_code, requests.codes.ok)
767 res = response.json()
768 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
770 if ele['tp-id'] == 'XPDR1-CLIENT1':
771 self.assertEqual({u'index': 1}, ele['org-openroadm-network-topology:xpdr-client-attributes']['wavelength'])
772 if ele['tp-id'] == 'XPDR1-NETWORK1':
773 self.assertEqual({u'index': 1}, ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
774 if ele['tp-id'] == 'XPDR1-CLIENT2':
775 self.assertEqual({u'index': 2}, ele['org-openroadm-network-topology:xpdr-client-attributes']['wavelength'])
776 if ele['tp-id'] == 'XPDR1-NETWORK2':
777 self.assertEqual({u'index': 2}, ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
780 def test_24_check_topo_ROADMA_SRG1(self):
781 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-SRG1"
782 .format(self.restconf_baseurl))
783 response = requests.request(
784 "GET", url1, auth=('admin', 'admin'))
785 self.assertEqual(response.status_code, requests.codes.ok)
786 res = response.json()
787 self.assertNotIn({u'index': 1}, res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
788 self.assertNotIn({u'index': 2}, res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
789 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
791 if ele['tp-id'] == 'SRG1-PP1-TXRX':
792 self.assertIn({u'index': 1}, ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
793 self.assertNotIn({u'index': 2}, ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
794 if ele['tp-id'] == 'SRG1-PP2-TXRX':
795 self.assertIn({u'index': 2}, ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
796 self.assertNotIn({u'index': 1}, ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
797 if ele['tp-id'] == 'SRG1-PP3-TXRX':
798 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
802 def test_25_check_topo_ROADMA_DEG1(self):
803 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-DEG1"
804 .format(self.restconf_baseurl))
805 response = requests.request(
806 "GET", url1, auth=('admin', 'admin'))
807 self.assertEqual(response.status_code, requests.codes.ok)
808 res = response.json()
809 self.assertNotIn({u'index': 1}, res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
810 self.assertNotIn({u'index': 2}, res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
811 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
813 if ele['tp-id'] == 'DEG1-CTP-TXRX':
814 self.assertIn({u'index': 1}, ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
815 self.assertIn({u'index': 2}, ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
816 if ele['tp-id'] == 'DEG1-TTP-TXRX':
817 self.assertIn({u'index': 1}, ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
818 self.assertIn({u'index': 2}, ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
821 # creation service test on a non-available resource
822 def test_26_create_eth_service3(self):
823 url = ("{}/operations/org-openroadm-service:service-create"
824 .format(self.restconf_baseurl))
826 "sdnc-request-header": {
827 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
828 "rpc-action": "service-create",
829 "request-system-id": "appname",
830 "notification-url": "http://localhost:8585/NotificationServer/notify"
832 "service-name": "service3",
833 "common-id": "ASATT1234567",
834 "connection-type": "service",
836 "service-rate": "100",
838 "service-format": "Ethernet",
839 "clli": "SNJSCAMCJP8",
842 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
843 "port-type": "router",
844 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
845 "port-rack": "000000.00",
849 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
850 "lgx-port-name": "LGX Back.3",
851 "lgx-port-rack": "000000.00",
852 "lgx-port-shelf": "00"
857 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
858 "port-type": "router",
859 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
860 "port-rack": "000000.00",
864 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
865 "lgx-port-name": "LGX Back.4",
866 "lgx-port-rack": "000000.00",
867 "lgx-port-shelf": "00"
873 "service-rate": "100",
875 "service-format": "Ethernet",
876 "clli": "SNJSCAMCJT4",
879 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
880 "port-type": "router",
881 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
882 "port-rack": "000000.00",
886 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
887 "lgx-port-name": "LGX Back.29",
888 "lgx-port-rack": "000000.00",
889 "lgx-port-shelf": "00"
894 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
895 "port-type": "router",
896 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
897 "port-rack": "000000.00",
901 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
902 "lgx-port-name": "LGX Back.30",
903 "lgx-port-rack": "000000.00",
904 "lgx-port-shelf": "00"
909 "due-date": "2016-11-28T00:00:01Z",
910 "operator-contact": "pw1234"
913 headers = {'content-type': 'application/json',
914 "Accept": "application/json"}
915 response = requests.request(
916 "POST", url, data=json.dumps(data), headers=headers,
917 auth=('admin', 'admin'))
918 self.assertEqual(response.status_code, requests.codes.ok)
919 res = response.json()
920 self.assertIn('No path available',
921 res['output']['configuration-response-common']['response-message'])
922 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
925 # add a test that check the openroadm-service-list still only contains 2 elements
927 def test_27_delete_eth_service3(self):
928 url = ("{}/operations/org-openroadm-service:service-delete"
929 .format(self.restconf_baseurl))
931 "sdnc-request-header": {
932 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
933 "rpc-action": "service-delete",
934 "request-system-id": "appname",
935 "notification-url": "http://localhost:8585/NotificationServer/notify"
937 "service-delete-req-info": {
938 "service-name": "service3",
939 "tail-retention": "no"
943 headers = {'content-type': 'application/json'}
944 response = requests.request(
945 "POST", url, data=json.dumps(data), headers=headers,
946 auth=('admin', 'admin'))
947 self.assertEqual(response.status_code, requests.codes.ok)
948 res = response.json()
949 self.assertIn('Service \'service3\' does not exist in datastore',
950 res['output']['configuration-response-common']['response-message'])
951 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
954 def test_28_delete_eth_service1(self):
955 url = ("{}/operations/org-openroadm-service:service-delete"
956 .format(self.restconf_baseurl))
958 "sdnc-request-header": {
959 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
960 "rpc-action": "service-delete",
961 "request-system-id": "appname",
962 "notification-url": "http://localhost:8585/NotificationServer/notify"
964 "service-delete-req-info": {
965 "service-name": "service1",
966 "tail-retention": "no"
970 headers = {'content-type': 'application/json'}
971 response = requests.request(
972 "POST", url, data=json.dumps(data), headers=headers,
973 auth=('admin', 'admin'))
974 self.assertEqual(response.status_code, requests.codes.ok)
975 res = response.json()
976 self.assertIn('Renderer service delete in progress',
977 res['output']['configuration-response-common']['response-message'])
980 def test_29_delete_eth_service2(self):
981 url = ("{}/operations/org-openroadm-service:service-delete"
982 .format(self.restconf_baseurl))
984 "sdnc-request-header": {
985 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
986 "rpc-action": "service-delete",
987 "request-system-id": "appname",
988 "notification-url": "http://localhost:8585/NotificationServer/notify"
990 "service-delete-req-info": {
991 "service-name": "service2",
992 "tail-retention": "no"
996 headers = {'content-type': 'application/json'}
997 response = requests.request(
998 "POST", url, data=json.dumps(data), headers=headers,
999 auth=('admin', 'admin'))
1000 self.assertEqual(response.status_code, requests.codes.ok)
1001 res = response.json()
1002 self.assertIn('Renderer service delete in progress',
1003 res['output']['configuration-response-common']['response-message'])
1006 def test_30_check_no_xc_ROADMA(self):
1007 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1008 "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1009 .format(self.restconf_baseurl))
1010 response = requests.request(
1011 "GET", url, auth=('admin', 'admin'))
1012 res = response.json()
1013 self.assertEqual(response.status_code, requests.codes.ok)
1014 self.assertNotIn('roadm-connections', dict.keys(res['org-openroadm-device']))
1017 def test_31_check_topo_XPDRA(self):
1018 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDRA-XPDR1"
1019 .format(self.restconf_baseurl))
1020 response = requests.request(
1021 "GET", url1, auth=('admin', 'admin'))
1022 self.assertEqual(response.status_code, requests.codes.ok)
1023 res = response.json()
1024 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1025 for ele in liste_tp:
1026 if ele[u'org-openroadm-network-topology:tp-type'] == 'XPONDER-CLIENT':
1027 self.assertNotIn('wavelength', dict.keys(ele['org-openroadm-network-topology:xpdr-client-attributes']))
1028 elif ele[u'org-openroadm-network-topology:tp-type'] == 'XPONDER-NETWORK':
1029 self.assertNotIn('wavelength', dict.keys(ele['org-openroadm-network-topology:xpdr-network-attributes']))
1032 def test_32_check_topo_ROADMA_SRG1(self):
1033 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-SRG1"
1034 .format(self.restconf_baseurl))
1035 response = requests.request(
1036 "GET", url1, auth=('admin', 'admin'))
1037 self.assertEqual(response.status_code, requests.codes.ok)
1038 res = response.json()
1039 self.assertIn({u'index': 1}, res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
1040 self.assertIn({u'index': 2}, res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
1041 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1042 for ele in liste_tp:
1043 if ele['tp-id'] == 'SRG1-PP1-TXRX':
1044 self.assertEqual({}, ele['org-openroadm-network-topology:pp-attributes'])
1045 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
1046 self.assertEqual({}, ele['org-openroadm-network-topology:pp-attributes'])
1048 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
1051 def test_33_check_topo_ROADMA_DEG1(self):
1052 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADMA-DEG1"
1053 .format(self.restconf_baseurl))
1054 response = requests.request(
1055 "GET", url1, auth=('admin', 'admin'))
1056 self.assertEqual(response.status_code, requests.codes.ok)
1057 res = response.json()
1058 self.assertIn({u'index': 1}, res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
1059 self.assertIn({u'index': 2}, res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
1060 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1061 for ele in liste_tp:
1062 if ele['tp-id'] == 'DEG1-CTP-TXRX':
1063 self.assertEqual({}, ele['org-openroadm-network-topology:ctp-attributes'])
1064 if ele['tp-id'] == 'DEG1-TTP-TXRX':
1065 self.assertEqual({}, ele['org-openroadm-network-topology:tx-ttp-attributes'])
1069 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
1070 def test_34_create_oc_service1(self):
1071 url = ("{}/operations/org-openroadm-service:service-create"
1072 .format(self.restconf_baseurl))
1074 "sdnc-request-header": {
1075 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1076 "rpc-action": "service-create",
1077 "request-system-id": "appname",
1078 "notification-url": "http://localhost:8585/NotificationServer/notify"
1080 "service-name": "service1",
1081 "common-id": "ASATT1234567",
1082 "connection-type": "roadm-line",
1084 "service-rate": "100",
1085 "node-id": "ROADMA",
1086 "service-format": "OC",
1087 "clli": "SNJSCAMCJP8",
1090 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1091 "port-type": "router",
1092 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1093 "port-rack": "000000.00",
1097 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1098 "lgx-port-name": "LGX Back.3",
1099 "lgx-port-rack": "000000.00",
1100 "lgx-port-shelf": "00"
1105 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1106 "port-type": "router",
1107 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1108 "port-rack": "000000.00",
1112 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1113 "lgx-port-name": "LGX Back.4",
1114 "lgx-port-rack": "000000.00",
1115 "lgx-port-shelf": "00"
1118 "optic-type": "gray"
1121 "service-rate": "100",
1122 "node-id": "ROADMC",
1123 "service-format": "OC",
1124 "clli": "SNJSCAMCJT4",
1127 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1128 "port-type": "router",
1129 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1130 "port-rack": "000000.00",
1134 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1135 "lgx-port-name": "LGX Back.29",
1136 "lgx-port-rack": "000000.00",
1137 "lgx-port-shelf": "00"
1142 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1143 "port-type": "router",
1144 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1145 "port-rack": "000000.00",
1149 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1150 "lgx-port-name": "LGX Back.30",
1151 "lgx-port-rack": "000000.00",
1152 "lgx-port-shelf": "00"
1155 "optic-type": "gray"
1157 "due-date": "2016-11-28T00:00:01Z",
1158 "operator-contact": "pw1234"
1161 headers = {'content-type': 'application/json',
1162 "Accept": "application/json"}
1163 response = requests.request(
1164 "POST", url, data=json.dumps(data), headers=headers,
1165 auth=('admin', 'admin'))
1166 self.assertEqual(response.status_code, requests.codes.ok)
1167 res = response.json()
1168 self.assertIn('PCE calculation in progress',
1169 res['output']['configuration-response-common']['response-message'])
1172 def test_35_get_oc_service1(self):
1173 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1174 .format(self.restconf_baseurl))
1175 headers = {'content-type': 'application/json',
1176 "Accept": "application/json"}
1177 response = requests.request(
1178 "GET", url, headers=headers, auth=('admin', 'admin'))
1179 self.assertEqual(response.status_code, requests.codes.ok)
1180 res = response.json()
1182 res['services'][0]['administrative-state'],
1185 res['services'][0]['service-name'], 'service1')
1187 res['services'][0]['connection-type'], 'roadm-line')
1189 res['services'][0]['lifecycle-state'], 'planned')
1192 def test_36_check_xc1_ROADMA(self):
1193 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1194 "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1195 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
1196 .format(self.restconf_baseurl))
1197 headers = {'content-type': 'application/json'}
1198 response = requests.request(
1199 "GET", url, headers=headers, auth=('admin', 'admin'))
1200 self.assertEqual(response.status_code, requests.codes.ok)
1201 res = response.json()
1202 self.assertDictContainsSubset(
1203 {'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
1204 'wavelength-number': 1,
1205 'opticalControlMode': 'gainLoss',
1206 'target-output-power': -3.0},
1207 res['roadm-connections'][0])
1208 self.assertDictEqual(
1209 {'src-if': 'SRG1-PP1-TXRX-1'},
1210 res['roadm-connections'][0]['source'])
1211 self.assertDictEqual(
1212 {'dst-if': 'DEG1-TTP-TXRX-1'},
1213 res['roadm-connections'][0]['destination'])
1216 def test_37_check_xc1_ROADMC(self):
1217 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1218 "node/ROADMC/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1219 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
1220 .format(self.restconf_baseurl))
1221 headers = {'content-type': 'application/json'}
1222 response = requests.request(
1223 "GET", url, headers=headers, auth=('admin', 'admin'))
1224 self.assertEqual(response.status_code, requests.codes.ok)
1225 res = response.json()
1226 self.assertDictContainsSubset(
1227 {'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
1228 'wavelength-number': 1,
1229 'opticalControlMode': 'gainLoss',
1230 'target-output-power': 2.0},
1231 res['roadm-connections'][0])
1232 self.assertDictEqual(
1233 {'src-if': 'SRG1-PP1-TXRX-1'},
1234 res['roadm-connections'][0]['source'])
1235 self.assertDictEqual(
1236 {'dst-if': 'DEG2-TTP-TXRX-1'},
1237 res['roadm-connections'][0]['destination'])
1240 def test_38_create_oc_service2(self):
1241 url = ("{}/operations/org-openroadm-service:service-create"
1242 .format(self.restconf_baseurl))
1244 "sdnc-request-header": {
1245 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1246 "rpc-action": "service-create",
1247 "request-system-id": "appname",
1248 "notification-url": "http://localhost:8585/NotificationServer/notify"
1250 "service-name": "service2",
1251 "common-id": "ASATT1234567",
1252 "connection-type": "roadm-line",
1254 "service-rate": "100",
1255 "node-id": "ROADMA",
1256 "service-format": "OC",
1257 "clli": "SNJSCAMCJP8",
1260 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1261 "port-type": "router",
1262 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1263 "port-rack": "000000.00",
1267 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1268 "lgx-port-name": "LGX Back.3",
1269 "lgx-port-rack": "000000.00",
1270 "lgx-port-shelf": "00"
1275 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1276 "port-type": "router",
1277 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1278 "port-rack": "000000.00",
1282 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1283 "lgx-port-name": "LGX Back.4",
1284 "lgx-port-rack": "000000.00",
1285 "lgx-port-shelf": "00"
1288 "optic-type": "gray"
1291 "service-rate": "100",
1292 "node-id": "ROADMC",
1293 "service-format": "OC",
1294 "clli": "SNJSCAMCJT4",
1297 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1298 "port-type": "router",
1299 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1300 "port-rack": "000000.00",
1304 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1305 "lgx-port-name": "LGX Back.29",
1306 "lgx-port-rack": "000000.00",
1307 "lgx-port-shelf": "00"
1312 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1313 "port-type": "router",
1314 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1315 "port-rack": "000000.00",
1319 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1320 "lgx-port-name": "LGX Back.30",
1321 "lgx-port-rack": "000000.00",
1322 "lgx-port-shelf": "00"
1325 "optic-type": "gray"
1327 "due-date": "2016-11-28T00:00:01Z",
1328 "operator-contact": "pw1234"
1331 headers = {'content-type': 'application/json',
1332 "Accept": "application/json"}
1333 response = requests.request(
1334 "POST", url, data=json.dumps(data), headers=headers,
1335 auth=('admin', 'admin'))
1336 self.assertEqual(response.status_code, requests.codes.ok)
1337 res = response.json()
1338 self.assertIn('PCE calculation in progress',
1339 res['output']['configuration-response-common']['response-message'])
1342 def test_39_get_oc_service2(self):
1343 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
1344 .format(self.restconf_baseurl))
1345 headers = {'content-type': 'application/json',
1346 "Accept": "application/json"}
1347 response = requests.request(
1348 "GET", url, headers=headers, auth=('admin', 'admin'))
1349 self.assertEqual(response.status_code, requests.codes.ok)
1350 res = response.json()
1352 res['services'][0]['administrative-state'],
1355 res['services'][0]['service-name'], 'service2')
1357 res['services'][0]['connection-type'], 'roadm-line')
1359 res['services'][0]['lifecycle-state'], 'planned')
1362 def test_40_check_xc2_ROADMA(self):
1363 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1364 "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1365 "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2"
1366 .format(self.restconf_baseurl))
1367 headers = {'content-type': 'application/json'}
1368 response = requests.request(
1369 "GET", url, headers=headers, auth=('admin', 'admin'))
1370 self.assertEqual(response.status_code, requests.codes.ok)
1371 res = response.json()
1372 self.assertDictContainsSubset(
1373 {'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
1374 'wavelength-number': 2,
1375 'opticalControlMode': 'gainLoss',
1376 'target-output-power': -3.0},
1377 res['roadm-connections'][0])
1378 self.assertDictEqual(
1379 {'src-if': 'SRG1-PP2-TXRX-2'},
1380 res['roadm-connections'][0]['source'])
1381 self.assertDictEqual(
1382 {'dst-if': 'DEG1-TTP-TXRX-2'},
1383 res['roadm-connections'][0]['destination'])
1386 def test_41_check_topo_ROADMA(self):
1387 self.test_24_check_topo_ROADMA_SRG1()
1388 self.test_25_check_topo_ROADMA_DEG1()
1392 def test_42_delete_oc_service1(self):
1393 url = ("{}/operations/org-openroadm-service:service-delete"
1394 .format(self.restconf_baseurl))
1396 "sdnc-request-header": {
1397 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1398 "rpc-action": "service-delete",
1399 "request-system-id": "appname",
1400 "notification-url": "http://localhost:8585/NotificationServer/notify"
1402 "service-delete-req-info": {
1403 "service-name": "service1",
1404 "tail-retention": "no"
1408 headers = {'content-type': 'application/json'}
1409 response = requests.request(
1410 "POST", url, data=json.dumps(data), headers=headers,
1411 auth=('admin', 'admin'))
1412 self.assertEqual(response.status_code, requests.codes.ok)
1413 res = response.json()
1414 self.assertIn('Renderer service delete in progress',
1415 res['output']['configuration-response-common']['response-message'])
1418 def test_43_delete_oc_service2(self):
1419 url = ("{}/operations/org-openroadm-service:service-delete"
1420 .format(self.restconf_baseurl))
1422 "sdnc-request-header": {
1423 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1424 "rpc-action": "service-delete",
1425 "request-system-id": "appname",
1426 "notification-url": "http://localhost:8585/NotificationServer/notify"
1428 "service-delete-req-info": {
1429 "service-name": "service2",
1430 "tail-retention": "no"
1434 headers = {'content-type': 'application/json'}
1435 response = requests.request(
1436 "POST", url, data=json.dumps(data), headers=headers,
1437 auth=('admin', 'admin'))
1438 self.assertEqual(response.status_code, requests.codes.ok)
1439 res = response.json()
1440 self.assertIn('Renderer service delete in progress',
1441 res['output']['configuration-response-common']['response-message'])
1444 def test_44_get_no_oc_services(self):
1445 print ("start test")
1446 url = ("{}/operational/org-openroadm-service:service-list"
1447 .format(self.restconf_baseurl))
1448 headers = {'content-type': 'application/json',
1449 "Accept": "application/json"}
1450 response = requests.request(
1451 "GET", url, headers=headers, auth=('admin', 'admin'))
1452 self.assertEqual(response.status_code, requests.codes.ok)
1453 res = response.json()
1454 self.assertEqual({u'service-list': {}}, res)
1457 def test_45_get_no_xc_ROADMA(self):
1458 url = ("{}/config/network-topology:network-topology/topology/topology-netconf"
1459 "/node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1460 .format(self.restconf_baseurl))
1461 headers = {'content-type': 'application/json',
1462 "Accept": "application/json"}
1463 response = requests.request(
1464 "GET", url, headers=headers, auth=('admin', 'admin'))
1465 self.assertEqual(response.status_code, requests.codes.ok)
1466 res = response.json()
1467 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
1470 def test_46_check_topo_ROADMA(self):
1471 self.test_32_check_topo_ROADMA_SRG1()
1472 self.test_33_check_topo_ROADMA_DEG1()
1474 def test_47_loop_create_eth_service(self):
1475 for i in range(1,6):
1476 print ("trial number {}".format(i))
1477 print("eth service creation")
1478 self.test_13_create_eth_service1()
1479 print ("check xc in ROADMA")
1480 self.test_15_check_xc1_ROADMA()
1481 print ("check xc in ROADMC")
1482 self.test_16_check_xc1_ROADMC()
1483 print ("eth service deletion\n")
1484 self.test_28_delete_eth_service1()
1486 def test_48_loop_create_oc_service(self):
1487 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1488 .format(self.restconf_baseurl))
1489 response = requests.request("GET", url, auth=('admin', 'admin'))
1490 if response.status_code != 404:
1491 url = ("{}/operations/org-openroadm-service:service-delete"
1492 .format(self.restconf_baseurl))
1494 "sdnc-request-header": {
1495 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1496 "rpc-action": "service-delete",
1497 "request-system-id": "appname",
1498 "notification-url": "http://localhost:8585/NotificationServer/notify"
1500 "service-delete-req-info": {
1501 "service-name": "service1",
1502 "tail-retention": "no"
1506 headers = {'content-type': 'application/json'}
1507 requests.request("POST", url, data=json.dumps(data), headers=headers, auth=('admin', 'admin'))
1510 for i in range(1,6):
1511 print ("trial number {}".format(i))
1512 print("oc service creation")
1513 self.test_34_create_oc_service1()
1514 print ("check xc in ROADMA")
1515 self.test_36_check_xc1_ROADMA()
1516 print ("check xc in ROADMC")
1517 self.test_37_check_xc1_ROADMC()
1518 print ("oc service deletion\n")
1519 self.test_42_delete_oc_service1()
1522 def test_49_disconnect_XPDRA(self):
1523 url = ("{}/config/network-topology:"
1524 "network-topology/topology/topology-netconf/node/XPDRA"
1525 .format(self.restconf_baseurl))
1526 headers = {'content-type': 'application/json'}
1527 response = requests.request(
1528 "DELETE", url, headers=headers,
1529 auth=('admin', 'admin'))
1530 self.assertEqual(response.status_code, requests.codes.ok)
1533 def test_50_disconnect_XPDRC(self):
1534 url = ("{}/config/network-topology:"
1535 "network-topology/topology/topology-netconf/node/XPDRC"
1536 .format(self.restconf_baseurl))
1537 headers = {'content-type': 'application/json'}
1538 response = requests.request(
1539 "DELETE", url, headers=headers,
1540 auth=('admin', 'admin'))
1541 self.assertEqual(response.status_code, requests.codes.ok)
1544 def test_51_disconnect_ROADMA(self):
1545 url = ("{}/config/network-topology:"
1546 "network-topology/topology/topology-netconf/node/ROADMA"
1547 .format(self.restconf_baseurl))
1548 headers = {'content-type': 'application/json'}
1549 response = requests.request(
1550 "DELETE", url, headers=headers,
1551 auth=('admin', 'admin'))
1552 self.assertEqual(response.status_code, requests.codes.ok)
1555 def test_52_disconnect_ROADMC(self):
1556 url = ("{}/config/network-topology:"
1557 "network-topology/topology/topology-netconf/node/ROADMC"
1558 .format(self.restconf_baseurl))
1559 headers = {'content-type': 'application/json'}
1560 response = requests.request(
1561 "DELETE", url, headers=headers,
1562 auth=('admin', 'admin'))
1563 self.assertEqual(response.status_code, requests.codes.ok)
1567 if __name__ == "__main__":
1568 unittest.main(verbosity=2)