3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
21 from unittest.result import failfast
22 from common import test_utils
25 class TransportOlmTesting(unittest.TestCase):
28 restconf_baseurl = "http://localhost:8181/restconf"
32 cls.processes = test_utils.start_tpce()
33 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
def tearDownClass(cls):
    # Stop every process recorded in cls.processes, then report completion.
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
42 print("execution of {}".format(self.id().split(".")[-1]))
45 def test_01_xpdrA_device_connected(self):
46 url = ("{}/config/network-topology:"
47 "network-topology/topology/topology-netconf/node/XPDR-A1"
48 .format(self.restconf_baseurl))
51 "netconf-node-topology:username": "admin",
52 "netconf-node-topology:password": "admin",
53 "netconf-node-topology:host": "127.0.0.1",
54 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
55 "netconf-node-topology:tcp-only": "false",
56 "netconf-node-topology:pass-through": {}}]}
57 headers = {'content-type': 'application/json'}
58 response = requests.request(
59 "PUT", url, data=json.dumps(data), headers=headers,
60 auth=('admin', 'admin'))
61 self.assertEqual(response.status_code, requests.codes.created)
64 def test_02_xpdrC_device_connected(self):
65 url = ("{}/config/network-topology:"
66 "network-topology/topology/topology-netconf/node/XPDR-C1"
67 .format(self.restconf_baseurl))
70 "netconf-node-topology:username": "admin",
71 "netconf-node-topology:password": "admin",
72 "netconf-node-topology:host": "127.0.0.1",
73 "netconf-node-topology:port": test_utils.sims['xpdrc']['port'],
74 "netconf-node-topology:tcp-only": "false",
75 "netconf-node-topology:pass-through": {}}]}
76 headers = {'content-type': 'application/json'}
77 response = requests.request(
78 "PUT", url, data=json.dumps(data), headers=headers,
79 auth=('admin', 'admin'))
80 self.assertEqual(response.status_code, requests.codes.created)
83 def test_03_rdmA_device_connected(self):
84 url = ("{}/config/network-topology:"
85 "network-topology/topology/topology-netconf/node/ROADM-A1"
86 .format(self.restconf_baseurl))
88 "node-id": "ROADM-A1",
89 "netconf-node-topology:username": "admin",
90 "netconf-node-topology:password": "admin",
91 "netconf-node-topology:host": "127.0.0.1",
92 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
93 "netconf-node-topology:tcp-only": "false",
94 "netconf-node-topology:pass-through": {}}]}
95 headers = {'content-type': 'application/json'}
96 response = requests.request(
97 "PUT", url, data=json.dumps(data), headers=headers,
98 auth=('admin', 'admin'))
99 self.assertEqual(response.status_code, requests.codes.created)
102 def test_04_rdmC_device_connected(self):
103 url = ("{}/config/network-topology:"
104 "network-topology/topology/topology-netconf/node/ROADM-C1"
105 .format(self.restconf_baseurl))
107 "node-id": "ROADM-C1",
108 "netconf-node-topology:username": "admin",
109 "netconf-node-topology:password": "admin",
110 "netconf-node-topology:host": "127.0.0.1",
111 "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
112 "netconf-node-topology:tcp-only": "false",
113 "netconf-node-topology:pass-through": {}}]}
114 headers = {'content-type': 'application/json'}
115 response = requests.request(
116 "PUT", url, data=json.dumps(data), headers=headers,
117 auth=('admin', 'admin'))
118 self.assertEqual(response.status_code, requests.codes.created)
121 def test_05_connect_xprdA_to_roadmA(self):
122 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
124 "networkutils:input": {
125 "networkutils:links-input": {
126 "networkutils:xpdr-node": "XPDR-A1",
127 "networkutils:xpdr-num": "1",
128 "networkutils:network-num": "1",
129 "networkutils:rdm-node": "ROADM-A1",
130 "networkutils:srg-num": "1",
131 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
135 headers = {'content-type': 'application/json'}
136 response = requests.request(
137 "POST", url, data=json.dumps(data),
138 headers=headers, auth=('admin', 'admin'))
139 self.assertEqual(response.status_code, requests.codes.ok)
140 res = response.json()
141 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
143 def test_06_connect_roadmA_to_xpdrA(self):
144 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
146 "networkutils:input": {
147 "networkutils:links-input": {
148 "networkutils:xpdr-node": "XPDR-A1",
149 "networkutils:xpdr-num": "1",
150 "networkutils:network-num": "1",
151 "networkutils:rdm-node": "ROADM-A1",
152 "networkutils:srg-num": "1",
153 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
157 headers = {'content-type': 'application/json'}
158 response = requests.request(
159 "POST", url, data=json.dumps(data),
160 headers=headers, auth=('admin', 'admin'))
161 self.assertEqual(response.status_code, requests.codes.ok)
162 res = response.json()
163 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
165 def test_07_connect_xprdC_to_roadmC(self):
166 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
168 "networkutils:input": {
169 "networkutils:links-input": {
170 "networkutils:xpdr-node": "XPDR-C1",
171 "networkutils:xpdr-num": "1",
172 "networkutils:network-num": "1",
173 "networkutils:rdm-node": "ROADM-C1",
174 "networkutils:srg-num": "1",
175 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
179 headers = {'content-type': 'application/json'}
180 response = requests.request(
181 "POST", url, data=json.dumps(data),
182 headers=headers, auth=('admin', 'admin'))
183 self.assertEqual(response.status_code, requests.codes.ok)
184 res = response.json()
185 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
187 def test_08_connect_roadmC_to_xpdrC(self):
188 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
190 "networkutils:input": {
191 "networkutils:links-input": {
192 "networkutils:xpdr-node": "XPDR-C1",
193 "networkutils:xpdr-num": "1",
194 "networkutils:network-num": "1",
195 "networkutils:rdm-node": "ROADM-C1",
196 "networkutils:srg-num": "1",
197 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
201 headers = {'content-type': 'application/json'}
202 response = requests.request(
203 "POST", url, data=json.dumps(data),
204 headers=headers, auth=('admin', 'admin'))
205 self.assertEqual(response.status_code, requests.codes.ok)
206 res = response.json()
207 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
209 def test_09_create_OTS_ROADMA(self):
210 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
213 "node-id": "ROADM-A1",
214 "logical-connection-point": "DEG1-TTP-TXRX"
217 headers = {'content-type': 'application/json'}
218 response = requests.request(
219 "POST", url, data=json.dumps(data),
220 headers=headers, auth=('admin', 'admin'))
222 self.assertEqual(response.status_code, requests.codes.ok)
223 res = response.json()
224 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
225 res["output"]["result"])
227 def test_10_create_OTS_ROADMC(self):
228 url = "{}/operations/transportpce-device-renderer:create-ots-oms".format(self.restconf_baseurl)
231 "node-id": "ROADM-C1",
232 "logical-connection-point": "DEG2-TTP-TXRX"
235 headers = {'content-type': 'application/json'}
236 response = requests.request(
237 "POST", url, data=json.dumps(data),
238 headers=headers, auth=('admin', 'admin'))
239 self.assertEqual(response.status_code, requests.codes.ok)
240 res = response.json()
241 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
242 res["output"]["result"])
244 def test_11_get_PM_ROADMA(self):
245 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
248 "node-id": "ROADM-A1",
249 "resource-type": "interface",
250 "granularity": "15min",
251 "resource-identifier": {
252 "resource-name": "OTS-DEG2-TTP-TXRX"
256 headers = {'content-type': 'application/json'}
257 response = requests.request(
258 "POST", url, data=json.dumps(data),
259 headers=headers, auth=('admin', 'admin'))
260 self.assertEqual(response.status_code, requests.codes.ok)
261 res = response.json()
263 "pmparameter-name": "OpticalPowerOutput",
264 "pmparameter-value": "2.5"
265 }, res["output"]["measurements"])
267 "pmparameter-name": "OpticalReturnLoss",
268 "pmparameter-value": "40"
269 }, res["output"]["measurements"])
271 "pmparameter-name": "OpticalPowerInput",
272 "pmparameter-value": "-21.1"
273 }, res["output"]["measurements"])
275 def test_12_get_PM_ROADMC(self):
276 url = "{}/operations/transportpce-olm:get-pm".format(self.restconf_baseurl)
279 "node-id": "ROADM-C1",
280 "resource-type": "interface",
281 "granularity": "15min",
282 "resource-identifier": {
283 "resource-name": "OTS-DEG1-TTP-TXRX"
287 headers = {'content-type': 'application/json'}
288 response = requests.request(
289 "POST", url, data=json.dumps(data),
290 headers=headers, auth=('admin', 'admin'))
291 self.assertEqual(response.status_code, requests.codes.ok)
292 res = response.json()
294 "pmparameter-name": "OpticalPowerOutput",
295 "pmparameter-value": "4.6"
296 }, res["output"]["measurements"])
298 "pmparameter-name": "OpticalReturnLoss",
299 "pmparameter-value": "49.1"
300 }, res["output"]["measurements"])
302 "pmparameter-name": "OpticalPowerInput",
303 "pmparameter-value": "-15.1"
304 }, res["output"]["measurements"])
306 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
307 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
311 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
314 headers = {'content-type': 'application/json'}
315 response = requests.request(
316 "POST", url, data=json.dumps(data),
317 headers=headers, auth=('admin', 'admin'))
318 self.assertEqual(response.status_code, requests.codes.ok)
319 res = response.json()
320 self.assertIn('Success',
321 res["output"]["result"])
324 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
325 }, res["output"]["spans"])
328 def test_14_calculate_span_loss_base_all(self):
329 url = "{}/operations/transportpce-olm:calculate-spanloss-base".format(self.restconf_baseurl)
335 headers = {'content-type': 'application/json'}
336 response = requests.request(
337 "POST", url, data=json.dumps(data),
338 headers=headers, auth=('admin', 'admin'))
339 self.assertEqual(response.status_code, requests.codes.ok)
340 res = response.json()
341 self.assertIn('Success',
342 res["output"]["result"])
345 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
346 }, res["output"]["spans"])
349 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
350 }, res["output"]["spans"])
def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
    # GET the OTS container of interface OTS-DEG2-TTP-TXRX on the mounted
    # ROADM-A1 device and check the span-loss values it reports:
    # transmit 17.6, receive 25.7.
    ots_container = 'org-openroadm-optical-transport-interfaces:ots'
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG2-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Decode once and reuse the OTS sub-tree for both checks.
    ots = reply.json()[ots_container]
    self.assertEqual(17.6, ots['span-loss-transmit'])
    self.assertEqual(25.7, ots['span-loss-receive'])
