2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 sys.path.append('transportpce_tests/common/')
22 class TransportPCEFulltesting(unittest.TestCase):
25 cr_serv_sample_data = {"input": {
26 "sdnc-request-header": {
27 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
28 "rpc-action": "service-create",
29 "request-system-id": "appname",
30 "notification-url": "http://localhost:8585/NotificationServer/notify"
32 "service-name": "service1",
33 "common-id": "ASATT1234567",
34 "connection-type": "service",
36 "service-rate": "100",
38 "service-format": "Ethernet",
39 "clli": "SNJSCAMCJP8",
42 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
43 "port-type": "router",
44 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
45 "port-rack": "000000.00",
49 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
50 "lgx-port-name": "LGX Back.3",
51 "lgx-port-rack": "000000.00",
52 "lgx-port-shelf": "00"
57 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
58 "port-type": "router",
59 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
60 "port-rack": "000000.00",
64 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
65 "lgx-port-name": "LGX Back.4",
66 "lgx-port-rack": "000000.00",
67 "lgx-port-shelf": "00"
73 "service-rate": "100",
75 "service-format": "Ethernet",
76 "clli": "SNJSCAMCJT4",
79 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
80 "port-type": "router",
81 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
82 "port-rack": "000000.00",
86 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
87 "lgx-port-name": "LGX Back.29",
88 "lgx-port-rack": "000000.00",
89 "lgx-port-shelf": "00"
94 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
95 "port-type": "router",
96 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
97 "port-rack": "000000.00",
101 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
102 "lgx-port-name": "LGX Back.30",
103 "lgx-port-rack": "000000.00",
104 "lgx-port-shelf": "00"
109 "due-date": "2016-11-28T00:00:01Z",
110 "operator-contact": "pw1234"
114 WAITING = 20 # nominal value is 300
115 NODE_VERSION = '2.2.1'
119 cls.processes = test_utils.start_tpce()
120 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
121 ('roadma', cls.NODE_VERSION),
122 ('roadmc', cls.NODE_VERSION),
123 ('xpdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Stop every process recorded on the class, one at a time, then report.
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):  # runs before each test method
    # self.id() is a dotted path; only its last component is printed.
    last_component = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(last_component))
def test_01_connect_xpdrA(self):
    # Mount node XPDR-A1 from the ('xpdra', NODE_VERSION) pair; expect HTTP 201.
    sim = ('xpdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDR-A1", sim)
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    # Mount node XPDR-C1 from the ('xpdrc', NODE_VERSION) pair; expect HTTP 201.
    sim = ('xpdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDR-C1", sim)
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    # Mount node ROADM-A1 from the ('roadma', NODE_VERSION) pair; expect HTTP 201.
    sim = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-A1", sim)
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    # Mount node ROADM-C1 from the ('roadmc', NODE_VERSION) pair; expect HTTP 201.
    sim = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-C1", sim)
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    # Ask for the XPDR-A1 -> ROADM-A1 (SRG1-PP1-TXRX) link and check that the
    # reply is HTTP 200 with the expected success message.
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    # Ask for the ROADM-A1 (SRG1-PP1-TXRX) -> XPDR-A1 link and check that the
    # reply is HTTP 200 with the expected success message.
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    # Ask for the XPDR-C1 -> ROADM-C1 (SRG1-PP1-TXRX) link and check that the
    # reply is HTTP 200 with the expected success message.
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    # Ask for the ROADM-C1 (SRG1-PP1-TXRX) -> XPDR-C1 link and check that the
    # reply is HTTP 200 with the expected success message.
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
183 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
184 # Config ROADMA-ROADMC oms-attributes
186 "auto-spanloss": "true",
187 "spanloss-base": 11.4,
188 "spanloss-current": 12,
189 "engineered-spanloss": 12.2,
190 "link-concatenation": [{
193 "SRLG-length": 100000,
195 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
196 self.assertEqual(response.status_code, requests.codes.created)
198 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
199 # Config ROADMC-ROADMA oms-attributes
201 "auto-spanloss": "true",
202 "spanloss-base": 11.4,
203 "spanloss-current": 12,
204 "engineered-spanloss": 12.2,
205 "link-concatenation": [{
208 "SRLG-length": 100000,
210 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
211 self.assertEqual(response.status_code, requests.codes.created)
213 # test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
    # Send the class-level sample payload under the name "service1", check the
    # acknowledgement, then pause for WAITING seconds.
    # Only "service-name" is rewritten here; every other field of the shared
    # payload is sent as it currently stands.
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service1"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
223 def test_12_get_eth_service1(self):
224 response = test_utils.get_service_list_request("services/service1")
225 self.assertEqual(response.status_code, requests.codes.ok)
226 res = response.json()
228 res['services'][0]['administrative-state'], 'inService')
230 res['services'][0]['service-name'], 'service1')
232 res['services'][0]['connection-type'], 'service')
234 res['services'][0]['lifecycle-state'], 'planned')
237 def test_13_check_xc1_ROADMA(self):
238 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
239 self.assertEqual(response.status_code, requests.codes.ok)
240 res = response.json()
241 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
242 self.assertDictEqual(
244 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
245 'opticalControlMode': 'gainLoss',
246 'target-output-power': -3.0
247 }, **res['roadm-connections'][0]),
248 res['roadm-connections'][0]
250 self.assertDictEqual(
251 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
252 res['roadm-connections'][0]['source'])
253 self.assertDictEqual(
254 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
255 res['roadm-connections'][0]['destination'])
258 def test_14_check_xc1_ROADMC(self):
259 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
260 self.assertEqual(response.status_code, requests.codes.ok)
261 res = response.json()
262 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
263 self.assertDictEqual(
265 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
266 'opticalControlMode': 'gainLoss',
267 'target-output-power': -3.0
268 }, **res['roadm-connections'][0]),
269 res['roadm-connections'][0]
271 self.assertDictEqual(
272 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
273 res['roadm-connections'][0]['source'])
274 self.assertDictEqual(
275 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
276 res['roadm-connections'][0]['destination'])
279 def test_15_check_topo_XPDRA(self):
280 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
281 self.assertEqual(response.status_code, requests.codes.ok)
282 res = response.json()
283 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
285 if ele['tp-id'] == 'XPDR1-NETWORK1':
286 self.assertEqual({u'frequency': 196.1,
288 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
289 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
290 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
291 if ele['tp-id'] == 'XPDR1-NETWORK2':
292 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
295 def test_16_check_topo_ROADMA_SRG1(self):
296 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
297 self.assertEqual(response.status_code, requests.codes.ok)
298 res = response.json()
299 freq_map = base64.b64decode(
300 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
301 freq_map_array = [int(x) for x in freq_map]
302 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
303 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
305 if ele['tp-id'] == 'SRG1-PP1-TXRX':
306 freq_map = base64.b64decode(
307 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
308 freq_map_array = [int(x) for x in freq_map]
309 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
310 if ele['tp-id'] == 'SRG1-PP2-TXRX':
311 self.assertNotIn('avail-freq-maps', dict.keys(ele))
314 def test_17_check_topo_ROADMA_DEG1(self):
315 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
316 self.assertEqual(response.status_code, requests.codes.ok)
317 res = response.json()
318 freq_map = base64.b64decode(
319 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
320 freq_map_array = [int(x) for x in freq_map]
321 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
322 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
324 if ele['tp-id'] == 'DEG2-CTP-TXRX':
325 freq_map = base64.b64decode(
326 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
327 freq_map_array = [int(x) for x in freq_map]
328 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
329 if ele['tp-id'] == 'DEG2-TTP-TXRX':
330 freq_map = base64.b64decode(
331 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
332 freq_map_array = [int(x) for x in freq_map]
333 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    # Ask for the XPDR-A1 -> ROADM-A1 (SRG1-PP2-TXRX) link and check that the
    # reply is HTTP 200 with the expected success message.
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    # Ask for the ROADM-A1 (SRG1-PP2-TXRX) -> XPDR-A1 link and check that the
    # reply is HTTP 200 with the expected success message.
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    # Ask for the XPDR-C1 -> ROADM-C1 (SRG1-PP2-TXRX) link and check that the
    # reply is HTTP 200 with the expected success message.
    link_args = ("XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    # Ask for the ROADM-C1 (SRG1-PP2-TXRX) -> XPDR-C1 link and check that the
    # reply is HTTP 200 with the expected success message.
    link_args = ("XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_22_create_eth_service2(self):
    # Send the class-level sample payload under the name "service2", check the
    # acknowledgement, then pause for WAITING seconds.
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service2"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
377 def test_23_get_eth_service2(self):
378 response = test_utils.get_service_list_request("services/service2")
379 self.assertEqual(response.status_code, requests.codes.ok)
380 res = response.json()
382 res['services'][0]['administrative-state'],
385 res['services'][0]['service-name'], 'service2')
387 res['services'][0]['connection-type'], 'service')
389 res['services'][0]['lifecycle-state'], 'planned')
392 def test_24_check_xc2_ROADMA(self):
393 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760")
394 self.assertEqual(response.status_code, requests.codes.ok)
395 res = response.json()
396 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
397 self.assertDictEqual(
399 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760',
400 'opticalControlMode': 'power'
401 }, **res['roadm-connections'][0]),
402 res['roadm-connections'][0]
404 self.assertDictEqual(
405 {'src-if': 'DEG2-TTP-TXRX-nmc-753:760'},
406 res['roadm-connections'][0]['source'])
407 self.assertDictEqual(
408 {'dst-if': 'SRG1-PP2-TXRX-nmc-753:760'},
409 res['roadm-connections'][0]['destination'])
411 def test_25_check_topo_XPDRA(self):
412 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
413 self.assertEqual(response.status_code, requests.codes.ok)
414 res = response.json()
415 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
417 if ele['tp-id'] == 'XPDR1-NETWORK1':
418 self.assertEqual({u'frequency': 196.1,
420 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
421 if ele['tp-id'] == 'XPDR1-NETWORK2':
422 self.assertEqual({u'frequency': 196.05,
424 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
425 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
426 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
429 def test_26_check_topo_ROADMA_SRG1(self):
430 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
431 self.assertEqual(response.status_code, requests.codes.ok)
432 res = response.json()
433 freq_map = base64.b64decode(
434 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
435 freq_map_array = [int(x) for x in freq_map]
436 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
437 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
438 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
440 if ele['tp-id'] == 'SRG1-PP1-TXRX':
441 freq_map = base64.b64decode(
442 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
443 freq_map_array = [int(x) for x in freq_map]
444 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
445 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
446 if ele['tp-id'] == 'SRG1-PP2-TXRX':
447 freq_map = base64.b64decode(
448 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
449 freq_map_array = [int(x) for x in freq_map]
450 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
451 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
452 if ele['tp-id'] == 'SRG1-PP3-TXRX':
453 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
456 def test_27_check_topo_ROADMA_DEG2(self):
457 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
458 self.assertEqual(response.status_code, requests.codes.ok)
459 res = response.json()
460 freq_map = base64.b64decode(
461 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
462 freq_map_array = [int(x) for x in freq_map]
463 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
464 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
465 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
467 if ele['tp-id'] == 'DEG2-CTP-TXRX':
468 freq_map = base64.b64decode(
469 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
470 freq_map_array = [int(x) for x in freq_map]
471 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
472 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
473 if ele['tp-id'] == 'DEG2-TTP-TXRX':
474 freq_map = base64.b64decode(
475 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
476 freq_map_array = [int(x) for x in freq_map]
477 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
478 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
# service-create test on a non-available resource
def test_28_create_eth_service3(self):
    # Send the sample payload as "service3"; the request itself must still be
    # acknowledged (HTTP 200, response-code containing '200', PCE in progress).
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service3"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    self.assertIn('200', common['response-code'])
    time.sleep(self.WAITING)
# add a test that checks the openroadm-service-list still only contains 2 elements
def test_29_delete_eth_service3(self):
    # Deleting "service3" must answer HTTP 200 but report, in the body, that
    # the service is not in the datastore (response-code containing '500').
    reply = test_utils.service_delete_request("service3")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Service \'service3\' does not exist in datastore',
                  common['response-message'])
    self.assertIn('500', common['response-code'])
def test_30_delete_eth_service1(self):
    # Delete "service1" and check the renderer acknowledges the deletion.
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_31_delete_eth_service2(self):
    # Delete "service2" and check the renderer acknowledges the deletion.
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_32_check_no_xc_ROADMA(self):
    # Read the ROADM-A1 device data and check that no 'roadm-connections'
    # entry is present under 'org-openroadm-device'.
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    # Assert the status before touching the body, so that a failed request is
    # reported as a status mismatch rather than as a JSON/KeyError further down.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
525 def test_33_check_topo_XPDRA(self):
526 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
527 self.assertEqual(response.status_code, requests.codes.ok)
528 res = response.json()
529 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
531 if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
532 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
533 elif ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
534 self.assertIn(u'tail-equipment-id',
535 dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
536 self.assertNotIn('wavelength', dict.keys(
537 ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
540 def test_34_check_topo_ROADMA_SRG1(self):
541 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
542 self.assertEqual(response.status_code, requests.codes.ok)
543 res = response.json()
544 freq_map = base64.b64decode(
545 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
546 freq_map_array = [int(x) for x in freq_map]
547 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
548 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
549 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
551 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
552 freq_map = base64.b64decode(
553 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
554 freq_map_array = [int(x) for x in freq_map]
555 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
556 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
557 elif ele['tp-id'] == 'SRG1-CP-TXRX':
558 freq_map = base64.b64decode(
559 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
560 freq_map_array = [int(x) for x in freq_map]
561 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
562 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
564 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
567 def test_35_check_topo_ROADMA_DEG2(self):
568 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
569 self.assertEqual(response.status_code, requests.codes.ok)
570 res = response.json()
571 freq_map = base64.b64decode(
572 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
573 freq_map_array = [int(x) for x in freq_map]
574 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
575 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
576 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
578 if ele['tp-id'] == 'DEG2-CTP-TXRX':
579 freq_map = base64.b64decode(
580 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
581 freq_map_array = [int(x) for x in freq_map]
582 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
583 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
584 if ele['tp-id'] == 'DEG2-TTP-TXRX':
585 freq_map = base64.b64decode(
586 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
587 freq_map_array = [int(x) for x in freq_map]
588 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
589 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
592 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
    # Rewrite the class-level sample payload into a "roadm-line" request named
    # "service1" between ROADM-A1 and ROADM-C1 (format "OC"), send it, check
    # the acknowledgement, then pause for WAITING seconds.
    # The payload dict is shared, so these edits stay in place afterwards.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1"
    request_input["connection-type"] = "roadm-line"
    for end_key, node_id in (("service-a-end", "ROADM-A1"),
                             ("service-z-end", "ROADM-C1")):
        request_input[end_key]["node-id"] = node_id
        request_input[end_key]["service-format"] = "OC"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
607 def test_37_get_oc_service1(self):
608 response = test_utils.get_service_list_request("services/service1")
609 self.assertEqual(response.status_code, requests.codes.ok)
610 res = response.json()
612 res['services'][0]['administrative-state'],
615 res['services'][0]['service-name'], 'service1')
617 res['services'][0]['connection-type'], 'roadm-line')
619 res['services'][0]['lifecycle-state'], 'planned')
622 def test_38_check_xc1_ROADMA(self):
623 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
624 self.assertEqual(response.status_code, requests.codes.ok)
625 res = response.json()
626 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
627 self.assertDictEqual(
629 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
630 'opticalControlMode': 'gainLoss',
631 'target-output-power': -3.0
632 }, **res['roadm-connections'][0]),
633 res['roadm-connections'][0]
635 self.assertDictEqual(
636 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
637 res['roadm-connections'][0]['source'])
638 self.assertDictEqual(
639 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
640 res['roadm-connections'][0]['destination'])
643 def test_39_check_xc1_ROADMC(self):
644 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
645 self.assertEqual(response.status_code, requests.codes.ok)
646 res = response.json()
647 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
648 self.assertDictEqual(
650 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
651 'opticalControlMode': 'gainLoss',
652 'target-output-power': -3.0
653 }, **res['roadm-connections'][0]),
654 res['roadm-connections'][0]
656 self.assertDictEqual(
657 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
658 res['roadm-connections'][0]['source'])
659 self.assertDictEqual(
660 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
661 res['roadm-connections'][0]['destination'])
def test_40_create_oc_service2(self):
    # Rewrite the class-level sample payload into a "roadm-line" request named
    # "service2" between ROADM-A1 and ROADM-C1 (format "OC"), send it, check
    # the acknowledgement, then pause for WAITING seconds.
    # The payload dict is shared, so these edits stay in place afterwards.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service2"
    request_input["connection-type"] = "roadm-line"
    for end_key, node_id in (("service-a-end", "ROADM-A1"),
                             ("service-z-end", "ROADM-C1")):
        request_input[end_key]["node-id"] = node_id
        request_input[end_key]["service-format"] = "OC"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
678 def test_41_get_oc_service2(self):
679 response = test_utils.get_service_list_request("services/service2")
680 self.assertEqual(response.status_code, requests.codes.ok)
681 res = response.json()
683 res['services'][0]['administrative-state'],
686 res['services'][0]['service-name'], 'service2')
688 res['services'][0]['connection-type'], 'roadm-line')
690 res['services'][0]['lifecycle-state'], 'planned')
693 def test_42_check_xc2_ROADMA(self):
694 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760")
695 self.assertEqual(response.status_code, requests.codes.ok)
696 res = response.json()
697 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
698 self.assertDictEqual(
700 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760',
701 'opticalControlMode': 'gainLoss',
702 'target-output-power': -3.0
703 }, **res['roadm-connections'][0]),
704 res['roadm-connections'][0]
706 self.assertDictEqual(
707 {'src-if': 'SRG1-PP2-TXRX-nmc-753:760'},
708 res['roadm-connections'][0]['source'])
709 self.assertDictEqual(
710 {'dst-if': 'DEG2-TTP-TXRX-nmc-753:760'},
711 res['roadm-connections'][0]['destination'])
714 def test_43_check_topo_ROADMA(self):
715 self.test_26_check_topo_ROADMA_SRG1()
716 self.test_27_check_topo_ROADMA_DEG2()
def test_44_delete_oc_service1(self):
    # Delete "service1" and check the renderer acknowledges the deletion.
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_45_delete_oc_service2(self):
    # Delete "service2" and check the renderer acknowledges the deletion.
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
735 def test_46_get_no_oc_services(self):
737 response = test_utils.get_service_list_request("")
738 self.assertEqual(response.status_code, requests.codes.conflict)
739 res = response.json()
741 {"error-type": "application", "error-tag": "data-missing",
742 "error-message": "Request could not be completed because the relevant data model content does not exist"},
743 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    # Read the ROADM-A1 device data and check that no 'roadm-connections'
    # entry is present under 'org-openroadm-device'.
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The key is passed directly; the former ['roadm-connections'][0] built a
    # one-element list only to index it back to this same string.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
753 def test_48_check_topo_ROADMA(self):
754 self.test_34_check_topo_ROADMA_SRG1()
755 self.test_35_check_topo_ROADMA_DEG2()
757 def test_49_loop_create_eth_service(self):
758 for i in range(1, 6):
759 print("iteration number {}".format(i))
760 print("eth service creation")
761 self.test_11_create_eth_service1()
762 print("check xc in ROADM-A1")
763 self.test_13_check_xc1_ROADMA()
764 print("check xc in ROADM-C1")
765 self.test_14_check_xc1_ROADMC()
766 print("eth service deletion\n")
767 self.test_30_delete_eth_service1()
769 def test_50_loop_create_oc_service(self):
770 response = test_utils.get_service_list_request("services/service1")
771 if response.status_code != 404:
772 response = test_utils.service_delete_request("service1")
775 for i in range(1, 6):
776 print("iteration number {}".format(i))
777 print("oc service creation")
778 self.test_36_create_oc_service1()
779 print("check xc in ROADM-A1")
780 self.test_38_check_xc1_ROADMA()
781 print("check xc in ROADM-C1")
782 self.test_39_check_xc1_ROADMC()
783 print("oc service deletion\n")
784 self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    # Unmount node XPDR-A1; expect HTTP 200.
    unmount_reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(unmount_reply.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_52_disconnect_XPDRC(self):
    # Unmount node XPDR-C1; expect HTTP 200.
    unmount_reply = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(unmount_reply.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_53_disconnect_ROADMA(self):
    # Unmount node ROADM-A1; expect HTTP 200.
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(unmount_reply.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_54_disconnect_ROADMC(self):
    # Unmount node ROADM-C1; expect HTTP 200.
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(unmount_reply.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
if __name__ == "__main__":
    # Script entry point: hand control to unittest with verbosity level 2.
    unittest.main(verbosity=2)