3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportOlmTesting(unittest.TestCase):
29 NODE_VERSION = '2.2.1'
33 cls.processes = test_utils.start_tpce()
34 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
35 ('roadma', cls.NODE_VERSION),
36 ('roadmc', cls.NODE_VERSION),
37 ('xpdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes`` and report completion."""
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    # Printed once, after the last process has been handed to shutdown_process.
    print("all processes killed")
47 print("execution of {}".format(self.id().split(".")[-1]))
def test_01_xpdrA_device_connected(self):
    """Mount XPDR-A1 (simulator 'xpdra') and expect the 'created' status code."""
    simulator = ('xpdra', self.NODE_VERSION)
    reply = test_utils.mount_device("XPDR-A1", simulator)
    self.assertEqual(reply.status_code, requests.codes.created,
                     msg=test_utils.CODE_SHOULD_BE_201)
def test_02_xpdrC_device_connected(self):
    """Mount XPDR-C1 (simulator 'xpdrc') and expect the 'created' status code."""
    simulator = ('xpdrc', self.NODE_VERSION)
    reply = test_utils.mount_device("XPDR-C1", simulator)
    self.assertEqual(reply.status_code, requests.codes.created,
                     msg=test_utils.CODE_SHOULD_BE_201)
def test_03_rdmA_device_connected(self):
    """Mount ROADM-A1 (simulator 'roadma') and expect the 'created' status code."""
    simulator = ('roadma', self.NODE_VERSION)
    reply = test_utils.mount_device("ROADM-A1", simulator)
    self.assertEqual(reply.status_code, requests.codes.created,
                     msg=test_utils.CODE_SHOULD_BE_201)
def test_04_rdmC_device_connected(self):
    """Mount ROADM-C1 (simulator 'roadmc') and expect the 'created' status code."""
    simulator = ('roadmc', self.NODE_VERSION)
    reply = test_utils.mount_device("ROADM-C1", simulator)
    self.assertEqual(reply.status_code, requests.codes.created,
                     msg=test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_to_roadmA(self):
    """Request an xponder-to-ROADM link between XPDR-A1 and ROADM-A1 (SRG1-PP1-TXRX).

    Passes when the reply status is OK and the result text reports that the
    link was created.
    """
    response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
                                                      "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the JSON body here: the assertion below reads ``res``.
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_06_connect_roadmA_to_xpdrA(self):
    """Request a ROADM-to-xponder link between ROADM-A1 (SRG1-PP1-TXRX) and XPDR-A1.

    Passes when the reply status is OK and the result text reports that the
    links were created.
    """
    response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
                                                      "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the JSON body here: the assertion below reads ``res``.
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_07_connect_xprdC_to_roadmC(self):
    """Request an xponder-to-ROADM link between XPDR-C1 and ROADM-C1 (SRG1-PP1-TXRX).

    Passes when the reply status is OK and the result text reports that the
    link was created.
    """
    response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
                                                      "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the JSON body here: the assertion below reads ``res``.
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_08_connect_roadmC_to_xpdrC(self):
    """Request a ROADM-to-xponder link between ROADM-C1 (SRG1-PP1-TXRX) and XPDR-C1.

    Passes when the reply status is OK and the result text reports that the
    links were created.
    """
    response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
                                                      "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the JSON body here: the assertion below reads ``res``.
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_09_create_OTS_ROADMA(self):
    """Call create_ots_oms_request for ROADM-A1 / DEG1-TTP-TXRX and check the result text.

    Passes when the reply status is OK and the result names the OTS and OMS
    interfaces as successfully created on ROADM-A1.
    """
    response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the JSON body here: the assertion below reads ``res``.
    res = response.json()
    self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
                  res["output"]["result"])
def test_10_create_OTS_ROADMC(self):
    """Call create_ots_oms_request for ROADM-C1 / DEG2-TTP-TXRX and check the result text."""
    reply = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    expected_text = 'Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1'
    self.assertIn(expected_text, body["output"]["result"])
109 def test_11_get_PM_ROADMA(self):
110 url = "{}/operations/transportpce-olm:get-pm"
113 "node-id": "ROADM-A1",
114 "resource-type": "interface",
115 "granularity": "15min",
116 "resource-identifier": {
117 "resource-name": "OTS-DEG2-TTP-TXRX"
121 response = test_utils.post_request(url, data)
122 self.assertEqual(response.status_code, requests.codes.ok)
123 res = response.json()
125 "pmparameter-name": "OpticalPowerOutput",
126 "pmparameter-value": "2.5"
127 }, res["output"]["measurements"])
129 "pmparameter-name": "OpticalReturnLoss",
130 "pmparameter-value": "40"
131 }, res["output"]["measurements"])
133 "pmparameter-name": "OpticalPowerInput",
134 "pmparameter-value": "-21.1"
135 }, res["output"]["measurements"])
137 def test_12_get_PM_ROADMC(self):
138 url = "{}/operations/transportpce-olm:get-pm"
141 "node-id": "ROADM-C1",
142 "resource-type": "interface",
143 "granularity": "15min",
144 "resource-identifier": {
145 "resource-name": "OTS-DEG1-TTP-TXRX"
149 response = test_utils.post_request(url, data)
150 self.assertEqual(response.status_code, requests.codes.ok)
151 res = response.json()
153 "pmparameter-name": "OpticalPowerOutput",
154 "pmparameter-value": "4.6"
155 }, res["output"]["measurements"])
157 "pmparameter-name": "OpticalReturnLoss",
158 "pmparameter-value": "49.1"
159 }, res["output"]["measurements"])
161 "pmparameter-name": "OpticalPowerInput",
162 "pmparameter-value": "-15.1"
163 }, res["output"]["measurements"])
165 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
166 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
170 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
173 response = test_utils.post_request(url, data)
174 self.assertEqual(response.status_code, requests.codes.ok)
175 res = response.json()
176 self.assertIn('Success',
177 res["output"]["result"])
180 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
181 }, res["output"]["spans"])
184 def test_14_calculate_span_loss_base_all(self):
185 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
191 response = test_utils.post_request(url, data)
192 self.assertEqual(response.status_code, requests.codes.ok)
193 res = response.json()
194 self.assertIn('Success',
195 res["output"]["result"])
198 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
199 }, res["output"]["spans"])
202 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
203 }, res["output"]["spans"])
def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
    """Read the OTS-DEG2-TTP-TXRX interface of ROADM-A1 and check its span-loss values."""
    # The helper is called with the node id first and the resource path
    # second, matching the roadm-connections checks in this class.
    response = test_utils.check_netconf_node_request(
        "ROADM-A1",
        "interface/OTS-DEG2-TTP-TXRX/org-openroadm-optical-transport-interfaces:ots")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
    self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
    """Read the OTS-DEG1-TTP-TXRX interface of ROADM-C1 and check its span-loss values."""
    # The helper is called with the node id first and the resource path
    # second, matching the roadm-connections checks in this class.
    response = test_utils.check_netconf_node_request(
        "ROADM-C1",
        "interface/OTS-DEG1-TTP-TXRX/org-openroadm-optical-transport-interfaces:ots")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
    self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
