3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
24 class TransportOlmTesting(unittest.TestCase):
26 honeynode_process1 = None
27 honeynode_process2 = None
28 honeynode_process3 = None
29 honeynode_process4 = None
31 restconf_baseurl = "http://localhost:8181/restconf"
33 #START_IGNORE_XTESTING
36 def __start_honeynode1(cls):
37 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
38 "/honeynode-distribution-1.18.01/honeycomb-tpce")
39 if os.path.isfile(executable):
40 with open('honeynode1.log', 'w') as outfile:
41 cls.honeynode_process1 = subprocess.Popen(
42 [executable, "17830", "sample_configs/openroadm/2.1/oper-XPDRA.xml"],
46 def __start_honeynode2(cls):
47 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
48 "/honeynode-distribution-1.18.01/honeycomb-tpce")
49 if os.path.isfile(executable):
50 with open('honeynode2.log', 'w') as outfile:
51 cls.honeynode_process2 = subprocess.Popen(
52 [executable, "17831", "sample_configs/openroadm/2.1/oper-ROADMA-full.xml"],
56 def __start_honeynode3(cls):
57 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
58 "/honeynode-distribution-1.18.01/honeycomb-tpce")
59 if os.path.isfile(executable):
60 with open('honeynode3.log', 'w') as outfile:
61 cls.honeynode_process3 = subprocess.Popen(
62 [executable, "17833", "sample_configs/openroadm/2.1/oper-ROADMC-full.xml"],
65 def __start_honeynode4(cls):
66 executable = ("./honeynode/2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
67 "/honeynode-distribution-1.18.01/honeycomb-tpce")
68 if os.path.isfile(executable):
69 with open('honeynode4.log', 'w') as outfile:
70 cls.honeynode_process4 = subprocess.Popen(
71 [executable, "17834", "sample_configs/openroadm/2.1/oper-XPDRC.xml"],
76 executable = "../karaf/target/assembly/bin/karaf"
77 with open('odl.log', 'w') as outfile:
78 cls.odl_process = subprocess.Popen(
79 ["bash", executable, "server"], stdout=outfile,
80 stdin=open(os.devnull))
84 cls.__start_honeynode1()
86 cls.__start_honeynode2()
88 cls.__start_honeynode3()
90 cls.__start_honeynode4()
def tearDownClass(cls):
    """Stop OpenDaylight and the four honeynode simulators used by the suite.

    Processes are handled in the order ODL, honeynode 1, 2, 3, 4. For each
    one, SIGINT is sent to every direct child reported by psutil, then to
    the process itself, and the process is waited on until it exits.

    Handles that were never started (attribute absent or still ``None``)
    are skipped, so one missing simulator does not raise and prevent the
    remaining processes from being shut down.
    """
    handle_names = (
        "odl_process",
        "honeynode_process1",
        "honeynode_process2",
        "honeynode_process3",
        "honeynode_process4",
    )
    for handle_name in handle_names:
        proc = getattr(cls, handle_name, None)
        if proc is None:
            # The honeynode start helpers only assign the handle when the
            # executable exists on disk; otherwise it keeps its None default.
            continue
        # Interrupt the children first, then the launcher process itself.
        for child in psutil.Process(proc.pid).children():
            child.send_signal(signal.SIGINT)
        proc.send_signal(signal.SIGINT)
        proc.wait()
124 print ("execution of {}".format(self.id().split(".")[-1]))
129 def test_01_xpdrA_device_connected(self):
130 url = ("{}/config/network-topology:"
131 "network-topology/topology/topology-netconf/node/XPDRA"
132 .format(self.restconf_baseurl))
135 "netconf-node-topology:username": "admin",
136 "netconf-node-topology:password": "admin",
137 "netconf-node-topology:host": "127.0.0.1",
138 "netconf-node-topology:port": "17830",
139 "netconf-node-topology:tcp-only": "false",
140 "netconf-node-topology:pass-through": {}}]}
141 headers = {'content-type': 'application/json'}
142 response = requests.request(
143 "PUT", url, data=json.dumps(data), headers=headers,
144 auth=('admin', 'admin'))
145 self.assertEqual(response.status_code, requests.codes.created)
148 def test_02_xpdrC_device_connected(self):
149 url = ("{}/config/network-topology:"
150 "network-topology/topology/topology-netconf/node/XPDRC"
151 .format(self.restconf_baseurl))
154 "netconf-node-topology:username": "admin",
155 "netconf-node-topology:password": "admin",
156 "netconf-node-topology:host": "127.0.0.1",
157 "netconf-node-topology:port": "17834",
158 "netconf-node-topology:tcp-only": "false",
159 "netconf-node-topology:pass-through": {}}]}
160 headers = {'content-type': 'application/json'}
161 response = requests.request(
162 "PUT", url, data=json.dumps(data), headers=headers,
163 auth=('admin', 'admin'))
164 self.assertEqual(response.status_code, requests.codes.created)
167 def test_03_rdmA_device_connected(self):
168 url = ("{}/config/network-topology:"
169 "network-topology/topology/topology-netconf/node/ROADMA"
170 .format(self.restconf_baseurl))
173 "netconf-node-topology:username": "admin",
174 "netconf-node-topology:password": "admin",
175 "netconf-node-topology:host": "127.0.0.1",
176 "netconf-node-topology:port": "17831",
177 "netconf-node-topology:tcp-only": "false",
178 "netconf-node-topology:pass-through": {}}]}
179 headers = {'content-type': 'application/json'}
180 response = requests.request(
181 "PUT", url, data=json.dumps(data), headers=headers,
182 auth=('admin', 'admin'))
183 self.assertEqual(response.status_code, requests.codes.created)
186 def test_04_rdmC_device_connected(self):
187 url = ("{}/config/network-topology:"
188 "network-topology/topology/topology-netconf/node/ROADMC"
189 .format(self.restconf_baseurl))
192 "netconf-node-topology:username": "admin",
193 "netconf-node-topology:password": "admin",
194 "netconf-node-topology:host": "127.0.0.1",
195 "netconf-node-topology:port": "17833",
196 "netconf-node-topology:tcp-only": "false",
197 "netconf-node-topology:pass-through": {}}]}
198 headers = {'content-type': 'application/json'}
199 response = requests.request(
200 "PUT", url, data=json.dumps(data), headers=headers,
201 auth=('admin', 'admin'))
202 self.assertEqual(response.status_code, requests.codes.created)
205 def test_05_connect_xprdA_to_roadmA(self):
206 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
208 "networkutils:input": {
209 "networkutils:links-input": {
210 "networkutils:xpdr-node": "XPDRA",
211 "networkutils:xpdr-num": "1",
212 "networkutils:network-num": "1",
213 "networkutils:rdm-node": "ROADMA",
214 "networkutils:srg-num": "1",
215 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
219 headers = {'content-type': 'application/json'}
220 response = requests.request(
221 "POST", url, data=json.dumps(data),
222 headers=headers, auth=('admin', 'admin'))
223 self.assertEqual(response.status_code, requests.codes.ok)
224 res = response.json()
225 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
227 def test_06_connect_roadmA_to_xpdrA(self):
228 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
230 "networkutils:input": {
231 "networkutils:links-input": {
232 "networkutils:xpdr-node": "XPDRA",
233 "networkutils:xpdr-num": "1",
234 "networkutils:network-num": "1",
235 "networkutils:rdm-node": "ROADMA",
236 "networkutils:srg-num": "1",
237 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
241 headers = {'content-type': 'application/json'}
242 response = requests.request(
243 "POST", url, data=json.dumps(data),
244 headers=headers, auth=('admin', 'admin'))
245 self.assertEqual(response.status_code, requests.codes.ok)
246 res = response.json()
247 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
249 def test_07_connect_xprdC_to_roadmC(self):
250 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
252 "networkutils:input": {
253 "networkutils:links-input": {
254 "networkutils:xpdr-node": "XPDRC",
255 "networkutils:xpdr-num": "1",
256 "networkutils:network-num": "1",
257 "networkutils:rdm-node": "ROADMC",
258 "networkutils:srg-num": "1",
259 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
263 headers = {'content-type': 'application/json'}
264 response = requests.request(
265 "POST", url, data=json.dumps(data),
266 headers=headers, auth=('admin', 'admin'))
267 self.assertEqual(response.status_code, requests.codes.ok)
268 res = response.json()
269 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
271 def test_08_connect_roadmC_to_xpdrC(self):
272 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
274 "networkutils:input": {
275 "networkutils:links-input": {
276 "networkutils:xpdr-node": "XPDRC",
277 "networkutils:xpdr-num": "1",
278 "networkutils:network-num": "1",
279 "networkutils:rdm-node": "ROADMC",
280 "networkutils:srg-num": "1",
281 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
285 headers = {'content-type': 'application/json'}
286 response = requests.request(
287 "POST", url, data=json.dumps(data),
288 headers=headers, auth=('admin', 'admin'))
289 self.assertEqual(response.status_code, requests.codes.ok)
290 res = response.json()
291 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
293 def test_09_create_OTS_ROADMA(self):
294 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
297 "node-id" : "ROADMA",
298 "logical-connection-point" : "DEG1-TTP-TXRX"
301 headers = {'content-type': 'application/json'}
302 response = requests.request(
303 "POST", url, data=json.dumps(data),
304 headers=headers, auth=('admin', 'admin'))
305 self.assertEqual(response.status_code, requests.codes.ok)
306 res = response.json()
307 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADMA',
308 res["output"]["result"])
310 def test_10_create_OTS_ROADMC(self):
311 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
314 "node-id" : "ROADMC",
315 "logical-connection-point" : "DEG2-TTP-TXRX"
318 headers = {'content-type': 'application/json'}
319 response = requests.request(
320 "POST", url, data=json.dumps(data),
321 headers=headers, auth=('admin', 'admin'))
322 self.assertEqual(response.status_code, requests.codes.ok)
323 res = response.json()
324 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADMC',
325 res["output"]["result"])
327 def test_11_get_PM_ROADMA(self):
328 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
332 "resource-type": "interface",
333 "granularity": "15min",
334 "resource-identifier": {
335 "resource-name" : "OTS-DEG1-TTP-TXRX"
339 headers = {'content-type': 'application/json'}
340 response = requests.request(
341 "POST", url, data=json.dumps(data),
342 headers=headers, auth=('admin', 'admin'))
343 self.assertEqual(response.status_code, requests.codes.ok)
344 res = response.json()
346 "pmparameter-name": "OpticalPowerOutput",
347 "pmparameter-value": "2.5"
348 }, res["output"]["measurements"])
350 "pmparameter-name": "OpticalReturnLoss",
351 "pmparameter-value": "49.9"
352 }, res["output"]["measurements"])
354 "pmparameter-name": "OpticalPowerInput",
355 "pmparameter-value": "3"
356 }, res["output"]["measurements"])
358 def test_12_get_PM_ROADMC(self):
359 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
363 "resource-type": "interface",
364 "granularity": "15min",
365 "resource-identifier": {
366 "resource-name" : "OTS-DEG2-TTP-TXRX"
370 headers = {'content-type': 'application/json'}
371 response = requests.request(
372 "POST", url, data=json.dumps(data),
373 headers=headers, auth=('admin', 'admin'))
374 self.assertEqual(response.status_code, requests.codes.ok)
375 res = response.json()
377 "pmparameter-name": "OpticalPowerOutput",
378 "pmparameter-value": "18.1"
379 }, res["output"]["measurements"])
381 "pmparameter-name": "OpticalReturnLoss",
382 "pmparameter-value": "48.8"
383 }, res["output"]["measurements"])
385 "pmparameter-name": "OpticalPowerInput",
386 "pmparameter-value": "-3.2"
387 }, res["output"]["measurements"])
389 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
390 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
394 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
397 headers = {'content-type': 'application/json'}
398 response = requests.request(
399 "POST", url, data=json.dumps(data),
400 headers=headers, auth=('admin', 'admin'))
401 self.assertEqual(response.status_code, requests.codes.ok)
402 res = response.json()
403 self.assertIn('Success',
404 res["output"]["result"])
407 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
408 }, res["output"]["spans"])
411 def test_14_calculate_span_loss_base_all(self):
412 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
418 headers = {'content-type': 'application/json'}
419 response = requests.request(
420 "POST", url, data=json.dumps(data),
421 headers=headers, auth=('admin', 'admin'))
422 self.assertEqual(response.status_code, requests.codes.ok)
423 res = response.json()
424 self.assertIn('Success',
425 res["output"]["result"])
428 "link-id": "ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX"
429 }, res["output"]["spans"])
432 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
433 }, res["output"]["spans"])
def test_15_get_OTS_DEG1_TTP_TXRX_ROADMA(self):
    """Check the span-loss values of interface OTS-DEG1-TTP-TXRX on ROADMA.

    Performs a RESTCONF GET on the ``ots`` container of the interface through
    the ROADMA mount point and expects HTTP 200 with span-loss-transmit 6 and
    span-loss-receive 15.
    """
    ots_url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    )
    reply = requests.request(
        "GET",
        ots_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(6, ots['span-loss-transmit'])
    self.assertEqual(15, ots['span-loss-receive'])
def test_16_get_OTS_DEG2_TTP_TXRX_ROADMC(self):
    """Check the span-loss values of interface OTS-DEG2-TTP-TXRX on ROADMC.

    Performs a RESTCONF GET on the ``ots`` container of the interface through
    the ROADMC mount point and expects HTTP 200 with span-loss-transmit 15 and
    span-loss-receive 6.
    """
    ots_url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMC/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    )
    reply = requests.request(
        "GET",
        ots_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(15, ots['span-loss-transmit'])
    self.assertEqual(6, ots['span-loss-receive'])
460 def test_17_servicePath_create_AToZ(self):
461 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
464 "service-name": "test",
466 "modulation-format": "qpsk",
467 "operation": "create",
470 "dest-tp": "XPDR1-NETWORK1",
471 "src-tp": "XPDR1-CLIENT1",
475 "dest-tp": "DEG1-TTP-TXRX",
476 "src-tp": "SRG1-PP1-TXRX",
480 "dest-tp": "SRG1-PP1-TXRX",
481 "src-tp": "DEG2-TTP-TXRX",
485 "dest-tp": "XPDR1-CLIENT1",
486 "src-tp": "XPDR1-NETWORK1",
492 headers = {'content-type': 'application/json'}
493 response = requests.request(
494 "POST", url, data=json.dumps(data),
495 headers=headers, auth=('admin', 'admin'))
496 self.assertEqual(response.status_code, requests.codes.ok)
497 res = response.json()
498 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
502 def test_18_servicePath_create_ZToA(self):
503 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
506 "service-name": "test",
508 "modulation-format": "qpsk",
509 "operation": "create",
512 "dest-tp": "XPDR1-NETWORK1",
513 "src-tp": "XPDR1-CLIENT1",
517 "dest-tp": "DEG2-TTP-TXRX",
518 "src-tp": "SRG1-PP1-TXRX",
522 "src-tp": "DEG1-TTP-TXRX",
523 "dest-tp": "SRG1-PP1-TXRX",
527 "src-tp": "XPDR1-NETWORK1",
528 "dest-tp": "XPDR1-CLIENT1",
534 headers = {'content-type': 'application/json'}
535 response = requests.request(
536 "POST", url, data=json.dumps(data),
537 headers=headers, auth=('admin', 'admin'))
538 self.assertEqual(response.status_code, requests.codes.ok)
539 res = response.json()
540 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
544 def test_19_service_power_setup_XPDRA_XPDRC(self):
545 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
548 "service-name": "test",
552 "dest-tp": "XPDR1-NETWORK1",
553 "src-tp": "XPDR1-CLIENT1",
557 "dest-tp": "DEG1-TTP-TXRX",
558 "src-tp": "SRG1-PP1-TXRX",
562 "dest-tp": "SRG1-PP1-TXRX",
563 "src-tp": "DEG2-TTP-TXRX",
567 "dest-tp": "XPDR1-CLIENT1",
568 "src-tp": "XPDR1-NETWORK1",
574 headers = {'content-type': 'application/json'}
575 response = requests.request(
576 "POST", url, data=json.dumps(data),
577 headers=headers, auth=('admin', 'admin'))
578 self.assertEqual(response.status_code, requests.codes.ok)
579 res = response.json()
580 self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
    """Check the och settings of interface XPDR1-NETWORK1-1 on XPDRA.

    Performs a RESTCONF GET on the ``och`` container of the interface through
    the XPDRA mount point and expects HTTP 200 with transmit-power 0 and
    wavelength-number 1.
    """
    och_url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    )
    reply = requests.request(
        "GET",
        och_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(0, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_21_get_roadmconnection_ROADMA(self):
    """Check roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADMA.

    Performs a RESTCONF GET on the connection through the ROADMA mount point
    and expects HTTP 200, opticalControlMode "gainLoss" and
    target-output-power -3 on the first returned entry.
    """
    connection_url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
    )
    reply = requests.request(
        "GET",
        connection_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(-3, connection['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    """Check roadm-connection DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 on ROADMC.

    Performs a RESTCONF GET on the connection through the ROADMC mount point
    and expects HTTP 200 and opticalControlMode "power" on the first returned
    entry.
    """
    connection_url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1"
    )
    reply = requests.request(
        "GET",
        connection_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("power", connection['opticalControlMode'])
617 def test_23_service_power_setup_XPDRC_XPDRA(self):
618 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
621 "service-name": "test",
625 "dest-tp": "XPDR1-NETWORK1",
626 "src-tp": "XPDR1-CLIENT1",
630 "dest-tp": "DEG2-TTP-TXRX",
631 "src-tp": "SRG1-PP1-TXRX",
635 "src-tp": "DEG1-TTP-TXRX",
636 "dest-tp": "SRG1-PP1-TXRX",
640 "src-tp": "XPDR1-NETWORK1",
641 "dest-tp": "XPDR1-CLIENT1",
647 headers = {'content-type': 'application/json'}
648 response = requests.request(
649 "POST", url, data=json.dumps(data),
650 headers=headers, auth=('admin', 'admin'))
651 self.assertEqual(response.status_code, requests.codes.ok)
652 res = response.json()
653 self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
    """Check the och settings of interface XPDR1-NETWORK1-1 on XPDRC.

    Performs a RESTCONF GET on the ``och`` container of the interface through
    the XPDRC mount point and expects HTTP 200 with transmit-power 0 and
    wavelength-number 1.
    """
    och_url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRC/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    )
    reply = requests.request(
        "GET",
        och_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(0, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_25_get_roadmconnection_ROADMC(self):
    """Check roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADMC.

    Performs a RESTCONF GET on the connection through the ROADMC mount point
    and expects HTTP 200, opticalControlMode "gainLoss" and
    target-output-power 2 on the first returned entry.
    """
    connection_url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
    )
    reply = requests.request(
        "GET",
        connection_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2, connection['target-output-power'])
679 def test_26_service_power_turndown_XPDRA_XPDRC(self):
680 url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
683 "service-name": "test",
687 "dest-tp": "XPDR1-NETWORK1",
688 "src-tp": "XPDR1-CLIENT1",
692 "dest-tp": "DEG1-TTP-TXRX",
693 "src-tp": "SRG1-PP1-TXRX",
697 "dest-tp": "SRG1-PP1-TXRX",
698 "src-tp": "DEG2-TTP-TXRX",
702 "dest-tp": "XPDR1-CLIENT1",
703 "src-tp": "XPDR1-NETWORK1",
709 headers = {'content-type': 'application/json'}
710 response = requests.request(
711 "POST", url, data=json.dumps(data),
712 headers=headers, auth=('admin', 'admin'))
713 self.assertEqual(response.status_code, requests.codes.ok)
714 res = response.json()
715 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    """Check roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADMA is off.

    Performs a RESTCONF GET on the connection through the ROADMA mount point
    and expects HTTP 200, opticalControlMode "off" and target-output-power
    -60 on the first returned entry.
    """
    connection_url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
    )
    reply = requests.request(
        "GET",
        connection_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
    self.assertEqual(-60, connection['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    """Check roadm-connection DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 on ROADMC is off.

    Performs a RESTCONF GET on the connection through the ROADMC mount point
    and expects HTTP 200 and opticalControlMode "off" on the first returned
    entry.
    """
    connection_url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1"
    )
    reply = requests.request(
        "GET",
        connection_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
740 def test_29_servicePath_delete_AToZ(self):
741 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
744 "service-name": "test",
746 "modulation-format": "qpsk",
747 "operation": "delete",
750 "dest-tp": "XPDR1-NETWORK1",
751 "src-tp": "XPDR1-CLIENT1",
755 "dest-tp": "DEG1-TTP-TXRX",
756 "src-tp": "SRG1-PP1-TXRX",
760 "dest-tp": "SRG1-PP1-TXRX",
761 "src-tp": "DEG2-TTP-TXRX",
765 "dest-tp": "XPDR1-CLIENT1",
766 "src-tp": "XPDR1-NETWORK1",
772 headers = {'content-type': 'application/json'}
773 response = requests.request(
774 "POST", url, data=json.dumps(data),
775 headers=headers, auth=('admin', 'admin'))
776 self.assertEqual(response.status_code, requests.codes.ok)
777 res = response.json()
778 self.assertIn('Request processed', res["output"]["result"])
781 def test_30_servicePath_delete_ZToA(self):
782 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
785 "service-name": "test",
787 "modulation-format": "qpsk",
788 "operation": "delete",
791 "dest-tp": "XPDR1-NETWORK1",
792 "src-tp": "XPDR1-CLIENT1",
796 "dest-tp": "DEG2-TTP-TXRX",
797 "src-tp": "SRG1-PP1-TXRX",
801 "src-tp": "DEG1-TTP-TXRX",
802 "dest-tp": "SRG1-PP1-TXRX",
806 "src-tp": "XPDR1-NETWORK1",
807 "dest-tp": "XPDR1-CLIENT1",
813 headers = {'content-type': 'application/json'}
814 response = requests.request(
815 "POST", url, data=json.dumps(data),
816 headers=headers, auth=('admin', 'admin'))
817 self.assertEqual(response.status_code, requests.codes.ok)
818 res = response.json()
819 self.assertIn('Request processed', res["output"]["result"])
822 """to test case where SRG where the xpdr is connected to has no optical range data"""
824 def test_31_connect_xprdA_to_roadmA(self):
825 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
827 "networkutils:input": {
828 "networkutils:links-input": {
829 "networkutils:xpdr-node": "XPDRA",
830 "networkutils:xpdr-num": "1",
831 "networkutils:network-num": "2",
832 "networkutils:rdm-node": "ROADMA",
833 "networkutils:srg-num": "1",
834 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
838 headers = {'content-type': 'application/json'}
839 response = requests.request(
840 "POST", url, data=json.dumps(data),
841 headers=headers, auth=('admin', 'admin'))
842 self.assertEqual(response.status_code, requests.codes.ok)
843 res = response.json()
844 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
846 def test_32_connect_roadmA_to_xpdrA(self):
847 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
849 "networkutils:input": {
850 "networkutils:links-input": {
851 "networkutils:xpdr-node": "XPDRA",
852 "networkutils:xpdr-num": "1",
853 "networkutils:network-num": "2",
854 "networkutils:rdm-node": "ROADMA",
855 "networkutils:srg-num": "1",
856 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
860 headers = {'content-type': 'application/json'}
861 response = requests.request(
862 "POST", url, data=json.dumps(data),
863 headers=headers, auth=('admin', 'admin'))
864 self.assertEqual(response.status_code, requests.codes.ok)
865 res = response.json()
866 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
868 def test_33_servicePath_create_AToZ(self):
869 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
872 "service-name": "test2",
874 "modulation-format": "qpsk",
875 "operation": "create",
878 "dest-tp": "XPDR1-NETWORK2",
879 "src-tp": "XPDR1-CLIENT2",
883 "dest-tp": "DEG1-TTP-TXRX",
884 "src-tp": "SRG1-PP2-TXRX",
890 headers = {'content-type': 'application/json'}
891 response = requests.request(
892 "POST", url, data=json.dumps(data),
893 headers=headers, auth=('admin', 'admin'))
894 self.assertEqual(response.status_code, requests.codes.ok)
895 res = response.json()
896 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
    """Check the och settings of interface XPDR1-NETWORK2-2 on XPDRA.

    Performs a RESTCONF GET on the ``och`` container of the interface through
    the XPDRA mount point and expects HTTP 200 with transmit-power -5 and
    wavelength-number 2.
    """
    och_url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
        "org-openroadm-optical-channel-interfaces:och"
    )
    reply = requests.request(
        "GET",
        och_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(2, och['wavelength-number'])
912 def test_35_servicePath_delete_AToZ(self):
913 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
916 "service-name": "test",
918 "modulation-format": "qpsk",
919 "operation": "delete",
922 "dest-tp": "XPDR1-NETWORK2",
923 "src-tp": "XPDR1-CLIENT2",
927 "dest-tp": "DEG1-TTP-TXRX",
928 "src-tp": "SRG1-PP2-TXRX",
934 headers = {'content-type': 'application/json'}
935 response = requests.request(
936 "POST", url, data=json.dumps(data),
937 headers=headers, auth=('admin', 'admin'))
938 self.assertEqual(response.status_code, requests.codes.ok)
939 res = response.json()
940 self.assertIn('Request processed', res["output"]["result"])
def test_36_xpdrA_device_disconnected(self):
    """Delete the XPDRA node from the topology-netconf config and expect HTTP 200."""
    node_url_template = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/XPDRA"
    )
    reply = requests.request(
        "DELETE",
        node_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_37_xpdrC_device_disconnected(self):
    """Delete the XPDRC node from the topology-netconf config and expect HTTP 200."""
    node_url_template = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/XPDRC"
    )
    reply = requests.request(
        "DELETE",
        node_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_38_calculate_span_loss_current(self):
    """Invoke the calculate-spanloss-current RPC and expect a 'Success' result.

    The RPC is POSTed without a request body; the test expects HTTP 200 and
    the substring 'Success' in ``output.result`` of the JSON reply.
    """
    rpc_url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(
        self.restconf_baseurl)
    reply = requests.request(
        "POST",
        rpc_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_output = reply.json()["output"]
    self.assertIn('Success', rpc_output["result"])
def test_39_rdmA_device_disconnected(self):
    """Delete the ROADMA node from the topology-netconf config and expect HTTP 200."""
    node_url_template = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMA"
    )
    reply = requests.request(
        "DELETE",
        node_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_40_rdmC_device_disconnected(self):
    """Delete the ROADMC node from the topology-netconf config and expect HTTP 200."""
    node_url_template = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMC"
    )
    reply = requests.request(
        "DELETE",
        node_url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# Run the suite with verbose per-test output when executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)