2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 sys.path.append('transportpce_tests/common/')
22 class TransportPCEFulltesting(unittest.TestCase):
25 cr_serv_sample_data = {"input": {
26 "sdnc-request-header": {
27 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
28 "rpc-action": "service-create",
29 "request-system-id": "appname",
30 "notification-url": "http://localhost:8585/NotificationServer/notify"
32 "service-name": "service1",
33 "common-id": "ASATT1234567",
34 "connection-type": "service",
36 "service-rate": "100",
38 "service-format": "Ethernet",
39 "clli": "SNJSCAMCJP8",
42 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
43 "port-type": "router",
44 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
45 "port-rack": "000000.00",
49 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
50 "lgx-port-name": "LGX Back.3",
51 "lgx-port-rack": "000000.00",
52 "lgx-port-shelf": "00"
57 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
58 "port-type": "router",
59 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
60 "port-rack": "000000.00",
64 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
65 "lgx-port-name": "LGX Back.4",
66 "lgx-port-rack": "000000.00",
67 "lgx-port-shelf": "00"
73 "service-rate": "100",
75 "service-format": "Ethernet",
76 "clli": "SNJSCAMCJT4",
79 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
80 "port-type": "router",
81 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
82 "port-rack": "000000.00",
86 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
87 "lgx-port-name": "LGX Back.29",
88 "lgx-port-rack": "000000.00",
89 "lgx-port-shelf": "00"
94 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
95 "port-type": "router",
96 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
97 "port-rack": "000000.00",
101 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
102 "lgx-port-name": "LGX Back.30",
103 "lgx-port-rack": "000000.00",
104 "lgx-port-shelf": "00"
109 "due-date": "2016-11-28T00:00:01Z",
110 "operator-contact": "pw1234"
114 WAITING = 20 # nominal value is 300
115 NODE_VERSION = '2.2.1'
119 cls.processes = test_utils.start_tpce()
120 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
121 ('roadma', cls.NODE_VERSION),
122 ('roadmc', cls.NODE_VERSION),
123 ('xpdrc', cls.NODE_VERSION)])
126 def tearDownClass(cls):
127 # pylint: disable=not-an-iterable
128 for process in cls.processes:
129 test_utils.shutdown_process(process)
130 print("all processes killed")
132 def setUp(self): # instruction executed before each test method
133 print("execution of {}".format(self.id().split(".")[-1]))
135 def test_01_connect_xpdrA(self):
136 response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
137 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
139 def test_02_connect_xpdrC(self):
140 response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
141 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
143 def test_03_connect_rdmA(self):
144 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
145 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
147 def test_04_connect_rdmC(self):
148 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
149 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
151 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
152 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
153 "ROADM-A1", "1", "SRG1-PP1-TXRX")
154 self.assertEqual(response.status_code, requests.codes.ok)
155 res = response.json()
156 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
159 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
160 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
161 "ROADM-A1", "1", "SRG1-PP1-TXRX")
162 self.assertEqual(response.status_code, requests.codes.ok)
163 res = response.json()
164 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
167 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
168 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
169 "ROADM-C1", "1", "SRG1-PP1-TXRX")
170 self.assertEqual(response.status_code, requests.codes.ok)
171 res = response.json()
172 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
175 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
176 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
177 "ROADM-C1", "1", "SRG1-PP1-TXRX")
178 self.assertEqual(response.status_code, requests.codes.ok)
179 res = response.json()
180 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
183 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
184 # Config ROADMA-ROADMC oms-attributes
186 "auto-spanloss": "true",
187 "spanloss-base": 11.4,
188 "spanloss-current": 12,
189 "engineered-spanloss": 12.2,
190 "link-concatenation": [{
193 "SRLG-length": 100000,
195 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
196 self.assertEqual(response.status_code, requests.codes.created)
198 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
199 # Config ROADMC-ROADMA oms-attributes
201 "auto-spanloss": "true",
202 "spanloss-base": 11.4,
203 "spanloss-current": 12,
204 "engineered-spanloss": 12.2,
205 "link-concatenation": [{
208 "SRLG-length": 100000,
210 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
211 self.assertEqual(response.status_code, requests.codes.created)
213 # test service-create for Eth service from xpdr to xpdr
214 def test_11_create_eth_service1(self):
215 self.cr_serv_sample_data["input"]["service-name"] = "service1"
216 response = test_utils.service_create_request(self.cr_serv_sample_data)
217 self.assertEqual(response.status_code, requests.codes.ok)
218 res = response.json()
219 self.assertIn('PCE calculation in progress',
220 res['output']['configuration-response-common']['response-message'])
221 time.sleep(self.WAITING)
223 def test_12_get_eth_service1(self):
224 response = test_utils.get_service_list_request("services/service1")
225 self.assertEqual(response.status_code, requests.codes.ok)
226 res = response.json()
228 res['services'][0]['administrative-state'], 'inService')
230 res['services'][0]['service-name'], 'service1')
232 res['services'][0]['connection-type'], 'service')
234 res['services'][0]['lifecycle-state'], 'planned')
237 def test_13_check_xc1_ROADMA(self):
238 response = test_utils.check_netconf_node_request(
239 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
240 self.assertEqual(response.status_code, requests.codes.ok)
241 res = response.json()
242 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
243 self.assertDictEqual(
245 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
246 'opticalControlMode': 'gainLoss',
247 'target-output-power': -3.0
248 }, **res['roadm-connections'][0]),
249 res['roadm-connections'][0]
251 self.assertDictEqual(
252 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
253 res['roadm-connections'][0]['source'])
254 self.assertDictEqual(
255 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
256 res['roadm-connections'][0]['destination'])
259 def test_14_check_xc1_ROADMC(self):
260 response = test_utils.check_netconf_node_request(
261 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
262 self.assertEqual(response.status_code, requests.codes.ok)
263 res = response.json()
264 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
265 self.assertDictEqual(
267 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
268 'opticalControlMode': 'gainLoss',
269 'target-output-power': -3.0
270 }, **res['roadm-connections'][0]),
271 res['roadm-connections'][0]
273 self.assertDictEqual(
274 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
275 res['roadm-connections'][0]['source'])
276 self.assertDictEqual(
277 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
278 res['roadm-connections'][0]['destination'])
281 def test_15_check_topo_XPDRA(self):
282 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
283 self.assertEqual(response.status_code, requests.codes.ok)
284 res = response.json()
285 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
287 if ele['tp-id'] == 'XPDR1-NETWORK1':
288 self.assertEqual({u'frequency': 196.1,
290 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
291 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
292 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
293 if ele['tp-id'] == 'XPDR1-NETWORK2':
294 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
297 def test_16_check_topo_ROADMA_SRG1(self):
298 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
299 self.assertEqual(response.status_code, requests.codes.ok)
300 res = response.json()
301 freq_map = base64.b64decode(
302 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
303 freq_map_array = [int(x) for x in freq_map]
304 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
305 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
307 if ele['tp-id'] == 'SRG1-PP1-TXRX':
308 freq_map = base64.b64decode(
309 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
310 freq_map_array = [int(x) for x in freq_map]
311 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
312 if ele['tp-id'] == 'SRG1-PP2-TXRX':
313 self.assertNotIn('avail-freq-maps', dict.keys(ele))
316 def test_17_check_topo_ROADMA_DEG1(self):
317 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
318 self.assertEqual(response.status_code, requests.codes.ok)
319 res = response.json()
320 freq_map = base64.b64decode(
321 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
322 freq_map_array = [int(x) for x in freq_map]
323 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
324 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
326 if ele['tp-id'] == 'DEG2-CTP-TXRX':
327 freq_map = base64.b64decode(
328 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
329 freq_map_array = [int(x) for x in freq_map]
330 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
331 if ele['tp-id'] == 'DEG2-TTP-TXRX':
332 freq_map = base64.b64decode(
333 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
334 freq_map_array = [int(x) for x in freq_map]
335 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
338 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
339 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "2",
340 "ROADM-A1", "1", "SRG1-PP2-TXRX")
341 self.assertEqual(response.status_code, requests.codes.ok)
342 res = response.json()
343 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
346 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
347 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "2",
348 "ROADM-A1", "1", "SRG1-PP2-TXRX")
349 self.assertEqual(response.status_code, requests.codes.ok)
350 res = response.json()
351 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
354 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
355 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "2",
356 "ROADM-C1", "1", "SRG1-PP2-TXRX")
357 self.assertEqual(response.status_code, requests.codes.ok)
358 res = response.json()
359 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
362 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
363 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "2",
364 "ROADM-C1", "1", "SRG1-PP2-TXRX")
365 self.assertEqual(response.status_code, requests.codes.ok)
366 res = response.json()
367 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
370 def test_22_create_eth_service2(self):
371 self.cr_serv_sample_data["input"]["service-name"] = "service2"
372 response = test_utils.service_create_request(self.cr_serv_sample_data)
373 self.assertEqual(response.status_code, requests.codes.ok)
374 res = response.json()
375 self.assertIn('PCE calculation in progress',
376 res['output']['configuration-response-common']['response-message'])
377 time.sleep(self.WAITING)
379 def test_23_get_eth_service2(self):
380 response = test_utils.get_service_list_request("services/service2")
381 self.assertEqual(response.status_code, requests.codes.ok)
382 res = response.json()
384 res['services'][0]['administrative-state'],
387 res['services'][0]['service-name'], 'service2')
389 res['services'][0]['connection-type'], 'service')
391 res['services'][0]['lifecycle-state'], 'planned')
394 def test_24_check_xc2_ROADMA(self):
395 response = test_utils.check_netconf_node_request(
396 "ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760")
397 self.assertEqual(response.status_code, requests.codes.ok)
398 res = response.json()
399 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
400 self.assertDictEqual(
402 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760',
403 'opticalControlMode': 'power'
404 }, **res['roadm-connections'][0]),
405 res['roadm-connections'][0]
407 self.assertDictEqual(
408 {'src-if': 'DEG2-TTP-TXRX-nmc-753:760'},
409 res['roadm-connections'][0]['source'])
410 self.assertDictEqual(
411 {'dst-if': 'SRG1-PP2-TXRX-nmc-753:760'},
412 res['roadm-connections'][0]['destination'])
414 def test_25_check_topo_XPDRA(self):
415 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
416 self.assertEqual(response.status_code, requests.codes.ok)
417 res = response.json()
418 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
420 if ele['tp-id'] == 'XPDR1-NETWORK1':
421 self.assertEqual({u'frequency': 196.1,
423 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
424 if ele['tp-id'] == 'XPDR1-NETWORK2':
425 self.assertEqual({u'frequency': 196.05,
427 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
428 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
429 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
432 def test_26_check_topo_ROADMA_SRG1(self):
433 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
434 self.assertEqual(response.status_code, requests.codes.ok)
435 res = response.json()
436 freq_map = base64.b64decode(
437 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
438 freq_map_array = [int(x) for x in freq_map]
439 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
440 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
441 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
443 if ele['tp-id'] == 'SRG1-PP1-TXRX':
444 freq_map = base64.b64decode(
445 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
446 freq_map_array = [int(x) for x in freq_map]
447 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
448 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
449 if ele['tp-id'] == 'SRG1-PP2-TXRX':
450 freq_map = base64.b64decode(
451 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
452 freq_map_array = [int(x) for x in freq_map]
453 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
454 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
455 if ele['tp-id'] == 'SRG1-PP3-TXRX':
456 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
459 def test_27_check_topo_ROADMA_DEG2(self):
460 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
461 self.assertEqual(response.status_code, requests.codes.ok)
462 res = response.json()
463 freq_map = base64.b64decode(
464 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
465 freq_map_array = [int(x) for x in freq_map]
466 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
467 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
468 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
470 if ele['tp-id'] == 'DEG2-CTP-TXRX':
471 freq_map = base64.b64decode(
472 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
473 freq_map_array = [int(x) for x in freq_map]
474 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
475 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
476 if ele['tp-id'] == 'DEG2-TTP-TXRX':
477 freq_map = base64.b64decode(
478 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
479 freq_map_array = [int(x) for x in freq_map]
480 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
481 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
484 # creation service test on a non-available resource
485 def test_28_create_eth_service3(self):
486 self.cr_serv_sample_data["input"]["service-name"] = "service3"
487 response = test_utils.service_create_request(self.cr_serv_sample_data)
488 self.assertEqual(response.status_code, requests.codes.ok)
489 res = response.json()
490 self.assertIn('PCE calculation in progress',
491 res['output']['configuration-response-common']['response-message'])
492 self.assertIn('200', res['output']['configuration-response-common']['response-code'])
493 time.sleep(self.WAITING)
495 # add a test that check the openroadm-service-list still only contains 2 elements
496 def test_29_delete_eth_service3(self):
497 response = test_utils.service_delete_request("service3")
498 self.assertEqual(response.status_code, requests.codes.ok)
499 res = response.json()
500 self.assertIn('Service \'service3\' does not exist in datastore',
501 res['output']['configuration-response-common']['response-message'])
502 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
505 def test_30_delete_eth_service1(self):
506 response = test_utils.service_delete_request("service1")
507 self.assertEqual(response.status_code, requests.codes.ok)
508 res = response.json()
509 self.assertIn('Renderer service delete in progress',
510 res['output']['configuration-response-common']['response-message'])
513 def test_31_delete_eth_service2(self):
514 response = test_utils.service_delete_request("service2")
515 self.assertEqual(response.status_code, requests.codes.ok)
516 res = response.json()
517 self.assertIn('Renderer service delete in progress',
518 res['output']['configuration-response-common']['response-message'])
521 def test_32_check_no_xc_ROADMA(self):
522 response = test_utils.check_netconf_node_request("ROADM-A1", "")
523 res = response.json()
524 self.assertEqual(response.status_code, requests.codes.ok)
525 self.assertNotIn('roadm-connections', dict.keys(res['org-openroadm-device']))
528 def test_33_check_topo_XPDRA(self):
529 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
530 self.assertEqual(response.status_code, requests.codes.ok)
531 res = response.json()
532 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
534 if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
535 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
536 elif ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
537 self.assertIn(u'tail-equipment-id',
538 dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
539 self.assertNotIn('wavelength', dict.keys(
540 ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
543 def test_34_check_topo_ROADMA_SRG1(self):
544 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
545 self.assertEqual(response.status_code, requests.codes.ok)
546 res = response.json()
547 freq_map = base64.b64decode(
548 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
549 freq_map_array = [int(x) for x in freq_map]
550 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
551 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
552 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
554 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
555 freq_map = base64.b64decode(
556 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
557 freq_map_array = [int(x) for x in freq_map]
558 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
559 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
560 elif ele['tp-id'] == 'SRG1-CP-TXRX':
561 freq_map = base64.b64decode(
562 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
563 freq_map_array = [int(x) for x in freq_map]
564 self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
565 self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
567 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
570 def test_35_check_topo_ROADMA_DEG2(self):
571 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
572 self.assertEqual(response.status_code, requests.codes.ok)
573 res = response.json()
574 freq_map = base64.b64decode(
575 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
576 freq_map_array = [int(x) for x in freq_map]
577 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
578 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
579 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
581 if ele['tp-id'] == 'DEG2-CTP-TXRX':
582 freq_map = base64.b64decode(
583 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
584 freq_map_array = [int(x) for x in freq_map]
585 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
586 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
587 if ele['tp-id'] == 'DEG2-TTP-TXRX':
588 freq_map = base64.b64decode(
589 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
590 freq_map_array = [int(x) for x in freq_map]
591 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
592 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
595 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
596 def test_36_create_oc_service1(self):
597 self.cr_serv_sample_data["input"]["service-name"] = "service1"
598 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
599 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
600 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
601 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
602 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
603 response = test_utils.service_create_request(self.cr_serv_sample_data)
604 self.assertEqual(response.status_code, requests.codes.ok)
605 res = response.json()
606 self.assertIn('PCE calculation in progress',
607 res['output']['configuration-response-common']['response-message'])
608 time.sleep(self.WAITING)
610 def test_37_get_oc_service1(self):
611 response = test_utils.get_service_list_request("services/service1")
612 self.assertEqual(response.status_code, requests.codes.ok)
613 res = response.json()
615 res['services'][0]['administrative-state'],
618 res['services'][0]['service-name'], 'service1')
620 res['services'][0]['connection-type'], 'roadm-line')
622 res['services'][0]['lifecycle-state'], 'planned')
625 def test_38_check_xc1_ROADMA(self):
626 response = test_utils.check_netconf_node_request(
627 "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
628 self.assertEqual(response.status_code, requests.codes.ok)
629 res = response.json()
630 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
631 self.assertDictEqual(
633 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
634 'opticalControlMode': 'gainLoss',
635 'target-output-power': -3.0
636 }, **res['roadm-connections'][0]),
637 res['roadm-connections'][0]
639 self.assertDictEqual(
640 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
641 res['roadm-connections'][0]['source'])
642 self.assertDictEqual(
643 {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
644 res['roadm-connections'][0]['destination'])
647 def test_39_check_xc1_ROADMC(self):
648 response = test_utils.check_netconf_node_request(
649 "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
650 self.assertEqual(response.status_code, requests.codes.ok)
651 res = response.json()
652 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
653 self.assertDictEqual(
655 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
656 'opticalControlMode': 'gainLoss',
657 'target-output-power': -3.0
658 }, **res['roadm-connections'][0]),
659 res['roadm-connections'][0]
661 self.assertDictEqual(
662 {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
663 res['roadm-connections'][0]['source'])
664 self.assertDictEqual(
665 {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
666 res['roadm-connections'][0]['destination'])
669 def test_40_create_oc_service2(self):
670 self.cr_serv_sample_data["input"]["service-name"] = "service2"
671 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
672 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
673 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
674 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
675 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
676 response = test_utils.service_create_request(self.cr_serv_sample_data)
677 self.assertEqual(response.status_code, requests.codes.ok)
678 res = response.json()
679 self.assertIn('PCE calculation in progress',
680 res['output']['configuration-response-common']['response-message'])
681 time.sleep(self.WAITING)
683 def test_41_get_oc_service2(self):
684 response = test_utils.get_service_list_request("services/service2")
685 self.assertEqual(response.status_code, requests.codes.ok)
686 res = response.json()
688 res['services'][0]['administrative-state'],
691 res['services'][0]['service-name'], 'service2')
693 res['services'][0]['connection-type'], 'roadm-line')
695 res['services'][0]['lifecycle-state'], 'planned')
698 def test_42_check_xc2_ROADMA(self):
699 response = test_utils.check_netconf_node_request(
700 "ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760")
701 self.assertEqual(response.status_code, requests.codes.ok)
702 res = response.json()
703 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
704 self.assertDictEqual(
706 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760',
707 'opticalControlMode': 'gainLoss',
708 'target-output-power': -3.0
709 }, **res['roadm-connections'][0]),
710 res['roadm-connections'][0]
712 self.assertDictEqual(
713 {'src-if': 'SRG1-PP2-TXRX-nmc-753:760'},
714 res['roadm-connections'][0]['source'])
715 self.assertDictEqual(
716 {'dst-if': 'DEG2-TTP-TXRX-nmc-753:760'},
717 res['roadm-connections'][0]['destination'])
720 def test_43_check_topo_ROADMA(self):
721 self.test_26_check_topo_ROADMA_SRG1()
722 self.test_27_check_topo_ROADMA_DEG2()
725 def test_44_delete_oc_service1(self):
726 response = test_utils.service_delete_request("service1")
727 self.assertEqual(response.status_code, requests.codes.ok)
728 res = response.json()
729 self.assertIn('Renderer service delete in progress',
730 res['output']['configuration-response-common']['response-message'])
733 def test_45_delete_oc_service2(self):
734 response = test_utils.service_delete_request("service2")
735 self.assertEqual(response.status_code, requests.codes.ok)
736 res = response.json()
737 self.assertIn('Renderer service delete in progress',
738 res['output']['configuration-response-common']['response-message'])
741 def test_46_get_no_oc_services(self):
743 response = test_utils.get_service_list_request("")
744 self.assertEqual(response.status_code, requests.codes.conflict)
745 res = response.json()
747 {"error-type": "application", "error-tag": "data-missing",
748 "error-message": "Request could not be completed because the relevant data model content does not exist"},
749 res['errors']['error'])
752 def test_47_get_no_xc_ROADMA(self):
753 response = test_utils.check_netconf_node_request("ROADM-A1", "")
754 self.assertEqual(response.status_code, requests.codes.ok)
755 res = response.json()
756 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
759 def test_48_check_topo_ROADMA(self):
760 self.test_34_check_topo_ROADMA_SRG1()
761 self.test_35_check_topo_ROADMA_DEG2()
763 def test_49_loop_create_eth_service(self):
764 for i in range(1, 6):
765 print("iteration number {}".format(i))
766 print("eth service creation")
767 self.test_11_create_eth_service1()
768 print("check xc in ROADM-A1")
769 self.test_13_check_xc1_ROADMA()
770 print("check xc in ROADM-C1")
771 self.test_14_check_xc1_ROADMC()
772 print("eth service deletion\n")
773 self.test_30_delete_eth_service1()
775 def test_50_loop_create_oc_service(self):
776 response = test_utils.get_service_list_request("services/service1")
777 if response.status_code != 404:
778 response = test_utils.service_delete_request("service1")
781 for i in range(1, 6):
782 print("iteration number {}".format(i))
783 print("oc service creation")
784 self.test_36_create_oc_service1()
785 print("check xc in ROADM-A1")
786 self.test_38_check_xc1_ROADMA()
787 print("check xc in ROADM-C1")
788 self.test_39_check_xc1_ROADMC()
789 print("oc service deletion\n")
790 self.test_44_delete_oc_service1()
792 def test_51_disconnect_XPDRA(self):
793 response = test_utils.unmount_device("XPDR-A1")
794 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
796 def test_52_disconnect_XPDRC(self):
797 response = test_utils.unmount_device("XPDR-C1")
798 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
800 def test_53_disconnect_ROADMA(self):
801 response = test_utils.unmount_device("ROADM-A1")
802 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
804 def test_54_disconnect_ROADMC(self):
805 response = test_utils.unmount_device("ROADM-C1")
806 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
809 if __name__ == "__main__":
810 unittest.main(verbosity=2)