2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEFulltesting(unittest.TestCase):
29 cr_serv_sample_data = {"input": {
30 "sdnc-request-header": {
31 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
32 "rpc-action": "service-create",
33 "request-system-id": "appname",
34 "notification-url": "http://localhost:8585/NotificationServer/notify"
36 "service-name": "service1",
37 "common-id": "ASATT1234567",
38 "connection-type": "service",
40 "service-rate": "100",
42 "service-format": "Ethernet",
43 "clli": "SNJSCAMCJP8",
46 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
47 "port-type": "router",
48 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
49 "port-rack": "000000.00",
53 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
54 "lgx-port-name": "LGX Back.3",
55 "lgx-port-rack": "000000.00",
56 "lgx-port-shelf": "00"
61 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
62 "port-type": "router",
63 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
64 "port-rack": "000000.00",
68 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
69 "lgx-port-name": "LGX Back.4",
70 "lgx-port-rack": "000000.00",
71 "lgx-port-shelf": "00"
77 "service-rate": "100",
79 "service-format": "Ethernet",
80 "clli": "SNJSCAMCJT4",
83 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
84 "port-type": "router",
85 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
86 "port-rack": "000000.00",
90 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
91 "lgx-port-name": "LGX Back.29",
92 "lgx-port-rack": "000000.00",
93 "lgx-port-shelf": "00"
98 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
99 "port-type": "router",
100 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
101 "port-rack": "000000.00",
105 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
106 "lgx-port-name": "LGX Back.30",
107 "lgx-port-rack": "000000.00",
108 "lgx-port-shelf": "00"
113 "due-date": "2016-11-28T00:00:01Z",
114 "operator-contact": "pw1234"
118 WAITING = 20 # nominal value is 300
119 NODE_VERSION = '2.2.1'
123 cls.processes = test_utils.start_tpce()
124 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
125 ('roadma', cls.NODE_VERSION),
126 ('roadmc', cls.NODE_VERSION),
127 ('xpdrc', cls.NODE_VERSION)])
130 def tearDownClass(cls):
131 # pylint: disable=not-an-iterable
132 for process in cls.processes:
133 test_utils.shutdown_process(process)
134 print("all processes killed")
136 def setUp(self): # instruction executed before each test method
137 # pylint: disable=consider-using-f-string
138 print("execution of {}".format(self.id().split(".")[-1]))
140 def test_01_connect_xpdrA(self):
141 response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
142 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
144 def test_02_connect_xpdrC(self):
145 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
146 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
148 def test_03_connect_rdmA(self):
149 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
150 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
152 def test_04_connect_rdmC(self):
153 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
154 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
156 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
157 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
158 "ROADM-A1", "1", "SRG1-PP1-TXRX")
159 self.assertEqual(response.status_code, requests.codes.ok)
160 res = response.json()
161 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
164 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
165 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
166 "ROADM-A1", "1", "SRG1-PP1-TXRX")
167 self.assertEqual(response.status_code, requests.codes.ok)
168 res = response.json()
169 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
172 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
173 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
174 "ROADM-C1", "1", "SRG1-PP1-TXRX")
175 self.assertEqual(response.status_code, requests.codes.ok)
176 res = response.json()
177 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
180 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
181 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
182 "ROADM-C1", "1", "SRG1-PP1-TXRX")
183 self.assertEqual(response.status_code, requests.codes.ok)
184 res = response.json()
185 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
188 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
189 # Config ROADMA-ROADMC oms-attributes
191 "auto-spanloss": "true",
192 "spanloss-base": 11.4,
193 "spanloss-current": 12,
194 "engineered-spanloss": 12.2,
195 "link-concatenation": [{
198 "SRLG-length": 100000,
200 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
201 self.assertEqual(response.status_code, requests.codes.created)
203 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
204 # Config ROADMC-ROADMA oms-attributes
206 "auto-spanloss": "true",
207 "spanloss-base": 11.4,
208 "spanloss-current": 12,
209 "engineered-spanloss": 12.2,
210 "link-concatenation": [{
213 "SRLG-length": 100000,
215 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
216 self.assertEqual(response.status_code, requests.codes.created)
218 # test service-create for Eth service from xpdr to xpdr
219 def test_11_create_eth_service1(self):
220 self.cr_serv_sample_data["input"]["service-name"] = "service1"
221 response = test_utils.service_create_request(self.cr_serv_sample_data)
222 self.assertEqual(response.status_code, requests.codes.ok)
223 res = response.json()
224 self.assertIn('PCE calculation in progress',
225 res['output']['configuration-response-common']['response-message'])
226 time.sleep(self.WAITING)
228 def test_12_get_eth_service1(self):
229 response = test_utils.get_service_list_request("services/service1")
230 self.assertEqual(response.status_code, requests.codes.ok)
231 res = response.json()
233 res['services'][0]['administrative-state'], 'inService')
235 res['services'][0]['service-name'], 'service1')
237 res['services'][0]['connection-type'], 'service')
239 res['services'][0]['lifecycle-state'], 'planned')
242 def test_13_check_xc1_ROADMA(self):
243 response = test_utils.check_netconf_node_request(
244 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
245 self.assertEqual(response.status_code, requests.codes.ok)
246 res = response.json()
247 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
248 self.assertDictEqual(
250 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
251 'opticalControlMode': 'gainLoss',
252 'target-output-power': -3.0
253 }, **res['roadm-connections'][0]),
254 res['roadm-connections'][0]
256 self.assertDictEqual(
257 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
258 res['roadm-connections'][0]['source'])
259 self.assertDictEqual(
260 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
261 res['roadm-connections'][0]['destination'])
264 def test_14_check_xc1_ROADMC(self):
265 response = test_utils.check_netconf_node_request(
266 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
267 self.assertEqual(response.status_code, requests.codes.ok)
268 res = response.json()
269 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
270 self.assertDictEqual(
272 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
273 'opticalControlMode': 'gainLoss',
274 'target-output-power': -3.0
275 }, **res['roadm-connections'][0]),
276 res['roadm-connections'][0]
278 self.assertDictEqual(
279 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
280 res['roadm-connections'][0]['source'])
281 self.assertDictEqual(
282 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
283 res['roadm-connections'][0]['destination'])
286 def test_15_check_topo_XPDRA(self):
287 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
288 self.assertEqual(response.status_code, requests.codes.ok)
289 res = response.json()
290 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
292 if ele['tp-id'] == 'XPDR1-NETWORK1':
293 self.assertEqual({'frequency': 196.1,
295 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
296 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
297 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
298 if ele['tp-id'] == 'XPDR1-NETWORK2':
299 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
302 def test_16_check_topo_ROADMA_SRG1(self):
303 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
304 self.assertEqual(response.status_code, requests.codes.ok)
305 res = response.json()
306 freq_map = base64.b64decode(
307 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
308 freq_map_array = [int(x) for x in freq_map]
309 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
310 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
312 if ele['tp-id'] == 'SRG1-PP1-TXRX':
313 freq_map = base64.b64decode(
314 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
315 freq_map_array = [int(x) for x in freq_map]
316 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
317 if ele['tp-id'] == 'SRG1-PP2-TXRX':
318 self.assertNotIn('avail-freq-maps', dict.keys(ele))
321 def test_17_check_topo_ROADMA_DEG1(self):
322 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
323 self.assertEqual(response.status_code, requests.codes.ok)
324 res = response.json()
325 freq_map = base64.b64decode(
326 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
327 freq_map_array = [int(x) for x in freq_map]
328 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
329 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
331 if ele['tp-id'] == 'DEG2-CTP-TXRX':
332 freq_map = base64.b64decode(
333 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
334 freq_map_array = [int(x) for x in freq_map]
335 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
336 if ele['tp-id'] == 'DEG2-TTP-TXRX':
337 freq_map = base64.b64decode(
338 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
339 freq_map_array = [int(x) for x in freq_map]
340 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
343 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
344 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "2",
345 "ROADM-A1", "1", "SRG1-PP2-TXRX")
346 self.assertEqual(response.status_code, requests.codes.ok)
347 res = response.json()
348 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
351 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
352 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "2",
353 "ROADM-A1", "1", "SRG1-PP2-TXRX")
354 self.assertEqual(response.status_code, requests.codes.ok)
355 res = response.json()
356 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
359 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
360 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "2",
361 "ROADM-C1", "1", "SRG1-PP2-TXRX")
362 self.assertEqual(response.status_code, requests.codes.ok)
363 res = response.json()
364 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
367 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
368 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "2",
369 "ROADM-C1", "1", "SRG1-PP2-TXRX")
370 self.assertEqual(response.status_code, requests.codes.ok)
371 res = response.json()
372 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
375 def test_22_create_eth_service2(self):
376 self.cr_serv_sample_data["input"]["service-name"] = "service2"
377 response = test_utils.service_create_request(self.cr_serv_sample_data)
378 self.assertEqual(response.status_code, requests.codes.ok)
379 res = response.json()
380 self.assertIn('PCE calculation in progress',
381 res['output']['configuration-response-common']['response-message'])
382 time.sleep(self.WAITING)
384 def test_23_get_eth_service2(self):
385 response = test_utils.get_service_list_request("services/service2")
386 self.assertEqual(response.status_code, requests.codes.ok)
387 res = response.json()
389 res['services'][0]['administrative-state'],
392 res['services'][0]['service-name'], 'service2')
394 res['services'][0]['connection-type'], 'service')
396 res['services'][0]['lifecycle-state'], 'planned')
399 def test_24_check_xc2_ROADMA(self):
400 response = test_utils.check_netconf_node_request(
401 "ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760")
402 self.assertEqual(response.status_code, requests.codes.ok)
403 res = response.json()
404 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
405 self.assertDictEqual(
407 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760',
408 'opticalControlMode': 'power'
409 }, **res['roadm-connections'][0]),
410 res['roadm-connections'][0]
412 self.assertDictEqual(
413 {'src-if': 'DEG2-TTP-TXRX-nmc-753:760'},
414 res['roadm-connections'][0]['source'])
415 self.assertDictEqual(
416 {'dst-if': 'SRG1-PP2-TXRX-nmc-753:760'},
417 res['roadm-connections'][0]['destination'])
419 def test_25_check_topo_XPDRA(self):
420 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
421 self.assertEqual(response.status_code, requests.codes.ok)
422 res = response.json()
423 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
425 if ele['tp-id'] == 'XPDR1-NETWORK1':
426 self.assertEqual({'frequency': 196.1,
428 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
429 if ele['tp-id'] == 'XPDR1-NETWORK2':
430 self.assertEqual({'frequency': 196.05,
432 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
433 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
434 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
437 def test_26_check_topo_ROADMA_SRG1(self):
438 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
439 self.assertEqual(response.status_code, requests.codes.ok)
440 res = response.json()
441 freq_map = base64.b64decode(
442 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
443 freq_map_array = [int(x) for x in freq_map]
444 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
445 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
446 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
448 if ele['tp-id'] == 'SRG1-PP1-TXRX':
449 freq_map = base64.b64decode(
450 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
451 freq_map_array = [int(x) for x in freq_map]
452 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
453 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
454 if ele['tp-id'] == 'SRG1-PP2-TXRX':
455 freq_map = base64.b64decode(
456 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
457 freq_map_array = [int(x) for x in freq_map]
458 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
459 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
460 if ele['tp-id'] == 'SRG1-PP3-TXRX':
461 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
464 def test_27_check_topo_ROADMA_DEG2(self):
465 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
466 self.assertEqual(response.status_code, requests.codes.ok)
467 res = response.json()
468 freq_map = base64.b64decode(
469 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
470 freq_map_array = [int(x) for x in freq_map]
471 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
472 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
473 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
475 if ele['tp-id'] == 'DEG2-CTP-TXRX':
476 freq_map = base64.b64decode(
477 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
478 freq_map_array = [int(x) for x in freq_map]
479 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
480 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
481 if ele['tp-id'] == 'DEG2-TTP-TXRX':
482 freq_map = base64.b64decode(
483 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
484 freq_map_array = [int(x) for x in freq_map]
485 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
486 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
489 # creation service test on a non-available resource
490 def test_28_create_eth_service3(self):
491 self.cr_serv_sample_data["input"]["service-name"] = "service3"
492 response = test_utils.service_create_request(self.cr_serv_sample_data)
493 self.assertEqual(response.status_code, requests.codes.ok)
494 res = response.json()
495 self.assertIn('PCE calculation in progress',
496 res['output']['configuration-response-common']['response-message'])
497 self.assertIn('200', res['output']['configuration-response-common']['response-code'])
498 time.sleep(self.WAITING)
500 # add a test that check the openroadm-service-list still only contains 2 elements
501 def test_29_delete_eth_service3(self):
502 response = test_utils.service_delete_request("service3")
503 self.assertEqual(response.status_code, requests.codes.ok)
504 res = response.json()
505 self.assertIn('Service \'service3\' does not exist in datastore',
506 res['output']['configuration-response-common']['response-message'])
507 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
510 def test_30_delete_eth_service1(self):
511 response = test_utils.service_delete_request("service1")
512 self.assertEqual(response.status_code, requests.codes.ok)
513 res = response.json()
514 self.assertIn('Renderer service delete in progress',
515 res['output']['configuration-response-common']['response-message'])
518 def test_31_delete_eth_service2(self):
519 response = test_utils.service_delete_request("service2")
520 self.assertEqual(response.status_code, requests.codes.ok)
521 res = response.json()
522 self.assertIn('Renderer service delete in progress',
523 res['output']['configuration-response-common']['response-message'])
526 def test_32_check_no_xc_ROADMA(self):
527 response = test_utils.check_netconf_node_request("ROADM-A1", "")
528 res = response.json()
529 self.assertEqual(response.status_code, requests.codes.ok)
530 self.assertNotIn('roadm-connections', dict.keys(res['org-openroadm-device']))
533 def test_33_check_topo_XPDRA(self):
534 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
535 self.assertEqual(response.status_code, requests.codes.ok)
536 res = response.json()
537 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
539 if ele['org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
540 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
541 elif ele['org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
542 self.assertIn('tail-equipment-id',
543 dict.keys(ele['org-openroadm-network-topology:xpdr-network-attributes']))
544 self.assertNotIn('wavelength', dict.keys(
545 ele['org-openroadm-network-topology:xpdr-network-attributes']))
548 def test_34_check_topo_ROADMA_SRG1(self):
549 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
550 self.assertEqual(response.status_code, requests.codes.ok)
551 res = response.json()
552 freq_map = base64.b64decode(
553 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
554 freq_map_array = [int(x) for x in freq_map]
555 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
556 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
557 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
559 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
560 freq_map = base64.b64decode(
561 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
562 freq_map_array = [int(x) for x in freq_map]
563 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
564 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
565 elif ele['tp-id'] == 'SRG1-CP-TXRX':
566 freq_map = base64.b64decode(
567 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
568 freq_map_array = [int(x) for x in freq_map]
569 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
570 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
572 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
575 def test_35_check_topo_ROADMA_DEG2(self):
576 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
577 self.assertEqual(response.status_code, requests.codes.ok)
578 res = response.json()
579 freq_map = base64.b64decode(
580 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
581 freq_map_array = [int(x) for x in freq_map]
582 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
583 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
584 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
586 if ele['tp-id'] == 'DEG2-CTP-TXRX':
587 freq_map = base64.b64decode(
588 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
589 freq_map_array = [int(x) for x in freq_map]
590 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
591 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
592 if ele['tp-id'] == 'DEG2-TTP-TXRX':
593 freq_map = base64.b64decode(
594 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
595 freq_map_array = [int(x) for x in freq_map]
596 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
597 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
600 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
601 def test_36_create_oc_service1(self):
602 self.cr_serv_sample_data["input"]["service-name"] = "service1"
603 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
604 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
605 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
606 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
607 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
608 response = test_utils.service_create_request(self.cr_serv_sample_data)
609 self.assertEqual(response.status_code, requests.codes.ok)
610 res = response.json()
611 self.assertIn('PCE calculation in progress',
612 res['output']['configuration-response-common']['response-message'])
613 time.sleep(self.WAITING)
615 def test_37_get_oc_service1(self):
616 response = test_utils.get_service_list_request("services/service1")
617 self.assertEqual(response.status_code, requests.codes.ok)
618 res = response.json()
620 res['services'][0]['administrative-state'],
623 res['services'][0]['service-name'], 'service1')
625 res['services'][0]['connection-type'], 'roadm-line')
627 res['services'][0]['lifecycle-state'], 'planned')
630 def test_38_check_xc1_ROADMA(self):
631 response = test_utils.check_netconf_node_request(
632 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
633 self.assertEqual(response.status_code, requests.codes.ok)
634 res = response.json()
635 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
636 self.assertDictEqual(
638 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
639 'opticalControlMode': 'gainLoss',
640 'target-output-power': -3.0
641 }, **res['roadm-connections'][0]),
642 res['roadm-connections'][0]
644 self.assertDictEqual(
645 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
646 res['roadm-connections'][0]['source'])
647 self.assertDictEqual(
648 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
649 res['roadm-connections'][0]['destination'])
652 def test_39_check_xc1_ROADMC(self):
653 response = test_utils.check_netconf_node_request(
654 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
655 self.assertEqual(response.status_code, requests.codes.ok)
656 res = response.json()
657 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
658 self.assertDictEqual(
660 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
661 'opticalControlMode': 'gainLoss',
662 'target-output-power': -3.0
663 }, **res['roadm-connections'][0]),
664 res['roadm-connections'][0]
666 self.assertDictEqual(
667 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
668 res['roadm-connections'][0]['source'])
669 self.assertDictEqual(
670 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
671 res['roadm-connections'][0]['destination'])
674 def test_40_create_oc_service2(self):
675 self.cr_serv_sample_data["input"]["service-name"] = "service2"
676 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
677 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
678 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
679 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
680 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
681 response = test_utils.service_create_request(self.cr_serv_sample_data)
682 self.assertEqual(response.status_code, requests.codes.ok)
683 res = response.json()
684 self.assertIn('PCE calculation in progress',
685 res['output']['configuration-response-common']['response-message'])
686 time.sleep(self.WAITING)
688 def test_41_get_oc_service2(self):
689 response = test_utils.get_service_list_request("services/service2")
690 self.assertEqual(response.status_code, requests.codes.ok)
691 res = response.json()
693 res['services'][0]['administrative-state'],
696 res['services'][0]['service-name'], 'service2')
698 res['services'][0]['connection-type'], 'roadm-line')
700 res['services'][0]['lifecycle-state'], 'planned')
703 def test_42_check_xc2_ROADMA(self):
704 response = test_utils.check_netconf_node_request(
705 "ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760")
706 self.assertEqual(response.status_code, requests.codes.ok)
707 res = response.json()
708 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
709 self.assertDictEqual(
711 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760',
712 'opticalControlMode': 'gainLoss',
713 'target-output-power': -3.0
714 }, **res['roadm-connections'][0]),
715 res['roadm-connections'][0]
717 self.assertDictEqual(
718 {'src-if': 'SRG1-PP2-TXRX-nmc-753:760'},
719 res['roadm-connections'][0]['source'])
720 self.assertDictEqual(
721 {'dst-if': 'DEG2-TTP-TXRX-nmc-753:760'},
722 res['roadm-connections'][0]['destination'])
725 def test_43_check_topo_ROADMA(self):
726 self.test_26_check_topo_ROADMA_SRG1()
727 self.test_27_check_topo_ROADMA_DEG2()
730 def test_44_delete_oc_service1(self):
731 response = test_utils.service_delete_request("service1")
732 self.assertEqual(response.status_code, requests.codes.ok)
733 res = response.json()
734 self.assertIn('Renderer service delete in progress',
735 res['output']['configuration-response-common']['response-message'])
738 def test_45_delete_oc_service2(self):
739 response = test_utils.service_delete_request("service2")
740 self.assertEqual(response.status_code, requests.codes.ok)
741 res = response.json()
742 self.assertIn('Renderer service delete in progress',
743 res['output']['configuration-response-common']['response-message'])
746 def test_46_get_no_oc_services(self):
748 response = test_utils.get_service_list_request("")
749 self.assertEqual(response.status_code, requests.codes.conflict)
750 res = response.json()
752 {"error-type": "application", "error-tag": "data-missing",
753 "error-message": "Request could not be completed because the relevant data model content does not exist"},
754 res['errors']['error'])
757 def test_47_get_no_xc_ROADMA(self):
758 response = test_utils.check_netconf_node_request("ROADM-A1", "")
759 self.assertEqual(response.status_code, requests.codes.ok)
760 res = response.json()
761 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
764 def test_48_check_topo_ROADMA(self):
765 self.test_34_check_topo_ROADMA_SRG1()
766 self.test_35_check_topo_ROADMA_DEG2()
768 def test_49_loop_create_eth_service(self):
769 for i in range(1, 6):
770 # pylint: disable=consider-using-f-string
771 print("iteration number {}".format(i))
772 print("eth service creation")
773 self.test_11_create_eth_service1()
774 print("check xc in ROADM-A1")
775 self.test_13_check_xc1_ROADMA()
776 print("check xc in ROADM-C1")
777 self.test_14_check_xc1_ROADMC()
778 print("eth service deletion\n")
779 self.test_30_delete_eth_service1()
781 def test_50_loop_create_oc_service(self):
782 response = test_utils.get_service_list_request("services/service1")
783 if response.status_code != 404:
784 response = test_utils.service_delete_request("service1")
787 for i in range(1, 6):
788 # pylint: disable=consider-using-f-string
789 print("iteration number {}".format(i))
790 print("oc service creation")
791 self.test_36_create_oc_service1()
792 print("check xc in ROADM-A1")
793 self.test_38_check_xc1_ROADMA()
794 print("check xc in ROADM-C1")
795 self.test_39_check_xc1_ROADMC()
796 print("oc service deletion\n")
797 self.test_44_delete_oc_service1()
799 def test_51_disconnect_XPDRA(self):
800 response = test_utils.unmount_device("XPDR-A1")
801 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
803 def test_52_disconnect_XPDRC(self):
804 response = test_utils.unmount_device("XPDR-C1")
805 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
807 def test_53_disconnect_ROADMA(self):
808 response = test_utils.unmount_device("ROADM-A1")
809 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
811 def test_54_disconnect_ROADMC(self):
812 response = test_utils.unmount_device("ROADM-C1")
813 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
816 if __name__ == "__main__":
817 unittest.main(verbosity=2)