3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
24 class TransportOlmTesting(unittest.TestCase):
26 honeynode_process1 = None
27 honeynode_process2 = None
28 honeynode_process3 = None
29 honeynode_process4 = None
31 restconf_baseurl = "http://localhost:8181/restconf"
34 def __start_honeynode1(cls):
35 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
36 "/honeynode-distribution-1.18.01/honeycomb-tpce")
37 if os.path.isfile(executable):
38 with open('honeynode1.log', 'w') as outfile:
39 cls.honeynode_process1 = subprocess.Popen(
40 [executable, "17830", "sample_configs/ord_2.1/oper-ROADMA-full.xml"],
44 def __start_honeynode2(cls):
45 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
46 "/honeynode-distribution-1.18.01/honeycomb-tpce")
47 if os.path.isfile(executable):
48 with open('honeynode2.log', 'w') as outfile:
49 cls.honeynode_process2 = subprocess.Popen(
50 [executable, "17831", "sample_configs/ord_2.1/oper-XPDRA.xml"],
54 def __start_honeynode3(cls):
55 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
56 "/honeynode-distribution-1.18.01/honeycomb-tpce")
57 if os.path.isfile(executable):
58 with open('honeynode3.log', 'w') as outfile:
59 cls.honeynode_process3 = subprocess.Popen(
60 [executable, "17833", "sample_configs/ord_2.1/oper-ROADMC-full.xml"],
63 def __start_honeynode4(cls):
64 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
65 "/honeynode-distribution-1.18.01/honeycomb-tpce")
66 if os.path.isfile(executable):
67 with open('honeynode4.log', 'w') as outfile:
68 cls.honeynode_process4 = subprocess.Popen(
69 [executable, "17834", "sample_configs/ord_2.1/oper-XPDRC.xml"],
74 executable = "../karaf/target/assembly/bin/karaf"
75 with open('odl.log', 'w') as outfile:
76 cls.odl_process = subprocess.Popen(
77 ["bash", executable, "server"], stdout=outfile,
78 stdin=open(os.devnull))
82 cls.__start_honeynode1()
84 cls.__start_honeynode2()
86 cls.__start_honeynode3()
88 cls.__start_honeynode4()
94 def tearDownClass(cls):
95 for child in psutil.Process(cls.odl_process.pid).children():
96 child.send_signal(signal.SIGINT)
98 cls.odl_process.send_signal(signal.SIGINT)
99 cls.odl_process.wait()
100 for child in psutil.Process(cls.honeynode_process1.pid).children():
101 child.send_signal(signal.SIGINT)
103 cls.honeynode_process1.send_signal(signal.SIGINT)
104 cls.honeynode_process1.wait()
105 for child in psutil.Process(cls.honeynode_process2.pid).children():
106 child.send_signal(signal.SIGINT)
108 cls.honeynode_process2.send_signal(signal.SIGINT)
109 cls.honeynode_process2.wait()
110 for child in psutil.Process(cls.honeynode_process3.pid).children():
111 child.send_signal(signal.SIGINT)
113 cls.honeynode_process3.send_signal(signal.SIGINT)
114 cls.honeynode_process3.wait()
115 for child in psutil.Process(cls.honeynode_process4.pid).children():
116 child.send_signal(signal.SIGINT)
118 cls.honeynode_process4.send_signal(signal.SIGINT)
119 cls.honeynode_process4.wait()
122 print ("execution of {}".format(self.id().split(".")[-1]))
125 def test_01_xpdrA_device_connected(self):
126 url = ("{}/config/network-topology:"
127 "network-topology/topology/topology-netconf/node/XPDRA"
128 .format(self.restconf_baseurl))
131 "netconf-node-topology:username": "admin",
132 "netconf-node-topology:password": "admin",
133 "netconf-node-topology:host": "127.0.0.1",
134 "netconf-node-topology:port": "17831",
135 "netconf-node-topology:tcp-only": "false",
136 "netconf-node-topology:pass-through": {}}]}
137 headers = {'content-type': 'application/json'}
138 response = requests.request(
139 "PUT", url, data=json.dumps(data), headers=headers,
140 auth=('admin', 'admin'))
141 self.assertEqual(response.status_code, requests.codes.created)
144 def test_02_xpdrC_device_connected(self):
145 url = ("{}/config/network-topology:"
146 "network-topology/topology/topology-netconf/node/XPDRC"
147 .format(self.restconf_baseurl))
150 "netconf-node-topology:username": "admin",
151 "netconf-node-topology:password": "admin",
152 "netconf-node-topology:host": "127.0.0.1",
153 "netconf-node-topology:port": "17834",
154 "netconf-node-topology:tcp-only": "false",
155 "netconf-node-topology:pass-through": {}}]}
156 headers = {'content-type': 'application/json'}
157 response = requests.request(
158 "PUT", url, data=json.dumps(data), headers=headers,
159 auth=('admin', 'admin'))
160 self.assertEqual(response.status_code, requests.codes.created)
163 def test_03_rdmA_device_connected(self):
164 url = ("{}/config/network-topology:"
165 "network-topology/topology/topology-netconf/node/ROADMA"
166 .format(self.restconf_baseurl))
169 "netconf-node-topology:username": "admin",
170 "netconf-node-topology:password": "admin",
171 "netconf-node-topology:host": "127.0.0.1",
172 "netconf-node-topology:port": "17830",
173 "netconf-node-topology:tcp-only": "false",
174 "netconf-node-topology:pass-through": {}}]}
175 headers = {'content-type': 'application/json'}
176 response = requests.request(
177 "PUT", url, data=json.dumps(data), headers=headers,
178 auth=('admin', 'admin'))
179 self.assertEqual(response.status_code, requests.codes.created)
182 def test_04_rdmC_device_connected(self):
183 url = ("{}/config/network-topology:"
184 "network-topology/topology/topology-netconf/node/ROADMC"
185 .format(self.restconf_baseurl))
188 "netconf-node-topology:username": "admin",
189 "netconf-node-topology:password": "admin",
190 "netconf-node-topology:host": "127.0.0.1",
191 "netconf-node-topology:port": "17833",
192 "netconf-node-topology:tcp-only": "false",
193 "netconf-node-topology:pass-through": {}}]}
194 headers = {'content-type': 'application/json'}
195 response = requests.request(
196 "PUT", url, data=json.dumps(data), headers=headers,
197 auth=('admin', 'admin'))
198 self.assertEqual(response.status_code, requests.codes.created)
201 def test_05_connect_xprdA_to_roadmA(self):
202 url = "{}/operations/networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
204 "networkutils:input": {
205 "networkutils:links-input": {
206 "networkutils:xpdr-node": "XPDRA",
207 "networkutils:xpdr-num": "1",
208 "networkutils:network-num": "1",
209 "networkutils:rdm-node": "ROADMA",
210 "networkutils:srg-num": "1",
211 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
215 headers = {'content-type': 'application/json'}
216 response = requests.request(
217 "POST", url, data=json.dumps(data),
218 headers=headers, auth=('admin', 'admin'))
219 self.assertEqual(response.status_code, requests.codes.ok)
220 res = response.json()
221 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
223 def test_06_connect_roadmA_to_xpdrA(self):
224 url = "{}/operations/networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
226 "networkutils:input": {
227 "networkutils:links-input": {
228 "networkutils:xpdr-node": "XPDRA",
229 "networkutils:xpdr-num": "1",
230 "networkutils:network-num": "1",
231 "networkutils:rdm-node": "ROADMA",
232 "networkutils:srg-num": "1",
233 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
237 headers = {'content-type': 'application/json'}
238 response = requests.request(
239 "POST", url, data=json.dumps(data),
240 headers=headers, auth=('admin', 'admin'))
241 self.assertEqual(response.status_code, requests.codes.ok)
242 res = response.json()
243 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
245 def test_07_connect_xprdC_to_roadmC(self):
246 url = "{}/operations/networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
248 "networkutils:input": {
249 "networkutils:links-input": {
250 "networkutils:xpdr-node": "XPDRC",
251 "networkutils:xpdr-num": "1",
252 "networkutils:network-num": "1",
253 "networkutils:rdm-node": "ROADMC",
254 "networkutils:srg-num": "1",
255 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
259 headers = {'content-type': 'application/json'}
260 response = requests.request(
261 "POST", url, data=json.dumps(data),
262 headers=headers, auth=('admin', 'admin'))
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
265 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
267 def test_08_connect_roadmC_to_xpdrC(self):
268 url = "{}/operations/networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
270 "networkutils:input": {
271 "networkutils:links-input": {
272 "networkutils:xpdr-node": "XPDRC",
273 "networkutils:xpdr-num": "1",
274 "networkutils:network-num": "1",
275 "networkutils:rdm-node": "ROADMC",
276 "networkutils:srg-num": "1",
277 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
281 headers = {'content-type': 'application/json'}
282 response = requests.request(
283 "POST", url, data=json.dumps(data),
284 headers=headers, auth=('admin', 'admin'))
285 self.assertEqual(response.status_code, requests.codes.ok)
286 res = response.json()
287 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
289 def test_09_create_OTS_ROADMA(self):
290 url = "{}/operations/renderer:create-ots-oms".format(self.restconf_baseurl)
293 "node-id" : "ROADMA",
294 "logical-connection-point" : "DEG1-TTP-TXRX"
297 headers = {'content-type': 'application/json'}
298 response = requests.request(
299 "POST", url, data=json.dumps(data),
300 headers=headers, auth=('admin', 'admin'))
301 self.assertEqual(response.status_code, requests.codes.ok)
302 res = response.json()
303 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADMA',
304 res["output"]["result"])
306 def test_10_create_OTS_ROADMC(self):
307 url = "{}/operations/renderer:create-ots-oms".format(self.restconf_baseurl)
310 "node-id" : "ROADMC",
311 "logical-connection-point" : "DEG2-TTP-TXRX"
314 headers = {'content-type': 'application/json'}
315 response = requests.request(
316 "POST", url, data=json.dumps(data),
317 headers=headers, auth=('admin', 'admin'))
318 self.assertEqual(response.status_code, requests.codes.ok)
319 res = response.json()
320 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADMC',
321 res["output"]["result"])
323 def test_11_get_PM_ROADMA(self):
324 url = "{}/operations/olm:get-pm".format(self.restconf_baseurl)
328 "resource-type": "interface",
329 "granularity": "15min",
330 "resource-identifier": {
331 "resource-name" : "OTS-DEG1-TTP-TXRX"
335 headers = {'content-type': 'application/json'}
336 response = requests.request(
337 "POST", url, data=json.dumps(data),
338 headers=headers, auth=('admin', 'admin'))
339 self.assertEqual(response.status_code, requests.codes.ok)
340 res = response.json()
342 "pmparameter-name": "OpticalPowerOutput",
343 "pmparameter-value": "2.5"
344 }, res["output"]["measurements"])
346 "pmparameter-name": "OpticalReturnLoss",
347 "pmparameter-value": "49.9"
348 }, res["output"]["measurements"])
350 "pmparameter-name": "OpticalPowerInput",
351 "pmparameter-value": "3"
352 }, res["output"]["measurements"])
354 def test_12_get_PM_ROADMC(self):
355 url = "{}/operations/olm:get-pm".format(self.restconf_baseurl)
359 "resource-type": "interface",
360 "granularity": "15min",
361 "resource-identifier": {
362 "resource-name" : "OTS-DEG2-TTP-TXRX"
366 headers = {'content-type': 'application/json'}
367 response = requests.request(
368 "POST", url, data=json.dumps(data),
369 headers=headers, auth=('admin', 'admin'))
370 self.assertEqual(response.status_code, requests.codes.ok)
371 res = response.json()
373 "pmparameter-name": "OpticalPowerOutput",
374 "pmparameter-value": "18.1"
375 }, res["output"]["measurements"])
377 "pmparameter-name": "OpticalReturnLoss",
378 "pmparameter-value": "48.8"
379 }, res["output"]["measurements"])
381 "pmparameter-name": "OpticalPowerInput",
382 "pmparameter-value": "-3.2"
383 }, res["output"]["measurements"])
385 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
386 url = "{}/operations/olm:calculate-spanloss-base".format(self.restconf_baseurl)
390 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
393 headers = {'content-type': 'application/json'}
394 response = requests.request(
395 "POST", url, data=json.dumps(data),
396 headers=headers, auth=('admin', 'admin'))
397 self.assertEqual(response.status_code, requests.codes.ok)
398 res = response.json()
399 self.assertIn('Success',
400 res["output"]["result"])
403 def test_14_calculate_span_loss_base_ROADMC_ROADMA(self):
404 url = "{}/operations/olm:calculate-spanloss-base".format(self.restconf_baseurl)
408 "link-id": "ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX"
411 headers = {'content-type': 'application/json'}
412 response = requests.request(
413 "POST", url, data=json.dumps(data),
414 headers=headers, auth=('admin', 'admin'))
415 self.assertEqual(response.status_code, requests.codes.ok)
416 res = response.json()
417 self.assertIn('Success',
418 res["output"]["result"])
def test_15_get_OTS_DEG1_TTP_TXRX_ROADMA(self):
    # Read back the OTS container of interface OTS-DEG1-TTP-TXRX on node
    # ROADMA through its yang-ext mount and verify both span-loss leaves.
    # Kept as a comment (not a docstring) so unittest's verbose output,
    # which echoes docstrings, stays the same.
    ots_key = 'org-openroadm-optical-transport-interfaces:ots'
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    # The body is only parsed once the HTTP status has been validated.
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()[ots_key]
    self.assertEqual(6, ots['span-loss-transmit'])
    self.assertEqual(15, ots['span-loss-receive'])
