3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
25 class TransportOlmTesting(unittest.TestCase):
27 honeynode_process1 = None
28 honeynode_process2 = None
29 honeynode_process3 = None
30 honeynode_process4 = None
32 restconf_baseurl = "http://localhost:8181/restconf"
34 #START_IGNORE_XTESTING
38 print ("starting honeynode1...")
39 cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
42 print ("starting honeynode2...")
43 cls.honeynode_process2 = test_utils.start_roadma_honeynode()
46 print ("starting honeynode3...")
47 cls.honeynode_process3 = test_utils.start_roadmc_honeynode()
50 print ("starting honeynode4...")
51 cls.honeynode_process4 = test_utils.start_xpdrc_honeynode()
54 print ("all honeynodes started")
56 print ("starting opendaylight...")
57 cls.odl_process = test_utils.start_tpce()
59 print ("opendaylight started")
62 def tearDownClass(cls):
63 for child in psutil.Process(cls.odl_process.pid).children():
64 child.send_signal(signal.SIGINT)
66 cls.odl_process.send_signal(signal.SIGINT)
67 cls.odl_process.wait()
68 for child in psutil.Process(cls.honeynode_process1.pid).children():
69 child.send_signal(signal.SIGINT)
71 cls.honeynode_process1.send_signal(signal.SIGINT)
72 cls.honeynode_process1.wait()
73 for child in psutil.Process(cls.honeynode_process2.pid).children():
74 child.send_signal(signal.SIGINT)
76 cls.honeynode_process2.send_signal(signal.SIGINT)
77 cls.honeynode_process2.wait()
78 for child in psutil.Process(cls.honeynode_process3.pid).children():
79 child.send_signal(signal.SIGINT)
81 cls.honeynode_process3.send_signal(signal.SIGINT)
82 cls.honeynode_process3.wait()
83 for child in psutil.Process(cls.honeynode_process4.pid).children():
84 child.send_signal(signal.SIGINT)
86 cls.honeynode_process4.send_signal(signal.SIGINT)
87 cls.honeynode_process4.wait()
90 print ("execution of {}".format(self.id().split(".")[-1]))
95 def test_01_xpdrA_device_connected(self):
96 url = ("{}/config/network-topology:"
97 "network-topology/topology/topology-netconf/node/XPDR-A1"
98 .format(self.restconf_baseurl))
100 "node-id": "XPDR-A1",
101 "netconf-node-topology:username": "admin",
102 "netconf-node-topology:password": "admin",
103 "netconf-node-topology:host": "127.0.0.1",
104 "netconf-node-topology:port": "17840",
105 "netconf-node-topology:tcp-only": "false",
106 "netconf-node-topology:pass-through": {}}]}
107 headers = {'content-type': 'application/json'}
108 response = requests.request(
109 "PUT", url, data=json.dumps(data), headers=headers,
110 auth=('admin', 'admin'))
111 self.assertEqual(response.status_code, requests.codes.created)
114 def test_02_xpdrC_device_connected(self):
115 url = ("{}/config/network-topology:"
116 "network-topology/topology/topology-netconf/node/XPDR-C1"
117 .format(self.restconf_baseurl))
119 "node-id": "XPDR-C1",
120 "netconf-node-topology:username": "admin",
121 "netconf-node-topology:password": "admin",
122 "netconf-node-topology:host": "127.0.0.1",
123 "netconf-node-topology:port": "17844",
124 "netconf-node-topology:tcp-only": "false",
125 "netconf-node-topology:pass-through": {}}]}
126 headers = {'content-type': 'application/json'}
127 response = requests.request(
128 "PUT", url, data=json.dumps(data), headers=headers,
129 auth=('admin', 'admin'))
130 self.assertEqual(response.status_code, requests.codes.created)
133 def test_03_rdmA_device_connected(self):
134 url = ("{}/config/network-topology:"
135 "network-topology/topology/topology-netconf/node/ROADM-A1"
136 .format(self.restconf_baseurl))
138 "node-id": "ROADM-A1",
139 "netconf-node-topology:username": "admin",
140 "netconf-node-topology:password": "admin",
141 "netconf-node-topology:host": "127.0.0.1",
142 "netconf-node-topology:port": "17841",
143 "netconf-node-topology:tcp-only": "false",
144 "netconf-node-topology:pass-through": {}}]}
145 headers = {'content-type': 'application/json'}
146 response = requests.request(
147 "PUT", url, data=json.dumps(data), headers=headers,
148 auth=('admin', 'admin'))
149 self.assertEqual(response.status_code, requests.codes.created)
152 def test_04_rdmC_device_connected(self):
153 url = ("{}/config/network-topology:"
154 "network-topology/topology/topology-netconf/node/ROADM-C1"
155 .format(self.restconf_baseurl))
157 "node-id": "ROADM-C1",
158 "netconf-node-topology:username": "admin",
159 "netconf-node-topology:password": "admin",
160 "netconf-node-topology:host": "127.0.0.1",
161 "netconf-node-topology:port": "17843",
162 "netconf-node-topology:tcp-only": "false",
163 "netconf-node-topology:pass-through": {}}]}
164 headers = {'content-type': 'application/json'}
165 response = requests.request(
166 "PUT", url, data=json.dumps(data), headers=headers,
167 auth=('admin', 'admin'))
168 self.assertEqual(response.status_code, requests.codes.created)
171 def test_05_connect_xprdA_to_roadmA(self):
172 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
174 "networkutils:input": {
175 "networkutils:links-input": {
176 "networkutils:xpdr-node": "XPDR-A1",
177 "networkutils:xpdr-num": "1",
178 "networkutils:network-num": "1",
179 "networkutils:rdm-node": "ROADM-A1",
180 "networkutils:srg-num": "1",
181 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
185 headers = {'content-type': 'application/json'}
186 response = requests.request(
187 "POST", url, data=json.dumps(data),
188 headers=headers, auth=('admin', 'admin'))
189 self.assertEqual(response.status_code, requests.codes.ok)
190 res = response.json()
191 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
193 def test_06_connect_roadmA_to_xpdrA(self):
194 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
196 "networkutils:input": {
197 "networkutils:links-input": {
198 "networkutils:xpdr-node": "XPDR-A1",
199 "networkutils:xpdr-num": "1",
200 "networkutils:network-num": "1",
201 "networkutils:rdm-node": "ROADM-A1",
202 "networkutils:srg-num": "1",
203 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
207 headers = {'content-type': 'application/json'}
208 response = requests.request(
209 "POST", url, data=json.dumps(data),
210 headers=headers, auth=('admin', 'admin'))
211 self.assertEqual(response.status_code, requests.codes.ok)
212 res = response.json()
213 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
215 def test_07_connect_xprdC_to_roadmC(self):
216 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
218 "networkutils:input": {
219 "networkutils:links-input": {
220 "networkutils:xpdr-node": "XPDR-C1",
221 "networkutils:xpdr-num": "1",
222 "networkutils:network-num": "1",
223 "networkutils:rdm-node": "ROADM-C1",
224 "networkutils:srg-num": "1",
225 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
229 headers = {'content-type': 'application/json'}
230 response = requests.request(
231 "POST", url, data=json.dumps(data),
232 headers=headers, auth=('admin', 'admin'))
233 self.assertEqual(response.status_code, requests.codes.ok)
234 res = response.json()
235 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
237 def test_08_connect_roadmC_to_xpdrC(self):
238 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
240 "networkutils:input": {
241 "networkutils:links-input": {
242 "networkutils:xpdr-node": "XPDR-C1",
243 "networkutils:xpdr-num": "1",
244 "networkutils:network-num": "1",
245 "networkutils:rdm-node": "ROADM-C1",
246 "networkutils:srg-num": "1",
247 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
251 headers = {'content-type': 'application/json'}
252 response = requests.request(
253 "POST", url, data=json.dumps(data),
254 headers=headers, auth=('admin', 'admin'))
255 self.assertEqual(response.status_code, requests.codes.ok)
256 res = response.json()
257 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
259 def test_09_create_OTS_ROADMA(self):
260 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
263 "node-id" : "ROADM-A1",
264 "logical-connection-point" : "DEG1-TTP-TXRX"
267 headers = {'content-type': 'application/json'}
268 response = requests.request(
269 "POST", url, data=json.dumps(data),
270 headers=headers, auth=('admin', 'admin'))
272 self.assertEqual(response.status_code, requests.codes.ok)
273 res = response.json()
274 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
275 res["output"]["result"])
277 def test_10_create_OTS_ROADMC(self):
278 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
281 "node-id" : "ROADM-C1",
282 "logical-connection-point" : "DEG2-TTP-TXRX"
285 headers = {'content-type': 'application/json'}
286 response = requests.request(
287 "POST", url, data=json.dumps(data),
288 headers=headers, auth=('admin', 'admin'))
289 self.assertEqual(response.status_code, requests.codes.ok)
290 res = response.json()
291 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
292 res["output"]["result"])
294 def test_11_get_PM_ROADMA(self):
295 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
298 "node-id": "ROADM-A1",
299 "resource-type": "interface",
300 "granularity": "15min",
301 "resource-identifier": {
302 "resource-name" : "OTS-DEG2-TTP-TXRX"
306 headers = {'content-type': 'application/json'}
307 response = requests.request(
308 "POST", url, data=json.dumps(data),
309 headers=headers, auth=('admin', 'admin'))
310 self.assertEqual(response.status_code, requests.codes.ok)
311 res = response.json()
313 "pmparameter-name": "OpticalPowerOutput",
314 "pmparameter-value": "2.5"
315 }, res["output"]["measurements"])
317 "pmparameter-name": "OpticalReturnLoss",
318 "pmparameter-value": "40"
319 }, res["output"]["measurements"])
321 "pmparameter-name": "OpticalPowerInput",
322 "pmparameter-value": "-21.1"
323 }, res["output"]["measurements"])
325 def test_12_get_PM_ROADMC(self):
326 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
329 "node-id": "ROADM-C1",
330 "resource-type": "interface",
331 "granularity": "15min",
332 "resource-identifier": {
333 "resource-name" : "OTS-DEG1-TTP-TXRX"
337 headers = {'content-type': 'application/json'}
338 response = requests.request(
339 "POST", url, data=json.dumps(data),
340 headers=headers, auth=('admin', 'admin'))
341 self.assertEqual(response.status_code, requests.codes.ok)
342 res = response.json()
344 "pmparameter-name": "OpticalPowerOutput",
345 "pmparameter-value": "4.6"
346 }, res["output"]["measurements"])
348 "pmparameter-name": "OpticalReturnLoss",
349 "pmparameter-value": "49.1"
350 }, res["output"]["measurements"])
352 "pmparameter-name": "OpticalPowerInput",
353 "pmparameter-value": "-15.1"
354 }, res["output"]["measurements"])
356 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
357 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
361 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
364 headers = {'content-type': 'application/json'}
365 response = requests.request(
366 "POST", url, data=json.dumps(data),
367 headers=headers, auth=('admin', 'admin'))
368 self.assertEqual(response.status_code, requests.codes.ok)
369 res = response.json()
370 self.assertIn('Success',
371 res["output"]["result"])
374 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
375 }, res["output"]["spans"])
378 def test_14_calculate_span_loss_base_all(self):
379 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
385 headers = {'content-type': 'application/json'}
386 response = requests.request(
387 "POST", url, data=json.dumps(data),
388 headers=headers, auth=('admin', 'admin'))
389 self.assertEqual(response.status_code, requests.codes.ok)
390 res = response.json()
391 self.assertIn('Success',
392 res["output"]["result"])
395 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
396 }, res["output"]["spans"])
399 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
400 }, res["output"]["spans"])
def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
    # Fetch the "ots" container of interface OTS-DEG2-TTP-TXRX through the
    # yang-ext:mount path of node ROADM-A1 and check both span-loss leaves.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    )
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Both expected values live under the same top-level key of the JSON body.
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(18, ots['span-loss-transmit'])
    self.assertEqual(26, ots['span-loss-receive'])
