3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
22 from common import test_utils
25 class TransportOlmTesting(unittest.TestCase):
31 cls.processes = test_utils.start_tpce()
32 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
@classmethod
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes`` and report completion.

    unittest calls this hook on the class itself (``cls.tearDownClass()``),
    so it has to be declared as a classmethod to receive ``cls``.
    """
    for process in cls.processes:
        test_utils.shutdown_process(process)
    print("all processes killed")
41 print("execution of {}".format(self.id().split(".")[-1]))
def test_01_xpdrA_device_connected(self):
    """Mount XPDR-A1 through test_utils.mount_device and require the 'created' status code."""
    mount_reply = test_utils.mount_device("XPDR-A1", 'xpdra')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_02_xpdrC_device_connected(self):
    """Mount XPDR-C1 through test_utils.mount_device and require the 'created' status code."""
    mount_reply = test_utils.mount_device("XPDR-C1", 'xpdrc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_03_rdmA_device_connected(self):
    """Mount ROADM-A1 through test_utils.mount_device and require the 'created' status code."""
    mount_reply = test_utils.mount_device("ROADM-A1", 'roadma')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_04_rdmC_device_connected(self):
    """Mount ROADM-C1 through test_utils.mount_device and require the 'created' status code."""
    mount_reply = test_utils.mount_device("ROADM-C1", 'roadmc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_xprdA_to_roadmA(self):
    """Create the XPDR-A1 to ROADM-A1 link with the init-xpdr-rdm-links RPC.

    Posts the links-input (xpdr 1 / network 1 towards SRG1-PP1-TXRX) and expects
    HTTP 200 plus 'Xponder Roadm Link created successfully' in the RPC result.
    """
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
    # The request body must be bound to `data` before it is serialised below.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the body once; the assertion below reads the RPC output from it.
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_06_connect_roadmA_to_xpdrA(self):
    """Create the ROADM-A1 to XPDR-A1 link with the init-rdm-xpdr-links RPC.

    Posts the links-input (xpdr 1 / network 1 towards SRG1-PP1-TXRX) and expects
    HTTP 200 plus 'Roadm Xponder links created successfully' in the RPC result.
    """
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
    # The request body must be bound to `data` before it is serialised below.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_07_connect_xprdC_to_roadmC(self):
    """Create the XPDR-C1 to ROADM-C1 link with the init-xpdr-rdm-links RPC.

    Posts the links-input (xpdr 1 / network 1 towards SRG1-PP1-TXRX) and expects
    HTTP 200 plus 'Xponder Roadm Link created successfully' in the RPC result.
    """
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
    # The request body must be bound to `data` before it is serialised below.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-C1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-C1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_08_connect_roadmC_to_xpdrC(self):
    """Create the ROADM-C1 to XPDR-C1 link with the init-rdm-xpdr-links RPC.

    Posts the links-input (xpdr 1 / network 1 towards SRG1-PP1-TXRX) and expects
    HTTP 200 plus 'Roadm Xponder links created successfully' in the RPC result.
    """
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
    # The request body must be bound to `data` before it is serialised below.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-C1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-C1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
# Call the transportpce-device-renderer:create-ots-oms RPC for node ROADM-A1 at
# logical connection point DEG1-TTP-TXRX, then expect HTTP 200 and a result that
# reports OTS-DEG1-TTP-TXRX / OMS-DEG1-TTP-TXRX as created on ROADM-A1.
# NOTE(review): the lines that bind `data` and wrap the two fields below are not
# visible here — confirm the payload wrapper before changing this test.
148 def test_09_create_OTS_ROADMA(self):
149 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(test_utils.RESTCONF_BASE_URL)
152 "node-id": "ROADM-A1",
153 "logical-connection-point": "DEG1-TTP-TXRX"
156 headers = {'content-type': 'application/json'}
157 response = requests.request(
158 "POST", url, data=json.dumps(data),
159 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
161 self.assertEqual(response.status_code, requests.codes.ok)
162 res = response.json()
163 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
164 res["output"]["result"])
# Call the transportpce-device-renderer:create-ots-oms RPC for node ROADM-C1 at
# logical connection point DEG2-TTP-TXRX, then expect HTTP 200 and a result that
# reports OTS-DEG2-TTP-TXRX / OMS-DEG2-TTP-TXRX as created on ROADM-C1.
# NOTE(review): the lines that bind `data` and wrap the two fields below are not
# visible here — confirm the payload wrapper before changing this test.
166 def test_10_create_OTS_ROADMC(self):
167 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(test_utils.RESTCONF_BASE_URL)
170 "node-id": "ROADM-C1",
171 "logical-connection-point": "DEG2-TTP-TXRX"
174 headers = {'content-type': 'application/json'}
175 response = requests.request(
176 "POST", url, data=json.dumps(data),
177 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
178 self.assertEqual(response.status_code, requests.codes.ok)
179 res = response.json()
180 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
181 res["output"]["result"])
# Call the transportpce-olm:get-pm RPC for interface OTS-DEG2-TTP-TXRX on
# ROADM-A1 with 15min granularity. Expects HTTP 200 and three entries in
# output/measurements: OpticalPowerOutput "2.5", OpticalReturnLoss "40" and
# OpticalPowerInput "-21.1".
# NOTE(review): the lines that bind `data`, wrap the request fields and open each
# expected-measurement check are not visible here — confirm before editing.
183 def test_11_get_PM_ROADMA(self):
184 url = "{}/operations/transportpce-olm:get-pm".format(test_utils.RESTCONF_BASE_URL)
187 "node-id": "ROADM-A1",
188 "resource-type": "interface",
189 "granularity": "15min",
190 "resource-identifier": {
191 "resource-name": "OTS-DEG2-TTP-TXRX"
195 headers = {'content-type': 'application/json'}
196 response = requests.request(
197 "POST", url, data=json.dumps(data),
198 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
199 self.assertEqual(response.status_code, requests.codes.ok)
200 res = response.json()
202 "pmparameter-name": "OpticalPowerOutput",
203 "pmparameter-value": "2.5"
204 }, res["output"]["measurements"])
206 "pmparameter-name": "OpticalReturnLoss",
207 "pmparameter-value": "40"
208 }, res["output"]["measurements"])
210 "pmparameter-name": "OpticalPowerInput",
211 "pmparameter-value": "-21.1"
212 }, res["output"]["measurements"])
# Call the transportpce-olm:get-pm RPC for interface OTS-DEG1-TTP-TXRX on
# ROADM-C1 with 15min granularity. Expects HTTP 200 and three entries in
# output/measurements: OpticalPowerOutput "4.6", OpticalReturnLoss "49.1" and
# OpticalPowerInput "-15.1".
# NOTE(review): the lines that bind `data`, wrap the request fields and open each
# expected-measurement check are not visible here — confirm before editing.
214 def test_12_get_PM_ROADMC(self):
215 url = "{}/operations/transportpce-olm:get-pm".format(test_utils.RESTCONF_BASE_URL)
218 "node-id": "ROADM-C1",
219 "resource-type": "interface",
220 "granularity": "15min",
221 "resource-identifier": {
222 "resource-name": "OTS-DEG1-TTP-TXRX"
226 headers = {'content-type': 'application/json'}
227 response = requests.request(
228 "POST", url, data=json.dumps(data),
229 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
230 self.assertEqual(response.status_code, requests.codes.ok)
231 res = response.json()
233 "pmparameter-name": "OpticalPowerOutput",
234 "pmparameter-value": "4.6"
235 }, res["output"]["measurements"])
237 "pmparameter-name": "OpticalReturnLoss",
238 "pmparameter-value": "49.1"
239 }, res["output"]["measurements"])
241 "pmparameter-name": "OpticalPowerInput",
242 "pmparameter-value": "-15.1"
243 }, res["output"]["measurements"])
# Call the transportpce-olm:calculate-spanloss-base RPC for the link
# ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX. Expects HTTP 200,
# 'Success' in output/result and an entry for that link-id in output/spans.
# NOTE(review): only the "link-id" field of the request and of the expected span
# entry is visible here; the surrounding lines (and any further fields) are
# missing — confirm the full payload and expected entry before editing.
245 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
246 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(test_utils.RESTCONF_BASE_URL)
250 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
253 headers = {'content-type': 'application/json'}
254 response = requests.request(
255 "POST", url, data=json.dumps(data),
256 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
257 self.assertEqual(response.status_code, requests.codes.ok)
258 res = response.json()
259 self.assertIn('Success',
260 res["output"]["result"])
263 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
264 }, res["output"]["spans"])
# Call the transportpce-olm:calculate-spanloss-base RPC and expect HTTP 200,
# 'Success' in output/result, and span entries in output/spans for both
# directions of the ROADM-A1 DEG2 <-> ROADM-C1 DEG1 link.
# NOTE(review): the request payload bound to `data` is not visible here at all,
# and only the "link-id" field of each expected span entry is shown — confirm
# both against the RPC model before editing.
267 def test_14_calculate_span_loss_base_all(self):
268 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(test_utils.RESTCONF_BASE_URL)
274 headers = {'content-type': 'application/json'}
275 response = requests.request(
276 "POST", url, data=json.dumps(data),
277 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
278 self.assertEqual(response.status_code, requests.codes.ok)
279 res = response.json()
280 self.assertIn('Success',
281 res["output"]["result"])
284 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
285 }, res["output"]["spans"])
288 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
289 }, res["output"]["spans"])
def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
    """Read the ots container of interface OTS-DEG2-TTP-TXRX on ROADM-A1.

    Expects HTTP 200, span-loss-transmit equal to 17.6 and span-loss-receive
    equal to 25.7.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    )
    reply = requests.request(
        "GET",
        url_template.format(test_utils.RESTCONF_BASE_URL),
        headers={'content-type': 'application/json'},
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(17.6, ots['span-loss-transmit'])
    self.assertEqual(25.7, ots['span-loss-receive'])
def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
    """Read the ots container of interface OTS-DEG1-TTP-TXRX on ROADM-C1.

    Expects HTTP 200, span-loss-transmit equal to 25.7 and span-loss-receive
    equal to 17.6.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    )
    reply = requests.request(
        "GET",
        url_template.format(test_utils.RESTCONF_BASE_URL),
        headers={'content-type': 'application/json'},
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()['org-openroadm-optical-transport-interfaces:ots']
    self.assertEqual(25.7, ots['span-loss-transmit'])
    self.assertEqual(17.6, ots['span-loss-receive'])
# Call the transportpce-device-renderer:service-path RPC with operation "create"
# for service "test" (modulation-format qpsk). The visible hops run from
# XPDR1-CLIENT1 to XPDR1-NETWORK1, through ROADM-A1 (SRG1-PP1-TXRX to
# DEG2-TTP-TXRX) and ROADM-C1 (DEG1-TTP-TXRX to SRG1-PP1-TXRX), and back out from
# XPDR1-NETWORK1 to XPDR1-CLIENT1. Expects HTTP 200 and a result containing
# 'Roadm-connection successfully created for nodes'.
# NOTE(review): the lines that bind `data` and wrap the hop entries are not
# visible here, and the first and last hops show no "node-id" — confirm the full
# payload against the RPC model before editing.
316 def test_17_servicePath_create_AToZ(self):
317 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
320 "service-name": "test",
322 "modulation-format": "qpsk",
323 "operation": "create",
326 "dest-tp": "XPDR1-NETWORK1",
327 "src-tp": "XPDR1-CLIENT1",
331 "dest-tp": "DEG2-TTP-TXRX",
332 "src-tp": "SRG1-PP1-TXRX",
333 "node-id": "ROADM-A1"
336 "dest-tp": "SRG1-PP1-TXRX",
337 "src-tp": "DEG1-TTP-TXRX",
338 "node-id": "ROADM-C1"
341 "dest-tp": "XPDR1-CLIENT1",
342 "src-tp": "XPDR1-NETWORK1",
348 headers = {'content-type': 'application/json'}
349 response = requests.request(
350 "POST", url, data=json.dumps(data),
351 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
352 self.assertEqual(response.status_code, requests.codes.ok)
353 res = response.json()
354 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
# Call the transportpce-device-renderer:service-path RPC with operation "create"
# for service "test" (modulation-format qpsk) in the reverse direction. The
# visible hops run from XPDR1-CLIENT1 to XPDR1-NETWORK1, through ROADM-C1
# (SRG1-PP1-TXRX to DEG1-TTP-TXRX) and ROADM-A1 (DEG2-TTP-TXRX to SRG1-PP1-TXRX),
# and back out from XPDR1-NETWORK1 to XPDR1-CLIENT1. Expects HTTP 200 and a
# result containing 'Roadm-connection successfully created for nodes'.
# NOTE(review): the lines that bind `data` and wrap the hop entries are not
# visible here, and the first and last hops show no "node-id" — confirm the full
# payload against the RPC model before editing.
358 def test_18_servicePath_create_ZToA(self):
359 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
362 "service-name": "test",
364 "modulation-format": "qpsk",
365 "operation": "create",
368 "dest-tp": "XPDR1-NETWORK1",
369 "src-tp": "XPDR1-CLIENT1",
373 "dest-tp": "DEG1-TTP-TXRX",
374 "src-tp": "SRG1-PP1-TXRX",
375 "node-id": "ROADM-C1"
378 "src-tp": "DEG2-TTP-TXRX",
379 "dest-tp": "SRG1-PP1-TXRX",
380 "node-id": "ROADM-A1"
383 "src-tp": "XPDR1-NETWORK1",
384 "dest-tp": "XPDR1-CLIENT1",
390 headers = {'content-type': 'application/json'}
391 response = requests.request(
392 "POST", url, data=json.dumps(data),
393 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
394 self.assertEqual(response.status_code, requests.codes.ok)
395 res = response.json()
396 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
# Call the transportpce-olm:service-power-setup RPC for service "test" along the
# visible hops: XPDR1-CLIENT1 to XPDR1-NETWORK1, ROADM-A1 (SRG1-PP1-TXRX to
# DEG2-TTP-TXRX), ROADM-C1 (DEG1-TTP-TXRX to SRG1-PP1-TXRX), then XPDR1-NETWORK1
# to XPDR1-CLIENT1. Expects HTTP 200 and 'Success' in output/result.
# NOTE(review): the lines that bind `data` and wrap the hop entries are not
# visible here, and the first and last hops show no "node-id" — confirm the full
# payload against the RPC model before editing.
400 def test_19_service_power_setup_XPDRA_XPDRC(self):
401 url = "{}/operations/transportpce-olm:service-power-setup".format(test_utils.RESTCONF_BASE_URL)
404 "service-name": "test",
408 "dest-tp": "XPDR1-NETWORK1",
409 "src-tp": "XPDR1-CLIENT1",
413 "dest-tp": "DEG2-TTP-TXRX",
414 "src-tp": "SRG1-PP1-TXRX",
415 "node-id": "ROADM-A1"
418 "dest-tp": "SRG1-PP1-TXRX",
419 "src-tp": "DEG1-TTP-TXRX",
420 "node-id": "ROADM-C1"
423 "dest-tp": "XPDR1-CLIENT1",
424 "src-tp": "XPDR1-NETWORK1",
430 headers = {'content-type': 'application/json'}
431 response = requests.request(
432 "POST", url, data=json.dumps(data),
433 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
434 self.assertEqual(response.status_code, requests.codes.ok)
435 res = response.json()
436 self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
    """Read the och container of interface XPDR1-NETWORK1-1 on XPDR-A1.

    Expects HTTP 200, transmit-power equal to -5 and frequency equal to 196.1.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    )
    reply = requests.request(
        "GET",
        url_template.format(test_utils.RESTCONF_BASE_URL),
        headers={'content-type': 'application/json'},
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(196.1, och['frequency'])
def test_21_get_roadmconnection_ROADMA(self):
    """Read roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADM-A1.

    Expects HTTP 200, opticalControlMode "gainLoss" and target-output-power 2.0
    on the first returned connection.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
    )
    reply = requests.request(
        "GET",
        url_template.format(test_utils.RESTCONF_BASE_URL),
        headers={'content-type': 'application/json'},
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2.0, connection['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    """Read roadm-connection DEG1-TTP-TXRX-SRG1-PP1-TXRX-1 on ROADM-C1.

    Expects HTTP 200 and opticalControlMode "power" on the first returned
    connection.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1"
    )
    reply = requests.request(
        "GET",
        url_template.format(test_utils.RESTCONF_BASE_URL),
        headers={'content-type': 'application/json'},
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    control_mode = reply.json()['roadm-connections'][0]['opticalControlMode']
    self.assertEqual("power", control_mode)
# Call the transportpce-olm:service-power-setup RPC for service "test" in the
# reverse direction. Visible hops: XPDR1-CLIENT1 to XPDR1-NETWORK1, ROADM-C1
# (SRG1-PP1-TXRX to DEG1-TTP-TXRX), ROADM-A1 (DEG2-TTP-TXRX to SRG1-PP1-TXRX),
# then XPDR1-NETWORK1 to XPDR1-CLIENT1. Expects HTTP 200 and 'Success' in
# output/result.
# NOTE(review): the lines that bind `data` and wrap the hop entries are not
# visible here, and the first and last hops show no "node-id" — confirm the full
# payload against the RPC model before editing.
473 def test_23_service_power_setup_XPDRC_XPDRA(self):
474 url = "{}/operations/transportpce-olm:service-power-setup".format(test_utils.RESTCONF_BASE_URL)
477 "service-name": "test",
481 "dest-tp": "XPDR1-NETWORK1",
482 "src-tp": "XPDR1-CLIENT1",
486 "dest-tp": "DEG1-TTP-TXRX",
487 "src-tp": "SRG1-PP1-TXRX",
488 "node-id": "ROADM-C1"
491 "src-tp": "DEG2-TTP-TXRX",
492 "dest-tp": "SRG1-PP1-TXRX",
493 "node-id": "ROADM-A1"
496 "src-tp": "XPDR1-NETWORK1",
497 "dest-tp": "XPDR1-CLIENT1",
503 headers = {'content-type': 'application/json'}
504 response = requests.request(
505 "POST", url, data=json.dumps(data),
506 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
507 self.assertEqual(response.status_code, requests.codes.ok)
508 res = response.json()
509 self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
    """Read the och container of interface XPDR1-NETWORK1-1 on XPDR-C1.

    Expects HTTP 200, transmit-power equal to -5 and frequency equal to 196.1.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    )
    reply = requests.request(
        "GET",
        url_template.format(test_utils.RESTCONF_BASE_URL),
        headers={'content-type': 'application/json'},
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(196.1, och['frequency'])
def test_25_get_roadmconnection_ROADMC(self):
    """Read roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADM-C1.

    Expects HTTP 200, opticalControlMode "gainLoss" and target-output-power 2.0
    on the first returned connection.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
    )
    reply = requests.request(
        "GET",
        url_template.format(test_utils.RESTCONF_BASE_URL),
        headers={'content-type': 'application/json'},
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2.0, connection['target-output-power'])
# Call the transportpce-olm:service-power-turndown RPC for service "test" along
# the visible hops: XPDR1-CLIENT1 to XPDR1-NETWORK1, ROADM-A1 (SRG1-PP1-TXRX to
# DEG2-TTP-TXRX), ROADM-C1 (DEG1-TTP-TXRX to SRG1-PP1-TXRX), then XPDR1-NETWORK1
# to XPDR1-CLIENT1. Expects HTTP 200 and 'Success' in output/result.
# NOTE(review): the lines that bind `data` and wrap the hop entries are not
# visible here, and the first and last hops show no "node-id" — confirm the full
# payload against the RPC model before editing.
535 def test_26_service_power_turndown_XPDRA_XPDRC(self):
536 url = "{}/operations/transportpce-olm:service-power-turndown".format(test_utils.RESTCONF_BASE_URL)
539 "service-name": "test",
543 "dest-tp": "XPDR1-NETWORK1",
544 "src-tp": "XPDR1-CLIENT1",
548 "dest-tp": "DEG2-TTP-TXRX",
549 "src-tp": "SRG1-PP1-TXRX",
550 "node-id": "ROADM-A1"
553 "dest-tp": "SRG1-PP1-TXRX",
554 "src-tp": "DEG1-TTP-TXRX",
555 "node-id": "ROADM-C1"
558 "dest-tp": "XPDR1-CLIENT1",
559 "src-tp": "XPDR1-NETWORK1",
565 headers = {'content-type': 'application/json'}
566 response = requests.request(
567 "POST", url, data=json.dumps(data),
568 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
569 self.assertEqual(response.status_code, requests.codes.ok)
570 res = response.json()
571 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    """Read roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADM-A1.

    Expects HTTP 200, opticalControlMode "off" and target-output-power -60 on
    the first returned connection.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
    )
    reply = requests.request(
        "GET",
        url_template.format(test_utils.RESTCONF_BASE_URL),
        headers={'content-type': 'application/json'},
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
    self.assertEqual(-60, connection['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    """Read roadm-connection DEG1-TTP-TXRX-SRG1-PP1-TXRX-1 on ROADM-C1.

    Expects HTTP 200 and opticalControlMode "off" on the first returned
    connection.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1"
    )
    reply = requests.request(
        "GET",
        url_template.format(test_utils.RESTCONF_BASE_URL),
        headers={'content-type': 'application/json'},
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    control_mode = reply.json()['roadm-connections'][0]['opticalControlMode']
    self.assertEqual("off", control_mode)
# Call the transportpce-device-renderer:service-path RPC with operation "delete"
# for service "test" (modulation-format qpsk). Visible hops: XPDR1-CLIENT1 to
# XPDR1-NETWORK1, ROADM-A1 (SRG1-PP1-TXRX to DEG2-TTP-TXRX), ROADM-C1
# (DEG1-TTP-TXRX to SRG1-PP1-TXRX), then XPDR1-NETWORK1 to XPDR1-CLIENT1.
# Expects HTTP 200 and 'Request processed' in output/result.
# NOTE(review): the lines that bind `data` and wrap the hop entries are not
# visible here, and the first and last hops show no "node-id" — confirm the full
# payload against the RPC model before editing.
596 def test_29_servicePath_delete_AToZ(self):
597 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
600 "service-name": "test",
602 "modulation-format": "qpsk",
603 "operation": "delete",
606 "dest-tp": "XPDR1-NETWORK1",
607 "src-tp": "XPDR1-CLIENT1",
611 "dest-tp": "DEG2-TTP-TXRX",
612 "src-tp": "SRG1-PP1-TXRX",
613 "node-id": "ROADM-A1"
616 "dest-tp": "SRG1-PP1-TXRX",
617 "src-tp": "DEG1-TTP-TXRX",
618 "node-id": "ROADM-C1"
621 "dest-tp": "XPDR1-CLIENT1",
622 "src-tp": "XPDR1-NETWORK1",
628 headers = {'content-type': 'application/json'}
629 response = requests.request(
630 "POST", url, data=json.dumps(data),
631 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
632 self.assertEqual(response.status_code, requests.codes.ok)
633 res = response.json()
634 self.assertIn('Request processed', res["output"]["result"])
# Call the transportpce-device-renderer:service-path RPC with operation "delete"
# for service "test" (modulation-format qpsk) in the reverse direction. Visible
# hops: XPDR1-CLIENT1 to XPDR1-NETWORK1, ROADM-C1 (SRG1-PP1-TXRX to
# DEG1-TTP-TXRX), ROADM-A1 (DEG2-TTP-TXRX to SRG1-PP1-TXRX), then XPDR1-NETWORK1
# to XPDR1-CLIENT1. Expects HTTP 200 and 'Request processed' in output/result.
# NOTE(review): the lines that bind `data` and wrap the hop entries are not
# visible here, and the first and last hops show no "node-id" — confirm the full
# payload against the RPC model before editing.
637 def test_30_servicePath_delete_ZToA(self):
638 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
641 "service-name": "test",
643 "modulation-format": "qpsk",
644 "operation": "delete",
647 "dest-tp": "XPDR1-NETWORK1",
648 "src-tp": "XPDR1-CLIENT1",
652 "dest-tp": "DEG1-TTP-TXRX",
653 "src-tp": "SRG1-PP1-TXRX",
654 "node-id": "ROADM-C1"
657 "src-tp": "DEG2-TTP-TXRX",
658 "dest-tp": "SRG1-PP1-TXRX",
659 "node-id": "ROADM-A1"
662 "src-tp": "XPDR1-NETWORK1",
663 "dest-tp": "XPDR1-CLIENT1",
669 headers = {'content-type': 'application/json'}
670 response = requests.request(
671 "POST", url, data=json.dumps(data),
672 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
673 self.assertEqual(response.status_code, requests.codes.ok)
674 res = response.json()
675 self.assertIn('Request processed', res["output"]["result"])
678 """Tests for the case where the SRG that the xpdr is connected to has no optical range data"""
def test_31_connect_xprdA_to_roadmA(self):
    """Create a second XPDR-A1 to ROADM-A1 link with the init-xpdr-rdm-links RPC.

    Posts the links-input (xpdr 1 / network 2 towards SRG1-PP2-TXRX) and expects
    HTTP 200 plus 'Xponder Roadm Link created successfully' in the RPC result.
    """
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
    # The request body must be bound to `data` before it is serialised below.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_32_connect_roadmA_to_xpdrA(self):
    """Create a second ROADM-A1 to XPDR-A1 link with the init-rdm-xpdr-links RPC.

    Posts the links-input (xpdr 1 / network 2 towards SRG1-PP2-TXRX) and expects
    HTTP 200 plus 'Roadm Xponder links created successfully' in the RPC result.
    """
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
    # The request body must be bound to `data` before it is serialised below.
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
# Call the transportpce-device-renderer:service-path RPC with operation "create"
# for service "test2" (modulation-format qpsk). Visible hops: XPDR1-CLIENT2 to
# XPDR1-NETWORK2, then ROADM-A1 (SRG1-PP2-TXRX to DEG2-TTP-TXRX). Expects HTTP
# 200 and a result containing 'Roadm-connection successfully created for nodes'.
# NOTE(review): the lines that bind `data` and wrap the hop entries are not
# visible here, and the first hop shows no "node-id" — confirm the full payload
# against the RPC model before editing.
724 def test_33_servicePath_create_AToZ(self):
725 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
728 "service-name": "test2",
730 "modulation-format": "qpsk",
731 "operation": "create",
734 "dest-tp": "XPDR1-NETWORK2",
735 "src-tp": "XPDR1-CLIENT2",
739 "dest-tp": "DEG2-TTP-TXRX",
740 "src-tp": "SRG1-PP2-TXRX",
741 "node-id": "ROADM-A1"
746 headers = {'content-type': 'application/json'}
747 response = requests.request(
748 "POST", url, data=json.dumps(data),
749 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
750 self.assertEqual(response.status_code, requests.codes.ok)
751 res = response.json()
752 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
    """Read the och container of interface XPDR1-NETWORK2-2 on XPDR-A1.

    Expects HTTP 200 and transmit-power equal to -5.
    """
    url_template = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
        "org-openroadm-optical-channel-interfaces:och"
    )
    reply = requests.request(
        "GET",
        url_template.format(test_utils.RESTCONF_BASE_URL),
        headers={'content-type': 'application/json'},
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()['org-openroadm-optical-channel-interfaces:och']
    self.assertEqual(-5, och['transmit-power'])
    # Disabled check kept from the original:
    # self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
# Call the transportpce-device-renderer:service-path RPC with operation "delete"
# (modulation-format qpsk). Visible hops: XPDR1-CLIENT2 to XPDR1-NETWORK2, then
# ROADM-A1 (SRG1-PP2-TXRX to DEG2-TTP-TXRX). Expects HTTP 200 and
# 'Request processed' in output/result.
# NOTE(review): this deletes "service-name": "test", while
# test_33_servicePath_create_AToZ created the same hops under "test2" — confirm
# which name is intended.
# NOTE(review): the lines that bind `data` and wrap the hop entries are not
# visible here, and the first hop shows no "node-id" — confirm the full payload
# against the RPC model before editing.
768 def test_35_servicePath_delete_AToZ(self):
769 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
772 "service-name": "test",
774 "modulation-format": "qpsk",
775 "operation": "delete",
778 "dest-tp": "XPDR1-NETWORK2",
779 "src-tp": "XPDR1-CLIENT2",
783 "dest-tp": "DEG2-TTP-TXRX",
784 "src-tp": "SRG1-PP2-TXRX",
785 "node-id": "ROADM-A1"
790 headers = {'content-type': 'application/json'}
791 response = requests.request(
792 "POST", url, data=json.dumps(data),
793 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
794 self.assertEqual(response.status_code, requests.codes.ok)
795 res = response.json()
796 self.assertIn('Request processed', res["output"]["result"])
def test_36_xpdrA_device_disconnected(self):
    """Unmount XPDR-A1 through test_utils.unmount_device and require the 'ok' status code."""
    unmount_reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_37_xpdrC_device_disconnected(self):
    """Unmount XPDR-C1 through test_utils.unmount_device and require the 'ok' status code."""
    unmount_reply = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_38_calculate_span_loss_current(self):
    """POST to transportpce-olm:calculate-spanloss-current without a request body.

    Expects HTTP 200 and 'Success' in output/result.
    """
    endpoint = "{}/operations/transportpce-olm:calculate-spanloss-current".format(test_utils.RESTCONF_BASE_URL)
    reply = requests.request(
        "POST",
        endpoint,
        headers={'content-type': 'application/json'},
        auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
    )
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Success', outcome)
def test_39_rdmA_device_disconnected(self):
    """Unmount ROADM-A1 through test_utils.unmount_device and require the 'ok' status code."""
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_40_rdmC_device_disconnected(self):
    """Unmount ROADM-C1 through test_utils.unmount_device and require the 'ok' status code."""
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
# Script entry point: run this module's tests with verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)