3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
25 class TransportOlmTesting(unittest.TestCase):
27 honeynode_process1 = None
28 honeynode_process2 = None
29 honeynode_process3 = None
30 honeynode_process4 = None
32 restconf_baseurl = "http://localhost:8181/restconf"
34 #START_IGNORE_XTESTING
38 print ("starting honeynode1...")
39 cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
42 print ("starting honeynode2...")
43 cls.honeynode_process2 = test_utils.start_roadma_full_honeynode()
46 print ("starting honeynode3...")
47 cls.honeynode_process3 = test_utils.start_roadmc_full_honeynode()
50 print ("starting honeynode4...")
51 cls.honeynode_process4 = test_utils.start_xpdrc_honeynode()
53 print ("all honeynodes started")
55 print ("starting opendaylight...")
56 cls.odl_process = test_utils.start_tpce()
58 print ("opendaylight started")
61 def tearDownClass(cls):
62 for child in psutil.Process(cls.odl_process.pid).children():
63 child.send_signal(signal.SIGINT)
65 cls.odl_process.send_signal(signal.SIGINT)
66 cls.odl_process.wait()
67 for child in psutil.Process(cls.honeynode_process1.pid).children():
68 child.send_signal(signal.SIGINT)
70 cls.honeynode_process1.send_signal(signal.SIGINT)
71 cls.honeynode_process1.wait()
72 for child in psutil.Process(cls.honeynode_process2.pid).children():
73 child.send_signal(signal.SIGINT)
75 cls.honeynode_process2.send_signal(signal.SIGINT)
76 cls.honeynode_process2.wait()
77 for child in psutil.Process(cls.honeynode_process3.pid).children():
78 child.send_signal(signal.SIGINT)
80 cls.honeynode_process3.send_signal(signal.SIGINT)
81 cls.honeynode_process3.wait()
82 for child in psutil.Process(cls.honeynode_process4.pid).children():
83 child.send_signal(signal.SIGINT)
85 cls.honeynode_process4.send_signal(signal.SIGINT)
86 cls.honeynode_process4.wait()
89 print ("execution of {}".format(self.id().split(".")[-1]))
94 def test_01_xpdrA_device_connected(self):
95 url = ("{}/config/network-topology:"
96 "network-topology/topology/topology-netconf/node/XPDRA"
97 .format(self.restconf_baseurl))
100 "netconf-node-topology:username": "admin",
101 "netconf-node-topology:password": "admin",
102 "netconf-node-topology:host": "127.0.0.1",
103 "netconf-node-topology:port": "17830",
104 "netconf-node-topology:tcp-only": "false",
105 "netconf-node-topology:pass-through": {}}]}
106 headers = {'content-type': 'application/json'}
107 response = requests.request(
108 "PUT", url, data=json.dumps(data), headers=headers,
109 auth=('admin', 'admin'))
110 self.assertEqual(response.status_code, requests.codes.created)
113 def test_02_xpdrC_device_connected(self):
114 url = ("{}/config/network-topology:"
115 "network-topology/topology/topology-netconf/node/XPDRC"
116 .format(self.restconf_baseurl))
119 "netconf-node-topology:username": "admin",
120 "netconf-node-topology:password": "admin",
121 "netconf-node-topology:host": "127.0.0.1",
122 "netconf-node-topology:port": "17834",
123 "netconf-node-topology:tcp-only": "false",
124 "netconf-node-topology:pass-through": {}}]}
125 headers = {'content-type': 'application/json'}
126 response = requests.request(
127 "PUT", url, data=json.dumps(data), headers=headers,
128 auth=('admin', 'admin'))
129 self.assertEqual(response.status_code, requests.codes.created)
132 def test_03_rdmA_device_connected(self):
133 url = ("{}/config/network-topology:"
134 "network-topology/topology/topology-netconf/node/ROADMA"
135 .format(self.restconf_baseurl))
138 "netconf-node-topology:username": "admin",
139 "netconf-node-topology:password": "admin",
140 "netconf-node-topology:host": "127.0.0.1",
141 "netconf-node-topology:port": "17821",
142 "netconf-node-topology:tcp-only": "false",
143 "netconf-node-topology:pass-through": {}}]}
144 headers = {'content-type': 'application/json'}
145 response = requests.request(
146 "PUT", url, data=json.dumps(data), headers=headers,
147 auth=('admin', 'admin'))
148 self.assertEqual(response.status_code, requests.codes.created)
151 def test_04_rdmC_device_connected(self):
152 url = ("{}/config/network-topology:"
153 "network-topology/topology/topology-netconf/node/ROADMC"
154 .format(self.restconf_baseurl))
157 "netconf-node-topology:username": "admin",
158 "netconf-node-topology:password": "admin",
159 "netconf-node-topology:host": "127.0.0.1",
160 "netconf-node-topology:port": "17823",
161 "netconf-node-topology:tcp-only": "false",
162 "netconf-node-topology:pass-through": {}}]}
163 headers = {'content-type': 'application/json'}
164 response = requests.request(
165 "PUT", url, data=json.dumps(data), headers=headers,
166 auth=('admin', 'admin'))
167 self.assertEqual(response.status_code, requests.codes.created)
170 def test_05_connect_xprdA_to_roadmA(self):
171 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
173 "networkutils:input": {
174 "networkutils:links-input": {
175 "networkutils:xpdr-node": "XPDRA",
176 "networkutils:xpdr-num": "1",
177 "networkutils:network-num": "1",
178 "networkutils:rdm-node": "ROADMA",
179 "networkutils:srg-num": "1",
180 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
184 headers = {'content-type': 'application/json'}
185 response = requests.request(
186 "POST", url, data=json.dumps(data),
187 headers=headers, auth=('admin', 'admin'))
188 self.assertEqual(response.status_code, requests.codes.ok)
189 res = response.json()
190 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
192 def test_06_connect_roadmA_to_xpdrA(self):
193 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
195 "networkutils:input": {
196 "networkutils:links-input": {
197 "networkutils:xpdr-node": "XPDRA",
198 "networkutils:xpdr-num": "1",
199 "networkutils:network-num": "1",
200 "networkutils:rdm-node": "ROADMA",
201 "networkutils:srg-num": "1",
202 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
206 headers = {'content-type': 'application/json'}
207 response = requests.request(
208 "POST", url, data=json.dumps(data),
209 headers=headers, auth=('admin', 'admin'))
210 self.assertEqual(response.status_code, requests.codes.ok)
211 res = response.json()
212 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
214 def test_07_connect_xprdC_to_roadmC(self):
215 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
217 "networkutils:input": {
218 "networkutils:links-input": {
219 "networkutils:xpdr-node": "XPDRC",
220 "networkutils:xpdr-num": "1",
221 "networkutils:network-num": "1",
222 "networkutils:rdm-node": "ROADMC",
223 "networkutils:srg-num": "1",
224 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
228 headers = {'content-type': 'application/json'}
229 response = requests.request(
230 "POST", url, data=json.dumps(data),
231 headers=headers, auth=('admin', 'admin'))
232 self.assertEqual(response.status_code, requests.codes.ok)
233 res = response.json()
234 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
236 def test_08_connect_roadmC_to_xpdrC(self):
237 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
239 "networkutils:input": {
240 "networkutils:links-input": {
241 "networkutils:xpdr-node": "XPDRC",
242 "networkutils:xpdr-num": "1",
243 "networkutils:network-num": "1",
244 "networkutils:rdm-node": "ROADMC",
245 "networkutils:srg-num": "1",
246 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
250 headers = {'content-type': 'application/json'}
251 response = requests.request(
252 "POST", url, data=json.dumps(data),
253 headers=headers, auth=('admin', 'admin'))
254 self.assertEqual(response.status_code, requests.codes.ok)
255 res = response.json()
256 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
258 def test_09_create_OTS_ROADMA(self):
259 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
262 "node-id" : "ROADMA",
263 "logical-connection-point" : "DEG1-TTP-TXRX"
266 headers = {'content-type': 'application/json'}
267 response = requests.request(
268 "POST", url, data=json.dumps(data),
269 headers=headers, auth=('admin', 'admin'))
270 self.assertEqual(response.status_code, requests.codes.ok)
271 res = response.json()
272 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADMA',
273 res["output"]["result"])
275 def test_10_create_OTS_ROADMC(self):
276 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
279 "node-id" : "ROADMC",
280 "logical-connection-point" : "DEG2-TTP-TXRX"
283 headers = {'content-type': 'application/json'}
284 response = requests.request(
285 "POST", url, data=json.dumps(data),
286 headers=headers, auth=('admin', 'admin'))
287 self.assertEqual(response.status_code, requests.codes.ok)
288 res = response.json()
289 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADMC',
290 res["output"]["result"])
292 def test_11_get_PM_ROADMA(self):
293 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
297 "resource-type": "interface",
298 "granularity": "15min",
299 "resource-identifier": {
300 "resource-name" : "OTS-DEG1-TTP-TXRX"
304 headers = {'content-type': 'application/json'}
305 response = requests.request(
306 "POST", url, data=json.dumps(data),
307 headers=headers, auth=('admin', 'admin'))
308 self.assertEqual(response.status_code, requests.codes.ok)
309 res = response.json()
311 "pmparameter-name": "OpticalPowerOutput",
312 "pmparameter-value": "2.5"
313 }, res["output"]["measurements"])
315 "pmparameter-name": "OpticalReturnLoss",
316 "pmparameter-value": "49.9"
317 }, res["output"]["measurements"])
319 "pmparameter-name": "OpticalPowerInput",
320 "pmparameter-value": "3"
321 }, res["output"]["measurements"])
323 def test_12_get_PM_ROADMC(self):
324 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
328 "resource-type": "interface",
329 "granularity": "15min",
330 "resource-identifier": {
331 "resource-name" : "OTS-DEG2-TTP-TXRX"
335 headers = {'content-type': 'application/json'}
336 response = requests.request(
337 "POST", url, data=json.dumps(data),
338 headers=headers, auth=('admin', 'admin'))
339 self.assertEqual(response.status_code, requests.codes.ok)
340 res = response.json()
342 "pmparameter-name": "OpticalPowerOutput",
343 "pmparameter-value": "18.1"
344 }, res["output"]["measurements"])
346 "pmparameter-name": "OpticalReturnLoss",
347 "pmparameter-value": "48.8"
348 }, res["output"]["measurements"])
350 "pmparameter-name": "OpticalPowerInput",
351 "pmparameter-value": "-3.2"
352 }, res["output"]["measurements"])
354 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
355 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
359 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
362 headers = {'content-type': 'application/json'}
363 response = requests.request(
364 "POST", url, data=json.dumps(data),
365 headers=headers, auth=('admin', 'admin'))
366 self.assertEqual(response.status_code, requests.codes.ok)
367 res = response.json()
368 self.assertIn('Success',
369 res["output"]["result"])
372 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
373 }, res["output"]["spans"])
376 def test_14_calculate_span_loss_base_all(self):
377 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
383 headers = {'content-type': 'application/json'}
384 response = requests.request(
385 "POST", url, data=json.dumps(data),
386 headers=headers, auth=('admin', 'admin'))
387 self.assertEqual(response.status_code, requests.codes.ok)
388 res = response.json()
389 self.assertIn('Success',
390 res["output"]["result"])
393 "link-id": "ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX"
394 }, res["output"]["spans"])
397 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
398 }, res["output"]["spans"])
401 def test_15_get_OTS_DEG1_TTP_TXRX_ROADMA(self):
402 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
403 "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
404 "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
405 headers = {'content-type': 'application/json'}
406 response = requests.request(
407 "GET", url, headers=headers, auth=('admin', 'admin'))
408 self.assertEqual(response.status_code, requests.codes.ok)
409 res = response.json()
410 self.assertEqual(6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
411 self.assertEqual(15, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
413 def test_16_get_OTS_DEG2_TTP_TXRX_ROADMC(self):
414 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
415 "node/ROADMC/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
416 "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
417 headers = {'content-type': 'application/json'}
418 response = requests.request(
419 "GET", url, headers=headers, auth=('admin', 'admin'))
420 self.assertEqual(response.status_code, requests.codes.ok)
421 res = response.json()
422 self.assertEqual(15, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
423 self.assertEqual(6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
425 def test_17_servicePath_create_AToZ(self):
426 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
429 "service-name": "test",
431 "modulation-format": "qpsk",
432 "operation": "create",
435 "dest-tp": "XPDR1-NETWORK1",
436 "src-tp": "XPDR1-CLIENT1",
440 "dest-tp": "DEG1-TTP-TXRX",
441 "src-tp": "SRG1-PP1-TXRX",
445 "dest-tp": "SRG1-PP1-TXRX",
446 "src-tp": "DEG2-TTP-TXRX",
450 "dest-tp": "XPDR1-CLIENT1",
451 "src-tp": "XPDR1-NETWORK1",
457 headers = {'content-type': 'application/json'}
458 response = requests.request(
459 "POST", url, data=json.dumps(data),
460 headers=headers, auth=('admin', 'admin'))
461 self.assertEqual(response.status_code, requests.codes.ok)
462 res = response.json()
463 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
467 def test_18_servicePath_create_ZToA(self):
468 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
471 "service-name": "test",
473 "modulation-format": "qpsk",
474 "operation": "create",
477 "dest-tp": "XPDR1-NETWORK1",
478 "src-tp": "XPDR1-CLIENT1",
482 "dest-tp": "DEG2-TTP-TXRX",
483 "src-tp": "SRG1-PP1-TXRX",
487 "src-tp": "DEG1-TTP-TXRX",
488 "dest-tp": "SRG1-PP1-TXRX",
492 "src-tp": "XPDR1-NETWORK1",
493 "dest-tp": "XPDR1-CLIENT1",
499 headers = {'content-type': 'application/json'}
500 response = requests.request(
501 "POST", url, data=json.dumps(data),
502 headers=headers, auth=('admin', 'admin'))
503 self.assertEqual(response.status_code, requests.codes.ok)
504 res = response.json()
505 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
509 def test_19_service_power_setup_XPDRA_XPDRC(self):
510 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
513 "service-name": "test",
517 "dest-tp": "XPDR1-NETWORK1",
518 "src-tp": "XPDR1-CLIENT1",
522 "dest-tp": "DEG1-TTP-TXRX",
523 "src-tp": "SRG1-PP1-TXRX",
527 "dest-tp": "SRG1-PP1-TXRX",
528 "src-tp": "DEG2-TTP-TXRX",
532 "dest-tp": "XPDR1-CLIENT1",
533 "src-tp": "XPDR1-NETWORK1",
539 headers = {'content-type': 'application/json'}
540 response = requests.request(
541 "POST", url, data=json.dumps(data),
542 headers=headers, auth=('admin', 'admin'))
543 self.assertEqual(response.status_code, requests.codes.ok)
544 res = response.json()
545 self.assertIn('Success', res["output"]["result"])
547 def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
548 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA/yang-ext:mount/"
549 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
550 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
551 headers = {'content-type': 'application/json'}
552 response = requests.request(
553 "GET", url, headers=headers, auth=('admin', 'admin'))
554 self.assertEqual(response.status_code, requests.codes.ok)
555 res = response.json()
556 self.assertEqual(0, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
557 self.assertEqual(1, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
559 def test_21_get_roadmconnection_ROADMA(self):
560 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA/yang-ext:mount/"
561 "org-openroadm-device:org-openroadm-device/roadm-connections/"
562 "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(self.restconf_baseurl))
563 headers = {'content-type': 'application/json'}
564 response = requests.request(
565 "GET", url, headers=headers, auth=('admin', 'admin'))
566 self.assertEqual(response.status_code, requests.codes.ok)
567 res = response.json()
568 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
569 self.assertEqual(-3, res['roadm-connections'][0]['target-output-power'])
571 def test_22_get_roadmconnection_ROADMC(self):
572 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
573 "org-openroadm-device:org-openroadm-device/roadm-connections/"
574 "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
575 headers = {'content-type': 'application/json'}
576 response = requests.request(
577 "GET", url, headers=headers, auth=('admin', 'admin'))
578 self.assertEqual(response.status_code, requests.codes.ok)
579 res = response.json()
580 self.assertEqual("power", res['roadm-connections'][0]['opticalControlMode'])
582 def test_23_service_power_setup_XPDRC_XPDRA(self):
583 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
586 "service-name": "test",
590 "dest-tp": "XPDR1-NETWORK1",
591 "src-tp": "XPDR1-CLIENT1",
595 "dest-tp": "DEG2-TTP-TXRX",
596 "src-tp": "SRG1-PP1-TXRX",
600 "src-tp": "DEG1-TTP-TXRX",
601 "dest-tp": "SRG1-PP1-TXRX",
605 "src-tp": "XPDR1-NETWORK1",
606 "dest-tp": "XPDR1-CLIENT1",
612 headers = {'content-type': 'application/json'}
613 response = requests.request(
614 "POST", url, data=json.dumps(data),
615 headers=headers, auth=('admin', 'admin'))
616 self.assertEqual(response.status_code, requests.codes.ok)
617 res = response.json()
618 self.assertIn('Success', res["output"]["result"])
620 def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
621 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRC/yang-ext:mount/"
622 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
623 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
624 headers = {'content-type': 'application/json'}
625 response = requests.request(
626 "GET", url, headers=headers, auth=('admin', 'admin'))
627 self.assertEqual(response.status_code, requests.codes.ok)
628 res = response.json()
629 self.assertEqual(0, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
630 self.assertEqual(1, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
632 def test_25_get_roadmconnection_ROADMC(self):
633 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
634 "org-openroadm-device:org-openroadm-device/roadm-connections/"
635 "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(self.restconf_baseurl))
636 headers = {'content-type': 'application/json'}
637 response = requests.request(
638 "GET", url, headers=headers, auth=('admin', 'admin'))
639 self.assertEqual(response.status_code, requests.codes.ok)
640 res = response.json()
641 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
642 self.assertEqual(2, res['roadm-connections'][0]['target-output-power'])
644 def test_26_service_power_turndown_XPDRA_XPDRC(self):
645 url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
648 "service-name": "test",
652 "dest-tp": "XPDR1-NETWORK1",
653 "src-tp": "XPDR1-CLIENT1",
657 "dest-tp": "DEG1-TTP-TXRX",
658 "src-tp": "SRG1-PP1-TXRX",
662 "dest-tp": "SRG1-PP1-TXRX",
663 "src-tp": "DEG2-TTP-TXRX",
667 "dest-tp": "XPDR1-CLIENT1",
668 "src-tp": "XPDR1-NETWORK1",
674 headers = {'content-type': 'application/json'}
675 response = requests.request(
676 "POST", url, data=json.dumps(data),
677 headers=headers, auth=('admin', 'admin'))
678 self.assertEqual(response.status_code, requests.codes.ok)
679 res = response.json()
680 self.assertIn('Success', res["output"]["result"])
682 def test_27_get_roadmconnection_ROADMA(self):
683 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA/yang-ext:mount/"
684 "org-openroadm-device:org-openroadm-device/roadm-connections/"
685 "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(self.restconf_baseurl))
686 headers = {'content-type': 'application/json'}
687 response = requests.request(
688 "GET", url, headers=headers, auth=('admin', 'admin'))
689 self.assertEqual(response.status_code, requests.codes.ok)
690 res = response.json()
691 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
692 self.assertEqual(-60, res['roadm-connections'][0]['target-output-power'])
694 def test_28_get_roadmconnection_ROADMC(self):
695 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
696 "org-openroadm-device:org-openroadm-device/roadm-connections/"
697 "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
698 headers = {'content-type': 'application/json'}
699 response = requests.request(
700 "GET", url, headers=headers, auth=('admin', 'admin'))
701 self.assertEqual(response.status_code, requests.codes.ok)
702 res = response.json()
703 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
705 def test_29_servicePath_delete_AToZ(self):
706 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
709 "service-name": "test",
711 "modulation-format": "qpsk",
712 "operation": "delete",
715 "dest-tp": "XPDR1-NETWORK1",
716 "src-tp": "XPDR1-CLIENT1",
720 "dest-tp": "DEG1-TTP-TXRX",
721 "src-tp": "SRG1-PP1-TXRX",
725 "dest-tp": "SRG1-PP1-TXRX",
726 "src-tp": "DEG2-TTP-TXRX",
730 "dest-tp": "XPDR1-CLIENT1",
731 "src-tp": "XPDR1-NETWORK1",
737 headers = {'content-type': 'application/json'}
738 response = requests.request(
739 "POST", url, data=json.dumps(data),
740 headers=headers, auth=('admin', 'admin'))
741 self.assertEqual(response.status_code, requests.codes.ok)
742 res = response.json()
743 self.assertIn('Request processed', res["output"]["result"])
746 def test_30_servicePath_delete_ZToA(self):
747 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
750 "service-name": "test",
752 "modulation-format": "qpsk",
753 "operation": "delete",
756 "dest-tp": "XPDR1-NETWORK1",
757 "src-tp": "XPDR1-CLIENT1",
761 "dest-tp": "DEG2-TTP-TXRX",
762 "src-tp": "SRG1-PP1-TXRX",
766 "src-tp": "DEG1-TTP-TXRX",
767 "dest-tp": "SRG1-PP1-TXRX",
771 "src-tp": "XPDR1-NETWORK1",
772 "dest-tp": "XPDR1-CLIENT1",
778 headers = {'content-type': 'application/json'}
779 response = requests.request(
780 "POST", url, data=json.dumps(data),
781 headers=headers, auth=('admin', 'admin'))
782 self.assertEqual(response.status_code, requests.codes.ok)
783 res = response.json()
784 self.assertIn('Request processed', res["output"]["result"])
787 """to test case where SRG where the xpdr is connected to has no optical range data"""
789 def test_31_connect_xprdA_to_roadmA(self):
790 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
792 "networkutils:input": {
793 "networkutils:links-input": {
794 "networkutils:xpdr-node": "XPDRA",
795 "networkutils:xpdr-num": "1",
796 "networkutils:network-num": "2",
797 "networkutils:rdm-node": "ROADMA",
798 "networkutils:srg-num": "1",
799 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
803 headers = {'content-type': 'application/json'}
804 response = requests.request(
805 "POST", url, data=json.dumps(data),
806 headers=headers, auth=('admin', 'admin'))
807 self.assertEqual(response.status_code, requests.codes.ok)
808 res = response.json()
809 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
811 def test_32_connect_roadmA_to_xpdrA(self):
812 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
814 "networkutils:input": {
815 "networkutils:links-input": {
816 "networkutils:xpdr-node": "XPDRA",
817 "networkutils:xpdr-num": "1",
818 "networkutils:network-num": "2",
819 "networkutils:rdm-node": "ROADMA",
820 "networkutils:srg-num": "1",
821 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
825 headers = {'content-type': 'application/json'}
826 response = requests.request(
827 "POST", url, data=json.dumps(data),
828 headers=headers, auth=('admin', 'admin'))
829 self.assertEqual(response.status_code, requests.codes.ok)
830 res = response.json()
831 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
833 def test_33_servicePath_create_AToZ(self):
834 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
837 "service-name": "test2",
839 "modulation-format": "qpsk",
840 "operation": "create",
843 "dest-tp": "XPDR1-NETWORK2",
844 "src-tp": "XPDR1-CLIENT2",
848 "dest-tp": "DEG1-TTP-TXRX",
849 "src-tp": "SRG1-PP2-TXRX",
855 headers = {'content-type': 'application/json'}
856 response = requests.request(
857 "POST", url, data=json.dumps(data),
858 headers=headers, auth=('admin', 'admin'))
859 self.assertEqual(response.status_code, requests.codes.ok)
860 res = response.json()
861 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
865 def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
866 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA/yang-ext:mount/"
867 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
868 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
869 headers = {'content-type': 'application/json'}
870 response = requests.request(
871 "GET", url, headers=headers, auth=('admin', 'admin'))
872 self.assertEqual(response.status_code, requests.codes.ok)
873 res = response.json()
874 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
875 self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
877 def test_35_servicePath_delete_AToZ(self):
878 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
881 "service-name": "test",
883 "modulation-format": "qpsk",
884 "operation": "delete",
887 "dest-tp": "XPDR1-NETWORK2",
888 "src-tp": "XPDR1-CLIENT2",
892 "dest-tp": "DEG1-TTP-TXRX",
893 "src-tp": "SRG1-PP2-TXRX",
899 headers = {'content-type': 'application/json'}
900 response = requests.request(
901 "POST", url, data=json.dumps(data),
902 headers=headers, auth=('admin', 'admin'))
903 self.assertEqual(response.status_code, requests.codes.ok)
904 res = response.json()
905 self.assertIn('Request processed', res["output"]["result"])
908 def test_36_xpdrA_device_disconnected(self):
909 url = ("{}/config/network-topology:"
910 "network-topology/topology/topology-netconf/node/XPDRA"
911 .format(self.restconf_baseurl))
912 headers = {'content-type': 'application/json'}
913 response = requests.request(
914 "DELETE", url, headers=headers,
915 auth=('admin', 'admin'))
916 self.assertEqual(response.status_code, requests.codes.ok)
919 def test_37_xpdrC_device_disconnected(self):
920 url = ("{}/config/network-topology:"
921 "network-topology/topology/topology-netconf/node/XPDRC"
922 .format(self.restconf_baseurl))
923 headers = {'content-type': 'application/json'}
924 response = requests.request(
925 "DELETE", url, headers=headers,
926 auth=('admin', 'admin'))
927 self.assertEqual(response.status_code, requests.codes.ok)
930 def test_38_calculate_span_loss_current(self):
931 url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(self.restconf_baseurl)
932 headers = {'content-type': 'application/json'}
933 response = requests.request(
934 "POST", url, headers=headers, auth=('admin', 'admin'))
935 self.assertEqual(response.status_code, requests.codes.ok)
936 res = response.json()
937 self.assertIn('Success',
938 res["output"]["result"])
941 def test_39_rdmA_device_disconnected(self):
942 url = ("{}/config/network-topology:"
943 "network-topology/topology/topology-netconf/node/ROADMA"
944 .format(self.restconf_baseurl))
945 headers = {'content-type': 'application/json'}
946 response = requests.request(
947 "DELETE", url, headers=headers,
948 auth=('admin', 'admin'))
949 self.assertEqual(response.status_code, requests.codes.ok)
952 def test_40_rdmC_device_disconnected(self):
953 url = ("{}/config/network-topology:"
954 "network-topology/topology/topology-netconf/node/ROADMC"
955 .format(self.restconf_baseurl))
956 headers = {'content-type': 'application/json'}
957 response = requests.request(
958 "DELETE", url, headers=headers,
959 auth=('admin', 'admin'))
960 self.assertEqual(response.status_code, requests.codes.ok)
963 if __name__ == "__main__":
964 unittest.main(verbosity=2)