224 def test_17_servicePath_create_AToZ(self):
225 response = test_utils.service_path_request("create", "test", "1",
226 [{"node-id": "XPDR-A1",
227 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
228 {"node-id": "ROADM-A1",
229 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
230 {"node-id": "ROADM-C1",
231 "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG1-TTP-TXRX"},
232 {"node-id": "XPDR-C1",
233 "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}],
234 196.1, 40, 196.075, 196.125, 761,
236 self.assertEqual(response.status_code, requests.codes.ok)
237 res = response.json()
238 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
242 def test_18_servicePath_create_ZToA(self):
243 response = test_utils.service_path_request("create", "test", "1",
244 [{"node-id": "XPDR-C1",
245 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
246 {"node-id": "ROADM-C1",
247 "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
248 {"node-id": "ROADM-A1",
249 "src-tp": "DEG2-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
250 {"node-id": "XPDR-A1",
251 "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}],
252 196.1, 40, 196.075, 196.125, 761,
254 self.assertEqual(response.status_code, requests.codes.ok)
255 res = response.json()
256 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
260 def test_19_service_power_setup_XPDRA_XPDRC(self):
261 url = "{}/operations/transportpce-olm:service-power-setup"
264 "service-name": "test",
268 "dest-tp": "XPDR1-NETWORK1",
269 "src-tp": "XPDR1-CLIENT1",
273 "dest-tp": "DEG2-TTP-TXRX",
274 "src-tp": "SRG1-PP1-TXRX",
275 "node-id": "ROADM-A1"
278 "dest-tp": "SRG1-PP1-TXRX",
279 "src-tp": "DEG1-TTP-TXRX",
280 "node-id": "ROADM-C1"
283 "dest-tp": "XPDR1-CLIENT1",
284 "src-tp": "XPDR1-NETWORK1",
288 "lower-spectral-slot-number": 761,
289 "higher-spectral-slot-number": 768
292 response = test_utils.post_request(url, data)
293 self.assertEqual(response.status_code, requests.codes.ok)
294 res = response.json()
295 self.assertIn('Success', res["output"]["result"])
def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
    """Read the XPDR1-NETWORK1-761:768 och interface of XPDR-A1; check power and frequency."""
    # The helper is called with the node id first and the resource path
    # second, matching the roadm-connections checks in this class.
    response = test_utils.check_netconf_node_request(
        "XPDR-A1",
        "interface/XPDR1-NETWORK1-761:768/org-openroadm-optical-channel-interfaces:och")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
    self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
def test_21_get_roadmconnection_ROADMA(self):
    """Check the SRG1-PP1 to DEG2 roadm-connection on ROADM-A1: gainLoss mode, 2.0 target power."""
    connection_path = "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768"
    reply = test_utils.check_netconf_node_request("ROADM-A1", connection_path)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2.0, connection['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    """Check the DEG1 to SRG1-PP1 roadm-connection on ROADM-C1 is in "power" control mode."""
    connection_path = "roadm-connections/DEG1-TTP-TXRX-SRG1-PP1-TXRX-761:768"
    reply = test_utils.check_netconf_node_request("ROADM-C1", connection_path)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("power", connection['opticalControlMode'])
321 def test_23_service_power_setup_XPDRC_XPDRA(self):
322 url = "{}/operations/transportpce-olm:service-power-setup"
325 "service-name": "test",
329 "dest-tp": "XPDR1-NETWORK1",
330 "src-tp": "XPDR1-CLIENT1",
334 "dest-tp": "DEG1-TTP-TXRX",
335 "src-tp": "SRG1-PP1-TXRX",
336 "node-id": "ROADM-C1"
339 "src-tp": "DEG2-TTP-TXRX",
340 "dest-tp": "SRG1-PP1-TXRX",
341 "node-id": "ROADM-A1"
344 "src-tp": "XPDR1-NETWORK1",
345 "dest-tp": "XPDR1-CLIENT1",
349 "lower-spectral-slot-number": 761,
350 "higher-spectral-slot-number": 768
353 response = test_utils.post_request(url, data)
354 self.assertEqual(response.status_code, requests.codes.ok)
355 res = response.json()
356 self.assertIn('Success', res["output"]["result"])
def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
    """Read the XPDR1-NETWORK1-761:768 och interface of XPDR-C1; check power and frequency."""
    # The helper is called with the node id first and the resource path
    # second, matching the roadm-connections checks in this class.
    response = test_utils.check_netconf_node_request(
        "XPDR-C1",
        "interface/XPDR1-NETWORK1-761:768/org-openroadm-optical-channel-interfaces:och")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
    self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
def test_25_get_roadmconnection_ROADMC(self):
    """Check the SRG1-PP1 to DEG1 roadm-connection on ROADM-C1: gainLoss mode, 2.0 target power."""
    connection_path = "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768"
    reply = test_utils.check_netconf_node_request("ROADM-C1", connection_path)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("gainLoss", connection['opticalControlMode'])
    self.assertEqual(2.0, connection['target-output-power'])
375 def test_26_service_power_turndown_XPDRA_XPDRC(self):
376 url = "{}/operations/transportpce-olm:service-power-turndown"
379 "service-name": "test",
383 "dest-tp": "XPDR1-NETWORK1",
384 "src-tp": "XPDR1-CLIENT1",
388 "dest-tp": "DEG2-TTP-TXRX",
389 "src-tp": "SRG1-PP1-TXRX",
390 "node-id": "ROADM-A1"
393 "dest-tp": "SRG1-PP1-TXRX",
394 "src-tp": "DEG1-TTP-TXRX",
395 "node-id": "ROADM-C1"
398 "dest-tp": "XPDR1-CLIENT1",
399 "src-tp": "XPDR1-NETWORK1",
403 "lower-spectral-slot-number": 761,
404 "higher-spectral-slot-number": 768
407 response = test_utils.post_request(url, data)
408 self.assertEqual(response.status_code, requests.codes.ok)
409 res = response.json()
410 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    """Check the SRG1-PP1 to DEG2 roadm-connection on ROADM-A1: "off" mode, -60 target power."""
    connection_path = "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768"
    reply = test_utils.check_netconf_node_request("ROADM-A1", connection_path)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
    self.assertEqual(-60, connection['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    """Check the DEG1 to SRG1-PP1 roadm-connection on ROADM-C1 is in "off" control mode."""
    connection_path = "roadm-connections/DEG1-TTP-TXRX-SRG1-PP1-TXRX-761:768"
    reply = test_utils.check_netconf_node_request("ROADM-C1", connection_path)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # Only the first entry of the returned list is inspected.
    connection = reply.json()['roadm-connections'][0]
    self.assertEqual("off", connection['opticalControlMode'])
427 def test_29_servicePath_delete_AToZ(self):
428 response = test_utils.service_path_request("delete", "test", "1",
429 [{"node-id": "XPDR-A1",
430 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
431 {"node-id": "ROADM-A1",
432 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
433 {"node-id": "ROADM-C1",
434 "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG1-TTP-TXRX"},
435 {"node-id": "XPDR-C1",
436 "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}],
437 196.1, 40, 196.075, 196.125, 761,
439 self.assertEqual(response.status_code, requests.codes.ok)
440 res = response.json()
441 self.assertIn('Request processed', res["output"]["result"])
444 def test_30_servicePath_delete_ZToA(self):
445 response = test_utils.service_path_request("delete", "test", "1",
446 [{"node-id": "XPDR-C1",
447 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
448 {"node-id": "ROADM-C1",
449 "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
450 {"node-id": "ROADM-A1",
451 "src-tp": "DEG2-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
452 {"node-id": "XPDR-A1",
453 "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}],
454 196.053125, 40, 196.025, 196.08125, 761,
456 self.assertEqual(response.status_code, requests.codes.ok)
457 res = response.json()
458 self.assertIn('Request processed', res["output"]["result"])
# Test the case where the SRG that the xpdr is connected to has no optical range data.
def test_31_connect_xprdA_to_roadmA(self):
    """Request an xponder-to-ROADM link between XPDR-A1 and ROADM-A1 (SRG1-PP2-TXRX)."""
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_32_connect_roadmA_to_xpdrA(self):
    """Request a ROADM-to-xponder link between ROADM-A1 (SRG1-PP2-TXRX) and XPDR-A1."""
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
477 def test_33_servicePath_create_AToZ(self):
478 response = test_utils.service_path_request("create", "test2", "2",
479 [{"node-id": "XPDR-A1",
480 "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
481 {"node-id": "ROADM-A1",
482 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}],
483 196.1, 40, 196.075, 196.125, 753,
485 self.assertEqual(response.status_code, requests.codes.ok)
486 res = response.json()
487 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
    """Read the XPDR1-NETWORK2-753:760 och interface of XPDR-A1 and check its transmit power."""
    # The helper is called with the node id first and the resource path
    # second, matching the roadm-connections checks in this class.
    response = test_utils.check_netconf_node_request(
        "XPDR-A1",
        "interface/XPDR1-NETWORK2-753:760/org-openroadm-optical-channel-interfaces:och")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
    # self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
500 def test_35_servicePath_delete_AToZ(self):
501 response = test_utils.service_path_request("delete", "test", "1",
502 [{"node-id": "XPDR-A1",
503 "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
504 {"node-id": "ROADM-A1",
505 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}],
506 196.053125, 40, 196.025, 196.08125, 761,
508 self.assertEqual(response.status_code, requests.codes.ok)
509 res = response.json()
510 self.assertIn('Request processed', res["output"]["result"])
def test_36_xpdrA_device_disconnected(self):
    """Unmount XPDR-A1 and expect the OK status code."""
    reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
def test_37_xpdrC_device_disconnected(self):
    """Unmount XPDR-C1 and expect the OK status code."""
    reply = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
def test_38_calculate_span_loss_current(self):
    """POST to calculate-spanloss-current with no payload and expect a 'Success' result."""
    endpoint = "{}/operations/transportpce-olm:calculate-spanloss-current"
    reply = test_utils.post_request(endpoint, None)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Success', outcome)
def test_39_rdmA_device_disconnected(self):
    """Unmount ROADM-A1 and expect the OK status code."""
    reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
def test_40_rdmC_device_disconnected(self):
    """Unmount ROADM-C1 and expect the OK status code."""
    reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
if __name__ == "__main__":
    # Run every test in this module; verbosity level 2 reports each test by name.
    unittest.main(verbosity=2)