def test_16_get_OTS_DEG2_TTP_TXRX_ROADMC(self):
    # Read back the OTS container of interface OTS-DEG2-TTP-TXRX on node
    # ROADMC through its yang-ext mount and verify both span-loss leaves.
    # Kept as a comment (not a docstring) so unittest's verbose output,
    # which echoes docstrings, stays the same.
    ots_key = 'org-openroadm-optical-transport-interfaces:ots'
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMC/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    # The body is only parsed once the HTTP status has been validated.
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()[ots_key]
    self.assertEqual(15, ots['span-loss-transmit'])
    self.assertEqual(6, ots['span-loss-receive'])
445 def test_17_servicePath_create_AToZ(self):
446 url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
449 "service-name": "test",
451 "modulation-format": "qpsk",
452 "operation": "create",
455 "dest-tp": "XPDR1-NETWORK1",
456 "src-tp": "XPDR1-CLIENT1",
460 "dest-tp": "DEG1-TTP-TXRX",
461 "src-tp": "SRG1-PP1-TXRX",
465 "dest-tp": "SRG1-PP1-TXRX",
466 "src-tp": "DEG2-TTP-TXRX",
470 "dest-tp": "XPDR1-CLIENT1",
471 "src-tp": "XPDR1-NETWORK1",
477 headers = {'content-type': 'application/json'}
478 response = requests.request(
479 "POST", url, data=json.dumps(data),
480 headers=headers, auth=('admin', 'admin'))
481 self.assertEqual(response.status_code, requests.codes.ok)
482 res = response.json()
483 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
486 def test_18_servicePath_create_ZToA(self):
487 url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
490 "service-name": "test",
492 "modulation-format": "qpsk",
493 "operation": "create",
496 "dest-tp": "XPDR1-NETWORK1",
497 "src-tp": "XPDR1-CLIENT1",
501 "dest-tp": "DEG2-TTP-TXRX",
502 "src-tp": "SRG1-PP1-TXRX",
506 "src-tp": "DEG1-TTP-TXRX",
507 "dest-tp": "SRG1-PP1-TXRX",
511 "src-tp": "XPDR1-NETWORK1",
512 "dest-tp": "XPDR1-CLIENT1",
518 headers = {'content-type': 'application/json'}
519 response = requests.request(
520 "POST", url, data=json.dumps(data),
521 headers=headers, auth=('admin', 'admin'))
522 self.assertEqual(response.status_code, requests.codes.ok)
523 res = response.json()
524 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
527 def test_19_service_power_setup_XPDRA_XPDRC(self):
528 url = "{}/operations/olm:service-power-setup".format(self.restconf_baseurl)
531 "service-name": "test",
535 "dest-tp": "XPDR1-NETWORK1",
536 "src-tp": "XPDR1-CLIENT1",
540 "dest-tp": "DEG1-TTP-TXRX",
541 "src-tp": "SRG1-PP1-TXRX",
545 "dest-tp": "SRG1-PP1-TXRX",
546 "src-tp": "DEG2-TTP-TXRX",
550 "dest-tp": "XPDR1-CLIENT1",
551 "src-tp": "XPDR1-NETWORK1",
557 headers = {'content-type': 'application/json'}
558 response = requests.request(
559 "POST", url, data=json.dumps(data),
560 headers=headers, auth=('admin', 'admin'))
561 self.assertEqual(response.status_code, requests.codes.ok)
562 res = response.json()
563 self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
    # Fetch the och container of interface XPDR1-NETWORK1-1 on node XPDRA
    # and check the transmit power and wavelength number stored there.
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    och_key = 'org-openroadm-optical-channel-interfaces:och'
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()[och_key]
    self.assertEqual(0, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_21_get_roadmconnection_ROADMA(self):
    # Fetch roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 from node ROADMA
    # and check its optical control mode and target output power.
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The reply wraps the connection in a list; only its first entry is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(-3, connection['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    # Fetch roadm-connection DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 from node ROADMC
    # and check that its optical control mode is "power".
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The reply wraps the connection in a list; only its first entry is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("power", connection['opticalControlMode'])
600 def test_23_service_power_setup_XPDRC_XPDRA(self):
601 url = "{}/operations/olm:service-power-setup".format(self.restconf_baseurl)
604 "service-name": "test",
608 "dest-tp": "XPDR1-NETWORK1",
609 "src-tp": "XPDR1-CLIENT1",
613 "dest-tp": "DEG2-TTP-TXRX",
614 "src-tp": "SRG1-PP1-TXRX",
618 "src-tp": "DEG1-TTP-TXRX",
619 "dest-tp": "SRG1-PP1-TXRX",
623 "src-tp": "XPDR1-NETWORK1",
624 "dest-tp": "XPDR1-CLIENT1",
630 headers = {'content-type': 'application/json'}
631 response = requests.request(
632 "POST", url, data=json.dumps(data),
633 headers=headers, auth=('admin', 'admin'))
634 self.assertEqual(response.status_code, requests.codes.ok)
635 res = response.json()
636 self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
    # Fetch the och container of interface XPDR1-NETWORK1-1 on node XPDRC
    # and check the transmit power and wavelength number stored there.
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    och_key = 'org-openroadm-optical-channel-interfaces:och'
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRC/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()[och_key]
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_25_get_roadmconnection_ROADMC(self):
    # Fetch roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 from node ROADMC
    # and check its optical control mode and target output power.
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The reply wraps the connection in a list; only its first entry is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2, connection['target-output-power'])
662 def test_26_service_power_turndown_XPDRA_XPDRC(self):
663 url = "{}/operations/olm:service-power-turndown".format(self.restconf_baseurl)
666 "service-name": "test",
670 "dest-tp": "XPDR1-NETWORK1",
671 "src-tp": "XPDR1-CLIENT1",
675 "dest-tp": "DEG1-TTP-TXRX",
676 "src-tp": "SRG1-PP1-TXRX",
680 "dest-tp": "SRG1-PP1-TXRX",
681 "src-tp": "DEG2-TTP-TXRX",
685 "dest-tp": "XPDR1-CLIENT1",
686 "src-tp": "XPDR1-NETWORK1",
692 headers = {'content-type': 'application/json'}
693 response = requests.request(
694 "POST", url, data=json.dumps(data),
695 headers=headers, auth=('admin', 'admin'))
696 self.assertEqual(response.status_code, requests.codes.ok)
697 res = response.json()
698 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    # Fetch roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 from node ROADMA
    # and check that it reports control mode "off" with a -60 target power.
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The reply wraps the connection in a list; only its first entry is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
    self.assertEqual(-60, connection['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    # Fetch roadm-connection DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 from node ROADMC
    # and check that its optical control mode is "off".
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The reply wraps the connection in a list; only its first entry is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
723 def test_29_servicePath_delete_AToZ(self):
724 url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
727 "service-name": "test",
729 "modulation-format": "qpsk",
730 "operation": "delete",
733 "dest-tp": "XPDR1-NETWORK1",
734 "src-tp": "XPDR1-CLIENT1",
738 "dest-tp": "DEG1-TTP-TXRX",
739 "src-tp": "SRG1-PP1-TXRX",
743 "dest-tp": "SRG1-PP1-TXRX",
744 "src-tp": "DEG2-TTP-TXRX",
748 "dest-tp": "XPDR1-CLIENT1",
749 "src-tp": "XPDR1-NETWORK1",
755 headers = {'content-type': 'application/json'}
756 response = requests.request(
757 "POST", url, data=json.dumps(data),
758 headers=headers, auth=('admin', 'admin'))
759 self.assertEqual(response.status_code, requests.codes.ok)
760 res = response.json()
761 self.assertIn('Request processed', res["output"]["result"])
764 def test_30_servicePath_delete_ZToA(self):
765 url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
768 "service-name": "test",
770 "modulation-format": "qpsk",
771 "operation": "delete",
774 "dest-tp": "XPDR1-NETWORK1",
775 "src-tp": "XPDR1-CLIENT1",
779 "dest-tp": "DEG2-TTP-TXRX",
780 "src-tp": "SRG1-PP1-TXRX",
784 "src-tp": "DEG1-TTP-TXRX",
785 "dest-tp": "SRG1-PP1-TXRX",
789 "src-tp": "XPDR1-NETWORK1",
790 "dest-tp": "XPDR1-CLIENT1",
796 headers = {'content-type': 'application/json'}
797 response = requests.request(
798 "POST", url, data=json.dumps(data),
799 headers=headers, auth=('admin', 'admin'))
800 self.assertEqual(response.status_code, requests.codes.ok)
801 res = response.json()
802 self.assertIn('Request processed', res["output"]["result"])
805 """to test case where SRG where the xpdr is connected to has no optical range data"""
807 def test_31_connect_xprdA_to_roadmA(self):
808 url = "{}/operations/networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
810 "networkutils:input": {
811 "networkutils:links-input": {
812 "networkutils:xpdr-node": "XPDRA",
813 "networkutils:xpdr-num": "1",
814 "networkutils:network-num": "2",
815 "networkutils:rdm-node": "ROADMA",
816 "networkutils:srg-num": "1",
817 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
821 headers = {'content-type': 'application/json'}
822 response = requests.request(
823 "POST", url, data=json.dumps(data),
824 headers=headers, auth=('admin', 'admin'))
825 self.assertEqual(response.status_code, requests.codes.ok)
826 res = response.json()
827 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
829 def test_32_connect_roadmA_to_xpdrA(self):
830 url = "{}/operations/networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
832 "networkutils:input": {
833 "networkutils:links-input": {
834 "networkutils:xpdr-node": "XPDRA",
835 "networkutils:xpdr-num": "1",
836 "networkutils:network-num": "2",
837 "networkutils:rdm-node": "ROADMA",
838 "networkutils:srg-num": "1",
839 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
843 headers = {'content-type': 'application/json'}
844 response = requests.request(
845 "POST", url, data=json.dumps(data),
846 headers=headers, auth=('admin', 'admin'))
847 self.assertEqual(response.status_code, requests.codes.ok)
848 res = response.json()
849 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
851 def test_33_servicePath_create_AToZ(self):
852 url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
855 "service-name": "test2",
857 "modulation-format": "qpsk",
858 "operation": "create",
861 "dest-tp": "XPDR1-NETWORK2",
862 "src-tp": "XPDR1-CLIENT2",
866 "dest-tp": "DEG1-TTP-TXRX",
867 "src-tp": "SRG1-PP2-TXRX",
873 headers = {'content-type': 'application/json'}
874 response = requests.request(
875 "POST", url, data=json.dumps(data),
876 headers=headers, auth=('admin', 'admin'))
877 self.assertEqual(response.status_code, requests.codes.ok)
878 res = response.json()
879 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
    # Fetch the och container of interface XPDR1-NETWORK2-2 on node XPDRA
    # and check the transmit power and wavelength number stored there.
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    och_key = 'org-openroadm-optical-channel-interfaces:och'
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
        "org-openroadm-optical-channel-interfaces:och"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()[och_key]
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(2, och['wavelength-number'])
894 def test_35_servicePath_delete_AToZ(self):
895 url = "{}/operations/renderer:service-path".format(self.restconf_baseurl)
898 "service-name": "test",
900 "modulation-format": "qpsk",
901 "operation": "delete",
904 "dest-tp": "XPDR1-NETWORK2",
905 "src-tp": "XPDR1-CLIENT2",
909 "dest-tp": "DEG1-TTP-TXRX",
910 "src-tp": "SRG1-PP2-TXRX",
916 headers = {'content-type': 'application/json'}
917 response = requests.request(
918 "POST", url, data=json.dumps(data),
919 headers=headers, auth=('admin', 'admin'))
920 self.assertEqual(response.status_code, requests.codes.ok)
921 res = response.json()
922 self.assertIn('Request processed', res["output"]["result"])
def test_36_xpdrA_device_disconnected(self):
    # Delete the XPDRA node entry from the topology-netconf config
    # topology and expect the RESTCONF server to answer with HTTP 200.
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/XPDRA"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_37_xpdrC_device_disconnected(self):
    # Delete the XPDRC node entry from the topology-netconf config
    # topology and expect the RESTCONF server to answer with HTTP 200.
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/XPDRC"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_38_rdmA_device_disconnected(self):
    # Delete the ROADMA node entry from the topology-netconf config
    # topology and expect the RESTCONF server to answer with HTTP 200.
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMA"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_39_rdmC_device_disconnected(self):
    # Delete the ROADMC node entry from the topology-netconf config
    # topology and expect the RESTCONF server to answer with HTTP 200.
    # Comment instead of docstring: unittest's verbose mode prints docstrings.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMC"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# Script entry point: when this file is executed directly, hand control to
# unittest's command-line runner with verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)