3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
25 class TransportOlmTesting(unittest.TestCase):
32 restconf_baseurl = "http://localhost:8181/restconf"
34 # START_IGNORE_XTESTING
38 cls.sim_process1 = test_utils.start_sim('xpdra')
40 cls.sim_process2 = test_utils.start_sim('roadma')
42 cls.sim_process3 = test_utils.start_sim('roadmc')
44 cls.sim_process4 = test_utils.start_sim('xpdrc')
46 print("all sims started")
48 cls.odl_process = test_utils.start_tpce()
50 print("opendaylight started")
53 def tearDownClass(cls):
54 for child in psutil.Process(cls.odl_process.pid).children():
55 child.send_signal(signal.SIGINT)
57 cls.odl_process.send_signal(signal.SIGINT)
58 cls.odl_process.wait()
59 for child in psutil.Process(cls.sim_process1.pid).children():
60 child.send_signal(signal.SIGINT)
62 cls.sim_process1.send_signal(signal.SIGINT)
63 cls.sim_process1.wait()
64 for child in psutil.Process(cls.sim_process2.pid).children():
65 child.send_signal(signal.SIGINT)
67 cls.sim_process2.send_signal(signal.SIGINT)
68 cls.sim_process2.wait()
69 for child in psutil.Process(cls.sim_process3.pid).children():
70 child.send_signal(signal.SIGINT)
72 cls.sim_process3.send_signal(signal.SIGINT)
73 cls.sim_process3.wait()
74 for child in psutil.Process(cls.sim_process4.pid).children():
75 child.send_signal(signal.SIGINT)
77 cls.sim_process4.send_signal(signal.SIGINT)
78 cls.sim_process4.wait()
81 print("execution of {}".format(self.id().split(".")[-1]))
86 def test_01_xpdrA_device_connected(self):
87 url = ("{}/config/network-topology:"
88 "network-topology/topology/topology-netconf/node/XPDR-A1"
89 .format(self.restconf_baseurl))
92 "netconf-node-topology:username": "admin",
93 "netconf-node-topology:password": "admin",
94 "netconf-node-topology:host": "127.0.0.1",
95 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
96 "netconf-node-topology:tcp-only": "false",
97 "netconf-node-topology:pass-through": {}}]}
98 headers = {'content-type': 'application/json'}
99 response = requests.request(
100 "PUT", url, data=json.dumps(data), headers=headers,
101 auth=('admin', 'admin'))
102 self.assertEqual(response.status_code, requests.codes.created)
105 def test_02_xpdrC_device_connected(self):
106 url = ("{}/config/network-topology:"
107 "network-topology/topology/topology-netconf/node/XPDR-C1"
108 .format(self.restconf_baseurl))
110 "node-id": "XPDR-C1",
111 "netconf-node-topology:username": "admin",
112 "netconf-node-topology:password": "admin",
113 "netconf-node-topology:host": "127.0.0.1",
114 "netconf-node-topology:port": test_utils.sims['xpdrc']['port'],
115 "netconf-node-topology:tcp-only": "false",
116 "netconf-node-topology:pass-through": {}}]}
117 headers = {'content-type': 'application/json'}
118 response = requests.request(
119 "PUT", url, data=json.dumps(data), headers=headers,
120 auth=('admin', 'admin'))
121 self.assertEqual(response.status_code, requests.codes.created)
124 def test_03_rdmA_device_connected(self):
125 url = ("{}/config/network-topology:"
126 "network-topology/topology/topology-netconf/node/ROADM-A1"
127 .format(self.restconf_baseurl))
129 "node-id": "ROADM-A1",
130 "netconf-node-topology:username": "admin",
131 "netconf-node-topology:password": "admin",
132 "netconf-node-topology:host": "127.0.0.1",
133 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
134 "netconf-node-topology:tcp-only": "false",
135 "netconf-node-topology:pass-through": {}}]}
136 headers = {'content-type': 'application/json'}
137 response = requests.request(
138 "PUT", url, data=json.dumps(data), headers=headers,
139 auth=('admin', 'admin'))
140 self.assertEqual(response.status_code, requests.codes.created)
143 def test_04_rdmC_device_connected(self):
144 url = ("{}/config/network-topology:"
145 "network-topology/topology/topology-netconf/node/ROADM-C1"
146 .format(self.restconf_baseurl))
148 "node-id": "ROADM-C1",
149 "netconf-node-topology:username": "admin",
150 "netconf-node-topology:password": "admin",
151 "netconf-node-topology:host": "127.0.0.1",
152 "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
153 "netconf-node-topology:tcp-only": "false",
154 "netconf-node-topology:pass-through": {}}]}
155 headers = {'content-type': 'application/json'}
156 response = requests.request(
157 "PUT", url, data=json.dumps(data), headers=headers,
158 auth=('admin', 'admin'))
159 self.assertEqual(response.status_code, requests.codes.created)
162 def test_05_connect_xprdA_to_roadmA(self):
163 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
165 "networkutils:input": {
166 "networkutils:links-input": {
167 "networkutils:xpdr-node": "XPDR-A1",
168 "networkutils:xpdr-num": "1",
169 "networkutils:network-num": "1",
170 "networkutils:rdm-node": "ROADM-A1",
171 "networkutils:srg-num": "1",
172 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
176 headers = {'content-type': 'application/json'}
177 response = requests.request(
178 "POST", url, data=json.dumps(data),
179 headers=headers, auth=('admin', 'admin'))
180 self.assertEqual(response.status_code, requests.codes.ok)
181 res = response.json()
182 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
184 def test_06_connect_roadmA_to_xpdrA(self):
185 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
187 "networkutils:input": {
188 "networkutils:links-input": {
189 "networkutils:xpdr-node": "XPDR-A1",
190 "networkutils:xpdr-num": "1",
191 "networkutils:network-num": "1",
192 "networkutils:rdm-node": "ROADM-A1",
193 "networkutils:srg-num": "1",
194 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
198 headers = {'content-type': 'application/json'}
199 response = requests.request(
200 "POST", url, data=json.dumps(data),
201 headers=headers, auth=('admin', 'admin'))
202 self.assertEqual(response.status_code, requests.codes.ok)
203 res = response.json()
204 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
206 def test_07_connect_xprdC_to_roadmC(self):
207 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
209 "networkutils:input": {
210 "networkutils:links-input": {
211 "networkutils:xpdr-node": "XPDR-C1",
212 "networkutils:xpdr-num": "1",
213 "networkutils:network-num": "1",
214 "networkutils:rdm-node": "ROADM-C1",
215 "networkutils:srg-num": "1",
216 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
220 headers = {'content-type': 'application/json'}
221 response = requests.request(
222 "POST", url, data=json.dumps(data),
223 headers=headers, auth=('admin', 'admin'))
224 self.assertEqual(response.status_code, requests.codes.ok)
225 res = response.json()
226 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
228 def test_08_connect_roadmC_to_xpdrC(self):
229 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
231 "networkutils:input": {
232 "networkutils:links-input": {
233 "networkutils:xpdr-node": "XPDR-C1",
234 "networkutils:xpdr-num": "1",
235 "networkutils:network-num": "1",
236 "networkutils:rdm-node": "ROADM-C1",
237 "networkutils:srg-num": "1",
238 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
242 headers = {'content-type': 'application/json'}
243 response = requests.request(
244 "POST", url, data=json.dumps(data),
245 headers=headers, auth=('admin', 'admin'))
246 self.assertEqual(response.status_code, requests.codes.ok)
247 res = response.json()
248 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
250 def test_09_create_OTS_ROADMA(self):
251 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
254 "node-id": "ROADM-A1",
255 "logical-connection-point": "DEG1-TTP-TXRX"
258 headers = {'content-type': 'application/json'}
259 response = requests.request(
260 "POST", url, data=json.dumps(data),
261 headers=headers, auth=('admin', 'admin'))
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
265 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
266 res["output"]["result"])
268 def test_10_create_OTS_ROADMC(self):
269 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
272 "node-id": "ROADM-C1",
273 "logical-connection-point": "DEG2-TTP-TXRX"
276 headers = {'content-type': 'application/json'}
277 response = requests.request(
278 "POST", url, data=json.dumps(data),
279 headers=headers, auth=('admin', 'admin'))
280 self.assertEqual(response.status_code, requests.codes.ok)
281 res = response.json()
282 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
283 res["output"]["result"])
285 def test_11_get_PM_ROADMA(self):
286 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
289 "node-id": "ROADM-A1",
290 "resource-type": "interface",
291 "granularity": "15min",
292 "resource-identifier": {
293 "resource-name": "OTS-DEG2-TTP-TXRX"
297 headers = {'content-type': 'application/json'}
298 response = requests.request(
299 "POST", url, data=json.dumps(data),
300 headers=headers, auth=('admin', 'admin'))
301 self.assertEqual(response.status_code, requests.codes.ok)
302 res = response.json()
304 "pmparameter-name": "OpticalPowerOutput",
305 "pmparameter-value": "2.5"
306 }, res["output"]["measurements"])
308 "pmparameter-name": "OpticalReturnLoss",
309 "pmparameter-value": "40"
310 }, res["output"]["measurements"])
312 "pmparameter-name": "OpticalPowerInput",
313 "pmparameter-value": "-21.1"
314 }, res["output"]["measurements"])
316 def test_12_get_PM_ROADMC(self):
317 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
320 "node-id": "ROADM-C1",
321 "resource-type": "interface",
322 "granularity": "15min",
323 "resource-identifier": {
324 "resource-name": "OTS-DEG1-TTP-TXRX"
328 headers = {'content-type': 'application/json'}
329 response = requests.request(
330 "POST", url, data=json.dumps(data),
331 headers=headers, auth=('admin', 'admin'))
332 self.assertEqual(response.status_code, requests.codes.ok)
333 res = response.json()
335 "pmparameter-name": "OpticalPowerOutput",
336 "pmparameter-value": "4.6"
337 }, res["output"]["measurements"])
339 "pmparameter-name": "OpticalReturnLoss",
340 "pmparameter-value": "49.1"
341 }, res["output"]["measurements"])
343 "pmparameter-name": "OpticalPowerInput",
344 "pmparameter-value": "-15.1"
345 }, res["output"]["measurements"])
347 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
348 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
352 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
355 headers = {'content-type': 'application/json'}
356 response = requests.request(
357 "POST", url, data=json.dumps(data),
358 headers=headers, auth=('admin', 'admin'))
359 self.assertEqual(response.status_code, requests.codes.ok)
360 res = response.json()
361 self.assertIn('Success',
362 res["output"]["result"])
365 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
366 }, res["output"]["spans"])
369 def test_14_calculate_span_loss_base_all(self):
370 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
376 headers = {'content-type': 'application/json'}
377 response = requests.request(
378 "POST", url, data=json.dumps(data),
379 headers=headers, auth=('admin', 'admin'))
380 self.assertEqual(response.status_code, requests.codes.ok)
381 res = response.json()
382 self.assertIn('Success',
383 res["output"]["result"])
386 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
387 }, res["output"]["spans"])
390 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
391 }, res["output"]["spans"])
394 def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
395 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
396 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
397 "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
398 headers = {'content-type': 'application/json'}
399 response = requests.request(
400 "GET", url, headers=headers, auth=('admin', 'admin'))
401 self.assertEqual(response.status_code, requests.codes.ok)
402 res = response.json()
403 self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
404 self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
406 def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
407 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
408 "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
409 "org-openroadm-optical-transport-interfaces:ots".format(self.restconf_baseurl))
410 headers = {'content-type': 'application/json'}
411 response = requests.request(
412 "GET", url, headers=headers, auth=('admin', 'admin'))
413 self.assertEqual(response.status_code, requests.codes.ok)
414 res = response.json()
415 self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
416 self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
418 def test_17_servicePath_create_AToZ(self):
419 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
422 "service-name": "test",
424 "modulation-format": "qpsk",
425 "operation": "create",
428 "dest-tp": "XPDR1-NETWORK1",
429 "src-tp": "XPDR1-CLIENT1",
433 "dest-tp": "DEG2-TTP-TXRX",
434 "src-tp": "SRG1-PP1-TXRX",
435 "node-id": "ROADM-A1"
438 "dest-tp": "SRG1-PP1-TXRX",
439 "src-tp": "DEG1-TTP-TXRX",
440 "node-id": "ROADM-C1"
443 "dest-tp": "XPDR1-CLIENT1",
444 "src-tp": "XPDR1-NETWORK1",
450 headers = {'content-type': 'application/json'}
451 response = requests.request(
452 "POST", url, data=json.dumps(data),
453 headers=headers, auth=('admin', 'admin'))
454 self.assertEqual(response.status_code, requests.codes.ok)
455 res = response.json()
456 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
460 def test_18_servicePath_create_ZToA(self):
461 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
464 "service-name": "test",
466 "modulation-format": "qpsk",
467 "operation": "create",
470 "dest-tp": "XPDR1-NETWORK1",
471 "src-tp": "XPDR1-CLIENT1",
475 "dest-tp": "DEG1-TTP-TXRX",
476 "src-tp": "SRG1-PP1-TXRX",
477 "node-id": "ROADM-C1"
480 "src-tp": "DEG2-TTP-TXRX",
481 "dest-tp": "SRG1-PP1-TXRX",
482 "node-id": "ROADM-A1"
485 "src-tp": "XPDR1-NETWORK1",
486 "dest-tp": "XPDR1-CLIENT1",
492 headers = {'content-type': 'application/json'}
493 response = requests.request(
494 "POST", url, data=json.dumps(data),
495 headers=headers, auth=('admin', 'admin'))
496 self.assertEqual(response.status_code, requests.codes.ok)
497 res = response.json()
498 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
502 def test_19_service_power_setup_XPDRA_XPDRC(self):
503 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
506 "service-name": "test",
510 "dest-tp": "XPDR1-NETWORK1",
511 "src-tp": "XPDR1-CLIENT1",
515 "dest-tp": "DEG2-TTP-TXRX",
516 "src-tp": "SRG1-PP1-TXRX",
517 "node-id": "ROADM-A1"
520 "dest-tp": "SRG1-PP1-TXRX",
521 "src-tp": "DEG1-TTP-TXRX",
522 "node-id": "ROADM-C1"
525 "dest-tp": "XPDR1-CLIENT1",
526 "src-tp": "XPDR1-NETWORK1",
532 headers = {'content-type': 'application/json'}
533 response = requests.request(
534 "POST", url, data=json.dumps(data),
535 headers=headers, auth=('admin', 'admin'))
536 self.assertEqual(response.status_code, requests.codes.ok)
537 res = response.json()
538 self.assertIn('Success', res["output"]["result"])
540 def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
541 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
542 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
543 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
544 headers = {'content-type': 'application/json'}
545 response = requests.request(
546 "GET", url, headers=headers, auth=('admin', 'admin'))
547 self.assertEqual(response.status_code, requests.codes.ok)
548 res = response.json()
549 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
550 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
552 def test_21_get_roadmconnection_ROADMA(self):
553 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
554 "org-openroadm-device:org-openroadm-device/roadm-connections/"
555 "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(self.restconf_baseurl))
556 headers = {'content-type': 'application/json'}
557 response = requests.request(
558 "GET", url, headers=headers, auth=('admin', 'admin'))
559 self.assertEqual(response.status_code, requests.codes.ok)
560 res = response.json()
561 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
562 self.assertEqual(2.0, res['roadm-connections'][0]['target-output-power'])
564 def test_22_get_roadmconnection_ROADMC(self):
565 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
566 "org-openroadm-device:org-openroadm-device/roadm-connections/"
567 "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
568 headers = {'content-type': 'application/json'}
569 response = requests.request(
570 "GET", url, headers=headers, auth=('admin', 'admin'))
571 self.assertEqual(response.status_code, requests.codes.ok)
572 res = response.json()
573 self.assertEqual("power", res['roadm-connections'][0]['opticalControlMode'])
575 def test_23_service_power_setup_XPDRC_XPDRA(self):
576 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
579 "service-name": "test",
583 "dest-tp": "XPDR1-NETWORK1",
584 "src-tp": "XPDR1-CLIENT1",
588 "dest-tp": "DEG1-TTP-TXRX",
589 "src-tp": "SRG1-PP1-TXRX",
590 "node-id": "ROADM-C1"
593 "src-tp": "DEG2-TTP-TXRX",
594 "dest-tp": "SRG1-PP1-TXRX",
595 "node-id": "ROADM-A1"
598 "src-tp": "XPDR1-NETWORK1",
599 "dest-tp": "XPDR1-CLIENT1",
605 headers = {'content-type': 'application/json'}
606 response = requests.request(
607 "POST", url, data=json.dumps(data),
608 headers=headers, auth=('admin', 'admin'))
609 self.assertEqual(response.status_code, requests.codes.ok)
610 res = response.json()
611 self.assertIn('Success', res["output"]["result"])
613 def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
614 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-C1/yang-ext:mount/"
615 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
616 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
617 headers = {'content-type': 'application/json'}
618 response = requests.request(
619 "GET", url, headers=headers, auth=('admin', 'admin'))
620 self.assertEqual(response.status_code, requests.codes.ok)
621 res = response.json()
622 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
623 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
625 def test_25_get_roadmconnection_ROADMC(self):
626 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
627 "org-openroadm-device:org-openroadm-device/roadm-connections/"
628 "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(self.restconf_baseurl))
629 headers = {'content-type': 'application/json'}
630 response = requests.request(
631 "GET", url, headers=headers, auth=('admin', 'admin'))
632 self.assertEqual(response.status_code, requests.codes.ok)
633 res = response.json()
634 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
635 self.assertEqual(2.0, res['roadm-connections'][0]['target-output-power'])
637 def test_26_service_power_turndown_XPDRA_XPDRC(self):
638 url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
641 "service-name": "test",
645 "dest-tp": "XPDR1-NETWORK1",
646 "src-tp": "XPDR1-CLIENT1",
650 "dest-tp": "DEG2-TTP-TXRX",
651 "src-tp": "SRG1-PP1-TXRX",
652 "node-id": "ROADM-A1"
655 "dest-tp": "SRG1-PP1-TXRX",
656 "src-tp": "DEG1-TTP-TXRX",
657 "node-id": "ROADM-C1"
660 "dest-tp": "XPDR1-CLIENT1",
661 "src-tp": "XPDR1-NETWORK1",
667 headers = {'content-type': 'application/json'}
668 response = requests.request(
669 "POST", url, data=json.dumps(data),
670 headers=headers, auth=('admin', 'admin'))
671 self.assertEqual(response.status_code, requests.codes.ok)
672 res = response.json()
673 self.assertIn('Success', res["output"]["result"])
675 def test_27_get_roadmconnection_ROADMA(self):
676 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
677 "org-openroadm-device:org-openroadm-device/roadm-connections/"
678 "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(self.restconf_baseurl))
679 headers = {'content-type': 'application/json'}
680 response = requests.request(
681 "GET", url, headers=headers, auth=('admin', 'admin'))
682 self.assertEqual(response.status_code, requests.codes.ok)
683 res = response.json()
684 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
685 self.assertEqual(-60, res['roadm-connections'][0]['target-output-power'])
687 def test_28_get_roadmconnection_ROADMC(self):
688 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
689 "org-openroadm-device:org-openroadm-device/roadm-connections/"
690 "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1".format(self.restconf_baseurl))
691 headers = {'content-type': 'application/json'}
692 response = requests.request(
693 "GET", url, headers=headers, auth=('admin', 'admin'))
694 self.assertEqual(response.status_code, requests.codes.ok)
695 res = response.json()
696 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
698 def test_29_servicePath_delete_AToZ(self):
699 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
702 "service-name": "test",
704 "modulation-format": "qpsk",
705 "operation": "delete",
708 "dest-tp": "XPDR1-NETWORK1",
709 "src-tp": "XPDR1-CLIENT1",
713 "dest-tp": "DEG2-TTP-TXRX",
714 "src-tp": "SRG1-PP1-TXRX",
715 "node-id": "ROADM-A1"
718 "dest-tp": "SRG1-PP1-TXRX",
719 "src-tp": "DEG1-TTP-TXRX",
720 "node-id": "ROADM-C1"
723 "dest-tp": "XPDR1-CLIENT1",
724 "src-tp": "XPDR1-NETWORK1",
730 headers = {'content-type': 'application/json'}
731 response = requests.request(
732 "POST", url, data=json.dumps(data),
733 headers=headers, auth=('admin', 'admin'))
734 self.assertEqual(response.status_code, requests.codes.ok)
735 res = response.json()
736 self.assertIn('Request processed', res["output"]["result"])
739 def test_30_servicePath_delete_ZToA(self):
740 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
743 "service-name": "test",
745 "modulation-format": "qpsk",
746 "operation": "delete",
749 "dest-tp": "XPDR1-NETWORK1",
750 "src-tp": "XPDR1-CLIENT1",
754 "dest-tp": "DEG1-TTP-TXRX",
755 "src-tp": "SRG1-PP1-TXRX",
756 "node-id": "ROADM-C1"
759 "src-tp": "DEG2-TTP-TXRX",
760 "dest-tp": "SRG1-PP1-TXRX",
761 "node-id": "ROADM-A1"
764 "src-tp": "XPDR1-NETWORK1",
765 "dest-tp": "XPDR1-CLIENT1",
771 headers = {'content-type': 'application/json'}
772 response = requests.request(
773 "POST", url, data=json.dumps(data),
774 headers=headers, auth=('admin', 'admin'))
775 self.assertEqual(response.status_code, requests.codes.ok)
776 res = response.json()
777 self.assertIn('Request processed', res["output"]["result"])
780 """to test case where SRG where the xpdr is connected to has no optical range data"""
782 def test_31_connect_xprdA_to_roadmA(self):
783 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
785 "networkutils:input": {
786 "networkutils:links-input": {
787 "networkutils:xpdr-node": "XPDR-A1",
788 "networkutils:xpdr-num": "1",
789 "networkutils:network-num": "2",
790 "networkutils:rdm-node": "ROADM-A1",
791 "networkutils:srg-num": "1",
792 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
796 headers = {'content-type': 'application/json'}
797 response = requests.request(
798 "POST", url, data=json.dumps(data),
799 headers=headers, auth=('admin', 'admin'))
800 self.assertEqual(response.status_code, requests.codes.ok)
801 res = response.json()
802 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
804 def test_32_connect_roadmA_to_xpdrA(self):
805 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
807 "networkutils:input": {
808 "networkutils:links-input": {
809 "networkutils:xpdr-node": "XPDR-A1",
810 "networkutils:xpdr-num": "1",
811 "networkutils:network-num": "2",
812 "networkutils:rdm-node": "ROADM-A1",
813 "networkutils:srg-num": "1",
814 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
818 headers = {'content-type': 'application/json'}
819 response = requests.request(
820 "POST", url, data=json.dumps(data),
821 headers=headers, auth=('admin', 'admin'))
822 self.assertEqual(response.status_code, requests.codes.ok)
823 res = response.json()
824 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
826 def test_33_servicePath_create_AToZ(self):
827 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
830 "service-name": "test2",
832 "modulation-format": "qpsk",
833 "operation": "create",
836 "dest-tp": "XPDR1-NETWORK2",
837 "src-tp": "XPDR1-CLIENT2",
841 "dest-tp": "DEG2-TTP-TXRX",
842 "src-tp": "SRG1-PP2-TXRX",
843 "node-id": "ROADM-A1"
848 headers = {'content-type': 'application/json'}
849 response = requests.request(
850 "POST", url, data=json.dumps(data),
851 headers=headers, auth=('admin', 'admin'))
852 self.assertEqual(response.status_code, requests.codes.ok)
853 res = response.json()
854 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
858 def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
859 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
860 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
861 "org-openroadm-optical-channel-interfaces:och".format(self.restconf_baseurl))
862 headers = {'content-type': 'application/json'}
863 response = requests.request(
864 "GET", url, headers=headers, auth=('admin', 'admin'))
865 self.assertEqual(response.status_code, requests.codes.ok)
866 res = response.json()
867 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
868 # self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
870 def test_35_servicePath_delete_AToZ(self):
871 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
874 "service-name": "test",
876 "modulation-format": "qpsk",
877 "operation": "delete",
880 "dest-tp": "XPDR1-NETWORK2",
881 "src-tp": "XPDR1-CLIENT2",
885 "dest-tp": "DEG2-TTP-TXRX",
886 "src-tp": "SRG1-PP2-TXRX",
887 "node-id": "ROADM-A1"
892 headers = {'content-type': 'application/json'}
893 response = requests.request(
894 "POST", url, data=json.dumps(data),
895 headers=headers, auth=('admin', 'admin'))
896 self.assertEqual(response.status_code, requests.codes.ok)
897 res = response.json()
898 self.assertIn('Request processed', res["output"]["result"])
901 def test_36_xpdrA_device_disconnected(self):
902 url = ("{}/config/network-topology:"
903 "network-topology/topology/topology-netconf/node/XPDR-A1"
904 .format(self.restconf_baseurl))
905 headers = {'content-type': 'application/json'}
906 response = requests.request(
907 "DELETE", url, headers=headers,
908 auth=('admin', 'admin'))
909 self.assertEqual(response.status_code, requests.codes.ok)
912 def test_37_xpdrC_device_disconnected(self):
913 url = ("{}/config/network-topology:"
914 "network-topology/topology/topology-netconf/node/XPDR-C1"
915 .format(self.restconf_baseurl))
916 headers = {'content-type': 'application/json'}
917 response = requests.request(
918 "DELETE", url, headers=headers,
919 auth=('admin', 'admin'))
920 self.assertEqual(response.status_code, requests.codes.ok)
923 def test_38_calculate_span_loss_current(self):
924 url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(self.restconf_baseurl)
925 headers = {'content-type': 'application/json'}
926 response = requests.request(
927 "POST", url, headers=headers, auth=('admin', 'admin'))
928 self.assertEqual(response.status_code, requests.codes.ok)
929 res = response.json()
930 self.assertIn('Success',
931 res["output"]["result"])
934 def test_39_rdmA_device_disconnected(self):
935 url = ("{}/config/network-topology:"
936 "network-topology/topology/topology-netconf/node/ROADM-A1"
937 .format(self.restconf_baseurl))
938 headers = {'content-type': 'application/json'}
939 response = requests.request(
940 "DELETE", url, headers=headers,
941 auth=('admin', 'admin'))
942 self.assertEqual(response.status_code, requests.codes.ok)
945 def test_40_rdmC_device_disconnected(self):
946 url = ("{}/config/network-topology:"
947 "network-topology/topology/topology-netconf/node/ROADM-C1"
948 .format(self.restconf_baseurl))
949 headers = {'content-type': 'application/json'}
950 response = requests.request(
951 "DELETE", url, headers=headers,
952 auth=('admin', 'admin'))
953 self.assertEqual(response.status_code, requests.codes.ok)
957 if __name__ == "__main__":
958 unittest.main(verbosity=2)