3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
19 sys.path.append('transportpce_tests/common/')
23 class TransportOlmTesting(unittest.TestCase):
26 NODE_VERSION = '1.2.1'
30 cls.processes = test_utils.start_tpce()
31 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
32 ('roadma-full', cls.NODE_VERSION),
33 ('roadmc-full', cls.NODE_VERSION),
34 ('xpdrc', cls.NODE_VERSION)])
37 def tearDownClass(cls):
38 # pylint: disable=not-an-iterable
39 for process in cls.processes:
40 test_utils.shutdown_process(process)
41 print("all processes killed")
44 print("execution of {}".format(self.id().split(".")[-1]))
47 def test_01_xpdrA_device_connected(self):
48 response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
49 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
51 def test_02_xpdrC_device_connected(self):
52 response = test_utils.mount_device("XPDRC01", ('xpdrc', self.NODE_VERSION))
53 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
55 def test_03_rdmA_device_connected(self):
56 response = test_utils.mount_device("ROADMA01", ('roadma-full', self.NODE_VERSION))
57 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
59 def test_04_rdmC_device_connected(self):
60 response = test_utils.mount_device("ROADMC01", ('roadmc-full', self.NODE_VERSION))
61 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
63 def test_05_connect_xprdA_to_roadmA(self):
64 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
65 "ROADMA01", "1", "SRG1-PP1-TXRX")
66 self.assertEqual(response.status_code, requests.codes.ok)
68 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
70 def test_06_connect_roadmA_to_xpdrA(self):
71 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
72 "ROADMA01", "1", "SRG1-PP1-TXRX")
73 self.assertEqual(response.status_code, requests.codes.ok)
75 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
77 def test_07_connect_xprdC_to_roadmC(self):
78 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "1",
79 "ROADMC01", "1", "SRG1-PP1-TXRX")
80 self.assertEqual(response.status_code, requests.codes.ok)
82 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
84 def test_08_connect_roadmC_to_xpdrC(self):
85 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "1",
86 "ROADMC01", "1", "SRG1-PP1-TXRX")
87 self.assertEqual(response.status_code, requests.codes.ok)
89 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
91 def test_09_create_OTS_ROADMA(self):
92 response = test_utils.create_ots_oms_request("ROADMA01", "DEG1-TTP-TXRX")
93 self.assertEqual(response.status_code, requests.codes.ok)
95 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADMA01',
96 res["output"]["result"])
98 def test_10_create_OTS_ROADMC(self):
99 response = test_utils.create_ots_oms_request("ROADMC01", "DEG2-TTP-TXRX")
100 self.assertEqual(response.status_code, requests.codes.ok)
101 res = response.json()
102 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADMC01',
103 res["output"]["result"])
105 def test_11_get_PM_ROADMA(self):
106 url = "{}/operations/transportpce-olm:get-pm"
109 "node-id": "ROADMA01",
110 "resource-type": "interface",
111 "granularity": "15min",
112 "resource-identifier": {
113 "resource-name": "OTS-DEG1-TTP-TXRX"
117 response = test_utils.post_request(url, data)
118 self.assertEqual(response.status_code, requests.codes.ok)
119 res = response.json()
121 "pmparameter-name": "OpticalPowerOutput",
122 "pmparameter-value": "2.5"
123 }, res["output"]["measurements"])
125 "pmparameter-name": "OpticalReturnLoss",
126 "pmparameter-value": "49.9"
127 }, res["output"]["measurements"])
129 "pmparameter-name": "OpticalPowerInput",
130 "pmparameter-value": "3"
131 }, res["output"]["measurements"])
133 def test_12_get_PM_ROADMC(self):
134 url = "{}/operations/transportpce-olm:get-pm"
137 "node-id": "ROADMC01",
138 "resource-type": "interface",
139 "granularity": "15min",
140 "resource-identifier": {
141 "resource-name": "OTS-DEG2-TTP-TXRX"
145 response = test_utils.post_request(url, data)
146 self.assertEqual(response.status_code, requests.codes.ok)
147 res = response.json()
149 "pmparameter-name": "OpticalPowerOutput",
150 "pmparameter-value": "18.1"
151 }, res["output"]["measurements"])
153 "pmparameter-name": "OpticalReturnLoss",
154 "pmparameter-value": "48.8"
155 }, res["output"]["measurements"])
157 "pmparameter-name": "OpticalPowerInput",
158 "pmparameter-value": "-3.2"
159 }, res["output"]["measurements"])
161 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
162 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
166 "link-id": "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX"
169 response = test_utils.post_request(url, data)
170 self.assertEqual(response.status_code, requests.codes.ok)
171 res = response.json()
172 self.assertIn('Success',
173 res["output"]["result"])
176 "link-id": "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX"
177 }, res["output"]["spans"])
180 def test_14_calculate_span_loss_base_all(self):
181 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
187 response = test_utils.post_request(url, data)
188 self.assertEqual(response.status_code, requests.codes.ok)
189 res = response.json()
190 self.assertIn('Success',
191 res["output"]["result"])
194 "link-id": "ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX"
195 }, res["output"]["spans"])
198 "link-id": "ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX"
199 }, res["output"]["spans"])
202 def test_15_get_OTS_DEG1_TTP_TXRX_ROADMA(self):
203 response = test_utils.check_netconf_node_request(
205 "interface/OTS-DEG1-TTP-TXRX/org-openroadm-optical-transport-interfaces:ots")
206 self.assertEqual(response.status_code, requests.codes.ok)
207 res = response.json()
208 self.assertEqual(5.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
209 self.assertEqual(15.1, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
211 def test_16_get_OTS_DEG2_TTP_TXRX_ROADMC(self):
212 response = test_utils.check_netconf_node_request(
214 "interface/OTS-DEG2-TTP-TXRX/org-openroadm-optical-transport-interfaces:ots")
215 self.assertEqual(response.status_code, requests.codes.ok)
216 res = response.json()
217 self.assertEqual(15.1, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
218 self.assertEqual(5.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
220 def test_17_servicePath_create_AToZ(self):
221 response = test_utils.service_path_request("create", "test", "1",
222 [{"node-id": "XPDRA01",
223 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
224 {"node-id": "ROADMA01",
225 "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
226 {"node-id": "ROADMC01",
227 "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG2-TTP-TXRX"},
228 {"node-id": "XPDRC01",
229 "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}],
230 196.1, 40, 196.075, 196.125, 761,
232 self.assertEqual(response.status_code, requests.codes.ok)
233 res = response.json()
234 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
238 def test_18_servicePath_create_ZToA(self):
239 response = test_utils.service_path_request("create", "test", "1",
240 [{"node-id": "XPDRC01",
241 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
242 {"node-id": "ROADMC01",
243 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
244 {"node-id": "ROADMA01",
245 "src-tp": "DEG1-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
246 {"node-id": "XPDRA01",
247 "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}],
248 196.1, 40, 196.075, 196.125, 761,
250 self.assertEqual(response.status_code, requests.codes.ok)
251 res = response.json()
252 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
256 def test_19_service_power_setup_XPDRA_XPDRC(self):
257 url = "{}/operations/transportpce-olm:service-power-setup"
260 "service-name": "test",
264 "dest-tp": "XPDR1-NETWORK1",
265 "src-tp": "XPDR1-CLIENT1",
269 "dest-tp": "DEG1-TTP-TXRX",
270 "src-tp": "SRG1-PP1-TXRX",
271 "node-id": "ROADMA01"
274 "dest-tp": "SRG1-PP1-TXRX",
275 "src-tp": "DEG2-TTP-TXRX",
276 "node-id": "ROADMC01"
279 "dest-tp": "XPDR1-CLIENT1",
280 "src-tp": "XPDR1-NETWORK1",
284 "center-freq": 196.1,
288 "lower-spectral-slot-number": 761,
289 "higher-spectral-slot-number": 768
292 response = test_utils.post_request(url, data)
293 self.assertEqual(response.status_code, requests.codes.ok)
294 res = response.json()
295 self.assertIn('Success', res["output"]["result"])
297 def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
298 response = test_utils.check_netconf_node_request(
300 "interface/XPDR1-NETWORK1-761:768/org-openroadm-optical-channel-interfaces:och")
301 self.assertEqual(response.status_code, requests.codes.ok)
302 res = response.json()
303 self.assertEqual(0, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
304 self.assertEqual(1, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
306 def test_21_get_roadmconnection_ROADMA(self):
307 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
308 self.assertEqual(response.status_code, requests.codes.ok)
309 res = response.json()
310 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
311 self.assertEqual(-3.3, res['roadm-connections'][0]['target-output-power'])
313 def test_22_get_roadmconnection_ROADMC(self):
314 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP1-TXRX-761:768")
315 self.assertEqual(response.status_code, requests.codes.ok)
316 res = response.json()
317 self.assertEqual("power", res['roadm-connections'][0]['opticalControlMode'])
319 def test_23_service_power_setup_XPDRC_XPDRA(self):
320 url = "{}/operations/transportpce-olm:service-power-setup"
323 "service-name": "test",
327 "dest-tp": "XPDR1-NETWORK1",
328 "src-tp": "XPDR1-CLIENT1",
332 "dest-tp": "DEG2-TTP-TXRX",
333 "src-tp": "SRG1-PP1-TXRX",
334 "node-id": "ROADMC01"
337 "src-tp": "DEG1-TTP-TXRX",
338 "dest-tp": "SRG1-PP1-TXRX",
339 "node-id": "ROADMA01"
342 "src-tp": "XPDR1-NETWORK1",
343 "dest-tp": "XPDR1-CLIENT1",
347 "center-freq": 196.1,
351 "lower-spectral-slot-number": 761,
352 "higher-spectral-slot-number": 768
355 response = test_utils.post_request(url, data)
356 self.assertEqual(response.status_code, requests.codes.ok)
357 res = response.json()
358 self.assertIn('Success', res["output"]["result"])
360 def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
361 response = test_utils.check_netconf_node_request(
363 "interface/XPDR1-NETWORK1-761:768/org-openroadm-optical-channel-interfaces:och")
364 self.assertEqual(response.status_code, requests.codes.ok)
365 res = response.json()
366 self.assertEqual(0, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
367 self.assertEqual(1, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
369 def test_25_get_roadmconnection_ROADMC(self):
370 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
371 self.assertEqual(response.status_code, requests.codes.ok)
372 res = response.json()
373 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
374 self.assertEqual(2, res['roadm-connections'][0]['target-output-power'])
376 def test_26_service_power_turndown_XPDRA_XPDRC(self):
377 url = "{}/operations/transportpce-olm:service-power-turndown"
380 "service-name": "test",
384 "dest-tp": "XPDR1-NETWORK1",
385 "src-tp": "XPDR1-CLIENT1",
389 "dest-tp": "DEG1-TTP-TXRX",
390 "src-tp": "SRG1-PP1-TXRX",
391 "node-id": "ROADMA01"
394 "dest-tp": "SRG1-PP1-TXRX",
395 "src-tp": "DEG2-TTP-TXRX",
396 "node-id": "ROADMC01"
399 "dest-tp": "XPDR1-CLIENT1",
400 "src-tp": "XPDR1-NETWORK1",
404 "center-freq": 196.1,
408 "lower-spectral-slot-number": 761,
409 "higher-spectral-slot-number": 768
412 response = test_utils.post_request(url, data)
413 print(response.json())
414 self.assertEqual(response.status_code, requests.codes.ok)
415 res = response.json()
416 self.assertIn('Success', res["output"]["result"])
418 def test_27_get_roadmconnection_ROADMA(self):
419 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
420 self.assertEqual(response.status_code, requests.codes.ok)
421 res = response.json()
422 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
423 self.assertEqual(-60, res['roadm-connections'][0]['target-output-power'])
425 def test_28_get_roadmconnection_ROADMC(self):
426 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP1-TXRX-761:768")
427 self.assertEqual(response.status_code, requests.codes.ok)
428 res = response.json()
429 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
431 def test_29_servicePath_delete_AToZ(self):
432 response = test_utils.service_path_request("delete", "test", "1",
433 [{"node-id": "XPDRA01",
434 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
435 {"node-id": "ROADMA01",
436 "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
437 {"node-id": "ROADMC01",
438 "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG2-TTP-TXRX"},
439 {"node-id": "XPDRC01",
440 "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}],
441 196.1, 40, 196.075, 196.125, 761,
443 self.assertEqual(response.status_code, requests.codes.ok)
444 res = response.json()
445 self.assertIn('Request processed', res["output"]["result"])
448 def test_30_servicePath_delete_ZToA(self):
449 response = test_utils.service_path_request("delete", "test", "1",
450 [{"node-id": "XPDRC01",
451 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
452 {"node-id": "ROADMC01",
453 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
454 {"node-id": "ROADMA01",
455 "src-tp": "DEG1-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
456 {"node-id": "XPDRA01",
457 "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}],
458 196.1, 40, 196.075, 196.125, 761,
460 self.assertEqual(response.status_code, requests.codes.ok)
461 res = response.json()
462 self.assertIn('Request processed', res["output"]["result"])
465 #"""to test case where SRG where the xpdr is connected to has no optical range data"""
467 def test_31_connect_xprdA_to_roadmA(self):
468 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "2",
469 "ROADMA01", "1", "SRG1-PP2-TXRX")
470 self.assertEqual(response.status_code, requests.codes.ok)
471 res = response.json()
472 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
474 def test_32_connect_roadmA_to_xpdrA(self):
475 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "2",
476 "ROADMA01", "1", "SRG1-PP2-TXRX")
477 self.assertEqual(response.status_code, requests.codes.ok)
478 res = response.json()
479 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
481 def test_33_servicePath_create_AToZ(self):
482 response = test_utils.service_path_request("create", "test2", "2",
483 [{"node-id": "XPDRA01",
484 "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
485 {"node-id": "ROADMA01",
486 "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}],
487 196.05, 40, 196.025, 196.075, 753,
489 self.assertEqual(response.status_code, requests.codes.ok)
490 res = response.json()
491 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
495 def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
496 response = test_utils.check_netconf_node_request(
498 "interface/XPDR1-NETWORK2-753:760/org-openroadm-optical-channel-interfaces:och")
499 self.assertEqual(response.status_code, requests.codes.ok)
500 res = response.json()
501 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
502 self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
504 def test_35_servicePath_delete_AToZ(self):
505 response = test_utils.service_path_request("delete", "test", "1",
506 [{"node-id": "XPDRA01",
507 "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
508 {"node-id": "ROADMA01",
509 "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}],
510 196.1, 40, 196.075, 196.125, 761,
512 self.assertEqual(response.status_code, requests.codes.ok)
513 res = response.json()
514 self.assertIn('Request processed', res["output"]["result"])
517 def test_36_xpdrA_device_disconnected(self):
518 response = test_utils.unmount_device("XPDRA01")
519 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
521 def test_37_xpdrC_device_disconnected(self):
522 response = test_utils.unmount_device("XPDRC01")
523 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
525 def test_38_calculate_span_loss_current(self):
526 url = "{}/operations/transportpce-olm:calculate-spanloss-current"
527 response = test_utils.post_request(url, None)
528 self.assertEqual(response.status_code, requests.codes.ok)
529 res = response.json()
530 self.assertIn('Success',
531 res["output"]["result"])
534 def test_39_rdmA_device_disconnected(self):
535 response = test_utils.unmount_device("ROADMA01")
536 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
538 def test_40_rdmC_device_disconnected(self):
539 response = test_utils.unmount_device("ROADMC01")
540 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
if __name__ == "__main__":
    # Run verbosely so each numbered step is reported by name.
    unittest.main(verbosity=2)