3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
25 class TransportOlmTesting(unittest.TestCase):
32 restconf_baseurl = "http://localhost:8181/restconf"
34 # START_IGNORE_XTESTING
38 cls.sim_process1 = test_utils.start_sim('xpdra')
41 cls.sim_process2 = test_utils.start_sim('roadma-full')
44 cls.sim_process3 = test_utils.start_sim('roadmc-full')
47 cls.sim_process4 = test_utils.start_sim('xpdrc')
49 print("all sims started")
51 cls.odl_process = test_utils.start_tpce()
53 print("opendaylight started")
56 def tearDownClass(cls):
57 for child in psutil.Process(cls.odl_process.pid).children():
58 child.send_signal(signal.SIGINT)
60 cls.odl_process.send_signal(signal.SIGINT)
61 cls.odl_process.wait()
62 for child in psutil.Process(cls.sim_process1.pid).children():
63 child.send_signal(signal.SIGINT)
65 cls.sim_process1.send_signal(signal.SIGINT)
66 cls.sim_process1.wait()
67 for child in psutil.Process(cls.sim_process2.pid).children():
68 child.send_signal(signal.SIGINT)
70 cls.sim_process2.send_signal(signal.SIGINT)
71 cls.sim_process2.wait()
72 for child in psutil.Process(cls.sim_process3.pid).children():
73 child.send_signal(signal.SIGINT)
75 cls.sim_process3.send_signal(signal.SIGINT)
76 cls.sim_process3.wait()
77 for child in psutil.Process(cls.sim_process4.pid).children():
78 child.send_signal(signal.SIGINT)
80 cls.sim_process4.send_signal(signal.SIGINT)
81 cls.sim_process4.wait()
84 print("execution of {}".format(self.id().split(".")[-1]))
89 def test_01_xpdrA_device_connected(self):
90 url = ("{}/config/network-topology:"
91 "network-topology/topology/topology-netconf/node/XPDRA01"
92 .format(self.restconf_baseurl))
95 "netconf-node-topology:username": "admin",
96 "netconf-node-topology:password": "admin",
97 "netconf-node-topology:host": "127.0.0.1",
98 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
99 "netconf-node-topology:tcp-only": "false",
100 "netconf-node-topology:pass-through": {}}]}
101 headers = {'content-type': 'application/json'}
102 response = requests.request(
103 "PUT", url, data=json.dumps(data), headers=headers,
104 auth=('admin', 'admin'))
105 self.assertEqual(response.status_code, requests.codes.created)
108 def test_02_xpdrC_device_connected(self):
109 url = ("{}/config/network-topology:"
110 "network-topology/topology/topology-netconf/node/XPDRC01"
111 .format(self.restconf_baseurl))
113 "node-id": "XPDRC01",
114 "netconf-node-topology:username": "admin",
115 "netconf-node-topology:password": "admin",
116 "netconf-node-topology:host": "127.0.0.1",
117 "netconf-node-topology:port": test_utils.sims['xpdrc']['port'],
118 "netconf-node-topology:tcp-only": "false",
119 "netconf-node-topology:pass-through": {}}]}
120 headers = {'content-type': 'application/json'}
121 response = requests.request(
122 "PUT", url, data=json.dumps(data), headers=headers,
123 auth=('admin', 'admin'))
124 self.assertEqual(response.status_code, requests.codes.created)
127 def test_03_rdmA_device_connected(self):
128 url = ("{}/config/network-topology:"
129 "network-topology/topology/topology-netconf/node/ROADMA01"
130 .format(self.restconf_baseurl))
132 "node-id": "ROADMA01",
133 "netconf-node-topology:username": "admin",
134 "netconf-node-topology:password": "admin",
135 "netconf-node-topology:host": "127.0.0.1",
136 "netconf-node-topology:port": test_utils.sims['roadma-full']['port'],
137 "netconf-node-topology:tcp-only": "false",
138 "netconf-node-topology:pass-through": {}}]}
139 headers = {'content-type': 'application/json'}
140 response = requests.request(
141 "PUT", url, data=json.dumps(data), headers=headers,
142 auth=('admin', 'admin'))
143 self.assertEqual(response.status_code, requests.codes.created)
146 def test_04_rdmC_device_connected(self):
147 url = ("{}/config/network-topology:"
148 "network-topology/topology/topology-netconf/node/ROADMC01"
149 .format(self.restconf_baseurl))
151 "node-id": "ROADMC01",
152 "netconf-node-topology:username": "admin",
153 "netconf-node-topology:password": "admin",
154 "netconf-node-topology:host": "127.0.0.1",
155 "netconf-node-topology:port": test_utils.sims['roadmc-full']['port'],
156 "netconf-node-topology:tcp-only": "false",
157 "netconf-node-topology:pass-through": {}}]}
158 headers = {'content-type': 'application/json'}
159 response = requests.request(
160 "PUT", url, data=json.dumps(data), headers=headers,
161 auth=('admin', 'admin'))
162 self.assertEqual(response.status_code, requests.codes.created)
165 def test_05_connect_xprdA_to_roadmA(self):
166 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
168 "networkutils:input": {
169 "networkutils:links-input": {
170 "networkutils:xpdr-node": "XPDRA01",
171 "networkutils:xpdr-num": "1",
172 "networkutils:network-num": "1",
173 "networkutils:rdm-node": "ROADMA01",
174 "networkutils:srg-num": "1",
175 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
179 headers = {'content-type': 'application/json'}
180 response = requests.request(
181 "POST", url, data=json.dumps(data),
182 headers=headers, auth=('admin', 'admin'))
183 self.assertEqual(response.status_code, requests.codes.ok)
184 res = response.json()
185 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
187 def test_06_connect_roadmA_to_xpdrA(self):
188 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
190 "networkutils:input": {
191 "networkutils:links-input": {
192 "networkutils:xpdr-node": "XPDRA01",
193 "networkutils:xpdr-num": "1",
194 "networkutils:network-num": "1",
195 "networkutils:rdm-node": "ROADMA01",
196 "networkutils:srg-num": "1",
197 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
201 headers = {'content-type': 'application/json'}
202 response = requests.request(
203 "POST", url, data=json.dumps(data),
204 headers=headers, auth=('admin', 'admin'))
205 self.assertEqual(response.status_code, requests.codes.ok)
206 res = response.json()
207 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
209 def test_07_connect_xprdC_to_roadmC(self):
210 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
212 "networkutils:input": {
213 "networkutils:links-input": {
214 "networkutils:xpdr-node": "XPDRC01",
215 "networkutils:xpdr-num": "1",
216 "networkutils:network-num": "1",
217 "networkutils:rdm-node": "ROADMC01",
218 "networkutils:srg-num": "1",
219 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
223 headers = {'content-type': 'application/json'}
224 response = requests.request(
225 "POST", url, data=json.dumps(data),
226 headers=headers, auth=('admin', 'admin'))
227 self.assertEqual(response.status_code, requests.codes.ok)
228 res = response.json()
229 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
231 def test_08_connect_roadmC_to_xpdrC(self):
232 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
234 "networkutils:input": {
235 "networkutils:links-input": {
236 "networkutils:xpdr-node": "XPDRC01",
237 "networkutils:xpdr-num": "1",
238 "networkutils:network-num": "1",
239 "networkutils:rdm-node": "ROADMC01",
240 "networkutils:srg-num": "1",
241 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
245 headers = {'content-type': 'application/json'}
246 response = requests.request(
247 "POST", url, data=json.dumps(data),
248 headers=headers, auth=('admin', 'admin'))
249 self.assertEqual(response.status_code, requests.codes.ok)
250 res = response.json()
251 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
253 def test_09_create_OTS_ROADMA(self):
254 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
257 "node-id": "ROADMA01",
258 "logical-connection-point": "DEG1-TTP-TXRX"
261 headers = {'content-type': 'application/json'}
262 response = requests.request(
263 "POST", url, data=json.dumps(data),
264 headers=headers, auth=('admin', 'admin'))
265 self.assertEqual(response.status_code, requests.codes.ok)
266 res = response.json()
267 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADMA01',
268 res["output"]["result"])
270 def test_10_create_OTS_ROADMC(self):
271 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
274 "node-id": "ROADMC01",
275 "logical-connection-point": "DEG2-TTP-TXRX"
278 headers = {'content-type': 'application/json'}
279 response = requests.request(
280 "POST", url, data=json.dumps(data),
281 headers=headers, auth=('admin', 'admin'))
282 self.assertEqual(response.status_code, requests.codes.ok)
283 res = response.json()
284 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADMC01',
285 res["output"]["result"])
287 def test_11_get_PM_ROADMA(self):
288 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
291 "node-id": "ROADMA01",
292 "resource-type": "interface",
293 "granularity": "15min",
294 "resource-identifier": {
295 "resource-name": "OTS-DEG1-TTP-TXRX"
299 headers = {'content-type': 'application/json'}
300 response = requests.request(
301 "POST", url, data=json.dumps(data),
302 headers=headers, auth=('admin', 'admin'))
303 self.assertEqual(response.status_code, requests.codes.ok)
304 res = response.json()
306 "pmparameter-name": "OpticalPowerOutput",
307 "pmparameter-value": "2.5"
308 }, res["output"]["measurements"])
310 "pmparameter-name": "OpticalReturnLoss",
311 "pmparameter-value": "49.9"
312 }, res["output"]["measurements"])
314 "pmparameter-name": "OpticalPowerInput",
315 "pmparameter-value": "3"
316 }, res["output"]["measurements"])
318 def test_12_get_PM_ROADMC(self):
319 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
322 "node-id": "ROADMC01",
323 "resource-type": "interface",
324 "granularity": "15min",
325 "resource-identifier": {
326 "resource-name": "OTS-DEG2-TTP-TXRX"
330 headers = {'content-type': 'application/json'}
331 response = requests.request(
332 "POST", url, data=json.dumps(data),
333 headers=headers, auth=('admin', 'admin'))
334 self.assertEqual(response.status_code, requests.codes.ok)
335 res = response.json()
337 "pmparameter-name": "OpticalPowerOutput",
338 "pmparameter-value": "18.1"
339 }, res["output"]["measurements"])
341 "pmparameter-name": "OpticalReturnLoss",
342 "pmparameter-value": "48.8"
343 }, res["output"]["measurements"])
345 "pmparameter-name": "OpticalPowerInput",
346 "pmparameter-value": "-3.2"
347 }, res["output"]["measurements"])
349 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
350 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
354 "link-id": "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX"
357 headers = {'content-type': 'application/json'}
358 response = requests.request(
359 "POST", url, data=json.dumps(data),
360 headers=headers, auth=('admin', 'admin'))
361 self.assertEqual(response.status_code, requests.codes.ok)
362 res = response.json()
363 self.assertIn('Success',
364 res["output"]["result"])
367 "link-id": "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX"
368 }, res["output"]["spans"])
371 def test_14_calculate_span_loss_base_all(self):
372 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
378 headers = {'content-type': 'application/json'}
379 response = requests.request(
380 "POST", url, data=json.dumps(data),
381 headers=headers, auth=('admin', 'admin'))
382 self.assertEqual(response.status_code, requests.codes.ok)
383 res = response.json()
384 self.assertIn('Success',
385 res["output"]["result"])
388 "link-id": "ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX"
389 }, res["output"]["spans"])
392 "link-id": "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX"
393 }, res["output"]["spans"])
def test_15_get_OTS_DEG1_TTP_TXRX_ROADMA(self):
    # Read the OTS container of interface OTS-DEG1-TTP-TXRX on ROADMA01 through
    # the netconf mount in the config datastore and compare both span-loss
    # values found there.
    endpoint = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(5.7, ots['span-loss-transmit'])
    self.assertEqual(15.1, ots['span-loss-receive'])
