2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEFulltesting(unittest.TestCase):
27 cr_serv_sample_data = {"input": {
28 "sdnc-request-header": {
29 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
30 "rpc-action": "service-create",
31 "request-system-id": "appname",
33 "http://localhost:8585/NotificationServer/notify"
35 "service-name": "service1",
36 "common-id": "ASATT1234567",
37 "connection-type": "service",
39 "service-rate": "100",
41 "service-format": "Ethernet",
42 "clli": "SNJSCAMCJP8",
46 "ROUTER_SNJSCAMCJP8_000000.00_00",
47 "port-type": "router",
48 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
49 "port-rack": "000000.00",
54 "LGX Panel_SNJSCAMCJP8_000000.00_00",
55 "lgx-port-name": "LGX Back.3",
56 "lgx-port-rack": "000000.00",
57 "lgx-port-shelf": "00"
63 "ROUTER_SNJSCAMCJP8_000000.00_00",
64 "port-type": "router",
65 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
66 "port-rack": "000000.00",
71 "LGX Panel_SNJSCAMCJP8_000000.00_00",
72 "lgx-port-name": "LGX Back.4",
73 "lgx-port-rack": "000000.00",
74 "lgx-port-shelf": "00"
80 "service-rate": "100",
82 "service-format": "Ethernet",
83 "clli": "SNJSCAMCJT4",
87 "ROUTER_SNJSCAMCJT4_000000.00_00",
88 "port-type": "router",
89 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
90 "port-rack": "000000.00",
95 "LGX Panel_SNJSCAMCJT4_000000.00_00",
96 "lgx-port-name": "LGX Back.29",
97 "lgx-port-rack": "000000.00",
98 "lgx-port-shelf": "00"
104 "ROUTER_SNJSCAMCJT4_000000.00_00",
105 "port-type": "router",
106 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
107 "port-rack": "000000.00",
112 "LGX Panel_SNJSCAMCJT4_000000.00_00",
113 "lgx-port-name": "LGX Back.30",
114 "lgx-port-rack": "000000.00",
115 "lgx-port-shelf": "00"
120 "due-date": "2016-11-28T00:00:01Z",
121 "operator-contact": "pw1234"
126 NODE_VERSION = '1.2.1'
130 cls.processes = test_utils.start_tpce()
131 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
132 ('roadma-full', cls.NODE_VERSION),
133 ('roadmc-full', cls.NODE_VERSION),
134 ('xpdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes`` and report completion."""
    # pylint: disable=not-an-iterable
    for running_process in cls.processes:
        # Each entry is handed to the shared test helper for termination.
        test_utils.shutdown_process(running_process)
    print("all processes killed")
def setUp(self):
    """Print the short name of the test method about to run (runs before each test)."""
    # pylint: disable=consider-using-f-string
    # self.id() is a dotted path; only its last component is printed.
    short_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(short_name))
147 # connect netconf devices
def test_01_connect_xpdrA(self):
    """Mount device XPDRA01 (simulator key 'xpdra') and expect a 'created' status."""
    simulator = ('xpdra', self.NODE_VERSION)
    status = test_utils.mount_device("XPDRA01", simulator).status_code
    self.assertEqual(status, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    """Mount device XPDRC01 (simulator key 'xpdrc') and expect a 'created' status."""
    simulator = ('xpdrc', self.NODE_VERSION)
    status = test_utils.mount_device("XPDRC01", simulator).status_code
    self.assertEqual(status, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    """Mount device ROADMA01 (simulator key 'roadma-full') and expect a 'created' status."""
    simulator = ('roadma-full', self.NODE_VERSION)
    status = test_utils.mount_device("ROADMA01", simulator).status_code
    self.assertEqual(status, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    """Mount device ROADMC01 (simulator key 'roadmc-full') and expect a 'created' status."""
    simulator = ('roadmc-full', self.NODE_VERSION)
    status = test_utils.mount_device("ROADMC01", simulator).status_code
    self.assertEqual(status, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    """Request an xponder-to-ROADM link from XPDRA01 to ROADMA01 SRG1-PP1-TXRX.

    Expects an 'ok' status and a result message reporting the link as created.
    """
    xpdr_end = ("XPDRA01", "1", "1")
    rdm_end = ("ROADMA01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*xpdr_end, *rdm_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """Request a ROADM-to-xponder link between ROADMA01 SRG1-PP1-TXRX and XPDRA01.

    Expects an 'ok' status and a result message reporting the links as created.
    """
    xpdr_end = ("XPDRA01", "1", "1")
    rdm_end = ("ROADMA01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*xpdr_end, *rdm_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    """Request an xponder-to-ROADM link from XPDRC01 to ROADMC01 SRG1-PP1-TXRX.

    Expects an 'ok' status and a result message reporting the link as created.
    """
    xpdr_end = ("XPDRC01", "1", "1")
    rdm_end = ("ROADMC01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*xpdr_end, *rdm_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    """Request a ROADM-to-xponder link between ROADMC01 SRG1-PP1-TXRX and XPDRC01.

    Expects an 'ok' status and a result message reporting the links as created.
    """
    xpdr_end = ("XPDRC01", "1", "1")
    rdm_end = ("ROADMC01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*xpdr_end, *rdm_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
200 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
201 # Config ROADMA-ROADMC oms-attributes
203 "auto-spanloss": "true",
204 "spanloss-base": 11.4,
205 "spanloss-current": 12,
206 "engineered-spanloss": 12.2,
207 "link-concatenation": [{
210 "SRLG-length": 100000,
212 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
213 self.assertEqual(response.status_code, requests.codes.created)
215 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
216 # Config ROADMC-ROADMA oms-attributes
218 "auto-spanloss": "true",
219 "spanloss-base": 11.4,
220 "spanloss-current": 12,
221 "engineered-spanloss": 12.2,
222 "link-concatenation": [{
225 "SRLG-length": 100000,
227 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
228 self.assertEqual(response.status_code, requests.codes.created)
230 # test service-create for Eth service from xpdr to xpdr
231 def test_11_create_eth_service1(self):
232 self.cr_serv_sample_data["input"]["service-name"] = "service1"
233 response = test_utils.service_create_request(self.cr_serv_sample_data)
234 self.assertEqual(response.status_code, requests.codes.ok)
235 res = response.json()
236 self.assertIn('PCE calculation in progress',
237 res['output']['configuration-response-common'][
239 time.sleep(self.WAITING)
241 def test_12_get_eth_service1(self):
242 response = test_utils.get_service_list_request("services/service1")
243 self.assertEqual(response.status_code, requests.codes.ok)
244 res = response.json()
246 res['services'][0]['administrative-state'],
249 res['services'][0]['service-name'], 'service1')
251 res['services'][0]['connection-type'], 'service')
253 res['services'][0]['lifecycle-state'], 'planned')
256 def test_13_check_xc1_ROADMA(self):
257 response = test_utils.check_netconf_node_request(
258 "ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
259 self.assertEqual(response.status_code, requests.codes.ok)
260 res = response.json()
261 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
262 self.assertDictEqual(
264 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
265 'wavelength-number': 1,
266 'opticalControlMode': 'gainLoss',
267 'target-output-power': -3.0
268 }, **res['roadm-connections'][0]),
269 res['roadm-connections'][0]
271 self.assertDictEqual(
272 {'src-if': 'SRG1-PP1-TXRX-761:768'},
273 res['roadm-connections'][0]['source'])
274 self.assertDictEqual(
275 {'dst-if': 'DEG1-TTP-TXRX-761:768'},
276 res['roadm-connections'][0]['destination'])
279 def test_14_check_xc1_ROADMC(self):
280 response = test_utils.check_netconf_node_request(
281 "ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
282 self.assertEqual(response.status_code, requests.codes.ok)
283 res = response.json()
284 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
285 self.assertDictEqual(
287 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
288 'wavelength-number': 1,
289 'opticalControlMode': 'gainLoss',
290 'target-output-power': 2.0
291 }, **res['roadm-connections'][0]),
292 res['roadm-connections'][0]
294 self.assertDictEqual(
295 {'src-if': 'SRG1-PP1-TXRX-761:768'},
296 res['roadm-connections'][0]['source'])
297 self.assertDictEqual(
298 {'dst-if': 'DEG2-TTP-TXRX-761:768'},
299 res['roadm-connections'][0]['destination'])
302 def test_15_check_topo_XPDRA(self):
303 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
304 self.assertEqual(response.status_code, requests.codes.ok)
305 res = response.json()
306 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
308 if ele['tp-id'] == 'XPDR1-NETWORK1':
309 self.assertEqual({'frequency': 196.1,
311 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
312 elif ele['tp-id'] in ('XPDR1-CLIENT2', 'XPDR1-CLIENT1'):
313 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
314 elif ele['tp-id'] == 'XPDR1-NETWORK2':
315 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
318 def test_16_check_topo_ROADMA_SRG1(self):
319 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
320 self.assertEqual(response.status_code, requests.codes.ok)
321 res = response.json()
322 freq_map = base64.b64decode(
323 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
324 freq_map_array = [int(x) for x in freq_map]
325 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
326 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
328 if ele['tp-id'] == 'SRG1-PP1-TXRX':
329 freq_map = base64.b64decode(
330 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
331 freq_map_array = [int(x) for x in freq_map]
332 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
333 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
334 self.assertNotIn('avail-freq-maps', dict.keys(ele))
337 def test_17_check_topo_ROADMA_DEG1(self):
338 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
339 self.assertEqual(response.status_code, requests.codes.ok)
340 res = response.json()
341 freq_map = base64.b64decode(
342 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
343 freq_map_array = [int(x) for x in freq_map]
344 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
345 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
347 if ele['tp-id'] == 'DEG2-CTP-TXRX':
348 freq_map = base64.b64decode(
349 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
350 freq_map_array = [int(x) for x in freq_map]
351 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
352 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
353 freq_map = base64.b64decode(
354 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
355 freq_map_array = [int(x) for x in freq_map]
356 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    """Request an xponder-to-ROADM link from XPDRA01 to ROADMA01 SRG1-PP2-TXRX.

    Expects an 'ok' status and a result message reporting the link as created.
    """
    xpdr_end = ("XPDRA01", "1", "2")
    rdm_end = ("ROADMA01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*xpdr_end, *rdm_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    """Request a ROADM-to-xponder link between ROADMA01 SRG1-PP2-TXRX and XPDRA01.

    Expects an 'ok' status and a result message reporting the links as created.
    """
    xpdr_end = ("XPDRA01", "1", "2")
    rdm_end = ("ROADMA01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*xpdr_end, *rdm_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    """Request an xponder-to-ROADM link from XPDRC01 to ROADMC01 SRG1-PP2-TXRX.

    Expects an 'ok' status and a result message reporting the link as created.
    """
    xpdr_end = ("XPDRC01", "1", "2")
    rdm_end = ("ROADMC01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*xpdr_end, *rdm_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    """Request a ROADM-to-xponder link between ROADMC01 SRG1-PP2-TXRX and XPDRC01.

    Expects an 'ok' status and a result message reporting the links as created.
    """
    xpdr_end = ("XPDRC01", "1", "2")
    rdm_end = ("ROADMC01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*xpdr_end, *rdm_end)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
395 def test_22_create_eth_service2(self):
396 self.cr_serv_sample_data["input"]["service-name"] = "service2"
397 response = test_utils.service_create_request(self.cr_serv_sample_data)
398 self.assertEqual(response.status_code, requests.codes.ok)
399 res = response.json()
400 self.assertIn('PCE calculation in progress',
401 res['output']['configuration-response-common'][
403 time.sleep(self.WAITING)
405 def test_23_get_eth_service2(self):
406 response = test_utils.get_service_list_request("services/service2")
407 self.assertEqual(response.status_code, requests.codes.ok)
408 res = response.json()
410 res['services'][0]['administrative-state'],
413 res['services'][0]['service-name'], 'service2')
415 res['services'][0]['connection-type'], 'service')
417 res['services'][0]['lifecycle-state'], 'planned')
420 def test_24_check_xc2_ROADMA(self):
421 response = test_utils.check_netconf_node_request(
422 "ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760")
423 self.assertEqual(response.status_code, requests.codes.ok)
424 res = response.json()
425 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
426 self.assertDictEqual(
428 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760',
429 'wavelength-number': 2,
430 'opticalControlMode': 'power'
431 }, **res['roadm-connections'][0]),
432 res['roadm-connections'][0]
434 self.assertDictEqual(
435 {'src-if': 'DEG1-TTP-TXRX-753:760'},
436 res['roadm-connections'][0]['source'])
437 self.assertDictEqual(
438 {'dst-if': 'SRG1-PP2-TXRX-753:760'},
439 res['roadm-connections'][0]['destination'])
441 def test_25_check_topo_XPDRA(self):
442 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
443 self.assertEqual(response.status_code, requests.codes.ok)
444 res = response.json()
445 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
447 if ele['tp-id'] == 'XPDR1-NETWORK1':
448 self.assertEqual({'frequency': 196.1,
450 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
451 elif ele['tp-id'] == 'XPDR1-NETWORK2':
452 self.assertEqual({'frequency': 196.05,
454 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
455 elif ele['tp-id'] in ('XPDR1-CLIENT1', 'XPDR1-CLIENT2'):
456 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
459 def test_26_check_topo_ROADMA_SRG1(self):
460 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
461 self.assertEqual(response.status_code, requests.codes.ok)
462 res = response.json()
463 freq_map = base64.b64decode(
464 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
465 freq_map_array = [int(x) for x in freq_map]
466 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
467 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
468 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
470 if ele['tp-id'] == 'SRG1-PP1-TXRX':
471 freq_map = base64.b64decode(
472 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
473 freq_map_array = [int(x) for x in freq_map]
474 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
475 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
476 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
477 freq_map = base64.b64decode(
478 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
479 freq_map_array = [int(x) for x in freq_map]
480 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
481 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
482 elif ele['tp-id'] == 'SRG1-PP3-TXRX':
483 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
486 def test_27_check_topo_ROADMA_DEG1(self):
487 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
488 self.assertEqual(response.status_code, requests.codes.ok)
489 res = response.json()
490 freq_map = base64.b64decode(
491 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
492 freq_map_array = [int(x) for x in freq_map]
493 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
494 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
495 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
497 if ele['tp-id'] == 'DEG2-CTP-TXRX':
498 freq_map = base64.b64decode(
499 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
500 freq_map_array = [int(x) for x in freq_map]
501 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
502 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
503 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
504 freq_map = base64.b64decode(
505 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
506 freq_map_array = [int(x) for x in freq_map]
507 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
508 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
# service creation test on a non-available resource
512 def test_28_create_eth_service3(self):
513 self.cr_serv_sample_data["input"]["service-name"] = "service3"
514 response = test_utils.service_create_request(self.cr_serv_sample_data)
515 self.assertEqual(response.status_code, requests.codes.ok)
516 res = response.json()
517 self.assertIn('PCE calculation in progress',
518 res['output']['configuration-response-common'][
520 self.assertIn('200', res['output']['configuration-response-common'][
522 time.sleep(self.WAITING)
# TODO: add a test that checks the openroadm-service-list still
# contains only 2 elements
527 def test_29_delete_eth_service3(self):
528 response = test_utils.service_delete_request("service3")
529 self.assertEqual(response.status_code, requests.codes.ok)
530 res = response.json()
531 self.assertIn('Service \'service3\' does not exist in datastore',
532 res['output']['configuration-response-common'][
534 self.assertIn('500', res['output']['configuration-response-common'][
538 def test_30_delete_eth_service1(self):
539 response = test_utils.service_delete_request("service1")
540 self.assertEqual(response.status_code, requests.codes.ok)
541 res = response.json()
542 self.assertIn('Renderer service delete in progress',
543 res['output']['configuration-response-common'][
547 def test_31_delete_eth_service2(self):
548 response = test_utils.service_delete_request("service2")
549 self.assertEqual(response.status_code, requests.codes.ok)
550 res = response.json()
551 self.assertIn('Renderer service delete in progress',
552 res['output']['configuration-response-common'][
def test_32_check_no_xc_ROADMA(self):
    """Check that ROADMA01 no longer reports any 'roadm-connections' entry.

    Fetches the device data through ``test_utils.check_netconf_node_request``
    with an empty sub-path, expects an 'ok' status, then verifies that
    'roadm-connections' is absent from the 'org-openroadm-device' container.
    """
    response = test_utils.check_netconf_node_request("ROADMA01", "")
    # Assert the status first, as the sibling tests do, so a failed request is
    # reported as a status mismatch rather than as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership on the mapping itself already tests its keys.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
564 def test_33_check_topo_XPDRA(self):
565 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
566 self.assertEqual(response.status_code, requests.codes.ok)
567 res = response.json()
568 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
570 if (ele['org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT'
571 and ele['tp-id'] in ('XPDR1-CLIENT1', 'XPDR1-CLIENT3')):
573 'org-openroadm-network-topology:xpdr-client-attributes',
575 elif ele['org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
576 self.assertIn('tail-equipment-id', dict.keys(
577 ele['org-openroadm-network-topology:'
578 'xpdr-network-attributes']))
579 self.assertNotIn('wavelength', dict.keys(
580 ele['org-openroadm-network-topology:'
581 'xpdr-network-attributes']))
584 def test_34_check_topo_ROADMA_SRG1(self):
585 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
586 self.assertEqual(response.status_code, requests.codes.ok)
587 res = response.json()
588 freq_map = base64.b64decode(
589 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
590 freq_map_array = [int(x) for x in freq_map]
591 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
592 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
593 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
595 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
596 freq_map = base64.b64decode(
597 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
598 freq_map_array = [int(x) for x in freq_map]
599 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
600 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
601 elif ele['tp-id'] == 'SRG1-CP-TXRX':
602 freq_map = base64.b64decode(
603 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
604 freq_map_array = [int(x) for x in freq_map]
605 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
606 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
608 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
611 def test_35_check_topo_ROADMA_DEG1(self):
612 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
613 self.assertEqual(response.status_code, requests.codes.ok)
614 res = response.json()
615 freq_map = base64.b64decode(
616 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
617 freq_map_array = [int(x) for x in freq_map]
618 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
619 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
620 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
622 if ele['tp-id'] == 'DEG2-CTP-TXRX':
623 freq_map = base64.b64decode(
624 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
625 freq_map_array = [int(x) for x in freq_map]
626 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
627 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
628 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
629 freq_map = base64.b64decode(
630 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
631 freq_map_array = [int(x) for x in freq_map]
632 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
633 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
636 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
637 def test_36_create_oc_service1(self):
638 self.cr_serv_sample_data["input"]["service-name"] = "service1"
639 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
640 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
641 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
642 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
643 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
644 response = test_utils.service_create_request(self.cr_serv_sample_data)
645 self.assertEqual(response.status_code, requests.codes.ok)
646 res = response.json()
647 self.assertIn('PCE calculation in progress',
648 res['output']['configuration-response-common'][
650 time.sleep(self.WAITING)
652 def test_37_get_oc_service1(self):
653 response = test_utils.get_service_list_request("services/service1")
654 self.assertEqual(response.status_code, requests.codes.ok)
655 res = response.json()
657 res['services'][0]['administrative-state'],
660 res['services'][0]['service-name'], 'service1')
662 res['services'][0]['connection-type'], 'roadm-line')
664 res['services'][0]['lifecycle-state'], 'planned')
667 def test_38_check_xc1_ROADMA(self):
668 response = test_utils.check_netconf_node_request(
669 "ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
670 self.assertEqual(response.status_code, requests.codes.ok)
671 res = response.json()
672 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
673 self.assertDictEqual(
675 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
676 'wavelength-number': 1,
677 'opticalControlMode': 'gainLoss',
678 'target-output-power': -3.0
679 }, **res['roadm-connections'][0]),
680 res['roadm-connections'][0]
682 self.assertDictEqual(
683 {'src-if': 'SRG1-PP1-TXRX-761:768'},
684 res['roadm-connections'][0]['source'])
685 self.assertDictEqual(
686 {'dst-if': 'DEG1-TTP-TXRX-761:768'},
687 res['roadm-connections'][0]['destination'])
690 def test_39_check_xc1_ROADMC(self):
691 response = test_utils.check_netconf_node_request(
692 "ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
693 self.assertEqual(response.status_code, requests.codes.ok)
694 res = response.json()
695 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
696 self.assertDictEqual(
698 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
699 'wavelength-number': 1,
700 'opticalControlMode': 'gainLoss',
701 'target-output-power': 2.0
702 }, **res['roadm-connections'][0]),
703 res['roadm-connections'][0]
705 self.assertDictEqual(
706 {'src-if': 'SRG1-PP1-TXRX-761:768'},
707 res['roadm-connections'][0]['source'])
708 self.assertDictEqual(
709 {'dst-if': 'DEG2-TTP-TXRX-761:768'},
710 res['roadm-connections'][0]['destination'])
713 def test_40_create_oc_service2(self):
714 self.cr_serv_sample_data["input"]["service-name"] = "service2"
715 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
716 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
717 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
718 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
719 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
720 response = test_utils.service_create_request(self.cr_serv_sample_data)
721 self.assertEqual(response.status_code, requests.codes.ok)
722 res = response.json()
723 self.assertIn('PCE calculation in progress',
724 res['output']['configuration-response-common'][
726 time.sleep(self.WAITING)
728 def test_41_get_oc_service2(self):
729 response = test_utils.get_service_list_request("services/service2")
730 self.assertEqual(response.status_code, requests.codes.ok)
731 res = response.json()
733 res['services'][0]['administrative-state'],
736 res['services'][0]['service-name'], 'service2')
738 res['services'][0]['connection-type'], 'roadm-line')
740 res['services'][0]['lifecycle-state'], 'planned')
743 def test_42_check_xc2_ROADMA(self):
744 response = test_utils.check_netconf_node_request(
745 "ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760")
746 self.assertEqual(response.status_code, requests.codes.ok)
747 res = response.json()
748 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
749 self.assertDictEqual(
751 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760',
752 'wavelength-number': 2,
753 'opticalControlMode': 'gainLoss',
754 'target-output-power': -3.0
755 }, **res['roadm-connections'][0]),
756 res['roadm-connections'][0]
758 self.assertDictEqual(
759 {'src-if': 'SRG1-PP2-TXRX-753:760'},
760 res['roadm-connections'][0]['source'])
761 self.assertDictEqual(
762 {'dst-if': 'DEG1-TTP-TXRX-753:760'},
763 res['roadm-connections'][0]['destination'])
766 def test_43_check_topo_ROADMA(self):
767 self.test_26_check_topo_ROADMA_SRG1()
768 self.test_27_check_topo_ROADMA_DEG1()
771 def test_44_delete_oc_service1(self):
772 response = test_utils.service_delete_request("service1")
773 self.assertEqual(response.status_code, requests.codes.ok)
774 res = response.json()
775 self.assertIn('Renderer service delete in progress',
776 res['output']['configuration-response-common'][
780 def test_45_delete_oc_service2(self):
781 response = test_utils.service_delete_request("service2")
782 self.assertEqual(response.status_code, requests.codes.ok)
783 res = response.json()
784 self.assertIn('Renderer service delete in progress',
785 res['output']['configuration-response-common'][
789 def test_46_get_no_oc_services(self):
791 response = test_utils.get_service_list_request("")
792 self.assertEqual(response.status_code, requests.codes.conflict)
793 res = response.json()
796 "error-type": "application",
797 "error-tag": "data-missing",
799 "Request could not be completed because the relevant data "
800 "model content does not exist"
802 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    """Check that ROADMA01 reports no 'roadm-connections' entry.

    Fetches the device data through ``test_utils.check_netconf_node_request``
    with an empty sub-path, expects an 'ok' status, then verifies that
    'roadm-connections' is not a member of the 'org-openroadm-device' container.
    """
    response = test_utils.check_netconf_node_request("ROADMA01", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The former ['roadm-connections'][0] only indexed a one-item list literal
    # back to this same string; the plain literal states the intent directly.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
812 def test_48_check_topo_ROADMA(self):
813 self.test_34_check_topo_ROADMA_SRG1()
814 self.test_35_check_topo_ROADMA_DEG1()
816 def test_49_loop_create_eth_service(self):
817 # pylint: disable=consider-using-f-string
818 for i in range(1, 6):
819 print("iteration number {}".format(i))
820 print("eth service creation")
821 self.test_11_create_eth_service1()
822 print("check xc in ROADMA01")
823 self.test_13_check_xc1_ROADMA()
824 print("check xc in ROADMC01")
825 self.test_14_check_xc1_ROADMC()
826 print("eth service deletion\n")
827 self.test_30_delete_eth_service1()
829 def test_50_loop_create_oc_service(self):
830 response = test_utils.get_service_list_request("services/service1")
831 if response.status_code != 404:
832 response = test_utils.service_delete_request("service1")
835 # pylint: disable=consider-using-f-string
836 for i in range(1, 6):
837 print("iteration number {}".format(i))
838 print("oc service creation")
839 self.test_36_create_oc_service1()
840 print("check xc in ROADMA01")
841 self.test_38_check_xc1_ROADMA()
842 print("check xc in ROADMC01")
843 self.test_39_check_xc1_ROADMC()
844 print("oc service deletion\n")
845 self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    """Unmount device XPDRA01 and expect an 'ok' status."""
    status = test_utils.unmount_device("XPDRA01").status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_52_disconnect_XPDRC(self):
    """Unmount device XPDRC01 and expect an 'ok' status."""
    status = test_utils.unmount_device("XPDRC01").status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_53_disconnect_ROADMA(self):
    """Unmount device ROADMA01 and expect an 'ok' status."""
    status = test_utils.unmount_device("ROADMA01").status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_54_disconnect_ROADMC(self):
    """Unmount device ROADMC01 and expect an 'ok' status."""
    status = test_utils.unmount_device("ROADMC01").status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
# When executed as a script, run the suite through unittest with verbosity
# level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)