3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
15 #from unittest.result import failfast
17 from common import test_utils
20 class TransportOlmTesting(unittest.TestCase):
26 cls.processes = test_utils.start_tpce()
27 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
30 def tearDownClass(cls):
31 for process in cls.processes:
32 test_utils.shutdown_process(process)
33 print("all processes killed")
36 print("execution of {}".format(self.id().split(".")[-1]))
39 def test_01_xpdrA_device_connected(self):
40 response = test_utils.mount_device("XPDR-A1", 'xpdra')
41 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
43 def test_02_xpdrC_device_connected(self):
44 response = test_utils.mount_device("XPDR-C1", 'xpdrc')
45 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
47 def test_03_rdmA_device_connected(self):
48 response = test_utils.mount_device("ROADM-A1", 'roadma')
49 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
51 def test_04_rdmC_device_connected(self):
52 response = test_utils.mount_device("ROADM-C1", 'roadmc')
53 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
55 def test_05_connect_xprdA_to_roadmA(self):
56 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
58 "networkutils:input": {
59 "networkutils:links-input": {
60 "networkutils:xpdr-node": "XPDR-A1",
61 "networkutils:xpdr-num": "1",
62 "networkutils:network-num": "1",
63 "networkutils:rdm-node": "ROADM-A1",
64 "networkutils:srg-num": "1",
65 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
69 headers = {'content-type': 'application/json'}
70 response = requests.request(
71 "POST", url, data=json.dumps(data),
72 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
73 self.assertEqual(response.status_code, requests.codes.ok)
75 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
77 def test_06_connect_roadmA_to_xpdrA(self):
78 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
80 "networkutils:input": {
81 "networkutils:links-input": {
82 "networkutils:xpdr-node": "XPDR-A1",
83 "networkutils:xpdr-num": "1",
84 "networkutils:network-num": "1",
85 "networkutils:rdm-node": "ROADM-A1",
86 "networkutils:srg-num": "1",
87 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
91 headers = {'content-type': 'application/json'}
92 response = requests.request(
93 "POST", url, data=json.dumps(data),
94 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
95 self.assertEqual(response.status_code, requests.codes.ok)
97 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
99 def test_07_connect_xprdC_to_roadmC(self):
100 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
102 "networkutils:input": {
103 "networkutils:links-input": {
104 "networkutils:xpdr-node": "XPDR-C1",
105 "networkutils:xpdr-num": "1",
106 "networkutils:network-num": "1",
107 "networkutils:rdm-node": "ROADM-C1",
108 "networkutils:srg-num": "1",
109 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
113 headers = {'content-type': 'application/json'}
114 response = requests.request(
115 "POST", url, data=json.dumps(data),
116 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
117 self.assertEqual(response.status_code, requests.codes.ok)
118 res = response.json()
119 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
121 def test_08_connect_roadmC_to_xpdrC(self):
122 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
124 "networkutils:input": {
125 "networkutils:links-input": {
126 "networkutils:xpdr-node": "XPDR-C1",
127 "networkutils:xpdr-num": "1",
128 "networkutils:network-num": "1",
129 "networkutils:rdm-node": "ROADM-C1",
130 "networkutils:srg-num": "1",
131 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
135 headers = {'content-type': 'application/json'}
136 response = requests.request(
137 "POST", url, data=json.dumps(data),
138 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
139 self.assertEqual(response.status_code, requests.codes.ok)
140 res = response.json()
141 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
143 def test_09_create_OTS_ROADMA(self):
144 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(test_utils.RESTCONF_BASE_URL)
147 "node-id": "ROADM-A1",
148 "logical-connection-point": "DEG1-TTP-TXRX"
151 headers = {'content-type': 'application/json'}
152 response = requests.request(
153 "POST", url, data=json.dumps(data),
154 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
156 self.assertEqual(response.status_code, requests.codes.ok)
157 res = response.json()
158 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
159 res["output"]["result"])
161 def test_10_create_OTS_ROADMC(self):
162 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(test_utils.RESTCONF_BASE_URL)
165 "node-id": "ROADM-C1",
166 "logical-connection-point": "DEG2-TTP-TXRX"
169 headers = {'content-type': 'application/json'}
170 response = requests.request(
171 "POST", url, data=json.dumps(data),
172 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
173 self.assertEqual(response.status_code, requests.codes.ok)
174 res = response.json()
175 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
176 res["output"]["result"])
178 def test_11_get_PM_ROADMA(self):
179 url = "{}/operations/transportpce-olm:get-pm".format(test_utils.RESTCONF_BASE_URL)
182 "node-id": "ROADM-A1",
183 "resource-type": "interface",
184 "granularity": "15min",
185 "resource-identifier": {
186 "resource-name": "OTS-DEG2-TTP-TXRX"
190 headers = {'content-type': 'application/json'}
191 response = requests.request(
192 "POST", url, data=json.dumps(data),
193 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
194 self.assertEqual(response.status_code, requests.codes.ok)
195 res = response.json()
197 "pmparameter-name": "OpticalPowerOutput",
198 "pmparameter-value": "2.5"
199 }, res["output"]["measurements"])
201 "pmparameter-name": "OpticalReturnLoss",
202 "pmparameter-value": "40"
203 }, res["output"]["measurements"])
205 "pmparameter-name": "OpticalPowerInput",
206 "pmparameter-value": "-21.1"
207 }, res["output"]["measurements"])
209 def test_12_get_PM_ROADMC(self):
210 url = "{}/operations/transportpce-olm:get-pm".format(test_utils.RESTCONF_BASE_URL)
213 "node-id": "ROADM-C1",
214 "resource-type": "interface",
215 "granularity": "15min",
216 "resource-identifier": {
217 "resource-name": "OTS-DEG1-TTP-TXRX"
221 headers = {'content-type': 'application/json'}
222 response = requests.request(
223 "POST", url, data=json.dumps(data),
224 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
225 self.assertEqual(response.status_code, requests.codes.ok)
226 res = response.json()
228 "pmparameter-name": "OpticalPowerOutput",
229 "pmparameter-value": "4.6"
230 }, res["output"]["measurements"])
232 "pmparameter-name": "OpticalReturnLoss",
233 "pmparameter-value": "49.1"
234 }, res["output"]["measurements"])
236 "pmparameter-name": "OpticalPowerInput",
237 "pmparameter-value": "-15.1"
238 }, res["output"]["measurements"])
240 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
241 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(test_utils.RESTCONF_BASE_URL)
245 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
248 headers = {'content-type': 'application/json'}
249 response = requests.request(
250 "POST", url, data=json.dumps(data),
251 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
252 self.assertEqual(response.status_code, requests.codes.ok)
253 res = response.json()
254 self.assertIn('Success',
255 res["output"]["result"])
258 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
259 }, res["output"]["spans"])
262 def test_14_calculate_span_loss_base_all(self):
263 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(test_utils.RESTCONF_BASE_URL)
269 headers = {'content-type': 'application/json'}
270 response = requests.request(
271 "POST", url, data=json.dumps(data),
272 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
273 self.assertEqual(response.status_code, requests.codes.ok)
274 res = response.json()
275 self.assertIn('Success',
276 res["output"]["result"])
279 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
280 }, res["output"]["spans"])
283 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
284 }, res["output"]["spans"])
287 def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
288 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
289 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
290 "org-openroadm-optical-transport-interfaces:ots".format(test_utils.RESTCONF_BASE_URL))
291 headers = {'content-type': 'application/json'}
292 response = requests.request(
293 "GET", url, headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
294 self.assertEqual(response.status_code, requests.codes.ok)
295 res = response.json()
296 self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
297 self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
299 def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
300 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
301 "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
302 "org-openroadm-optical-transport-interfaces:ots".format(test_utils.RESTCONF_BASE_URL))
303 headers = {'content-type': 'application/json'}
304 response = requests.request(
305 "GET", url, headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
306 self.assertEqual(response.status_code, requests.codes.ok)
307 res = response.json()
308 self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
309 self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
311 def test_17_servicePath_create_AToZ(self):
312 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
315 "service-name": "test",
317 "modulation-format": "qpsk",
318 "operation": "create",
321 "dest-tp": "XPDR1-NETWORK1",
322 "src-tp": "XPDR1-CLIENT1",
326 "dest-tp": "DEG2-TTP-TXRX",
327 "src-tp": "SRG1-PP1-TXRX",
328 "node-id": "ROADM-A1"
331 "dest-tp": "SRG1-PP1-TXRX",
332 "src-tp": "DEG1-TTP-TXRX",
333 "node-id": "ROADM-C1"
336 "dest-tp": "XPDR1-CLIENT1",
337 "src-tp": "XPDR1-NETWORK1",
343 headers = {'content-type': 'application/json'}
344 response = requests.request(
345 "POST", url, data=json.dumps(data),
346 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
347 self.assertEqual(response.status_code, requests.codes.ok)
348 res = response.json()
349 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
353 def test_18_servicePath_create_ZToA(self):
354 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
357 "service-name": "test",
359 "modulation-format": "qpsk",
360 "operation": "create",
363 "dest-tp": "XPDR1-NETWORK1",
364 "src-tp": "XPDR1-CLIENT1",
368 "dest-tp": "DEG1-TTP-TXRX",
369 "src-tp": "SRG1-PP1-TXRX",
370 "node-id": "ROADM-C1"
373 "src-tp": "DEG2-TTP-TXRX",
374 "dest-tp": "SRG1-PP1-TXRX",
375 "node-id": "ROADM-A1"
378 "src-tp": "XPDR1-NETWORK1",
379 "dest-tp": "XPDR1-CLIENT1",
385 headers = {'content-type': 'application/json'}
386 response = requests.request(
387 "POST", url, data=json.dumps(data),
388 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
389 self.assertEqual(response.status_code, requests.codes.ok)
390 res = response.json()
391 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
395 def test_19_service_power_setup_XPDRA_XPDRC(self):
396 url = "{}/operations/transportpce-olm:service-power-setup".format(test_utils.RESTCONF_BASE_URL)
399 "service-name": "test",
403 "dest-tp": "XPDR1-NETWORK1",
404 "src-tp": "XPDR1-CLIENT1",
408 "dest-tp": "DEG2-TTP-TXRX",
409 "src-tp": "SRG1-PP1-TXRX",
410 "node-id": "ROADM-A1"
413 "dest-tp": "SRG1-PP1-TXRX",
414 "src-tp": "DEG1-TTP-TXRX",
415 "node-id": "ROADM-C1"
418 "dest-tp": "XPDR1-CLIENT1",
419 "src-tp": "XPDR1-NETWORK1",
425 headers = {'content-type': 'application/json'}
426 response = requests.request(
427 "POST", url, data=json.dumps(data),
428 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
429 self.assertEqual(response.status_code, requests.codes.ok)
430 res = response.json()
431 self.assertIn('Success', res["output"]["result"])
433 def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
434 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
435 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
436 "org-openroadm-optical-channel-interfaces:och".format(test_utils.RESTCONF_BASE_URL))
437 headers = {'content-type': 'application/json'}
438 response = requests.request(
439 "GET", url, headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
440 self.assertEqual(response.status_code, requests.codes.ok)
441 res = response.json()
442 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
443 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
445 def test_21_get_roadmconnection_ROADMA(self):
446 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
447 "org-openroadm-device:org-openroadm-device/roadm-connections/"
448 "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(test_utils.RESTCONF_BASE_URL))
449 headers = {'content-type': 'application/json'}
450 response = requests.request(
451 "GET", url, headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
452 self.assertEqual(response.status_code, requests.codes.ok)
453 res = response.json()
454 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
455 self.assertEqual(2.0, res['roadm-connections'][0]['target-output-power'])
457 def test_22_get_roadmconnection_ROADMC(self):
458 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
459 "org-openroadm-device:org-openroadm-device/roadm-connections/"
460 "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1".format(test_utils.RESTCONF_BASE_URL))
461 headers = {'content-type': 'application/json'}
462 response = requests.request(
463 "GET", url, headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
464 self.assertEqual(response.status_code, requests.codes.ok)
465 res = response.json()
466 self.assertEqual("power", res['roadm-connections'][0]['opticalControlMode'])
468 def test_23_service_power_setup_XPDRC_XPDRA(self):
469 url = "{}/operations/transportpce-olm:service-power-setup".format(test_utils.RESTCONF_BASE_URL)
472 "service-name": "test",
476 "dest-tp": "XPDR1-NETWORK1",
477 "src-tp": "XPDR1-CLIENT1",
481 "dest-tp": "DEG1-TTP-TXRX",
482 "src-tp": "SRG1-PP1-TXRX",
483 "node-id": "ROADM-C1"
486 "src-tp": "DEG2-TTP-TXRX",
487 "dest-tp": "SRG1-PP1-TXRX",
488 "node-id": "ROADM-A1"
491 "src-tp": "XPDR1-NETWORK1",
492 "dest-tp": "XPDR1-CLIENT1",
498 headers = {'content-type': 'application/json'}
499 response = requests.request(
500 "POST", url, data=json.dumps(data),
501 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
502 self.assertEqual(response.status_code, requests.codes.ok)
503 res = response.json()
504 self.assertIn('Success', res["output"]["result"])
506 def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
507 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-C1/yang-ext:mount/"
508 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
509 "org-openroadm-optical-channel-interfaces:och".format(test_utils.RESTCONF_BASE_URL))
510 headers = {'content-type': 'application/json'}
511 response = requests.request(
512 "GET", url, headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
513 self.assertEqual(response.status_code, requests.codes.ok)
514 res = response.json()
515 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
516 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
518 def test_25_get_roadmconnection_ROADMC(self):
519 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
520 "org-openroadm-device:org-openroadm-device/roadm-connections/"
521 "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1".format(test_utils.RESTCONF_BASE_URL))
522 headers = {'content-type': 'application/json'}
523 response = requests.request(
524 "GET", url, headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
525 self.assertEqual(response.status_code, requests.codes.ok)
526 res = response.json()
527 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
528 self.assertEqual(2.0, res['roadm-connections'][0]['target-output-power'])
530 def test_26_service_power_turndown_XPDRA_XPDRC(self):
531 url = "{}/operations/transportpce-olm:service-power-turndown".format(test_utils.RESTCONF_BASE_URL)
534 "service-name": "test",
538 "dest-tp": "XPDR1-NETWORK1",
539 "src-tp": "XPDR1-CLIENT1",
543 "dest-tp": "DEG2-TTP-TXRX",
544 "src-tp": "SRG1-PP1-TXRX",
545 "node-id": "ROADM-A1"
548 "dest-tp": "SRG1-PP1-TXRX",
549 "src-tp": "DEG1-TTP-TXRX",
550 "node-id": "ROADM-C1"
553 "dest-tp": "XPDR1-CLIENT1",
554 "src-tp": "XPDR1-NETWORK1",
560 headers = {'content-type': 'application/json'}
561 response = requests.request(
562 "POST", url, data=json.dumps(data),
563 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
564 self.assertEqual(response.status_code, requests.codes.ok)
565 res = response.json()
566 self.assertIn('Success', res["output"]["result"])
568 def test_27_get_roadmconnection_ROADMA(self):
569 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
570 "org-openroadm-device:org-openroadm-device/roadm-connections/"
571 "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1".format(test_utils.RESTCONF_BASE_URL))
572 headers = {'content-type': 'application/json'}
573 response = requests.request(
574 "GET", url, headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
575 self.assertEqual(response.status_code, requests.codes.ok)
576 res = response.json()
577 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
578 self.assertEqual(-60, res['roadm-connections'][0]['target-output-power'])
580 def test_28_get_roadmconnection_ROADMC(self):
581 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
582 "org-openroadm-device:org-openroadm-device/roadm-connections/"
583 "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1".format(test_utils.RESTCONF_BASE_URL))
584 headers = {'content-type': 'application/json'}
585 response = requests.request(
586 "GET", url, headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
587 self.assertEqual(response.status_code, requests.codes.ok)
588 res = response.json()
589 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
591 def test_29_servicePath_delete_AToZ(self):
592 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
595 "service-name": "test",
597 "modulation-format": "qpsk",
598 "operation": "delete",
601 "dest-tp": "XPDR1-NETWORK1",
602 "src-tp": "XPDR1-CLIENT1",
606 "dest-tp": "DEG2-TTP-TXRX",
607 "src-tp": "SRG1-PP1-TXRX",
608 "node-id": "ROADM-A1"
611 "dest-tp": "SRG1-PP1-TXRX",
612 "src-tp": "DEG1-TTP-TXRX",
613 "node-id": "ROADM-C1"
616 "dest-tp": "XPDR1-CLIENT1",
617 "src-tp": "XPDR1-NETWORK1",
623 headers = {'content-type': 'application/json'}
624 response = requests.request(
625 "POST", url, data=json.dumps(data),
626 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
627 self.assertEqual(response.status_code, requests.codes.ok)
628 res = response.json()
629 self.assertIn('Request processed', res["output"]["result"])
632 def test_30_servicePath_delete_ZToA(self):
633 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
636 "service-name": "test",
638 "modulation-format": "qpsk",
639 "operation": "delete",
642 "dest-tp": "XPDR1-NETWORK1",
643 "src-tp": "XPDR1-CLIENT1",
647 "dest-tp": "DEG1-TTP-TXRX",
648 "src-tp": "SRG1-PP1-TXRX",
649 "node-id": "ROADM-C1"
652 "src-tp": "DEG2-TTP-TXRX",
653 "dest-tp": "SRG1-PP1-TXRX",
654 "node-id": "ROADM-A1"
657 "src-tp": "XPDR1-NETWORK1",
658 "dest-tp": "XPDR1-CLIENT1",
664 headers = {'content-type': 'application/json'}
665 response = requests.request(
666 "POST", url, data=json.dumps(data),
667 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
668 self.assertEqual(response.status_code, requests.codes.ok)
669 res = response.json()
670 self.assertIn('Request processed', res["output"]["result"])
673 """to test case where SRG where the xpdr is connected to has no optical range data"""
675 def test_31_connect_xprdA_to_roadmA(self):
676 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
678 "networkutils:input": {
679 "networkutils:links-input": {
680 "networkutils:xpdr-node": "XPDR-A1",
681 "networkutils:xpdr-num": "1",
682 "networkutils:network-num": "2",
683 "networkutils:rdm-node": "ROADM-A1",
684 "networkutils:srg-num": "1",
685 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
689 headers = {'content-type': 'application/json'}
690 response = requests.request(
691 "POST", url, data=json.dumps(data),
692 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
693 self.assertEqual(response.status_code, requests.codes.ok)
694 res = response.json()
695 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
697 def test_32_connect_roadmA_to_xpdrA(self):
698 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
700 "networkutils:input": {
701 "networkutils:links-input": {
702 "networkutils:xpdr-node": "XPDR-A1",
703 "networkutils:xpdr-num": "1",
704 "networkutils:network-num": "2",
705 "networkutils:rdm-node": "ROADM-A1",
706 "networkutils:srg-num": "1",
707 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
711 headers = {'content-type': 'application/json'}
712 response = requests.request(
713 "POST", url, data=json.dumps(data),
714 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
715 self.assertEqual(response.status_code, requests.codes.ok)
716 res = response.json()
717 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
719 def test_33_servicePath_create_AToZ(self):
720 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
723 "service-name": "test2",
725 "modulation-format": "qpsk",
726 "operation": "create",
729 "dest-tp": "XPDR1-NETWORK2",
730 "src-tp": "XPDR1-CLIENT2",
734 "dest-tp": "DEG2-TTP-TXRX",
735 "src-tp": "SRG1-PP2-TXRX",
736 "node-id": "ROADM-A1"
741 headers = {'content-type': 'application/json'}
742 response = requests.request(
743 "POST", url, data=json.dumps(data),
744 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
745 self.assertEqual(response.status_code, requests.codes.ok)
746 res = response.json()
747 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
751 def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
752 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
753 "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
754 "org-openroadm-optical-channel-interfaces:och".format(test_utils.RESTCONF_BASE_URL))
755 headers = {'content-type': 'application/json'}
756 response = requests.request(
757 "GET", url, headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
758 self.assertEqual(response.status_code, requests.codes.ok)
759 res = response.json()
760 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
761 # self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
763 def test_35_servicePath_delete_AToZ(self):
764 url = "{}/operations/transportpce-device-renderer:service-path".format(test_utils.RESTCONF_BASE_URL)
767 "service-name": "test",
769 "modulation-format": "qpsk",
770 "operation": "delete",
773 "dest-tp": "XPDR1-NETWORK2",
774 "src-tp": "XPDR1-CLIENT2",
778 "dest-tp": "DEG2-TTP-TXRX",
779 "src-tp": "SRG1-PP2-TXRX",
780 "node-id": "ROADM-A1"
785 headers = {'content-type': 'application/json'}
786 response = requests.request(
787 "POST", url, data=json.dumps(data),
788 headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
789 self.assertEqual(response.status_code, requests.codes.ok)
790 res = response.json()
791 self.assertIn('Request processed', res["output"]["result"])
794 def test_36_xpdrA_device_disconnected(self):
795 response = test_utils.unmount_device("XPDR-A1")
796 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
798 def test_37_xpdrC_device_disconnected(self):
799 response = test_utils.unmount_device("XPDR-C1")
800 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
802 def test_38_calculate_span_loss_current(self):
803 url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(test_utils.RESTCONF_BASE_URL)
804 headers = {'content-type': 'application/json'}
805 response = requests.request(
806 "POST", url, headers=headers, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
807 self.assertEqual(response.status_code, requests.codes.ok)
808 res = response.json()
809 self.assertIn('Success',
810 res["output"]["result"])
813 def test_39_rdmA_device_disconnected(self):
814 response = test_utils.unmount_device("ROADM-A1")
815 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
817 def test_40_rdmC_device_disconnected(self):
818 response = test_utils.unmount_device("ROADM-C1")
819 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
822 if __name__ == "__main__":
823 unittest.main(verbosity=2)