3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
24 class TransportOlmTesting(unittest.TestCase):
# Test case whose visible test methods issue RESTCONF requests built from
# restconf_baseurl and assert on the HTTP status and JSON replies.
#
# Handles of the four simulator subprocesses. Each one stays None unless the
# matching __start_honeynodeN helper finds its executable and launches it.
26 honeynode_process1 = None
27 honeynode_process2 = None
28 honeynode_process3 = None
29 honeynode_process4 = None
# Base URL that every request in this class is formatted from.
31 restconf_baseurl = "http://localhost:8181/restconf"
33 #START_IGNORE_XTESTING
36 def __start_honeynode1(cls):
# Launches the honeycomb-tpce executable with the arguments "17840" and the
# ROADMA sample configuration, storing the Popen handle in
# cls.honeynode_process1. 17840 is the port test_03 later uses for ROADMA.
# When the executable file does not exist nothing is started and
# cls.honeynode_process1 keeps its previous value.
37 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
38 "/honeynode-distribution-1.18.01/honeycomb-tpce")
39 if os.path.isfile(executable):
# honeynode1.log is opened (truncated) for the duration of the launch call.
40 with open('honeynode1.log', 'w') as outfile:
41 cls.honeynode_process1 = subprocess.Popen(
42 [executable, "17840", "sample_configs/openroadm/2.1/oper-ROADMA-full.xml"],
46 def __start_honeynode2(cls):
# Launches the honeycomb-tpce executable with the arguments "17831" and the
# XPDRA sample configuration, storing the Popen handle in
# cls.honeynode_process2. 17831 is the port test_01 later uses for XPDRA.
# When the executable file does not exist nothing is started and
# cls.honeynode_process2 keeps its previous value.
47 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
48 "/honeynode-distribution-1.18.01/honeycomb-tpce")
49 if os.path.isfile(executable):
# honeynode2.log is opened (truncated) for the duration of the launch call.
50 with open('honeynode2.log', 'w') as outfile:
51 cls.honeynode_process2 = subprocess.Popen(
52 [executable, "17831", "sample_configs/openroadm/2.1/oper-XPDRA.xml"],
56 def __start_honeynode3(cls):
# Launches the honeycomb-tpce executable with the arguments "17843" and the
# ROADMC sample configuration, storing the Popen handle in
# cls.honeynode_process3. 17843 is the port test_04 later uses for ROADMC.
# When the executable file does not exist nothing is started and
# cls.honeynode_process3 keeps its previous value.
57 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
58 "/honeynode-distribution-1.18.01/honeycomb-tpce")
59 if os.path.isfile(executable):
# honeynode3.log is opened (truncated) for the duration of the launch call.
60 with open('honeynode3.log', 'w') as outfile:
61 cls.honeynode_process3 = subprocess.Popen(
62 [executable, "17843", "sample_configs/openroadm/2.1/oper-ROADMC-full.xml"],
65 def __start_honeynode4(cls):
# Launches the honeycomb-tpce executable with the arguments "17834" and the
# XPDRC sample configuration, storing the Popen handle in
# cls.honeynode_process4. 17834 is the port test_02 later uses for XPDRC.
# When the executable file does not exist nothing is started and
# cls.honeynode_process4 keeps its previous value.
66 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
67 "/honeynode-distribution-1.18.01/honeycomb-tpce")
68 if os.path.isfile(executable):
# honeynode4.log is opened (truncated) for the duration of the launch call.
69 with open('honeynode4.log', 'w') as outfile:
70 cls.honeynode_process4 = subprocess.Popen(
71 [executable, "17834", "sample_configs/openroadm/2.1/oper-XPDRC.xml"],
76 executable = "../karaf/target/assembly/bin/karaf"
77 with open('odl.log', 'w') as outfile:
78 cls.odl_process = subprocess.Popen(
79 ["bash", executable, "server"], stdout=outfile,
80 stdin=open(os.devnull))
84 cls.__start_honeynode1()
86 cls.__start_honeynode2()
88 cls.__start_honeynode3()
90 cls.__start_honeynode4()
96 def tearDownClass(cls):
# Stops the controller process and then the four simulators, one after the
# other. For each: SIGINT goes to the children psutil reports for its pid,
# then to the process itself, and the process is waited on.
# NOTE(review): a honeynode_processN that is still None (its executable was
# not found at start-up) would raise AttributeError on `.pid` here — confirm
# and guard if that situation can occur.
97 for child in psutil.Process(cls.odl_process.pid).children():
98 child.send_signal(signal.SIGINT)
100 cls.odl_process.send_signal(signal.SIGINT)
101 cls.odl_process.wait()
# Simulator 1
102 for child in psutil.Process(cls.honeynode_process1.pid).children():
103 child.send_signal(signal.SIGINT)
105 cls.honeynode_process1.send_signal(signal.SIGINT)
106 cls.honeynode_process1.wait()
# Simulator 2
107 for child in psutil.Process(cls.honeynode_process2.pid).children():
108 child.send_signal(signal.SIGINT)
110 cls.honeynode_process2.send_signal(signal.SIGINT)
111 cls.honeynode_process2.wait()
# Simulator 3
112 for child in psutil.Process(cls.honeynode_process3.pid).children():
113 child.send_signal(signal.SIGINT)
115 cls.honeynode_process3.send_signal(signal.SIGINT)
116 cls.honeynode_process3.wait()
# Simulator 4
117 for child in psutil.Process(cls.honeynode_process4.pid).children():
118 child.send_signal(signal.SIGINT)
120 cls.honeynode_process4.send_signal(signal.SIGINT)
121 cls.honeynode_process4.wait()
124 print ("execution of {}".format(self.id().split(".")[-1]))
129 def test_01_xpdrA_device_connected(self):
# PUTs `data` as JSON to the topology-netconf node XPDRA and expects
# HTTP 201 Created. The netconf settings listed below point at
# 127.0.0.1:17831 with admin/admin credentials.
# NOTE(review): the opening of the `data` literal is not visible here.
130 url = ("{}/config/network-topology:"
131 "network-topology/topology/topology-netconf/node/XPDRA"
132 .format(self.restconf_baseurl))
135 "netconf-node-topology:username": "admin",
136 "netconf-node-topology:password": "admin",
137 "netconf-node-topology:host": "127.0.0.1",
138 "netconf-node-topology:port": "17831",
139 "netconf-node-topology:tcp-only": "false",
140 "netconf-node-topology:pass-through": {}}]}
141 headers = {'content-type': 'application/json'}
142 response = requests.request(
143 "PUT", url, data=json.dumps(data), headers=headers,
144 auth=('admin', 'admin'))
145 self.assertEqual(response.status_code, requests.codes.created)
148 def test_02_xpdrC_device_connected(self):
# PUTs `data` as JSON to the topology-netconf node XPDRC and expects
# HTTP 201 Created. The netconf settings listed below point at
# 127.0.0.1:17834 with admin/admin credentials.
# NOTE(review): the opening of the `data` literal is not visible here.
149 url = ("{}/config/network-topology:"
150 "network-topology/topology/topology-netconf/node/XPDRC"
151 .format(self.restconf_baseurl))
154 "netconf-node-topology:username": "admin",
155 "netconf-node-topology:password": "admin",
156 "netconf-node-topology:host": "127.0.0.1",
157 "netconf-node-topology:port": "17834",
158 "netconf-node-topology:tcp-only": "false",
159 "netconf-node-topology:pass-through": {}}]}
160 headers = {'content-type': 'application/json'}
161 response = requests.request(
162 "PUT", url, data=json.dumps(data), headers=headers,
163 auth=('admin', 'admin'))
164 self.assertEqual(response.status_code, requests.codes.created)
167 def test_03_rdmA_device_connected(self):
# PUTs `data` as JSON to the topology-netconf node ROADMA and expects
# HTTP 201 Created. The netconf settings listed below point at
# 127.0.0.1:17840 with admin/admin credentials.
# NOTE(review): the opening of the `data` literal is not visible here.
168 url = ("{}/config/network-topology:"
169 "network-topology/topology/topology-netconf/node/ROADMA"
170 .format(self.restconf_baseurl))
173 "netconf-node-topology:username": "admin",
174 "netconf-node-topology:password": "admin",
175 "netconf-node-topology:host": "127.0.0.1",
176 "netconf-node-topology:port": "17840",
177 "netconf-node-topology:tcp-only": "false",
178 "netconf-node-topology:pass-through": {}}]}
179 headers = {'content-type': 'application/json'}
180 response = requests.request(
181 "PUT", url, data=json.dumps(data), headers=headers,
182 auth=('admin', 'admin'))
183 self.assertEqual(response.status_code, requests.codes.created)
186 def test_04_rdmC_device_connected(self):
# PUTs `data` as JSON to the topology-netconf node ROADMC and expects
# HTTP 201 Created. The netconf settings listed below point at
# 127.0.0.1:17843 with admin/admin credentials.
# NOTE(review): the opening of the `data` literal is not visible here.
187 url = ("{}/config/network-topology:"
188 "network-topology/topology/topology-netconf/node/ROADMC"
189 .format(self.restconf_baseurl))
192 "netconf-node-topology:username": "admin",
193 "netconf-node-topology:password": "admin",
194 "netconf-node-topology:host": "127.0.0.1",
195 "netconf-node-topology:port": "17843",
196 "netconf-node-topology:tcp-only": "false",
197 "netconf-node-topology:pass-through": {}}]}
198 headers = {'content-type': 'application/json'}
199 response = requests.request(
200 "PUT", url, data=json.dumps(data), headers=headers,
201 auth=('admin', 'admin'))
202 self.assertEqual(response.status_code, requests.codes.created)
205 def test_05_connect_xprdA_to_roadmA(self):
# POSTs `data` to the init-xpdr-rdm-links RPC and expects HTTP 200 with
# 'Xponder Roadm Link created successfully' in output.result.
# The visible links-input pairs XPDRA (xpdr-num 1, network-num 1) with
# ROADMA (srg-num 1, termination point SRG1-PP1-TXRX).
# NOTE(review): the opening of the `data` literal is not visible here.
206 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
208 "networkutils:input": {
209 "networkutils:links-input": {
210 "networkutils:xpdr-node": "XPDRA",
211 "networkutils:xpdr-num": "1",
212 "networkutils:network-num": "1",
213 "networkutils:rdm-node": "ROADMA",
214 "networkutils:srg-num": "1",
215 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
219 headers = {'content-type': 'application/json'}
220 response = requests.request(
221 "POST", url, data=json.dumps(data),
222 headers=headers, auth=('admin', 'admin'))
223 self.assertEqual(response.status_code, requests.codes.ok)
224 res = response.json()
225 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
227 def test_06_connect_roadmA_to_xpdrA(self):
# POSTs `data` to the init-rdm-xpdr-links RPC and expects HTTP 200 with
# 'Roadm Xponder links created successfully' in output.result.
# The visible links-input pairs XPDRA (xpdr-num 1, network-num 1) with
# ROADMA (srg-num 1, termination point SRG1-PP1-TXRX).
# NOTE(review): the opening of the `data` literal is not visible here.
228 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
230 "networkutils:input": {
231 "networkutils:links-input": {
232 "networkutils:xpdr-node": "XPDRA",
233 "networkutils:xpdr-num": "1",
234 "networkutils:network-num": "1",
235 "networkutils:rdm-node": "ROADMA",
236 "networkutils:srg-num": "1",
237 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
241 headers = {'content-type': 'application/json'}
242 response = requests.request(
243 "POST", url, data=json.dumps(data),
244 headers=headers, auth=('admin', 'admin'))
245 self.assertEqual(response.status_code, requests.codes.ok)
246 res = response.json()
247 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
249 def test_07_connect_xprdC_to_roadmC(self):
# POSTs `data` to the init-xpdr-rdm-links RPC and expects HTTP 200 with
# 'Xponder Roadm Link created successfully' in output.result.
# The visible links-input pairs XPDRC (xpdr-num 1, network-num 1) with
# ROADMC (srg-num 1, termination point SRG1-PP1-TXRX).
# NOTE(review): the opening of the `data` literal is not visible here.
250 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
252 "networkutils:input": {
253 "networkutils:links-input": {
254 "networkutils:xpdr-node": "XPDRC",
255 "networkutils:xpdr-num": "1",
256 "networkutils:network-num": "1",
257 "networkutils:rdm-node": "ROADMC",
258 "networkutils:srg-num": "1",
259 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
263 headers = {'content-type': 'application/json'}
264 response = requests.request(
265 "POST", url, data=json.dumps(data),
266 headers=headers, auth=('admin', 'admin'))
267 self.assertEqual(response.status_code, requests.codes.ok)
268 res = response.json()
269 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
271 def test_08_connect_roadmC_to_xpdrC(self):
# POSTs `data` to the init-rdm-xpdr-links RPC and expects HTTP 200 with
# 'Roadm Xponder links created successfully' in output.result.
# The visible links-input pairs XPDRC (xpdr-num 1, network-num 1) with
# ROADMC (srg-num 1, termination point SRG1-PP1-TXRX).
# NOTE(review): the opening of the `data` literal is not visible here.
272 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
274 "networkutils:input": {
275 "networkutils:links-input": {
276 "networkutils:xpdr-node": "XPDRC",
277 "networkutils:xpdr-num": "1",
278 "networkutils:network-num": "1",
279 "networkutils:rdm-node": "ROADMC",
280 "networkutils:srg-num": "1",
281 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
285 headers = {'content-type': 'application/json'}
286 response = requests.request(
287 "POST", url, data=json.dumps(data),
288 headers=headers, auth=('admin', 'admin'))
289 self.assertEqual(response.status_code, requests.codes.ok)
290 res = response.json()
291 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
293 def test_09_create_OTS_ROADMA(self):
# POSTs `data` to the create-ots-oms RPC for node ROADMA, logical
# connection point DEG1-TTP-TXRX, and expects HTTP 200 with a result that
# reports the OTS/OMS interfaces as successfully created on ROADMA.
# NOTE(review): the opening of the `data` literal is not visible here.
294 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
297 "node-id" : "ROADMA",
298 "logical-connection-point" : "DEG1-TTP-TXRX"
301 headers = {'content-type': 'application/json'}
302 response = requests.request(
303 "POST", url, data=json.dumps(data),
304 headers=headers, auth=('admin', 'admin'))
305 self.assertEqual(response.status_code, requests.codes.ok)
306 res = response.json()
307 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADMA',
308 res["output"]["result"])
310 def test_10_create_OTS_ROADMC(self):
# POSTs `data` to the create-ots-oms RPC for node ROADMC, logical
# connection point DEG2-TTP-TXRX, and expects HTTP 200 with a result that
# reports the OTS/OMS interfaces as successfully created on ROADMC.
# NOTE(review): the opening of the `data` literal is not visible here.
311 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
314 "node-id" : "ROADMC",
315 "logical-connection-point" : "DEG2-TTP-TXRX"
318 headers = {'content-type': 'application/json'}
319 response = requests.request(
320 "POST", url, data=json.dumps(data),
321 headers=headers, auth=('admin', 'admin'))
322 self.assertEqual(response.status_code, requests.codes.ok)
323 res = response.json()
324 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADMC',
325 res["output"]["result"])
327 def test_11_get_PM_ROADMA(self):
# POSTs `data` to the get-pm RPC (resource-type interface, granularity
# 15min, resource-name OTS-DEG1-TTP-TXRX) and expects HTTP 200.
# The three fragments after `res` compare pmparameter entries
# (OpticalPowerOutput 2.5, OpticalReturnLoss 49.9, OpticalPowerInput 3)
# against output.measurements — presumably assertIn checks; the call heads
# are not visible here, TODO confirm.
# NOTE(review): the opening of the `data` literal is not visible here.
328 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
332 "resource-type": "interface",
333 "granularity": "15min",
334 "resource-identifier": {
335 "resource-name" : "OTS-DEG1-TTP-TXRX"
339 headers = {'content-type': 'application/json'}
340 response = requests.request(
341 "POST", url, data=json.dumps(data),
342 headers=headers, auth=('admin', 'admin'))
343 self.assertEqual(response.status_code, requests.codes.ok)
344 res = response.json()
346 "pmparameter-name": "OpticalPowerOutput",
347 "pmparameter-value": "2.5"
348 }, res["output"]["measurements"])
350 "pmparameter-name": "OpticalReturnLoss",
351 "pmparameter-value": "49.9"
352 }, res["output"]["measurements"])
354 "pmparameter-name": "OpticalPowerInput",
355 "pmparameter-value": "3"
356 }, res["output"]["measurements"])
358 def test_12_get_PM_ROADMC(self):
# POSTs `data` to the get-pm RPC (resource-type interface, granularity
# 15min, resource-name OTS-DEG2-TTP-TXRX) and expects HTTP 200.
# The three fragments after `res` compare pmparameter entries
# (OpticalPowerOutput 18.1, OpticalReturnLoss 48.8, OpticalPowerInput -3.2)
# against output.measurements — presumably assertIn checks; the call heads
# are not visible here, TODO confirm.
# NOTE(review): the opening of the `data` literal is not visible here.
359 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
363 "resource-type": "interface",
364 "granularity": "15min",
365 "resource-identifier": {
366 "resource-name" : "OTS-DEG2-TTP-TXRX"
370 headers = {'content-type': 'application/json'}
371 response = requests.request(
372 "POST", url, data=json.dumps(data),
373 headers=headers, auth=('admin', 'admin'))
374 self.assertEqual(response.status_code, requests.codes.ok)
375 res = response.json()
377 "pmparameter-name": "OpticalPowerOutput",
378 "pmparameter-value": "18.1"
379 }, res["output"]["measurements"])
381 "pmparameter-name": "OpticalReturnLoss",
382 "pmparameter-value": "48.8"
383 }, res["output"]["measurements"])
385 "pmparameter-name": "OpticalPowerInput",
386 "pmparameter-value": "-3.2"
387 }, res["output"]["measurements"])
389 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
# POSTs `data` to the calculate-spanloss-base RPC and expects HTTP 200 with
# 'Success' in output.result. The link-id
# ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX appears in the request
# lines and again in a comparison against output.spans (presumably an
# assertIn; its call head is not visible here — TODO confirm).
# NOTE(review): the opening of the `data` literal is not visible here.
390 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
394 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
397 headers = {'content-type': 'application/json'}
398 response = requests.request(
399 "POST", url, data=json.dumps(data),
400 headers=headers, auth=('admin', 'admin'))
401 self.assertEqual(response.status_code, requests.codes.ok)
402 res = response.json()
403 self.assertIn('Success',
404 res["output"]["result"])
407 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
408 }, res["output"]["spans"])
411 def test_14_calculate_span_loss_base_all(self):
# POSTs `data` to the calculate-spanloss-base RPC and expects HTTP 200 with
# 'Success' in output.result. Both link directions (ROADMC→ROADMA and
# ROADMA→ROADMC) are then compared against output.spans (presumably
# assertIn checks; their call heads are not visible here — TODO confirm).
# NOTE(review): the construction of `data` is not visible here.
412 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
418 headers = {'content-type': 'application/json'}
419 response = requests.request(
420 "POST", url, data=json.dumps(data),
421 headers=headers, auth=('admin', 'admin'))
422 self.assertEqual(response.status_code, requests.codes.ok)
423 res = response.json()
424 self.assertIn('Success',
425 res["output"]["result"])
428 "link-id": "ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX"
429 }, res["output"]["spans"])
432 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
433 }, res["output"]["spans"])
def test_15_get_OTS_DEG1_TTP_TXRX_ROADMA(self):
    # Read the ots container of interface OTS-DEG1-TTP-TXRX on the mounted
    # ROADMA device; expect HTTP 200, span-loss-transmit 6 and
    # span-loss-receive 15.
    template = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
                "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
                "org-openroadm-optical-transport-interfaces:ots")
    reply = requests.request(
        "GET",
        template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(6, ots['span-loss-transmit'])
    self.assertEqual(15, ots['span-loss-receive'])
def test_16_get_OTS_DEG2_TTP_TXRX_ROADMC(self):
    # Read the ots container of interface OTS-DEG2-TTP-TXRX on the mounted
    # ROADMC device; expect HTTP 200, span-loss-transmit 15 and
    # span-loss-receive 6.
    template = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
                "node/ROADMC/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
                "org-openroadm-optical-transport-interfaces:ots")
    reply = requests.request(
        "GET",
        template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(15, ots['span-loss-transmit'])
    self.assertEqual(6, ots['span-loss-receive'])
460 def test_17_servicePath_create_AToZ(self):
# POSTs `data` to the device-renderer service-path RPC with operation
# "create" for service "test" (modulation-format qpsk) and expects HTTP 200
# with 'Roadm-connection successfully created for nodes' in output.result.
# Four src-tp/dest-tp pairs are visible; the node each pair belongs to is not
# shown in these lines.
# NOTE(review): the opening of the `data` literal is not visible here.
461 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
464 "service-name": "test",
466 "modulation-format": "qpsk",
467 "operation": "create",
470 "dest-tp": "XPDR1-NETWORK1",
471 "src-tp": "XPDR1-CLIENT1",
475 "dest-tp": "DEG1-TTP-TXRX",
476 "src-tp": "SRG1-PP1-TXRX",
480 "dest-tp": "SRG1-PP1-TXRX",
481 "src-tp": "DEG2-TTP-TXRX",
485 "dest-tp": "XPDR1-CLIENT1",
486 "src-tp": "XPDR1-NETWORK1",
492 headers = {'content-type': 'application/json'}
493 response = requests.request(
494 "POST", url, data=json.dumps(data),
495 headers=headers, auth=('admin', 'admin'))
496 self.assertEqual(response.status_code, requests.codes.ok)
497 res = response.json()
498 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
501 def test_18_servicePath_create_ZToA(self):
# POSTs `data` to the device-renderer service-path RPC with operation
# "create" for service "test" (modulation-format qpsk) and expects HTTP 200
# with 'Roadm-connection successfully created for nodes' in output.result.
# Four src-tp/dest-tp pairs are visible; the node each pair belongs to is not
# shown in these lines.
# NOTE(review): the opening of the `data` literal is not visible here.
502 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
505 "service-name": "test",
507 "modulation-format": "qpsk",
508 "operation": "create",
511 "dest-tp": "XPDR1-NETWORK1",
512 "src-tp": "XPDR1-CLIENT1",
516 "dest-tp": "DEG2-TTP-TXRX",
517 "src-tp": "SRG1-PP1-TXRX",
521 "src-tp": "DEG1-TTP-TXRX",
522 "dest-tp": "SRG1-PP1-TXRX",
526 "src-tp": "XPDR1-NETWORK1",
527 "dest-tp": "XPDR1-CLIENT1",
533 headers = {'content-type': 'application/json'}
534 response = requests.request(
535 "POST", url, data=json.dumps(data),
536 headers=headers, auth=('admin', 'admin'))
537 self.assertEqual(response.status_code, requests.codes.ok)
538 res = response.json()
539 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
542 def test_19_service_power_setup_XPDRA_XPDRC(self):
# POSTs `data` to the OLM service-power-setup RPC for service "test" and
# expects HTTP 200 with 'Success' in output.result.
# Four src-tp/dest-tp pairs are visible; the node each pair belongs to is not
# shown in these lines.
# NOTE(review): the opening of the `data` literal is not visible here.
543 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
546 "service-name": "test",
550 "dest-tp": "XPDR1-NETWORK1",
551 "src-tp": "XPDR1-CLIENT1",
555 "dest-tp": "DEG1-TTP-TXRX",
556 "src-tp": "SRG1-PP1-TXRX",
560 "dest-tp": "SRG1-PP1-TXRX",
561 "src-tp": "DEG2-TTP-TXRX",
565 "dest-tp": "XPDR1-CLIENT1",
566 "src-tp": "XPDR1-NETWORK1",
572 headers = {'content-type': 'application/json'}
573 response = requests.request(
574 "POST", url, data=json.dumps(data),
575 headers=headers, auth=('admin', 'admin'))
576 self.assertEqual(response.status_code, requests.codes.ok)
577 res = response.json()
578 self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
    # Read the och container of interface XPDR1-NETWORK1-1 on the mounted
    # XPDRA device; expect HTTP 200, transmit-power 0 and wavelength-number 1.
    template = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA/yang-ext:mount/"
                "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
                "org-openroadm-optical-channel-interfaces:och")
    reply = requests.request(
        "GET",
        template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(0, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_21_get_roadmconnection_ROADMA(self):
    # Read roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on the mounted
    # ROADMA device; expect HTTP 200, control mode "gainLoss" and a
    # target-output-power of -3 on the first returned entry.
    template = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA/yang-ext:mount/"
                "org-openroadm-device:org-openroadm-device/roadm-connections/"
                "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
    reply = requests.request(
        "GET",
        template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(-3, connection['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    # Read roadm-connection DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 on the mounted
    # ROADMC device; expect HTTP 200 and control mode "power" on the first
    # returned entry.
    template = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
                "org-openroadm-device:org-openroadm-device/roadm-connections/"
                "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1")
    reply = requests.request(
        "GET",
        template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("power", connection['opticalControlMode'])
615 def test_23_service_power_setup_XPDRC_XPDRA(self):
# POSTs `data` to the OLM service-power-setup RPC for service "test" and
# expects HTTP 200 with 'Success' in output.result.
# Four src-tp/dest-tp pairs are visible; the node each pair belongs to is not
# shown in these lines.
# NOTE(review): the opening of the `data` literal is not visible here.
616 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
619 "service-name": "test",
623 "dest-tp": "XPDR1-NETWORK1",
624 "src-tp": "XPDR1-CLIENT1",
628 "dest-tp": "DEG2-TTP-TXRX",
629 "src-tp": "SRG1-PP1-TXRX",
633 "src-tp": "DEG1-TTP-TXRX",
634 "dest-tp": "SRG1-PP1-TXRX",
638 "src-tp": "XPDR1-NETWORK1",
639 "dest-tp": "XPDR1-CLIENT1",
645 headers = {'content-type': 'application/json'}
646 response = requests.request(
647 "POST", url, data=json.dumps(data),
648 headers=headers, auth=('admin', 'admin'))
649 self.assertEqual(response.status_code, requests.codes.ok)
650 res = response.json()
651 self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
    # Read the och container of interface XPDR1-NETWORK1-1 on the mounted
    # XPDRC device; expect HTTP 200, transmit-power -5 and wavelength-number 1.
    template = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRC/yang-ext:mount/"
                "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
                "org-openroadm-optical-channel-interfaces:och")
    reply = requests.request(
        "GET",
        template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_25_get_roadmconnection_ROADMC(self):
    # Read roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on the mounted
    # ROADMC device; expect HTTP 200, control mode "gainLoss" and a
    # target-output-power of 2 on the first returned entry.
    template = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
                "org-openroadm-device:org-openroadm-device/roadm-connections/"
                "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
    reply = requests.request(
        "GET",
        template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2, connection['target-output-power'])
677 def test_26_service_power_turndown_XPDRA_XPDRC(self):
# POSTs `data` to the OLM service-power-turndown RPC for service "test" and
# expects HTTP 200 with 'Success' in output.result.
# Four src-tp/dest-tp pairs are visible; the node each pair belongs to is not
# shown in these lines.
# NOTE(review): the opening of the `data` literal is not visible here.
678 url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
681 "service-name": "test",
685 "dest-tp": "XPDR1-NETWORK1",
686 "src-tp": "XPDR1-CLIENT1",
690 "dest-tp": "DEG1-TTP-TXRX",
691 "src-tp": "SRG1-PP1-TXRX",
695 "dest-tp": "SRG1-PP1-TXRX",
696 "src-tp": "DEG2-TTP-TXRX",
700 "dest-tp": "XPDR1-CLIENT1",
701 "src-tp": "XPDR1-NETWORK1",
707 headers = {'content-type': 'application/json'}
708 response = requests.request(
709 "POST", url, data=json.dumps(data),
710 headers=headers, auth=('admin', 'admin'))
711 self.assertEqual(response.status_code, requests.codes.ok)
712 res = response.json()
713 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    # Read roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on the mounted
    # ROADMA device; expect HTTP 200, control mode "off" and a
    # target-output-power of -60 on the first returned entry.
    template = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA/yang-ext:mount/"
                "org-openroadm-device:org-openroadm-device/roadm-connections/"
                "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
    reply = requests.request(
        "GET",
        template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
    self.assertEqual(-60, connection['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    # Read roadm-connection DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 on the mounted
    # ROADMC device; expect HTTP 200 and control mode "off" on the first
    # returned entry.
    template = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
                "org-openroadm-device:org-openroadm-device/roadm-connections/"
                "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1")
    reply = requests.request(
        "GET",
        template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
738 def test_29_servicePath_delete_AToZ(self):
# POSTs `data` to the device-renderer service-path RPC with operation
# "delete" for service "test" (modulation-format qpsk) and expects HTTP 200
# with 'Request processed' in output.result.
# Four src-tp/dest-tp pairs are visible; the node each pair belongs to is not
# shown in these lines.
# NOTE(review): the opening of the `data` literal is not visible here.
739 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
742 "service-name": "test",
744 "modulation-format": "qpsk",
745 "operation": "delete",
748 "dest-tp": "XPDR1-NETWORK1",
749 "src-tp": "XPDR1-CLIENT1",
753 "dest-tp": "DEG1-TTP-TXRX",
754 "src-tp": "SRG1-PP1-TXRX",
758 "dest-tp": "SRG1-PP1-TXRX",
759 "src-tp": "DEG2-TTP-TXRX",
763 "dest-tp": "XPDR1-CLIENT1",
764 "src-tp": "XPDR1-NETWORK1",
770 headers = {'content-type': 'application/json'}
771 response = requests.request(
772 "POST", url, data=json.dumps(data),
773 headers=headers, auth=('admin', 'admin'))
774 self.assertEqual(response.status_code, requests.codes.ok)
775 res = response.json()
776 self.assertIn('Request processed', res["output"]["result"])
779 def test_30_servicePath_delete_ZToA(self):
# POSTs `data` to the device-renderer service-path RPC with operation
# "delete" for service "test" (modulation-format qpsk) and expects HTTP 200
# with 'Request processed' in output.result.
# Four src-tp/dest-tp pairs are visible; the node each pair belongs to is not
# shown in these lines.
# NOTE(review): the opening of the `data` literal is not visible here.
780 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
783 "service-name": "test",
785 "modulation-format": "qpsk",
786 "operation": "delete",
789 "dest-tp": "XPDR1-NETWORK1",
790 "src-tp": "XPDR1-CLIENT1",
794 "dest-tp": "DEG2-TTP-TXRX",
795 "src-tp": "SRG1-PP1-TXRX",
799 "src-tp": "DEG1-TTP-TXRX",
800 "dest-tp": "SRG1-PP1-TXRX",
804 "src-tp": "XPDR1-NETWORK1",
805 "dest-tp": "XPDR1-CLIENT1",
811 headers = {'content-type': 'application/json'}
812 response = requests.request(
813 "POST", url, data=json.dumps(data),
814 headers=headers, auth=('admin', 'admin'))
815 self.assertEqual(response.status_code, requests.codes.ok)
816 res = response.json()
817 self.assertIn('Request processed', res["output"]["result"])
820 """to test case where SRG where the xpdr is connected to has no optical range data"""
822 def test_31_connect_xprdA_to_roadmA(self):
# POSTs `data` to the init-xpdr-rdm-links RPC and expects HTTP 200 with
# 'Xponder Roadm Link created successfully' in output.result.
# Unlike test_05, the visible links-input uses network-num 2 and termination
# point SRG1-PP2-TXRX (XPDRA xpdr-num 1, ROADMA srg-num 1).
# NOTE(review): the opening of the `data` literal is not visible here.
823 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
825 "networkutils:input": {
826 "networkutils:links-input": {
827 "networkutils:xpdr-node": "XPDRA",
828 "networkutils:xpdr-num": "1",
829 "networkutils:network-num": "2",
830 "networkutils:rdm-node": "ROADMA",
831 "networkutils:srg-num": "1",
832 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
836 headers = {'content-type': 'application/json'}
837 response = requests.request(
838 "POST", url, data=json.dumps(data),
839 headers=headers, auth=('admin', 'admin'))
840 self.assertEqual(response.status_code, requests.codes.ok)
841 res = response.json()
842 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
844 def test_32_connect_roadmA_to_xpdrA(self):
# POSTs `data` to the init-rdm-xpdr-links RPC and expects HTTP 200 with
# 'Roadm Xponder links created successfully' in output.result.
# Unlike test_06, the visible links-input uses network-num 2 and termination
# point SRG1-PP2-TXRX (XPDRA xpdr-num 1, ROADMA srg-num 1).
# NOTE(review): the opening of the `data` literal is not visible here.
845 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
847 "networkutils:input": {
848 "networkutils:links-input": {
849 "networkutils:xpdr-node": "XPDRA",
850 "networkutils:xpdr-num": "1",
851 "networkutils:network-num": "2",
852 "networkutils:rdm-node": "ROADMA",
853 "networkutils:srg-num": "1",
854 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
858 headers = {'content-type': 'application/json'}
859 response = requests.request(
860 "POST", url, data=json.dumps(data),
861 headers=headers, auth=('admin', 'admin'))
862 self.assertEqual(response.status_code, requests.codes.ok)
863 res = response.json()
864 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
866 def test_33_servicePath_create_AToZ(self):
# POSTs `data` to the device-renderer service-path RPC with operation
# "create" for service "test2" (modulation-format qpsk) and expects HTTP 200
# with 'Roadm-connection successfully created for nodes' in output.result.
# Two src-tp/dest-tp pairs are visible (XPDR1-CLIENT2→XPDR1-NETWORK2 and
# SRG1-PP2-TXRX→DEG1-TTP-TXRX); their node ids are not shown in these lines.
# NOTE(review): the opening of the `data` literal is not visible here.
867 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
870 "service-name": "test2",
872 "modulation-format": "qpsk",
873 "operation": "create",
876 "dest-tp": "XPDR1-NETWORK2",
877 "src-tp": "XPDR1-CLIENT2",
881 "dest-tp": "DEG1-TTP-TXRX",
882 "src-tp": "SRG1-PP2-TXRX",
888 headers = {'content-type': 'application/json'}
889 response = requests.request(
890 "POST", url, data=json.dumps(data),
891 headers=headers, auth=('admin', 'admin'))
892 self.assertEqual(response.status_code, requests.codes.ok)
893 res = response.json()
894 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
    # Read the och container of interface XPDR1-NETWORK2-2 on the mounted
    # XPDRA device; expect HTTP 200, transmit-power -5 and wavelength-number 2.
    template = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA/yang-ext:mount/"
                "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
                "org-openroadm-optical-channel-interfaces:och")
    reply = requests.request(
        "GET",
        template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(2, och['wavelength-number'])
909 def test_35_servicePath_delete_AToZ(self):
# POSTs `data` to the device-renderer service-path RPC with operation
# "delete" (modulation-format qpsk) and expects HTTP 200 with
# 'Request processed' in output.result.
# NOTE(review): the service-name here is "test", while the matching create in
# test_33 (same XPDR1-CLIENT2/NETWORK2 and SRG1-PP2-TXRX termination points)
# used "test2" — confirm which name is intended.
# NOTE(review): the opening of the `data` literal is not visible here.
910 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
913 "service-name": "test",
915 "modulation-format": "qpsk",
916 "operation": "delete",
919 "dest-tp": "XPDR1-NETWORK2",
920 "src-tp": "XPDR1-CLIENT2",
924 "dest-tp": "DEG1-TTP-TXRX",
925 "src-tp": "SRG1-PP2-TXRX",
931 headers = {'content-type': 'application/json'}
932 response = requests.request(
933 "POST", url, data=json.dumps(data),
934 headers=headers, auth=('admin', 'admin'))
935 self.assertEqual(response.status_code, requests.codes.ok)
936 res = response.json()
937 self.assertIn('Request processed', res["output"]["result"])
def test_36_xpdrA_device_disconnected(self):
    # Delete the XPDRA node from topology-netconf; the controller must
    # answer HTTP 200.
    node_template = ("{}/config/network-topology:"
                     "network-topology/topology/topology-netconf/node/XPDRA")
    reply = requests.request(
        "DELETE",
        node_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_37_xpdrC_device_disconnected(self):
    # Delete the XPDRC node from topology-netconf; the controller must
    # answer HTTP 200.
    node_template = ("{}/config/network-topology:"
                     "network-topology/topology/topology-netconf/node/XPDRC")
    reply = requests.request(
        "DELETE",
        node_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_38_calculate_span_loss_current(self):
    # Invoke the calculate-spanloss-current RPC without a request body;
    # expect HTTP 200 and 'Success' in output.result.
    rpc_url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(self.restconf_baseurl)
    reply = requests.request(
        "POST",
        rpc_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Success', outcome)
def test_39_rdmA_device_disconnected(self):
    # Delete the ROADMA node from topology-netconf; the controller must
    # answer HTTP 200.
    node_template = ("{}/config/network-topology:"
                     "network-topology/topology/topology-netconf/node/ROADMA")
    reply = requests.request(
        "DELETE",
        node_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_40_rdmC_device_disconnected(self):
    # Delete the ROADMC node from topology-netconf; the controller must
    # answer HTTP 200.
    node_template = ("{}/config/network-topology:"
                     "network-topology/topology/topology-netconf/node/ROADMC")
    reply = requests.request(
        "DELETE",
        node_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
995 if __name__ == "__main__":
# When run as a script, execute the tests with verbosity level 2.
996 unittest.main(verbosity=2)