3 #############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils_rfc8040 # nopep8
26 class TransportOlmTesting(unittest.TestCase):
29 NODE_VERSION = '2.2.1'
33 cls.processes = test_utils_rfc8040.start_tpce()
34 cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION),
35 ('roadma', cls.NODE_VERSION),
36 ('roadmc', cls.NODE_VERSION),
37 ('xpdrc', cls.NODE_VERSION)])
40 def tearDownClass(cls):
41 # pylint: disable=not-an-iterable
42 for process in cls.processes:
43 test_utils_rfc8040.shutdown_process(process)
44 print("all processes killed")
47 # pylint: disable=consider-using-f-string
48 print("execution of {}".format(self.id().split(".")[-1]))
51 def test_01_xpdrA_device_connected(self):
52 response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
53 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
55 def test_02_xpdrC_device_connected(self):
56 response = test_utils_rfc8040.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
57 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
59 def test_03_rdmA_device_connected(self):
60 response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
61 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
63 def test_04_rdmC_device_connected(self):
64 response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
65 self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
67 def test_05_connect_xprdA_to_roadmA(self):
68 response = test_utils_rfc8040.connect_xpdr_to_rdm_request(
69 {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
70 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
71 self.assertEqual(response.status_code, requests.codes.ok)
73 def test_06_connect_roadmA_to_xpdrA(self):
74 response = test_utils_rfc8040.connect_rdm_to_xpdr_request(
75 {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
76 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
77 self.assertEqual(response.status_code, requests.codes.ok)
79 def test_07_connect_xprdC_to_roadmC(self):
80 response = test_utils_rfc8040.connect_xpdr_to_rdm_request(
81 {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
82 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
83 self.assertEqual(response.status_code, requests.codes.ok)
85 def test_08_connect_roadmC_to_xpdrC(self):
86 response = test_utils_rfc8040.connect_rdm_to_xpdr_request(
87 {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
88 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'})
89 self.assertEqual(response.status_code, requests.codes.ok)
91 def test_09_create_OTS_ROADMA(self):
92 response = test_utils_rfc8040.device_renderer_create_ots_oms_request('ROADM-A1', 'DEG1-TTP-TXRX')
93 self.assertEqual(response.status_code, requests.codes.ok)
95 def test_10_create_OTS_ROADMC(self):
96 response = test_utils_rfc8040.device_renderer_create_ots_oms_request('ROADM-C1', 'DEG2-TTP-TXRX')
97 self.assertEqual(response.status_code, requests.codes.ok)
99 def test_11_get_PM_ROADMA(self):
100 response = test_utils_rfc8040.olm_get_pm_request({
101 'node-id': 'ROADM-A1',
102 'resource-type': 'interface',
103 'granularity': '15min',
104 'resource-identifier': {
105 'resource-name': 'OTS-DEG2-TTP-TXRX'
109 self.assertEqual(response['status_code'], requests.codes.ok)
111 'pmparameter-name': 'OpticalPowerOutput',
112 'pmparameter-value': '2.5'
113 }, response['output']['measurements'])
115 'pmparameter-name': 'OpticalReturnLoss',
116 'pmparameter-value': '40'
117 }, response['output']['measurements'])
119 'pmparameter-name': 'OpticalPowerInput',
120 'pmparameter-value': '-21.1'
121 }, response['output']['measurements'])
123 def test_12_get_PM_ROADMC(self):
124 response = test_utils_rfc8040.olm_get_pm_request({
125 'node-id': 'ROADM-C1',
126 'resource-type': 'interface',
127 'granularity': '15min',
128 'resource-identifier': {
129 'resource-name': 'OTS-DEG1-TTP-TXRX'
132 self.assertEqual(response['status_code'], requests.codes.ok)
134 'pmparameter-name': 'OpticalPowerOutput',
135 'pmparameter-value': '4.6'
136 }, response['output']['measurements'])
138 'pmparameter-name': 'OpticalReturnLoss',
139 'pmparameter-value': '49.1'
140 }, response['output']['measurements'])
142 'pmparameter-name': 'OpticalPowerInput',
143 'pmparameter-value': '-15.1'
144 }, response['output']['measurements'])
146 def test_13_calculate_span_loss_base_ROADMA_ROADMC(self):
147 response = test_utils_rfc8040.olm_calculate_spanloss_base_request({
149 'link-id': 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX'
151 self.assertEqual(response['status_code'], requests.codes.ok)
152 self.assertIn('Success',
153 response['output']['result'])
156 'link-id': 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX'
157 }, response['output']['spans'])
160 def test_14_calculate_span_loss_base_all(self):
161 response = test_utils_rfc8040.olm_calculate_spanloss_base_request({
164 self.assertEqual(response['status_code'], requests.codes.ok)
165 self.assertIn('Success',
166 response['output']['result'])
169 'link-id': 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX'
170 }, response['output']['spans'])
173 'link-id': 'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX'
174 }, response['output']['spans'])
177 def test_15_get_OTS_DEG2_TTP_TXRX_ROADMA(self):
178 response = test_utils_rfc8040.check_node_attribute2_request(
179 'ROADM-A1', 'interface', 'OTS-DEG2-TTP-TXRX', 'org-openroadm-optical-transport-interfaces:ots')
180 self.assertEqual(response['status_code'], requests.codes.ok)
181 self.assertEqual(float(response['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit']), 17.6)
182 self.assertEqual(float(response['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive']), 25.7)
184 def test_16_get_OTS_DEG1_TTP_TXRX_ROADMC(self):
185 response = test_utils_rfc8040.check_node_attribute2_request(
186 'ROADM-C1', 'interface', 'OTS-DEG1-TTP-TXRX', 'org-openroadm-optical-transport-interfaces:ots')
187 self.assertEqual(response['status_code'], requests.codes.ok)
188 self.assertEqual(float(response['org-openroadm-optical-transport-interfaces:ots']['span-loss-transmit']), 25.7)
189 self.assertEqual(float(response['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive']), 17.6)
191 def test_17_servicePath_create_AToZ(self):
192 response = test_utils_rfc8040.device_renderer_service_path_request({
193 'service-name': 'test',
195 'modulation-format': 'dp-qpsk',
196 'operation': 'create',
198 [{'node-id': 'XPDR-A1',
199 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
200 {'node-id': 'ROADM-A1',
201 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
202 {'node-id': 'ROADM-C1',
203 'dest-tp': 'SRG1-PP1-TXRX', 'src-tp': 'DEG1-TTP-TXRX'},
204 {'node-id': 'XPDR-C1',
205 'dest-tp': 'XPDR1-CLIENT1', 'src-tp': 'XPDR1-NETWORK1'}],
206 'center-freq': 196.1,
210 'lower-spectral-slot-number': 761,
211 'higher-spectral-slot-number': 768
213 self.assertEqual(response['status_code'], requests.codes.ok)
214 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
217 def test_18_servicePath_create_ZToA(self):
218 response = test_utils_rfc8040.device_renderer_service_path_request({
219 'service-name': 'test',
221 'modulation-format': 'dp-qpsk',
222 'operation': 'create',
224 [{'node-id': 'XPDR-C1',
225 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
226 {'node-id': 'ROADM-C1',
227 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
228 {'node-id': 'ROADM-A1',
229 'src-tp': 'DEG2-TTP-TXRX', 'dest-tp': 'SRG1-PP1-TXRX'},
230 {'node-id': 'XPDR-A1',
231 'src-tp': 'XPDR1-NETWORK1', 'dest-tp': 'XPDR1-CLIENT1'}],
232 'center-freq': 196.1,
236 'lower-spectral-slot-number': 761,
237 'higher-spectral-slot-number': 768
239 self.assertEqual(response['status_code'], requests.codes.ok)
240 self.assertIn('Interfaces created successfully for nodes: ', response['output']['result'])
243 def test_19_service_power_setup_XPDRA_XPDRC(self):
244 response = test_utils_rfc8040.olm_service_power_setup_request({
245 'service-name': 'test',
249 'dest-tp': 'XPDR1-NETWORK1',
250 'src-tp': 'XPDR1-CLIENT1',
254 'dest-tp': 'DEG2-TTP-TXRX',
255 'src-tp': 'SRG1-PP1-TXRX',
256 'node-id': 'ROADM-A1'
259 'dest-tp': 'SRG1-PP1-TXRX',
260 'src-tp': 'DEG1-TTP-TXRX',
261 'node-id': 'ROADM-C1'
264 'dest-tp': 'XPDR1-CLIENT1',
265 'src-tp': 'XPDR1-NETWORK1',
269 'lower-spectral-slot-number': 761,
270 'higher-spectral-slot-number': 768
272 self.assertEqual(response['status_code'], requests.codes.ok)
273 self.assertIn('Success', response['output']['result'])
275 def test_20_get_interface_XPDRA_XPDR1_NETWORK1(self):
276 response = test_utils_rfc8040.check_node_attribute2_request(
277 'XPDR-A1', 'interface', 'XPDR1-NETWORK1-761:768', 'org-openroadm-optical-channel-interfaces:och')
278 self.assertEqual(response['status_code'], requests.codes.ok)
279 self.assertEqual(float(response['org-openroadm-optical-channel-interfaces:och']['transmit-power']), -5)
280 self.assertEqual(float(response['org-openroadm-optical-channel-interfaces:och']['frequency']), 196.1)
282 def test_21_get_roadmconnection_ROADMA(self):
283 response = test_utils_rfc8040.check_node_attribute_request(
284 'ROADM-A1', 'roadm-connections', 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768')
285 self.assertEqual(response['status_code'], requests.codes.ok)
286 self.assertEqual("gainLoss", response['roadm-connections'][0]['opticalControlMode'])
287 self.assertEqual(float(response['roadm-connections'][0]['target-output-power']), 0.21)
289 def test_22_get_roadmconnection_ROADMC(self):
290 response = test_utils_rfc8040.check_node_attribute_request(
291 'ROADM-C1', 'roadm-connections', 'DEG1-TTP-TXRX-SRG1-PP1-TXRX-761:768')
292 self.assertEqual(response['status_code'], requests.codes.ok)
293 self.assertEqual("power", response['roadm-connections'][0]['opticalControlMode'])
295 def test_23_service_power_setup_XPDRC_XPDRA(self):
296 response = test_utils_rfc8040.olm_service_power_setup_request({
297 'service-name': 'test',
301 'dest-tp': 'XPDR1-NETWORK1',
302 'src-tp': 'XPDR1-CLIENT1',
306 'dest-tp': 'DEG1-TTP-TXRX',
307 'src-tp': 'SRG1-PP1-TXRX',
308 'node-id': 'ROADM-C1'
311 'src-tp': 'DEG2-TTP-TXRX',
312 'dest-tp': 'SRG1-PP1-TXRX',
313 'node-id': 'ROADM-A1'
316 'src-tp': 'XPDR1-NETWORK1',
317 'dest-tp': 'XPDR1-CLIENT1',
321 'lower-spectral-slot-number': 761,
322 'higher-spectral-slot-number': 768
324 self.assertEqual(response['status_code'], requests.codes.ok)
325 self.assertIn('Success', response['output']['result'])
327 def test_24_get_interface_XPDRC_XPDR1_NETWORK1(self):
328 response = test_utils_rfc8040.check_node_attribute2_request(
329 'XPDR-C1', 'interface', 'XPDR1-NETWORK1-761:768', 'org-openroadm-optical-channel-interfaces:och')
330 self.assertEqual(response['status_code'], requests.codes.ok)
331 self.assertEqual(float(response['org-openroadm-optical-channel-interfaces:och']['transmit-power']), -5)
332 self.assertEqual(float(response['org-openroadm-optical-channel-interfaces:och']['frequency']), 196.1)
334 def test_25_get_roadmconnection_ROADMC(self):
335 response = test_utils_rfc8040.check_node_attribute_request(
336 'ROADM-C1', 'roadm-connections', 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768')
337 self.assertEqual(response['status_code'], requests.codes.ok)
338 self.assertEqual("gainLoss", response['roadm-connections'][0]['opticalControlMode'])
339 self.assertEqual(float(response['roadm-connections'][0]['target-output-power']), 2.0)
341 def test_26_service_power_turndown_XPDRA_XPDRC(self):
342 response = test_utils_rfc8040.olm_service_power_turndown_request({
343 'service-name': 'test',
347 'dest-tp': 'XPDR1-NETWORK1',
348 'src-tp': 'XPDR1-CLIENT1',
352 'dest-tp': 'DEG2-TTP-TXRX',
353 'src-tp': 'SRG1-PP1-TXRX',
354 'node-id': 'ROADM-A1'
357 'dest-tp': 'SRG1-PP1-TXRX',
358 'src-tp': 'DEG1-TTP-TXRX',
359 'node-id': 'ROADM-C1'
362 'dest-tp': 'XPDR1-CLIENT1',
363 'src-tp': 'XPDR1-NETWORK1',
367 'lower-spectral-slot-number': 761,
368 'higher-spectral-slot-number': 768
370 self.assertEqual(response['status_code'], requests.codes.ok)
371 self.assertIn('Success', response['output']['result'])
373 def test_27_get_roadmconnection_ROADMA(self):
374 response = test_utils_rfc8040.check_node_attribute_request(
375 'ROADM-A1', 'roadm-connections', 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768')
376 self.assertEqual(response['status_code'], requests.codes.ok)
377 self.assertEqual("off", response['roadm-connections'][0]['opticalControlMode'])
378 self.assertEqual(float(response['roadm-connections'][0]['target-output-power']), -60)
380 def test_28_get_roadmconnection_ROADMC(self):
381 response = test_utils_rfc8040.check_node_attribute_request(
382 'ROADM-C1', 'roadm-connections', 'DEG1-TTP-TXRX-SRG1-PP1-TXRX-761:768')
383 self.assertEqual(response['status_code'], requests.codes.ok)
384 self.assertEqual("off", response['roadm-connections'][0]['opticalControlMode'])
386 def test_29_servicePath_delete_AToZ(self):
387 response = test_utils_rfc8040.device_renderer_service_path_request({
388 'service-name': 'test',
390 'modulation-format': 'dp-qpsk',
391 'operation': 'delete',
393 [{'node-id': 'XPDR-A1',
394 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
395 {'node-id': 'ROADM-A1',
396 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
397 {'node-id': 'ROADM-C1',
398 'dest-tp': 'SRG1-PP1-TXRX', 'src-tp': 'DEG1-TTP-TXRX'},
399 {'node-id': 'XPDR-C1',
400 'dest-tp': 'XPDR1-CLIENT1', 'src-tp': 'XPDR1-NETWORK1'}],
401 'center-freq': 196.1,
405 'lower-spectral-slot-number': 761,
406 'higher-spectral-slot-number': 768
408 self.assertEqual(response['status_code'], requests.codes.ok)
409 self.assertIn('Request processed', response['output']['result'])
412 def test_30_servicePath_delete_ZToA(self):
413 response = test_utils_rfc8040.device_renderer_service_path_request({
414 'service-name': 'test',
416 'modulation-format': 'dp-qpsk',
417 'operation': 'delete',
419 [{'node-id': 'XPDR-C1',
420 'dest-tp': 'XPDR1-NETWORK1', 'src-tp': 'XPDR1-CLIENT1'},
421 {'node-id': 'ROADM-C1',
422 'dest-tp': 'DEG1-TTP-TXRX', 'src-tp': 'SRG1-PP1-TXRX'},
423 {'node-id': 'ROADM-A1',
424 'src-tp': 'DEG2-TTP-TXRX', 'dest-tp': 'SRG1-PP1-TXRX'},
425 {'node-id': 'XPDR-A1',
426 'src-tp': 'XPDR1-NETWORK1', 'dest-tp': 'XPDR1-CLIENT1'}],
427 'center-freq': 196.1,
431 'lower-spectral-slot-number': 761,
432 'higher-spectral-slot-number': 768
434 self.assertEqual(response['status_code'], requests.codes.ok)
435 self.assertIn('Request processed', response['output']['result'])
438 #"""to test case where SRG where the xpdr is connected to has no optical range data"""
440 def test_31_connect_xprdA_to_roadmA(self):
441 response = test_utils_rfc8040.connect_xpdr_to_rdm_request(
442 {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '2',
443 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'})
444 self.assertEqual(response.status_code, requests.codes.ok)
446 def test_32_connect_roadmA_to_xpdrA(self):
447 response = test_utils_rfc8040.connect_rdm_to_xpdr_request(
448 {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '2',
449 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'})
450 self.assertEqual(response.status_code, requests.codes.ok)
452 def test_33_servicePath_create_AToZ(self):
453 response = test_utils_rfc8040.device_renderer_service_path_request({
454 'service-name': 'test2',
456 'modulation-format': 'dp-qpsk',
457 'operation': 'create',
459 [{'node-id': 'XPDR-A1',
460 'dest-tp': 'XPDR1-NETWORK2', 'src-tp': 'XPDR1-CLIENT2'},
461 {'node-id': 'ROADM-A1',
462 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP2-TXRX'}],
463 'center-freq': 196.1,
467 'lower-spectral-slot-number': 753,
468 'higher-spectral-slot-number': 760
470 self.assertEqual(response['status_code'], requests.codes.ok)
471 self.assertIn('Interfaces created successfully for nodes', response['output']['result'])
474 def test_34_get_interface_XPDRA_XPDR1_NETWORK2(self):
475 response = test_utils_rfc8040.check_node_attribute2_request(
476 'XPDR-A1', 'interface', 'XPDR1-NETWORK2-753:760', 'org-openroadm-optical-channel-interfaces:och')
477 self.assertEqual(response['status_code'], requests.codes.ok)
478 self.assertEqual(float(response['org-openroadm-optical-channel-interfaces:och']['transmit-power']), -5)
479 # self.assertEqual(2, response['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
481 def test_35_servicePath_delete_AToZ(self):
482 response = test_utils_rfc8040.device_renderer_service_path_request({
483 'service-name': 'test2',
485 'modulation-format': 'dp-qpsk',
486 'operation': 'delete',
488 [{'node-id': 'XPDR-A1',
489 'dest-tp': 'XPDR1-NETWORK2', 'src-tp': 'XPDR1-CLIENT2'},
490 {'node-id': 'ROADM-A1',
491 'dest-tp': 'DEG2-TTP-TXRX', 'src-tp': 'SRG1-PP2-TXRX'}],
492 'center-freq': 196.1,
496 'lower-spectral-slot-number': 753,
497 'higher-spectral-slot-number': 760
499 self.assertEqual(response['status_code'], requests.codes.ok)
500 self.assertIn('Request processed', response['output']['result'])
503 def test_36_xpdrA_device_disconnected(self):
504 response = test_utils_rfc8040.unmount_device("XPDR-A1")
505 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
507 def test_37_xpdrC_device_disconnected(self):
508 response = test_utils_rfc8040.unmount_device("XPDR-C1")
509 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
511 def test_38_calculate_span_loss_current(self):
512 response = test_utils_rfc8040.olm_calculate_spanloss_current_request()
513 self.assertEqual(response['status_code'], requests.codes.ok)
514 self.assertIn('Success',
515 response["output"]["result"])
518 def test_39_rdmA_device_disconnected(self):
519 response = test_utils_rfc8040.unmount_device("ROADM-A1")
520 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
522 def test_40_rdmC_device_disconnected(self):
523 response = test_utils_rfc8040.unmount_device("ROADM-C1")
524 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
527 if __name__ == "__main__":
528 unittest.main(verbosity=2)