3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
19 sys.path.append('transportpce_tests/common/')
23 class TransportOlmTesting(unittest.TestCase):
26 NODE_VERSION = '2.2.1'
30 cls.processes = test_utils.start_tpce()
31 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
32 ('roadma', cls.NODE_VERSION),
33 ('roadmc', cls.NODE_VERSION),
34 ('xpdrc', cls.NODE_VERSION)])
37 def tearDownClass(cls):
38 # pylint: disable=not-an-iterable
39 for process in cls.processes:
40 test_utils.shutdown_process(process)
41 print("all processes killed")
44 print("execution of {}".format(self.id().split(".")[-1]))
47 def test_01_xpdrA_device_connected(self):
48 response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
49 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
51 def test_02_xpdrC_device_connected(self):
52 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
53 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
55 def test_03_rdmA_device_connected(self):
56 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
57 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
59 def test_04_rdmC_device_connected(self):
60 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
61 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
63 def test_05_connect_xprdA_to_roadmA(self):
64 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
65 "ROADM-A1", "1", "SRG1-PP1-TXRX")
66 self.assertEqual(response.status_code, requests.codes.ok)
68 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
70 def test_06_connect_roadmA_to_xpdrA(self):
71 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
72 "ROADM-A1", "1", "SRG1-PP1-TXRX")
73 self.assertEqual(response.status_code, requests.codes.ok)
75 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
77 def test_07_connect_xprdC_to_roadmC(self):
78 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
79 "ROADM-C1", "1", "SRG1-PP1-TXRX")
80 self.assertEqual(response.status_code, requests.codes.ok)
82 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
84 def test_08_connect_roadmC_to_xpdrC(self):
85 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
86 "ROADM-C1", "1", "SRG1-PP1-TXRX")
87 self.assertEqual(response.status_code, requests.codes.ok)
89 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
91 def test_09_create_OTS_ROADMA(self):
92 response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
94 self.assertEqual(response.status_code, requests.codes.ok)
96 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
97 res["output"]["result"])
99 def test_10_create_OTS_ROADMC(self):
100 response = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
101 self.assertEqual(response.status_code, requests.codes.ok)
102 res = response.json()
103 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
104 res["output"]["result"])
106 def test_11_get_PM_ROADMA(self):
107 url = "{}/operations/transportpce-olm:get-pm"
110 "node-id": "ROADM-A1",
111 "resource-type": "interface",
112 "granularity": "15min",
113 "resource-identifier": {
114 "resource-name": "OTS-DEG2-TTP-TXRX"
118 response = test_utils.post_request(url, data)
119 self.assertEqual(response.status_code, requests.codes.ok)
120 res = response.json()
122 "pmparameter-name": "OpticalPowerOutput",
123 "pmparameter-value": "2.5"
124 }, res["output"]["measurements"])
126 "pmparameter-name": "OpticalReturnLoss",
127 "pmparameter-value": "40"
128 }, res["output"]["measurements"])
130 "pmparameter-name": "OpticalPowerInput",
131 "pmparameter-value": "-21.1"
132 }, res["output"]["measurements"])
134 def test_12_get_PM_ROADMC(self):
135 url = "{}/operations/transportpce-olm:get-pm"
138 "node-id": "ROADM-C1",
139 "resource-type": "interface",
140 "granularity": "15min",
141 "resource-identifier": {
142 "resource-name": "OTS-DEG1-TTP-TXRX"
146 response = test_utils.post_request(url, data)
147 self.assertEqual(response.status_code, requests.codes.ok)
148 res = response.json()
150 "pmparameter-name": "OpticalPowerOutput",
151 "pmparameter-value": "4.6"
152 }, res["output"]["measurements"])
154 "pmparameter-name": "OpticalReturnLoss",
155 "pmparameter-value": "49.1"
156 }, res["output"]["measurements"])
158 "pmparameter-name": "OpticalPowerInput",
159 "pmparameter-value": "-15.1"
160 }, res["output"]["measurements"])
162 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
163 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
167 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
170 response = test_utils.post_request(url, data)
171 self.assertEqual(response.status_code, requests.codes.ok)
172 res = response.json()
173 self.assertIn('Success',
174 res["output"]["result"])
177 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
178 }, res["output"]["spans"])
181 def test_14_calculate_span_loss_base_all(self):
182 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
188 response = test_utils.post_request(url, data)
189 self.assertEqual(response.status_code, requests.codes.ok)
190 res = response.json()
191 self.assertIn('Success',
192 res["output"]["result"])
195 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
196 }, res["output"]["spans"])
199 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
200 }, res["output"]["spans"])
203 def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
204 response = test_utils.check_netconf_node_request(
206 "interface/OTS-DEG2-TTP-TXRX/org-openroadm-optical-transport-interfaces:ots")
207 self.assertEqual(response.status_code, requests.codes.ok)
208 res = response.json()
209 self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
210 self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
212 def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
213 response = test_utils.check_netconf_node_request(
215 "interface/OTS-DEG1-TTP-TXRX/org-openroadm-optical-transport-interfaces:ots")
216 self.assertEqual(response.status_code, requests.codes.ok)
217 res = response.json()
218 self.assertEqual(25.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit'])
219 self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
221 def test_17_servicePath_create_AToZ(self):
222 response = test_utils.service_path_request("create", "test", "1",
223 [{"node-id": "XPDR-A1",
224 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
225 {"node-id": "ROADM-A1",
226 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
227 {"node-id": "ROADM-C1",
228 "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG1-TTP-TXRX"},
229 {"node-id": "XPDR-C1",
230 "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}],
231 196.1, 40, 196.075, 196.125, 761,
233 self.assertEqual(response.status_code, requests.codes.ok)
234 res = response.json()
235 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
239 def test_18_servicePath_create_ZToA(self):
240 response = test_utils.service_path_request("create", "test", "1",
241 [{"node-id": "XPDR-C1",
242 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
243 {"node-id": "ROADM-C1",
244 "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
245 {"node-id": "ROADM-A1",
246 "src-tp": "DEG2-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
247 {"node-id": "XPDR-A1",
248 "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}],
249 196.1, 40, 196.075, 196.125, 761,
251 self.assertEqual(response.status_code, requests.codes.ok)
252 res = response.json()
253 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
257 def test_19_service_power_setup_XPDRA_XPDRC(self):
258 url = "{}/operations/transportpce-olm:service-power-setup"
261 "service-name": "test",
265 "dest-tp": "XPDR1-NETWORK1",
266 "src-tp": "XPDR1-CLIENT1",
270 "dest-tp": "DEG2-TTP-TXRX",
271 "src-tp": "SRG1-PP1-TXRX",
272 "node-id": "ROADM-A1"
275 "dest-tp": "SRG1-PP1-TXRX",
276 "src-tp": "DEG1-TTP-TXRX",
277 "node-id": "ROADM-C1"
280 "dest-tp": "XPDR1-CLIENT1",
281 "src-tp": "XPDR1-NETWORK1",
285 "lower-spectral-slot-number": 761,
286 "higher-spectral-slot-number": 768
289 response = test_utils.post_request(url, data)
290 self.assertEqual(response.status_code, requests.codes.ok)
291 res = response.json()
292 self.assertIn('Success', res["output"]["result"])
294 def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
295 response = test_utils.check_netconf_node_request(
297 "interface/XPDR1-NETWORK1-761:768/org-openroadm-optical-channel-interfaces:och")
298 self.assertEqual(response.status_code, requests.codes.ok)
299 res = response.json()
300 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
301 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
303 def test_21_get_roadmconnection_ROADMA(self):
304 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
305 self.assertEqual(response.status_code, requests.codes.ok)
306 res = response.json()
307 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
308 self.assertEqual(2.0, res['roadm-connections'][0]['target-output-power'])
310 def test_22_get_roadmconnection_ROADMC(self):
311 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP1-TXRX-761:768")
312 self.assertEqual(response.status_code, requests.codes.ok)
313 res = response.json()
314 self.assertEqual("power", res['roadm-connections'][0]['opticalControlMode'])
316 def test_23_service_power_setup_XPDRC_XPDRA(self):
317 url = "{}/operations/transportpce-olm:service-power-setup"
320 "service-name": "test",
324 "dest-tp": "XPDR1-NETWORK1",
325 "src-tp": "XPDR1-CLIENT1",
329 "dest-tp": "DEG1-TTP-TXRX",
330 "src-tp": "SRG1-PP1-TXRX",
331 "node-id": "ROADM-C1"
334 "src-tp": "DEG2-TTP-TXRX",
335 "dest-tp": "SRG1-PP1-TXRX",
336 "node-id": "ROADM-A1"
339 "src-tp": "XPDR1-NETWORK1",
340 "dest-tp": "XPDR1-CLIENT1",
344 "lower-spectral-slot-number": 761,
345 "higher-spectral-slot-number": 768
348 response = test_utils.post_request(url, data)
349 self.assertEqual(response.status_code, requests.codes.ok)
350 res = response.json()
351 self.assertIn('Success', res["output"]["result"])
353 def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
354 response = test_utils.check_netconf_node_request(
356 "interface/XPDR1-NETWORK1-761:768/org-openroadm-optical-channel-interfaces:och")
357 self.assertEqual(response.status_code, requests.codes.ok)
358 res = response.json()
359 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
360 self.assertEqual(196.1, res['org-openroadm-optical-channel-interfaces:och']['frequency'])
362 def test_25_get_roadmconnection_ROADMC(self):
363 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
364 self.assertEqual(response.status_code, requests.codes.ok)
365 res = response.json()
366 self.assertEqual("gainLoss", res['roadm-connections'][0]['opticalControlMode'])
367 self.assertEqual(2.0, res['roadm-connections'][0]['target-output-power'])
369 def test_26_service_power_turndown_XPDRA_XPDRC(self):
370 url = "{}/operations/transportpce-olm:service-power-turndown"
373 "service-name": "test",
377 "dest-tp": "XPDR1-NETWORK1",
378 "src-tp": "XPDR1-CLIENT1",
382 "dest-tp": "DEG2-TTP-TXRX",
383 "src-tp": "SRG1-PP1-TXRX",
384 "node-id": "ROADM-A1"
387 "dest-tp": "SRG1-PP1-TXRX",
388 "src-tp": "DEG1-TTP-TXRX",
389 "node-id": "ROADM-C1"
392 "dest-tp": "XPDR1-CLIENT1",
393 "src-tp": "XPDR1-NETWORK1",
397 "lower-spectral-slot-number": 761,
398 "higher-spectral-slot-number": 768
401 response = test_utils.post_request(url, data)
402 self.assertEqual(response.status_code, requests.codes.ok)
403 res = response.json()
404 self.assertIn('Success', res["output"]["result"])
406 def test_27_get_roadmconnection_ROADMA(self):
407 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
408 self.assertEqual(response.status_code, requests.codes.ok)
409 res = response.json()
410 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
411 self.assertEqual(-60, res['roadm-connections'][0]['target-output-power'])
413 def test_28_get_roadmconnection_ROADMC(self):
414 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP1-TXRX-761:768")
415 self.assertEqual(response.status_code, requests.codes.ok)
416 res = response.json()
417 self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
419 def test_29_servicePath_delete_AToZ(self):
420 response = test_utils.service_path_request("delete", "test", "1",
421 [{"node-id": "XPDR-A1",
422 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
423 {"node-id": "ROADM-A1",
424 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
425 {"node-id": "ROADM-C1",
426 "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG1-TTP-TXRX"},
427 {"node-id": "XPDR-C1",
428 "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}],
429 196.1, 40, 196.075, 196.125, 761,
431 self.assertEqual(response.status_code, requests.codes.ok)
432 res = response.json()
433 self.assertIn('Request processed', res["output"]["result"])
436 def test_30_servicePath_delete_ZToA(self):
437 response = test_utils.service_path_request("delete", "test", "1",
438 [{"node-id": "XPDR-C1",
439 "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
440 {"node-id": "ROADM-C1",
441 "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
442 {"node-id": "ROADM-A1",
443 "src-tp": "DEG2-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
444 {"node-id": "XPDR-A1",
445 "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}],
446 196.053125, 40, 196.025, 196.08125, 761,
448 self.assertEqual(response.status_code, requests.codes.ok)
449 res = response.json()
450 self.assertIn('Request processed', res["output"]["result"])
453 #"""to test case where SRG where the xpdr is connected to has no optical range data"""
455 def test_31_connect_xprdA_to_roadmA(self):
456 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "2",
457 "ROADM-A1", "1", "SRG1-PP2-TXRX")
458 self.assertEqual(response.status_code, requests.codes.ok)
459 res = response.json()
460 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
462 def test_32_connect_roadmA_to_xpdrA(self):
463 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "2",
464 "ROADM-A1", "1", "SRG1-PP2-TXRX")
465 self.assertEqual(response.status_code, requests.codes.ok)
466 res = response.json()
467 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
469 def test_33_servicePath_create_AToZ(self):
470 response = test_utils.service_path_request("create", "test2", "2",
471 [{"node-id": "XPDR-A1",
472 "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
473 {"node-id": "ROADM-A1",
474 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}],
475 196.1, 40, 196.075, 196.125, 753,
477 self.assertEqual(response.status_code, requests.codes.ok)
478 res = response.json()
479 self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
483 def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
484 response = test_utils.check_netconf_node_request(
486 "interface/XPDR1-NETWORK2-753:760/org-openroadm-optical-channel-interfaces:och")
487 self.assertEqual(response.status_code, requests.codes.ok)
488 res = response.json()
489 self.assertEqual(-5, res['org-openroadm-optical-channel-interfaces:och']['transmit-power'])
490 # self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
492 def test_35_servicePath_delete_AToZ(self):
493 response = test_utils.service_path_request("delete", "test", "1",
494 [{"node-id": "XPDR-A1",
495 "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
496 {"node-id": "ROADM-A1",
497 "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}],
498 196.053125, 40, 196.025, 196.08125, 761,
500 self.assertEqual(response.status_code, requests.codes.ok)
501 res = response.json()
502 self.assertIn('Request processed', res["output"]["result"])
505 def test_36_xpdrA_device_disconnected(self):
506 response = test_utils.unmount_device("XPDR-A1")
507 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
509 def test_37_xpdrC_device_disconnected(self):
510 response = test_utils.unmount_device("XPDR-C1")
511 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
513 def test_38_calculate_span_loss_current(self):
514 url = "{}/operations/transportpce-olm:calculate-spanloss-current"
515 response = test_utils.post_request(url, None)
516 self.assertEqual(response.status_code, requests.codes.ok)
517 res = response.json()
518 self.assertIn('Success',
519 res["output"]["result"])
522 def test_39_rdmA_device_disconnected(self):
523 response = test_utils.unmount_device("ROADM-A1")
524 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
526 def test_40_rdmC_device_disconnected(self):
527 response = test_utils.unmount_device("ROADM-C1")
528 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
531 if __name__ == "__main__":
532 unittest.main(verbosity=2)