def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
    # Fetch the "ots" container of interface OTS-DEG1-TTP-TXRX through the
    # yang-ext:mount path of node ROADM-C1 and check both span-loss leaves.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    )
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Both expected values live under the same top-level key of the JSON body.
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(26, ots['span-loss-transmit'])
    self.assertEqual(18, ots['span-loss-receive'])
427 def test_17_servicePath_create_AToZ(self):
428 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
431 "service-name": "test",
433 "modulation-format": "qpsk",
434 "operation": "create",
437 "dest-tp": "XPDR1-NETWORK1",
438 "src-tp": "XPDR1-CLIENT1",
442 "dest-tp": "DEG2-TTP-TXRX",
443 "src-tp": "SRG1-PP1-TXRX",
444 "node-id": "ROADM-A1"
447 "dest-tp": "SRG1-PP1-TXRX",
448 "src-tp": "DEG1-TTP-TXRX",
449 "node-id": "ROADM-C1"
452 "dest-tp": "XPDR1-CLIENT1",
453 "src-tp": "XPDR1-NETWORK1",
459 headers = {'content-type': 'application/json'}
460 response = requests.request(
461 "POST", url, data=json.dumps(data),
462 headers=headers, auth=('admin', 'admin'))
463 self.assertEqual(response.status_code, requests.codes.ok)
464 res = response.json()
465 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
469 def test_18_servicePath_create_ZToA(self):
470 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
473 "service-name": "test",
475 "modulation-format": "qpsk",
476 "operation": "create",
479 "dest-tp": "XPDR1-NETWORK1",
480 "src-tp": "XPDR1-CLIENT1",
484 "dest-tp": "DEG1-TTP-TXRX",
485 "src-tp": "SRG1-PP1-TXRX",
486 "node-id": "ROADM-C1"
489 "src-tp": "DEG2-TTP-TXRX",
490 "dest-tp": "SRG1-PP1-TXRX",
491 "node-id": "ROADM-A1"
494 "src-tp": "XPDR1-NETWORK1",
495 "dest-tp": "XPDR1-CLIENT1",
501 headers = {'content-type': 'application/json'}
502 response = requests.request(
503 "POST", url, data=json.dumps(data),
504 headers=headers, auth=('admin', 'admin'))
505 self.assertEqual(response.status_code, requests.codes.ok)
506 res = response.json()
507 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
511 def test_19_service_power_setup_XPDRA_XPDRC(self):
512 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
515 "service-name": "test",
519 "dest-tp": "XPDR1-NETWORK1",
520 "src-tp": "XPDR1-CLIENT1",
524 "dest-tp": "DEG2-TTP-TXRX",
525 "src-tp": "SRG1-PP1-TXRX",
526 "node-id": "ROADM-A1"
529 "dest-tp": "SRG1-PP1-TXRX",
530 "src-tp": "DEG1-TTP-TXRX",
531 "node-id": "ROADM-C1"
534 "dest-tp": "XPDR1-CLIENT1",
535 "src-tp": "XPDR1-NETWORK1",
541 headers = {'content-type': 'application/json'}
542 response = requests.request(
543 "POST", url, data=json.dumps(data),
544 headers=headers, auth=('admin', 'admin'))
545 self.assertEqual(response.status_code, requests.codes.ok)
546 res = response.json()
547 self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
    # Fetch the "och" container of interface XPDR1-NETWORK1-1 on node XPDR-A1
    # and check its transmit power and frequency.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    )
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(196.1, och['frequency'])
def test_21_get_roadmconnection_ROADMA(self):
    # Fetch roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on node ROADM-A1
    # and check its optical control mode and target output power.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
    )
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The body wraps the connection in a list; only its first entry is checked.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2.0, connection['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    # Fetch roadm-connection DEG1-TTP-TXRX-SRG1-PP1-TXRX-1 on node ROADM-C1
    # and check its optical control mode.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1"
    )
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The body wraps the connection in a list; only its first entry is checked.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("power", connection['opticalControlMode'])
584 def test_23_service_power_setup_XPDRC_XPDRA(self):
585 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
588 "service-name": "test",
592 "dest-tp": "XPDR1-NETWORK1",
593 "src-tp": "XPDR1-CLIENT1",
597 "dest-tp": "DEG1-TTP-TXRX",
598 "src-tp": "SRG1-PP1-TXRX",
599 "node-id": "ROADM-C1"
602 "src-tp": "DEG2-TTP-TXRX",
603 "dest-tp": "SRG1-PP1-TXRX",
604 "node-id": "ROADM-A1"
607 "src-tp": "XPDR1-NETWORK1",
608 "dest-tp": "XPDR1-CLIENT1",
614 headers = {'content-type': 'application/json'}
615 response = requests.request(
616 "POST", url, data=json.dumps(data),
617 headers=headers, auth=('admin', 'admin'))
618 self.assertEqual(response.status_code, requests.codes.ok)
619 res = response.json()
620 self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
    # Fetch the "och" container of interface XPDR1-NETWORK1-1 on node XPDR-C1
    # and check its transmit power and frequency.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    )
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(196.1, och['frequency'])
def test_25_get_roadmconnection_ROADMC(self):
    # Fetch roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on node ROADM-C1
    # and check its optical control mode and target output power.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
    )
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The body wraps the connection in a list; only its first entry is checked.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2.0, connection['target-output-power'])
646 def test_26_service_power_turndown_XPDRA_XPDRC(self):
647 url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
650 "service-name": "test",
654 "dest-tp": "XPDR1-NETWORK1",
655 "src-tp": "XPDR1-CLIENT1",
659 "dest-tp": "DEG2-TTP-TXRX",
660 "src-tp": "SRG1-PP1-TXRX",
661 "node-id": "ROADM-A1"
664 "dest-tp": "SRG1-PP1-TXRX",
665 "src-tp": "DEG1-TTP-TXRX",
666 "node-id": "ROADM-C1"
669 "dest-tp": "XPDR1-CLIENT1",
670 "src-tp": "XPDR1-NETWORK1",
676 headers = {'content-type': 'application/json'}
677 response = requests.request(
678 "POST", url, data=json.dumps(data),
679 headers=headers, auth=('admin', 'admin'))
680 self.assertEqual(response.status_code, requests.codes.ok)
681 res = response.json()
682 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    # Fetch roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on node ROADM-A1
    # and check that it reports control mode "off" with a -60 target power.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
    )
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The body wraps the connection in a list; only its first entry is checked.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
    self.assertEqual(-60, connection['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    # Fetch roadm-connection DEG1-TTP-TXRX-SRG1-PP1-TXRX-1 on node ROADM-C1
    # and check that it reports control mode "off".
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1"
    )
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The body wraps the connection in a list; only its first entry is checked.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
707 def test_29_servicePath_delete_AToZ(self):
708 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
711 "service-name": "test",
713 "modulation-format": "qpsk",
714 "operation": "delete",
717 "dest-tp": "XPDR1-NETWORK1",
718 "src-tp": "XPDR1-CLIENT1",
722 "dest-tp": "DEG2-TTP-TXRX",
723 "src-tp": "SRG1-PP1-TXRX",
724 "node-id": "ROADM-A1"
727 "dest-tp": "SRG1-PP1-TXRX",
728 "src-tp": "DEG1-TTP-TXRX",
729 "node-id": "ROADM-C1"
732 "dest-tp": "XPDR1-CLIENT1",
733 "src-tp": "XPDR1-NETWORK1",
739 headers = {'content-type': 'application/json'}
740 response = requests.request(
741 "POST", url, data=json.dumps(data),
742 headers=headers, auth=('admin', 'admin'))
743 self.assertEqual(response.status_code, requests.codes.ok)
744 res = response.json()
745 self.assertIn('Request processed', res["output"]["result"])
748 def test_30_servicePath_delete_ZToA(self):
749 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
752 "service-name": "test",
754 "modulation-format": "qpsk",
755 "operation": "delete",
758 "dest-tp": "XPDR1-NETWORK1",
759 "src-tp": "XPDR1-CLIENT1",
763 "dest-tp": "DEG1-TTP-TXRX",
764 "src-tp": "SRG1-PP1-TXRX",
765 "node-id": "ROADM-C1"
768 "src-tp": "DEG2-TTP-TXRX",
769 "dest-tp": "SRG1-PP1-TXRX",
770 "node-id": "ROADM-A1"
773 "src-tp": "XPDR1-NETWORK1",
774 "dest-tp": "XPDR1-CLIENT1",
780 headers = {'content-type': 'application/json'}
781 response = requests.request(
782 "POST", url, data=json.dumps(data),
783 headers=headers, auth=('admin', 'admin'))
784 self.assertEqual(response.status_code, requests.codes.ok)
785 res = response.json()
786 self.assertIn('Request processed', res["output"]["result"])
789 """Test the case where the SRG that the xpdr is connected to has no optical range data."""
791 def test_31_connect_xprdA_to_roadmA(self):
792 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
794 "networkutils:input": {
795 "networkutils:links-input": {
796 "networkutils:xpdr-node": "XPDR-A1",
797 "networkutils:xpdr-num": "1",
798 "networkutils:network-num": "2",
799 "networkutils:rdm-node": "ROADM-A1",
800 "networkutils:srg-num": "1",
801 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
805 headers = {'content-type': 'application/json'}
806 response = requests.request(
807 "POST", url, data=json.dumps(data),
808 headers=headers, auth=('admin', 'admin'))
809 self.assertEqual(response.status_code, requests.codes.ok)
810 res = response.json()
811 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
813 def test_32_connect_roadmA_to_xpdrA(self):
814 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
816 "networkutils:input": {
817 "networkutils:links-input": {
818 "networkutils:xpdr-node": "XPDR-A1",
819 "networkutils:xpdr-num": "1",
820 "networkutils:network-num": "2",
821 "networkutils:rdm-node": "ROADM-A1",
822 "networkutils:srg-num": "1",
823 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
827 headers = {'content-type': 'application/json'}
828 response = requests.request(
829 "POST", url, data=json.dumps(data),
830 headers=headers, auth=('admin', 'admin'))
831 self.assertEqual(response.status_code, requests.codes.ok)
832 res = response.json()
833 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
835 def test_33_servicePath_create_AToZ(self):
836 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
839 "service-name": "test2",
841 "modulation-format": "qpsk",
842 "operation": "create",
845 "dest-tp": "XPDR1-NETWORK2",
846 "src-tp": "XPDR1-CLIENT2",
850 "dest-tp": "DEG2-TTP-TXRX",
851 "src-tp": "SRG1-PP2-TXRX",
852 "node-id": "ROADM-A1"
857 headers = {'content-type': 'application/json'}
858 response = requests.request(
859 "POST", url, data=json.dumps(data),
860 headers=headers, auth=('admin', 'admin'))
861 self.assertEqual(response.status_code, requests.codes.ok)
862 res = response.json()
863 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
    # Fetch the "och" container of interface XPDR1-NETWORK2-2 on node XPDR-A1
    # and check its transmit power.
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
        "org-openroadm-optical-channel-interfaces:och"
    )
    reply = requests.request(
        "GET",
        url_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    res = reply.json()
    self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
    # NOTE(review): the wavelength-number check below was already disabled;
    # the reason is not visible here -- confirm whether it can be removed.
    # self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
879 def test_35_servicePath_delete_AToZ(self):
880 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
883 "service-name": "test",
885 "modulation-format": "qpsk",
886 "operation": "delete",
889 "dest-tp": "XPDR1-NETWORK2",
890 "src-tp": "XPDR1-CLIENT2",
894 "dest-tp": "DEG2-TTP-TXRX",
895 "src-tp": "SRG1-PP2-TXRX",
896 "node-id": "ROADM-A1"
901 headers = {'content-type': 'application/json'}
902 response = requests.request(
903 "POST", url, data=json.dumps(data),
904 headers=headers, auth=('admin', 'admin'))
905 self.assertEqual(response.status_code, requests.codes.ok)
906 res = response.json()
907 self.assertIn('Request processed', res["output"]["result"])
910 def test_36_xpdrA_device_disconnected(self):
911 url = ("{}/config/network-topology:"
912 "network-topology/topology/topology-netconf/node/XPDR-A1"
913 .format(self.restconf_baseurl))
914 headers = {'content-type': 'application/json'}
915 response = requests.request(
916 "DELETE", url, headers=headers,
917 auth=('admin', 'admin'))
918 self.assertEqual(response.status_code, requests.codes.ok)
921 def test_37_xpdrC_device_disconnected(self):
922 url = ("{}/config/network-topology:"
923 "network-topology/topology/topology-netconf/node/XPDR-C1"
924 .format(self.restconf_baseurl))
925 headers = {'content-type': 'application/json'}
926 response = requests.request(
927 "DELETE", url, headers=headers,
928 auth=('admin', 'admin'))
929 self.assertEqual(response.status_code, requests.codes.ok)
932 def test_38_calculate_span_loss_current(self):
933 url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(self.restconf_baseurl)
934 headers = {'content-type': 'application/json'}
935 response = requests.request(
936 "POST", url, headers=headers, auth=('admin', 'admin'))
937 self.assertEqual(response.status_code, requests.codes.ok)
938 res = response.json()
939 self.assertIn('Success',
940 res["output"]["result"])
943 def test_39_rdmA_device_disconnected(self):
944 url = ("{}/config/network-topology:"
945 "network-topology/topology/topology-netconf/node/ROADM-A1"
946 .format(self.restconf_baseurl))
947 headers = {'content-type': 'application/json'}
948 response = requests.request(
949 "DELETE", url, headers=headers,
950 auth=('admin', 'admin'))
951 self.assertEqual(response.status_code, requests.codes.ok)
954 def test_40_rdmC_device_disconnected(self):
955 url = ("{}/config/network-topology:"
956 "network-topology/topology/topology-netconf/node/ROADM-C1"
957 .format(self.restconf_baseurl))
958 headers = {'content-type': 'application/json'}
959 response = requests.request(
960 "DELETE", url, headers=headers,
961 auth=('admin', 'admin'))
962 self.assertEqual(response.status_code, requests.codes.ok)
# Allow this module to be run directly as a script; verbosity=2 asks the
# unittest runner for verbose, per-test output.
if __name__ == "__main__":
    unittest.main(verbosity=2)