2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEFulltesting(unittest.TestCase):
29 cr_serv_sample_data = {"input": {
30 "sdnc-request-header": {
31 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
32 "rpc-action": "service-create",
33 "request-system-id": "appname",
34 "notification-url": "http://localhost:8585/NotificationServer/notify"
36 "service-name": "service1",
37 "common-id": "ASATT1234567",
38 "connection-type": "service",
40 "service-rate": "100",
42 "service-format": "Ethernet",
46 "port-device-name": "XPDR-A1-XPDR1",
48 "port-name": "1/0/2-PLUG-CLIENT",
49 "port-rack": "000000.00",
53 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
54 "lgx-port-name": "LGX Back.3",
55 "lgx-port-rack": "000000.00",
56 "lgx-port-shelf": "00"
61 "port-device-name": "XPDR-A1-XPDR1",
63 "port-name": "1/0/2-PLUG-CLIENT",
64 "port-rack": "000000.00",
68 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
69 "lgx-port-name": "LGX Back.4",
70 "lgx-port-rack": "000000.00",
71 "lgx-port-shelf": "00"
77 "service-rate": "100",
79 "service-format": "Ethernet",
83 "port-device-name": "XPDR-C1-XPDR2",
84 "port-type": "router",
85 "port-name": "1/0/2-PLUG-CLIENT",
86 "port-rack": "000000.00",
90 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
91 "lgx-port-name": "LGX Back.29",
92 "lgx-port-rack": "000000.00",
93 "lgx-port-shelf": "00"
98 "port-device-name": "XPDR-C1-XPDR2",
99 "port-type": "router",
100 "port-name": "1/0/2-PLUG-CLIENT",
101 "port-rack": "000000.00",
105 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
106 "lgx-port-name": "LGX Back.30",
107 "lgx-port-rack": "000000.00",
108 "lgx-port-shelf": "00"
113 "due-date": "2016-11-28T00:00:01Z",
114 "operator-contact": "pw1234"
118 WAITING = 20 # nominal value is 300
119 NODE_VERSION = '2.2.1'
123 cls.processes = test_utils.start_tpce()
124 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
125 ('roadma', cls.NODE_VERSION),
126 ('roadmc', cls.NODE_VERSION),
127 ('xpdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    """Shut down every process stored in ``cls.processes``, then report it."""
    # pylint: disable=not-an-iterable
    for running_process in cls.processes:
        test_utils.shutdown_process(running_process)
    print("all processes killed")
def setUp(self):
    """Print the name of the test method about to run (executed before each test)."""
    # pylint: disable=consider-using-f-string
    # self.id() is a dotted path; its last component is the method name.
    test_method_name = self.id().split(".")[-1]
    print("execution of {}".format(test_method_name))
def test_01_connect_xpdrA(self):
    """Mount "XPDR-A1" with the ('xpdra', NODE_VERSION) descriptor; expect HTTP 201."""
    node_descriptor = ('xpdra', self.NODE_VERSION)
    mount_response = test_utils.mount_device("XPDR-A1", node_descriptor)
    self.assertEqual(mount_response.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    """Mount "XPDR-C1" with the ('xpdrc', NODE_VERSION) descriptor; expect HTTP 201."""
    node_descriptor = ('xpdrc', self.NODE_VERSION)
    mount_response = test_utils.mount_device("XPDR-C1", node_descriptor)
    self.assertEqual(mount_response.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    """Mount "ROADM-A1" with the ('roadma', NODE_VERSION) descriptor; expect HTTP 201."""
    node_descriptor = ('roadma', self.NODE_VERSION)
    mount_response = test_utils.mount_device("ROADM-A1", node_descriptor)
    self.assertEqual(mount_response.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    """Mount "ROADM-C1" with the ('roadmc', NODE_VERSION) descriptor; expect HTTP 201."""
    node_descriptor = ('roadmc', self.NODE_VERSION)
    mount_response = test_utils.mount_device("ROADM-C1", node_descriptor)
    self.assertEqual(mount_response.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    """Send the xpdr-to-roadm link request for XPDR-A1 / ROADM-A1 SRG1-PP1-TXRX.

    Expects HTTP 200 and the link-created message in the RPC output.
    """
    link_response = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_response.status_code, requests.codes.ok)
    reply = link_response.json()
    result_message = reply["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_message)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """Send the roadm-to-xpdr link request for XPDR-A1 / ROADM-A1 SRG1-PP1-TXRX.

    Expects HTTP 200 and the links-created message in the RPC output.
    """
    link_response = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_response.status_code, requests.codes.ok)
    reply = link_response.json()
    result_message = reply["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_message)
def test_07_connect_xprdC_xpdr1_N1_to_roadmC_PP1(self):
    """Send the xpdr-to-roadm link request for XPDR-C1 / ROADM-C1 SRG1-PP1-TXRX.

    Expects HTTP 200 and the link-created message in the RPC output.
    """
    link_response = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_response.status_code, requests.codes.ok)
    reply = link_response.json()
    result_message = reply["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_message)
def test_08_connect_roadmC_PP1_to_xpdrC_xprd1_N1(self):
    """Send the roadm-to-xpdr link request for XPDR-C1 / ROADM-C1 SRG1-PP1-TXRX.

    Expects HTTP 200 and the links-created message in the RPC output.
    """
    link_response = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_response.status_code, requests.codes.ok)
    reply = link_response.json()
    result_message = reply["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_message)
def test_09_connect_xprdA_N2_to_roadmA_PP2(self):
    """Send the xpdr-to-roadm link request for XPDR-A1 / ROADM-A1 SRG1-PP2-TXRX.

    Expects HTTP 200 and the link-created message in the RPC output.
    """
    link_response = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_response.status_code, requests.codes.ok)
    reply = link_response.json()
    result_message = reply["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_message)
def test_10_connect_roadmA_PP2_to_xpdrA_N2(self):
    """Send the roadm-to-xpdr link request for XPDR-A1 / ROADM-A1 SRG1-PP2-TXRX.

    Expects HTTP 200 and the links-created message in the RPC output.
    """
    link_response = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_response.status_code, requests.codes.ok)
    reply = link_response.json()
    result_message = reply["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_message)
def test_11_connect_xprdC_xpdr2_N1_to_roadmC_PP2(self):
    """Send the xpdr-to-roadm link request for XPDR-C1 / ROADM-C1 SRG1-PP2-TXRX.

    Expects HTTP 200 and the link-created message in the RPC output.
    """
    link_response = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-C1", "2", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_response.status_code, requests.codes.ok)
    reply = link_response.json()
    result_message = reply["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_message)
def test_12_connect_roadmC_PP2_to_xpdrC_xpdr2_N1(self):
    """Send the roadm-to-xpdr link request for XPDR-C1 / ROADM-C1 SRG1-PP2-TXRX.

    Expects HTTP 200 and the links-created message in the RPC output.
    """
    link_response = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-C1", "2", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_response.status_code, requests.codes.ok)
    reply = link_response.json()
    result_message = reply["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_message)
220 def test_13_add_omsAttributes_ROADMA_ROADMC(self):
221 # Config ROADMA-ROADMC oms-attributes
223 "auto-spanloss": "true",
224 "spanloss-base": 11.4,
225 "spanloss-current": 12,
226 "engineered-spanloss": 12.2,
227 "link-concatenation": [{
230 "SRLG-length": 100000,
232 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
233 self.assertEqual(response.status_code, requests.codes.created)
235 def test_14_add_omsAttributes_ROADMC_ROADMA(self):
236 # Config ROADMC-ROADMA oms-attributes
238 "auto-spanloss": "true",
239 "spanloss-base": 11.4,
240 "spanloss-current": 12,
241 "engineered-spanloss": 12.2,
242 "link-concatenation": [{
245 "SRLG-length": 100000,
247 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
248 self.assertEqual(response.status_code, requests.codes.created)
250 # test service-create for Eth service from xpdr to xpdr
def test_15_create_eth_service2(self):
    """Submit the shared service-create payload under the name "service2".

    ``cr_serv_sample_data`` is modified in place (only the service name is
    set here). After the request is acknowledged with the 'PCE calculation
    in progress' message, the test sleeps ``WAITING`` seconds.
    """
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service2"
    create_response = test_utils.service_create_request(payload)
    self.assertEqual(create_response.status_code, requests.codes.ok)
    reply = create_response.json()
    common = reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
260 def test_16_get_eth_service1(self):
261 response = test_utils.get_service_list_request("services/service2")
262 self.assertEqual(response.status_code, requests.codes.ok)
263 res = response.json()
265 res['services'][0]['administrative-state'], 'inService')
267 res['services'][0]['service-name'], 'service2')
269 res['services'][0]['connection-type'], 'service')
271 res['services'][0]['lifecycle-state'], 'planned')
274 def test_17_check_xc1_ROADMA(self):
275 response = test_utils.check_netconf_node_request(
276 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
277 self.assertEqual(response.status_code, requests.codes.ok)
278 res = response.json()
279 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
280 self.assertDictEqual(
282 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
283 'opticalControlMode': 'gainLoss',
284 'target-output-power': -3.0
285 }, **res['roadm-connections'][0]),
286 res['roadm-connections'][0]
288 self.assertDictEqual(
289 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
290 res['roadm-connections'][0]['source'])
291 self.assertDictEqual(
292 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
293 res['roadm-connections'][0]['destination'])
296 def test_18_check_xc1_ROADMC(self):
297 response = test_utils.check_netconf_node_request(
298 "ROADM-C1", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-761:768")
299 self.assertEqual(response.status_code, requests.codes.ok)
300 res = response.json()
301 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
302 self.assertDictEqual(
304 'connection-name': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-761:768',
305 'opticalControlMode': 'gainLoss',
306 'target-output-power': -3.0
307 }, **res['roadm-connections'][0]),
308 res['roadm-connections'][0]
310 self.assertDictEqual(
311 {'src-if': 'SRG1-PP2-TXRX-nmc-761:768'},
312 res['roadm-connections'][0]['source'])
313 self.assertDictEqual(
314 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
315 res['roadm-connections'][0]['destination'])
318 def test_19_check_topo_XPDRA(self):
319 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
320 self.assertEqual(response.status_code, requests.codes.ok)
321 res = response.json()
322 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
324 if ele['tp-id'] == 'XPDR1-NETWORK1':
325 self.assertEqual({'frequency': 196.1,
327 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
328 elif ele['tp-id'] in ('XPDR1-CLIENT1', 'XPDR1-CLIENT2'):
329 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
330 elif ele['tp-id'] == 'XPDR1-NETWORK2':
331 self.assertIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
334 def test_20_check_topo_ROADMA_SRG1(self):
335 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
336 self.assertEqual(response.status_code, requests.codes.ok)
337 res = response.json()
338 freq_map = base64.b64decode(
339 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
340 freq_map_array = [int(x) for x in freq_map]
341 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
342 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
344 if ele['tp-id'] == 'SRG1-PP1-TXRX':
345 freq_map = base64.b64decode(
346 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
347 freq_map_array = [int(x) for x in freq_map]
348 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
349 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
350 self.assertNotIn('avail-freq-maps', dict.keys(ele))
353 def test_21_check_topo_ROADMA_DEG2(self):
354 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
355 self.assertEqual(response.status_code, requests.codes.ok)
356 res = response.json()
357 freq_map = base64.b64decode(
358 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
359 freq_map_array = [int(x) for x in freq_map]
360 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
361 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
363 if ele['tp-id'] == 'DEG2-CTP-TXRX':
364 freq_map = base64.b64decode(
365 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
366 freq_map_array = [int(x) for x in freq_map]
367 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
368 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
369 freq_map = base64.b64decode(
370 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
371 freq_map_array = [int(x) for x in freq_map]
372 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
def test_22_create_eth_service1(self):
    """Submit the shared payload as "service1" with port identifiers removed.

    The "port-device-name" and "port-name" entries are deleted from the tx
    and rx port of both service ends (a-end first, then z-end) in the
    shared ``cr_serv_sample_data`` dict before the request is sent. The test
    then sleeps ``WAITING`` seconds.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1"
    # Same deletion order as spelling the eight ``del`` statements out.
    for service_end in ("service-a-end", "service-z-end"):
        for direction in ("tx-direction", "rx-direction"):
            for port_key in ("port-device-name", "port-name"):
                del request_input[service_end][direction]["port"][port_key]

    create_response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_response.status_code, requests.codes.ok)
    reply = create_response.json()
    common = reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
393 def test_23_get_eth_service1(self):
394 response = test_utils.get_service_list_request("services/service1")
395 self.assertEqual(response.status_code, requests.codes.ok)
396 res = response.json()
398 res['services'][0]['administrative-state'],
401 res['services'][0]['service-name'], 'service1')
403 res['services'][0]['connection-type'], 'service')
405 res['services'][0]['lifecycle-state'], 'planned')
408 def test_24_check_xc1_ROADMA(self):
409 response = test_utils.check_netconf_node_request(
410 "ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760")
411 self.assertEqual(response.status_code, requests.codes.ok)
412 res = response.json()
413 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
414 self.assertDictEqual(
416 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760',
417 'opticalControlMode': 'power'
418 }, **res['roadm-connections'][0]),
419 res['roadm-connections'][0]
421 self.assertDictEqual(
422 {'src-if': 'DEG2-TTP-TXRX-nmc-753:760'},
423 res['roadm-connections'][0]['source'])
424 self.assertDictEqual(
425 {'dst-if': 'SRG1-PP2-TXRX-nmc-753:760'},
426 res['roadm-connections'][0]['destination'])
428 def test_25_check_topo_XPDRA(self):
429 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
430 self.assertEqual(response.status_code, requests.codes.ok)
431 res = response.json()
432 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
434 if ele['tp-id'] == 'XPDR1-NETWORK2':
435 self.assertEqual({'frequency': 196.05,
437 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
438 elif ele['tp-id'] in ('XPDR1-CLIENT1'):
439 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
442 def test_26_check_topo_ROADMA_SRG1(self):
443 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
444 self.assertEqual(response.status_code, requests.codes.ok)
445 res = response.json()
446 freq_map = base64.b64decode(
447 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
448 freq_map_array = [int(x) for x in freq_map]
449 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
450 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
451 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
453 if ele['tp-id'] == 'SRG1-PP1-TXRX':
454 freq_map = base64.b64decode(
455 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
456 freq_map_array = [int(x) for x in freq_map]
457 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
458 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
459 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
460 freq_map = base64.b64decode(
461 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
462 freq_map_array = [int(x) for x in freq_map]
463 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
464 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
465 elif ele['tp-id'] == 'SRG1-PP3-TXRX':
466 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
469 def test_27_check_topo_ROADMA_DEG2(self):
470 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
471 self.assertEqual(response.status_code, requests.codes.ok)
472 res = response.json()
473 freq_map = base64.b64decode(
474 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
475 freq_map_array = [int(x) for x in freq_map]
476 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
477 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
478 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
480 if ele['tp-id'] == 'DEG2-CTP-TXRX':
481 freq_map = base64.b64decode(
482 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
483 freq_map_array = [int(x) for x in freq_map]
484 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
485 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
486 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
487 freq_map = base64.b64decode(
488 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
489 freq_map_array = [int(x) for x in freq_map]
490 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
491 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
494 # creation service test on a non-available resource
def test_28_create_eth_service3(self):
    """Submit the shared payload as "service3".

    Expects HTTP 200, the 'PCE calculation in progress' message and a
    response code containing '200', then sleeps ``WAITING`` seconds.
    """
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service3"
    create_response = test_utils.service_create_request(payload)
    self.assertEqual(create_response.status_code, requests.codes.ok)
    reply = create_response.json()
    self.assertIn(
        'PCE calculation in progress',
        reply['output']['configuration-response-common']['response-message'])
    self.assertIn(
        '200',
        reply['output']['configuration-response-common']['response-code'])
    time.sleep(self.WAITING)
# TODO: add a test that checks that the openroadm-service-list still contains only 2 elements
def test_29_delete_eth_service3(self):
    """Request deletion of "service3"; expect the 'does not exist' reply.

    The HTTP status must be 200 while the RPC output carries the
    not-in-datastore message and a response code containing '500'.
    """
    delete_response = test_utils.service_delete_request("service3")
    self.assertEqual(delete_response.status_code, requests.codes.ok)
    reply = delete_response.json()
    self.assertIn(
        "Service 'service3' does not exist in datastore",
        reply['output']['configuration-response-common']['response-message'])
    self.assertIn(
        '500',
        reply['output']['configuration-response-common']['response-code'])
def test_30_delete_eth_service1(self):
    """Request deletion of "service1"; expect HTTP 200 and the in-progress message."""
    delete_response = test_utils.service_delete_request("service1")
    self.assertEqual(delete_response.status_code, requests.codes.ok)
    reply = delete_response.json()
    common = reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_31_delete_eth_service2(self):
    """Request deletion of "service2"; expect HTTP 200 and the in-progress message."""
    delete_response = test_utils.service_delete_request("service2")
    self.assertEqual(delete_response.status_code, requests.codes.ok)
    reply = delete_response.json()
    common = reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_32_check_no_xc_ROADMA(self):
    """Check that the ROADM-A1 device data holds no 'roadm-connections' entry.

    Queries ROADM-A1 with an empty sub-path, expects HTTP 200, and asserts
    that 'roadm-connections' is not a key of the 'org-openroadm-device'
    object in the reply.
    """
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    # Assert on the status before decoding the body: an error reply with a
    # non-JSON body then fails on the status check instead of inside json().
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership on the dict itself tests its keys; dict.keys() is redundant.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
538 def test_33_check_topo_XPDRA(self):
539 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
540 self.assertEqual(response.status_code, requests.codes.ok)
541 res = response.json()
542 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
544 if ele['org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
545 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
546 elif ele['org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
547 self.assertIn('tail-equipment-id',
548 dict.keys(ele['org-openroadm-network-topology:xpdr-network-attributes']))
549 self.assertNotIn('wavelength', dict.keys(
550 ele['org-openroadm-network-topology:xpdr-network-attributes']))
553 def test_34_check_topo_ROADMA_SRG1(self):
554 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
555 self.assertEqual(response.status_code, requests.codes.ok)
556 res = response.json()
557 freq_map = base64.b64decode(
558 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
559 freq_map_array = [int(x) for x in freq_map]
560 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
561 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
562 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
564 if ele['tp-id'] in ('SRG1-PP1-TXRX', 'SRG1-PP2-TXRX'):
565 freq_map = base64.b64decode(
566 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
567 freq_map_array = [int(x) for x in freq_map]
568 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
569 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
570 elif ele['tp-id'] == 'SRG1-CP-TXRX':
571 freq_map = base64.b64decode(
572 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
573 freq_map_array = [int(x) for x in freq_map]
574 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
575 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
577 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
580 def test_35_check_topo_ROADMA_DEG2(self):
581 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
582 self.assertEqual(response.status_code, requests.codes.ok)
583 res = response.json()
584 freq_map = base64.b64decode(
585 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
586 freq_map_array = [int(x) for x in freq_map]
587 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
588 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
589 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
591 if ele['tp-id'] == 'DEG2-CTP-TXRX':
592 freq_map = base64.b64decode(
593 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
594 freq_map_array = [int(x) for x in freq_map]
595 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
596 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
597 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
598 freq_map = base64.b64decode(
599 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
600 freq_map_array = [int(x) for x in freq_map]
601 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
602 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
605 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
    """Submit the shared payload as an OC "roadm-line" service named "service1".

    Rewrites, in place, the service name, connection type and the node-id /
    service-format of both ends (ROADM-A1 and ROADM-C1, format "OC"), sends
    the request, then sleeps ``WAITING`` seconds.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1"
    request_input["connection-type"] = "roadm-line"
    for service_end, node_id in (("service-a-end", "ROADM-A1"),
                                 ("service-z-end", "ROADM-C1")):
        request_input[service_end]["node-id"] = node_id
        request_input[service_end]["service-format"] = "OC"
    create_response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_response.status_code, requests.codes.ok)
    reply = create_response.json()
    common = reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
620 def test_37_get_oc_service1(self):
621 response = test_utils.get_service_list_request("services/service1")
622 self.assertEqual(response.status_code, requests.codes.ok)
623 res = response.json()
625 res['services'][0]['administrative-state'],
628 res['services'][0]['service-name'], 'service1')
630 res['services'][0]['connection-type'], 'roadm-line')
632 res['services'][0]['lifecycle-state'], 'planned')
635 def test_38_check_xc1_ROADMA(self):
636 response = test_utils.check_netconf_node_request(
637 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
638 self.assertEqual(response.status_code, requests.codes.ok)
639 res = response.json()
640 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
641 self.assertDictEqual(
643 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
644 'opticalControlMode': 'gainLoss',
645 'target-output-power': -3.0
646 }, **res['roadm-connections'][0]),
647 res['roadm-connections'][0]
649 self.assertDictEqual(
650 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
651 res['roadm-connections'][0]['source'])
652 self.assertDictEqual(
653 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
654 res['roadm-connections'][0]['destination'])
657 def test_39_check_xc1_ROADMC(self):
658 response = test_utils.check_netconf_node_request(
659 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
660 self.assertEqual(response.status_code, requests.codes.ok)
661 res = response.json()
662 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
663 self.assertDictEqual(
665 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
666 'opticalControlMode': 'gainLoss',
667 'target-output-power': -3.0
668 }, **res['roadm-connections'][0]),
669 res['roadm-connections'][0]
671 self.assertDictEqual(
672 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
673 res['roadm-connections'][0]['source'])
674 self.assertDictEqual(
675 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
676 res['roadm-connections'][0]['destination'])
def test_40_create_oc_service2(self):
    """Submit the shared payload as an OC "roadm-line" service named "service2".

    Rewrites, in place, the service name, connection type and the node-id /
    service-format of both ends (ROADM-A1 and ROADM-C1, format "OC"), sends
    the request, then sleeps ``WAITING`` seconds.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service2"
    request_input["connection-type"] = "roadm-line"
    for service_end, node_id in (("service-a-end", "ROADM-A1"),
                                 ("service-z-end", "ROADM-C1")):
        request_input[service_end]["node-id"] = node_id
        request_input[service_end]["service-format"] = "OC"
    create_response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_response.status_code, requests.codes.ok)
    reply = create_response.json()
    common = reply['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
693 def test_41_get_oc_service2(self):
694 response = test_utils.get_service_list_request("services/service2")
695 self.assertEqual(response.status_code, requests.codes.ok)
696 res = response.json()
698 res['services'][0]['administrative-state'],
701 res['services'][0]['service-name'], 'service2')
703 res['services'][0]['connection-type'], 'roadm-line')
705 res['services'][0]['lifecycle-state'], 'planned')
708 def test_42_check_xc2_ROADMA(self):
709 response = test_utils.check_netconf_node_request(
710 "ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760")
711 self.assertEqual(response.status_code, requests.codes.ok)
712 res = response.json()
713 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
714 self.assertDictEqual(
716 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760',
717 'opticalControlMode': 'gainLoss',
718 'target-output-power': -3.0
719 }, **res['roadm-connections'][0]),
720 res['roadm-connections'][0]
722 self.assertDictEqual(
723 {'src-if': 'SRG1-PP2-TXRX-nmc-753:760'},
724 res['roadm-connections'][0]['source'])
725 self.assertDictEqual(
726 {'dst-if': 'DEG2-TTP-TXRX-nmc-753:760'},
727 res['roadm-connections'][0]['destination'])
730 def test_43_check_topo_ROADMA(self):
731 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
732 self.assertEqual(response.status_code, requests.codes.ok)
733 res = response.json()
734 freq_map = base64.b64decode(
735 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
736 freq_map_array = [int(x) for x in freq_map]
737 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
738 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
739 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
741 if ele['tp-id'] == 'SRG1-PP1-TXRX':
742 freq_map = base64.b64decode(
743 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
744 freq_map_array = [int(x) for x in freq_map]
745 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
746 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
747 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
748 freq_map = base64.b64decode(
749 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
750 freq_map_array = [int(x) for x in freq_map]
751 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
752 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
753 elif ele['tp-id'] == 'SRG1-PP3-TXRX':
754 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
756 self.test_27_check_topo_ROADMA_DEG2()
def test_44_delete_oc_service1(self):
    """Request deletion of "service1"; expect HTTP 200 and the in-progress message."""
    delete_response = test_utils.service_delete_request("service1")
    self.assertEqual(delete_response.status_code, requests.codes.ok)
    reply = delete_response.json()
    common = reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_45_delete_oc_service2(self):
    """Request deletion of "service2"; expect HTTP 200 and the in-progress message."""
    delete_response = test_utils.service_delete_request("service2")
    self.assertEqual(delete_response.status_code, requests.codes.ok)
    reply = delete_response.json()
    common = reply['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
775 def test_46_get_no_oc_services(self):
777 response = test_utils.get_service_list_request("")
778 self.assertEqual(response.status_code, requests.codes.conflict)
779 res = response.json()
781 {"error-type": "application", "error-tag": "data-missing",
782 "error-message": "Request could not be completed because the relevant data model content does not exist"},
783 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    """Check that the ROADM-A1 device data holds no 'roadm-connections' entry.

    Queries ROADM-A1 with an empty sub-path, expects HTTP 200, and asserts
    that 'roadm-connections' is absent from the 'org-openroadm-device'
    object in the reply.
    """
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Plain key literal: the former ['roadm-connections'][0] built a
    # one-element list only to index the same string back out of it.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
793 def test_48_check_topo_ROADMA(self):
794 self.test_34_check_topo_ROADMA_SRG1()
795 self.test_35_check_topo_ROADMA_DEG2()
797 def test_49_loop_create_oc_service(self):
798 for i in range(1, 3):
799 # pylint: disable=consider-using-f-string
800 print("iteration number {}".format(i))
801 print("oc service creation")
802 self.test_36_create_oc_service1()
803 print("check xc in ROADM-A1")
804 self.test_38_check_xc1_ROADMA()
805 print("check xc in ROADM-C1")
806 self.test_39_check_xc1_ROADMC()
807 print("oc service deletion\n")
808 self.test_44_delete_oc_service1()
810 def test_50_loop_create_eth_service(self):
811 response = test_utils.get_service_list_request("services/service1")
812 if response.status_code != requests.codes.not_found:
813 response = test_utils.service_delete_request("service1")
815 self.cr_serv_sample_data["input"]["connection-type"] = "service"
816 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "XPDR-A1"
817 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
818 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "XPDR-C1"
819 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
820 for i in range(1, 3):
821 # pylint: disable=consider-using-f-string
822 print("iteration number {}".format(i))
823 print("eth service creation")
824 self.test_15_create_eth_service2()
825 print("check xc in ROADM-A1")
826 self.test_17_check_xc1_ROADMA()
827 print("eth service deletion\n")
828 self.test_31_delete_eth_service2()
def test_51_disconnect_XPDRA(self):
    """Unmount "XPDR-A1" and expect HTTP 200."""
    unmount_response = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(unmount_response.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_52_disconnect_XPDRC(self):
    """Unmount "XPDR-C1" and expect HTTP 200."""
    unmount_response = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(unmount_response.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_53_disconnect_ROADMA(self):
    """Unmount "ROADM-A1" and expect HTTP 200."""
    unmount_response = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(unmount_response.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_54_disconnect_ROADMC(self):
    """Unmount "ROADM-C1" and expect HTTP 200."""
    unmount_response = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(unmount_response.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
847 if __name__ == "__main__":
848 unittest.main(verbosity=2)