def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
    # GET the OTS container of interface OTS-DEG1-TTP-TXRX on the mounted
    # ROADM-C1 device and check the span-loss values it reports:
    # transmit 25.7, receive 17.6.
    ots_container = 'org-openroadm-optical-transport-interfaces:ots'
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/"
        "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/interface/OTS-DEG1-TTP-TXRX/"
        "org-openroadm-optical-transport-interfaces:ots"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Decode once and reuse the OTS sub-tree for both checks.
    ots = reply.json()[ots_container]
    self.assertEqual(25.7, ots['span-loss-transmit'])
    self.assertEqual(17.6, ots['span-loss-receive'])
377 def test_17_servicePath_create_AToZ(self):
378 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
381 "service-name": "test",
383 "modulation-format": "qpsk",
384 "operation": "create",
387 "dest-tp": "XPDR1-NETWORK1",
388 "src-tp": "XPDR1-CLIENT1",
392 "dest-tp": "DEG2-TTP-TXRX",
393 "src-tp": "SRG1-PP1-TXRX",
394 "node-id": "ROADM-A1"
397 "dest-tp": "SRG1-PP1-TXRX",
398 "src-tp": "DEG1-TTP-TXRX",
399 "node-id": "ROADM-C1"
402 "dest-tp": "XPDR1-CLIENT1",
403 "src-tp": "XPDR1-NETWORK1",
409 headers = {'content-type': 'application/json'}
410 response = requests.request(
411 "POST", url, data=json.dumps(data),
412 headers=headers, auth=('admin', 'admin'))
413 self.assertEqual(response.status_code, requests.codes.ok)
414 res = response.json()
415 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
419 def test_18_servicePath_create_ZToA(self):
420 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
423 "service-name": "test",
425 "modulation-format": "qpsk",
426 "operation": "create",
429 "dest-tp": "XPDR1-NETWORK1",
430 "src-tp": "XPDR1-CLIENT1",
434 "dest-tp": "DEG1-TTP-TXRX",
435 "src-tp": "SRG1-PP1-TXRX",
436 "node-id": "ROADM-C1"
439 "src-tp": "DEG2-TTP-TXRX",
440 "dest-tp": "SRG1-PP1-TXRX",
441 "node-id": "ROADM-A1"
444 "src-tp": "XPDR1-NETWORK1",
445 "dest-tp": "XPDR1-CLIENT1",
451 headers = {'content-type': 'application/json'}
452 response = requests.request(
453 "POST", url, data=json.dumps(data),
454 headers=headers, auth=('admin', 'admin'))
455 self.assertEqual(response.status_code, requests.codes.ok)
456 res = response.json()
457 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
461 def test_19_service_power_setup_XPDRA_XPDRC(self):
462 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
465 "service-name": "test",
469 "dest-tp": "XPDR1-NETWORK1",
470 "src-tp": "XPDR1-CLIENT1",
474 "dest-tp": "DEG2-TTP-TXRX",
475 "src-tp": "SRG1-PP1-TXRX",
476 "node-id": "ROADM-A1"
479 "dest-tp": "SRG1-PP1-TXRX",
480 "src-tp": "DEG1-TTP-TXRX",
481 "node-id": "ROADM-C1"
484 "dest-tp": "XPDR1-CLIENT1",
485 "src-tp": "XPDR1-NETWORK1",
491 headers = {'content-type': 'application/json'}
492 response = requests.request(
493 "POST", url, data=json.dumps(data),
494 headers=headers, auth=('admin', 'admin'))
495 self.assertEqual(response.status_code, requests.codes.ok)
496 res = response.json()
497 self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
    # GET the och container of interface XPDR1-NETWORK1-1 on the mounted
    # XPDR-A1 device; expect transmit-power -5 and frequency 196.1.
    och_container = 'org-openroadm-optical-channel-interfaces:och'
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()[och_container]
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(196.1, och['frequency'])
def test_21_get_roadmconnection_ROADMA(self):
    # GET roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on the mounted
    # ROADM-A1 device; expect control mode "gainLoss" and a
    # target-output-power of 2.0.
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2.0, connection['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    # GET roadm-connection DEG1-TTP-TXRX-SRG1-PP1-TXRX-1 on the mounted
    # ROADM-C1 device; expect control mode "power".
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("power", connection['opticalControlMode'])
534 def test_23_service_power_setup_XPDRC_XPDRA(self):
535 url = "{}/operations/transportpce-olm:service-power-setup".format(self.restconf_baseurl)
538 "service-name": "test",
542 "dest-tp": "XPDR1-NETWORK1",
543 "src-tp": "XPDR1-CLIENT1",
547 "dest-tp": "DEG1-TTP-TXRX",
548 "src-tp": "SRG1-PP1-TXRX",
549 "node-id": "ROADM-C1"
552 "src-tp": "DEG2-TTP-TXRX",
553 "dest-tp": "SRG1-PP1-TXRX",
554 "node-id": "ROADM-A1"
557 "src-tp": "XPDR1-NETWORK1",
558 "dest-tp": "XPDR1-CLIENT1",
564 headers = {'content-type': 'application/json'}
565 response = requests.request(
566 "POST", url, data=json.dumps(data),
567 headers=headers, auth=('admin', 'admin'))
568 self.assertEqual(response.status_code, requests.codes.ok)
569 res = response.json()
570 self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
    # GET the och container of interface XPDR1-NETWORK1-1 on the mounted
    # XPDR-C1 device; expect transmit-power -5 and frequency 196.1.
    och_container = 'org-openroadm-optical-channel-interfaces:och'
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK1-1/"
        "org-openroadm-optical-channel-interfaces:och"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()[och_container]
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(196.1, och['frequency'])
def test_25_get_roadmconnection_ROADMC(self):
    # GET roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on the mounted
    # ROADM-C1 device; expect control mode "gainLoss" and a
    # target-output-power of 2.0.
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2.0, connection['target-output-power'])
596 def test_26_service_power_turndown_XPDRA_XPDRC(self):
597 url = "{}/operations/transportpce-olm:service-power-turndown".format(self.restconf_baseurl)
600 "service-name": "test",
604 "dest-tp": "XPDR1-NETWORK1",
605 "src-tp": "XPDR1-CLIENT1",
609 "dest-tp": "DEG2-TTP-TXRX",
610 "src-tp": "SRG1-PP1-TXRX",
611 "node-id": "ROADM-A1"
614 "dest-tp": "SRG1-PP1-TXRX",
615 "src-tp": "DEG1-TTP-TXRX",
616 "node-id": "ROADM-C1"
619 "dest-tp": "XPDR1-CLIENT1",
620 "src-tp": "XPDR1-NETWORK1",
626 headers = {'content-type': 'application/json'}
627 response = requests.request(
628 "POST", url, data=json.dumps(data),
629 headers=headers, auth=('admin', 'admin'))
630 self.assertEqual(response.status_code, requests.codes.ok)
631 res = response.json()
632 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    # GET roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on the mounted
    # ROADM-A1 device; expect control mode "off" and a
    # target-output-power of -60.
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
    self.assertEqual(-60, connection['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    # GET roadm-connection DEG1-TTP-TXRX-SRG1-PP1-TXRX-1 on the mounted
    # ROADM-C1 device; expect control mode "off".
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/roadm-connections/"
        "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
657 def test_29_servicePath_delete_AToZ(self):
658 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
661 "service-name": "test",
663 "modulation-format": "qpsk",
664 "operation": "delete",
667 "dest-tp": "XPDR1-NETWORK1",
668 "src-tp": "XPDR1-CLIENT1",
672 "dest-tp": "DEG2-TTP-TXRX",
673 "src-tp": "SRG1-PP1-TXRX",
674 "node-id": "ROADM-A1"
677 "dest-tp": "SRG1-PP1-TXRX",
678 "src-tp": "DEG1-TTP-TXRX",
679 "node-id": "ROADM-C1"
682 "dest-tp": "XPDR1-CLIENT1",
683 "src-tp": "XPDR1-NETWORK1",
689 headers = {'content-type': 'application/json'}
690 response = requests.request(
691 "POST", url, data=json.dumps(data),
692 headers=headers, auth=('admin', 'admin'))
693 self.assertEqual(response.status_code, requests.codes.ok)
694 res = response.json()
695 self.assertIn('Request processed', res["output"]["result"])
698 def test_30_servicePath_delete_ZToA(self):
699 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
702 "service-name": "test",
704 "modulation-format": "qpsk",
705 "operation": "delete",
708 "dest-tp": "XPDR1-NETWORK1",
709 "src-tp": "XPDR1-CLIENT1",
713 "dest-tp": "DEG1-TTP-TXRX",
714 "src-tp": "SRG1-PP1-TXRX",
715 "node-id": "ROADM-C1"
718 "src-tp": "DEG2-TTP-TXRX",
719 "dest-tp": "SRG1-PP1-TXRX",
720 "node-id": "ROADM-A1"
723 "src-tp": "XPDR1-NETWORK1",
724 "dest-tp": "XPDR1-CLIENT1",
730 headers = {'content-type': 'application/json'}
731 response = requests.request(
732 "POST", url, data=json.dumps(data),
733 headers=headers, auth=('admin', 'admin'))
734 self.assertEqual(response.status_code, requests.codes.ok)
735 res = response.json()
736 self.assertIn('Request processed', res["output"]["result"])
739 """to test case where SRG where the xpdr is connected to has no optical range data"""
741 def test_31_connect_xprdA_to_roadmA(self):
742 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
744 "networkutils:input": {
745 "networkutils:links-input": {
746 "networkutils:xpdr-node": "XPDR-A1",
747 "networkutils:xpdr-num": "1",
748 "networkutils:network-num": "2",
749 "networkutils:rdm-node": "ROADM-A1",
750 "networkutils:srg-num": "1",
751 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
755 headers = {'content-type': 'application/json'}
756 response = requests.request(
757 "POST", url, data=json.dumps(data),
758 headers=headers, auth=('admin', 'admin'))
759 self.assertEqual(response.status_code, requests.codes.ok)
760 res = response.json()
761 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
763 def test_32_connect_roadmA_to_xpdrA(self):
764 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
766 "networkutils:input": {
767 "networkutils:links-input": {
768 "networkutils:xpdr-node": "XPDR-A1",
769 "networkutils:xpdr-num": "1",
770 "networkutils:network-num": "2",
771 "networkutils:rdm-node": "ROADM-A1",
772 "networkutils:srg-num": "1",
773 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
777 headers = {'content-type': 'application/json'}
778 response = requests.request(
779 "POST", url, data=json.dumps(data),
780 headers=headers, auth=('admin', 'admin'))
781 self.assertEqual(response.status_code, requests.codes.ok)
782 res = response.json()
783 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
785 def test_33_servicePath_create_AToZ(self):
786 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
789 "service-name": "test2",
791 "modulation-format": "qpsk",
792 "operation": "create",
795 "dest-tp": "XPDR1-NETWORK2",
796 "src-tp": "XPDR1-CLIENT2",
800 "dest-tp": "DEG2-TTP-TXRX",
801 "src-tp": "SRG1-PP2-TXRX",
802 "node-id": "ROADM-A1"
807 headers = {'content-type': 'application/json'}
808 response = requests.request(
809 "POST", url, data=json.dumps(data),
810 headers=headers, auth=('admin', 'admin'))
811 self.assertEqual(response.status_code, requests.codes.ok)
812 res = response.json()
813 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
    # GET the och container of interface XPDR1-NETWORK2-2 on the mounted
    # XPDR-A1 device; expect transmit-power -5.
    och_container = 'org-openroadm-optical-channel-interfaces:och'
    target = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1/yang-ext:mount/"
        "org-openroadm-device:org-openroadm-device/interface/XPDR1-NETWORK2-2/"
        "org-openroadm-optical-channel-interfaces:och"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "GET",
        target,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()[och_container]
    self.assertEqual(-5, och['transmit-power'])
    # NOTE: a check that 'wavelength-number' equals 2 is currently disabled:
    # self.assertEqual(2, och['wavelength-number'])
829 def test_35_servicePath_delete_AToZ(self):
830 url = "{}/operations/transportpce-device-renderer:service-path".format(self.restconf_baseurl)
833 "service-name": "test",
835 "modulation-format": "qpsk",
836 "operation": "delete",
839 "dest-tp": "XPDR1-NETWORK2",
840 "src-tp": "XPDR1-CLIENT2",
844 "dest-tp": "DEG2-TTP-TXRX",
845 "src-tp": "SRG1-PP2-TXRX",
846 "node-id": "ROADM-A1"
851 headers = {'content-type': 'application/json'}
852 response = requests.request(
853 "POST", url, data=json.dumps(data),
854 headers=headers, auth=('admin', 'admin'))
855 self.assertEqual(response.status_code, requests.codes.ok)
856 res = response.json()
857 self.assertIn('Request processed', res["output"]["result"])
def test_36_xpdrA_device_disconnected(self):
    # DELETE the XPDR-A1 node entry from the topology-netconf config
    # topology and expect HTTP 200.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/XPDR-A1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_37_xpdrC_device_disconnected(self):
    # DELETE the XPDR-C1 node entry from the topology-netconf config
    # topology and expect HTTP 200.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/XPDR-C1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_38_calculate_span_loss_current(self):
    # POST (without a body) to the OLM calculate-spanloss-current operation
    # and expect HTTP 200 with 'Success' in the output result.
    rpc_url = "{}/operations/transportpce-olm:calculate-spanloss-current".format(self.restconf_baseurl)
    reply = requests.request(
        "POST",
        rpc_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Success', outcome)
def test_39_rdmA_device_disconnected(self):
    # DELETE the ROADM-A1 node entry from the topology-netconf config
    # topology and expect HTTP 200.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADM-A1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_40_rdmC_device_disconnected(self):
    # DELETE the ROADM-C1 node entry from the topology-netconf config
    # topology and expect HTTP 200.
    node_url = (
        "{}/config/network-topology:"
        "network-topology/topology/topology-netconf/node/ROADM-C1"
    ).format(self.restconf_baseurl)
    reply = requests.request(
        "DELETE",
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# When executed as a script, run the test cases through unittest with
# verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)