3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
16 from common import test_utils
19 class TransportOlmTesting(unittest.TestCase):
# Numbered OLM test scenario: the test methods below act on the devices XPDR-A1, XPDR-C1,
# ROADM-A1 and ROADM-C1 that tests 01-04 mount.
# NOTE(review): the def line owning the next two statements is not visible in this extract
# (presumably a class-level setup hook) - confirm against the full file.
# NOTE(review): cls.processes is assigned twice; the second assignment replaces the first, so
# only the value returned by start_sims is kept - confirm it also covers what start_tpce started.
25 cls.processes = test_utils.start_tpce()
26 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
29 def tearDownClass(cls):
# Passes every entry of cls.processes to test_utils.shutdown_process and prints a completion message.
# NOTE(review): indentation is lost in this extract, so whether the print runs once after the
# loop or once per process cannot be confirmed from here.
30 for process in cls.processes:
31 test_utils.shutdown_process(process)
32 print("all processes killed")
# NOTE(review): the def line owning this statement is not visible in this extract
# (presumably a per-test setup hook) - confirm against the full file.
# Prints the last dotted component of self.id(), i.e. the name of the running test method.
35 print("execution of {}".format(self.id().split(".")[-1]))
def test_01_xpdrA_device_connected(self):
    # Mount node "XPDR-A1" (second argument 'xpdra') and expect HTTP 201 Created.
    mount_reply = test_utils.mount_device("XPDR-A1", 'xpdra')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_02_xpdrC_device_connected(self):
    # Mount node "XPDR-C1" (second argument 'xpdrc') and expect HTTP 201 Created.
    mount_reply = test_utils.mount_device("XPDR-C1", 'xpdrc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_03_rdmA_device_connected(self):
    # Mount node "ROADM-A1" (second argument 'roadma') and expect HTTP 201 Created.
    mount_reply = test_utils.mount_device("ROADM-A1", 'roadma')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_04_rdmC_device_connected(self):
    # Mount node "ROADM-C1" (second argument 'roadmc') and expect HTTP 201 Created.
    mount_reply = test_utils.mount_device("ROADM-C1", 'roadmc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
54 def test_05_connect_xprdA_to_roadmA(self):
# Calls test_utils.connect_xpdr_to_rdm_request for XPDR-A1 / ROADM-A1 (SRG1-PP1-TXRX), expects
# HTTP 200 and the 'Xponder Roadm Link created successfully' text in the RPC result.
# NOTE(review): `res` is read below but its assignment is not visible in this extract
# (presumably `res = response.json()`) - confirm against the full file.
55 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
56 "ROADM-A1", "1", "SRG1-PP1-TXRX")
57 self.assertEqual(response.status_code, requests.codes.ok)
59 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
61 def test_06_connect_roadmA_to_xpdrA(self):
# Calls test_utils.connect_rdm_to_xpdr_request for XPDR-A1 / ROADM-A1 (SRG1-PP1-TXRX), expects
# HTTP 200 and the 'Roadm Xponder links created successfully' text in the RPC result.
# NOTE(review): `res` is read below but its assignment is not visible in this extract
# (presumably `res = response.json()`) - confirm against the full file.
62 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
63 "ROADM-A1", "1", "SRG1-PP1-TXRX")
64 self.assertEqual(response.status_code, requests.codes.ok)
66 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
68 def test_07_connect_xprdC_to_roadmC(self):
# Calls test_utils.connect_xpdr_to_rdm_request for XPDR-C1 / ROADM-C1 (SRG1-PP1-TXRX), expects
# HTTP 200 and the 'Xponder Roadm Link created successfully' text in the RPC result.
# NOTE(review): `res` is read below but its assignment is not visible in this extract
# (presumably `res = response.json()`) - confirm against the full file.
69 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
70 "ROADM-C1", "1", "SRG1-PP1-TXRX")
71 self.assertEqual(response.status_code, requests.codes.ok)
73 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
75 def test_08_connect_roadmC_to_xpdrC(self):
# Calls test_utils.connect_rdm_to_xpdr_request for XPDR-C1 / ROADM-C1 (SRG1-PP1-TXRX), expects
# HTTP 200 and the 'Roadm Xponder links created successfully' text in the RPC result.
# NOTE(review): `res` is read below but its assignment is not visible in this extract
# (presumably `res = response.json()`) - confirm against the full file.
76 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
77 "ROADM-C1", "1", "SRG1-PP1-TXRX")
78 self.assertEqual(response.status_code, requests.codes.ok)
80 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
82 def test_09_create_OTS_ROADMA(self):
# POSTs to the device-renderer create-ots-oms RPC for ROADM-A1 / DEG1-TTP-TXRX, expects HTTP 200
# and the OTS/OMS interface-creation message for that node in the RPC result.
# NOTE(review): the statements that open the `data` payload and assign `res` are not visible
# in this extract - confirm against the full file.
83 url = "{}/operations/transportpce-device-renderer:create-ots-oms"
86 "node-id": "ROADM-A1",
87 "logical-connection-point": "DEG1-TTP-TXRX"
90 response = test_utils.post_request(url, data)
92 self.assertEqual(response.status_code, requests.codes.ok)
94 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
95 res["output"]["result"])
97 def test_10_create_OTS_ROADMC(self):
# POSTs to the device-renderer create-ots-oms RPC for ROADM-C1 / DEG2-TTP-TXRX, expects HTTP 200
# and the OTS/OMS interface-creation message for that node in the RPC result.
# NOTE(review): the statements that open the `data` payload are not visible in this extract.
98 url = "{}/operations/transportpce-device-renderer:create-ots-oms"
101 "node-id": "ROADM-C1",
102 "logical-connection-point": "DEG2-TTP-TXRX"
105 response = test_utils.post_request(url, data)
106 self.assertEqual(response.status_code, requests.codes.ok)
107 res = response.json()
108 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
109 res["output"]["result"])
111 def test_11_get_PM_ROADMA(self):
# POSTs to the OLM get-pm RPC for ROADM-A1, interface OTS-DEG2-TTP-TXRX, granularity 15min, and
# checks three entries against res["output"]["measurements"]: OpticalPowerOutput "2.5",
# OpticalReturnLoss "40" and OpticalPowerInput "-21.1".
# NOTE(review): the payload opener and the opening of each measurement assertion are not
# visible in this extract (presumably assertIn calls) - confirm against the full file.
112 url = "{}/operations/transportpce-olm:get-pm"
115 "node-id": "ROADM-A1",
116 "resource-type": "interface",
117 "granularity": "15min",
118 "resource-identifier": {
119 "resource-name": "OTS-DEG2-TTP-TXRX"
123 response = test_utils.post_request(url, data)
124 self.assertEqual(response.status_code, requests.codes.ok)
125 res = response.json()
127 "pmparameter-name": "OpticalPowerOutput",
128 "pmparameter-value": "2.5"
129 }, res["output"]["measurements"])
131 "pmparameter-name": "OpticalReturnLoss",
132 "pmparameter-value": "40"
133 }, res["output"]["measurements"])
135 "pmparameter-name": "OpticalPowerInput",
136 "pmparameter-value": "-21.1"
137 }, res["output"]["measurements"])
139 def test_12_get_PM_ROADMC(self):
# POSTs to the OLM get-pm RPC for ROADM-C1, interface OTS-DEG1-TTP-TXRX, granularity 15min, and
# checks three entries against res["output"]["measurements"]: OpticalPowerOutput "4.6",
# OpticalReturnLoss "49.1" and OpticalPowerInput "-15.1".
# NOTE(review): the payload opener and the opening of each measurement assertion are not
# visible in this extract (presumably assertIn calls) - confirm against the full file.
140 url = "{}/operations/transportpce-olm:get-pm"
143 "node-id": "ROADM-C1",
144 "resource-type": "interface",
145 "granularity": "15min",
146 "resource-identifier": {
147 "resource-name": "OTS-DEG1-TTP-TXRX"
151 response = test_utils.post_request(url, data)
152 self.assertEqual(response.status_code, requests.codes.ok)
153 res = response.json()
155 "pmparameter-name": "OpticalPowerOutput",
156 "pmparameter-value": "4.6"
157 }, res["output"]["measurements"])
159 "pmparameter-name": "OpticalReturnLoss",
160 "pmparameter-value": "49.1"
161 }, res["output"]["measurements"])
163 "pmparameter-name": "OpticalPowerInput",
164 "pmparameter-value": "-15.1"
165 }, res["output"]["measurements"])
167 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
# POSTs to the OLM calculate-spanloss-base RPC with the ROADM-A1 DEG2 -> ROADM-C1 DEG1 link-id,
# expects HTTP 200, 'Success' in the result, and an entry for that link-id checked against
# res["output"]["spans"].
# NOTE(review): the payload opener and the opening of the spans assertion are not visible in
# this extract - confirm against the full file.
168 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
172 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
175 response = test_utils.post_request(url, data)
176 self.assertEqual(response.status_code, requests.codes.ok)
177 res = response.json()
178 self.assertIn('Success',
179 res["output"]["result"])
182 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
183 }, res["output"]["spans"])
186 def test_14_calculate_span_loss_base_all(self):
# POSTs to the OLM calculate-spanloss-base RPC, expects HTTP 200 and 'Success', then checks
# entries for both link directions (C1->A1 and A1->C1) against res["output"]["spans"].
# NOTE(review): the request payload and the opening of the spans assertions are not visible
# in this extract - confirm against the full file.
187 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
193 response = test_utils.post_request(url, data)
194 self.assertEqual(response.status_code, requests.codes.ok)
195 res = response.json()
196 self.assertIn('Success',
197 res["output"]["result"])
200 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
201 }, res["output"]["spans"])
204 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
205 }, res["output"]["spans"])
def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
    # Fetch the `ots` container of interface OTS-DEG2-TTP-TXRX on ROADM-A1 through the
    # "{}/config/..." URL and check the transmit/receive span-loss values it holds.
    ots_key = 'org-openroadm-optical-transport-interfaces:ots'
    endpoint = "/".join((
        "{}/config/network-topology:network-topology/topology/topology-netconf",
        "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device",
        "interface/OTS-DEG2-TTP-TXRX",
        ots_key,
    ))
    reply = test_utils.get_request(endpoint)
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()[ots_key]
    self.assertEqual(17.6, ots['span-loss-transmit'])
    self.assertEqual(25.7, ots['span-loss-receive'])
