3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportOlmTesting(unittest.TestCase):
29 NODE_VERSION = '2.2.1'
# NOTE(review): these statements use `cls`, so they presumably form the body of a class-level
# fixture (e.g. setUpClass) whose header is not visible in this listing - confirm.
# Start the controller via test_utils.start_tpce(), then start four entries at NODE_VERSION
# via test_utils.start_sims(): xpdra, roadma, roadmc and xpdrc.
# NOTE(review): the second assignment replaces the value stored by the first, and
# tearDownClass only iterates cls.processes - confirm start_sims() also returns the
# process started by start_tpce().
33 cls.processes = test_utils.start_tpce()
34 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
35 ('roadma', cls.NODE_VERSION),
36 ('roadmc', cls.NODE_VERSION),
37 ('xpdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Hand every entry of cls.processes to test_utils.shutdown_process(), then report.
    started = cls.processes
    # pylint: disable=not-an-iterable
    for proc in started:
        test_utils.shutdown_process(proc)
    print("all processes killed")
# NOTE(review): uses `self`, so this is presumably the body of a per-test fixture (e.g. setUp)
# whose header is not visible in this listing - confirm.
# Announce the running test: self.id() is split on "." and only the last component is printed.
47 # pylint: disable=consider-using-f-string
48 print("execution of {}".format(self.id().split(".")[-1]))
def test_01_xpdrA_device_connected(self):
    # Mount XPDR-A1 from the 'xpdra' entry at NODE_VERSION; the reply must carry the "created" status.
    node_spec = ('xpdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDR-A1", node_spec)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_xpdrC_device_connected(self):
    # Mount XPDR-C1 from the 'xpdrc' entry at NODE_VERSION; the reply must carry the "created" status.
    node_spec = ('xpdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDR-C1", node_spec)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_rdmA_device_connected(self):
    # Mount ROADM-A1 from the 'roadma' entry at NODE_VERSION; the reply must carry the "created" status.
    node_spec = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-A1", node_spec)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_rdmC_device_connected(self):
    # Mount ROADM-C1 from the 'roadmc' entry at NODE_VERSION; the reply must carry the "created" status.
    node_spec = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-C1", node_spec)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_to_roadmA(self):
    # Ask for the XPDR-A1 -> ROADM-A1 link on SRG1-PP1-TXRX and check the reported outcome.
    response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
                                                      "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the reply body before inspecting it: `res` was read below without ever
    # being bound, which raises NameError instead of checking the result text.
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_06_connect_roadmA_to_xpdrA(self):
    # Ask for the ROADM-A1 -> XPDR-A1 link on SRG1-PP1-TXRX and check the reported outcome.
    response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
                                                      "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the reply body before inspecting it: `res` was read below without ever
    # being bound, which raises NameError instead of checking the result text.
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_07_connect_xprdC_to_roadmC(self):
    # Ask for the XPDR-C1 -> ROADM-C1 link on SRG1-PP1-TXRX and check the reported outcome.
    response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
                                                      "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the reply body before inspecting it: `res` was read below without ever
    # being bound, which raises NameError instead of checking the result text.
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
def test_08_connect_roadmC_to_xpdrC(self):
    # Ask for the ROADM-C1 -> XPDR-C1 link on SRG1-PP1-TXRX and check the reported outcome.
    response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
                                                      "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the reply body before inspecting it: `res` was read below without ever
    # being bound, which raises NameError instead of checking the result text.
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_09_create_OTS_ROADMA(self):
    # Request OTS/OMS interface creation on ROADM-A1 for DEG1-TTP-TXRX and check the outcome.
    response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    # Decode the reply body before inspecting it: `res` was read below without ever
    # being bound, which raises NameError instead of checking the result text.
    res = response.json()
    self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
                  res["output"]["result"])
def test_10_create_OTS_ROADMC(self):
    # Request OTS/OMS interface creation on ROADM-C1 for DEG2-TTP-TXRX and check the outcome.
    reply = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    expected_text = 'Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1'
    self.assertIn(expected_text, body["output"]["result"])
# Post a transportpce-olm get-pm request for ROADM-A1 (resource type "interface", granularity
# "15min", resource name OTS-DEG2-TTP-TXRX) and look for three measurements in the reply.
# NOTE(review): the `data = ...` assignment, the payload's outer wrapper and the opening of each
# assertIn(...) call are not visible in this listing - confirm them before editing.
110 def test_11_get_PM_ROADMA(self):
111 url = "{}/operations/transportpce-olm:get-pm"
114 "node-id": "ROADM-A1",
115 "resource-type": "interface",
116 "granularity": "15min",
117 "resource-identifier": {
118 "resource-name": "OTS-DEG2-TTP-TXRX"
122 response = test_utils.post_request(url, data)
123 self.assertEqual(response.status_code, requests.codes.ok)
124 res = response.json()
# Each expected entry is matched against res["output"]["measurements"]; values are strings.
126 "pmparameter-name": "OpticalPowerOutput",
127 "pmparameter-value": "2.5"
128 }, res["output"]["measurements"])
130 "pmparameter-name": "OpticalReturnLoss",
131 "pmparameter-value": "40"
132 }, res["output"]["measurements"])
134 "pmparameter-name": "OpticalPowerInput",
135 "pmparameter-value": "-21.1"
136 }, res["output"]["measurements"])
# Post a transportpce-olm get-pm request for ROADM-C1 (resource type "interface", granularity
# "15min", resource name OTS-DEG1-TTP-TXRX) and look for three measurements in the reply.
# NOTE(review): the `data = ...` assignment, the payload's outer wrapper and the opening of each
# assertIn(...) call are not visible in this listing - confirm them before editing.
138 def test_12_get_PM_ROADMC(self):
139 url = "{}/operations/transportpce-olm:get-pm"
142 "node-id": "ROADM-C1",
143 "resource-type": "interface",
144 "granularity": "15min",
145 "resource-identifier": {
146 "resource-name": "OTS-DEG1-TTP-TXRX"
150 response = test_utils.post_request(url, data)
151 self.assertEqual(response.status_code, requests.codes.ok)
152 res = response.json()
# Each expected entry is matched against res["output"]["measurements"]; values are strings.
154 "pmparameter-name": "OpticalPowerOutput",
155 "pmparameter-value": "4.6"
156 }, res["output"]["measurements"])
158 "pmparameter-name": "OpticalReturnLoss",
159 "pmparameter-value": "49.1"
160 }, res["output"]["measurements"])
162 "pmparameter-name": "OpticalPowerInput",
163 "pmparameter-value": "-15.1"
164 }, res["output"]["measurements"])
# Post calculate-spanloss-base for the single link ROADM-A1 DEG2 -> ROADM-C1 DEG1, expect
# 'Success' in the result and an entry for that link in res["output"]["spans"].
# NOTE(review): the `data = ...` assignment, the remaining payload keys and the other key(s) of
# the expected span entry are not visible in this listing - confirm them before editing.
166 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
167 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
171 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
174 response = test_utils.post_request(url, data)
175 self.assertEqual(response.status_code, requests.codes.ok)
176 res = response.json()
177 self.assertIn('Success',
178 res["output"]["result"])
181 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
182 }, res["output"]["spans"])
# Post calculate-spanloss-base and expect 'Success' plus span entries for both directions of
# the ROADM-A1 DEG2 <-> ROADM-C1 DEG1 link in res["output"]["spans"].
# NOTE(review): the whole `data` payload and the other key(s) of each expected span entry are
# not visible in this listing; the method name suggests an "all links" request - confirm.
185 def test_14_calculate_span_loss_base_all(self):
186 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
192 response = test_utils.post_request(url, data)
193 self.assertEqual(response.status_code, requests.codes.ok)
194 res = response.json()
195 self.assertIn('Success',
196 res["output"]["result"])
# Direction C1 -> A1.
199 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
200 }, res["output"]["spans"])
# Direction A1 -> C1.
203 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
204 }, res["output"]["spans"])
# Read the OTS container of interface OTS-DEG2-TTP-TXRX and check the stored span losses:
# span-loss-transmit must equal 17.6 and span-loss-receive must equal 25.7.
# NOTE(review): an argument line of check_netconf_node_request(...) is not visible in this
# listing; presumably the node id ("ROADM-A1", going by the method name) - confirm.
207 def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
208 response = test_utils.check_netconf_node_request(
210 "interface/OTS-DEG2-TTP-TXRX/org-openroadm-optical-transport-interfaces:ots")
211 self.assertEqual(response.status_code, requests.codes.ok)
212 res = response.json()
213 self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
214 self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
# Read the OTS container of interface OTS-DEG1-TTP-TXRX and check the stored span losses:
# span-loss-transmit must equal 25.7 and span-loss-receive must equal 17.6 (the mirror of the
# values asserted for OTS-DEG2-TTP-TXRX in test_15).
# NOTE(review): an argument line of check_netconf_node_request(...) is not visible in this
# listing; presumably the node id ("ROADM-C1", going by the method name) - confirm.
216 def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
217 response = test_utils.check_netconf_node_request(
219 "interface/OTS-DEG1-TTP-TXRX/org-openroadm-optical-transport-interfaces:ots")
220 self.assertEqual(response.status_code, requests.codes.ok)
221 res = response.json()
222 self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
223 self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
# Send a "create" service-path request ("test", "1") over the hop list
# XPDR-A1 -> ROADM-A1 -> ROADM-C1 -> XPDR-C1 and expect the roadm-connection success text.
# NOTE(review): the numeric arguments are presumably frequency/width/slot values as defined by
# test_utils.service_path_request, and the end of the call is not visible in this listing
# (roadm-connection names elsewhere in the file use the range 761:768) - confirm.
225 def test_17_servicePath_create_AToZ(self):
226 response = test_utils.service_path_request("create", "test", "1",
227 [{"node-id": "XPDR-A1",
228 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
229 {"node-id": "ROADM-A1",
230 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
231 {"node-id": "ROADM-C1",
232 "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG1-TTP-TXRX"},
233 {"node-id": "XPDR-C1",
234 "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}],
235 196.1, 40, 196.075, 196.125, 761,
237 self.assertEqual(response.status_code, requests.codes.ok)
238 res = response.json()
239 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
# Send a "create" service-path request ("test", "1") for the reverse hop list
# XPDR-C1 -> ROADM-C1 -> ROADM-A1 -> XPDR-A1 and expect the roadm-connection success text.
# NOTE(review): the numeric arguments are presumably frequency/width/slot values as defined by
# test_utils.service_path_request, and the end of the call is not visible in this listing
# (roadm-connection names elsewhere in the file use the range 761:768) - confirm.
243 def test_18_servicePath_create_ZToA(self):
244 response = test_utils.service_path_request("create", "test", "1",
245 [{"node-id": "XPDR-C1",
246 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
247 {"node-id": "ROADM-C1",
248 "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
249 {"node-id": "ROADM-A1",
250 "src-tp": "DEG2-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
251 {"node-id": "XPDR-A1",
252 "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}],
253 196.1, 40, 196.075, 196.125, 761,
255 self.assertEqual(response.status_code, requests.codes.ok)
256 res = response.json()
257 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
# Post service-power-setup for service "test" (spectral slots 761..768) and expect 'Success'.
# The visible termination points follow the order CLIENT1 -> NETWORK1, then ROADM-A1
# (SRG1-PP1 -> DEG2), then ROADM-C1 (DEG1 -> SRG1-PP1), then NETWORK1 -> CLIENT1.
# NOTE(review): the `data = ...` assignment, the structure around the node entries and the
# node-id of the first and last entries are not visible in this listing - confirm.
261 def test_19_service_power_setup_XPDRA_XPDRC(self):
262 url = "{}/operations/transportpce-olm:service-power-setup"
265 "service-name": "test",
269 "dest-tp": "XPDR1-NETWORK1",
270 "src-tp": "XPDR1-CLIENT1",
274 "dest-tp": "DEG2-TTP-TXRX",
275 "src-tp": "SRG1-PP1-TXRX",
276 "node-id": "ROADM-A1"
279 "dest-tp": "SRG1-PP1-TXRX",
280 "src-tp": "DEG1-TTP-TXRX",
281 "node-id": "ROADM-C1"
284 "dest-tp": "XPDR1-CLIENT1",
285 "src-tp": "XPDR1-NETWORK1",
289 "lower-spectral-slot-number": 761,
290 "higher-spectral-slot-number": 768
293 response = test_utils.post_request(url, data)
294 self.assertEqual(response.status_code, requests.codes.ok)
295 res = response.json()
296 self.assertIn('Success', res["output"]["result"])
# Read the och container of interface XPDR1-NETWORK1-761:768 and check that transmit-power
# equals -5 and frequency equals 196.1.
# NOTE(review): an argument line of check_netconf_node_request(...) is not visible in this
# listing; presumably the node id ("XPDR-A1", going by the method name) - confirm.
298 def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
299 response = test_utils.check_netconf_node_request(
301 "interface/XPDR1-NETWORK1-761:768/org-openroadm-optical-channel-interfaces:och")
302 self.assertEqual(response.status_code, requests.codes.ok)
303 res = response.json()
304 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
305 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
def test_21_get_roadmconnection_ROADMA(self):
    # Fetch the SRG1-PP1 -> DEG2 roadm-connection (761:768) on ROADM-A1 and check that it is
    # in "gainLoss" control mode with a target output power of 2.0.
    reply = test_utils.check_netconf_node_request(
        "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertEqual("gainLoss", body['roadm-connections'][0]['opticalControlMode'])
    self.assertEqual(2.0, body['roadm-connections'][0]['target-output-power'])
def test_22_get_roadmconnection_ROADMC(self):
    # Fetch the DEG1 -> SRG1-PP1 roadm-connection (761:768) on ROADM-C1 and check that it is
    # in "power" control mode.
    reply = test_utils.check_netconf_node_request(
        "ROADM-C1", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP1-TXRX-761:768")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertEqual("power", body['roadm-connections'][0]['opticalControlMode'])
# Post service-power-setup for service "test" (spectral slots 761..768) in the reverse
# direction and expect 'Success'. The visible termination points follow the order
# CLIENT1 -> NETWORK1, then ROADM-C1 (SRG1-PP1 -> DEG1), then ROADM-A1 (DEG2 -> SRG1-PP1),
# then NETWORK1 -> CLIENT1.
# NOTE(review): the `data = ...` assignment, the structure around the node entries and the
# node-id of the first and last entries are not visible in this listing - confirm.
322 def test_23_service_power_setup_XPDRC_XPDRA(self):
323 url = "{}/operations/transportpce-olm:service-power-setup"
326 "service-name": "test",
330 "dest-tp": "XPDR1-NETWORK1",
331 "src-tp": "XPDR1-CLIENT1",
335 "dest-tp": "DEG1-TTP-TXRX",
336 "src-tp": "SRG1-PP1-TXRX",
337 "node-id": "ROADM-C1"
340 "src-tp": "DEG2-TTP-TXRX",
341 "dest-tp": "SRG1-PP1-TXRX",
342 "node-id": "ROADM-A1"
345 "src-tp": "XPDR1-NETWORK1",
346 "dest-tp": "XPDR1-CLIENT1",
350 "lower-spectral-slot-number": 761,
351 "higher-spectral-slot-number": 768
354 response = test_utils.post_request(url, data)
355 self.assertEqual(response.status_code, requests.codes.ok)
356 res = response.json()
357 self.assertIn('Success', res["output"]["result"])
# Read the och container of interface XPDR1-NETWORK1-761:768 and check that transmit-power
# equals -5 and frequency equals 196.1.
# NOTE(review): an argument line of check_netconf_node_request(...) is not visible in this
# listing; presumably the node id ("XPDR-C1", going by the method name) - confirm.
359 def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
360 response = test_utils.check_netconf_node_request(
362 "interface/XPDR1-NETWORK1-761:768/org-openroadm-optical-channel-interfaces:och")
363 self.assertEqual(response.status_code, requests.codes.ok)
364 res = response.json()
365 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
366 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
def test_25_get_roadmconnection_ROADMC(self):
    # Fetch the SRG1-PP1 -> DEG1 roadm-connection (761:768) on ROADM-C1 and check that it is
    # in "gainLoss" control mode with a target output power of 2.0.
    reply = test_utils.check_netconf_node_request(
        "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertEqual("gainLoss", body['roadm-connections'][0]['opticalControlMode'])
    self.assertEqual(2.0, body['roadm-connections'][0]['target-output-power'])
# Post service-power-turndown for service "test" (spectral slots 761..768) and expect
# 'Success'. The visible termination points match those sent by test_19.
# NOTE(review): the `data = ...` assignment, the structure around the node entries and the
# node-id of the first and last entries are not visible in this listing - confirm.
376 def test_26_service_power_turndown_XPDRA_XPDRC(self):
377 url = "{}/operations/transportpce-olm:service-power-turndown"
380 "service-name": "test",
384 "dest-tp": "XPDR1-NETWORK1",
385 "src-tp": "XPDR1-CLIENT1",
389 "dest-tp": "DEG2-TTP-TXRX",
390 "src-tp": "SRG1-PP1-TXRX",
391 "node-id": "ROADM-A1"
394 "dest-tp": "SRG1-PP1-TXRX",
395 "src-tp": "DEG1-TTP-TXRX",
396 "node-id": "ROADM-C1"
399 "dest-tp": "XPDR1-CLIENT1",
400 "src-tp": "XPDR1-NETWORK1",
404 "lower-spectral-slot-number": 761,
405 "higher-spectral-slot-number": 768
408 response = test_utils.post_request(url, data)
409 self.assertEqual(response.status_code, requests.codes.ok)
410 res = response.json()
411 self.assertIn('Success', res["output"]["result"])
def test_27_get_roadmconnection_ROADMA(self):
    # Fetch the SRG1-PP1 -> DEG2 roadm-connection (761:768) on ROADM-A1 and check that it is
    # now in "off" control mode with a target output power of -60.
    reply = test_utils.check_netconf_node_request(
        "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertEqual("off", body['roadm-connections'][0]['opticalControlMode'])
    self.assertEqual(-60, body['roadm-connections'][0]['target-output-power'])
def test_28_get_roadmconnection_ROADMC(self):
    # Fetch the DEG1 -> SRG1-PP1 roadm-connection (761:768) on ROADM-C1 and check that it is
    # now in "off" control mode.
    reply = test_utils.check_netconf_node_request(
        "ROADM-C1", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP1-TXRX-761:768")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertEqual("off", body['roadm-connections'][0]['opticalControlMode'])
# Send a "delete" service-path request ("test", "1") over the hop list
# XPDR-A1 -> ROADM-A1 -> ROADM-C1 -> XPDR-C1 and expect 'Request processed' in the result.
# NOTE(review): the numeric arguments are presumably frequency/width/slot values as defined by
# test_utils.service_path_request, and the end of the call is not visible in this listing -
# confirm.
428 def test_29_servicePath_delete_AToZ(self):
429 response = test_utils.service_path_request("delete", "test", "1",
430 [{"node-id": "XPDR-A1",
431 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
432 {"node-id": "ROADM-A1",
433 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
434 {"node-id": "ROADM-C1",
435 "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG1-TTP-TXRX"},
436 {"node-id": "XPDR-C1",
437 "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}],
438 196.1, 40, 196.075, 196.125, 761,
440 self.assertEqual(response.status_code, requests.codes.ok)
441 res = response.json()
442 self.assertIn('Request processed', res["output"]["result"])
# Send a "delete" service-path request ("test", "1") for the reverse hop list
# XPDR-C1 -> ROADM-C1 -> ROADM-A1 -> XPDR-A1 and expect 'Request processed' in the result.
# NOTE(review): the numeric arguments here (196.053125, 40, 196.025, 196.08125) differ from the
# ones test_18 used to create this path (196.1, 40, 196.075, 196.125) while the 761 value is
# the same - confirm whether that is intentional. The end of the call is not visible in this
# listing.
445 def test_30_servicePath_delete_ZToA(self):
446 response = test_utils.service_path_request("delete", "test", "1",
447 [{"node-id": "XPDR-C1",
448 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
449 {"node-id": "ROADM-C1",
450 "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
451 {"node-id": "ROADM-A1",
452 "src-tp": "DEG2-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
453 {"node-id": "XPDR-A1",
454 "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}],
455 196.053125, 40, 196.025, 196.08125, 761,
457 self.assertEqual(response.status_code, requests.codes.ok)
458 res = response.json()
459 self.assertIn('Request processed', res["output"]["result"])
# Test the case where the SRG that the XPDR is connected to has no optical range data.
def test_31_connect_xprdA_to_roadmA(self):
    # Ask for the XPDR-A1 ("1", "2") -> ROADM-A1 link on SRG1-PP2-TXRX and check the outcome.
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    body = link_reply.json()
    self.assertIn('Xponder Roadm Link created successfully', body["output"]["result"])
def test_32_connect_roadmA_to_xpdrA(self):
    # Ask for the ROADM-A1 -> XPDR-A1 ("1", "2") link on SRG1-PP2-TXRX and check the outcome.
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    body = link_reply.json()
    self.assertIn('Roadm Xponder links created successfully', body["output"]["result"])
# Send a "create" service-path request ("test2", "2") over only two hops, XPDR-A1
# (CLIENT2 -> NETWORK2) and ROADM-A1 (SRG1-PP2 -> DEG2), and expect the roadm-connection
# success text.
# NOTE(review): the numeric arguments are presumably frequency/width/slot values as defined by
# test_utils.service_path_request, and the end of the call is not visible in this listing
# (test_34 reads an interface named with the range 753:760) - confirm.
478 def test_33_servicePath_create_AToZ(self):
479 response = test_utils.service_path_request("create", "test2", "2",
480 [{"node-id": "XPDR-A1",
481 "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
482 {"node-id": "ROADM-A1",
483 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}],
484 196.1, 40, 196.075, 196.125, 753,
486 self.assertEqual(response.status_code, requests.codes.ok)
487 res = response.json()
488 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
# Read the och container of interface XPDR1-NETWORK2-753:760 and check that transmit-power
# equals -5.
# NOTE(review): an argument line of check_netconf_node_request(...) is not visible in this
# listing; presumably the node id ("XPDR-A1", going by the method name) - confirm.
492 def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
493 response = test_utils.check_netconf_node_request(
495 "interface/XPDR1-NETWORK2-753:760/org-openroadm-optical-channel-interfaces:och")
496 self.assertEqual(response.status_code, requests.codes.ok)
497 res = response.json()
498 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
# The wavelength-number check below is disabled; the reason is not recorded here.
499 # self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
# Send a "delete" service-path request over the two hops created by test_33 (XPDR-A1
# CLIENT2 -> NETWORK2, ROADM-A1 SRG1-PP2 -> DEG2) and expect 'Request processed'.
# NOTE(review): test_33 created this path as ("test2", "2") with 196.1/196.075/196.125 and 753,
# but the delete here passes ("test", "1") with 196.053125/196.025/196.08125 and 761 - confirm
# whether the mismatch is intentional. The end of the call is not visible in this listing.
501 def test_35_servicePath_delete_AToZ(self):
502 response = test_utils.service_path_request("delete", "test", "1",
503 [{"node-id": "XPDR-A1",
504 "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
505 {"node-id": "ROADM-A1",
506 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}],
507 196.053125, 40, 196.025, 196.08125, 761,
509 self.assertEqual(response.status_code, requests.codes.ok)
510 res = response.json()
511 self.assertIn('Request processed', res["output"]["result"])
def test_36_xpdrA_device_disconnected(self):
    # Unmount XPDR-A1; the reply must carry the "ok" status.
    unmount_reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_37_xpdrC_device_disconnected(self):
    # Unmount XPDR-C1; the reply must carry the "ok" status.
    unmount_reply = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_38_calculate_span_loss_current(self):
    # Post calculate-spanloss-current with None as the payload and expect 'Success'.
    endpoint = "{}/operations/transportpce-olm:calculate-spanloss-current"
    reply = test_utils.post_request(endpoint, None)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Success', body["output"]["result"])
def test_39_rdmA_device_disconnected(self):
    # Unmount ROADM-A1; the reply must carry the "ok" status.
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_40_rdmC_device_disconnected(self):
    # Unmount ROADM-C1; the reply must carry the "ok" status.
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
if __name__ == "__main__":
    # Script entry point: run the module's tests with per-test (verbosity 2) output.
    unittest.main(
        verbosity=2,
    )