2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEFulltesting(unittest.TestCase):
27 cr_serv_input_data = {
28 "sdnc-request-header": {
29 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
30 "rpc-action": "service-create",
31 "request-system-id": "appname",
33 "http://localhost:8585/NotificationServer/notify"
35 "service-name": "service1",
36 "common-id": "ASATT1234567",
37 "connection-type": "service",
39 "service-rate": "100",
41 "service-format": "Ethernet",
42 "clli": "SNJSCAMCJP8",
43 "tx-direction": [{"index": 0}],
44 "rx-direction": [{"index": 0}],
48 "service-rate": "100",
50 "service-format": "Ethernet",
51 "clli": "SNJSCAMCJT4",
52 "tx-direction": [{"index": 0}],
53 "rx-direction": [{"index": 0}],
56 "due-date": "2016-11-28T00:00:01Z",
57 "operator-contact": "pw1234"
59 del_serv_input_data = {
60 "sdnc-request-header": {
61 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
62 "rpc-action": "service-delete",
63 "request-system-id": "appname",
64 "notification-url": "http://localhost:8585/NotificationServer/notify"},
65 "service-delete-req-info": {
66 "service-name": "TBD",
67 "tail-retention": "no"}
71 NODE_VERSION = '1.2.1'
75 cls.processes = test_utils.start_tpce()
76 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
77 ('roadma-full', cls.NODE_VERSION),
78 ('roadmc-full', cls.NODE_VERSION),
79 ('xpdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Shut down every process recorded in cls.processes, then report it.
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):
    # Executed before each test method: print the name of the test about to run
    # (the last dotted component of the unittest id).
    # pylint: disable=consider-using-f-string
    test_name = self.id().split(".")[-1]
    print("execution of {}".format(test_name))
92 # connect netconf devices
def test_01_connect_xpdrA(self):
    # Mount the 'xpdra' simulator under the node name XPDRA01;
    # the expected status is requests.codes.created.
    simulator = ('xpdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDRA01", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    # Mount the 'xpdrc' simulator under the node name XPDRC01;
    # the expected status is requests.codes.created.
    simulator = ('xpdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDRC01", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    # Mount the 'roadma-full' simulator under the node name ROADMA01;
    # the expected status is requests.codes.created.
    simulator = ('roadma-full', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADMA01", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    # Mount the 'roadmc-full' simulator under the node name ROADMC01;
    # the expected status is requests.codes.created.
    simulator = ('roadmc-full', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADMC01", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xpdrA_N1_to_roadmA_PP1(self):
    # Call init-xpdr-rdm-links for XPDRA01 (network 1) towards ROADMA01 SRG1-PP1-TXRX.
    link_request = {
        'links-input': {
            'xpdr-node': 'XPDRA01',
            'xpdr-num': '1',
            'network-num': '1',
            'rdm-node': 'ROADMA01',
            'srg-num': '1',
            'termination-point-num': 'SRG1-PP1-TXRX',
        }
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', link_request)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Xponder Roadm Link created successfully', rpc_reply["output"]["result"])
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    # Call init-rdm-xpdr-links for ROADMA01 SRG1-PP1-TXRX towards XPDRA01 (network 1).
    link_request = {
        'links-input': {
            'xpdr-node': 'XPDRA01',
            'xpdr-num': '1',
            'network-num': '1',
            'rdm-node': 'ROADMA01',
            'srg-num': '1',
            'termination-point-num': 'SRG1-PP1-TXRX',
        }
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', link_request)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Roadm Xponder links created successfully', rpc_reply["output"]["result"])
def test_07_connect_xpdrC_N1_to_roadmC_PP1(self):
    # Call init-xpdr-rdm-links for XPDRC01 (network 1) towards ROADMC01 SRG1-PP1-TXRX.
    link_request = {
        'links-input': {
            'xpdr-node': 'XPDRC01',
            'xpdr-num': '1',
            'network-num': '1',
            'rdm-node': 'ROADMC01',
            'srg-num': '1',
            'termination-point-num': 'SRG1-PP1-TXRX',
        }
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', link_request)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Xponder Roadm Link created successfully', rpc_reply["output"]["result"])
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    # Call init-rdm-xpdr-links for ROADMC01 SRG1-PP1-TXRX towards XPDRC01 (network 1).
    link_request = {
        'links-input': {
            'xpdr-node': 'XPDRC01',
            'xpdr-num': '1',
            'network-num': '1',
            'rdm-node': 'ROADMC01',
            'srg-num': '1',
            'termination-point-num': 'SRG1-PP1-TXRX',
        }
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', link_request)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Roadm Xponder links created successfully', rpc_reply["output"]["result"])
145 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
146 # Config ROADMA-ROADMC oms-attributes
148 "auto-spanloss": "true",
149 "spanloss-base": 11.4,
150 "spanloss-current": 12,
151 "engineered-spanloss": 12.2,
152 "link-concatenation": [{
155 "SRLG-length": 100000,
157 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX",
159 self.assertEqual(response.status_code, requests.codes.created)
161 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
162 # Config ROADMC-ROADMA oms-attributes
164 "auto-spanloss": "true",
165 "spanloss-base": 11.4,
166 "spanloss-current": 12,
167 "engineered-spanloss": 12.2,
168 "link-concatenation": [{
171 "SRLG-length": 100000,
173 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX",
175 self.assertEqual(response.status_code, requests.codes.created)
177 # test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
    # Submit the shared service-create payload under the name "service1".
    payload = self.cr_serv_input_data
    payload["service-name"] = "service1"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common = rpc_reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for self.WAITING seconds before returning.
    time.sleep(self.WAITING)
def test_12_get_eth_service1(self):
    # Read "service1" back from the service list and check its recorded attributes.
    listing = test_utils.get_ordm_serv_list_attr_request("services", "service1")
    self.assertEqual(listing['status_code'], requests.codes.ok)
    service = listing['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_13_check_xc1_ROADMA(self):
    # Check the roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768 on ROADMA01.
    response = test_utils.check_node_attribute_request(
        "ROADMA01", "roadm-connections", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
    self.assertEqual(response['status_code'], requests.codes.ok)
    connection = response['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss'
    }
    # Subset check replacing self.assertDictContainsSubset (deprecated in python 3.2).
    # The expected entries are laid over the reply, so the merge equals the reply
    # only when every expected key is present with the expected value. Merging the
    # other way round lets the reply's own values win and hides wrong values.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual({'src-if': 'SRG1-PP1-TXRX-761:768'}, connection['source'])
    self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-761:768'}, connection['destination'])
def test_14_check_xc1_ROADMC(self):
    # Check the roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768 on ROADMC01.
    response = test_utils.check_node_attribute_request(
        "ROADMC01", "roadm-connections", "SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
    self.assertEqual(response['status_code'], requests.codes.ok)
    connection = response['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss'
    }
    # Subset check replacing self.assertDictContainsSubset (deprecated in python 3.2).
    # The expected entries are laid over the reply, so the merge equals the reply
    # only when every expected key is present with the expected value. Merging the
    # other way round lets the reply's own values win and hides wrong values.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual({'src-if': 'SRG1-PP1-TXRX-761:768'}, connection['source'])
    self.assertDictEqual({'dst-if': 'DEG2-TTP-TXRX-761:768'}, connection['destination'])
228 def test_15_check_topo_XPDRA(self):
229 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDRA01-XPDR1', 'config')
230 self.assertEqual(response['status_code'], requests.codes.ok)
231 liste_tp = response['node']['ietf-network-topology:termination-point']
233 if ele['tp-id'] == 'XPDR1-NETWORK1':
236 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
239 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
240 elif ele['tp-id'] in ('XPDR1-CLIENT2', 'XPDR1-CLIENT1'):
241 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
242 elif ele['tp-id'] == 'XPDR1-NETWORK2':
243 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
246 def test_16_check_topo_ROADMA_SRG1(self):
247 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-SRG1', 'config')
248 self.assertEqual(response['status_code'], requests.codes.ok)
249 freq_map = base64.b64decode(
250 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
251 freq_map_array = [int(x) for x in freq_map]
252 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
253 liste_tp = response['node']['ietf-network-topology:termination-point']
255 if ele['tp-id'] == 'SRG1-PP1-TXRX':
256 freq_map = base64.b64decode(
257 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
258 freq_map_array = [int(x) for x in freq_map]
259 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
260 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
261 self.assertNotIn('avail-freq-maps', dict.keys(ele))
264 def test_17_check_topo_ROADMA_DEG1(self):
265 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-DEG1', 'config')
266 self.assertEqual(response['status_code'], requests.codes.ok)
267 freq_map = base64.b64decode(
268 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
269 freq_map_array = [int(x) for x in freq_map]
270 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
271 liste_tp = response['node']['ietf-network-topology:termination-point']
273 if ele['tp-id'] == 'DEG2-CTP-TXRX':
274 freq_map = base64.b64decode(
275 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
276 freq_map_array = [int(x) for x in freq_map]
277 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
278 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
279 freq_map = base64.b64decode(
280 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
281 freq_map_array = [int(x) for x in freq_map]
282 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
def test_18_connect_xpdrA_N2_to_roadmA_PP2(self):
    # Call init-xpdr-rdm-links for XPDRA01 (network 2) towards ROADMA01 SRG1-PP2-TXRX.
    link_request = {
        'links-input': {
            'xpdr-node': 'XPDRA01',
            'xpdr-num': '1',
            'network-num': '2',
            'rdm-node': 'ROADMA01',
            'srg-num': '1',
            'termination-point-num': 'SRG1-PP2-TXRX',
        }
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', link_request)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Xponder Roadm Link created successfully', rpc_reply["output"]["result"])
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    # Call init-rdm-xpdr-links for ROADMA01 SRG1-PP2-TXRX towards XPDRA01 (network 2).
    link_request = {
        'links-input': {
            'xpdr-node': 'XPDRA01',
            'xpdr-num': '1',
            'network-num': '2',
            'rdm-node': 'ROADMA01',
            'srg-num': '1',
            'termination-point-num': 'SRG1-PP2-TXRX',
        }
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', link_request)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Roadm Xponder links created successfully', rpc_reply["output"]["result"])
def test_20_connect_xpdrC_N2_to_roadmC_PP2(self):
    # Call init-xpdr-rdm-links for XPDRC01 (network 2) towards ROADMC01 SRG1-PP2-TXRX.
    link_request = {
        'links-input': {
            'xpdr-node': 'XPDRC01',
            'xpdr-num': '1',
            'network-num': '2',
            'rdm-node': 'ROADMC01',
            'srg-num': '1',
            'termination-point-num': 'SRG1-PP2-TXRX',
        }
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', link_request)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Xponder Roadm Link created successfully', rpc_reply["output"]["result"])
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    # Call init-rdm-xpdr-links for ROADMC01 SRG1-PP2-TXRX towards XPDRC01 (network 2).
    link_request = {
        'links-input': {
            'xpdr-node': 'XPDRC01',
            'xpdr-num': '1',
            'network-num': '2',
            'rdm-node': 'ROADMC01',
            'srg-num': '1',
            'termination-point-num': 'SRG1-PP2-TXRX',
        }
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', link_request)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    self.assertIn('Roadm Xponder links created successfully', rpc_reply["output"]["result"])
def test_22_create_eth_service2(self):
    # Submit the shared service-create payload under the name "service2".
    payload = self.cr_serv_input_data
    payload["service-name"] = "service2"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common = rpc_reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for self.WAITING seconds before returning.
    time.sleep(self.WAITING)
def test_23_get_eth_service2(self):
    # Read "service2" back from the service list and check its recorded attributes.
    listing = test_utils.get_ordm_serv_list_attr_request("services", "service2")
    self.assertEqual(listing['status_code'], requests.codes.ok)
    service = listing['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_24_check_xc2_ROADMA(self):
    # Check the roadm-connection DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760 on ROADMA01.
    response = test_utils.check_node_attribute_request(
        "ROADMA01", "roadm-connections", "DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760")
    self.assertEqual(response['status_code'], requests.codes.ok)
    connection = response['roadm-connections'][0]
    expected = {
        'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760',
        'wavelength-number': 2,
        'opticalControlMode': 'power'
    }
    # Subset check replacing self.assertDictContainsSubset (deprecated in python 3.2).
    # The expected entries are laid over the reply, so the merge equals the reply
    # only when every expected key is present with the expected value. Merging the
    # other way round lets the reply's own values win and hides wrong values.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual({'src-if': 'DEG1-TTP-TXRX-753:760'}, connection['source'])
    self.assertDictEqual({'dst-if': 'SRG1-PP2-TXRX-753:760'}, connection['destination'])
355 def test_25_check_topo_XPDRA(self):
356 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDRA01-XPDR1', 'config')
357 self.assertEqual(response['status_code'], requests.codes.ok)
358 liste_tp = response['node']['ietf-network-topology:termination-point']
360 if ele['tp-id'] == 'XPDR1-NETWORK1':
363 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
366 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
367 elif ele['tp-id'] == 'XPDR1-NETWORK2':
370 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
373 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
374 elif ele['tp-id'] in ('XPDR1-CLIENT1', 'XPDR1-CLIENT2'):
375 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
378 def test_26_check_topo_ROADMA_SRG1(self):
379 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-SRG1', 'config')
380 self.assertEqual(response['status_code'], requests.codes.ok)
381 freq_map = base64.b64decode(
382 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
383 freq_map_array = [int(x) for x in freq_map]
384 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
385 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
386 liste_tp = response['node']['ietf-network-topology:termination-point']
388 if ele['tp-id'] == 'SRG1-PP1-TXRX':
389 freq_map = base64.b64decode(
390 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
391 freq_map_array = [int(x) for x in freq_map]
392 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
393 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
394 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
395 freq_map = base64.b64decode(
396 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
397 freq_map_array = [int(x) for x in freq_map]
398 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
399 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
400 elif ele['tp-id'] == 'SRG1-PP3-TXRX':
401 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
404 def test_27_check_topo_ROADMA_DEG1(self):
405 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-DEG1', 'config')
406 self.assertEqual(response['status_code'], requests.codes.ok)
407 freq_map = base64.b64decode(
408 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
409 freq_map_array = [int(x) for x in freq_map]
410 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
411 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
412 liste_tp = response['node']['ietf-network-topology:termination-point']
414 if ele['tp-id'] == 'DEG2-CTP-TXRX':
415 freq_map = base64.b64decode(
416 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
417 freq_map_array = [int(x) for x in freq_map]
418 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
419 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
420 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
421 freq_map = base64.b64decode(
422 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
423 freq_map_array = [int(x) for x in freq_map]
424 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
425 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
428 # creation service test on a non-available resource
def test_28_create_eth_service3(self):
    # Submit the shared service-create payload under the name "service3";
    # the RPC itself is still expected to be accepted (response-code 200).
    payload = self.cr_serv_input_data
    payload["service-name"] = "service3"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common = rpc_reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    self.assertIn('200', common['response-code'])
    # Pause for self.WAITING seconds before returning.
    time.sleep(self.WAITING)
# TODO: add a test that checks the openroadm-service-list still only
# contains 2 elements
def test_29_delete_eth_service3(self):
    # Request deletion of "service3"; the reply must report that it does not
    # exist in the datastore, with response-code 500.
    payload = self.del_serv_input_data
    payload["service-delete-req-info"]["service-name"] = "service3"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common = rpc_reply['output']['configuration-response-common']
    self.assertIn('Service \'service3\' does not exist in datastore', common['response-message'])
    self.assertIn('500', common['response-code'])
def test_30_delete_eth_service1(self):
    # Point the shared service-delete payload at "service1" and submit it.
    payload = self.del_serv_input_data
    payload["service-delete-req-info"]["service-name"] = "service1"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common = rpc_reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Pause for self.WAITING seconds before returning.
    time.sleep(self.WAITING)
def test_31_delete_eth_service2(self):
    # Point the shared service-delete payload at "service2" and submit it.
    payload = self.del_serv_input_data
    payload["service-delete-req-info"]["service-name"] = "service2"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common = rpc_reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Pause for self.WAITING seconds before returning.
    time.sleep(self.WAITING)
def test_32_check_no_xc_ROADMA(self):
    # The ROADMA01 device data must no longer carry a 'roadm-connections' entry.
    node_reply = test_utils.check_node_request("ROADMA01")
    self.assertEqual(node_reply['status_code'], requests.codes.ok)
    device_keys = dict.keys(node_reply['org-openroadm-device'])
    self.assertNotIn('roadm-connections', device_keys)
481 def test_33_check_topo_XPDRA(self):
482 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDRA01-XPDR1', 'config')
483 self.assertEqual(response['status_code'], requests.codes.ok)
484 liste_tp = response['node']['ietf-network-topology:termination-point']
486 if (ele['org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT'
487 and ele['tp-id'] in ('XPDR1-CLIENT1', 'XPDR1-CLIENT3')):
489 'org-openroadm-network-topology:xpdr-client-attributes',
491 elif ele['org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
492 self.assertIn('tail-equipment-id', dict.keys(
493 ele['org-openroadm-network-topology:'
494 'xpdr-network-attributes']))
495 self.assertNotIn('wavelength', dict.keys(
496 ele['org-openroadm-network-topology:'
497 'xpdr-network-attributes']))
500 def test_34_check_topo_ROADMA_SRG1(self):
501 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-SRG1', 'config')
502 self.assertEqual(response['status_code'], requests.codes.ok)
503 freq_map = base64.b64decode(
504 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
505 freq_map_array = [int(x) for x in freq_map]
506 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
507 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
508 liste_tp = response['node']['ietf-network-topology:termination-point']
510 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
511 freq_map = base64.b64decode(
512 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
513 freq_map_array = [int(x) for x in freq_map]
514 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
515 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
516 elif ele['tp-id'] == 'SRG1-CP-TXRX':
517 freq_map = base64.b64decode(
518 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
519 freq_map_array = [int(x) for x in freq_map]
520 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
521 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
523 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
526 def test_35_check_topo_ROADMA_DEG1(self):
527 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADMA01-DEG1', 'config')
528 self.assertEqual(response['status_code'], requests.codes.ok)
529 freq_map = base64.b64decode(
530 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
531 freq_map_array = [int(x) for x in freq_map]
532 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
533 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
534 liste_tp = response['node']['ietf-network-topology:termination-point']
536 if ele['tp-id'] == 'DEG2-CTP-TXRX':
537 freq_map = base64.b64decode(
538 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
539 freq_map_array = [int(x) for x in freq_map]
540 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
541 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
542 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
543 freq_map = base64.b64decode(
544 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
545 freq_map_array = [int(x) for x in freq_map]
546 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
547 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
550 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
    # Turn the shared service-create payload into a "roadm-line" OC request
    # between ROADMA01 and ROADMC01, named "service1", and submit it.
    payload = self.cr_serv_input_data
    payload["service-name"] = "service1"
    payload["connection-type"] = "roadm-line"
    for end_key, roadm in (("service-a-end", "ROADMA01"), ("service-z-end", "ROADMC01")):
        payload[end_key]["node-id"] = roadm
        payload[end_key]["service-format"] = "OC"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common = rpc_reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for self.WAITING seconds before returning.
    time.sleep(self.WAITING)
def test_37_get_oc_service1(self):
    # Read "service1" back from the service list; it must now be a roadm-line service.
    listing = test_utils.get_ordm_serv_list_attr_request("services", "service1")
    self.assertEqual(listing['status_code'], requests.codes.ok)
    service = listing['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1')
    self.assertEqual(service['connection-type'], 'roadm-line')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_38_check_xc1_ROADMA(self):
    # Check the roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768 on ROADMA01,
    # including its target output power.
    response = test_utils.check_node_attribute_request(
        "ROADMA01", "roadm-connections", "SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
    self.assertEqual(response['status_code'], requests.codes.ok)
    connection = response['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check replacing self.assertDictContainsSubset (deprecated in python 3.2).
    # The expected entries are laid over the reply, so the merge equals the reply
    # only when every expected key is present with the expected value. Merging the
    # other way round lets the reply's own values win and hides wrong values.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual({'src-if': 'SRG1-PP1-TXRX-761:768'}, connection['source'])
    self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-761:768'}, connection['destination'])
def test_39_check_xc1_ROADMC(self):
    # Check the roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768 on ROADMC01,
    # including its target output power.
    response = test_utils.check_node_attribute_request(
        "ROADMC01", "roadm-connections", "SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
    self.assertEqual(response['status_code'], requests.codes.ok)
    connection = response['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss',
        'target-output-power': 2.0
    }
    # Subset check replacing self.assertDictContainsSubset (deprecated in python 3.2).
    # The expected entries are laid over the reply, so the merge equals the reply
    # only when every expected key is present with the expected value. Merging the
    # other way round lets the reply's own values win and hides wrong values.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual({'src-if': 'SRG1-PP1-TXRX-761:768'}, connection['source'])
    self.assertDictEqual({'dst-if': 'DEG2-TTP-TXRX-761:768'}, connection['destination'])
def test_40_create_oc_service2(self):
    # Turn the shared service-create payload into a "roadm-line" OC request
    # between ROADMA01 and ROADMC01, named "service2", and submit it.
    payload = self.cr_serv_input_data
    payload["service-name"] = "service2"
    payload["connection-type"] = "roadm-line"
    for end_key, roadm in (("service-a-end", "ROADMA01"), ("service-z-end", "ROADMC01")):
        payload[end_key]["node-id"] = roadm
        payload[end_key]["service-format"] = "OC"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common = rpc_reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for self.WAITING seconds before returning.
    time.sleep(self.WAITING)
def test_41_get_oc_service2(self):
    # Read "service2" back from the service list; it must now be a roadm-line service.
    listing = test_utils.get_ordm_serv_list_attr_request("services", "service2")
    self.assertEqual(listing['status_code'], requests.codes.ok)
    service = listing['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2')
    self.assertEqual(service['connection-type'], 'roadm-line')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_42_check_xc2_ROADMA(self):
    # Check the roadm-connection SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760 on ROADMA01,
    # including its target output power.
    response = test_utils.check_node_attribute_request(
        "ROADMA01", "roadm-connections", "SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760")
    self.assertEqual(response['status_code'], requests.codes.ok)
    connection = response['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760',
        'wavelength-number': 2,
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check replacing self.assertDictContainsSubset (deprecated in python 3.2).
    # The expected entries are laid over the reply, so the merge equals the reply
    # only when every expected key is present with the expected value. Merging the
    # other way round lets the reply's own values win and hides wrong values.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual({'src-if': 'SRG1-PP2-TXRX-753:760'}, connection['source'])
    self.assertDictEqual({'dst-if': 'DEG1-TTP-TXRX-753:760'}, connection['destination'])
648 def test_43_check_topo_ROADMA(self):
649 self.test_26_check_topo_ROADMA_SRG1()
650 self.test_27_check_topo_ROADMA_DEG1()
def test_44_delete_oc_service1(self):
    # Point the shared service-delete payload at "service1" and submit it.
    payload = self.del_serv_input_data
    payload["service-delete-req-info"]["service-name"] = "service1"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common = rpc_reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Pause for self.WAITING seconds before returning.
    time.sleep(self.WAITING)
def test_45_delete_oc_service2(self):
    # Point the shared service-delete payload at "service2" and submit it.
    payload = self.del_serv_input_data
    payload["service-delete-req-info"]["service-name"] = "service2"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    common = rpc_reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Pause for self.WAITING seconds before returning.
    time.sleep(self.WAITING)
673 def test_46_get_no_oc_services(self):
674 response = test_utils.get_ordm_serv_list_request()
675 self.assertEqual(response['status_code'], requests.codes.conflict)
676 self.assertIn(response['service-list'], (
678 "error-type": "protocol",
679 "error-tag": "data-missing",
681 "Request could not be completed because the relevant data "
682 "model content does not exist"
684 "error-type": "application",
685 "error-tag": "data-missing",
687 "Request could not be completed because the relevant data "
688 "model content does not exist"
def test_47_get_no_xc_ROADMA(self):
    # The ROADMA01 device data must no longer carry a 'roadm-connections' entry.
    node_reply = test_utils.check_node_request("ROADMA01")
    self.assertEqual(node_reply['status_code'], requests.codes.ok)
    device_keys = dict.keys(node_reply['org-openroadm-device'])
    self.assertNotIn('roadm-connections', device_keys)
698 def test_48_check_topo_ROADMA(self):
699 self.test_34_check_topo_ROADMA_SRG1()
700 self.test_35_check_topo_ROADMA_DEG1()
702 def test_49_loop_create_eth_service(self):
703 # pylint: disable=consider-using-f-string
704 for i in range(1, 4):
705 print("iteration number {}".format(i))
706 print("eth service creation")
707 self.test_11_create_eth_service1()
708 print("check xc in ROADMA01")
709 self.test_13_check_xc1_ROADMA()
710 print("check xc in ROADMC01")
711 self.test_14_check_xc1_ROADMC()
712 print("eth service deletion\n")
713 self.test_30_delete_eth_service1()
def test_50_loop_create_oc_service(self):
    # Cleanup first: unless the lookup of "service1" answers 404, request its
    # deletion (the outcome of that request is not checked).
    lookup = test_utils.get_ordm_serv_list_attr_request("services", "service1")
    if lookup['status_code'] != 404:
        delete_request = self.del_serv_input_data
        delete_request["service-delete-req-info"]["service-name"] = "service1"
        test_utils.transportpce_api_rpc_request(
            'org-openroadm-service', 'service-delete', delete_request)
    # Then run the create / check ROADMA / check ROADMC / delete cycle of the
    # OC service three times, announcing each step on stdout.
    # pylint: disable=consider-using-f-string
    cycle = (
        ("oc service creation", self.test_36_create_oc_service1),
        ("check xc in ROADMA01", self.test_38_check_xc1_ROADMA),
        ("check xc in ROADMC01", self.test_39_check_xc1_ROADMC),
        ("oc service deletion\n", self.test_44_delete_oc_service1),
    )
    for iteration in range(1, 4):
        print("iteration number {}".format(iteration))
        for announcement, step in cycle:
            print(announcement)
            step()
def test_51_disconnect_XPDRA(self):
    # Unmount XPDRA01; both requests.codes.ok and requests.codes.no_content are accepted.
    unmount_reply = test_utils.unmount_device("XPDRA01")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted)
def test_52_disconnect_XPDRC(self):
    # Unmount XPDRC01; both requests.codes.ok and requests.codes.no_content are accepted.
    unmount_reply = test_utils.unmount_device("XPDRC01")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted)
def test_53_disconnect_ROADMA(self):
    # Unmount ROADMA01; both requests.codes.ok and requests.codes.no_content are accepted.
    unmount_reply = test_utils.unmount_device("ROADMA01")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted)
def test_54_disconnect_ROADMC(self):
    # Unmount ROADMC01; both requests.codes.ok and requests.codes.no_content are accepted.
    unmount_reply = test_utils.unmount_device("ROADMC01")
    accepted = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted)
# When executed as a script, run the test cases of this module through
# unittest with verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)