def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
    # Fetch the `ots` container of interface OTS-DEG1-TTP-TXRX on ROADM-C1 through the
    # "{}/config/..." URL and check the transmit/receive span-loss values it holds.
    ots_key = 'org-openroadm-optical-transport-interfaces:ots'
    endpoint = "/".join((
        "{}/config/network-topology:network-topology/topology/topology-netconf",
        "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device",
        "interface/OTS-DEG1-TTP-TXRX",
        ots_key,
    ))
    reply = test_utils.get_request(endpoint)
    self.assertEqual(reply.status_code, requests.codes.ok)
    ots = reply.json()[ots_key]
    self.assertEqual(25.7, ots['span-loss-transmit'])
    self.assertEqual(17.6, ots['span-loss-receive'])
228 def test_17_servicePath_create_AToZ(self):
# POSTs a "create" operation for service "test" (modulation qpsk) to the device-renderer
# service-path RPC, expects HTTP 200 and 'Roadm-connection successfully created for nodes'.
# Visible node entries: client->network XPDR1 ports, ROADM-A1 SRG1-PP1 -> DEG2, ROADM-C1
# DEG1 -> SRG1-PP1, network->client XPDR1 ports.
# NOTE(review): the payload opener and several payload fields (including the transponder
# node-ids) are not visible in this extract - confirm against the full file.
229 url = "{}/operations/transportpce-device-renderer:service-path"
232 "service-name": "test",
234 "modulation-format": "qpsk",
235 "operation": "create",
238 "dest-tp": "XPDR1-NETWORK1",
239 "src-tp": "XPDR1-CLIENT1",
243 "dest-tp": "DEG2-TTP-TXRX",
244 "src-tp": "SRG1-PP1-TXRX",
245 "node-id": "ROADM-A1"
248 "dest-tp": "SRG1-PP1-TXRX",
249 "src-tp": "DEG1-TTP-TXRX",
250 "node-id": "ROADM-C1"
253 "dest-tp": "XPDR1-CLIENT1",
254 "src-tp": "XPDR1-NETWORK1",
260 response = test_utils.post_request(url, data)
261 self.assertEqual(response.status_code, requests.codes.ok)
262 res = response.json()
263 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
267 def test_18_servicePath_create_ZToA(self):
# POSTs a "create" operation for service "test" (modulation qpsk) to the device-renderer
# service-path RPC for the reverse direction, expects HTTP 200 and
# 'Roadm-connection successfully created for nodes'.
# Visible node entries: client->network XPDR1 ports, ROADM-C1 SRG1-PP1 -> DEG1, ROADM-A1
# DEG2 -> SRG1-PP1, network->client XPDR1 ports.
# NOTE(review): the payload opener and several payload fields (including the transponder
# node-ids) are not visible in this extract - confirm against the full file.
268 url = "{}/operations/transportpce-device-renderer:service-path"
271 "service-name": "test",
273 "modulation-format": "qpsk",
274 "operation": "create",
277 "dest-tp": "XPDR1-NETWORK1",
278 "src-tp": "XPDR1-CLIENT1",
282 "dest-tp": "DEG1-TTP-TXRX",
283 "src-tp": "SRG1-PP1-TXRX",
284 "node-id": "ROADM-C1"
287 "src-tp": "DEG2-TTP-TXRX",
288 "dest-tp": "SRG1-PP1-TXRX",
289 "node-id": "ROADM-A1"
292 "src-tp": "XPDR1-NETWORK1",
293 "dest-tp": "XPDR1-CLIENT1",
299 response = test_utils.post_request(url, data)
300 self.assertEqual(response.status_code, requests.codes.ok)
301 res = response.json()
302 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
306 def test_19_service_power_setup_XPDRA_XPDRC(self):
# POSTs to the OLM service-power-setup RPC for service "test" along the A-to-C node list
# (ROADM-A1 SRG1-PP1 -> DEG2, ROADM-C1 DEG1 -> SRG1-PP1), expects HTTP 200 and 'Success'.
# NOTE(review): the payload opener and several payload fields are not visible in this
# extract - confirm against the full file.
307 url = "{}/operations/transportpce-olm:service-power-setup"
310 "service-name": "test",
314 "dest-tp": "XPDR1-NETWORK1",
315 "src-tp": "XPDR1-CLIENT1",
319 "dest-tp": "DEG2-TTP-TXRX",
320 "src-tp": "SRG1-PP1-TXRX",
321 "node-id": "ROADM-A1"
324 "dest-tp": "SRG1-PP1-TXRX",
325 "src-tp": "DEG1-TTP-TXRX",
326 "node-id": "ROADM-C1"
329 "dest-tp": "XPDR1-CLIENT1",
330 "src-tp": "XPDR1-NETWORK1",
336 response = test_utils.post_request(url, data)
337 self.assertEqual(response.status_code, requests.codes.ok)
338 res = response.json()
339 self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
    # Fetch the `och` container of interface XPDR1-NETWORK1-1 on XPDR-A1 and check the
    # transmit power and frequency it holds.
    och_key = 'org-openroadm-optical-channel-interfaces:och'
    endpoint = "/".join((
        "{}/config/network-topology:network-topology/topology/topology-netconf",
        "node/XPDR-A1/yang-ext:mount",
        "org-openroadm-device:org-openroadm-device",
        "interface/XPDR1-NETWORK1-1",
        och_key,
    ))
    reply = test_utils.get_request(endpoint)
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()[och_key]
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(196.1, och['frequency'])
def test_21_get_roadmconnection_ROADMA(self):
    # Fetch roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADM-A1 and check its
    # optical control mode and target output power.
    endpoint = "/".join((
        "{}/config/network-topology:network-topology/topology/topology-netconf",
        "node/ROADM-A1/yang-ext:mount",
        "org-openroadm-device:org-openroadm-device",
        "roadm-connections",
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1",
    ))
    reply = test_utils.get_request(endpoint)
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_conn = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", first_conn['opticalControlMode'])
    self.assertEqual(2.0, first_conn['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    # Fetch roadm-connection DEG1-TTP-TXRX-SRG1-PP1-TXRX-1 on ROADM-C1 and check its
    # optical control mode.
    endpoint = "/".join((
        "{}/config/network-topology:network-topology/topology/topology-netconf",
        "node/ROADM-C1/yang-ext:mount",
        "org-openroadm-device:org-openroadm-device",
        "roadm-connections",
        "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1",
    ))
    reply = test_utils.get_request(endpoint)
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_conn = reply.json()['roadm-connections'][0]
    self.assertEqual("power", first_conn['opticalControlMode'])
370 def test_23_service_power_setup_XPDRC_XPDRA(self):
# POSTs to the OLM service-power-setup RPC for service "test" along the C-to-A node list
# (ROADM-C1 SRG1-PP1 -> DEG1, ROADM-A1 DEG2 -> SRG1-PP1), expects HTTP 200 and 'Success'.
# NOTE(review): the payload opener and several payload fields are not visible in this
# extract - confirm against the full file.
371 url = "{}/operations/transportpce-olm:service-power-setup"
374 "service-name": "test",
378 "dest-tp": "XPDR1-NETWORK1",
379 "src-tp": "XPDR1-CLIENT1",
383 "dest-tp": "DEG1-TTP-TXRX",
384 "src-tp": "SRG1-PP1-TXRX",
385 "node-id": "ROADM-C1"
388 "src-tp": "DEG2-TTP-TXRX",
389 "dest-tp": "SRG1-PP1-TXRX",
390 "node-id": "ROADM-A1"
393 "src-tp": "XPDR1-NETWORK1",
394 "dest-tp": "XPDR1-CLIENT1",
400 response = test_utils.post_request(url, data)
401 self.assertEqual(response.status_code, requests.codes.ok)
402 res = response.json()
403 self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
    # Fetch the `och` container of interface XPDR1-NETWORK1-1 on XPDR-C1 and check the
    # transmit power and frequency it holds.
    och_key = 'org-openroadm-optical-channel-interfaces:och'
    endpoint = "/".join((
        "{}/config/network-topology:network-topology/topology/topology-netconf",
        "node/XPDR-C1/yang-ext:mount",
        "org-openroadm-device:org-openroadm-device",
        "interface/XPDR1-NETWORK1-1",
        och_key,
    ))
    reply = test_utils.get_request(endpoint)
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()[och_key]
    self.assertEqual(-5, och['transmit-power'])
    self.assertEqual(196.1, och['frequency'])
def test_25_get_roadmconnection_ROADMC(self):
    # Fetch roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADM-C1 and check its
    # optical control mode and target output power.
    endpoint = "/".join((
        "{}/config/network-topology:network-topology/topology/topology-netconf",
        "node/ROADM-C1/yang-ext:mount",
        "org-openroadm-device:org-openroadm-device",
        "roadm-connections",
        "SRG1-PP1-TXRX-DEG1-TTP-TXRX-1",
    ))
    reply = test_utils.get_request(endpoint)
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_conn = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", first_conn['opticalControlMode'])
    self.assertEqual(2.0, first_conn['target-output-power'])
425 def test_26_service_power_turndown_XPDRA_XPDRC(self):
# POSTs to the OLM service-power-turndown RPC for service "test" along the A-to-C node list
# (ROADM-A1 SRG1-PP1 -> DEG2, ROADM-C1 DEG1 -> SRG1-PP1), expects HTTP 200 and 'Success'.
# NOTE(review): the payload opener and several payload fields are not visible in this
# extract - confirm against the full file.
426 url = "{}/operations/transportpce-olm:service-power-turndown"
429 "service-name": "test",
433 "dest-tp": "XPDR1-NETWORK1",
434 "src-tp": "XPDR1-CLIENT1",
438 "dest-tp": "DEG2-TTP-TXRX",
439 "src-tp": "SRG1-PP1-TXRX",
440 "node-id": "ROADM-A1"
443 "dest-tp": "SRG1-PP1-TXRX",
444 "src-tp": "DEG1-TTP-TXRX",
445 "node-id": "ROADM-C1"
448 "dest-tp": "XPDR1-CLIENT1",
449 "src-tp": "XPDR1-NETWORK1",
455 response = test_utils.post_request(url, data)
456 self.assertEqual(response.status_code, requests.codes.ok)
457 res = response.json()
458 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    # Fetch roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADM-A1 and check that it
    # now reports control mode "off" with a target output power of -60.
    endpoint = "/".join((
        "{}/config/network-topology:network-topology/topology/topology-netconf",
        "node/ROADM-A1/yang-ext:mount",
        "org-openroadm-device:org-openroadm-device",
        "roadm-connections",
        "SRG1-PP1-TXRX-DEG2-TTP-TXRX-1",
    ))
    reply = test_utils.get_request(endpoint)
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_conn = reply.json()['roadm-connections'][0]
    self.assertEqual("off", first_conn['opticalControlMode'])
    self.assertEqual(-60, first_conn['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    # Fetch roadm-connection DEG1-TTP-TXRX-SRG1-PP1-TXRX-1 on ROADM-C1 and check that it
    # now reports control mode "off".
    endpoint = "/".join((
        "{}/config/network-topology:network-topology/topology/topology-netconf",
        "node/ROADM-C1/yang-ext:mount",
        "org-openroadm-device:org-openroadm-device",
        "roadm-connections",
        "DEG1-TTP-TXRX-SRG1-PP1-TXRX-1",
    ))
    reply = test_utils.get_request(endpoint)
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_conn = reply.json()['roadm-connections'][0]
    self.assertEqual("off", first_conn['opticalControlMode'])
479 def test_29_servicePath_delete_AToZ(self):
# POSTs a "delete" operation for service "test" (modulation qpsk) to the device-renderer
# service-path RPC along the A-to-Z node list, expects HTTP 200 and 'Request processed'.
# NOTE(review): the payload opener and several payload fields are not visible in this
# extract - confirm against the full file.
480 url = "{}/operations/transportpce-device-renderer:service-path"
483 "service-name": "test",
485 "modulation-format": "qpsk",
486 "operation": "delete",
489 "dest-tp": "XPDR1-NETWORK1",
490 "src-tp": "XPDR1-CLIENT1",
494 "dest-tp": "DEG2-TTP-TXRX",
495 "src-tp": "SRG1-PP1-TXRX",
496 "node-id": "ROADM-A1"
499 "dest-tp": "SRG1-PP1-TXRX",
500 "src-tp": "DEG1-TTP-TXRX",
501 "node-id": "ROADM-C1"
504 "dest-tp": "XPDR1-CLIENT1",
505 "src-tp": "XPDR1-NETWORK1",
511 response = test_utils.post_request(url, data)
512 self.assertEqual(response.status_code, requests.codes.ok)
513 res = response.json()
514 self.assertIn('Request processed', res["output"]["result"])
517 def test_30_servicePath_delete_ZToA(self):
# POSTs a "delete" operation for service "test" (modulation qpsk) to the device-renderer
# service-path RPC along the Z-to-A node list, expects HTTP 200 and 'Request processed'.
# NOTE(review): the payload opener and several payload fields are not visible in this
# extract - confirm against the full file.
518 url = "{}/operations/transportpce-device-renderer:service-path"
521 "service-name": "test",
523 "modulation-format": "qpsk",
524 "operation": "delete",
527 "dest-tp": "XPDR1-NETWORK1",
528 "src-tp": "XPDR1-CLIENT1",
532 "dest-tp": "DEG1-TTP-TXRX",
533 "src-tp": "SRG1-PP1-TXRX",
534 "node-id": "ROADM-C1"
537 "src-tp": "DEG2-TTP-TXRX",
538 "dest-tp": "SRG1-PP1-TXRX",
539 "node-id": "ROADM-A1"
542 "src-tp": "XPDR1-NETWORK1",
543 "dest-tp": "XPDR1-CLIENT1",
549 response = test_utils.post_request(url, data)
550 self.assertEqual(response.status_code, requests.codes.ok)
551 res = response.json()
552 self.assertIn('Request processed', res["output"]["result"])
555 """Test the case where the SRG that the xpdr is connected to has no optical range data."""
def test_31_connect_xprdA_to_roadmA(self):
    # Request an xpdr-to-roadm link from XPDR-A1 (arguments "1", "2") to ROADM-A1
    # ("1", "SRG1-PP2-TXRX") and check the success text in the RPC result.
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_32_connect_roadmA_to_xpdrA(self):
    # Request a roadm-to-xpdr link for XPDR-A1 (arguments "1", "2") and ROADM-A1
    # ("1", "SRG1-PP2-TXRX") and check the success text in the RPC result.
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
571 def test_33_servicePath_create_AToZ(self):
# POSTs a "create" operation for service "test2" (modulation qpsk) to the device-renderer
# service-path RPC, expects HTTP 200 and 'Roadm-connection successfully created for nodes'.
# Visible node entries: XPDR1-CLIENT2 -> XPDR1-NETWORK2 and ROADM-A1 SRG1-PP2 -> DEG2.
# NOTE(review): the payload opener and several payload fields are not visible in this
# extract - confirm against the full file.
572 url = "{}/operations/transportpce-device-renderer:service-path"
575 "service-name": "test2",
577 "modulation-format": "qpsk",
578 "operation": "create",
581 "dest-tp": "XPDR1-NETWORK2",
582 "src-tp": "XPDR1-CLIENT2",
586 "dest-tp": "DEG2-TTP-TXRX",
587 "src-tp": "SRG1-PP2-TXRX",
588 "node-id": "ROADM-A1"
593 response = test_utils.post_request(url, data)
594 self.assertEqual(response.status_code, requests.codes.ok)
595 res = response.json()
596 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
    # Fetch the `och` container of interface XPDR1-NETWORK2-2 on XPDR-A1 and check the
    # transmit power it holds.
    och_key = 'org-openroadm-optical-channel-interfaces:och'
    endpoint = "/".join((
        "{}/config/network-topology:network-topology/topology/topology-netconf",
        "node/XPDR-A1/yang-ext:mount",
        "org-openroadm-device:org-openroadm-device",
        "interface/XPDR1-NETWORK2-2",
        och_key,
    ))
    reply = test_utils.get_request(endpoint)
    self.assertEqual(reply.status_code, requests.codes.ok)
    och = reply.json()[och_key]
    self.assertEqual(-5, och['transmit-power'])
    # NOTE(review): a check that och['wavelength-number'] equals 2 was disabled here;
    # the reason is not visible in this file.
610 def test_35_servicePath_delete_AToZ(self):
# POSTs a "delete" operation (modulation qpsk) to the device-renderer service-path RPC for the
# XPDR1-CLIENT2 -> XPDR1-NETWORK2 / ROADM-A1 SRG1-PP2 -> DEG2 path, expects HTTP 200 and
# 'Request processed'.
# NOTE(review): this deletes service-name "test", while test_33 created the same path under
# "test2" - confirm whether the name mismatch is intentional.
# NOTE(review): the payload opener and several payload fields are not visible in this
# extract - confirm against the full file.
611 url = "{}/operations/transportpce-device-renderer:service-path"
614 "service-name": "test",
616 "modulation-format": "qpsk",
617 "operation": "delete",
620 "dest-tp": "XPDR1-NETWORK2",
621 "src-tp": "XPDR1-CLIENT2",
625 "dest-tp": "DEG2-TTP-TXRX",
626 "src-tp": "SRG1-PP2-TXRX",
627 "node-id": "ROADM-A1"
632 response = test_utils.post_request(url, data)
633 self.assertEqual(response.status_code, requests.codes.ok)
634 res = response.json()
635 self.assertIn('Request processed', res["output"]["result"])
def test_36_xpdrA_device_disconnected(self):
    # Unmount node "XPDR-A1" and expect HTTP 200 OK.
    unmount_reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_37_xpdrC_device_disconnected(self):
    # Unmount node "XPDR-C1" and expect HTTP 200 OK.
    unmount_reply = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_38_calculate_span_loss_current(self):
    # Invoke the OLM calculate-spanloss-current RPC with no payload (None) and expect
    # HTTP 200 with 'Success' in the RPC result.
    rpc = "{}/operations/transportpce-olm:calculate-spanloss-current"
    reply = test_utils.post_request(rpc, None)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Success', outcome)
def test_39_rdmA_device_disconnected(self):
    # Unmount node "ROADM-A1" and expect HTTP 200 OK.
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_40_rdmC_device_disconnected(self):
    # Unmount node "ROADM-C1" and expect HTTP 200 OK.
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
if __name__ == "__main__":
    # Run the test module directly with per-test (verbose) reporting.
    unittest.main(verbosity=2)