2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEFulltesting(unittest.TestCase):
29 cr_serv_sample_data = {"input": {
30 "sdnc-request-header": {
31 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
32 "rpc-action": "service-create",
33 "request-system-id": "appname",
34 "notification-url": "http://localhost:8585/NotificationServer/notify"
36 "service-name": "service1",
37 "common-id": "ASATT1234567",
38 "connection-type": "service",
40 "service-rate": "100",
42 "service-format": "Ethernet",
43 "clli": "SNJSCAMCJP8",
49 "service-rate": "100",
51 "service-format": "Ethernet",
52 "clli": "SNJSCAMCJT4",
57 "due-date": "2016-11-28T00:00:01Z",
58 "operator-contact": "pw1234"
62 WAITING = 20 # nominal value is 300
63 NODE_VERSION = '2.2.1'
67 cls.processes = test_utils.start_tpce()
68 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
69 ('roadma', cls.NODE_VERSION),
70 ('roadmc', cls.NODE_VERSION),
71 ('xpdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Hand every entry of cls.processes to test_utils.shutdown_process,
    # then print a completion message once the loop is over.
    # pylint: disable=not-an-iterable
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
def setUp(self):
    # Executed before each test method: print the last dot-separated
    # component of the unittest id so the console shows which test starts.
    # pylint: disable=consider-using-f-string
    test_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(test_name))
def test_01_connect_xpdrA(self):
    # Mount device "XPDR-A1" through test_utils.mount_device, handing it the
    # ('xpdra', NODE_VERSION) pair, and require the "created" status code.
    simulator = ('xpdra', self.NODE_VERSION)
    reply = test_utils.mount_device("XPDR-A1", simulator)
    self.assertEqual(
        reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_xpdrC(self):
    # Mount device "XPDR-C1" through test_utils.mount_device, handing it the
    # ('xpdrc', NODE_VERSION) pair, and require the "created" status code.
    simulator = ('xpdrc', self.NODE_VERSION)
    reply = test_utils.mount_device("XPDR-C1", simulator)
    self.assertEqual(
        reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    # Mount device "ROADM-A1" through test_utils.mount_device, handing it the
    # ('roadma', NODE_VERSION) pair, and require the "created" status code.
    simulator = ('roadma', self.NODE_VERSION)
    reply = test_utils.mount_device("ROADM-A1", simulator)
    self.assertEqual(
        reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    # Mount device "ROADM-C1" through test_utils.mount_device, handing it the
    # ('roadmc', NODE_VERSION) pair, and require the "created" status code.
    simulator = ('roadmc', self.NODE_VERSION)
    reply = test_utils.mount_device("ROADM-C1", simulator)
    self.assertEqual(
        reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    # Send the xpdr-to-roadm link request for XPDR-A1 / ROADM-A1 with the
    # arguments below; expect an OK status and the confirmation text in
    # output.result of the JSON reply.
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    # Send the roadm-to-xpdr link request for XPDR-A1 / ROADM-A1 with the
    # arguments below; expect an OK status and the confirmation text in
    # output.result of the JSON reply.
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    # Send the xpdr-to-roadm link request for XPDR-C1 / ROADM-C1 with the
    # arguments below; expect an OK status and the confirmation text in
    # output.result of the JSON reply.
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    # Send the roadm-to-xpdr link request for XPDR-C1 / ROADM-C1 with the
    # arguments below; expect an OK status and the confirmation text in
    # output.result of the JSON reply.
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_09_connect_xprdA_N2_to_roadmA_PP2(self):
    # Send the xpdr-to-roadm link request for XPDR-A1 / ROADM-A1 with the
    # arguments below; expect an OK status and the confirmation text in
    # output.result of the JSON reply.
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_10_connect_roadmA_PP2_to_xpdrA_N2(self):
    # Send the roadm-to-xpdr link request for XPDR-A1 / ROADM-A1 with the
    # arguments below; expect an OK status and the confirmation text in
    # output.result of the JSON reply.
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_11_connect_xprdC_N2_to_roadmC_PP2(self):
    # Send the xpdr-to-roadm link request for XPDR-C1 / ROADM-C1 with the
    # arguments below; expect an OK status and the confirmation text in
    # output.result of the JSON reply.
    # NOTE(review): test_09 (the XPDR-A1 counterpart) passes "1", "2" as 2nd
    # and 3rd arguments where this test passes "2", "1" - confirm the order
    # is intended.
    link_args = ("XPDR-C1", "2", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_12_connect_roadmC_PP2_to_xpdrC_N2(self):
    # Send the roadm-to-xpdr link request for XPDR-C1 / ROADM-C1 with the
    # arguments below; expect an OK status and the confirmation text in
    # output.result of the JSON reply.
    # NOTE(review): test_10 (the XPDR-A1 counterpart) passes "1", "2" as 2nd
    # and 3rd arguments where this test passes "2", "1" - confirm the order
    # is intended.
    link_args = ("XPDR-C1", "2", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
164 def test_13_add_omsAttributes_ROADMA_ROADMC(self):
165 # Config ROADMA-ROADMC oms-attributes
167 "auto-spanloss": "true",
168 "spanloss-base": 11.4,
169 "spanloss-current": 12,
170 "engineered-spanloss": 12.2,
171 "link-concatenation": [{
174 "SRLG-length": 100000,
176 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
177 self.assertEqual(response.status_code, requests.codes.created)
179 def test_14_add_omsAttributes_ROADMC_ROADMA(self):
180 # Config ROADMC-ROADMA oms-attributes
182 "auto-spanloss": "true",
183 "spanloss-base": 11.4,
184 "spanloss-current": 12,
185 "engineered-spanloss": 12.2,
186 "link-concatenation": [{
189 "SRLG-length": 100000,
191 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
192 self.assertEqual(response.status_code, requests.codes.created)
# test service-create for Eth service from xpdr to xpdr
def test_15_create_eth_service1(self):
    # The payload is the class-level cr_serv_sample_data dict, so setting the
    # service name here stays visible to every later test that reuses it.
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service1"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs
    # (presumably to let the service creation complete - confirm).
    time.sleep(self.WAITING)
204 def test_16_get_eth_service1(self):
205 response = test_utils.get_service_list_request("services/service1")
206 self.assertEqual(response.status_code, requests.codes.ok)
207 res = response.json()
209 res['services'][0]['administrative-state'], 'inService')
211 res['services'][0]['service-name'], 'service1')
213 res['services'][0]['connection-type'], 'service')
215 res['services'][0]['lifecycle-state'], 'planned')
218 def test_17_check_xc1_ROADMA(self):
219 response = test_utils.check_netconf_node_request(
220 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
221 self.assertEqual(response.status_code, requests.codes.ok)
222 res = response.json()
223 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
224 self.assertDictEqual(
226 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
227 'opticalControlMode': 'gainLoss',
228 'target-output-power': -3.0
229 }, **res['roadm-connections'][0]),
230 res['roadm-connections'][0]
232 self.assertDictEqual(
233 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
234 res['roadm-connections'][0]['source'])
235 self.assertDictEqual(
236 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
237 res['roadm-connections'][0]['destination'])
240 def test_18_check_xc1_ROADMC(self):
241 response = test_utils.check_netconf_node_request(
242 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
243 self.assertEqual(response.status_code, requests.codes.ok)
244 res = response.json()
245 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
246 self.assertDictEqual(
248 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
249 'opticalControlMode': 'gainLoss',
250 'target-output-power': -3.0
251 }, **res['roadm-connections'][0]),
252 res['roadm-connections'][0]
254 self.assertDictEqual(
255 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
256 res['roadm-connections'][0]['source'])
257 self.assertDictEqual(
258 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
259 res['roadm-connections'][0]['destination'])
262 def test_19_check_topo_XPDRA(self):
263 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
264 self.assertEqual(response.status_code, requests.codes.ok)
265 res = response.json()
266 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
268 if ele['tp-id'] == 'XPDR1-NETWORK1':
269 self.assertEqual({'frequency': 196.1,
271 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
272 elif ele['tp-id'] in ('XPDR1-CLIENT2'):
273 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
274 elif ele['tp-id'] == 'XPDR1-NETWORK2':
275 self.assertIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
278 def test_20_check_topo_ROADMA_SRG1(self):
279 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
280 self.assertEqual(response.status_code, requests.codes.ok)
281 res = response.json()
282 freq_map = base64.b64decode(
283 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
284 freq_map_array = [int(x) for x in freq_map]
285 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
286 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
288 if ele['tp-id'] == 'SRG1-PP1-TXRX':
289 freq_map = base64.b64decode(
290 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
291 freq_map_array = [int(x) for x in freq_map]
292 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
293 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
294 self.assertNotIn('avail-freq-maps', dict.keys(ele))
297 def test_21_check_topo_ROADMA_DEG2(self):
298 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
299 self.assertEqual(response.status_code, requests.codes.ok)
300 res = response.json()
301 freq_map = base64.b64decode(
302 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
303 freq_map_array = [int(x) for x in freq_map]
304 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
305 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
307 if ele['tp-id'] == 'DEG2-CTP-TXRX':
308 freq_map = base64.b64decode(
309 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
310 freq_map_array = [int(x) for x in freq_map]
311 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
312 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
313 freq_map = base64.b64decode(
314 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
315 freq_map_array = [int(x) for x in freq_map]
316 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
@unittest.skip("Bug will be resolved in a future change")
def test_22_create_eth_service2(self):
    # Same request as the service1 creation, but named "service2". The
    # payload is the shared class-level dict, mutated in place.
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service2"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
329 @unittest.skip("Bug will be resolved in a future change")
330 def test_23_get_eth_service2(self):
331 response = test_utils.get_service_list_request("services/service2")
332 self.assertEqual(response.status_code, requests.codes.ok)
333 res = response.json()
335 res['services'][0]['administrative-state'],
338 res['services'][0]['service-name'], 'service2')
340 res['services'][0]['connection-type'], 'service')
342 res['services'][0]['lifecycle-state'], 'planned')
345 @unittest.skip("Bug will be resolved in a future change")
346 def test_24_check_xc2_ROADMA(self):
347 response = test_utils.check_netconf_node_request(
348 "ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760")
349 self.assertEqual(response.status_code, requests.codes.ok)
350 res = response.json()
351 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
352 self.assertDictEqual(
354 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760',
355 'opticalControlMode': 'power'
356 }, **res['roadm-connections'][0]),
357 res['roadm-connections'][0]
359 self.assertDictEqual(
360 {'src-if': 'DEG2-TTP-TXRX-nmc-753:760'},
361 res['roadm-connections'][0]['source'])
362 self.assertDictEqual(
363 {'dst-if': 'SRG1-PP2-TXRX-nmc-753:760'},
364 res['roadm-connections'][0]['destination'])
366 @unittest.skip("Bug will be resolved in a future change")
367 def test_25_check_topo_XPDRA(self):
368 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
369 self.assertEqual(response.status_code, requests.codes.ok)
370 res = response.json()
371 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
373 if ele['tp-id'] == 'XPDR1-NETWORK1':
374 self.assertEqual({'frequency': 196.1,
376 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
377 elif ele['tp-id'] == 'XPDR1-NETWORK2':
378 self.assertEqual({'frequency': 196.05,
380 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
381 elif ele['tp-id'] in ('XPDR1-CLIENT1', 'XPDR1-CLIENT2'):
382 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
385 @unittest.skip("Bug will be resolved in a future change")
386 def test_26_check_topo_ROADMA_SRG1(self):
387 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
388 self.assertEqual(response.status_code, requests.codes.ok)
389 res = response.json()
390 freq_map = base64.b64decode(
391 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
392 freq_map_array = [int(x) for x in freq_map]
393 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
394 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
395 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
397 if ele['tp-id'] == 'SRG1-PP1-TXRX':
398 freq_map = base64.b64decode(
399 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
400 freq_map_array = [int(x) for x in freq_map]
401 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
402 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
403 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
404 freq_map = base64.b64decode(
405 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
406 freq_map_array = [int(x) for x in freq_map]
407 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
408 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
409 elif ele['tp-id'] == 'SRG1-PP3-TXRX':
410 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
413 @unittest.skip("Bug will be resolved in a future change")
414 def test_27_check_topo_ROADMA_DEG2(self):
415 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
416 self.assertEqual(response.status_code, requests.codes.ok)
417 res = response.json()
418 freq_map = base64.b64decode(
419 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
420 freq_map_array = [int(x) for x in freq_map]
421 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
422 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
423 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
425 if ele['tp-id'] == 'DEG2-CTP-TXRX':
426 freq_map = base64.b64decode(
427 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
428 freq_map_array = [int(x) for x in freq_map]
429 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
430 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
431 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
432 freq_map = base64.b64decode(
433 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
434 freq_map_array = [int(x) for x in freq_map]
435 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
436 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
# creation service test on a non-available resource
@unittest.skip("Bug will be resolved in a future change")
def test_28_create_eth_service3(self):
    # Submit the shared payload under the name "service3"; the request itself
    # is still expected to be accepted (OK status, 'PCE calculation in
    # progress' message, response code containing '200').
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service3"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    self.assertIn('200', common['response-code'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
# TODO: add a test that checks the openroadm-service-list still only contains 2 elements
@unittest.skip("Bug will be resolved in a future change")
def test_29_delete_eth_service3(self):
    # Deleting "service3" must answer with an OK HTTP status while the body
    # reports that the service is unknown, with a response code containing
    # '500'.
    reply = test_utils.service_delete_request("service3")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Service \'service3\' does not exist in datastore',
                  common['response-message'])
    self.assertIn('500', common['response-code'])
def test_30_delete_eth_service1(self):
    # Request deletion of "service1"; expect an OK status and the
    # 'Renderer service delete in progress' message in the reply.
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
@unittest.skip("Bug will be resolved in a future change")
def test_31_delete_eth_service2(self):
    # Request deletion of "service2"; expect an OK status and the
    # 'Renderer service delete in progress' message in the reply.
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
def test_32_check_no_xc_ROADMA(self):
    # Read ROADM-A1 with an empty sub-path and verify that the returned
    # 'org-openroadm-device' container has no 'roadm-connections' entry.
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    # Assert the status before decoding the body, so a failed request is
    # reported as a status mismatch rather than as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership on the dict itself already tests its keys.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
486 def test_33_check_topo_XPDRA(self):
487 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
488 self.assertEqual(response.status_code, requests.codes.ok)
489 res = response.json()
490 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
492 if ele['org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
493 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
494 elif ele['org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
495 self.assertIn('tail-equipment-id',
496 dict.keys(ele['org-openroadm-network-topology:xpdr-network-attributes']))
497 self.assertNotIn('wavelength', dict.keys(
498 ele['org-openroadm-network-topology:xpdr-network-attributes']))
501 def test_34_check_topo_ROADMA_SRG1(self):
502 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
503 self.assertEqual(response.status_code, requests.codes.ok)
504 res = response.json()
505 freq_map = base64.b64decode(
506 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
507 freq_map_array = [int(x) for x in freq_map]
508 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
509 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
510 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
512 if ele['tp-id'] in ('SRG1-PP1-TXRX'):
513 freq_map = base64.b64decode(
514 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
515 freq_map_array = [int(x) for x in freq_map]
516 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
517 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
518 elif ele['tp-id'] == 'SRG1-CP-TXRX':
519 freq_map = base64.b64decode(
520 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
521 freq_map_array = [int(x) for x in freq_map]
522 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
523 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
525 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
528 def test_35_check_topo_ROADMA_DEG2(self):
529 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
530 self.assertEqual(response.status_code, requests.codes.ok)
531 res = response.json()
532 freq_map = base64.b64decode(
533 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
534 freq_map_array = [int(x) for x in freq_map]
535 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
536 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
537 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
539 if ele['tp-id'] == 'DEG2-CTP-TXRX':
540 freq_map = base64.b64decode(
541 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
542 freq_map_array = [int(x) for x in freq_map]
543 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
544 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
545 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
546 freq_map = base64.b64decode(
547 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
548 freq_map_array = [int(x) for x in freq_map]
549 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
550 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
# test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
    # Rewrite the shared class-level payload in place: name it "service1",
    # switch it to a "roadm-line" connection and point both ends at the
    # ROADMs with the "OC" service format. Later tests see these changes.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1"
    request_input["connection-type"] = "roadm-line"
    for end_key, node_id in (("service-a-end", "ROADM-A1"),
                             ("service-z-end", "ROADM-C1")):
        request_input[end_key]["node-id"] = node_id
        request_input[end_key]["service-format"] = "OC"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
568 def test_37_get_oc_service1(self):
569 response = test_utils.get_service_list_request("services/service1")
570 self.assertEqual(response.status_code, requests.codes.ok)
571 res = response.json()
573 res['services'][0]['administrative-state'],
576 res['services'][0]['service-name'], 'service1')
578 res['services'][0]['connection-type'], 'roadm-line')
580 res['services'][0]['lifecycle-state'], 'planned')
583 def test_38_check_xc1_ROADMA(self):
584 response = test_utils.check_netconf_node_request(
585 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
586 self.assertEqual(response.status_code, requests.codes.ok)
587 res = response.json()
588 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
589 self.assertDictEqual(
591 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
592 'opticalControlMode': 'gainLoss',
593 'target-output-power': -3.0
594 }, **res['roadm-connections'][0]),
595 res['roadm-connections'][0]
597 self.assertDictEqual(
598 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
599 res['roadm-connections'][0]['source'])
600 self.assertDictEqual(
601 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
602 res['roadm-connections'][0]['destination'])
605 def test_39_check_xc1_ROADMC(self):
606 response = test_utils.check_netconf_node_request(
607 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
608 self.assertEqual(response.status_code, requests.codes.ok)
609 res = response.json()
610 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
611 self.assertDictEqual(
613 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
614 'opticalControlMode': 'gainLoss',
615 'target-output-power': -3.0
616 }, **res['roadm-connections'][0]),
617 res['roadm-connections'][0]
619 self.assertDictEqual(
620 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
621 res['roadm-connections'][0]['source'])
622 self.assertDictEqual(
623 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
624 res['roadm-connections'][0]['destination'])
@unittest.skip("Bug will be resolved in a future change")
def test_40_create_oc_service2(self):
    # Rewrite the shared class-level payload in place: name it "service2",
    # switch it to a "roadm-line" connection and point both ends at the
    # ROADMs with the "OC" service format, then submit it.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service2"
    request_input["connection-type"] = "roadm-line"
    for end_key, node_id in (("service-a-end", "ROADM-A1"),
                             ("service-z-end", "ROADM-C1")):
        request_input[end_key]["node-id"] = node_id
        request_input[end_key]["service-format"] = "OC"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
642 @unittest.skip("Bug will be resolved in a future change")
643 def test_41_get_oc_service2(self):
644 response = test_utils.get_service_list_request("services/service2")
645 self.assertEqual(response.status_code, requests.codes.ok)
646 res = response.json()
648 res['services'][0]['administrative-state'],
651 res['services'][0]['service-name'], 'service2')
653 res['services'][0]['connection-type'], 'roadm-line')
655 res['services'][0]['lifecycle-state'], 'planned')
658 @unittest.skip("Bug will be resolved in a future change")
659 def test_42_check_xc2_ROADMA(self):
660 response = test_utils.check_netconf_node_request(
661 "ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760")
662 self.assertEqual(response.status_code, requests.codes.ok)
663 res = response.json()
664 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
665 self.assertDictEqual(
667 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760',
668 'opticalControlMode': 'gainLoss',
669 'target-output-power': -3.0
670 }, **res['roadm-connections'][0]),
671 res['roadm-connections'][0]
673 self.assertDictEqual(
674 {'src-if': 'SRG1-PP2-TXRX-nmc-753:760'},
675 res['roadm-connections'][0]['source'])
676 self.assertDictEqual(
677 {'dst-if': 'DEG2-TTP-TXRX-nmc-753:760'},
678 res['roadm-connections'][0]['destination'])
681 def test_43_check_topo_ROADMA(self):
682 self.test_26_check_topo_ROADMA_SRG1()
683 self.test_27_check_topo_ROADMA_DEG2()
def test_44_delete_oc_service1(self):
    # Request deletion of "service1"; expect an OK status and the
    # 'Renderer service delete in progress' message in the reply.
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
@unittest.skip("Bug will be resolved in a future change")
def test_45_delete_oc_service2(self):
    # Request deletion of "service2"; expect an OK status and the
    # 'Renderer service delete in progress' message in the reply.
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
703 def test_46_get_no_oc_services(self):
705 response = test_utils.get_service_list_request("")
706 self.assertEqual(response.status_code, requests.codes.conflict)
707 res = response.json()
709 {"error-type": "application", "error-tag": "data-missing",
710 "error-message": "Request could not be completed because the relevant data model content does not exist"},
711 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    # Read ROADM-A1 with an empty sub-path and verify that the returned
    # 'org-openroadm-device' container has no 'roadm-connections' entry.
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The key is spelled directly; the former ['roadm-connections'][0]
    # built a one-element list only to index it back to the same string.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
721 def test_48_check_topo_ROADMA(self):
722 self.test_34_check_topo_ROADMA_SRG1()
723 self.test_35_check_topo_ROADMA_DEG2()
725 def test_49_loop_create_oc_service(self):
726 for i in range(1, 3):
727 # pylint: disable=consider-using-f-string
728 print("iteration number {}".format(i))
729 print("oc service creation")
730 self.test_36_create_oc_service1()
731 print("check xc in ROADM-A1")
732 self.test_38_check_xc1_ROADMA()
733 print("check xc in ROADM-C1")
734 self.test_39_check_xc1_ROADMC()
735 print("oc service deletion\n")
736 self.test_44_delete_oc_service1()
738 def test_50_loop_create_eth_service(self):
739 response = test_utils.get_service_list_request("services/service1")
740 if response.status_code != requests.codes.not_found:
741 response = test_utils.service_delete_request("service1")
743 self.cr_serv_sample_data["input"]["connection-type"] = "service"
744 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "XPDR-A1"
745 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
746 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "XPDR-C1"
747 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
748 for i in range(1, 3):
749 # pylint: disable=consider-using-f-string
750 print("iteration number {}".format(i))
751 print("eth service creation")
752 self.test_15_create_eth_service1()
753 print("check xc in ROADM-A1")
754 self.test_17_check_xc1_ROADMA()
755 print("eth service deletion\n")
756 self.test_30_delete_eth_service1()
def test_51_disconnect_XPDRA(self):
    # Unmount "XPDR-A1" via test_utils.unmount_device; an OK status is required.
    reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(
        reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_52_disconnect_XPDRC(self):
    # Unmount "XPDR-C1" via test_utils.unmount_device; an OK status is required.
    reply = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(
        reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_53_disconnect_ROADMA(self):
    # Unmount "ROADM-A1" via test_utils.unmount_device; an OK status is required.
    reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_54_disconnect_ROADMC(self):
    # Unmount "ROADM-C1" via test_utils.unmount_device; an OK status is required.
    reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(
        reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
# Script entry point: when this file is executed directly, hand control to
# unittest's command-line runner with verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)