3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
# Functional test case exercising the OLM-related RESTCONF operations; every
# test builds its request URL from restconf_baseurl below.
24 class TransportOlmTesting(unittest.TestCase):
# Popen handles of the four honeynode simulators. Each stays None until the
# matching __start_honeynodeN helper actually launches its process.
26 honeynode_process1 = None
27 honeynode_process2 = None
28 honeynode_process3 = None
29 honeynode_process4 = None
# Prefix substituted into the "{}" placeholder of every request URL in this class.
31 restconf_baseurl = "http://localhost:8181/restconf"
# Launch the first honeynode simulator. Only when the honeycomb-tpce
# executable exists on disk, honeynode1.log is opened for writing and the
# executable is spawned with "17830" (the value test_03 uses as the ROADMA
# netconf port) and the ROADMA sample configuration; the Popen handle is
# stored in cls.honeynode_process1. If the executable is absent nothing is
# started and the handle is left untouched.
# NOTE(review): the remaining Popen arguments are not visible at this point -
# presumably the log file is passed as stdout; confirm.
34 def __start_honeynode1(cls):
35 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
36 "/honeynode-distribution-1.18.01/honeycomb-tpce")
37 if os.path.isfile(executable):
38 with open('honeynode1.log', 'w') as outfile:
39 cls.honeynode_process1 = subprocess.Popen(
40 [executable, "17830", "sample_configs/ord_2.1/oper-ROADMA-full.xml"],
# Launch the second honeynode simulator. Only when the honeycomb-tpce
# executable exists on disk, honeynode2.log is opened for writing and the
# executable is spawned with "17831" (the value test_01 uses as the XPDRA
# netconf port) and the XPDRA sample configuration; the Popen handle is
# stored in cls.honeynode_process2. If the executable is absent nothing is
# started and the handle is left untouched.
# NOTE(review): the remaining Popen arguments are not visible at this point -
# presumably the log file is passed as stdout; confirm.
44 def __start_honeynode2(cls):
45 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
46 "/honeynode-distribution-1.18.01/honeycomb-tpce")
47 if os.path.isfile(executable):
48 with open('honeynode2.log', 'w') as outfile:
49 cls.honeynode_process2 = subprocess.Popen(
50 [executable, "17831", "sample_configs/ord_2.1/oper-XPDRA.xml"],
# Launch the third honeynode simulator. Only when the honeycomb-tpce
# executable exists on disk, honeynode3.log is opened for writing and the
# executable is spawned with "17833" (the value test_04 uses as the ROADMC
# netconf port) and the ROADMC sample configuration; the Popen handle is
# stored in cls.honeynode_process3. If the executable is absent nothing is
# started and the handle is left untouched.
# NOTE(review): the remaining Popen arguments are not visible at this point -
# presumably the log file is passed as stdout; confirm.
54 def __start_honeynode3(cls):
55 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
56 "/honeynode-distribution-1.18.01/honeycomb-tpce")
57 if os.path.isfile(executable):
58 with open('honeynode3.log', 'w') as outfile:
59 cls.honeynode_process3 = subprocess.Popen(
60 [executable, "17833", "sample_configs/ord_2.1/oper-ROADMC-full.xml"],
# Launch the fourth honeynode simulator. Only when the honeycomb-tpce
# executable exists on disk, honeynode4.log is opened for writing and the
# executable is spawned with "17834" (the value test_02 uses as the XPDRC
# netconf port) and the XPDRC sample configuration; the Popen handle is
# stored in cls.honeynode_process4. If the executable is absent nothing is
# started and the handle is left untouched.
# NOTE(review): the remaining Popen arguments are not visible at this point -
# presumably the log file is passed as stdout; confirm.
63 def __start_honeynode4(cls):
64 executable = ("./honeynode/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
65 "/honeynode-distribution-1.18.01/honeycomb-tpce")
66 if os.path.isfile(executable):
67 with open('honeynode4.log', 'w') as outfile:
68 cls.honeynode_process4 = subprocess.Popen(
69 [executable, "17834", "sample_configs/ord_2.1/oper-XPDRC.xml"],
# Start the controller: the karaf launcher script is run through bash with the
# "server" argument, its stdout is written to odl.log, and the Popen handle is
# kept in cls.odl_process (the handle tearDownClass later signals).
# NOTE(review): the enclosing def line is not visible at this point.
74 executable = "../karaf/target/assembly/bin/karaf"
75 with open('odl.log', 'w') as outfile:
76 cls.odl_process = subprocess.Popen(
77 ["bash", executable, "server"], stdout=outfile,
# NOTE(review): the os.devnull file object opened here is never closed explicitly.
78 stdin=open(os.devnull))
# Start the four honeynode simulators one after another through the private
# start helpers defined on this class.
# NOTE(review): the enclosing def line and anything between these calls is not
# visible at this point - presumably this is the class-level setup; confirm.
82 cls.__start_honeynode1()
84 cls.__start_honeynode2()
86 cls.__start_honeynode3()
88 cls.__start_honeynode4()
def tearDownClass(cls):
    """Stop the controller and the four honeynode simulators.

    The handles are processed in the order ODL, then honeynode 1 to 4. For
    each one, SIGINT is sent to every child process reported by psutil and
    then to the process itself, after which the call blocks until that
    process has exited.

    Handles that are unset or still ``None`` are skipped. The honeynode
    handles default to ``None`` and are only assigned when the simulator
    executable exists, so without this guard a single simulator that never
    started would raise ``AttributeError`` and leave every later process
    running.
    """
    handle_names = (
        'odl_process',
        'honeynode_process1',
        'honeynode_process2',
        'honeynode_process3',
        'honeynode_process4',
    )
    for handle_name in handle_names:
        proc = getattr(cls, handle_name, None)
        if proc is None:
            # Never launched: nothing to signal or wait for.
            continue
        # Interrupt the children first so the wrapped JVMs shut down too.
        for child in psutil.Process(proc.pid).children():
            child.send_signal(signal.SIGINT)
        proc.send_signal(signal.SIGINT)
        proc.wait()
# Print the short name of the current test: the last dot-separated component
# of self.id().
# NOTE(review): the enclosing def line is not visible at this point -
# presumably this is the per-test setUp; confirm.
122 print ("execution of {}".format(self.id().split(".")[-1]))
# Mount XPDRA in the netconf topology: PUT a node entry (credentials
# admin/admin, host 127.0.0.1, port 17831, tcp-only false, empty pass-through)
# to the topology-netconf config URL and expect HTTP 201 Created.
125 def test_01_xpdrA_device_connected(self):
126 url = ("{}/config/network-topology:"
127 "network-topology/topology/topology-netconf/node/XPDRA"
128 .format(self.restconf_baseurl))
# NOTE(review): the statement that binds `data` and opens this literal is not
# visible at this point - confirm the outer keys against the RESTCONF model.
131 "netconf-node-topology:username": "admin",
132 "netconf-node-topology:password": "admin",
133 "netconf-node-topology:host": "127.0.0.1",
134 "netconf-node-topology:port": "17831",
135 "netconf-node-topology:tcp-only": "false",
136 "netconf-node-topology:pass-through": {}}]}
137 headers = {'content-type': 'application/json'}
138 response = requests.request(
139 "PUT", url, data=json.dumps(data), headers=headers,
140 auth=('admin', 'admin'))
141 self.assertEqual(response.status_code, requests.codes.created)
# Mount XPDRC in the netconf topology: PUT a node entry (credentials
# admin/admin, host 127.0.0.1, port 17834, tcp-only false, empty pass-through)
# to the topology-netconf config URL and expect HTTP 201 Created.
144 def test_02_xpdrC_device_connected(self):
145 url = ("{}/config/network-topology:"
146 "network-topology/topology/topology-netconf/node/XPDRC"
147 .format(self.restconf_baseurl))
# NOTE(review): the statement that binds `data` and opens this literal is not
# visible at this point - confirm the outer keys against the RESTCONF model.
150 "netconf-node-topology:username": "admin",
151 "netconf-node-topology:password": "admin",
152 "netconf-node-topology:host": "127.0.0.1",
153 "netconf-node-topology:port": "17834",
154 "netconf-node-topology:tcp-only": "false",
155 "netconf-node-topology:pass-through": {}}]}
156 headers = {'content-type': 'application/json'}
157 response = requests.request(
158 "PUT", url, data=json.dumps(data), headers=headers,
159 auth=('admin', 'admin'))
160 self.assertEqual(response.status_code, requests.codes.created)
# Mount ROADMA in the netconf topology: PUT a node entry (credentials
# admin/admin, host 127.0.0.1, port 17830, tcp-only false, empty pass-through)
# to the topology-netconf config URL and expect HTTP 201 Created.
163 def test_03_rdmA_device_connected(self):
164 url = ("{}/config/network-topology:"
165 "network-topology/topology/topology-netconf/node/ROADMA"
166 .format(self.restconf_baseurl))
# NOTE(review): the statement that binds `data` and opens this literal is not
# visible at this point - confirm the outer keys against the RESTCONF model.
169 "netconf-node-topology:username": "admin",
170 "netconf-node-topology:password": "admin",
171 "netconf-node-topology:host": "127.0.0.1",
172 "netconf-node-topology:port": "17830",
173 "netconf-node-topology:tcp-only": "false",
174 "netconf-node-topology:pass-through": {}}]}
175 headers = {'content-type': 'application/json'}
176 response = requests.request(
177 "PUT", url, data=json.dumps(data), headers=headers,
178 auth=('admin', 'admin'))
179 self.assertEqual(response.status_code, requests.codes.created)
# Mount ROADMC in the netconf topology: PUT a node entry (credentials
# admin/admin, host 127.0.0.1, port 17833, tcp-only false, empty pass-through)
# to the topology-netconf config URL and expect HTTP 201 Created.
182 def test_04_rdmC_device_connected(self):
183 url = ("{}/config/network-topology:"
184 "network-topology/topology/topology-netconf/node/ROADMC"
185 .format(self.restconf_baseurl))
# NOTE(review): the statement that binds `data` and opens this literal is not
# visible at this point - confirm the outer keys against the RESTCONF model.
188 "netconf-node-topology:username": "admin",
189 "netconf-node-topology:password": "admin",
190 "netconf-node-topology:host": "127.0.0.1",
191 "netconf-node-topology:port": "17833",
192 "netconf-node-topology:tcp-only": "false",
193 "netconf-node-topology:pass-through": {}}]}
194 headers = {'content-type': 'application/json'}
195 response = requests.request(
196 "PUT", url, data=json.dumps(data), headers=headers,
197 auth=('admin', 'admin'))
198 self.assertEqual(response.status_code, requests.codes.created)
def test_05_connect_xprdA_to_roadmA(self):
    """Create the XPDRA to ROADMA link through the init-xpdr-rdm-links RPC.

    Posts the link description (transponder XPDRA number 1, network port 1,
    ROADMA SRG 1, termination point SRG1-PP1-TXRX) and expects HTTP 200 with
    a result text reporting that the link was created.
    """
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    # The payload is bound to `data` and every nested mapping is closed, so
    # the json.dumps(data) call below has a well-formed object to serialize.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMA",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_06_connect_roadmA_to_xpdrA(self):
    """Create the ROADMA to XPDRA link through the init-rdm-xpdr-links RPC.

    Posts the link description (transponder XPDRA number 1, network port 1,
    ROADMA SRG 1, termination point SRG1-PP1-TXRX) and expects HTTP 200 with
    a result text reporting that the links were created.
    """
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    # The payload is bound to `data` and every nested mapping is closed, so
    # the json.dumps(data) call below has a well-formed object to serialize.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMA",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_07_connect_xprdC_to_roadmC(self):
    """Create the XPDRC to ROADMC link through the init-xpdr-rdm-links RPC.

    Posts the link description (transponder XPDRC number 1, network port 1,
    ROADMC SRG 1, termination point SRG1-PP1-TXRX) and expects HTTP 200 with
    a result text reporting that the link was created.
    """
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    # The payload is bound to `data` and every nested mapping is closed, so
    # the json.dumps(data) call below has a well-formed object to serialize.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRC",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMC",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_08_connect_roadmC_to_xpdrC(self):
    """Create the ROADMC to XPDRC link through the init-rdm-xpdr-links RPC.

    Posts the link description (transponder XPDRC number 1, network port 1,
    ROADMC SRG 1, termination point SRG1-PP1-TXRX) and expects HTTP 200 with
    a result text reporting that the links were created.
    """
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    # The payload is bound to `data` and every nested mapping is closed, so
    # the json.dumps(data) call below has a well-formed object to serialize.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRC",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADMC",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