def test_16_get_OTS_DEG2_TTP_TXRX_ROADMC(self):
    # Read the OTS container of interface OTS-DEG2-TTP-TXRX on ROADMC01 through
    # the netconf mount in the config datastore and compare both span-loss
    # values found there.
    endpoint = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMC01/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(15.1, ots['span-loss-transmit'])
    self.assertEqual(5.7, ots['span-loss-receive'])
420 def test_17_servicePath_create_AToZ(self):
421 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
424 "service-name": "test",
426 "modulation-format": "qpsk",
427 "operation": "create",
430 "dest-tp": "XPDR1-NETWORK1",
431 "src-tp": "XPDR1-CLIENT1",
435 "dest-tp": "DEG1-TTP-TXRX",
436 "src-tp": "SRG1-PP1-TXRX",
437 "node-id": "ROADMA01"
440 "dest-tp": "SRG1-PP1-TXRX",
441 "src-tp": "DEG2-TTP-TXRX",
442 "node-id": "ROADMC01"
445 "dest-tp": "XPDR1-CLIENT1",
446 "src-tp": "XPDR1-NETWORK1",
452 headers = {'content-type': 'application/json'}
453 response = requests.request(
454 "POST", url, data=json.dumps(data),
455 headers=headers, auth=('admin', 'admin'))
456 self.assertEqual(response.status_code, requests.codes.ok)
457 res = response.json()
458 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
462 def test_18_servicePath_create_ZToA(self):
463 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
466 "service-name": "test",
468 "modulation-format": "qpsk",
469 "operation": "create",
472 "dest-tp": "XPDR1-NETWORK1",
473 "src-tp": "XPDR1-CLIENT1",
477 "dest-tp": "DEG2-TTP-TXRX",
478 "src-tp": "SRG1-PP1-TXRX",
479 "node-id": "ROADMC01"
482 "src-tp": "DEG1-TTP-TXRX",
483 "dest-tp": "SRG1-PP1-TXRX",
484 "node-id": "ROADMA01"
487 "src-tp": "XPDR1-NETWORK1",
488 "dest-tp": "XPDR1-CLIENT1",
494 headers = {'content-type': 'application/json'}
495 response = requests.request(
496 "POST", url, data=json.dumps(data),
497 headers=headers, auth=('admin', 'admin'))
498 self.assertEqual(response.status_code, requests.codes.ok)
499 res = response.json()
500 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
504 def test_19_service_power_setup_XPDRA_XPDRC(self):
505 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
508 "service-name": "test",
512 "dest-tp": "XPDR1-NETWORK1",
513 "src-tp": "XPDR1-CLIENT1",
517 "dest-tp": "DEG1-TTP-TXRX",
518 "src-tp": "SRG1-PP1-TXRX",
519 "node-id": "ROADMA01"
522 "dest-tp": "SRG1-PP1-TXRX",
523 "src-tp": "DEG2-TTP-TXRX",
524 "node-id": "ROADMC01"
527 "dest-tp": "XPDR1-CLIENT1",
528 "src-tp": "XPDR1-NETWORK1",
534 headers = {'content-type': 'application/json'}
535 response = requests.request(
536 "POST", url, data=json.dumps(data),
537 headers=headers, auth=('admin', 'admin'))
538 self.assertEqual(response.status_code, requests.codes.ok)
539 res = response.json()
540 self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
    # Read the och container of interface XPDR1-NETWORK1-1 on XPDRA01 and check
    # the transmit power and wavelength number it holds.
    endpoint = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(0, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_21_get_roadmconnection_ROADMA(self):
    # Read roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADMA01 and check
    # the control mode and target output power of the first entry returned.
    endpoint = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(-3.3, connection['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    # Read roadm-connection DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 on ROADMC01 and check
    # the control mode of the first entry returned.
    endpoint = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("power", connection['opticalControlMode'])
577 def test_23_service_power_setup_XPDRC_XPDRA(self):
578 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
581 "service-name": "test",
585 "dest-tp": "XPDR1-NETWORK1",
586 "src-tp": "XPDR1-CLIENT1",
590 "dest-tp": "DEG2-TTP-TXRX",
591 "src-tp": "SRG1-PP1-TXRX",
592 "node-id": "ROADMC01"
595 "src-tp": "DEG1-TTP-TXRX",
596 "dest-tp": "SRG1-PP1-TXRX",
597 "node-id": "ROADMA01"
600 "src-tp": "XPDR1-NETWORK1",
601 "dest-tp": "XPDR1-CLIENT1",
607 headers = {'content-type': 'application/json'}
608 response = requests.request(
609 "POST", url, data=json.dumps(data),
610 headers=headers, auth=('admin', 'admin'))
611 self.assertEqual(response.status_code, requests.codes.ok)
612 res = response.json()
613 self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
    # Read the och container of interface XPDR1-NETWORK1-1 on XPDRC01 and check
    # the transmit power and wavelength number it holds.
    endpoint = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRC01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(0, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_25_get_roadmconnection_ROADMC(self):
    # Read roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADMC01 and check
    # the control mode and target output power of the first entry returned.
    endpoint = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2, connection['target-output-power'])
639 def test_26_service_power_turndown_XPDRA_XPDRC(self):
640 url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
643 "service-name": "test",
647 "dest-tp": "XPDR1-NETWORK1",
648 "src-tp": "XPDR1-CLIENT1",
652 "dest-tp": "DEG1-TTP-TXRX",
653 "src-tp": "SRG1-PP1-TXRX",
654 "node-id": "ROADMA01"
657 "dest-tp": "SRG1-PP1-TXRX",
658 "src-tp": "DEG2-TTP-TXRX",
659 "node-id": "ROADMC01"
662 "dest-tp": "XPDR1-CLIENT1",
663 "src-tp": "XPDR1-NETWORK1",
669 headers = {'content-type': 'application/json'}
670 response = requests.request(
671 "POST", url, data=json.dumps(data),
672 headers=headers, auth=('admin', 'admin'))
673 self.assertEqual(response.status_code, requests.codes.ok)
674 res = response.json()
675 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    # Read roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADMA01 again and
    # expect control mode "off" with a target output power of -60.
    endpoint = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMA01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
    self.assertEqual(-60, connection['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    # Read roadm-connection DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 on ROADMC01 again and
    # expect control mode "off" on the first entry returned.
    endpoint = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADMC01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
700 def test_29_servicePath_delete_AToZ(self):
701 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
704 "service-name": "test",
706 "modulation-format": "qpsk",
707 "operation": "delete",
710 "dest-tp": "XPDR1-NETWORK1",
711 "src-tp": "XPDR1-CLIENT1",
715 "dest-tp": "DEG1-TTP-TXRX",
716 "src-tp": "SRG1-PP1-TXRX",
717 "node-id": "ROADMA01"
720 "dest-tp": "SRG1-PP1-TXRX",
721 "src-tp": "DEG2-TTP-TXRX",
722 "node-id": "ROADMC01"
725 "dest-tp": "XPDR1-CLIENT1",
726 "src-tp": "XPDR1-NETWORK1",
732 headers = {'content-type': 'application/json'}
733 response = requests.request(
734 "POST", url, data=json.dumps(data),
735 headers=headers, auth=('admin', 'admin'))
736 self.assertEqual(response.status_code, requests.codes.ok)
737 res = response.json()
738 self.assertIn('Request processed', res["output"]["result"])
741 def test_30_servicePath_delete_ZToA(self):
742 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
745 "service-name": "test",
747 "modulation-format": "qpsk",
748 "operation": "delete",
751 "dest-tp": "XPDR1-NETWORK1",
752 "src-tp": "XPDR1-CLIENT1",
756 "dest-tp": "DEG2-TTP-TXRX",
757 "src-tp": "SRG1-PP1-TXRX",
758 "node-id": "ROADMC01"
761 "src-tp": "DEG1-TTP-TXRX",
762 "dest-tp": "SRG1-PP1-TXRX",
763 "node-id": "ROADMA01"
766 "src-tp": "XPDR1-NETWORK1",
767 "dest-tp": "XPDR1-CLIENT1",
773 headers = {'content-type': 'application/json'}
774 response = requests.request(
775 "POST", url, data=json.dumps(data),
776 headers=headers, auth=('admin', 'admin'))
777 self.assertEqual(response.status_code, requests.codes.ok)
778 res = response.json()
779 self.assertIn('Request processed', res["output"]["result"])
"""Test the case where the SRG to which the xpdr is connected has no optical range data."""
784 def test_31_connect_xprdA_to_roadmA(self):
785 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
787 "networkutils:input": {
788 "networkutils:links-input": {
789 "networkutils:xpdr-node": "XPDRA01",
790 "networkutils:xpdr-num": "1",
791 "networkutils:network-num": "2",
792 "networkutils:rdm-node": "ROADMA01",
793 "networkutils:srg-num": "1",
794 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
798 headers = {'content-type': 'application/json'}
799 response = requests.request(
800 "POST", url, data=json.dumps(data),
801 headers=headers, auth=('admin', 'admin'))
802 self.assertEqual(response.status_code, requests.codes.ok)
803 res = response.json()
804 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
806 def test_32_connect_roadmA_to_xpdrA(self):
807 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
809 "networkutils:input": {
810 "networkutils:links-input": {
811 "networkutils:xpdr-node": "XPDRA01",
812 "networkutils:xpdr-num": "1",
813 "networkutils:network-num": "2",
814 "networkutils:rdm-node": "ROADMA01",
815 "networkutils:srg-num": "1",
816 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
820 headers = {'content-type': 'application/json'}
821 response = requests.request(
822 "POST", url, data=json.dumps(data),
823 headers=headers, auth=('admin', 'admin'))
824 self.assertEqual(response.status_code, requests.codes.ok)
825 res = response.json()
826 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
828 def test_33_servicePath_create_AToZ(self):
829 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
832 "service-name": "test2",
834 "modulation-format": "qpsk",
835 "operation": "create",
838 "dest-tp": "XPDR1-NETWORK2",
839 "src-tp": "XPDR1-CLIENT2",
843 "dest-tp": "DEG1-TTP-TXRX",
844 "src-tp": "SRG1-PP2-TXRX",
845 "node-id": "ROADMA01"
850 headers = {'content-type': 'application/json'}
851 response = requests.request(
852 "POST", url, data=json.dumps(data),
853 headers=headers, auth=('admin', 'admin'))
854 self.assertEqual(response.status_code, requests.codes.ok)
855 res = response.json()
856 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
    # Read the och container of interface XPDR1-NETWORK2-2 on XPDRA01 and check
    # the transmit power and wavelength number it holds.
    endpoint = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDRA01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
        "org-openroadm-optical-channel-interfaces:och"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET", endpoint,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(2, och['wavelength-number'])
872 def test_35_servicePath_delete_AToZ(self):
873 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
876 "service-name": "test",
878 "modulation-format": "qpsk",
879 "operation": "delete",
882 "dest-tp": "XPDR1-NETWORK2",
883 "src-tp": "XPDR1-CLIENT2",
887 "dest-tp": "DEG1-TTP-TXRX",
888 "src-tp": "SRG1-PP2-TXRX",
889 "node-id": "ROADMA01"
894 headers = {'content-type': 'application/json'}
895 response = requests.request(
896 "POST", url, data=json.dumps(data),
897 headers=headers, auth=('admin', 'admin'))
898 self.assertEqual(response.status_code, requests.codes.ok)
899 res = response.json()
900 self.assertIn('Request processed', res["output"]["result"])
def test_36_xpdrA_device_disconnected(self):
    # Delete node XPDRA01 from the topology-netconf config topology and expect
    # an HTTP OK status.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/XPDRA01"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE", node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_37_xpdrC_device_disconnected(self):
    # Delete node XPDRC01 from the topology-netconf config topology and expect
    # an HTTP OK status.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/XPDRC01"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE", node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_38_calculate_span_loss_current(self):
    # POST to the transportpce-olm calculate-spanloss-current operation without
    # a request body; the reply must be HTTP OK and report 'Success'.
    rpc_url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(self.restconf_baseurl)
    reply = requests.request(
        "POST", rpc_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    output = reply.json()["output"]
    self.assertIn('Success', output["result"])
def test_39_rdmA_device_disconnected(self):
    # Delete node ROADMA01 from the topology-netconf config topology and expect
    # an HTTP OK status.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMA01"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE", node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_40_rdmC_device_disconnected(self):
    # Delete node ROADMC01 from the topology-netconf config topology and expect
    # an HTTP OK status.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADMC01"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE", node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# Script entry point: when this file is executed directly, hand control to the
# unittest runner with verbose (level 2) per-test reporting.
if __name__ == "__main__":
    unittest.main(verbosity=2)