3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
24 class TransportOlmTesting(unittest.TestCase):
26 honeynode_process1 = None
27 honeynode_process2 = None
28 honeynode_process3 = None
29 honeynode_process4 = None
31 restconf_baseurl = "http://localhost:8181/restconf"
33 #START_IGNORE_XTESTING
36 def __start_honeynode1(cls):
37 executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
38 "/honeynode-distribution-1.18.01/honeycomb-tpce")
39 if os.path.isfile(executable):
40 with open('honeynode1.log', 'w') as outfile:
41 cls.honeynode_process1 = subprocess.Popen(
42 [executable, "17840", "sample_configs/openroadm/2.2.1/oper-XPDRA.xml"],
46 def __start_honeynode2(cls):
47 executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
48 "/honeynode-distribution-1.18.01/honeycomb-tpce")
49 if os.path.isfile(executable):
50 with open('honeynode2.log', 'w') as outfile:
51 cls.honeynode_process2 = subprocess.Popen(
52 [executable, "17841", "sample_configs/openroadm/2.2.1/oper-ROADMA.xml"],
56 def __start_honeynode3(cls):
57 executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
58 "/honeynode-distribution-1.18.01/honeycomb-tpce")
59 if os.path.isfile(executable):
60 with open('honeynode3.log', 'w') as outfile:
61 cls.honeynode_process3 = subprocess.Popen(
62 [executable, "17843", "sample_configs/openroadm/2.2.1/oper-ROADMC.xml"],
65 def __start_honeynode4(cls):
66 executable = ("./honeynode/2.2.1/honeynode-distribution/target/honeynode-distribution-1.18.01-hc"
67 "/honeynode-distribution-1.18.01/honeycomb-tpce")
68 if os.path.isfile(executable):
69 with open('honeynode4.log', 'w') as outfile:
70 cls.honeynode_process4 = subprocess.Popen(
71 [executable, "17844", "sample_configs/openroadm/2.2.1/oper-XPDRC.xml"],
76 executable = "../karaf/target/assembly/bin/karaf"
77 with open('odl.log', 'w') as outfile:
78 cls.odl_process = subprocess.Popen(
79 ["bash", executable, "server"], stdout=outfile,
80 stdin=open(os.devnull))
84 cls.__start_honeynode1()
86 cls.__start_honeynode2()
88 cls.__start_honeynode3()
90 cls.__start_honeynode4()
96 def tearDownClass(cls):
97 for child in psutil.Process(cls.odl_process.pid).children():
98 child.send_signal(signal.SIGINT)
100 cls.odl_process.send_signal(signal.SIGINT)
101 cls.odl_process.wait()
102 for child in psutil.Process(cls.honeynode_process1.pid).children():
103 child.send_signal(signal.SIGINT)
105 cls.honeynode_process1.send_signal(signal.SIGINT)
106 cls.honeynode_process1.wait()
107 for child in psutil.Process(cls.honeynode_process2.pid).children():
108 child.send_signal(signal.SIGINT)
110 cls.honeynode_process2.send_signal(signal.SIGINT)
111 cls.honeynode_process2.wait()
112 for child in psutil.Process(cls.honeynode_process3.pid).children():
113 child.send_signal(signal.SIGINT)
115 cls.honeynode_process3.send_signal(signal.SIGINT)
116 cls.honeynode_process3.wait()
117 for child in psutil.Process(cls.honeynode_process4.pid).children():
118 child.send_signal(signal.SIGINT)
120 cls.honeynode_process4.send_signal(signal.SIGINT)
121 cls.honeynode_process4.wait()
124 print ("execution of {}".format(self.id().split(".")[-1]))
129 def test_01_xpdrA_device_connected(self):
130 url = ("{}/config/network-topology:"
131 "network-topology/topology/topology-netconf/node/XPDR-A1"
132 .format(self.restconf_baseurl))
134 "node-id": "XPDR-A1",
135 "netconf-node-topology:username": "admin",
136 "netconf-node-topology:password": "admin",
137 "netconf-node-topology:host": "127.0.0.1",
138 "netconf-node-topology:port": "17840",
139 "netconf-node-topology:tcp-only": "false",
140 "netconf-node-topology:pass-through": {}}]}
141 headers = {'content-type': 'application/json'}
142 response = requests.request(
143 "PUT", url, data=json.dumps(data), headers=headers,
144 auth=('admin', 'admin'))
145 self.assertEqual(response.status_code, requests.codes.created)
148 def test_02_xpdrC_device_connected(self):
149 url = ("{}/config/network-topology:"
150 "network-topology/topology/topology-netconf/node/XPDR-C1"
151 .format(self.restconf_baseurl))
153 "node-id": "XPDR-C1",
154 "netconf-node-topology:username": "admin",
155 "netconf-node-topology:password": "admin",
156 "netconf-node-topology:host": "127.0.0.1",
157 "netconf-node-topology:port": "17844",
158 "netconf-node-topology:tcp-only": "false",
159 "netconf-node-topology:pass-through": {}}]}
160 headers = {'content-type': 'application/json'}
161 response = requests.request(
162 "PUT", url, data=json.dumps(data), headers=headers,
163 auth=('admin', 'admin'))
164 self.assertEqual(response.status_code, requests.codes.created)
167 def test_03_rdmA_device_connected(self):
168 url = ("{}/config/network-topology:"
169 "network-topology/topology/topology-netconf/node/ROADM-A1"
170 .format(self.restconf_baseurl))
172 "node-id": "ROADM-A1",
173 "netconf-node-topology:username": "admin",
174 "netconf-node-topology:password": "admin",
175 "netconf-node-topology:host": "127.0.0.1",
176 "netconf-node-topology:port": "17841",
177 "netconf-node-topology:tcp-only": "false",
178 "netconf-node-topology:pass-through": {}}]}
179 headers = {'content-type': 'application/json'}
180 response = requests.request(
181 "PUT", url, data=json.dumps(data), headers=headers,
182 auth=('admin', 'admin'))
183 self.assertEqual(response.status_code, requests.codes.created)
186 def test_04_rdmC_device_connected(self):
187 url = ("{}/config/network-topology:"
188 "network-topology/topology/topology-netconf/node/ROADM-C1"
189 .format(self.restconf_baseurl))
191 "node-id": "ROADM-C1",
192 "netconf-node-topology:username": "admin",
193 "netconf-node-topology:password": "admin",
194 "netconf-node-topology:host": "127.0.0.1",
195 "netconf-node-topology:port": "17843",
196 "netconf-node-topology:tcp-only": "false",
197 "netconf-node-topology:pass-through": {}}]}
198 headers = {'content-type': 'application/json'}
199 response = requests.request(
200 "PUT", url, data=json.dumps(data), headers=headers,
201 auth=('admin', 'admin'))
202 self.assertEqual(response.status_code, requests.codes.created)
205 def test_05_connect_xprdA_to_roadmA(self):
206 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
208 "networkutils:input": {
209 "networkutils:links-input": {
210 "networkutils:xpdr-node": "XPDR-A1",
211 "networkutils:xpdr-num": "1",
212 "networkutils:network-num": "1",
213 "networkutils:rdm-node": "ROADM-A1",
214 "networkutils:srg-num": "1",
215 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
219 headers = {'content-type': 'application/json'}
220 response = requests.request(
221 "POST", url, data=json.dumps(data),
222 headers=headers, auth=('admin', 'admin'))
223 self.assertEqual(response.status_code, requests.codes.ok)
224 res = response.json()
225 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
227 def test_06_connect_roadmA_to_xpdrA(self):
228 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
230 "networkutils:input": {
231 "networkutils:links-input": {
232 "networkutils:xpdr-node": "XPDR-A1",
233 "networkutils:xpdr-num": "1",
234 "networkutils:network-num": "1",
235 "networkutils:rdm-node": "ROADM-A1",
236 "networkutils:srg-num": "1",
237 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
241 headers = {'content-type': 'application/json'}
242 response = requests.request(
243 "POST", url, data=json.dumps(data),
244 headers=headers, auth=('admin', 'admin'))
245 self.assertEqual(response.status_code, requests.codes.ok)
246 res = response.json()
247 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
249 def test_07_connect_xprdC_to_roadmC(self):
250 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
252 "networkutils:input": {
253 "networkutils:links-input": {
254 "networkutils:xpdr-node": "XPDR-C1",
255 "networkutils:xpdr-num": "1",
256 "networkutils:network-num": "1",
257 "networkutils:rdm-node": "ROADM-C1",
258 "networkutils:srg-num": "1",
259 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
263 headers = {'content-type': 'application/json'}
264 response = requests.request(
265 "POST", url, data=json.dumps(data),
266 headers=headers, auth=('admin', 'admin'))
267 self.assertEqual(response.status_code, requests.codes.ok)
268 res = response.json()
269 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
271 def test_08_connect_roadmC_to_xpdrC(self):
272 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
274 "networkutils:input": {
275 "networkutils:links-input": {
276 "networkutils:xpdr-node": "XPDR-C1",
277 "networkutils:xpdr-num": "1",
278 "networkutils:network-num": "1",
279 "networkutils:rdm-node": "ROADM-C1",
280 "networkutils:srg-num": "1",
281 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
285 headers = {'content-type': 'application/json'}
286 response = requests.request(
287 "POST", url, data=json.dumps(data),
288 headers=headers, auth=('admin', 'admin'))
289 self.assertEqual(response.status_code, requests.codes.ok)
290 res = response.json()
291 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
293 def test_09_create_OTS_ROADMA(self):
294 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
297 "node-id" : "ROADM-A1",
298 "logical-connection-point" : "DEG1-TTP-TXRX"
301 headers = {'content-type': 'application/json'}
302 response = requests.request(
303 "POST", url, data=json.dumps(data),
304 headers=headers, auth=('admin', 'admin'))
306 self.assertEqual(response.status_code, requests.codes.ok)
307 res = response.json()
308 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
309 res["output"]["result"])
311 def test_10_create_OTS_ROADMC(self):
312 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
315 "node-id" : "ROADM-C1",
316 "logical-connection-point" : "DEG2-TTP-TXRX"
319 headers = {'content-type': 'application/json'}
320 response = requests.request(
321 "POST", url, data=json.dumps(data),
322 headers=headers, auth=('admin', 'admin'))
323 self.assertEqual(response.status_code, requests.codes.ok)
324 res = response.json()
325 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
326 res["output"]["result"])
328 def test_11_get_PM_ROADMA(self):
329 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
332 "node-id": "ROADM-A1",
333 "resource-type": "interface",
334 "granularity": "15min",
335 "resource-identifier": {
336 "resource-name" : "OTS-DEG2-TTP-TXRX"
340 headers = {'content-type': 'application/json'}
341 response = requests.request(
342 "POST", url, data=json.dumps(data),
343 headers=headers, auth=('admin', 'admin'))
344 self.assertEqual(response.status_code, requests.codes.ok)
345 res = response.json()
347 "pmparameter-name": "OpticalPowerOutput",
348 "pmparameter-value": "2.5"
349 }, res["output"]["measurements"])
351 "pmparameter-name": "OpticalReturnLoss",
352 "pmparameter-value": "40"
353 }, res["output"]["measurements"])
355 "pmparameter-name": "OpticalPowerInput",
356 "pmparameter-value": "-21.1"
357 }, res["output"]["measurements"])
359 def test_12_get_PM_ROADMC(self):
360 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
363 "node-id": "ROADM-C1",
364 "resource-type": "interface",
365 "granularity": "15min",
366 "resource-identifier": {
367 "resource-name" : "OTS-DEG1-TTP-TXRX"
371 headers = {'content-type': 'application/json'}
372 response = requests.request(
373 "POST", url, data=json.dumps(data),
374 headers=headers, auth=('admin', 'admin'))
375 self.assertEqual(response.status_code, requests.codes.ok)
376 res = response.json()
378 "pmparameter-name": "OpticalPowerOutput",
379 "pmparameter-value": "4.6"
380 }, res["output"]["measurements"])
382 "pmparameter-name": "OpticalReturnLoss",
383 "pmparameter-value": "49.1"
384 }, res["output"]["measurements"])
386 "pmparameter-name": "OpticalPowerInput",
387 "pmparameter-value": "-15.1"
388 }, res["output"]["measurements"])
390 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
391 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
395 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
398 headers = {'content-type': 'application/json'}
399 response = requests.request(
400 "POST", url, data=json.dumps(data),
401 headers=headers, auth=('admin', 'admin'))
402 self.assertEqual(response.status_code, requests.codes.ok)
403 res = response.json()
404 self.assertIn('Success',
405 res["output"]["result"])
408 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
409 }, res["output"]["spans"])
412 def test_14_calculate_span_loss_base_all(self):
413 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
419 headers = {'content-type': 'application/json'}
420 response = requests.request(
421 "POST", url, data=json.dumps(data),
422 headers=headers, auth=('admin', 'admin'))
423 self.assertEqual(response.status_code, requests.codes.ok)
424 res = response.json()
425 self.assertIn('Success',
426 res["output"]["result"])
429 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
430 }, res["output"]["spans"])
433 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
434 }, res["output"]["spans"])
437 def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
438 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
439 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
440 "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
441 headers = {'content-type': 'application/json'}
442 response = requests.request(
443 "GET", url, headers=headers, auth=('admin', 'admin'))
444 self.assertEqual(response.status_code, requests.codes.ok)
445 res = response.json()
446 self.assertEqual(18, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
447 self.assertEqual(26, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
449 def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
450 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
451 "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
452 "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
453 headers = {'content-type': 'application/json'}
454 response = requests.request(
455 "GET", url, headers=headers, auth=('admin', 'admin'))
456 self.assertEqual(response.status_code, requests.codes.ok)
457 res = response.json()
458 self.assertEqual(26, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
459 self.assertEqual(18, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
461 def test_17_servicePath_create_AToZ(self):
462 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
465 "service-name": "test",
467 "modulation-format": "qpsk",
468 "operation": "create",
471 "dest-tp": "XPDR1-NETWORK1",
472 "src-tp": "XPDR1-CLIENT1",
476 "dest-tp": "DEG2-TTP-TXRX",
477 "src-tp": "SRG1-PP1-TXRX",
478 "node-id": "ROADM-A1"
481 "dest-tp": "SRG1-PP1-TXRX",
482 "src-tp": "DEG1-TTP-TXRX",
483 "node-id": "ROADM-C1"
486 "dest-tp": "XPDR1-CLIENT1",
487 "src-tp": "XPDR1-NETWORK1",
493 headers = {'content-type': 'application/json'}
494 response = requests.request(
495 "POST", url, data=json.dumps(data),
496 headers=headers, auth=('admin', 'admin'))
497 self.assertEqual(response.status_code, requests.codes.ok)
498 res = response.json()
499 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
503 def test_18_servicePath_create_ZToA(self):
504 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
507 "service-name": "test",
509 "modulation-format": "qpsk",
510 "operation": "create",
513 "dest-tp": "XPDR1-NETWORK1",
514 "src-tp": "XPDR1-CLIENT1",
518 "dest-tp": "DEG1-TTP-TXRX",
519 "src-tp": "SRG1-PP1-TXRX",
520 "node-id": "ROADM-C1"
523 "src-tp": "DEG2-TTP-TXRX",
524 "dest-tp": "SRG1-PP1-TXRX",
525 "node-id": "ROADM-A1"
528 "src-tp": "XPDR1-NETWORK1",
529 "dest-tp": "XPDR1-CLIENT1",
535 headers = {'content-type': 'application/json'}
536 response = requests.request(
537 "POST", url, data=json.dumps(data),
538 headers=headers, auth=('admin', 'admin'))
539 self.assertEqual(response.status_code, requests.codes.ok)
540 res = response.json()
541 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
545 def test_19_service_power_setup_XPDRA_XPDRC(self):
546 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
549 "service-name": "test",
553 "dest-tp": "XPDR1-NETWORK1",
554 "src-tp": "XPDR1-CLIENT1",
558 "dest-tp": "DEG2-TTP-TXRX",
559 "src-tp": "SRG1-PP1-TXRX",
560 "node-id": "ROADM-A1"
563 "dest-tp": "SRG1-PP1-TXRX",
564 "src-tp": "DEG1-TTP-TXRX",
565 "node-id": "ROADM-C1"
568 "dest-tp": "XPDR1-CLIENT1",
569 "src-tp": "XPDR1-NETWORK1",
575 headers = {'content-type': 'application/json'}
576 response = requests.request(
577 "POST", url, data=json.dumps(data),
578 headers=headers, auth=('admin', 'admin'))
579 self.assertEqual(response.status_code, requests.codes.ok)
580 res = response.json()
581 self.assertIn('Success', res["output"]["result"])
583 def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
584 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
585 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
586 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
587 headers = {'content-type': 'application/json'}
588 response = requests.request(
589 "GET", url, headers=headers, auth=('admin', 'admin'))
590 self.assertEqual(response.status_code, requests.codes.ok)
591 res = response.json()
592 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
593 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
595 def test_21_get_roadmconnection_ROADMA(self):
596 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
597 "org-openroadm-device:org-openroadm-device/roadm-connections/"
598 "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(self.restconf_baseurl))
599 headers = {'content-type': 'application/json'}
600 response = requests.request(
601 "GET", url, headers=headers, auth=('admin', 'admin'))
602 self.assertEqual(response.status_code, requests.codes.ok)
603 res = response.json()
604 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
605 self.assertEqual(2.0, res['roadm-connections'][0]['target-output-power'])
607 def test_22_get_roadmconnection_ROADMC(self):
608 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
609 "org-openroadm-device:org-openroadm-device/roadm-connections/"
610 "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
611 headers = {'content-type': 'application/json'}
612 response = requests.request(
613 "GET", url, headers=headers, auth=('admin', 'admin'))
614 self.assertEqual(response.status_code, requests.codes.ok)
615 res = response.json()
616 self.assertEqual("power", res['roadm-connections'][0]['opticalControlMode'])
618 def test_23_service_power_setup_XPDRC_XPDRA(self):
619 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
622 "service-name": "test",
626 "dest-tp": "XPDR1-NETWORK1",
627 "src-tp": "XPDR1-CLIENT1",
631 "dest-tp": "DEG1-TTP-TXRX",
632 "src-tp": "SRG1-PP1-TXRX",
633 "node-id": "ROADM-C1"
636 "src-tp": "DEG2-TTP-TXRX",
637 "dest-tp": "SRG1-PP1-TXRX",
638 "node-id": "ROADM-A1"
641 "src-tp": "XPDR1-NETWORK1",
642 "dest-tp": "XPDR1-CLIENT1",
648 headers = {'content-type': 'application/json'}
649 response = requests.request(
650 "POST", url, data=json.dumps(data),
651 headers=headers, auth=('admin', 'admin'))
652 self.assertEqual(response.status_code, requests.codes.ok)
653 res = response.json()
654 self.assertIn('Success', res["output"]["result"])
656 def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
657 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-C1/yang-ext:mount/"
658 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
659 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
660 headers = {'content-type': 'application/json'}
661 response = requests.request(
662 "GET", url, headers=headers, auth=('admin', 'admin'))
663 self.assertEqual(response.status_code, requests.codes.ok)
664 res = response.json()
665 self.assertEqual(-5 , res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
666 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
668 def test_25_get_roadmconnection_ROADMC(self):
669 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
670 "org-openroadm-device:org-openroadm-device/roadm-connections/"
671 "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(self.restconf_baseurl))
672 headers = {'content-type': 'application/json'}
673 response = requests.request(
674 "GET", url, headers=headers, auth=('admin', 'admin'))
675 self.assertEqual(response.status_code, requests.codes.ok)
676 res = response.json()
677 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
678 self.assertEqual(2.0, res['roadm-connections'][0]['target-output-power'])
680 def test_26_service_power_turndown_XPDRA_XPDRC(self):
681 url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
684 "service-name": "test",
688 "dest-tp": "XPDR1-NETWORK1",
689 "src-tp": "XPDR1-CLIENT1",
693 "dest-tp": "DEG2-TTP-TXRX",
694 "src-tp": "SRG1-PP1-TXRX",
695 "node-id": "ROADM-A1"
698 "dest-tp": "SRG1-PP1-TXRX",
699 "src-tp": "DEG1-TTP-TXRX",
700 "node-id": "ROADM-C1"
703 "dest-tp": "XPDR1-CLIENT1",
704 "src-tp": "XPDR1-NETWORK1",
710 headers = {'content-type': 'application/json'}
711 response = requests.request(
712 "POST", url, data=json.dumps(data),
713 headers=headers, auth=('admin', 'admin'))
714 self.assertEqual(response.status_code, requests.codes.ok)
715 res = response.json()
716 self.assertIn('Success', res["output"]["result"])
718 def test_27_get_roadmconnection_ROADMA(self):
719 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
720 "org-openroadm-device:org-openroadm-device/roadm-connections/"
721 "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(self.restconf_baseurl))
722 headers = {'content-type': 'application/json'}
723 response = requests.request(
724 "GET", url, headers=headers, auth=('admin', 'admin'))
725 self.assertEqual(response.status_code, requests.codes.ok)
726 res = response.json()
727 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
728 self.assertEqual(-60, res['roadm-connections'][0]['target-output-power'])
730 def test_28_get_roadmconnection_ROADMC(self):
731 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
732 "org-openroadm-device:org-openroadm-device/roadm-connections/"
733 "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
734 headers = {'content-type': 'application/json'}
735 response = requests.request(
736 "GET", url, headers=headers, auth=('admin', 'admin'))
737 self.assertEqual(response.status_code, requests.codes.ok)
738 res = response.json()
739 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
741 def test_29_servicePath_delete_AToZ(self):
742 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
745 "service-name": "test",
747 "modulation-format": "qpsk",
748 "operation": "delete",
751 "dest-tp": "XPDR1-NETWORK1",
752 "src-tp": "XPDR1-CLIENT1",
756 "dest-tp": "DEG2-TTP-TXRX",
757 "src-tp": "SRG1-PP1-TXRX",
758 "node-id": "ROADM-A1"
761 "dest-tp": "SRG1-PP1-TXRX",
762 "src-tp": "DEG1-TTP-TXRX",
763 "node-id": "ROADM-C1"
766 "dest-tp": "XPDR1-CLIENT1",
767 "src-tp": "XPDR1-NETWORK1",
773 headers = {'content-type': 'application/json'}
774 response = requests.request(
775 "POST", url, data=json.dumps(data),
776 headers=headers, auth=('admin', 'admin'))
777 self.assertEqual(response.status_code, requests.codes.ok)
778 res = response.json()
779 self.assertIn('Request processed', res["output"]["result"])
782 def test_30_servicePath_delete_ZToA(self):
783 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
786 "service-name": "test",
788 "modulation-format": "qpsk",
789 "operation": "delete",
792 "dest-tp": "XPDR1-NETWORK1",
793 "src-tp": "XPDR1-CLIENT1",
797 "dest-tp": "DEG1-TTP-TXRX",
798 "src-tp": "SRG1-PP1-TXRX",
799 "node-id": "ROADM-C1"
802 "src-tp": "DEG2-TTP-TXRX",
803 "dest-tp": "SRG1-PP1-TXRX",
804 "node-id": "ROADM-A1"
807 "src-tp": "XPDR1-NETWORK1",
808 "dest-tp": "XPDR1-CLIENT1",
814 headers = {'content-type': 'application/json'}
815 response = requests.request(
816 "POST", url, data=json.dumps(data),
817 headers=headers, auth=('admin', 'admin'))
818 self.assertEqual(response.status_code, requests.codes.ok)
819 res = response.json()
820 self.assertIn('Request processed', res["output"]["result"])
823 """to test case where SRG where the xpdr is connected to has no optical range data"""
825 def test_31_connect_xprdA_to_roadmA(self):
826 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
828 "networkutils:input": {
829 "networkutils:links-input": {
830 "networkutils:xpdr-node": "XPDR-A1",
831 "networkutils:xpdr-num": "1",
832 "networkutils:network-num": "2",
833 "networkutils:rdm-node": "ROADM-A1",
834 "networkutils:srg-num": "1",
835 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
839 headers = {'content-type': 'application/json'}
840 response = requests.request(
841 "POST", url, data=json.dumps(data),
842 headers=headers, auth=('admin', 'admin'))
843 self.assertEqual(response.status_code, requests.codes.ok)
844 res = response.json()
845 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
847 def test_32_connect_roadmA_to_xpdrA(self):
848 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
850 "networkutils:input": {
851 "networkutils:links-input": {
852 "networkutils:xpdr-node": "XPDR-A1",
853 "networkutils:xpdr-num": "1",
854 "networkutils:network-num": "2",
855 "networkutils:rdm-node": "ROADM-A1",
856 "networkutils:srg-num": "1",
857 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
861 headers = {'content-type': 'application/json'}
862 response = requests.request(
863 "POST", url, data=json.dumps(data),
864 headers=headers, auth=('admin', 'admin'))
865 self.assertEqual(response.status_code, requests.codes.ok)
866 res = response.json()
867 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
869 def test_33_servicePath_create_AToZ(self):
870 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
873 "service-name": "test2",
875 "modulation-format": "qpsk",
876 "operation": "create",
879 "dest-tp": "XPDR1-NETWORK2",
880 "src-tp": "XPDR1-CLIENT2",
884 "dest-tp": "DEG2-TTP-TXRX",
885 "src-tp": "SRG1-PP2-TXRX",
886 "node-id": "ROADM-A1"
891 headers = {'content-type': 'application/json'}
892 response = requests.request(
893 "POST", url, data=json.dumps(data),
894 headers=headers, auth=('admin', 'admin'))
895 self.assertEqual(response.status_code, requests.codes.ok)
896 res = response.json()
897 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
901 def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
902 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
903 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
904 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
905 headers = {'content-type': 'application/json'}
906 response = requests.request(
907 "GET", url, headers=headers, auth=('admin', 'admin'))
908 self.assertEqual(response.status_code, requests.codes.ok)
909 res = response.json()
910 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
911 # self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
913 def test_35_servicePath_delete_AToZ(self):
914 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
917 "service-name": "test",
919 "modulation-format": "qpsk",
920 "operation": "delete",
923 "dest-tp": "XPDR1-NETWORK2",
924 "src-tp": "XPDR1-CLIENT2",
928 "dest-tp": "DEG2-TTP-TXRX",
929 "src-tp": "SRG1-PP2-TXRX",
930 "node-id": "ROADM-A1"
935 headers = {'content-type': 'application/json'}
936 response = requests.request(
937 "POST", url, data=json.dumps(data),
938 headers=headers, auth=('admin', 'admin'))
939 self.assertEqual(response.status_code, requests.codes.ok)
940 res = response.json()
941 self.assertIn('Request processed', res["output"]["result"])
944 def test_36_xpdrA_device_disconnected(self):
945 url = ("{}/config/network-topology:"
946 "network-topology/topology/topology-netconf/node/XPDR-A1"
947 .format(self.restconf_baseurl))
948 headers = {'content-type': 'application/json'}
949 response = requests.request(
950 "DELETE", url, headers=headers,
951 auth=('admin', 'admin'))
952 self.assertEqual(response.status_code, requests.codes.ok)
955 def test_37_xpdrC_device_disconnected(self):
956 url = ("{}/config/network-topology:"
957 "network-topology/topology/topology-netconf/node/XPDR-C1"
958 .format(self.restconf_baseurl))
959 headers = {'content-type': 'application/json'}
960 response = requests.request(
961 "DELETE", url, headers=headers,
962 auth=('admin', 'admin'))
963 self.assertEqual(response.status_code, requests.codes.ok)
966 def test_38_calculate_span_loss_current(self):
967 url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(self.restconf_baseurl)
968 headers = {'content-type': 'application/json'}
969 response = requests.request(
970 "POST", url, headers=headers, auth=('admin', 'admin'))
971 self.assertEqual(response.status_code, requests.codes.ok)
972 res = response.json()
973 self.assertIn('Success',
974 res["output"]["result"])
977 def test_39_rdmA_device_disconnected(self):
978 url = ("{}/config/network-topology:"
979 "network-topology/topology/topology-netconf/node/ROADM-A1"
980 .format(self.restconf_baseurl))
981 headers = {'content-type': 'application/json'}
982 response = requests.request(
983 "DELETE", url, headers=headers,
984 auth=('admin', 'admin'))
985 self.assertEqual(response.status_code, requests.codes.ok)
988 def test_40_rdmC_device_disconnected(self):
989 url = ("{}/config/network-topology:"
990 "network-topology/topology/topology-netconf/node/ROADM-C1"
991 .format(self.restconf_baseurl))
992 headers = {'content-type': 'application/json'}
993 response = requests.request(
994 "DELETE", url, headers=headers,
995 auth=('admin', 'admin'))
996 self.assertEqual(response.status_code, requests.codes.ok)
999 if __name__ == "__main__":
1000 unittest.main(verbosity=2)