# Call the create-ots-oms RPC for node ROADMA on logical connection point
# DEG1-TTP-TXRX; expects HTTP 200 and a result text stating that the OTS and
# OMS interfaces for that point were created on ROADMA.
289 def test_09_create_OTS_ROADMA(self):
290 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data` and the wrapper key around
# these two fields are not visible at this point - confirm against the RPC model.
293 "node-id" : "ROADMA",
294 "logical-connection-point" : "DEG1-TTP-TXRX"
297 headers = {'content-type': 'application/json'}
298 response = requests.request(
299 "POST", url, data=json.dumps(data),
300 headers=headers, auth=('admin', 'admin'))
301 self.assertEqual(response.status_code, requests.codes.ok)
302 res = response.json()
303 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADMA',
304 res["output"]["result"])
# Call the create-ots-oms RPC for node ROADMC on logical connection point
# DEG2-TTP-TXRX; expects HTTP 200 and a result text stating that the OTS and
# OMS interfaces for that point were created on ROADMC.
306 def test_10_create_OTS_ROADMC(self):
307 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data` and the wrapper key around
# these two fields are not visible at this point - confirm against the RPC model.
310 "node-id" : "ROADMC",
311 "logical-connection-point" : "DEG2-TTP-TXRX"
314 headers = {'content-type': 'application/json'}
315 response = requests.request(
316 "POST", url, data=json.dumps(data),
317 headers=headers, auth=('admin', 'admin'))
318 self.assertEqual(response.status_code, requests.codes.ok)
319 res = response.json()
320 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADMC',
321 res["output"]["result"])
# Call the get-pm RPC for interface OTS-DEG1-TTP-TXRX at 15min granularity
# and expect HTTP 200. Three measurement entries are then compared against
# res["output"]["measurements"]: OpticalPowerOutput 2.5, OpticalReturnLoss
# 49.9 and OpticalPowerInput 3.
323 def test_11_get_PM_ROADMA(self):
324 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data` and the leading payload fields
# are not visible at this point - presumably they name node ROADMA; confirm.
328 "resource-type": "interface",
329 "granularity": "15min",
330 "resource-identifier": {
331 "resource-name" : "OTS-DEG1-TTP-TXRX"
335 headers = {'content-type': 'application/json'}
336 response = requests.request(
337 "POST", url, data=json.dumps(data),
338 headers=headers, auth=('admin', 'admin'))
339 self.assertEqual(response.status_code, requests.codes.ok)
340 res = response.json()
# NOTE(review): the call that opens each of the three checks below is not
# visible - presumably a membership assertion on the measurements list; confirm.
342 "pmparameter-name": "OpticalPowerOutput",
343 "pmparameter-value": "2.5"
344 }, res["output"]["measurements"])
346 "pmparameter-name": "OpticalReturnLoss",
347 "pmparameter-value": "49.9"
348 }, res["output"]["measurements"])
350 "pmparameter-name": "OpticalPowerInput",
351 "pmparameter-value": "3"
352 }, res["output"]["measurements"])
# Call the get-pm RPC for interface OTS-DEG2-TTP-TXRX at 15min granularity
# and expect HTTP 200. Three measurement entries are then compared against
# res["output"]["measurements"]: OpticalPowerOutput 18.1, OpticalReturnLoss
# 48.8 and OpticalPowerInput -3.2.
354 def test_12_get_PM_ROADMC(self):
355 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data` and the leading payload fields
# are not visible at this point - presumably they name node ROADMC; confirm.
359 "resource-type": "interface",
360 "granularity": "15min",
361 "resource-identifier": {
362 "resource-name" : "OTS-DEG2-TTP-TXRX"
366 headers = {'content-type': 'application/json'}
367 response = requests.request(
368 "POST", url, data=json.dumps(data),
369 headers=headers, auth=('admin', 'admin'))
370 self.assertEqual(response.status_code, requests.codes.ok)
371 res = response.json()
# NOTE(review): the call that opens each of the three checks below is not
# visible - presumably a membership assertion on the measurements list; confirm.
373 "pmparameter-name": "OpticalPowerOutput",
374 "pmparameter-value": "18.1"
375 }, res["output"]["measurements"])
377 "pmparameter-name": "OpticalReturnLoss",
378 "pmparameter-value": "48.8"
379 }, res["output"]["measurements"])
381 "pmparameter-name": "OpticalPowerInput",
382 "pmparameter-value": "-3.2"
383 }, res["output"]["measurements"])
# Call the calculate-spanloss-base RPC for the single ROADMA-DEG1 to
# ROADMC-DEG2 link; expects HTTP 200, 'Success' in the result text, and an
# entry carrying that link-id compared against res["output"]["spans"].
385 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
386 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data` and any payload field other
# than link-id are not visible at this point - confirm against the RPC model.
390 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
393 headers = {'content-type': 'application/json'}
394 response = requests.request(
395 "POST", url, data=json.dumps(data),
396 headers=headers, auth=('admin', 'admin'))
397 self.assertEqual(response.status_code, requests.codes.ok)
398 res = response.json()
399 self.assertIn('Success',
400 res["output"]["result"])
# NOTE(review): the opening of this check, including any expected span-loss
# value, is not visible at this point.
403 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
404 }, res["output"]["spans"])
# Call the calculate-spanloss-base RPC and expect HTTP 200 with 'Success' in
# the result text; entries for both link directions (ROADMC to ROADMA and
# ROADMA to ROADMC) are then compared against res["output"]["spans"].
# NOTE(review): the request payload bound to `data` is not visible at this
# point - the test name suggests it targets all links; confirm.
407 def test_14_calculate_span_loss_base_all(self):
408 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
414 headers = {'content-type': 'application/json'}
415 response = requests.request(
416 "POST", url, data=json.dumps(data),
417 headers=headers, auth=('admin', 'admin'))
418 self.assertEqual(response.status_code, requests.codes.ok)
419 res = response.json()
420 self.assertIn('Success',
421 res["output"]["result"])
# NOTE(review): the opening of each check below, including any expected
# span-loss value, is not visible at this point.
424 "link-id": "ROADMC-DEG2-DEG2-TTP-TXRXtoROADMA-DEG1-DEG1-TTP-TXRX"
425 }, res["output"]["spans"])
428 "link-id": "ROADMA-DEG1-DEG1-TTP-TXRXtoROADMC-DEG2-DEG2-TTP-TXRX"
429 }, res["output"]["spans"])
def test_15_get_OTS_DEG1_TTP_TXRX_ROADMA(self):
    """Read the OTS-DEG1-TTP-TXRX interface of ROADMA and check its span losses.

    Expects HTTP 200 with span-loss-transmit equal to 6 and
    span-loss-receive equal to 15.
    """
    ots_url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
               "node/ROADMA/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
               "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
    reply = requests.request(
        "GET",
        ots_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Both checks read the same container, so pull it out once.
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(6, ots['span-loss-transmit'])
    self.assertEqual(15, ots['span-loss-receive'])
def test_16_get_OTS_DEG2_TTP_TXRX_ROADMC(self):
    """Read the OTS-DEG2-TTP-TXRX interface of ROADMC and check its span losses.

    Expects HTTP 200 with span-loss-transmit equal to 15 and
    span-loss-receive equal to 6.
    """
    ots_url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
               "node/ROADMC/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
               "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
    reply = requests.request(
        "GET",
        ots_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Both checks read the same container, so pull it out once.
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(15, ots['span-loss-transmit'])
    self.assertEqual(6, ots['span-loss-receive'])
# Call the service-path RPC with operation "create" for service "test" (qpsk)
# over four hops listed as dest-tp/src-tp pairs; expects HTTP 200 and a result
# text containing 'Roadm-connection successfully created for nodes'.
456 def test_17_servicePath_create_AToZ(self):
457 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data`, the node identifier of each
# hop and any other payload fields are not visible at this point.
460 "service-name": "test",
462 "modulation-format": "qpsk",
463 "operation": "create",
466 "dest-tp": "XPDR1-NETWORK1",
467 "src-tp": "XPDR1-CLIENT1",
471 "dest-tp": "DEG1-TTP-TXRX",
472 "src-tp": "SRG1-PP1-TXRX",
476 "dest-tp": "SRG1-PP1-TXRX",
477 "src-tp": "DEG2-TTP-TXRX",
481 "dest-tp": "XPDR1-CLIENT1",
482 "src-tp": "XPDR1-NETWORK1",
488 headers = {'content-type': 'application/json'}
489 response = requests.request(
490 "POST", url, data=json.dumps(data),
491 headers=headers, auth=('admin', 'admin'))
492 self.assertEqual(response.status_code, requests.codes.ok)
493 res = response.json()
494 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
# Call the service-path RPC with operation "create" for service "test" (qpsk)
# over four hops in the reverse direction; expects HTTP 200 and a result text
# containing 'Roadm-connection successfully created for nodes'.
497 def test_18_servicePath_create_ZToA(self):
498 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data`, the node identifier of each
# hop and any other payload fields are not visible at this point.
501 "service-name": "test",
503 "modulation-format": "qpsk",
504 "operation": "create",
507 "dest-tp": "XPDR1-NETWORK1",
508 "src-tp": "XPDR1-CLIENT1",
512 "dest-tp": "DEG2-TTP-TXRX",
513 "src-tp": "SRG1-PP1-TXRX",
517 "src-tp": "DEG1-TTP-TXRX",
518 "dest-tp": "SRG1-PP1-TXRX",
522 "src-tp": "XPDR1-NETWORK1",
523 "dest-tp": "XPDR1-CLIENT1",
529 headers = {'content-type': 'application/json'}
530 response = requests.request(
531 "POST", url, data=json.dumps(data),
532 headers=headers, auth=('admin', 'admin'))
533 self.assertEqual(response.status_code, requests.codes.ok)
534 res = response.json()
535 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
# Call the OLM service-power-setup RPC for service "test" over the same four
# dest-tp/src-tp hops as test_17; expects HTTP 200 and 'Success' in the
# result text.
538 def test_19_service_power_setup_XPDRA_XPDRC(self):
539 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data`, the node identifier of each
# hop and any other payload fields are not visible at this point.
542 "service-name": "test",
546 "dest-tp": "XPDR1-NETWORK1",
547 "src-tp": "XPDR1-CLIENT1",
551 "dest-tp": "DEG1-TTP-TXRX",
552 "src-tp": "SRG1-PP1-TXRX",
556 "dest-tp": "SRG1-PP1-TXRX",
557 "src-tp": "DEG2-TTP-TXRX",
561 "dest-tp": "XPDR1-CLIENT1",
562 "src-tp": "XPDR1-NETWORK1",
568 headers = {'content-type': 'application/json'}
569 response = requests.request(
570 "POST", url, data=json.dumps(data),
571 headers=headers, auth=('admin', 'admin'))
572 self.assertEqual(response.status_code, requests.codes.ok)
573 res = response.json()
574 self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
    """Read the XPDR1-NETWORK1-1 och interface of XPDRA and check its settings.

    Expects HTTP 200 with transmit-power equal to 0 and wavelength-number
    equal to 1.
    """
    och_url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA/yang-ext:mount/"
               "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
               "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
    reply = requests.request(
        "GET",
        och_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Both checks read the same container, so pull it out once.
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(0, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_21_get_roadmconnection_ROADMA(self):
    """Read the SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 roadm-connection of ROADMA.

    Expects HTTP 200 with opticalControlMode "gainLoss" and
    target-output-power equal to -3 on the first returned connection.
    """
    connection_url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA/yang-ext:mount/"
                      "org-openroadm-device:org-openroadm-device/roadm-connections/"
                      "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(self.restconf_baseurl))
    reply = requests.request(
        "GET",
        connection_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(-3, connection['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    """Read the DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 roadm-connection of ROADMC.

    Expects HTTP 200 with opticalControlMode "power" on the first returned
    connection.
    """
    connection_url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
                      "org-openroadm-device:org-openroadm-device/roadm-connections/"
                      "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
    reply = requests.request(
        "GET",
        connection_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("power", connection['opticalControlMode'])
# Call the OLM service-power-setup RPC for service "test" over the same four
# dest-tp/src-tp hops as test_18; expects HTTP 200 and 'Success' in the
# result text.
611 def test_23_service_power_setup_XPDRC_XPDRA(self):
612 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data`, the node identifier of each
# hop and any other payload fields are not visible at this point.
615 "service-name": "test",
619 "dest-tp": "XPDR1-NETWORK1",
620 "src-tp": "XPDR1-CLIENT1",
624 "dest-tp": "DEG2-TTP-TXRX",
625 "src-tp": "SRG1-PP1-TXRX",
629 "src-tp": "DEG1-TTP-TXRX",
630 "dest-tp": "SRG1-PP1-TXRX",
634 "src-tp": "XPDR1-NETWORK1",
635 "dest-tp": "XPDR1-CLIENT1",
641 headers = {'content-type': 'application/json'}
642 response = requests.request(
643 "POST", url, data=json.dumps(data),
644 headers=headers, auth=('admin', 'admin'))
645 self.assertEqual(response.status_code, requests.codes.ok)
646 res = response.json()
647 self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
    """Read the XPDR1-NETWORK1-1 och interface of XPDRC and check its settings.

    Expects HTTP 200 with transmit-power equal to -5 and wavelength-number
    equal to 1.
    """
    och_url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRC/yang-ext:mount/"
               "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
               "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
    reply = requests.request(
        "GET",
        och_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Both checks read the same container, so pull it out once.
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_25_get_roadmconnection_ROADMC(self):
    """Read the SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 roadm-connection of ROADMC.

    Expects HTTP 200 with opticalControlMode "gainLoss" and
    target-output-power equal to 2 on the first returned connection.
    """
    connection_url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
                      "org-openroadm-device:org-openroadm-device/roadm-connections/"
                      "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(self.restconf_baseurl))
    reply = requests.request(
        "GET",
        connection_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2, connection['target-output-power'])
# Call the OLM service-power-turndown RPC for service "test" over the same
# four dest-tp/src-tp hops as test_17; expects HTTP 200 and 'Success' in the
# result text.
673 def test_26_service_power_turndown_XPDRA_XPDRC(self):
674 url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data`, the node identifier of each
# hop and any other payload fields are not visible at this point.
677 "service-name": "test",
681 "dest-tp": "XPDR1-NETWORK1",
682 "src-tp": "XPDR1-CLIENT1",
686 "dest-tp": "DEG1-TTP-TXRX",
687 "src-tp": "SRG1-PP1-TXRX",
691 "dest-tp": "SRG1-PP1-TXRX",
692 "src-tp": "DEG2-TTP-TXRX",
696 "dest-tp": "XPDR1-CLIENT1",
697 "src-tp": "XPDR1-NETWORK1",
703 headers = {'content-type': 'application/json'}
704 response = requests.request(
705 "POST", url, data=json.dumps(data),
706 headers=headers, auth=('admin', 'admin'))
707 self.assertEqual(response.status_code, requests.codes.ok)
708 res = response.json()
709 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    """Read the SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 roadm-connection of ROADMA.

    Expects HTTP 200 with opticalControlMode "off" and target-output-power
    equal to -60 on the first returned connection.
    """
    connection_url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA/yang-ext:mount/"
                      "org-openroadm-device:org-openroadm-device/roadm-connections/"
                      "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(self.restconf_baseurl))
    reply = requests.request(
        "GET",
        connection_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
    self.assertEqual(-60, connection['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    """Read the DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 roadm-connection of ROADMC.

    Expects HTTP 200 with opticalControlMode "off" on the first returned
    connection.
    """
    connection_url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC/yang-ext:mount/"
                      "org-openroadm-device:org-openroadm-device/roadm-connections/"
                      "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
    reply = requests.request(
        "GET",
        connection_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
# Call the service-path RPC with operation "delete" for service "test" (qpsk)
# over the same four hops as test_17; expects HTTP 200 and a result text
# containing 'Request processed'.
734 def test_29_servicePath_delete_AToZ(self):
735 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data`, the node identifier of each
# hop and any other payload fields are not visible at this point.
738 "service-name": "test",
740 "modulation-format": "qpsk",
741 "operation": "delete",
744 "dest-tp": "XPDR1-NETWORK1",
745 "src-tp": "XPDR1-CLIENT1",
749 "dest-tp": "DEG1-TTP-TXRX",
750 "src-tp": "SRG1-PP1-TXRX",
754 "dest-tp": "SRG1-PP1-TXRX",
755 "src-tp": "DEG2-TTP-TXRX",
759 "dest-tp": "XPDR1-CLIENT1",
760 "src-tp": "XPDR1-NETWORK1",
766 headers = {'content-type': 'application/json'}
767 response = requests.request(
768 "POST", url, data=json.dumps(data),
769 headers=headers, auth=('admin', 'admin'))
770 self.assertEqual(response.status_code, requests.codes.ok)
771 res = response.json()
772 self.assertIn('Request processed', res["output"]["result"])
# Call the service-path RPC with operation "delete" for service "test" (qpsk)
# over the same four hops as test_18; expects HTTP 200 and a result text
# containing 'Request processed'.
775 def test_30_servicePath_delete_ZToA(self):
776 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data`, the node identifier of each
# hop and any other payload fields are not visible at this point.
779 "service-name": "test",
781 "modulation-format": "qpsk",
782 "operation": "delete",
785 "dest-tp": "XPDR1-NETWORK1",
786 "src-tp": "XPDR1-CLIENT1",
790 "dest-tp": "DEG2-TTP-TXRX",
791 "src-tp": "SRG1-PP1-TXRX",
795 "src-tp": "DEG1-TTP-TXRX",
796 "dest-tp": "SRG1-PP1-TXRX",
800 "src-tp": "XPDR1-NETWORK1",
801 "dest-tp": "XPDR1-CLIENT1",
807 headers = {'content-type': 'application/json'}
808 response = requests.request(
809 "POST", url, data=json.dumps(data),
810 headers=headers, auth=('admin', 'admin'))
811 self.assertEqual(response.status_code, requests.codes.ok)
812 res = response.json()
813 self.assertIn('Request processed', res["output"]["result"])
816 """to test case where SRG where the xpdr is connected to has no optical range data"""
def test_31_connect_xprdA_to_roadmA(self):
    """Create a second XPDRA to ROADMA link through the init-xpdr-rdm-links RPC.

    Posts the link description (transponder XPDRA number 1, network port 2,
    ROADMA SRG 1, termination point SRG1-PP2-TXRX) and expects HTTP 200 with
    a result text reporting that the link was created.
    """
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    # The payload is bound to `data` and every nested mapping is closed, so
    # the json.dumps(data) call below has a well-formed object to serialize.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADMA",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_32_connect_roadmA_to_xpdrA(self):
    """Create a second ROADMA to XPDRA link through the init-rdm-xpdr-links RPC.

    Posts the link description (transponder XPDRA number 1, network port 2,
    ROADMA SRG 1, termination point SRG1-PP2-TXRX) and expects HTTP 200 with
    a result text reporting that the links were created.
    """
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    # The payload is bound to `data` and every nested mapping is closed, so
    # the json.dumps(data) call below has a well-formed object to serialize.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDRA",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADMA",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
# Call the service-path RPC with operation "create" for service "test2" (qpsk)
# over two hops (XPDR1-CLIENT2 to XPDR1-NETWORK2, then SRG1-PP2-TXRX to
# DEG1-TTP-TXRX); expects HTTP 200 and a result text containing
# 'Roadm-connection successfully created for nodes'.
862 def test_33_servicePath_create_AToZ(self):
863 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data`, the node identifier of each
# hop and any other payload fields are not visible at this point.
866 "service-name": "test2",
868 "modulation-format": "qpsk",
869 "operation": "create",
872 "dest-tp": "XPDR1-NETWORK2",
873 "src-tp": "XPDR1-CLIENT2",
877 "dest-tp": "DEG1-TTP-TXRX",
878 "src-tp": "SRG1-PP2-TXRX",
884 headers = {'content-type': 'application/json'}
885 response = requests.request(
886 "POST", url, data=json.dumps(data),
887 headers=headers, auth=('admin', 'admin'))
888 self.assertEqual(response.status_code, requests.codes.ok)
889 res = response.json()
890 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
    """Read the XPDR1-NETWORK2-2 och interface of XPDRA and check its settings.

    Expects HTTP 200 with transmit-power equal to -5 and wavelength-number
    equal to 2.
    """
    och_url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA/yang-ext:mount/"
               "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
               "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
    reply = requests.request(
        "GET",
        och_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Both checks read the same container, so pull it out once.
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(2, och['wavelength-number'])
# Call the service-path RPC with operation "delete" (qpsk) over the same two
# hops as test_33; expects HTTP 200 and a result text containing
# 'Request processed'.
905 def test_35_servicePath_delete_AToZ(self):
906 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
# NOTE(review): the statement that binds `data`, the node identifier of each
# hop and any other payload fields are not visible at this point.
# NOTE(review): this deletes service "test" although test_33 created the
# matching path under the name "test2" - confirm which name is intended.
909 "service-name": "test",
911 "modulation-format": "qpsk",
912 "operation": "delete",
915 "dest-tp": "XPDR1-NETWORK2",
916 "src-tp": "XPDR1-CLIENT2",
920 "dest-tp": "DEG1-TTP-TXRX",
921 "src-tp": "SRG1-PP2-TXRX",
927 headers = {'content-type': 'application/json'}
928 response = requests.request(
929 "POST", url, data=json.dumps(data),
930 headers=headers, auth=('admin', 'admin'))
931 self.assertEqual(response.status_code, requests.codes.ok)
932 res = response.json()
933 self.assertIn('Request processed', res["output"]["result"])
def test_36_xpdrA_device_disconnected(self):
    """Delete the XPDRA node from the netconf topology and expect HTTP 200."""
    node_url = ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/XPDRA"
                .format(self.restconf_baseurl))
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_37_xpdrC_device_disconnected(self):
    """Delete the XPDRC node from the netconf topology and expect HTTP 200."""
    node_url = ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/XPDRC"
                .format(self.restconf_baseurl))
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_38_calculate_span_loss_current(self):
    """Call the calculate-spanloss-current RPC without a request body.

    Expects HTTP 200 and 'Success' in the result text of the reply.
    """
    rpc_url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(self.restconf_baseurl)
    reply = requests.request(
        "POST",
        rpc_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Success', outcome)
def test_39_rdmA_device_disconnected(self):
    """Delete the ROADMA node from the netconf topology and expect HTTP 200."""
    node_url = ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/ROADMA"
                .format(self.restconf_baseurl))
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_40_rdmC_device_disconnected(self):
    """Delete the ROADMC node from the netconf topology and expect HTTP 200."""
    node_url = ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/ROADMC"
                .format(self.restconf_baseurl))
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
if __name__ == "__main__":
    # Script entry point: run this module's tests with verbosity level 2.
    unittest.main(verbosity=2)