2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEFulltesting(unittest.TestCase):
29 cr_serv_sample_data = {"input": {
30 "sdnc-request-header": {
31 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
32 "rpc-action": "service-create",
33 "request-system-id": "appname",
34 "notification-url": "http://localhost:8585/NotificationServer/notify"
36 "service-name": "service1",
37 "common-id": "ASATT1234567",
38 "connection-type": "service",
40 "service-rate": "100",
42 "service-format": "Ethernet",
46 "port-device-name": "XPDR-A1-XPDR1",
48 "port-name": "1/0/2-PLUG-CLIENT",
49 "port-rack": "000000.00",
53 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
54 "lgx-port-name": "LGX Back.3",
55 "lgx-port-rack": "000000.00",
56 "lgx-port-shelf": "00"
62 "port-device-name": "XPDR-A1-XPDR1",
64 "port-name": "1/0/2-PLUG-CLIENT",
65 "port-rack": "000000.00",
69 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
70 "lgx-port-name": "LGX Back.4",
71 "lgx-port-rack": "000000.00",
72 "lgx-port-shelf": "00"
79 "service-rate": "100",
81 "service-format": "Ethernet",
85 "port-device-name": "XPDR-C1-XPDR2",
86 "port-type": "router",
87 "port-name": "1/0/2-PLUG-CLIENT",
88 "port-rack": "000000.00",
92 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
93 "lgx-port-name": "LGX Back.29",
94 "lgx-port-rack": "000000.00",
95 "lgx-port-shelf": "00"
101 "port-device-name": "XPDR-C1-XPDR2",
102 "port-type": "router",
103 "port-name": "1/0/2-PLUG-CLIENT",
104 "port-rack": "000000.00",
108 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
109 "lgx-port-name": "LGX Back.29",
110 "lgx-port-rack": "000000.00",
111 "lgx-port-shelf": "00"
117 "due-date": "2016-11-28T00:00:01Z",
118 "operator-contact": "pw1234"
122 WAITING = 20 # nominal value is 300
123 NODE_VERSION = '2.2.1'
127 cls.processes = test_utils.start_tpce()
128 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
129 ('roadma', cls.NODE_VERSION),
130 ('roadmc', cls.NODE_VERSION),
131 ('xpdrc', cls.NODE_VERSION)])
134 def tearDownClass(cls):
135 # pylint: disable=not-an-iterable
136 for process in cls.processes:
137 test_utils.shutdown_process(process)
138 print("all processes killed")
140 def setUp(self): # instruction executed before each test method
141 # pylint: disable=consider-using-f-string
142 print("execution of {}".format(self.id().split(".")[-1]))
144 def test_01_connect_xpdrA(self):
145 response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
146 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
148 def test_02_connect_xpdrC(self):
149 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
150 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
152 def test_03_connect_rdmA(self):
153 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
154 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
156 def test_04_connect_rdmC(self):
157 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
158 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
160 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
161 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
162 "ROADM-A1", "1", "SRG1-PP1-TXRX")
163 self.assertEqual(response.status_code, requests.codes.ok)
164 res = response.json()
165 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
168 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
169 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
170 "ROADM-A1", "1", "SRG1-PP1-TXRX")
171 self.assertEqual(response.status_code, requests.codes.ok)
172 res = response.json()
173 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
176 def test_07_connect_xprdC_xpdr1_N1_to_roadmC_PP1(self):
177 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
178 "ROADM-C1", "1", "SRG1-PP1-TXRX")
179 self.assertEqual(response.status_code, requests.codes.ok)
180 res = response.json()
181 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
184 def test_08_connect_roadmC_PP1_to_xpdrC_xprd1_N1(self):
185 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
186 "ROADM-C1", "1", "SRG1-PP1-TXRX")
187 self.assertEqual(response.status_code, requests.codes.ok)
188 res = response.json()
189 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
192 def test_09_connect_xprdA_N2_to_roadmA_PP2(self):
193 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "2",
194 "ROADM-A1", "1", "SRG1-PP2-TXRX")
195 self.assertEqual(response.status_code, requests.codes.ok)
196 res = response.json()
197 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
200 def test_10_connect_roadmA_PP2_to_xpdrA_N2(self):
201 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "2",
202 "ROADM-A1", "1", "SRG1-PP2-TXRX")
203 self.assertEqual(response.status_code, requests.codes.ok)
204 res = response.json()
205 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
208 def test_11_connect_xprdC_xpdr2_N1_to_roadmC_PP2(self):
209 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "2", "1",
210 "ROADM-C1", "1", "SRG1-PP2-TXRX")
211 self.assertEqual(response.status_code, requests.codes.ok)
212 res = response.json()
213 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
216 def test_12_connect_roadmC_PP2_to_xpdrC_xpdr2_N1(self):
217 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "2", "1",
218 "ROADM-C1", "1", "SRG1-PP2-TXRX")
219 self.assertEqual(response.status_code, requests.codes.ok)
220 res = response.json()
221 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
224 def test_13_add_omsAttributes_ROADMA_ROADMC(self):
225 # Config ROADMA-ROADMC oms-attributes
227 "auto-spanloss": "true",
228 "spanloss-base": 11.4,
229 "spanloss-current": 12,
230 "engineered-spanloss": 12.2,
231 "link-concatenation": [{
234 "SRLG-length": 100000,
236 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
237 self.assertEqual(response.status_code, requests.codes.created)
239 def test_14_add_omsAttributes_ROADMC_ROADMA(self):
240 # Config ROADMC-ROADMA oms-attributes
242 "auto-spanloss": "true",
243 "spanloss-base": 11.4,
244 "spanloss-current": 12,
245 "engineered-spanloss": 12.2,
246 "link-concatenation": [{
249 "SRLG-length": 100000,
251 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
252 self.assertEqual(response.status_code, requests.codes.created)
254 # test service-create for Eth service from xpdr to xpdr
255 def test_15_create_eth_service2(self):
256 self.cr_serv_sample_data["input"]["service-name"] = "service2"
257 response = test_utils.service_create_request(self.cr_serv_sample_data)
258 self.assertEqual(response.status_code, requests.codes.ok)
259 res = response.json()
260 self.assertIn('PCE calculation in progress',
261 res['output']['configuration-response-common']['response-message'])
262 time.sleep(self.WAITING)
264 def test_16_get_eth_service1(self):
265 response = test_utils.get_service_list_request("services/service2")
266 self.assertEqual(response.status_code, requests.codes.ok)
267 res = response.json()
269 res['services'][0]['administrative-state'], 'inService')
271 res['services'][0]['service-name'], 'service2')
273 res['services'][0]['connection-type'], 'service')
275 res['services'][0]['lifecycle-state'], 'planned')
278 def test_17_check_xc1_ROADMA(self):
279 response = test_utils.check_netconf_node_request(
280 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
281 self.assertEqual(response.status_code, requests.codes.ok)
282 res = response.json()
283 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
284 self.assertDictEqual(
286 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
287 'opticalControlMode': 'gainLoss',
288 'target-output-power': -3.0
289 }, **res['roadm-connections'][0]),
290 res['roadm-connections'][0]
292 self.assertDictEqual(
293 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
294 res['roadm-connections'][0]['source'])
295 self.assertDictEqual(
296 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
297 res['roadm-connections'][0]['destination'])
300 def test_18_check_xc1_ROADMC(self):
301 response = test_utils.check_netconf_node_request(
302 "ROADM-C1", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-761:768")
303 self.assertEqual(response.status_code, requests.codes.ok)
304 res = response.json()
305 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
306 self.assertDictEqual(
308 'connection-name': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-761:768',
309 'opticalControlMode': 'gainLoss',
310 'target-output-power': -3.0
311 }, **res['roadm-connections'][0]),
312 res['roadm-connections'][0]
314 self.assertDictEqual(
315 {'src-if': 'SRG1-PP2-TXRX-nmc-761:768'},
316 res['roadm-connections'][0]['source'])
317 self.assertDictEqual(
318 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
319 res['roadm-connections'][0]['destination'])
322 def test_19_check_topo_XPDRA(self):
323 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
324 self.assertEqual(response.status_code, requests.codes.ok)
325 res = response.json()
326 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
328 if ele['tp-id'] == 'XPDR1-NETWORK1':
329 self.assertEqual({'frequency': 196.1,
331 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
332 elif ele['tp-id'] in ('XPDR1-CLIENT1', 'XPDR1-CLIENT2'):
333 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
334 elif ele['tp-id'] == 'XPDR1-NETWORK2':
335 self.assertIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
338 def test_20_check_topo_ROADMA_SRG1(self):
339 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
340 self.assertEqual(response.status_code, requests.codes.ok)
341 res = response.json()
342 freq_map = base64.b64decode(
343 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
344 freq_map_array = [int(x) for x in freq_map]
345 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
346 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
348 if ele['tp-id'] == 'SRG1-PP1-TXRX':
349 freq_map = base64.b64decode(
350 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
351 freq_map_array = [int(x) for x in freq_map]
352 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
353 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
354 self.assertNotIn('avail-freq-maps', dict.keys(ele))
357 def test_21_check_topo_ROADMA_DEG2(self):
358 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
359 self.assertEqual(response.status_code, requests.codes.ok)
360 res = response.json()
361 freq_map = base64.b64decode(
362 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
363 freq_map_array = [int(x) for x in freq_map]
364 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
365 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
367 if ele['tp-id'] == 'DEG2-CTP-TXRX':
368 freq_map = base64.b64decode(
369 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
370 freq_map_array = [int(x) for x in freq_map]
371 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
372 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
373 freq_map = base64.b64decode(
374 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
375 freq_map_array = [int(x) for x in freq_map]
376 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
379 def test_22_create_eth_service1(self):
380 self.cr_serv_sample_data["input"]["service-name"] = "service1"
381 del self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"]
382 del self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"]
383 del self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"]
384 del self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"]
385 del self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"]
386 del self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"]
387 del self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"]
388 del self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"]
390 response = test_utils.service_create_request(self.cr_serv_sample_data)
391 self.assertEqual(response.status_code, requests.codes.ok)
392 res = response.json()
393 self.assertIn('PCE calculation in progress',
394 res['output']['configuration-response-common']['response-message'])
395 time.sleep(self.WAITING)
397 def test_23_get_eth_service1(self):
398 response = test_utils.get_service_list_request("services/service1")
399 self.assertEqual(response.status_code, requests.codes.ok)
400 res = response.json()
402 res['services'][0]['administrative-state'],
405 res['services'][0]['service-name'], 'service1')
407 res['services'][0]['connection-type'], 'service')
409 res['services'][0]['lifecycle-state'], 'planned')
412 def test_24_check_xc1_ROADMA(self):
413 response = test_utils.check_netconf_node_request(
414 "ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760")
415 self.assertEqual(response.status_code, requests.codes.ok)
416 res = response.json()
417 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
418 self.assertDictEqual(
420 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760',
421 'opticalControlMode': 'power'
422 }, **res['roadm-connections'][0]),
423 res['roadm-connections'][0]
425 self.assertDictEqual(
426 {'src-if': 'DEG2-TTP-TXRX-nmc-753:760'},
427 res['roadm-connections'][0]['source'])
428 self.assertDictEqual(
429 {'dst-if': 'SRG1-PP2-TXRX-nmc-753:760'},
430 res['roadm-connections'][0]['destination'])
432 def test_25_check_topo_XPDRA(self):
433 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
434 self.assertEqual(response.status_code, requests.codes.ok)
435 res = response.json()
436 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
438 if ele['tp-id'] == 'XPDR1-NETWORK2':
439 self.assertEqual({'frequency': 196.05,
441 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
442 elif ele['tp-id'] in ('XPDR1-CLIENT1'):
443 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
446 def test_26_check_topo_ROADMA_SRG1(self):
447 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
448 self.assertEqual(response.status_code, requests.codes.ok)
449 res = response.json()
450 freq_map = base64.b64decode(
451 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
452 freq_map_array = [int(x) for x in freq_map]
453 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
454 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
455 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
457 if ele['tp-id'] == 'SRG1-PP1-TXRX':
458 freq_map = base64.b64decode(
459 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
460 freq_map_array = [int(x) for x in freq_map]
461 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
462 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
463 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
464 freq_map = base64.b64decode(
465 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
466 freq_map_array = [int(x) for x in freq_map]
467 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
468 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
469 elif ele['tp-id'] == 'SRG1-PP3-TXRX':
470 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
473 def test_27_check_topo_ROADMA_DEG2(self):
474 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
475 self.assertEqual(response.status_code, requests.codes.ok)
476 res = response.json()
477 freq_map = base64.b64decode(
478 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
479 freq_map_array = [int(x) for x in freq_map]
480 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
481 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
482 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
484 if ele['tp-id'] == 'DEG2-CTP-TXRX':
485 freq_map = base64.b64decode(
486 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
487 freq_map_array = [int(x) for x in freq_map]
488 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
489 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
490 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
491 freq_map = base64.b64decode(
492 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
493 freq_map_array = [int(x) for x in freq_map]
494 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
495 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
498 # creation service test on a non-available resource
499 def test_28_create_eth_service3(self):
500 self.cr_serv_sample_data["input"]["service-name"] = "service3"
501 response = test_utils.service_create_request(self.cr_serv_sample_data)
502 self.assertEqual(response.status_code, requests.codes.ok)
503 res = response.json()
504 self.assertIn('PCE calculation in progress',
505 res['output']['configuration-response-common']['response-message'])
506 self.assertIn('200', res['output']['configuration-response-common']['response-code'])
507 time.sleep(self.WAITING)
509 # add a test that check the openroadm-service-list still only contains 2 elements
510 def test_29_delete_eth_service3(self):
511 response = test_utils.service_delete_request("service3")
512 self.assertEqual(response.status_code, requests.codes.ok)
513 res = response.json()
514 self.assertIn('Service \'service3\' does not exist in datastore',
515 res['output']['configuration-response-common']['response-message'])
516 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
519 def test_30_delete_eth_service1(self):
520 response = test_utils.service_delete_request("service1")
521 self.assertEqual(response.status_code, requests.codes.ok)
522 res = response.json()
523 self.assertIn('Renderer service delete in progress',
524 res['output']['configuration-response-common']['response-message'])
527 def test_31_delete_eth_service2(self):
528 response = test_utils.service_delete_request("service2")
529 self.assertEqual(response.status_code, requests.codes.ok)
530 res = response.json()
531 self.assertIn('Renderer service delete in progress',
532 res['output']['configuration-response-common']['response-message'])
535 def test_32_check_no_xc_ROADMA(self):
536 response = test_utils.check_netconf_node_request("ROADM-A1", "")
537 res = response.json()
538 self.assertEqual(response.status_code, requests.codes.ok)
539 self.assertNotIn('roadm-connections', dict.keys(res['org-openroadm-device']))
542 def test_33_check_topo_XPDRA(self):
543 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
544 self.assertEqual(response.status_code, requests.codes.ok)
545 res = response.json()
546 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
548 if ele['org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
549 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
550 elif ele['org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
551 self.assertIn('tail-equipment-id',
552 dict.keys(ele['org-openroadm-network-topology:xpdr-network-attributes']))
553 self.assertNotIn('wavelength', dict.keys(
554 ele['org-openroadm-network-topology:xpdr-network-attributes']))
557 def test_34_check_topo_ROADMA_SRG1(self):
558 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
559 self.assertEqual(response.status_code, requests.codes.ok)
560 res = response.json()
561 freq_map = base64.b64decode(
562 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
563 freq_map_array = [int(x) for x in freq_map]
564 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
565 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
566 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
568 if ele['tp-id'] in ('SRG1-PP1-TXRX', 'SRG1-PP2-TXRX'):
569 freq_map = base64.b64decode(
570 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
571 freq_map_array = [int(x) for x in freq_map]
572 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
573 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
574 elif ele['tp-id'] == 'SRG1-CP-TXRX':
575 freq_map = base64.b64decode(
576 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
577 freq_map_array = [int(x) for x in freq_map]
578 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
579 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
581 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
584 def test_35_check_topo_ROADMA_DEG2(self):
585 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
586 self.assertEqual(response.status_code, requests.codes.ok)
587 res = response.json()
588 freq_map = base64.b64decode(
589 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
590 freq_map_array = [int(x) for x in freq_map]
591 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
592 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
593 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
595 if ele['tp-id'] == 'DEG2-CTP-TXRX':
596 freq_map = base64.b64decode(
597 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
598 freq_map_array = [int(x) for x in freq_map]
599 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
600 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
601 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
602 freq_map = base64.b64decode(
603 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
604 freq_map_array = [int(x) for x in freq_map]
605 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
606 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
609 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
610 def test_36_create_oc_service1(self):
611 self.cr_serv_sample_data["input"]["service-name"] = "service1"
612 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
613 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
614 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
615 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
616 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
617 response = test_utils.service_create_request(self.cr_serv_sample_data)
618 self.assertEqual(response.status_code, requests.codes.ok)
619 res = response.json()
620 self.assertIn('PCE calculation in progress',
621 res['output']['configuration-response-common']['response-message'])
622 time.sleep(self.WAITING)
624 def test_37_get_oc_service1(self):
625 response = test_utils.get_service_list_request("services/service1")
626 self.assertEqual(response.status_code, requests.codes.ok)
627 res = response.json()
629 res['services'][0]['administrative-state'],
632 res['services'][0]['service-name'], 'service1')
634 res['services'][0]['connection-type'], 'roadm-line')
636 res['services'][0]['lifecycle-state'], 'planned')
639 def test_38_check_xc1_ROADMA(self):
640 response = test_utils.check_netconf_node_request(
641 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
642 self.assertEqual(response.status_code, requests.codes.ok)
643 res = response.json()
644 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
645 self.assertDictEqual(
647 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
648 'opticalControlMode': 'gainLoss',
649 'target-output-power': -3.0
650 }, **res['roadm-connections'][0]),
651 res['roadm-connections'][0]
653 self.assertDictEqual(
654 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
655 res['roadm-connections'][0]['source'])
656 self.assertDictEqual(
657 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
658 res['roadm-connections'][0]['destination'])
661 def test_39_check_xc1_ROADMC(self):
662 response = test_utils.check_netconf_node_request(
663 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
664 self.assertEqual(response.status_code, requests.codes.ok)
665 res = response.json()
666 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
667 self.assertDictEqual(
669 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
670 'opticalControlMode': 'gainLoss',
671 'target-output-power': -3.0
672 }, **res['roadm-connections'][0]),
673 res['roadm-connections'][0]
675 self.assertDictEqual(
676 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
677 res['roadm-connections'][0]['source'])
678 self.assertDictEqual(
679 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
680 res['roadm-connections'][0]['destination'])
683 def test_40_create_oc_service2(self):
684 self.cr_serv_sample_data["input"]["service-name"] = "service2"
685 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
686 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
687 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
688 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
689 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
690 response = test_utils.service_create_request(self.cr_serv_sample_data)
691 self.assertEqual(response.status_code, requests.codes.ok)
692 res = response.json()
693 self.assertIn('PCE calculation in progress',
694 res['output']['configuration-response-common']['response-message'])
695 time.sleep(self.WAITING)
697 def test_41_get_oc_service2(self):
698 response = test_utils.get_service_list_request("services/service2")
699 self.assertEqual(response.status_code, requests.codes.ok)
700 res = response.json()
702 res['services'][0]['administrative-state'],
705 res['services'][0]['service-name'], 'service2')
707 res['services'][0]['connection-type'], 'roadm-line')
709 res['services'][0]['lifecycle-state'], 'planned')
712 def test_42_check_xc2_ROADMA(self):
713 response = test_utils.check_netconf_node_request(
714 "ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760")
715 self.assertEqual(response.status_code, requests.codes.ok)
716 res = response.json()
717 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
718 self.assertDictEqual(
720 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760',
721 'opticalControlMode': 'gainLoss',
722 'target-output-power': -3.0
723 }, **res['roadm-connections'][0]),
724 res['roadm-connections'][0]
726 self.assertDictEqual(
727 {'src-if': 'SRG1-PP2-TXRX-nmc-753:760'},
728 res['roadm-connections'][0]['source'])
729 self.assertDictEqual(
730 {'dst-if': 'DEG2-TTP-TXRX-nmc-753:760'},
731 res['roadm-connections'][0]['destination'])
734 def test_43_check_topo_ROADMA(self):
735 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
736 self.assertEqual(response.status_code, requests.codes.ok)
737 res = response.json()
738 freq_map = base64.b64decode(
739 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
740 freq_map_array = [int(x) for x in freq_map]
741 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
742 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
743 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
745 if ele['tp-id'] == 'SRG1-PP1-TXRX':
746 freq_map = base64.b64decode(
747 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
748 freq_map_array = [int(x) for x in freq_map]
749 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
750 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
751 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
752 freq_map = base64.b64decode(
753 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
754 freq_map_array = [int(x) for x in freq_map]
755 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
756 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
757 elif ele['tp-id'] == 'SRG1-PP3-TXRX':
758 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
760 self.test_27_check_topo_ROADMA_DEG2()
763 def test_44_delete_oc_service1(self):
764 response = test_utils.service_delete_request("service1")
765 self.assertEqual(response.status_code, requests.codes.ok)
766 res = response.json()
767 self.assertIn('Renderer service delete in progress',
768 res['output']['configuration-response-common']['response-message'])
771 def test_45_delete_oc_service2(self):
772 response = test_utils.service_delete_request("service2")
773 self.assertEqual(response.status_code, requests.codes.ok)
774 res = response.json()
775 self.assertIn('Renderer service delete in progress',
776 res['output']['configuration-response-common']['response-message'])
779 def test_46_get_no_oc_services(self):
781 response = test_utils.get_service_list_request("")
782 self.assertEqual(response.status_code, requests.codes.conflict)
783 res = response.json()
785 {"error-type": "application", "error-tag": "data-missing",
786 "error-message": "Request could not be completed because the relevant data model content does not exist"},
787 res['errors']['error'])
790 def test_47_get_no_xc_ROADMA(self):
791 response = test_utils.check_netconf_node_request("ROADM-A1", "")
792 self.assertEqual(response.status_code, requests.codes.ok)
793 res = response.json()
794 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
797 def test_48_check_topo_ROADMA(self):
798 self.test_34_check_topo_ROADMA_SRG1()
799 self.test_35_check_topo_ROADMA_DEG2()
801 def test_49_loop_create_oc_service(self):
802 for i in range(1, 3):
803 # pylint: disable=consider-using-f-string
804 print("iteration number {}".format(i))
805 print("oc service creation")
806 self.test_36_create_oc_service1()
807 print("check xc in ROADM-A1")
808 self.test_38_check_xc1_ROADMA()
809 print("check xc in ROADM-C1")
810 self.test_39_check_xc1_ROADMC()
811 print("oc service deletion\n")
812 self.test_44_delete_oc_service1()
814 def test_50_loop_create_eth_service(self):
815 response = test_utils.get_service_list_request("services/service1")
816 if response.status_code != requests.codes.not_found:
817 response = test_utils.service_delete_request("service1")
819 self.cr_serv_sample_data["input"]["connection-type"] = "service"
820 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "XPDR-A1"
821 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
822 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "XPDR-C1"
823 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
824 for i in range(1, 3):
825 # pylint: disable=consider-using-f-string
826 print("iteration number {}".format(i))
827 print("eth service creation")
828 self.test_15_create_eth_service2()
829 print("check xc in ROADM-A1")
830 self.test_17_check_xc1_ROADMA()
831 print("eth service deletion\n")
832 self.test_31_delete_eth_service2()
834 def test_51_disconnect_XPDRA(self):
835 response = test_utils.unmount_device("XPDR-A1")
836 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
838 def test_52_disconnect_XPDRC(self):
839 response = test_utils.unmount_device("XPDR-C1")
840 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
842 def test_53_disconnect_ROADMA(self):
843 response = test_utils.unmount_device("ROADM-A1")
844 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
846 def test_54_disconnect_ROADMC(self):
847 response = test_utils.unmount_device("ROADM-C1")
848 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
851 if __name__ == "__main__":
852 unittest.main(verbosity=2)