2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEFulltesting(unittest.TestCase):
27 cr_serv_sample_data = {"input": {
28 "sdnc-request-header": {
29 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
30 "rpc-action": "service-create",
31 "request-system-id": "appname",
33 "http://localhost:8585/NotificationServer/notify"
35 "service-name": "service1",
36 "common-id": "ASATT1234567",
37 "connection-type": "service",
39 "service-rate": "100",
41 "service-format": "Ethernet",
42 "clli": "SNJSCAMCJP8",
46 "ROUTER_SNJSCAMCJP8_000000.00_00",
47 "port-type": "router",
48 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
49 "port-rack": "000000.00",
54 "LGX Panel_SNJSCAMCJP8_000000.00_00",
55 "lgx-port-name": "LGX Back.3",
56 "lgx-port-rack": "000000.00",
57 "lgx-port-shelf": "00"
63 "ROUTER_SNJSCAMCJP8_000000.00_00",
64 "port-type": "router",
65 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
66 "port-rack": "000000.00",
71 "LGX Panel_SNJSCAMCJP8_000000.00_00",
72 "lgx-port-name": "LGX Back.4",
73 "lgx-port-rack": "000000.00",
74 "lgx-port-shelf": "00"
80 "service-rate": "100",
82 "service-format": "Ethernet",
83 "clli": "SNJSCAMCJT4",
87 "ROUTER_SNJSCAMCJT4_000000.00_00",
88 "port-type": "router",
89 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
90 "port-rack": "000000.00",
95 "LGX Panel_SNJSCAMCJT4_000000.00_00",
96 "lgx-port-name": "LGX Back.29",
97 "lgx-port-rack": "000000.00",
98 "lgx-port-shelf": "00"
104 "ROUTER_SNJSCAMCJT4_000000.00_00",
105 "port-type": "router",
106 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
107 "port-rack": "000000.00",
112 "LGX Panel_SNJSCAMCJT4_000000.00_00",
113 "lgx-port-name": "LGX Back.30",
114 "lgx-port-rack": "000000.00",
115 "lgx-port-shelf": "00"
120 "due-date": "2016-11-28T00:00:01Z",
121 "operator-contact": "pw1234"
126 NODE_VERSION = '1.2.1'
130 cls.processes = test_utils.start_tpce()
131 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
132 ('roadma-full', cls.NODE_VERSION),
133 ('roadmc-full', cls.NODE_VERSION),
134 ('xpdrc', cls.NODE_VERSION)])
137 def tearDownClass(cls):
138 # pylint: disable=not-an-iterable
139 for process in cls.processes:
140 test_utils.shutdown_process(process)
141 print("all processes killed")
143 def setUp(self): # instruction executed before each test method
144 print("execution of {}".format(self.id().split(".")[-1]))
146 # connect netconf devices
147 def test_01_connect_xpdrA(self):
148 response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
149 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
151 def test_02_connect_xpdrC(self):
152 response = test_utils.mount_device("XPDRC01", ('xpdrc', self.NODE_VERSION))
153 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
155 def test_03_connect_rdmA(self):
156 response = test_utils.mount_device("ROADMA01", ('roadma-full', self.NODE_VERSION))
157 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
159 def test_04_connect_rdmC(self):
160 response = test_utils.mount_device("ROADMC01", ('roadmc-full', self.NODE_VERSION))
161 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
163 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
164 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
165 "ROADMA01", "1", "SRG1-PP1-TXRX")
166 self.assertEqual(response.status_code, requests.codes.ok)
167 res = response.json()
168 self.assertIn('Xponder Roadm Link created successfully',
169 res["output"]["result"])
172 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
173 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
174 "ROADMA01", "1", "SRG1-PP1-TXRX")
175 self.assertEqual(response.status_code, requests.codes.ok)
176 res = response.json()
177 self.assertIn('Roadm Xponder links created successfully',
178 res["output"]["result"])
181 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
182 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "1",
183 "ROADMC01", "1", "SRG1-PP1-TXRX")
184 self.assertEqual(response.status_code, requests.codes.ok)
185 res = response.json()
186 self.assertIn('Xponder Roadm Link created successfully',
187 res["output"]["result"])
190 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
191 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "1",
192 "ROADMC01", "1", "SRG1-PP1-TXRX")
193 self.assertEqual(response.status_code, requests.codes.ok)
194 res = response.json()
195 self.assertIn('Roadm Xponder links created successfully',
196 res["output"]["result"])
199 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
200 # Config ROADMA-ROADMC oms-attributes
202 "auto-spanloss": "true",
203 "spanloss-base": 11.4,
204 "spanloss-current": 12,
205 "engineered-spanloss": 12.2,
206 "link-concatenation": [{
209 "SRLG-length": 100000,
211 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
212 self.assertEqual(response.status_code, requests.codes.created)
214 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
215 # Config ROADMC-ROADMA oms-attributes
217 "auto-spanloss": "true",
218 "spanloss-base": 11.4,
219 "spanloss-current": 12,
220 "engineered-spanloss": 12.2,
221 "link-concatenation": [{
224 "SRLG-length": 100000,
226 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
227 self.assertEqual(response.status_code, requests.codes.created)
229 # test service-create for Eth service from xpdr to xpdr
230 def test_11_create_eth_service1(self):
231 self.cr_serv_sample_data["input"]["service-name"] = "service1"
232 response = test_utils.service_create_request(self.cr_serv_sample_data)
233 self.assertEqual(response.status_code, requests.codes.ok)
234 res = response.json()
235 self.assertIn('PCE calculation in progress',
236 res['output']['configuration-response-common'][
238 time.sleep(self.WAITING)
240 def test_12_get_eth_service1(self):
241 response = test_utils.get_service_list_request("services/service1")
242 self.assertEqual(response.status_code, requests.codes.ok)
243 res = response.json()
245 res['services'][0]['administrative-state'],
248 res['services'][0]['service-name'], 'service1')
250 res['services'][0]['connection-type'], 'service')
252 res['services'][0]['lifecycle-state'], 'planned')
255 def test_13_check_xc1_ROADMA(self):
256 response = test_utils.check_netconf_node_request(
257 "ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
258 self.assertEqual(response.status_code, requests.codes.ok)
259 res = response.json()
260 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
261 self.assertDictEqual(
263 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
264 'wavelength-number': 1,
265 'opticalControlMode': 'gainLoss',
266 'target-output-power': -3.0
267 }, **res['roadm-connections'][0]),
268 res['roadm-connections'][0]
270 self.assertDictEqual(
271 {'src-if': 'SRG1-PP1-TXRX-761:768'},
272 res['roadm-connections'][0]['source'])
273 self.assertDictEqual(
274 {'dst-if': 'DEG1-TTP-TXRX-761:768'},
275 res['roadm-connections'][0]['destination'])
278 def test_14_check_xc1_ROADMC(self):
279 response = test_utils.check_netconf_node_request(
280 "ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
281 self.assertEqual(response.status_code, requests.codes.ok)
282 res = response.json()
283 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
284 self.assertDictEqual(
286 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
287 'wavelength-number': 1,
288 'opticalControlMode': 'gainLoss',
289 'target-output-power': 2.0
290 }, **res['roadm-connections'][0]),
291 res['roadm-connections'][0]
293 self.assertDictEqual(
294 {'src-if': 'SRG1-PP1-TXRX-761:768'},
295 res['roadm-connections'][0]['source'])
296 self.assertDictEqual(
297 {'dst-if': 'DEG2-TTP-TXRX-761:768'},
298 res['roadm-connections'][0]['destination'])
301 def test_15_check_topo_XPDRA(self):
302 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
303 self.assertEqual(response.status_code, requests.codes.ok)
304 res = response.json()
305 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
307 if ele['tp-id'] == 'XPDR1-NETWORK1':
308 self.assertEqual({'frequency': 196.1,
310 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
311 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
312 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
313 if ele['tp-id'] == 'XPDR1-NETWORK2':
314 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
317 def test_16_check_topo_ROADMA_SRG1(self):
318 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
319 self.assertEqual(response.status_code, requests.codes.ok)
320 res = response.json()
321 freq_map = base64.b64decode(
322 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
323 freq_map_array = [int(x) for x in freq_map]
324 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
325 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
327 if ele['tp-id'] == 'SRG1-PP1-TXRX':
328 freq_map = base64.b64decode(
329 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
330 freq_map_array = [int(x) for x in freq_map]
331 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
332 if ele['tp-id'] == 'SRG1-PP2-TXRX':
333 self.assertNotIn('avail-freq-maps', dict.keys(ele))
336 def test_17_check_topo_ROADMA_DEG1(self):
337 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
338 self.assertEqual(response.status_code, requests.codes.ok)
339 res = response.json()
340 freq_map = base64.b64decode(
341 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
342 freq_map_array = [int(x) for x in freq_map]
343 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
344 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
346 if ele['tp-id'] == 'DEG2-CTP-TXRX':
347 freq_map = base64.b64decode(
348 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
349 freq_map_array = [int(x) for x in freq_map]
350 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
351 if ele['tp-id'] == 'DEG2-TTP-TXRX':
352 freq_map = base64.b64decode(
353 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
354 freq_map_array = [int(x) for x in freq_map]
355 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
358 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
359 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "2",
360 "ROADMA01", "1", "SRG1-PP2-TXRX")
361 self.assertEqual(response.status_code, requests.codes.ok)
362 res = response.json()
363 self.assertIn('Xponder Roadm Link created successfully',
364 res["output"]["result"])
367 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
368 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "2",
369 "ROADMA01", "1", "SRG1-PP2-TXRX")
370 self.assertEqual(response.status_code, requests.codes.ok)
371 res = response.json()
372 self.assertIn('Roadm Xponder links created successfully',
373 res["output"]["result"])
376 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
377 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "2",
378 "ROADMC01", "1", "SRG1-PP2-TXRX")
379 self.assertEqual(response.status_code, requests.codes.ok)
380 res = response.json()
381 self.assertIn('Xponder Roadm Link created successfully',
382 res["output"]["result"])
385 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
386 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "2",
387 "ROADMC01", "1", "SRG1-PP2-TXRX")
388 self.assertEqual(response.status_code, requests.codes.ok)
389 res = response.json()
390 self.assertIn('Roadm Xponder links created successfully',
391 res["output"]["result"])
394 def test_22_create_eth_service2(self):
395 self.cr_serv_sample_data["input"]["service-name"] = "service2"
396 response = test_utils.service_create_request(self.cr_serv_sample_data)
397 self.assertEqual(response.status_code, requests.codes.ok)
398 res = response.json()
399 self.assertIn('PCE calculation in progress',
400 res['output']['configuration-response-common'][
402 time.sleep(self.WAITING)
404 def test_23_get_eth_service2(self):
405 response = test_utils.get_service_list_request("services/service2")
406 self.assertEqual(response.status_code, requests.codes.ok)
407 res = response.json()
409 res['services'][0]['administrative-state'],
412 res['services'][0]['service-name'], 'service2')
414 res['services'][0]['connection-type'], 'service')
416 res['services'][0]['lifecycle-state'], 'planned')
419 def test_24_check_xc2_ROADMA(self):
420 response = test_utils.check_netconf_node_request(
421 "ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760")
422 self.assertEqual(response.status_code, requests.codes.ok)
423 res = response.json()
424 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
425 self.assertDictEqual(
427 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760',
428 'wavelength-number': 2,
429 'opticalControlMode': 'power'
430 }, **res['roadm-connections'][0]),
431 res['roadm-connections'][0]
433 self.assertDictEqual(
434 {'src-if': 'DEG1-TTP-TXRX-753:760'},
435 res['roadm-connections'][0]['source'])
436 self.assertDictEqual(
437 {'dst-if': 'SRG1-PP2-TXRX-753:760'},
438 res['roadm-connections'][0]['destination'])
440 def test_25_check_topo_XPDRA(self):
441 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
442 self.assertEqual(response.status_code, requests.codes.ok)
443 res = response.json()
444 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
446 if ele['tp-id'] == 'XPDR1-NETWORK1':
447 self.assertEqual({'frequency': 196.1,
449 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
450 if ele['tp-id'] == 'XPDR1-NETWORK2':
451 self.assertEqual({'frequency': 196.05,
453 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
454 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
455 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
458 def test_26_check_topo_ROADMA_SRG1(self):
459 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
460 self.assertEqual(response.status_code, requests.codes.ok)
461 res = response.json()
462 freq_map = base64.b64decode(
463 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
464 freq_map_array = [int(x) for x in freq_map]
465 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
466 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
467 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
469 if ele['tp-id'] == 'SRG1-PP1-TXRX':
470 freq_map = base64.b64decode(
471 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
472 freq_map_array = [int(x) for x in freq_map]
473 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
474 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
475 if ele['tp-id'] == 'SRG1-PP2-TXRX':
476 freq_map = base64.b64decode(
477 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
478 freq_map_array = [int(x) for x in freq_map]
479 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
480 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
481 if ele['tp-id'] == 'SRG1-PP3-TXRX':
482 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
485 def test_27_check_topo_ROADMA_DEG1(self):
486 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
487 self.assertEqual(response.status_code, requests.codes.ok)
488 res = response.json()
489 freq_map = base64.b64decode(
490 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
491 freq_map_array = [int(x) for x in freq_map]
492 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
493 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
494 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
496 if ele['tp-id'] == 'DEG2-CTP-TXRX':
497 freq_map = base64.b64decode(
498 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
499 freq_map_array = [int(x) for x in freq_map]
500 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
501 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
502 if ele['tp-id'] == 'DEG2-TTP-TXRX':
503 freq_map = base64.b64decode(
504 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
505 freq_map_array = [int(x) for x in freq_map]
506 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
507 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
510 # creation service test on a non-available resource
511 def test_28_create_eth_service3(self):
512 self.cr_serv_sample_data["input"]["service-name"] = "service3"
513 response = test_utils.service_create_request(self.cr_serv_sample_data)
514 self.assertEqual(response.status_code, requests.codes.ok)
515 res = response.json()
516 self.assertIn('PCE calculation in progress',
517 res['output']['configuration-response-common'][
519 self.assertIn('200', res['output']['configuration-response-common'][
521 time.sleep(self.WAITING)
523 # add a test that check the openroadm-service-list still only
524 # contains 2 elements
526 def test_29_delete_eth_service3(self):
527 response = test_utils.service_delete_request("service3")
528 self.assertEqual(response.status_code, requests.codes.ok)
529 res = response.json()
530 self.assertIn('Service \'service3\' does not exist in datastore',
531 res['output']['configuration-response-common'][
533 self.assertIn('500', res['output']['configuration-response-common'][
537 def test_30_delete_eth_service1(self):
538 response = test_utils.service_delete_request("service1")
539 self.assertEqual(response.status_code, requests.codes.ok)
540 res = response.json()
541 self.assertIn('Renderer service delete in progress',
542 res['output']['configuration-response-common'][
546 def test_31_delete_eth_service2(self):
547 response = test_utils.service_delete_request("service2")
548 self.assertEqual(response.status_code, requests.codes.ok)
549 res = response.json()
550 self.assertIn('Renderer service delete in progress',
551 res['output']['configuration-response-common'][
555 def test_32_check_no_xc_ROADMA(self):
556 response = test_utils.check_netconf_node_request("ROADMA01", "")
557 res = response.json()
558 self.assertEqual(response.status_code, requests.codes.ok)
559 self.assertNotIn('roadm-connections',
560 dict.keys(res['org-openroadm-device']))
563 def test_33_check_topo_XPDRA(self):
564 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
565 self.assertEqual(response.status_code, requests.codes.ok)
566 res = response.json()
567 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
569 if ((ele['org-openroadm-common-network:tp-type'] ==
571 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
572 'tp-id'] == 'XPDR1-CLIENT3')):
574 'org-openroadm-network-topology:xpdr-client-attributes',
576 elif (ele['org-openroadm-common-network:tp-type'] ==
578 self.assertIn('tail-equipment-id', dict.keys(
579 ele['org-openroadm-network-topology:'
580 'xpdr-network-attributes']))
581 self.assertNotIn('wavelength', dict.keys(
582 ele['org-openroadm-network-topology:'
583 'xpdr-network-attributes']))
586 def test_34_check_topo_ROADMA_SRG1(self):
587 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
588 self.assertEqual(response.status_code, requests.codes.ok)
589 res = response.json()
590 freq_map = base64.b64decode(
591 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
592 freq_map_array = [int(x) for x in freq_map]
593 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
594 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
595 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
597 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
598 freq_map = base64.b64decode(
599 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
600 freq_map_array = [int(x) for x in freq_map]
601 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
602 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
603 elif ele['tp-id'] == 'SRG1-CP-TXRX':
604 freq_map = base64.b64decode(
605 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
606 freq_map_array = [int(x) for x in freq_map]
607 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
608 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
610 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
613 def test_35_check_topo_ROADMA_DEG1(self):
614 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
615 self.assertEqual(response.status_code, requests.codes.ok)
616 res = response.json()
617 freq_map = base64.b64decode(
618 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
619 freq_map_array = [int(x) for x in freq_map]
620 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
621 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
622 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
624 if ele['tp-id'] == 'DEG2-CTP-TXRX':
625 freq_map = base64.b64decode(
626 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
627 freq_map_array = [int(x) for x in freq_map]
628 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
629 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
630 if ele['tp-id'] == 'DEG2-TTP-TXRX':
631 freq_map = base64.b64decode(
632 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
633 freq_map_array = [int(x) for x in freq_map]
634 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
635 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
638 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
639 def test_36_create_oc_service1(self):
640 self.cr_serv_sample_data["input"]["service-name"] = "service1"
641 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
642 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
643 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
644 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
645 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
646 response = test_utils.service_create_request(self.cr_serv_sample_data)
647 self.assertEqual(response.status_code, requests.codes.ok)
648 res = response.json()
649 self.assertIn('PCE calculation in progress',
650 res['output']['configuration-response-common'][
652 time.sleep(self.WAITING)
654 def test_37_get_oc_service1(self):
655 response = test_utils.get_service_list_request("services/service1")
656 self.assertEqual(response.status_code, requests.codes.ok)
657 res = response.json()
659 res['services'][0]['administrative-state'],
662 res['services'][0]['service-name'], 'service1')
664 res['services'][0]['connection-type'], 'roadm-line')
666 res['services'][0]['lifecycle-state'], 'planned')
669 def test_38_check_xc1_ROADMA(self):
670 response = test_utils.check_netconf_node_request(
671 "ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
672 self.assertEqual(response.status_code, requests.codes.ok)
673 res = response.json()
674 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
675 self.assertDictEqual(
677 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
678 'wavelength-number': 1,
679 'opticalControlMode': 'gainLoss',
680 'target-output-power': -3.0
681 }, **res['roadm-connections'][0]),
682 res['roadm-connections'][0]
684 self.assertDictEqual(
685 {'src-if': 'SRG1-PP1-TXRX-761:768'},
686 res['roadm-connections'][0]['source'])
687 self.assertDictEqual(
688 {'dst-if': 'DEG1-TTP-TXRX-761:768'},
689 res['roadm-connections'][0]['destination'])
692 def test_39_check_xc1_ROADMC(self):
693 response = test_utils.check_netconf_node_request(
694 "ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
695 self.assertEqual(response.status_code, requests.codes.ok)
696 res = response.json()
697 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
698 self.assertDictEqual(
700 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
701 'wavelength-number': 1,
702 'opticalControlMode': 'gainLoss',
703 'target-output-power': 2.0
704 }, **res['roadm-connections'][0]),
705 res['roadm-connections'][0]
707 self.assertDictEqual(
708 {'src-if': 'SRG1-PP1-TXRX-761:768'},
709 res['roadm-connections'][0]['source'])
710 self.assertDictEqual(
711 {'dst-if': 'DEG2-TTP-TXRX-761:768'},
712 res['roadm-connections'][0]['destination'])
715 def test_40_create_oc_service2(self):
716 self.cr_serv_sample_data["input"]["service-name"] = "service2"
717 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
718 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
719 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
720 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
721 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
722 response = test_utils.service_create_request(self.cr_serv_sample_data)
723 self.assertEqual(response.status_code, requests.codes.ok)
724 res = response.json()
725 self.assertIn('PCE calculation in progress',
726 res['output']['configuration-response-common'][
728 time.sleep(self.WAITING)
730 def test_41_get_oc_service2(self):
731 response = test_utils.get_service_list_request("services/service2")
732 self.assertEqual(response.status_code, requests.codes.ok)
733 res = response.json()
735 res['services'][0]['administrative-state'],
738 res['services'][0]['service-name'], 'service2')
740 res['services'][0]['connection-type'], 'roadm-line')
742 res['services'][0]['lifecycle-state'], 'planned')
745 def test_42_check_xc2_ROADMA(self):
746 response = test_utils.check_netconf_node_request(
747 "ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760")
748 self.assertEqual(response.status_code, requests.codes.ok)
749 res = response.json()
750 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
751 self.assertDictEqual(
753 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760',
754 'wavelength-number': 2,
755 'opticalControlMode': 'gainLoss',
756 'target-output-power': -3.0
757 }, **res['roadm-connections'][0]),
758 res['roadm-connections'][0]
760 self.assertDictEqual(
761 {'src-if': 'SRG1-PP2-TXRX-753:760'},
762 res['roadm-connections'][0]['source'])
763 self.assertDictEqual(
764 {'dst-if': 'DEG1-TTP-TXRX-753:760'},
765 res['roadm-connections'][0]['destination'])
768 def test_43_check_topo_ROADMA(self):
769 self.test_26_check_topo_ROADMA_SRG1()
770 self.test_27_check_topo_ROADMA_DEG1()
773 def test_44_delete_oc_service1(self):
774 response = test_utils.service_delete_request("service1")
775 self.assertEqual(response.status_code, requests.codes.ok)
776 res = response.json()
777 self.assertIn('Renderer service delete in progress',
778 res['output']['configuration-response-common'][
782 def test_45_delete_oc_service2(self):
783 response = test_utils.service_delete_request("service2")
784 self.assertEqual(response.status_code, requests.codes.ok)
785 res = response.json()
786 self.assertIn('Renderer service delete in progress',
787 res['output']['configuration-response-common'][
791 def test_46_get_no_oc_services(self):
793 response = test_utils.get_service_list_request("")
794 self.assertEqual(response.status_code, requests.codes.conflict)
795 res = response.json()
798 "error-type": "application",
799 "error-tag": "data-missing",
801 "Request could not be completed because the relevant data "
802 "model content does not exist"
804 res['errors']['error'])
807 def test_47_get_no_xc_ROADMA(self):
808 response = test_utils.check_netconf_node_request("ROADMA01", "")
809 self.assertEqual(response.status_code, requests.codes.ok)
810 res = response.json()
811 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
814 def test_48_check_topo_ROADMA(self):
815 self.test_34_check_topo_ROADMA_SRG1()
816 self.test_35_check_topo_ROADMA_DEG1()
818 def test_49_loop_create_eth_service(self):
819 for i in range(1, 6):
820 print("iteration number {}".format(i))
821 print("eth service creation")
822 self.test_11_create_eth_service1()
823 print("check xc in ROADMA01")
824 self.test_13_check_xc1_ROADMA()
825 print("check xc in ROADMC01")
826 self.test_14_check_xc1_ROADMC()
827 print("eth service deletion\n")
828 self.test_30_delete_eth_service1()
830 def test_50_loop_create_oc_service(self):
831 response = test_utils.get_service_list_request("services/service1")
832 if response.status_code != 404:
833 response = test_utils.service_delete_request("service1")
836 for i in range(1, 6):
837 print("iteration number {}".format(i))
838 print("oc service creation")
839 self.test_36_create_oc_service1()
840 print("check xc in ROADMA01")
841 self.test_38_check_xc1_ROADMA()
842 print("check xc in ROADMC01")
843 self.test_39_check_xc1_ROADMC()
844 print("oc service deletion\n")
845 self.test_44_delete_oc_service1()
847 def test_51_disconnect_XPDRA(self):
848 response = test_utils.unmount_device("XPDRA01")
849 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
851 def test_52_disconnect_XPDRC(self):
852 response = test_utils.unmount_device("XPDRC01")
853 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
855 def test_53_disconnect_ROADMA(self):
856 response = test_utils.unmount_device("ROADMA01")
857 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
859 def test_54_disconnect_ROADMC(self):
860 response = test_utils.unmount_device("ROADMC01")
861 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
864 if __name__ == "__main__":
865 unittest.main(verbosity=2)