2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEFulltesting(unittest.TestCase):
29 cr_serv_sample_data = {"input": {
30 "sdnc-request-header": {
31 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
32 "rpc-action": "service-create",
33 "request-system-id": "appname",
34 "notification-url": "http://localhost:8585/NotificationServer/notify"
36 "service-name": "service1",
37 "common-id": "ASATT1234567",
38 "connection-type": "service",
40 "service-rate": "100",
42 "service-format": "Ethernet",
43 "clli": "SNJSCAMCJP8",
46 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
47 "port-type": "router",
48 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
49 "port-rack": "000000.00",
53 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
54 "lgx-port-name": "LGX Back.3",
55 "lgx-port-rack": "000000.00",
56 "lgx-port-shelf": "00"
61 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
62 "port-type": "router",
63 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
64 "port-rack": "000000.00",
68 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
69 "lgx-port-name": "LGX Back.4",
70 "lgx-port-rack": "000000.00",
71 "lgx-port-shelf": "00"
77 "service-rate": "100",
79 "service-format": "Ethernet",
80 "clli": "SNJSCAMCJT4",
83 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
84 "port-type": "router",
85 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
86 "port-rack": "000000.00",
90 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
91 "lgx-port-name": "LGX Back.29",
92 "lgx-port-rack": "000000.00",
93 "lgx-port-shelf": "00"
98 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
99 "port-type": "router",
100 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
101 "port-rack": "000000.00",
105 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
106 "lgx-port-name": "LGX Back.30",
107 "lgx-port-rack": "000000.00",
108 "lgx-port-shelf": "00"
113 "due-date": "2016-11-28T00:00:01Z",
114 "operator-contact": "pw1234"
# Seconds slept after each service-create request (see time.sleep(self.WAITING)).
WAITING = 20  # nominal value is 300
# Version string paired with every simulator name given to test_utils.start_sims
# and test_utils.mount_device.
NODE_VERSION = '2.2.1'
123 cls.processes = test_utils.start_tpce()
124 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
125 ('roadma', cls.NODE_VERSION),
126 ('roadmc', cls.NODE_VERSION),
127 ('xpdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Shut down every process recorded in cls.processes, then report it.
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):
    # Executed before each test method: print the name of the test about to run
    # (the last dotted component of the test id).
    test_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(test_name))
def test_01_connect_xpdrA(self):
    # Mount XPDR-A1 with the ('xpdra', NODE_VERSION) simulator pair; expect 'created'.
    simulator = ('xpdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDR-A1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    # Mount XPDR-C1 with the ('xpdrc', NODE_VERSION) simulator pair; expect 'created'.
    simulator = ('xpdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDR-C1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    # Mount ROADM-A1 with the ('roadma', NODE_VERSION) simulator pair; expect 'created'.
    simulator = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-A1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    # Mount ROADM-C1 with the ('roadmc', NODE_VERSION) simulator pair; expect 'created'.
    simulator = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-C1", simulator)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    # Request the xponder-to-ROADM link XPDR-A1 -> ROADM-A1 (SRG1-PP1-TXRX)
    # and check the result text returned by the RPC.
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    # Request the ROADM-to-xponder link ROADM-A1 (SRG1-PP1-TXRX) -> XPDR-A1
    # and check the result text returned by the RPC.
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    # Request the xponder-to-ROADM link XPDR-C1 -> ROADM-C1 (SRG1-PP1-TXRX)
    # and check the result text returned by the RPC.
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    # Request the ROADM-to-xponder link ROADM-C1 (SRG1-PP1-TXRX) -> XPDR-C1
    # and check the result text returned by the RPC.
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
187 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
188 # Config ROADMA-ROADMC oms-attributes
190 "auto-spanloss": "true",
191 "spanloss-base": 11.4,
192 "spanloss-current": 12,
193 "engineered-spanloss": 12.2,
194 "link-concatenation": [{
197 "SRLG-length": 100000,
199 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
200 self.assertEqual(response.status_code, requests.codes.created)
202 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
203 # Config ROADMC-ROADMA oms-attributes
205 "auto-spanloss": "true",
206 "spanloss-base": 11.4,
207 "spanloss-current": 12,
208 "engineered-spanloss": 12.2,
209 "link-concatenation": [{
212 "SRLG-length": 100000,
214 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
215 self.assertEqual(response.status_code, requests.codes.created)
# Service-create test for an Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
    # Only the service name is set here; every other field of the shared
    # class-level payload is sent as it currently stands.
    self.cr_serv_sample_data["input"]["service-name"] = "service1"
    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    common = creation.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause before the follow-up tests inspect the result.
    time.sleep(self.WAITING)
227 def test_12_get_eth_service1(self):
228 response = test_utils.get_service_list_request("services/service1")
229 self.assertEqual(response.status_code, requests.codes.ok)
230 res = response.json()
232 res['services'][0]['administrative-state'], 'inService')
234 res['services'][0]['service-name'], 'service1')
236 res['services'][0]['connection-type'], 'service')
238 res['services'][0]['lifecycle-state'], 'planned')
241 def test_13_check_xc1_ROADMA(self):
242 response = test_utils.check_netconf_node_request(
243 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
244 self.assertEqual(response.status_code, requests.codes.ok)
245 res = response.json()
246 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
247 self.assertDictEqual(
249 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
250 'opticalControlMode': 'gainLoss',
251 'target-output-power': -3.0
252 }, **res['roadm-connections'][0]),
253 res['roadm-connections'][0]
255 self.assertDictEqual(
256 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
257 res['roadm-connections'][0]['source'])
258 self.assertDictEqual(
259 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
260 res['roadm-connections'][0]['destination'])
263 def test_14_check_xc1_ROADMC(self):
264 response = test_utils.check_netconf_node_request(
265 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
266 self.assertEqual(response.status_code, requests.codes.ok)
267 res = response.json()
268 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
269 self.assertDictEqual(
271 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
272 'opticalControlMode': 'gainLoss',
273 'target-output-power': -3.0
274 }, **res['roadm-connections'][0]),
275 res['roadm-connections'][0]
277 self.assertDictEqual(
278 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
279 res['roadm-connections'][0]['source'])
280 self.assertDictEqual(
281 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
282 res['roadm-connections'][0]['destination'])
285 def test_15_check_topo_XPDRA(self):
286 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
287 self.assertEqual(response.status_code, requests.codes.ok)
288 res = response.json()
289 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
291 if ele['tp-id'] == 'XPDR1-NETWORK1':
292 self.assertEqual({'frequency': 196.1,
294 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
295 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
296 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
297 if ele['tp-id'] == 'XPDR1-NETWORK2':
298 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
301 def test_16_check_topo_ROADMA_SRG1(self):
302 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
303 self.assertEqual(response.status_code, requests.codes.ok)
304 res = response.json()
305 freq_map = base64.b64decode(
306 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
307 freq_map_array = [int(x) for x in freq_map]
308 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
309 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
311 if ele['tp-id'] == 'SRG1-PP1-TXRX':
312 freq_map = base64.b64decode(
313 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
314 freq_map_array = [int(x) for x in freq_map]
315 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
316 if ele['tp-id'] == 'SRG1-PP2-TXRX':
317 self.assertNotIn('avail-freq-maps', dict.keys(ele))
320 def test_17_check_topo_ROADMA_DEG1(self):
321 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
322 self.assertEqual(response.status_code, requests.codes.ok)
323 res = response.json()
324 freq_map = base64.b64decode(
325 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
326 freq_map_array = [int(x) for x in freq_map]
327 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
328 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
330 if ele['tp-id'] == 'DEG2-CTP-TXRX':
331 freq_map = base64.b64decode(
332 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
333 freq_map_array = [int(x) for x in freq_map]
334 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
335 if ele['tp-id'] == 'DEG2-TTP-TXRX':
336 freq_map = base64.b64decode(
337 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
338 freq_map_array = [int(x) for x in freq_map]
339 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    # Request the xponder-to-ROADM link XPDR-A1 -> ROADM-A1 (SRG1-PP2-TXRX)
    # and check the result text returned by the RPC.
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    # Request the ROADM-to-xponder link ROADM-A1 (SRG1-PP2-TXRX) -> XPDR-A1
    # and check the result text returned by the RPC.
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    # Request the xponder-to-ROADM link XPDR-C1 -> ROADM-C1 (SRG1-PP2-TXRX)
    # and check the result text returned by the RPC.
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    # Request the ROADM-to-xponder link ROADM-C1 (SRG1-PP2-TXRX) -> XPDR-C1
    # and check the result text returned by the RPC.
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    outcome = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_22_create_eth_service2(self):
    # Submit the shared payload under the name "service2" and expect the PCE
    # to report that path computation has started.
    self.cr_serv_sample_data["input"]["service-name"] = "service2"
    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    common = creation.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause before the follow-up tests inspect the result.
    time.sleep(self.WAITING)
383 def test_23_get_eth_service2(self):
384 response = test_utils.get_service_list_request("services/service2")
385 self.assertEqual(response.status_code, requests.codes.ok)
386 res = response.json()
388 res['services'][0]['administrative-state'],
391 res['services'][0]['service-name'], 'service2')
393 res['services'][0]['connection-type'], 'service')
395 res['services'][0]['lifecycle-state'], 'planned')
398 def test_24_check_xc2_ROADMA(self):
399 response = test_utils.check_netconf_node_request(
400 "ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760")
401 self.assertEqual(response.status_code, requests.codes.ok)
402 res = response.json()
403 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
404 self.assertDictEqual(
406 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760',
407 'opticalControlMode': 'power'
408 }, **res['roadm-connections'][0]),
409 res['roadm-connections'][0]
411 self.assertDictEqual(
412 {'src-if': 'DEG2-TTP-TXRX-nmc-753:760'},
413 res['roadm-connections'][0]['source'])
414 self.assertDictEqual(
415 {'dst-if': 'SRG1-PP2-TXRX-nmc-753:760'},
416 res['roadm-connections'][0]['destination'])
418 def test_25_check_topo_XPDRA(self):
419 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
420 self.assertEqual(response.status_code, requests.codes.ok)
421 res = response.json()
422 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
424 if ele['tp-id'] == 'XPDR1-NETWORK1':
425 self.assertEqual({'frequency': 196.1,
427 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
428 if ele['tp-id'] == 'XPDR1-NETWORK2':
429 self.assertEqual({'frequency': 196.05,
431 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
432 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
433 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
436 def test_26_check_topo_ROADMA_SRG1(self):
437 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
438 self.assertEqual(response.status_code, requests.codes.ok)
439 res = response.json()
440 freq_map = base64.b64decode(
441 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
442 freq_map_array = [int(x) for x in freq_map]
443 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
444 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
445 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
447 if ele['tp-id'] == 'SRG1-PP1-TXRX':
448 freq_map = base64.b64decode(
449 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
450 freq_map_array = [int(x) for x in freq_map]
451 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
452 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
453 if ele['tp-id'] == 'SRG1-PP2-TXRX':
454 freq_map = base64.b64decode(
455 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
456 freq_map_array = [int(x) for x in freq_map]
457 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
458 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
459 if ele['tp-id'] == 'SRG1-PP3-TXRX':
460 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
463 def test_27_check_topo_ROADMA_DEG2(self):
464 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
465 self.assertEqual(response.status_code, requests.codes.ok)
466 res = response.json()
467 freq_map = base64.b64decode(
468 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
469 freq_map_array = [int(x) for x in freq_map]
470 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
471 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
472 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
474 if ele['tp-id'] == 'DEG2-CTP-TXRX':
475 freq_map = base64.b64decode(
476 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
477 freq_map_array = [int(x) for x in freq_map]
478 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
479 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
480 if ele['tp-id'] == 'DEG2-TTP-TXRX':
481 freq_map = base64.b64decode(
482 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
483 freq_map_array = [int(x) for x in freq_map]
484 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
485 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
# Service creation attempted on a non-available resource
def test_28_create_eth_service3(self):
    self.cr_serv_sample_data["input"]["service-name"] = "service3"
    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    common = creation.json()['output']['configuration-response-common']
    # The RPC itself is still accepted: in-progress message and a '200' response code.
    self.assertIn('PCE calculation in progress', common['response-message'])
    self.assertIn('200', common['response-code'])
    time.sleep(self.WAITING)
# TODO: add a test that checks the openroadm-service-list still only contains 2 elements
def test_29_delete_eth_service3(self):
    # Deleting service3 must answer HTTP ok while reporting, in the RPC output,
    # that the service is not in the datastore (response code '500').
    deletion = test_utils.service_delete_request("service3")
    self.assertEqual(deletion.status_code, requests.codes.ok)
    common = deletion.json()['output']['configuration-response-common']
    self.assertIn('Service \'service3\' does not exist in datastore',
                  common['response-message'])
    self.assertIn('500', common['response-code'])
def test_30_delete_eth_service1(self):
    # Ask for deletion of service1 and expect the in-progress acknowledgement.
    deletion = test_utils.service_delete_request("service1")
    self.assertEqual(deletion.status_code, requests.codes.ok)
    message = deletion.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
def test_31_delete_eth_service2(self):
    # Ask for deletion of service2 and expect the in-progress acknowledgement.
    deletion = test_utils.service_delete_request("service2")
    self.assertEqual(deletion.status_code, requests.codes.ok)
    message = deletion.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
def test_32_check_no_xc_ROADMA(self):
    # Read the ROADM-A1 device configuration and verify that it no longer
    # holds any 'roadm-connections' entry.
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    # Assert the status first, as the other tests do, so an error reply shows
    # up as a clear assertion failure rather than a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership on the mapping itself already tests its keys.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
532 def test_33_check_topo_XPDRA(self):
533 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
534 self.assertEqual(response.status_code, requests.codes.ok)
535 res = response.json()
536 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
538 if ele['org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
539 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
540 elif ele['org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
541 self.assertIn('tail-equipment-id',
542 dict.keys(ele['org-openroadm-network-topology:xpdr-network-attributes']))
543 self.assertNotIn('wavelength', dict.keys(
544 ele['org-openroadm-network-topology:xpdr-network-attributes']))
547 def test_34_check_topo_ROADMA_SRG1(self):
548 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
549 self.assertEqual(response.status_code, requests.codes.ok)
550 res = response.json()
551 freq_map = base64.b64decode(
552 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
553 freq_map_array = [int(x) for x in freq_map]
554 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
555 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
556 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
558 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
559 freq_map = base64.b64decode(
560 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
561 freq_map_array = [int(x) for x in freq_map]
562 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
563 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
564 elif ele['tp-id'] == 'SRG1-CP-TXRX':
565 freq_map = base64.b64decode(
566 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
567 freq_map_array = [int(x) for x in freq_map]
568 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
569 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
571 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
574 def test_35_check_topo_ROADMA_DEG2(self):
575 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
576 self.assertEqual(response.status_code, requests.codes.ok)
577 res = response.json()
578 freq_map = base64.b64decode(
579 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
580 freq_map_array = [int(x) for x in freq_map]
581 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
582 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
583 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
585 if ele['tp-id'] == 'DEG2-CTP-TXRX':
586 freq_map = base64.b64decode(
587 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
588 freq_map_array = [int(x) for x in freq_map]
589 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
590 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
591 if ele['tp-id'] == 'DEG2-TTP-TXRX':
592 freq_map = base64.b64decode(
593 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
594 freq_map_array = [int(x) for x in freq_map]
595 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
596 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
# Service-create test for an Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
    # The payload is edited in place on the class-level cr_serv_sample_data,
    # so these values stay in effect for tests that run afterwards.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1"
    request_input["connection-type"] = "roadm-line"
    for end_key, roadm in (("service-a-end", "ROADM-A1"), ("service-z-end", "ROADM-C1")):
        request_input[end_key]["node-id"] = roadm
        request_input[end_key]["service-format"] = "OC"
    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    common = creation.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
614 def test_37_get_oc_service1(self):
615 response = test_utils.get_service_list_request("services/service1")
616 self.assertEqual(response.status_code, requests.codes.ok)
617 res = response.json()
619 res['services'][0]['administrative-state'],
622 res['services'][0]['service-name'], 'service1')
624 res['services'][0]['connection-type'], 'roadm-line')
626 res['services'][0]['lifecycle-state'], 'planned')
629 def test_38_check_xc1_ROADMA(self):
630 response = test_utils.check_netconf_node_request(
631 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
632 self.assertEqual(response.status_code, requests.codes.ok)
633 res = response.json()
634 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
635 self.assertDictEqual(
637 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
638 'opticalControlMode': 'gainLoss',
639 'target-output-power': -3.0
640 }, **res['roadm-connections'][0]),
641 res['roadm-connections'][0]
643 self.assertDictEqual(
644 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
645 res['roadm-connections'][0]['source'])
646 self.assertDictEqual(
647 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
648 res['roadm-connections'][0]['destination'])
651 def test_39_check_xc1_ROADMC(self):
652 response = test_utils.check_netconf_node_request(
653 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
654 self.assertEqual(response.status_code, requests.codes.ok)
655 res = response.json()
656 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
657 self.assertDictEqual(
659 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
660 'opticalControlMode': 'gainLoss',
661 'target-output-power': -3.0
662 }, **res['roadm-connections'][0]),
663 res['roadm-connections'][0]
665 self.assertDictEqual(
666 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
667 res['roadm-connections'][0]['source'])
668 self.assertDictEqual(
669 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
670 res['roadm-connections'][0]['destination'])
def test_40_create_oc_service2(self):
    # Same OC/roadm-line request as for service1, submitted as "service2".
    # The payload is edited in place on the class-level cr_serv_sample_data.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service2"
    request_input["connection-type"] = "roadm-line"
    for end_key, roadm in (("service-a-end", "ROADM-A1"), ("service-z-end", "ROADM-C1")):
        request_input[end_key]["node-id"] = roadm
        request_input[end_key]["service-format"] = "OC"
    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    common = creation.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
687 def test_41_get_oc_service2(self):
688 response = test_utils.get_service_list_request("services/service2")
689 self.assertEqual(response.status_code, requests.codes.ok)
690 res = response.json()
692 res['services'][0]['administrative-state'],
695 res['services'][0]['service-name'], 'service2')
697 res['services'][0]['connection-type'], 'roadm-line')
699 res['services'][0]['lifecycle-state'], 'planned')
702 def test_42_check_xc2_ROADMA(self):
703 response = test_utils.check_netconf_node_request(
704 "ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760")
705 self.assertEqual(response.status_code, requests.codes.ok)
706 res = response.json()
707 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
708 self.assertDictEqual(
710 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760',
711 'opticalControlMode': 'gainLoss',
712 'target-output-power': -3.0
713 }, **res['roadm-connections'][0]),
714 res['roadm-connections'][0]
716 self.assertDictEqual(
717 {'src-if': 'SRG1-PP2-TXRX-nmc-753:760'},
718 res['roadm-connections'][0]['source'])
719 self.assertDictEqual(
720 {'dst-if': 'DEG2-TTP-TXRX-nmc-753:760'},
721 res['roadm-connections'][0]['destination'])
724 def test_43_check_topo_ROADMA(self):
725 self.test_26_check_topo_ROADMA_SRG1()
726 self.test_27_check_topo_ROADMA_DEG2()
def test_44_delete_oc_service1(self):
    # Ask for deletion of service1 and expect the in-progress acknowledgement.
    deletion = test_utils.service_delete_request("service1")
    self.assertEqual(deletion.status_code, requests.codes.ok)
    message = deletion.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
def test_45_delete_oc_service2(self):
    # Ask for deletion of service2 and expect the in-progress acknowledgement.
    deletion = test_utils.service_delete_request("service2")
    self.assertEqual(deletion.status_code, requests.codes.ok)
    message = deletion.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
745 def test_46_get_no_oc_services(self):
747 response = test_utils.get_service_list_request("")
748 self.assertEqual(response.status_code, requests.codes.conflict)
749 res = response.json()
751 {"error-type": "application", "error-tag": "data-missing",
752 "error-message": "Request could not be completed because the relevant data model content does not exist"},
753 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    # Read the ROADM-A1 device configuration and verify that it holds no
    # 'roadm-connections' entry.
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The key is spelled directly: the former ['roadm-connections'][0] was
    # just an indirect way of writing the same string.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
763 def test_48_check_topo_ROADMA(self):
764 self.test_34_check_topo_ROADMA_SRG1()
765 self.test_35_check_topo_ROADMA_DEG2()
767 def test_49_loop_create_eth_service(self):
768 for i in range(1, 6):
769 print("iteration number {}".format(i))
770 print("eth service creation")
771 self.test_11_create_eth_service1()
772 print("check xc in ROADM-A1")
773 self.test_13_check_xc1_ROADMA()
774 print("check xc in ROADM-C1")
775 self.test_14_check_xc1_ROADMC()
776 print("eth service deletion\n")
777 self.test_30_delete_eth_service1()
def test_50_loop_create_oc_service(self):
    # Five rounds of: create OC service1, check the cross-connects on ROADM-A1
    # and ROADM-C1, delete service1 -- by re-running tests 36, 38, 39 and 44.
    # Start from a clean state: if service1 is still known, delete it first.
    lookup = test_utils.get_service_list_request("services/service1")
    # Named status code instead of the bare 404, as in the rest of the file.
    if lookup.status_code != requests.codes.not_found:
        # The reply of this clean-up call is not checked.
        test_utils.service_delete_request("service1")

    for i in range(1, 6):
        print("iteration number {}".format(i))
        print("oc service creation")
        self.test_36_create_oc_service1()
        print("check xc in ROADM-A1")
        self.test_38_check_xc1_ROADMA()
        print("check xc in ROADM-C1")
        self.test_39_check_xc1_ROADMC()
        print("oc service deletion\n")
        self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    # Unmount XPDR-A1; the request must answer with an 'ok' status.
    unmount_reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_52_disconnect_XPDRC(self):
    # Unmount XPDR-C1; the request must answer with an 'ok' status.
    unmount_reply = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_53_disconnect_ROADMA(self):
    # Unmount ROADM-A1; the request must answer with an 'ok' status.
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_54_disconnect_ROADMC(self):
    # Unmount ROADM-C1; the request must answer with an 'ok' status.
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
# When executed as a script, run the suite with verbose (level 2) output.
if __name__ == "__main__":
    unittest.main(verbosity=2)