2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 sys.path.append('transportpce_tests/common/')
20 import test_utils # nopep8
23 class TransportPCEFulltesting(unittest.TestCase):
26 cr_serv_sample_data = {"input": {
27 "sdnc-request-header": {
28 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
29 "rpc-action": "service-create",
30 "request-system-id": "appname",
31 "notification-url": "http://localhost:8585/NotificationServer/notify"
33 "service-name": "service1",
34 "common-id": "ASATT1234567",
35 "connection-type": "service",
37 "service-rate": "100",
39 "service-format": "Ethernet",
40 "clli": "SNJSCAMCJP8",
43 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
44 "port-type": "router",
45 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
46 "port-rack": "000000.00",
50 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
51 "lgx-port-name": "LGX Back.3",
52 "lgx-port-rack": "000000.00",
53 "lgx-port-shelf": "00"
58 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
59 "port-type": "router",
60 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
61 "port-rack": "000000.00",
65 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
66 "lgx-port-name": "LGX Back.4",
67 "lgx-port-rack": "000000.00",
68 "lgx-port-shelf": "00"
74 "service-rate": "100",
76 "service-format": "Ethernet",
77 "clli": "SNJSCAMCJT4",
80 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
81 "port-type": "router",
82 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
83 "port-rack": "000000.00",
87 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
88 "lgx-port-name": "LGX Back.29",
89 "lgx-port-rack": "000000.00",
90 "lgx-port-shelf": "00"
95 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
96 "port-type": "router",
97 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
98 "port-rack": "000000.00",
102 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
103 "lgx-port-name": "LGX Back.30",
104 "lgx-port-rack": "000000.00",
105 "lgx-port-shelf": "00"
110 "due-date": "2016-11-28T00:00:01Z",
111 "operator-contact": "pw1234"
115 WAITING = 20 # nominal value is 300
116 NODE_VERSION = '2.2.1'
120 cls.processes = test_utils.start_tpce()
121 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
122 ('roadma', cls.NODE_VERSION),
123 ('roadmc', cls.NODE_VERSION),
124 ('xpdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Hand every entry of cls.processes to test_utils.shutdown_process,
    # then report once the whole collection has been walked.
    # pylint: disable=not-an-iterable
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
def setUp(self):
    # Executed before each test method: announce which test is starting.
    # self.id() is a dotted path; only its last component is printed.
    test_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(test_name))
def test_01_connect_xpdrA(self):
    # Mount "XPDR-A1" through test_utils and expect a "created" status.
    sim = ('xpdra', self.NODE_VERSION)
    reply = test_utils.mount_device("XPDR-A1", sim)
    self.assertEqual(reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    # Mount "XPDR-C1" through test_utils and expect a "created" status.
    sim = ('xpdrc', self.NODE_VERSION)
    reply = test_utils.mount_device("XPDR-C1", sim)
    self.assertEqual(reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    # Mount "ROADM-A1" through test_utils and expect a "created" status.
    sim = ('roadma', self.NODE_VERSION)
    reply = test_utils.mount_device("ROADM-A1", sim)
    self.assertEqual(reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    # Mount "ROADM-C1" through test_utils and expect a "created" status.
    sim = ('roadmc', self.NODE_VERSION)
    reply = test_utils.mount_device("ROADM-C1", sim)
    self.assertEqual(reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    # Issue the xpdr-to-rdm link request for XPDR-A1 / ROADM-A1 on
    # SRG1-PP1-TXRX and check the result message in the reply.
    reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_msg = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_msg)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    # Issue the rdm-to-xpdr link request for XPDR-A1 / ROADM-A1 on
    # SRG1-PP1-TXRX and check the result message in the reply.
    reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_msg = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_msg)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    # Issue the xpdr-to-rdm link request for XPDR-C1 / ROADM-C1 on
    # SRG1-PP1-TXRX and check the result message in the reply.
    reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_msg = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_msg)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    # Issue the rdm-to-xpdr link request for XPDR-C1 / ROADM-C1 on
    # SRG1-PP1-TXRX and check the result message in the reply.
    reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_msg = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_msg)
184 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
185 # Config ROADMA-ROADMC oms-attributes
187 "auto-spanloss": "true",
188 "spanloss-base": 11.4,
189 "spanloss-current": 12,
190 "engineered-spanloss": 12.2,
191 "link-concatenation": [{
194 "SRLG-length": 100000,
196 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
197 self.assertEqual(response.status_code, requests.codes.created)
199 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
200 # Config ROADMC-ROADMA oms-attributes
202 "auto-spanloss": "true",
203 "spanloss-base": 11.4,
204 "spanloss-current": 12,
205 "engineered-spanloss": 12.2,
206 "link-concatenation": [{
209 "SRLG-length": 100000,
211 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
212 self.assertEqual(response.status_code, requests.codes.created)
214 # test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
    # Send the cr_serv_sample_data request with the service named "service1"
    # and check the acknowledgement message.
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service1"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before returning.
    time.sleep(self.WAITING)
224 def test_12_get_eth_service1(self):
225 response = test_utils.get_service_list_request("services/service1")
226 self.assertEqual(response.status_code, requests.codes.ok)
227 res = response.json()
229 res['services'][0]['administrative-state'], 'inService')
231 res['services'][0]['service-name'], 'service1')
233 res['services'][0]['connection-type'], 'service')
235 res['services'][0]['lifecycle-state'], 'planned')
238 def test_13_check_xc1_ROADMA(self):
239 response = test_utils.check_netconf_node_request(
240 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
241 self.assertEqual(response.status_code, requests.codes.ok)
242 res = response.json()
243 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
244 self.assertDictEqual(
246 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
247 'opticalControlMode': 'gainLoss',
248 'target-output-power': -3.0
249 }, **res['roadm-connections'][0]),
250 res['roadm-connections'][0]
252 self.assertDictEqual(
253 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
254 res['roadm-connections'][0]['source'])
255 self.assertDictEqual(
256 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
257 res['roadm-connections'][0]['destination'])
260 def test_14_check_xc1_ROADMC(self):
261 response = test_utils.check_netconf_node_request(
262 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
265 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
266 self.assertDictEqual(
268 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
269 'opticalControlMode': 'gainLoss',
270 'target-output-power': -3.0
271 }, **res['roadm-connections'][0]),
272 res['roadm-connections'][0]
274 self.assertDictEqual(
275 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
276 res['roadm-connections'][0]['source'])
277 self.assertDictEqual(
278 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
279 res['roadm-connections'][0]['destination'])
282 def test_15_check_topo_XPDRA(self):
283 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
284 self.assertEqual(response.status_code, requests.codes.ok)
285 res = response.json()
286 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
288 if ele['tp-id'] == 'XPDR1-NETWORK1':
289 self.assertEqual({'frequency': 196.1,
291 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
292 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
293 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
294 if ele['tp-id'] == 'XPDR1-NETWORK2':
295 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
298 def test_16_check_topo_ROADMA_SRG1(self):
299 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
300 self.assertEqual(response.status_code, requests.codes.ok)
301 res = response.json()
302 freq_map = base64.b64decode(
303 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
304 freq_map_array = [int(x) for x in freq_map]
305 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
306 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
308 if ele['tp-id'] == 'SRG1-PP1-TXRX':
309 freq_map = base64.b64decode(
310 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
311 freq_map_array = [int(x) for x in freq_map]
312 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
313 if ele['tp-id'] == 'SRG1-PP2-TXRX':
314 self.assertNotIn('avail-freq-maps', dict.keys(ele))
317 def test_17_check_topo_ROADMA_DEG1(self):
318 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
319 self.assertEqual(response.status_code, requests.codes.ok)
320 res = response.json()
321 freq_map = base64.b64decode(
322 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
323 freq_map_array = [int(x) for x in freq_map]
324 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
325 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
327 if ele['tp-id'] == 'DEG2-CTP-TXRX':
328 freq_map = base64.b64decode(
329 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
330 freq_map_array = [int(x) for x in freq_map]
331 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
332 if ele['tp-id'] == 'DEG2-TTP-TXRX':
333 freq_map = base64.b64decode(
334 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
335 freq_map_array = [int(x) for x in freq_map]
336 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    # Issue the xpdr-to-rdm link request for XPDR-A1 (third argument "2") /
    # ROADM-A1 on SRG1-PP2-TXRX and check the result message in the reply.
    reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_msg = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_msg)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    # Issue the rdm-to-xpdr link request for XPDR-A1 (third argument "2") /
    # ROADM-A1 on SRG1-PP2-TXRX and check the result message in the reply.
    reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_msg = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_msg)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    # Issue the xpdr-to-rdm link request for XPDR-C1 (third argument "2") /
    # ROADM-C1 on SRG1-PP2-TXRX and check the result message in the reply.
    reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_msg = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_msg)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    # Issue the rdm-to-xpdr link request for XPDR-C1 (third argument "2") /
    # ROADM-C1 on SRG1-PP2-TXRX and check the result message in the reply.
    reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_msg = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_msg)
def test_22_create_eth_service2(self):
    # Send the cr_serv_sample_data request with the service named "service2"
    # and check the acknowledgement message.
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service2"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before returning.
    time.sleep(self.WAITING)
380 def test_23_get_eth_service2(self):
381 response = test_utils.get_service_list_request("services/service2")
382 self.assertEqual(response.status_code, requests.codes.ok)
383 res = response.json()
385 res['services'][0]['administrative-state'],
388 res['services'][0]['service-name'], 'service2')
390 res['services'][0]['connection-type'], 'service')
392 res['services'][0]['lifecycle-state'], 'planned')
395 def test_24_check_xc2_ROADMA(self):
396 response = test_utils.check_netconf_node_request(
397 "ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760")
398 self.assertEqual(response.status_code, requests.codes.ok)
399 res = response.json()
400 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
401 self.assertDictEqual(
403 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760',
404 'opticalControlMode': 'power'
405 }, **res['roadm-connections'][0]),
406 res['roadm-connections'][0]
408 self.assertDictEqual(
409 {'src-if': 'DEG2-TTP-TXRX-nmc-753:760'},
410 res['roadm-connections'][0]['source'])
411 self.assertDictEqual(
412 {'dst-if': 'SRG1-PP2-TXRX-nmc-753:760'},
413 res['roadm-connections'][0]['destination'])
415 def test_25_check_topo_XPDRA(self):
416 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
417 self.assertEqual(response.status_code, requests.codes.ok)
418 res = response.json()
419 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
421 if ele['tp-id'] == 'XPDR1-NETWORK1':
422 self.assertEqual({'frequency': 196.1,
424 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
425 if ele['tp-id'] == 'XPDR1-NETWORK2':
426 self.assertEqual({'frequency': 196.05,
428 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
429 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
430 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
433 def test_26_check_topo_ROADMA_SRG1(self):
434 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
435 self.assertEqual(response.status_code, requests.codes.ok)
436 res = response.json()
437 freq_map = base64.b64decode(
438 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
439 freq_map_array = [int(x) for x in freq_map]
440 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
441 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
442 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
444 if ele['tp-id'] == 'SRG1-PP1-TXRX':
445 freq_map = base64.b64decode(
446 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
447 freq_map_array = [int(x) for x in freq_map]
448 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
449 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
450 if ele['tp-id'] == 'SRG1-PP2-TXRX':
451 freq_map = base64.b64decode(
452 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
453 freq_map_array = [int(x) for x in freq_map]
454 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
455 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
456 if ele['tp-id'] == 'SRG1-PP3-TXRX':
457 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
460 def test_27_check_topo_ROADMA_DEG2(self):
461 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
462 self.assertEqual(response.status_code, requests.codes.ok)
463 res = response.json()
464 freq_map = base64.b64decode(
465 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
466 freq_map_array = [int(x) for x in freq_map]
467 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
468 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
469 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
471 if ele['tp-id'] == 'DEG2-CTP-TXRX':
472 freq_map = base64.b64decode(
473 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
474 freq_map_array = [int(x) for x in freq_map]
475 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
476 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
477 if ele['tp-id'] == 'DEG2-TTP-TXRX':
478 freq_map = base64.b64decode(
479 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
480 freq_map_array = [int(x) for x in freq_map]
481 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
482 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
485 # creation service test on a non-available resource
def test_28_create_eth_service3(self):
    # Send the cr_serv_sample_data request with the service named "service3";
    # the request itself is still expected to be acknowledged with code 200.
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service3"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    self.assertIn('200', common['response-code'])
    # Pause for WAITING seconds before returning.
    time.sleep(self.WAITING)
496 # add a test that check the openroadm-service-list still only contains 2 elements
def test_29_delete_eth_service3(self):
    # Deleting "service3" must be answered (HTTP ok) with a response-code
    # containing '500' and a message saying the service does not exist.
    reply = test_utils.service_delete_request("service3")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Service \'service3\' does not exist in datastore',
                  common['response-message'])
    self.assertIn('500', common['response-code'])
def test_30_delete_eth_service1(self):
    # Request deletion of "service1" and check the acknowledgement message.
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
def test_31_delete_eth_service2(self):
    # Request deletion of "service2" and check the acknowledgement message.
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
def test_32_check_no_xc_ROADMA(self):
    # Read the ROADM-A1 device data (empty sub-path) and verify that it
    # holds no 'roadm-connections' entry.
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    # Check the status before decoding, as the sibling tests do, so that an
    # HTTP failure is reported as such rather than as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership on the mapping itself already tests its keys.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
529 def test_33_check_topo_XPDRA(self):
530 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
531 self.assertEqual(response.status_code, requests.codes.ok)
532 res = response.json()
533 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
535 if ele['org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
536 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
537 elif ele['org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
538 self.assertIn('tail-equipment-id',
539 dict.keys(ele['org-openroadm-network-topology:xpdr-network-attributes']))
540 self.assertNotIn('wavelength', dict.keys(
541 ele['org-openroadm-network-topology:xpdr-network-attributes']))
544 def test_34_check_topo_ROADMA_SRG1(self):
545 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
546 self.assertEqual(response.status_code, requests.codes.ok)
547 res = response.json()
548 freq_map = base64.b64decode(
549 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
550 freq_map_array = [int(x) for x in freq_map]
551 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
552 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
553 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
555 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
556 freq_map = base64.b64decode(
557 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
558 freq_map_array = [int(x) for x in freq_map]
559 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
560 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
561 elif ele['tp-id'] == 'SRG1-CP-TXRX':
562 freq_map = base64.b64decode(
563 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
564 freq_map_array = [int(x) for x in freq_map]
565 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
566 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
568 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
571 def test_35_check_topo_ROADMA_DEG2(self):
572 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
573 self.assertEqual(response.status_code, requests.codes.ok)
574 res = response.json()
575 freq_map = base64.b64decode(
576 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
577 freq_map_array = [int(x) for x in freq_map]
578 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
579 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
580 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
582 if ele['tp-id'] == 'DEG2-CTP-TXRX':
583 freq_map = base64.b64decode(
584 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
585 freq_map_array = [int(x) for x in freq_map]
586 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
587 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
588 if ele['tp-id'] == 'DEG2-TTP-TXRX':
589 freq_map = base64.b64decode(
590 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
591 freq_map_array = [int(x) for x in freq_map]
592 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
593 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
596 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
    # Rewrite the cr_serv_sample_data request into a "roadm-line" request
    # named "service1" whose two ends are ROADM-A1 and ROADM-C1 with the
    # "OC" service format, then submit it.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1"
    request_input["connection-type"] = "roadm-line"
    ends = (("service-a-end", "ROADM-A1"), ("service-z-end", "ROADM-C1"))
    for end_key, node_id in ends:
        request_input[end_key]["node-id"] = node_id
        request_input[end_key]["service-format"] = "OC"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before returning.
    time.sleep(self.WAITING)
611 def test_37_get_oc_service1(self):
612 response = test_utils.get_service_list_request("services/service1")
613 self.assertEqual(response.status_code, requests.codes.ok)
614 res = response.json()
616 res['services'][0]['administrative-state'],
619 res['services'][0]['service-name'], 'service1')
621 res['services'][0]['connection-type'], 'roadm-line')
623 res['services'][0]['lifecycle-state'], 'planned')
626 def test_38_check_xc1_ROADMA(self):
627 response = test_utils.check_netconf_node_request(
628 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
629 self.assertEqual(response.status_code, requests.codes.ok)
630 res = response.json()
631 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
632 self.assertDictEqual(
634 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
635 'opticalControlMode': 'gainLoss',
636 'target-output-power': -3.0
637 }, **res['roadm-connections'][0]),
638 res['roadm-connections'][0]
640 self.assertDictEqual(
641 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
642 res['roadm-connections'][0]['source'])
643 self.assertDictEqual(
644 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
645 res['roadm-connections'][0]['destination'])
648 def test_39_check_xc1_ROADMC(self):
649 response = test_utils.check_netconf_node_request(
650 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
651 self.assertEqual(response.status_code, requests.codes.ok)
652 res = response.json()
653 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
654 self.assertDictEqual(
656 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
657 'opticalControlMode': 'gainLoss',
658 'target-output-power': -3.0
659 }, **res['roadm-connections'][0]),
660 res['roadm-connections'][0]
662 self.assertDictEqual(
663 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
664 res['roadm-connections'][0]['source'])
665 self.assertDictEqual(
666 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
667 res['roadm-connections'][0]['destination'])
def test_40_create_oc_service2(self):
    # Rewrite the cr_serv_sample_data request into a "roadm-line" request
    # named "service2" whose two ends are ROADM-A1 and ROADM-C1 with the
    # "OC" service format, then submit it.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service2"
    request_input["connection-type"] = "roadm-line"
    ends = (("service-a-end", "ROADM-A1"), ("service-z-end", "ROADM-C1"))
    for end_key, node_id in ends:
        request_input[end_key]["node-id"] = node_id
        request_input[end_key]["service-format"] = "OC"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before returning.
    time.sleep(self.WAITING)
684 def test_41_get_oc_service2(self):
685 response = test_utils.get_service_list_request("services/service2")
686 self.assertEqual(response.status_code, requests.codes.ok)
687 res = response.json()
689 res['services'][0]['administrative-state'],
692 res['services'][0]['service-name'], 'service2')
694 res['services'][0]['connection-type'], 'roadm-line')
696 res['services'][0]['lifecycle-state'], 'planned')
699 def test_42_check_xc2_ROADMA(self):
700 response = test_utils.check_netconf_node_request(
701 "ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760")
702 self.assertEqual(response.status_code, requests.codes.ok)
703 res = response.json()
704 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
705 self.assertDictEqual(
707 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760',
708 'opticalControlMode': 'gainLoss',
709 'target-output-power': -3.0
710 }, **res['roadm-connections'][0]),
711 res['roadm-connections'][0]
713 self.assertDictEqual(
714 {'src-if': 'SRG1-PP2-TXRX-nmc-753:760'},
715 res['roadm-connections'][0]['source'])
716 self.assertDictEqual(
717 {'dst-if': 'DEG2-TTP-TXRX-nmc-753:760'},
718 res['roadm-connections'][0]['destination'])
def test_43_check_topo_ROADMA(self):
    # Re-run the SRG1 and DEG2 topology checks of tests 26 and 27, in order.
    self.test_26_check_topo_ROADMA_SRG1()
    self.test_27_check_topo_ROADMA_DEG2()
def test_44_delete_oc_service1(self):
    # Request deletion of "service1" and check the acknowledgement message.
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
def test_45_delete_oc_service2(self):
    # Request deletion of "service2" and check the acknowledgement message.
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
742 def test_46_get_no_oc_services(self):
744 response = test_utils.get_service_list_request("")
745 self.assertEqual(response.status_code, requests.codes.conflict)
746 res = response.json()
748 {"error-type": "application", "error-tag": "data-missing",
749 "error-message": "Request could not be completed because the relevant data model content does not exist"},
750 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    # Read the ROADM-A1 device data (empty sub-path) and verify that it
    # holds no 'roadm-connections' entry.
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The key is named directly; the former ['roadm-connections'][0] built a
    # one-element list only to index the same string back out of it.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
def test_48_check_topo_ROADMA(self):
    # Re-run the SRG1 and DEG2 topology checks of tests 34 and 35, in order.
    self.test_34_check_topo_ROADMA_SRG1()
    self.test_35_check_topo_ROADMA_DEG2()
def test_49_loop_create_eth_service(self):
    # Replay create / cross-connect checks / delete five times, reusing the
    # earlier test methods as steps. Each step is announced before it runs.
    steps = (
        ("eth service creation", self.test_11_create_eth_service1),
        ("check xc in ROADM-A1", self.test_13_check_xc1_ROADMA),
        ("check xc in ROADM-C1", self.test_14_check_xc1_ROADMC),
        ("eth service deletion\n", self.test_30_delete_eth_service1),
    )
    for iteration in range(1, 6):
        print("iteration number {}".format(iteration))
        for announcement, run_step in steps:
            print(announcement)
            run_step()
def test_50_loop_create_oc_service(self):
    # If looking up "service1" does not answer "not found", ask for its
    # deletion first. The status is compared against the named
    # requests.codes constant (value 404) like the rest of this file, and
    # the delete reply is not kept because nothing reads it.
    response = test_utils.get_service_list_request("services/service1")
    if response.status_code != requests.codes.not_found:
        test_utils.service_delete_request("service1")

    # Replay create / cross-connect checks / delete five times, reusing the
    # earlier test methods as steps.
    for i in range(1, 6):
        print("iteration number {}".format(i))
        print("oc service creation")
        self.test_36_create_oc_service1()
        print("check xc in ROADM-A1")
        self.test_38_check_xc1_ROADMA()
        print("check xc in ROADM-C1")
        self.test_39_check_xc1_ROADMC()
        print("oc service deletion\n")
        self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    # Unmount "XPDR-A1" through test_utils and expect an "ok" status.
    reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_52_disconnect_XPDRC(self):
    # Unmount "XPDR-C1" through test_utils and expect an "ok" status.
    reply = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_53_disconnect_ROADMA(self):
    # Unmount "ROADM-A1" through test_utils and expect an "ok" status.
    reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_54_disconnect_ROADMC(self):
    # Unmount "ROADM-C1" through test_utils and expect an "ok" status.
    reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
# When executed as a script, run this module's tests with verbose output.
if __name__ == "__main__":
    unittest.main(verbosity=2)