3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
22 from common import test_utils
25 class TransportOlmTesting(unittest.TestCase):
28 restconf_baseurl = "http://localhost:8181/restconf"
32 cls.processes = test_utils.start_tpce()
33 cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes`` and report completion."""
    for running in cls.processes:
        # Delegates the actual termination to the shared test helper.
        test_utils.shutdown_process(running)
    print("all processes killed")
42 print("execution of {}".format(self.id().split(".")[-1]))
45 def test_01_xpdrA_device_connected(self):
46 url = ("{}/config/network-topology:"
47 "network-topology/topology/topology-netconf/node/XPDRA01"
48 .format(self.restconf_baseurl))
51 "netconf-node-topology:username": "admin",
52 "netconf-node-topology:password": "admin",
53 "netconf-node-topology:host": "127.0.0.1",
54 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
55 "netconf-node-topology:tcp-only": "false",
56 "netconf-node-topology:pass-through": {}}]}
57 headers = {'content-type': 'application/json'}
58 response = requests.request(
59 "PUT", url, data=json.dumps(data), headers=headers,
60 auth=('admin', 'admin'))
61 self.assertEqual(response.status_code, requests.codes.created)
64 def test_02_xpdrC_device_connected(self):
65 url = ("{}/config/network-topology:"
66 "network-topology/topology/topology-netconf/node/XPDRC01"
67 .format(self.restconf_baseurl))
70 "netconf-node-topology:username": "admin",
71 "netconf-node-topology:password": "admin",
72 "netconf-node-topology:host": "127.0.0.1",
73 "netconf-node-topology:port": test_utils.sims['xpdrc']['port'],
74 "netconf-node-topology:tcp-only": "false",
75 "netconf-node-topology:pass-through": {}}]}
76 headers = {'content-type': 'application/json'}
77 response = requests.request(
78 "PUT", url, data=json.dumps(data), headers=headers,
79 auth=('admin', 'admin'))
80 self.assertEqual(response.status_code, requests.codes.created)
83 def test_03_rdmA_device_connected(self):
84 url = ("{}/config/network-topology:"
85 "network-topology/topology/topology-netconf/node/ROADMA01"
86 .format(self.restconf_baseurl))
88 "node-id": "ROADMA01",
89 "netconf-node-topology:username": "admin",
90 "netconf-node-topology:password": "admin",
91 "netconf-node-topology:host": "127.0.0.1",
92 "netconf-node-topology:port": test_utils.sims['roadma-full']['port'],
93 "netconf-node-topology:tcp-only": "false",
94 "netconf-node-topology:pass-through": {}}]}
95 headers = {'content-type': 'application/json'}
96 response = requests.request(
97 "PUT", url, data=json.dumps(data), headers=headers,
98 auth=('admin', 'admin'))
99 self.assertEqual(response.status_code, requests.codes.created)
102 def test_04_rdmC_device_connected(self):
103 url = ("{}/config/network-topology:"
104 "network-topology/topology/topology-netconf/node/ROADMC01"
105 .format(self.restconf_baseurl))
107 "node-id": "ROADMC01",
108 "netconf-node-topology:username": "admin",
109 "netconf-node-topology:password": "admin",
110 "netconf-node-topology:host": "127.0.0.1",
111 "netconf-node-topology:port": test_utils.sims['roadmc-full']['port'],
112 "netconf-node-topology:tcp-only": "false",
113 "netconf-node-topology:pass-through": {}}]}
114 headers = {'content-type': 'application/json'}
115 response = requests.request(
116 "PUT", url, data=json.dumps(data), headers=headers,
117 auth=('admin', 'admin'))
118 self.assertEqual(response.status_code, requests.codes.created)
121 def test_05_connect_xprdA_to_roadmA(self):
122 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
124 "networkutils:input": {
125 "networkutils:links-input": {
126 "networkutils:xpdr-node": "XPDRA01",
127 "networkutils:xpdr-num": "1",
128 "networkutils:network-num": "1",
129 "networkutils:rdm-node": "ROADMA01",
130 "networkutils:srg-num": "1",
131 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
135 headers = {'content-type': 'application/json'}
136 response = requests.request(
137 "POST", url, data=json.dumps(data),
138 headers=headers, auth=('admin', 'admin'))
139 self.assertEqual(response.status_code, requests.codes.ok)
140 res = response.json()
141 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
143 def test_06_connect_roadmA_to_xpdrA(self):
144 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
146 "networkutils:input": {
147 "networkutils:links-input": {
148 "networkutils:xpdr-node": "XPDRA01",
149 "networkutils:xpdr-num": "1",
150 "networkutils:network-num": "1",
151 "networkutils:rdm-node": "ROADMA01",
152 "networkutils:srg-num": "1",
153 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
157 headers = {'content-type': 'application/json'}
158 response = requests.request(
159 "POST", url, data=json.dumps(data),
160 headers=headers, auth=('admin', 'admin'))
161 self.assertEqual(response.status_code, requests.codes.ok)
162 res = response.json()
163 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
165 def test_07_connect_xprdC_to_roadmC(self):
166 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
168 "networkutils:input": {
169 "networkutils:links-input": {
170 "networkutils:xpdr-node": "XPDRC01",
171 "networkutils:xpdr-num": "1",
172 "networkutils:network-num": "1",
173 "networkutils:rdm-node": "ROADMC01",
174 "networkutils:srg-num": "1",
175 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
179 headers = {'content-type': 'application/json'}
180 response = requests.request(
181 "POST", url, data=json.dumps(data),
182 headers=headers, auth=('admin', 'admin'))
183 self.assertEqual(response.status_code, requests.codes.ok)
184 res = response.json()
185 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
187 def test_08_connect_roadmC_to_xpdrC(self):
188 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
190 "networkutils:input": {
191 "networkutils:links-input": {
192 "networkutils:xpdr-node": "XPDRC01",
193 "networkutils:xpdr-num": "1",
194 "networkutils:network-num": "1",
195 "networkutils:rdm-node": "ROADMC01",
196 "networkutils:srg-num": "1",
197 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
201 headers = {'content-type': 'application/json'}
202 response = requests.request(
203 "POST", url, data=json.dumps(data),
204 headers=headers, auth=('admin', 'admin'))
205 self.assertEqual(response.status_code, requests.codes.ok)
206 res = response.json()
207 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
209 def test_09_create_OTS_ROADMA(self):
210 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
213 "node-id": "ROADMA01",
214 "logical-connection-point": "DEG1-TTP-TXRX"
217 headers = {'content-type': 'application/json'}
218 response = requests.request(
219 "POST", url, data=json.dumps(data),
220 headers=headers, auth=('admin', 'admin'))
221 self.assertEqual(response.status_code, requests.codes.ok)
222 res = response.json()
223 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADMA01',
224 res["output"]["result"])
226 def test_10_create_OTS_ROADMC(self):
227 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
230 "node-id": "ROADMC01",
231 "logical-connection-point": "DEG2-TTP-TXRX"
234 headers = {'content-type': 'application/json'}
235 response = requests.request(
236 "POST", url, data=json.dumps(data),
237 headers=headers, auth=('admin', 'admin'))
238 self.assertEqual(response.status_code, requests.codes.ok)
239 res = response.json()
240 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADMC01',
241 res["output"]["result"])
243 def test_11_get_PM_ROADMA(self):
244 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
247 "node-id": "ROADMA01",
248 "resource-type": "interface",
249 "granularity": "15min",
250 "resource-identifier": {
251 "resource-name": "OTS-DEG1-TTP-TXRX"
255 headers = {'content-type': 'application/json'}
256 response = requests.request(
257 "POST", url, data=json.dumps(data),
258 headers=headers, auth=('admin', 'admin'))
259 self.assertEqual(response.status_code, requests.codes.ok)
260 res = response.json()
262 "pmparameter-name": "OpticalPowerOutput",
263 "pmparameter-value": "2.5"
264 }, res["output"]["measurements"])
266 "pmparameter-name": "OpticalReturnLoss",
267 "pmparameter-value": "49.9"
268 }, res["output"]["measurements"])
270 "pmparameter-name": "OpticalPowerInput",
271 "pmparameter-value": "3"
272 }, res["output"]["measurements"])
274 def test_12_get_PM_ROADMC(self):
275 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
278 "node-id": "ROADMC01",
279 "resource-type": "interface",
280 "granularity": "15min",
281 "resource-identifier": {
282 "resource-name": "OTS-DEG2-TTP-TXRX"
286 headers = {'content-type': 'application/json'}
287 response = requests.request(
288 "POST", url, data=json.dumps(data),
289 headers=headers, auth=('admin', 'admin'))
290 self.assertEqual(response.status_code, requests.codes.ok)
291 res = response.json()
293 "pmparameter-name": "OpticalPowerOutput",
294 "pmparameter-value": "18.1"
295 }, res["output"]["measurements"])
297 "pmparameter-name": "OpticalReturnLoss",
298 "pmparameter-value": "48.8"
299 }, res["output"]["measurements"])
301 "pmparameter-name": "OpticalPowerInput",
302 "pmparameter-value": "-3.2"
303 }, res["output"]["measurements"])
305 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
306 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
310 "link-id": "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX"
313 headers = {'content-type': 'application/json'}
314 response = requests.request(
315 "POST", url, data=json.dumps(data),
316 headers=headers, auth=('admin', 'admin'))
317 self.assertEqual(response.status_code, requests.codes.ok)
318 res = response.json()
319 self.assertIn('Success',
320 res["output"]["result"])
323 "link-id": "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX"
324 }, res["output"]["spans"])
327 def test_14_calculate_span_loss_base_all(self):
328 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
334 headers = {'content-type': 'application/json'}
335 response = requests.request(
336 "POST", url, data=json.dumps(data),
337 headers=headers, auth=('admin', 'admin'))
338 self.assertEqual(response.status_code, requests.codes.ok)
339 res = response.json()
340 self.assertIn('Success',
341 res["output"]["result"])
344 "link-id": "ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX"
345 }, res["output"]["spans"])
348 "link-id": "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX"
349 }, res["output"]["spans"])
def test_15_get_OTS_DEG1_TTP_TXRX_ROADMA(self):
    """GET the OTS-DEG1-TTP-TXRX interface of ROADMA01 and check its span-loss values.

    Expects an OK status, span-loss-transmit == 5.7 and span-loss-receive == 15.1.
    """
    ots_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMA01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "interface/OTS-DEG1-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots")
    reply = requests.request(
        "GET",
        ots_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Both assertions read the same container of the mounted device config.
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(5.7, ots['span-loss-transmit'])
    self.assertEqual(15.1, ots['span-loss-receive'])
def test_16_get_OTS_DEG2_TTP_TXRX_ROADMC(self):
    """GET the OTS-DEG2-TTP-TXRX interface of ROADMC01 and check its span-loss values.

    Expects an OK status, span-loss-transmit == 15.1 and span-loss-receive == 5.7.
    """
    ots_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMC01/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
        "interface/OTS-DEG2-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots")
    reply = requests.request(
        "GET",
        ots_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Both assertions read the same container of the mounted device config.
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(15.1, ots['span-loss-transmit'])
    self.assertEqual(5.7, ots['span-loss-receive'])
376 def test_17_servicePath_create_AToZ(self):
377 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
380 "service-name": "test",
382 "modulation-format": "qpsk",
383 "operation": "create",
386 "dest-tp": "XPDR1-NETWORK1",
387 "src-tp": "XPDR1-CLIENT1",
391 "dest-tp": "DEG1-TTP-TXRX",
392 "src-tp": "SRG1-PP1-TXRX",
393 "node-id": "ROADMA01"
396 "dest-tp": "SRG1-PP1-TXRX",
397 "src-tp": "DEG2-TTP-TXRX",
398 "node-id": "ROADMC01"
401 "dest-tp": "XPDR1-CLIENT1",
402 "src-tp": "XPDR1-NETWORK1",
408 headers = {'content-type': 'application/json'}
409 response = requests.request(
410 "POST", url, data=json.dumps(data),
411 headers=headers, auth=('admin', 'admin'))
412 self.assertEqual(response.status_code, requests.codes.ok)
413 res = response.json()
414 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
418 def test_18_servicePath_create_ZToA(self):
419 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
422 "service-name": "test",
424 "modulation-format": "qpsk",
425 "operation": "create",
428 "dest-tp": "XPDR1-NETWORK1",
429 "src-tp": "XPDR1-CLIENT1",
433 "dest-tp": "DEG2-TTP-TXRX",
434 "src-tp": "SRG1-PP1-TXRX",
435 "node-id": "ROADMC01"
438 "src-tp": "DEG1-TTP-TXRX",
439 "dest-tp": "SRG1-PP1-TXRX",
440 "node-id": "ROADMA01"
443 "src-tp": "XPDR1-NETWORK1",
444 "dest-tp": "XPDR1-CLIENT1",
450 headers = {'content-type': 'application/json'}
451 response = requests.request(
452 "POST", url, data=json.dumps(data),
453 headers=headers, auth=('admin', 'admin'))
454 self.assertEqual(response.status_code, requests.codes.ok)
455 res = response.json()
456 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
460 def test_19_service_power_setup_XPDRA_XPDRC(self):
461 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
464 "service-name": "test",
468 "dest-tp": "XPDR1-NETWORK1",
469 "src-tp": "XPDR1-CLIENT1",
473 "dest-tp": "DEG1-TTP-TXRX",
474 "src-tp": "SRG1-PP1-TXRX",
475 "node-id": "ROADMA01"
478 "dest-tp": "SRG1-PP1-TXRX",
479 "src-tp": "DEG2-TTP-TXRX",
480 "node-id": "ROADMC01"
483 "dest-tp": "XPDR1-CLIENT1",
484 "src-tp": "XPDR1-NETWORK1",
490 headers = {'content-type': 'application/json'}
491 response = requests.request(
492 "POST", url, data=json.dumps(data),
493 headers=headers, auth=('admin', 'admin'))
494 self.assertEqual(response.status_code, requests.codes.ok)
495 res = response.json()
496 self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
    """GET the och container of interface XPDR1-NETWORK1-1 on XPDRA01.

    Expects an OK status, transmit-power == 0 and wavelength-number == 1.
    """
    och_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/XPDRA01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och")
    reply = requests.request(
        "GET",
        och_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(0, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_21_get_roadmconnection_ROADMA(self):
    """GET roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADMA01.

    Expects an OK status, opticalControlMode "gainLoss" and
    target-output-power == -3.3 on the first returned connection.
    """
    connection_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMA01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
    reply = requests.request(
        "GET",
        connection_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(-3.3, connection['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    """GET roadm-connection DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 on ROADMC01.

    Expects an OK status and opticalControlMode "power" on the first
    returned connection.
    """
    connection_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMC01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1")
    reply = requests.request(
        "GET",
        connection_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("power", connection['opticalControlMode'])
533 def test_23_service_power_setup_XPDRC_XPDRA(self):
534 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
537 "service-name": "test",
541 "dest-tp": "XPDR1-NETWORK1",
542 "src-tp": "XPDR1-CLIENT1",
546 "dest-tp": "DEG2-TTP-TXRX",
547 "src-tp": "SRG1-PP1-TXRX",
548 "node-id": "ROADMC01"
551 "src-tp": "DEG1-TTP-TXRX",
552 "dest-tp": "SRG1-PP1-TXRX",
553 "node-id": "ROADMA01"
556 "src-tp": "XPDR1-NETWORK1",
557 "dest-tp": "XPDR1-CLIENT1",
563 headers = {'content-type': 'application/json'}
564 response = requests.request(
565 "POST", url, data=json.dumps(data),
566 headers=headers, auth=('admin', 'admin'))
567 self.assertEqual(response.status_code, requests.codes.ok)
568 res = response.json()
569 self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
    """GET the och container of interface XPDR1-NETWORK1-1 on XPDRC01.

    Expects an OK status, transmit-power == 0 and wavelength-number == 1.
    """
    och_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/XPDRC01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och")
    reply = requests.request(
        "GET",
        och_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(0, och['transmit-power'])
    self.assertEqual(1, och['wavelength-number'])
def test_25_get_roadmconnection_ROADMC(self):
    """GET roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADMC01.

    Expects an OK status, opticalControlMode "gainLoss" and
    target-output-power == 2 on the first returned connection.
    """
    connection_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMC01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
    reply = requests.request(
        "GET",
        connection_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2, connection['target-output-power'])
595 def test_26_service_power_turndown_XPDRA_XPDRC(self):
596 url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
599 "service-name": "test",
603 "dest-tp": "XPDR1-NETWORK1",
604 "src-tp": "XPDR1-CLIENT1",
608 "dest-tp": "DEG1-TTP-TXRX",
609 "src-tp": "SRG1-PP1-TXRX",
610 "node-id": "ROADMA01"
613 "dest-tp": "SRG1-PP1-TXRX",
614 "src-tp": "DEG2-TTP-TXRX",
615 "node-id": "ROADMC01"
618 "dest-tp": "XPDR1-CLIENT1",
619 "src-tp": "XPDR1-NETWORK1",
625 headers = {'content-type': 'application/json'}
626 response = requests.request(
627 "POST", url, data=json.dumps(data),
628 headers=headers, auth=('admin', 'admin'))
629 self.assertEqual(response.status_code, requests.codes.ok)
630 res = response.json()
631 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    """GET roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADMA01.

    Expects an OK status, opticalControlMode "off" and
    target-output-power == -60 on the first returned connection.
    """
    connection_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMA01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
    reply = requests.request(
        "GET",
        connection_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
    self.assertEqual(-60, connection['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    """GET roadm-connection DEG2-TTP-TXRX-SRG1-PP1-TXRX-1 on ROADMC01.

    Expects an OK status and opticalControlMode "off" on the first
    returned connection.
    """
    connection_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADMC01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG2-TTP-TXRX-SRG1-PP1-TXRX-1")
    reply = requests.request(
        "GET",
        connection_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
656 def test_29_servicePath_delete_AToZ(self):
657 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
660 "service-name": "test",
662 "modulation-format": "qpsk",
663 "operation": "delete",
666 "dest-tp": "XPDR1-NETWORK1",
667 "src-tp": "XPDR1-CLIENT1",
671 "dest-tp": "DEG1-TTP-TXRX",
672 "src-tp": "SRG1-PP1-TXRX",
673 "node-id": "ROADMA01"
676 "dest-tp": "SRG1-PP1-TXRX",
677 "src-tp": "DEG2-TTP-TXRX",
678 "node-id": "ROADMC01"
681 "dest-tp": "XPDR1-CLIENT1",
682 "src-tp": "XPDR1-NETWORK1",
688 headers = {'content-type': 'application/json'}
689 response = requests.request(
690 "POST", url, data=json.dumps(data),
691 headers=headers, auth=('admin', 'admin'))
692 self.assertEqual(response.status_code, requests.codes.ok)
693 res = response.json()
694 self.assertIn('Request processed', res["output"]["result"])
697 def test_30_servicePath_delete_ZToA(self):
698 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
701 "service-name": "test",
703 "modulation-format": "qpsk",
704 "operation": "delete",
707 "dest-tp": "XPDR1-NETWORK1",
708 "src-tp": "XPDR1-CLIENT1",
712 "dest-tp": "DEG2-TTP-TXRX",
713 "src-tp": "SRG1-PP1-TXRX",
714 "node-id": "ROADMC01"
717 "src-tp": "DEG1-TTP-TXRX",
718 "dest-tp": "SRG1-PP1-TXRX",
719 "node-id": "ROADMA01"
722 "src-tp": "XPDR1-NETWORK1",
723 "dest-tp": "XPDR1-CLIENT1",
729 headers = {'content-type': 'application/json'}
730 response = requests.request(
731 "POST", url, data=json.dumps(data),
732 headers=headers, auth=('admin', 'admin'))
733 self.assertEqual(response.status_code, requests.codes.ok)
734 res = response.json()
735 self.assertIn('Request processed', res["output"]["result"])
"""Test the case where the SRG to which the xpdr is connected has no optical range data."""
740 def test_31_connect_xprdA_to_roadmA(self):
741 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
743 "networkutils:input": {
744 "networkutils:links-input": {
745 "networkutils:xpdr-node": "XPDRA01",
746 "networkutils:xpdr-num": "1",
747 "networkutils:network-num": "2",
748 "networkutils:rdm-node": "ROADMA01",
749 "networkutils:srg-num": "1",
750 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
754 headers = {'content-type': 'application/json'}
755 response = requests.request(
756 "POST", url, data=json.dumps(data),
757 headers=headers, auth=('admin', 'admin'))
758 self.assertEqual(response.status_code, requests.codes.ok)
759 res = response.json()
760 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
762 def test_32_connect_roadmA_to_xpdrA(self):
763 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
765 "networkutils:input": {
766 "networkutils:links-input": {
767 "networkutils:xpdr-node": "XPDRA01",
768 "networkutils:xpdr-num": "1",
769 "networkutils:network-num": "2",
770 "networkutils:rdm-node": "ROADMA01",
771 "networkutils:srg-num": "1",
772 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
776 headers = {'content-type': 'application/json'}
777 response = requests.request(
778 "POST", url, data=json.dumps(data),
779 headers=headers, auth=('admin', 'admin'))
780 self.assertEqual(response.status_code, requests.codes.ok)
781 res = response.json()
782 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
784 def test_33_servicePath_create_AToZ(self):
785 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
788 "service-name": "test2",
790 "modulation-format": "qpsk",
791 "operation": "create",
794 "dest-tp": "XPDR1-NETWORK2",
795 "src-tp": "XPDR1-CLIENT2",
799 "dest-tp": "DEG1-TTP-TXRX",
800 "src-tp": "SRG1-PP2-TXRX",
801 "node-id": "ROADMA01"
806 headers = {'content-type': 'application/json'}
807 response = requests.request(
808 "POST", url, data=json.dumps(data),
809 headers=headers, auth=('admin', 'admin'))
810 self.assertEqual(response.status_code, requests.codes.ok)
811 res = response.json()
812 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
    """GET the och container of interface XPDR1-NETWORK2-2 on XPDRA01.

    Expects an OK status, transmit-power == -5 and wavelength-number == 2.
    """
    och_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/XPDRA01/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
        "org-openroadm-optical-channel-interfaces:och")
    reply = requests.request(
        "GET",
        och_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(2, och['wavelength-number'])
828 def test_35_servicePath_delete_AToZ(self):
829 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
832 "service-name": "test",
834 "modulation-format": "qpsk",
835 "operation": "delete",
838 "dest-tp": "XPDR1-NETWORK2",
839 "src-tp": "XPDR1-CLIENT2",
843 "dest-tp": "DEG1-TTP-TXRX",
844 "src-tp": "SRG1-PP2-TXRX",
845 "node-id": "ROADMA01"
850 headers = {'content-type': 'application/json'}
851 response = requests.request(
852 "POST", url, data=json.dumps(data),
853 headers=headers, auth=('admin', 'admin'))
854 self.assertEqual(response.status_code, requests.codes.ok)
855 res = response.json()
856 self.assertIn('Request processed', res["output"]["result"])
def test_36_xpdrA_device_disconnected(self):
    """DELETE the XPDRA01 node from the topology-netconf config; expect an OK status."""
    node_template = ("{}/config/network-topology:"
                     "network-topology/topology/topology-netconf/node/XPDRA01")
    reply = requests.request(
        "DELETE",
        node_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_37_xpdrC_device_disconnected(self):
    """DELETE the XPDRC01 node from the topology-netconf config; expect an OK status."""
    node_template = ("{}/config/network-topology:"
                     "network-topology/topology/topology-netconf/node/XPDRC01")
    reply = requests.request(
        "DELETE",
        node_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_38_calculate_span_loss_current(self):
    """POST to the OLM calculate-spanloss-current RPC without a request body.

    Expects an OK status and 'Success' in the RPC output result.
    """
    rpc_url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(self.restconf_baseurl)
    reply = requests.request(
        "POST",
        rpc_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_output = reply.json()["output"]
    self.assertIn('Success', rpc_output["result"])
def test_39_rdmA_device_disconnected(self):
    """DELETE the ROADMA01 node from the topology-netconf config; expect an OK status."""
    node_template = ("{}/config/network-topology:"
                     "network-topology/topology/topology-netconf/node/ROADMA01")
    reply = requests.request(
        "DELETE",
        node_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_40_rdmC_device_disconnected(self):
    """DELETE the ROADMC01 node from the topology-netconf config; expect an OK status."""
    node_template = ("{}/config/network-topology:"
                     "network-topology/topology/topology-netconf/node/ROADMC01")
    reply = requests.request(
        "DELETE",
        node_template.format(self.restconf_baseurl),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
if __name__ == "__main__":
    # Script entry point: run this module's tests with verbosity level 2.
    unittest.main(verbosity=2)