3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
25 class TransportOlmTesting(unittest.TestCase):
32 restconf_baseurl = "http://localhost:8181/restconf"
34 # START_IGNORE_XTESTING
38 cls.odl_process = test_utils.start_tpce()
39 cls.sim_process1 = test_utils.start_sim('xpdra')
40 cls.sim_process2 = test_utils.start_sim('roadma')
41 cls.sim_process3 = test_utils.start_sim('roadmc')
42 cls.sim_process4 = test_utils.start_sim('xpdrc')
45 def tearDownClass(cls):
46 for child in psutil.Process(cls.odl_process.pid).children():
47 child.send_signal(signal.SIGINT)
49 cls.odl_process.send_signal(signal.SIGINT)
50 cls.odl_process.wait()
51 for child in psutil.Process(cls.sim_process1.pid).children():
52 child.send_signal(signal.SIGINT)
54 cls.sim_process1.send_signal(signal.SIGINT)
55 cls.sim_process1.wait()
56 for child in psutil.Process(cls.sim_process2.pid).children():
57 child.send_signal(signal.SIGINT)
59 cls.sim_process2.send_signal(signal.SIGINT)
60 cls.sim_process2.wait()
61 for child in psutil.Process(cls.sim_process3.pid).children():
62 child.send_signal(signal.SIGINT)
64 cls.sim_process3.send_signal(signal.SIGINT)
65 cls.sim_process3.wait()
66 for child in psutil.Process(cls.sim_process4.pid).children():
67 child.send_signal(signal.SIGINT)
69 cls.sim_process4.send_signal(signal.SIGINT)
70 cls.sim_process4.wait()
73 print("execution of {}".format(self.id().split(".")[-1]))
78 def test_01_xpdrA_device_connected(self):
79 url = ("{}/config/network-topology:"
80 "network-topology/topology/topology-netconf/node/XPDR-A1"
81 .format(self.restconf_baseurl))
84 "netconf-node-topology:username": "admin",
85 "netconf-node-topology:password": "admin",
86 "netconf-node-topology:host": "127.0.0.1",
87 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
88 "netconf-node-topology:tcp-only": "false",
89 "netconf-node-topology:pass-through": {}}]}
90 headers = {'content-type': 'application/json'}
91 response = requests.request(
92 "PUT", url, data=json.dumps(data), headers=headers,
93 auth=('admin', 'admin'))
94 self.assertEqual(response.status_code, requests.codes.created)
97 def test_02_xpdrC_device_connected(self):
98 url = ("{}/config/network-topology:"
99 "network-topology/topology/topology-netconf/node/XPDR-C1"
100 .format(self.restconf_baseurl))
102 "node-id": "XPDR-C1",
103 "netconf-node-topology:username": "admin",
104 "netconf-node-topology:password": "admin",
105 "netconf-node-topology:host": "127.0.0.1",
106 "netconf-node-topology:port": test_utils.sims['xpdrc']['port'],
107 "netconf-node-topology:tcp-only": "false",
108 "netconf-node-topology:pass-through": {}}]}
109 headers = {'content-type': 'application/json'}
110 response = requests.request(
111 "PUT", url, data=json.dumps(data), headers=headers,
112 auth=('admin', 'admin'))
113 self.assertEqual(response.status_code, requests.codes.created)
116 def test_03_rdmA_device_connected(self):
117 url = ("{}/config/network-topology:"
118 "network-topology/topology/topology-netconf/node/ROADM-A1"
119 .format(self.restconf_baseurl))
121 "node-id": "ROADM-A1",
122 "netconf-node-topology:username": "admin",
123 "netconf-node-topology:password": "admin",
124 "netconf-node-topology:host": "127.0.0.1",
125 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
126 "netconf-node-topology:tcp-only": "false",
127 "netconf-node-topology:pass-through": {}}]}
128 headers = {'content-type': 'application/json'}
129 response = requests.request(
130 "PUT", url, data=json.dumps(data), headers=headers,
131 auth=('admin', 'admin'))
132 self.assertEqual(response.status_code, requests.codes.created)
135 def test_04_rdmC_device_connected(self):
136 url = ("{}/config/network-topology:"
137 "network-topology/topology/topology-netconf/node/ROADM-C1"
138 .format(self.restconf_baseurl))
140 "node-id": "ROADM-C1",
141 "netconf-node-topology:username": "admin",
142 "netconf-node-topology:password": "admin",
143 "netconf-node-topology:host": "127.0.0.1",
144 "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
145 "netconf-node-topology:tcp-only": "false",
146 "netconf-node-topology:pass-through": {}}]}
147 headers = {'content-type': 'application/json'}
148 response = requests.request(
149 "PUT", url, data=json.dumps(data), headers=headers,
150 auth=('admin', 'admin'))
151 self.assertEqual(response.status_code, requests.codes.created)
154 def test_05_connect_xprdA_to_roadmA(self):
155 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
157 "networkutils:input": {
158 "networkutils:links-input": {
159 "networkutils:xpdr-node": "XPDR-A1",
160 "networkutils:xpdr-num": "1",
161 "networkutils:network-num": "1",
162 "networkutils:rdm-node": "ROADM-A1",
163 "networkutils:srg-num": "1",
164 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
168 headers = {'content-type': 'application/json'}
169 response = requests.request(
170 "POST", url, data=json.dumps(data),
171 headers=headers, auth=('admin', 'admin'))
172 self.assertEqual(response.status_code, requests.codes.ok)
173 res = response.json()
174 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
176 def test_06_connect_roadmA_to_xpdrA(self):
177 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
179 "networkutils:input": {
180 "networkutils:links-input": {
181 "networkutils:xpdr-node": "XPDR-A1",
182 "networkutils:xpdr-num": "1",
183 "networkutils:network-num": "1",
184 "networkutils:rdm-node": "ROADM-A1",
185 "networkutils:srg-num": "1",
186 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
190 headers = {'content-type': 'application/json'}
191 response = requests.request(
192 "POST", url, data=json.dumps(data),
193 headers=headers, auth=('admin', 'admin'))
194 self.assertEqual(response.status_code, requests.codes.ok)
195 res = response.json()
196 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
198 def test_07_connect_xprdC_to_roadmC(self):
199 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
201 "networkutils:input": {
202 "networkutils:links-input": {
203 "networkutils:xpdr-node": "XPDR-C1",
204 "networkutils:xpdr-num": "1",
205 "networkutils:network-num": "1",
206 "networkutils:rdm-node": "ROADM-C1",
207 "networkutils:srg-num": "1",
208 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
212 headers = {'content-type': 'application/json'}
213 response = requests.request(
214 "POST", url, data=json.dumps(data),
215 headers=headers, auth=('admin', 'admin'))
216 self.assertEqual(response.status_code, requests.codes.ok)
217 res = response.json()
218 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
220 def test_08_connect_roadmC_to_xpdrC(self):
221 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
223 "networkutils:input": {
224 "networkutils:links-input": {
225 "networkutils:xpdr-node": "XPDR-C1",
226 "networkutils:xpdr-num": "1",
227 "networkutils:network-num": "1",
228 "networkutils:rdm-node": "ROADM-C1",
229 "networkutils:srg-num": "1",
230 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
234 headers = {'content-type': 'application/json'}
235 response = requests.request(
236 "POST", url, data=json.dumps(data),
237 headers=headers, auth=('admin', 'admin'))
238 self.assertEqual(response.status_code, requests.codes.ok)
239 res = response.json()
240 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
242 def test_09_create_OTS_ROADMA(self):
243 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
246 "node-id": "ROADM-A1",
247 "logical-connection-point": "DEG1-TTP-TXRX"
250 headers = {'content-type': 'application/json'}
251 response = requests.request(
252 "POST", url, data=json.dumps(data),
253 headers=headers, auth=('admin', 'admin'))
255 self.assertEqual(response.status_code, requests.codes.ok)
256 res = response.json()
257 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
258 res["output"]["result"])
260 def test_10_create_OTS_ROADMC(self):
261 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
264 "node-id": "ROADM-C1",
265 "logical-connection-point": "DEG2-TTP-TXRX"
268 headers = {'content-type': 'application/json'}
269 response = requests.request(
270 "POST", url, data=json.dumps(data),
271 headers=headers, auth=('admin', 'admin'))
272 self.assertEqual(response.status_code, requests.codes.ok)
273 res = response.json()
274 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
275 res["output"]["result"])
277 def test_11_get_PM_ROADMA(self):
278 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
281 "node-id": "ROADM-A1",
282 "resource-type": "interface",
283 "granularity": "15min",
284 "resource-identifier": {
285 "resource-name": "OTS-DEG2-TTP-TXRX"
289 headers = {'content-type': 'application/json'}
290 response = requests.request(
291 "POST", url, data=json.dumps(data),
292 headers=headers, auth=('admin', 'admin'))
293 self.assertEqual(response.status_code, requests.codes.ok)
294 res = response.json()
296 "pmparameter-name": "OpticalPowerOutput",
297 "pmparameter-value": "2.5"
298 }, res["output"]["measurements"])
300 "pmparameter-name": "OpticalReturnLoss",
301 "pmparameter-value": "40"
302 }, res["output"]["measurements"])
304 "pmparameter-name": "OpticalPowerInput",
305 "pmparameter-value": "-21.1"
306 }, res["output"]["measurements"])
308 def test_12_get_PM_ROADMC(self):
309 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
312 "node-id": "ROADM-C1",
313 "resource-type": "interface",
314 "granularity": "15min",
315 "resource-identifier": {
316 "resource-name": "OTS-DEG1-TTP-TXRX"
320 headers = {'content-type': 'application/json'}
321 response = requests.request(
322 "POST", url, data=json.dumps(data),
323 headers=headers, auth=('admin', 'admin'))
324 self.assertEqual(response.status_code, requests.codes.ok)
325 res = response.json()
327 "pmparameter-name": "OpticalPowerOutput",
328 "pmparameter-value": "4.6"
329 }, res["output"]["measurements"])
331 "pmparameter-name": "OpticalReturnLoss",
332 "pmparameter-value": "49.1"
333 }, res["output"]["measurements"])
335 "pmparameter-name": "OpticalPowerInput",
336 "pmparameter-value": "-15.1"
337 }, res["output"]["measurements"])
339 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
340 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
344 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
347 headers = {'content-type': 'application/json'}
348 response = requests.request(
349 "POST", url, data=json.dumps(data),
350 headers=headers, auth=('admin', 'admin'))
351 self.assertEqual(response.status_code, requests.codes.ok)
352 res = response.json()
353 self.assertIn('Success',
354 res["output"]["result"])
357 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
358 }, res["output"]["spans"])
361 def test_14_calculate_span_loss_base_all(self):
362 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
368 headers = {'content-type': 'application/json'}
369 response = requests.request(
370 "POST", url, data=json.dumps(data),
371 headers=headers, auth=('admin', 'admin'))
372 self.assertEqual(response.status_code, requests.codes.ok)
373 res = response.json()
374 self.assertIn('Success',
375 res["output"]["result"])
378 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
379 }, res["output"]["spans"])
382 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
383 }, res["output"]["spans"])
386 def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
387 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
388 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
389 "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
390 headers = {'content-type': 'application/json'}
391 response = requests.request(
392 "GET", url, headers=headers, auth=('admin', 'admin'))
393 self.assertEqual(response.status_code, requests.codes.ok)
394 res = response.json()
395 self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
396 self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
398 def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
399 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
400 "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
401 "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
402 headers = {'content-type': 'application/json'}
403 response = requests.request(
404 "GET", url, headers=headers, auth=('admin', 'admin'))
405 self.assertEqual(response.status_code, requests.codes.ok)
406 res = response.json()
407 self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
408 self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
410 def test_17_servicePath_create_AToZ(self):
411 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
414 "service-name": "test",
416 "modulation-format": "qpsk",
417 "operation": "create",
420 "dest-tp": "XPDR1-NETWORK1",
421 "src-tp": "XPDR1-CLIENT1",
425 "dest-tp": "DEG2-TTP-TXRX",
426 "src-tp": "SRG1-PP1-TXRX",
427 "node-id": "ROADM-A1"
430 "dest-tp": "SRG1-PP1-TXRX",
431 "src-tp": "DEG1-TTP-TXRX",
432 "node-id": "ROADM-C1"
435 "dest-tp": "XPDR1-CLIENT1",
436 "src-tp": "XPDR1-NETWORK1",
442 headers = {'content-type': 'application/json'}
443 response = requests.request(
444 "POST", url, data=json.dumps(data),
445 headers=headers, auth=('admin', 'admin'))
446 self.assertEqual(response.status_code, requests.codes.ok)
447 res = response.json()
448 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
452 def test_18_servicePath_create_ZToA(self):
453 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
456 "service-name": "test",
458 "modulation-format": "qpsk",
459 "operation": "create",
462 "dest-tp": "XPDR1-NETWORK1",
463 "src-tp": "XPDR1-CLIENT1",
467 "dest-tp": "DEG1-TTP-TXRX",
468 "src-tp": "SRG1-PP1-TXRX",
469 "node-id": "ROADM-C1"
472 "src-tp": "DEG2-TTP-TXRX",
473 "dest-tp": "SRG1-PP1-TXRX",
474 "node-id": "ROADM-A1"
477 "src-tp": "XPDR1-NETWORK1",
478 "dest-tp": "XPDR1-CLIENT1",
484 headers = {'content-type': 'application/json'}
485 response = requests.request(
486 "POST", url, data=json.dumps(data),
487 headers=headers, auth=('admin', 'admin'))
488 self.assertEqual(response.status_code, requests.codes.ok)
489 res = response.json()
490 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
494 def test_19_service_power_setup_XPDRA_XPDRC(self):
495 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
498 "service-name": "test",
502 "dest-tp": "XPDR1-NETWORK1",
503 "src-tp": "XPDR1-CLIENT1",
507 "dest-tp": "DEG2-TTP-TXRX",
508 "src-tp": "SRG1-PP1-TXRX",
509 "node-id": "ROADM-A1"
512 "dest-tp": "SRG1-PP1-TXRX",
513 "src-tp": "DEG1-TTP-TXRX",
514 "node-id": "ROADM-C1"
517 "dest-tp": "XPDR1-CLIENT1",
518 "src-tp": "XPDR1-NETWORK1",
524 headers = {'content-type': 'application/json'}
525 response = requests.request(
526 "POST", url, data=json.dumps(data),
527 headers=headers, auth=('admin', 'admin'))
528 self.assertEqual(response.status_code, requests.codes.ok)
529 res = response.json()
530 self.assertIn('Success', res["output"]["result"])
532 def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
533 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
534 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
535 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
536 headers = {'content-type': 'application/json'}
537 response = requests.request(
538 "GET", url, headers=headers, auth=('admin', 'admin'))
539 self.assertEqual(response.status_code, requests.codes.ok)
540 res = response.json()
541 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
542 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
544 def test_21_get_roadmconnection_ROADMA(self):
545 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
546 "org-openroadm-device:org-openroadm-device/roadm-connections/"
547 "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(self.restconf_baseurl))
548 headers = {'content-type': 'application/json'}
549 response = requests.request(
550 "GET", url, headers=headers, auth=('admin', 'admin'))
551 self.assertEqual(response.status_code, requests.codes.ok)
552 res = response.json()
553 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
554 self.assertEqual(2.0, res['roadm-connections'][0]['target-output-power'])
556 def test_22_get_roadmconnection_ROADMC(self):
557 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
558 "org-openroadm-device:org-openroadm-device/roadm-connections/"
559 "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
560 headers = {'content-type': 'application/json'}
561 response = requests.request(
562 "GET", url, headers=headers, auth=('admin', 'admin'))
563 self.assertEqual(response.status_code, requests.codes.ok)
564 res = response.json()
565 self.assertEqual("power", res['roadm-connections'][0]['opticalControlMode'])
567 def test_23_service_power_setup_XPDRC_XPDRA(self):
568 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
571 "service-name": "test",
575 "dest-tp": "XPDR1-NETWORK1",
576 "src-tp": "XPDR1-CLIENT1",
580 "dest-tp": "DEG1-TTP-TXRX",
581 "src-tp": "SRG1-PP1-TXRX",
582 "node-id": "ROADM-C1"
585 "src-tp": "DEG2-TTP-TXRX",
586 "dest-tp": "SRG1-PP1-TXRX",
587 "node-id": "ROADM-A1"
590 "src-tp": "XPDR1-NETWORK1",
591 "dest-tp": "XPDR1-CLIENT1",
597 headers = {'content-type': 'application/json'}
598 response = requests.request(
599 "POST", url, data=json.dumps(data),
600 headers=headers, auth=('admin', 'admin'))
601 self.assertEqual(response.status_code, requests.codes.ok)
602 res = response.json()
603 self.assertIn('Success', res["output"]["result"])
605 def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
606 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-C1/yang-ext:mount/"
607 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
608 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
609 headers = {'content-type': 'application/json'}
610 response = requests.request(
611 "GET", url, headers=headers, auth=('admin', 'admin'))
612 self.assertEqual(response.status_code, requests.codes.ok)
613 res = response.json()
614 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
615 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
617 def test_25_get_roadmconnection_ROADMC(self):
618 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
619 "org-openroadm-device:org-openroadm-device/roadm-connections/"
620 "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(self.restconf_baseurl))
621 headers = {'content-type': 'application/json'}
622 response = requests.request(
623 "GET", url, headers=headers, auth=('admin', 'admin'))
624 self.assertEqual(response.status_code, requests.codes.ok)
625 res = response.json()
626 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
627 self.assertEqual(2.0, res['roadm-connections'][0]['target-output-power'])
629 def test_26_service_power_turndown_XPDRA_XPDRC(self):
630 url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
633 "service-name": "test",
637 "dest-tp": "XPDR1-NETWORK1",
638 "src-tp": "XPDR1-CLIENT1",
642 "dest-tp": "DEG2-TTP-TXRX",
643 "src-tp": "SRG1-PP1-TXRX",
644 "node-id": "ROADM-A1"
647 "dest-tp": "SRG1-PP1-TXRX",
648 "src-tp": "DEG1-TTP-TXRX",
649 "node-id": "ROADM-C1"
652 "dest-tp": "XPDR1-CLIENT1",
653 "src-tp": "XPDR1-NETWORK1",
659 headers = {'content-type': 'application/json'}
660 response = requests.request(
661 "POST", url, data=json.dumps(data),
662 headers=headers, auth=('admin', 'admin'))
663 self.assertEqual(response.status_code, requests.codes.ok)
664 res = response.json()
665 self.assertIn('Success', res["output"]["result"])
667 def test_27_get_roadmconnection_ROADMA(self):
668 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
669 "org-openroadm-device:org-openroadm-device/roadm-connections/"
670 "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(self.restconf_baseurl))
671 headers = {'content-type': 'application/json'}
672 response = requests.request(
673 "GET", url, headers=headers, auth=('admin', 'admin'))
674 self.assertEqual(response.status_code, requests.codes.ok)
675 res = response.json()
676 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
677 self.assertEqual(-60, res['roadm-connections'][0]['target-output-power'])
679 def test_28_get_roadmconnection_ROADMC(self):
680 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
681 "org-openroadm-device:org-openroadm-device/roadm-connections/"
682 "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
683 headers = {'content-type': 'application/json'}
684 response = requests.request(
685 "GET", url, headers=headers, auth=('admin', 'admin'))
686 self.assertEqual(response.status_code, requests.codes.ok)
687 res = response.json()
688 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
690 def test_29_servicePath_delete_AToZ(self):
691 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
694 "service-name": "test",
696 "modulation-format": "qpsk",
697 "operation": "delete",
700 "dest-tp": "XPDR1-NETWORK1",
701 "src-tp": "XPDR1-CLIENT1",
705 "dest-tp": "DEG2-TTP-TXRX",
706 "src-tp": "SRG1-PP1-TXRX",
707 "node-id": "ROADM-A1"
710 "dest-tp": "SRG1-PP1-TXRX",
711 "src-tp": "DEG1-TTP-TXRX",
712 "node-id": "ROADM-C1"
715 "dest-tp": "XPDR1-CLIENT1",
716 "src-tp": "XPDR1-NETWORK1",
722 headers = {'content-type': 'application/json'}
723 response = requests.request(
724 "POST", url, data=json.dumps(data),
725 headers=headers, auth=('admin', 'admin'))
726 self.assertEqual(response.status_code, requests.codes.ok)
727 res = response.json()
728 self.assertIn('Request processed', res["output"]["result"])
731 def test_30_servicePath_delete_ZToA(self):
732 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
735 "service-name": "test",
737 "modulation-format": "qpsk",
738 "operation": "delete",
741 "dest-tp": "XPDR1-NETWORK1",
742 "src-tp": "XPDR1-CLIENT1",
746 "dest-tp": "DEG1-TTP-TXRX",
747 "src-tp": "SRG1-PP1-TXRX",
748 "node-id": "ROADM-C1"
751 "src-tp": "DEG2-TTP-TXRX",
752 "dest-tp": "SRG1-PP1-TXRX",
753 "node-id": "ROADM-A1"
756 "src-tp": "XPDR1-NETWORK1",
757 "dest-tp": "XPDR1-CLIENT1",
763 headers = {'content-type': 'application/json'}
764 response = requests.request(
765 "POST", url, data=json.dumps(data),
766 headers=headers, auth=('admin', 'admin'))
767 self.assertEqual(response.status_code, requests.codes.ok)
768 res = response.json()
769 self.assertIn('Request processed', res["output"]["result"])
772 """to test case where SRG where the xpdr is connected to has no optical range data"""
774 def test_31_connect_xprdA_to_roadmA(self):
775 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
777 "networkutils:input": {
778 "networkutils:links-input": {
779 "networkutils:xpdr-node": "XPDR-A1",
780 "networkutils:xpdr-num": "1",
781 "networkutils:network-num": "2",
782 "networkutils:rdm-node": "ROADM-A1",
783 "networkutils:srg-num": "1",
784 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
788 headers = {'content-type': 'application/json'}
789 response = requests.request(
790 "POST", url, data=json.dumps(data),
791 headers=headers, auth=('admin', 'admin'))
792 self.assertEqual(response.status_code, requests.codes.ok)
793 res = response.json()
794 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
796 def test_32_connect_roadmA_to_xpdrA(self):
797 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
799 "networkutils:input": {
800 "networkutils:links-input": {
801 "networkutils:xpdr-node": "XPDR-A1",
802 "networkutils:xpdr-num": "1",
803 "networkutils:network-num": "2",
804 "networkutils:rdm-node": "ROADM-A1",
805 "networkutils:srg-num": "1",
806 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
810 headers = {'content-type': 'application/json'}
811 response = requests.request(
812 "POST", url, data=json.dumps(data),
813 headers=headers, auth=('admin', 'admin'))
814 self.assertEqual(response.status_code, requests.codes.ok)
815 res = response.json()
816 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
818 def test_33_servicePath_create_AToZ(self):
819 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
822 "service-name": "test2",
824 "modulation-format": "qpsk",
825 "operation": "create",
828 "dest-tp": "XPDR1-NETWORK2",
829 "src-tp": "XPDR1-CLIENT2",
833 "dest-tp": "DEG2-TTP-TXRX",
834 "src-tp": "SRG1-PP2-TXRX",
835 "node-id": "ROADM-A1"
840 headers = {'content-type': 'application/json'}
841 response = requests.request(
842 "POST", url, data=json.dumps(data),
843 headers=headers, auth=('admin', 'admin'))
844 self.assertEqual(response.status_code, requests.codes.ok)
845 res = response.json()
846 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
850 def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
851 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
852 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
853 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
854 headers = {'content-type': 'application/json'}
855 response = requests.request(
856 "GET", url, headers=headers, auth=('admin', 'admin'))
857 self.assertEqual(response.status_code, requests.codes.ok)
858 res = response.json()
859 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
860 # self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
862 def test_35_servicePath_delete_AToZ(self):
863 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
866 "service-name": "test",
868 "modulation-format": "qpsk",
869 "operation": "delete",
872 "dest-tp": "XPDR1-NETWORK2",
873 "src-tp": "XPDR1-CLIENT2",
877 "dest-tp": "DEG2-TTP-TXRX",
878 "src-tp": "SRG1-PP2-TXRX",
879 "node-id": "ROADM-A1"
884 headers = {'content-type': 'application/json'}
885 response = requests.request(
886 "POST", url, data=json.dumps(data),
887 headers=headers, auth=('admin', 'admin'))
888 self.assertEqual(response.status_code, requests.codes.ok)
889 res = response.json()
890 self.assertIn('Request processed', res["output"]["result"])
893 def test_36_xpdrA_device_disconnected(self):
894 url = ("{}/config/network-topology:"
895 "network-topology/topology/topology-netconf/node/XPDR-A1"
896 .format(self.restconf_baseurl))
897 headers = {'content-type': 'application/json'}
898 response = requests.request(
899 "DELETE", url, headers=headers,
900 auth=('admin', 'admin'))
901 self.assertEqual(response.status_code, requests.codes.ok)
904 def test_37_xpdrC_device_disconnected(self):
905 url = ("{}/config/network-topology:"
906 "network-topology/topology/topology-netconf/node/XPDR-C1"
907 .format(self.restconf_baseurl))
908 headers = {'content-type': 'application/json'}
909 response = requests.request(
910 "DELETE", url, headers=headers,
911 auth=('admin', 'admin'))
912 self.assertEqual(response.status_code, requests.codes.ok)
915 def test_38_calculate_span_loss_current(self):
916 url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(self.restconf_baseurl)
917 headers = {'content-type': 'application/json'}
918 response = requests.request(
919 "POST", url, headers=headers, auth=('admin', 'admin'))
920 self.assertEqual(response.status_code, requests.codes.ok)
921 res = response.json()
922 self.assertIn('Success',
923 res["output"]["result"])
926 def test_39_rdmA_device_disconnected(self):
927 url = ("{}/config/network-topology:"
928 "network-topology/topology/topology-netconf/node/ROADM-A1"
929 .format(self.restconf_baseurl))
930 headers = {'content-type': 'application/json'}
931 response = requests.request(
932 "DELETE", url, headers=headers,
933 auth=('admin', 'admin'))
934 self.assertEqual(response.status_code, requests.codes.ok)
937 def test_40_rdmC_device_disconnected(self):
938 url = ("{}/config/network-topology:"
939 "network-topology/topology/topology-netconf/node/ROADM-C1"
940 .format(self.restconf_baseurl))
941 headers = {'content-type': 'application/json'}
942 response = requests.request(
943 "DELETE", url, headers=headers,
944 auth=('admin', 'admin'))
945 self.assertEqual(response.status_code, requests.codes.ok)
949 if __name__ == "__main__":
950 unittest.main(verbosity=2)