2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 from common import test_utils
18 from common.flexgrid_utils import INDEX_1_USED_FREQ_MAP, INDEX_1_2_USED_FREQ_MAP, AVAILABLE_FREQ_MAP, check_freq_map
21 class TransportPCEFulltesting(unittest.TestCase):
24 cr_serv_sample_data = {"input": {
25 "sdnc-request-header": {
26 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
27 "rpc-action": "service-create",
28 "request-system-id": "appname",
29 "notification-url": "http://localhost:8585/NotificationServer/notify"
31 "service-name": "service1",
32 "common-id": "ASATT1234567",
33 "connection-type": "service",
35 "service-rate": "100",
37 "service-format": "Ethernet",
38 "clli": "SNJSCAMCJP8",
41 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
42 "port-type": "router",
43 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
44 "port-rack": "000000.00",
48 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
49 "lgx-port-name": "LGX Back.3",
50 "lgx-port-rack": "000000.00",
51 "lgx-port-shelf": "00"
56 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
57 "port-type": "router",
58 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
59 "port-rack": "000000.00",
63 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
64 "lgx-port-name": "LGX Back.4",
65 "lgx-port-rack": "000000.00",
66 "lgx-port-shelf": "00"
72 "service-rate": "100",
74 "service-format": "Ethernet",
75 "clli": "SNJSCAMCJT4",
78 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
79 "port-type": "router",
80 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
81 "port-rack": "000000.00",
85 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
86 "lgx-port-name": "LGX Back.29",
87 "lgx-port-rack": "000000.00",
88 "lgx-port-shelf": "00"
93 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
94 "port-type": "router",
95 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
96 "port-rack": "000000.00",
100 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
101 "lgx-port-name": "LGX Back.30",
102 "lgx-port-rack": "000000.00",
103 "lgx-port-shelf": "00"
108 "due-date": "2016-11-28T00:00:01Z",
109 "operator-contact": "pw1234"
113 WAITING = 20 # nominal value is 300
117 cls.processes = test_utils.start_tpce()
118 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
121 def tearDownClass(cls):
122 # pylint: disable=not-an-iterable
123 for process in cls.processes:
124 test_utils.shutdown_process(process)
125 print("all processes killed")
127 def setUp(self): # instruction executed before each test method
128 print("execution of {}".format(self.id().split(".")[-1]))
130 def test_01_connect_xpdrA(self):
131 response = test_utils.mount_device("XPDR-A1", 'xpdra')
132 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
134 def test_02_connect_xpdrC(self):
135 response = test_utils.mount_device("XPDR-C1", 'xpdrc')
136 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
138 def test_03_connect_rdmA(self):
139 response = test_utils.mount_device("ROADM-A1", 'roadma')
140 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
142 def test_04_connect_rdmC(self):
143 response = test_utils.mount_device("ROADM-C1", 'roadmc')
144 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
146 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
147 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
148 "ROADM-A1", "1", "SRG1-PP1-TXRX")
149 self.assertEqual(response.status_code, requests.codes.ok)
150 res = response.json()
151 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
154 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
155 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
156 "ROADM-A1", "1", "SRG1-PP1-TXRX")
157 self.assertEqual(response.status_code, requests.codes.ok)
158 res = response.json()
159 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
162 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
163 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
164 "ROADM-C1", "1", "SRG1-PP1-TXRX")
165 self.assertEqual(response.status_code, requests.codes.ok)
166 res = response.json()
167 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
170 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
171 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
172 "ROADM-C1", "1", "SRG1-PP1-TXRX")
173 self.assertEqual(response.status_code, requests.codes.ok)
174 res = response.json()
175 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
178 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
179 # Config ROADMA-ROADMC oms-attributes
181 "auto-spanloss": "true",
182 "spanloss-base": 11.4,
183 "spanloss-current": 12,
184 "engineered-spanloss": 12.2,
185 "link-concatenation": [{
188 "SRLG-length": 100000,
190 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
191 self.assertEqual(response.status_code, requests.codes.created)
193 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
194 # Config ROADMC-ROADMA oms-attributes
196 "auto-spanloss": "true",
197 "spanloss-base": 11.4,
198 "spanloss-current": 12,
199 "engineered-spanloss": 12.2,
200 "link-concatenation": [{
203 "SRLG-length": 100000,
205 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
206 self.assertEqual(response.status_code, requests.codes.created)
208 # test service-create for Eth service from xpdr to xpdr
209 def test_11_create_eth_service1(self):
210 self.cr_serv_sample_data["input"]["service-name"] = "service1"
211 response = test_utils.service_create_request(self.cr_serv_sample_data)
212 self.assertEqual(response.status_code, requests.codes.ok)
213 res = response.json()
214 self.assertIn('PCE calculation in progress',
215 res['output']['configuration-response-common']['response-message'])
216 time.sleep(self.WAITING)
218 def test_12_get_eth_service1(self):
219 response = test_utils.get_service_list_request("services/service1")
220 self.assertEqual(response.status_code, requests.codes.ok)
221 res = response.json()
223 res['services'][0]['administrative-state'], 'inService')
225 res['services'][0]['service-name'], 'service1')
227 res['services'][0]['connection-type'], 'service')
229 res['services'][0]['lifecycle-state'], 'planned')
232 def test_13_check_xc1_ROADMA(self):
233 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
234 self.assertEqual(response.status_code, requests.codes.ok)
235 res = response.json()
236 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
237 self.assertDictEqual(
239 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
240 'opticalControlMode': 'gainLoss',
241 'target-output-power': -3.0
242 }, **res['roadm-connections'][0]),
243 res['roadm-connections'][0]
245 self.assertDictEqual(
246 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
247 res['roadm-connections'][0]['source'])
248 self.assertDictEqual(
249 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
250 res['roadm-connections'][0]['destination'])
253 def test_14_check_xc1_ROADMC(self):
254 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
255 self.assertEqual(response.status_code, requests.codes.ok)
256 res = response.json()
257 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
258 self.assertDictEqual(
260 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
261 'opticalControlMode': 'gainLoss',
262 'target-output-power': -3.0
263 }, **res['roadm-connections'][0]),
264 res['roadm-connections'][0]
266 self.assertDictEqual(
267 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
268 res['roadm-connections'][0]['source'])
269 self.assertDictEqual(
270 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
271 res['roadm-connections'][0]['destination'])
274 def test_15_check_topo_XPDRA(self):
275 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
276 self.assertEqual(response.status_code, requests.codes.ok)
277 res = response.json()
278 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
280 if ele['tp-id'] == 'XPDR1-NETWORK1':
281 self.assertEqual({u'frequency': 196.1,
283 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
284 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
285 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
286 if ele['tp-id'] == 'XPDR1-NETWORK2':
287 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
290 def test_16_check_topo_ROADMA_SRG1(self):
291 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
292 self.assertEqual(response.status_code, requests.codes.ok)
293 res = response.json()
294 freq_map = base64.b64decode(
295 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
296 freq_map_array = [int(x) for x in freq_map]
297 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
298 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
300 if ele['tp-id'] == 'SRG1-PP1-TXRX':
301 self.assertIn({u'index': 1, u'frequency': 196.1,
303 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
304 if ele['tp-id'] == 'SRG1-PP2-TXRX':
305 self.assertNotIn('used-wavelength', dict.keys(ele))
308 def test_17_check_topo_ROADMA_DEG1(self):
309 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
310 self.assertEqual(response.status_code, requests.codes.ok)
311 res = response.json()
312 freq_map = base64.b64decode(
313 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
314 freq_map_array = [int(x) for x in freq_map]
315 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
316 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
318 if ele['tp-id'] == 'DEG2-CTP-TXRX':
319 self.assertIn({u'map-name': 'cband', u'freq-map-granularity': 6.25, u'start-edge-freq': 191.325,
320 u'effective-bits': 8, u'freq-map': INDEX_1_USED_FREQ_MAP},
321 ele['org-openroadm-network-topology:'
324 if ele['tp-id'] == 'DEG2-TTP-TXRX':
325 self.assertIn({u'index': 1, u'frequency': 196.1,
327 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
330 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
331 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "2",
332 "ROADM-A1", "1", "SRG1-PP2-TXRX")
333 self.assertEqual(response.status_code, requests.codes.ok)
334 res = response.json()
335 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
338 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
339 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "2",
340 "ROADM-A1", "1", "SRG1-PP2-TXRX")
341 self.assertEqual(response.status_code, requests.codes.ok)
342 res = response.json()
343 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
346 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
347 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "2",
348 "ROADM-C1", "1", "SRG1-PP2-TXRX")
349 self.assertEqual(response.status_code, requests.codes.ok)
350 res = response.json()
351 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
354 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
355 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "2",
356 "ROADM-C1", "1", "SRG1-PP2-TXRX")
357 self.assertEqual(response.status_code, requests.codes.ok)
358 res = response.json()
359 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
362 def test_22_create_eth_service2(self):
363 self.cr_serv_sample_data["input"]["service-name"] = "service2"
364 response = test_utils.service_create_request(self.cr_serv_sample_data)
365 self.assertEqual(response.status_code, requests.codes.ok)
366 res = response.json()
367 self.assertIn('PCE calculation in progress',
368 res['output']['configuration-response-common']['response-message'])
369 time.sleep(self.WAITING)
371 def test_23_get_eth_service2(self):
372 response = test_utils.get_service_list_request("services/service2")
373 self.assertEqual(response.status_code, requests.codes.ok)
374 res = response.json()
376 res['services'][0]['administrative-state'],
379 res['services'][0]['service-name'], 'service2')
381 res['services'][0]['connection-type'], 'service')
383 res['services'][0]['lifecycle-state'], 'planned')
386 def test_24_check_xc2_ROADMA(self):
387 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2")
388 self.assertEqual(response.status_code, requests.codes.ok)
389 res = response.json()
390 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
391 self.assertDictEqual(
393 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
394 'opticalControlMode': 'power'
395 }, **res['roadm-connections'][0]),
396 res['roadm-connections'][0]
398 self.assertDictEqual(
399 {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
400 res['roadm-connections'][0]['source'])
401 self.assertDictEqual(
402 {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
403 res['roadm-connections'][0]['destination'])
405 def test_25_check_topo_XPDRA(self):
406 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
407 self.assertEqual(response.status_code, requests.codes.ok)
408 res = response.json()
409 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
411 if ele['tp-id'] == 'XPDR1-NETWORK1':
412 self.assertEqual({u'frequency': 196.1,
414 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
415 if ele['tp-id'] == 'XPDR1-NETWORK2':
416 self.assertEqual({u'frequency': 196.05,
418 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
419 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
420 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
423 def test_26_check_topo_ROADMA_SRG1(self):
424 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
425 self.assertEqual(response.status_code, requests.codes.ok)
426 res = response.json()
427 freq_map = base64.b64decode(
428 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
429 freq_map_array = [int(x) for x in freq_map]
430 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
431 self.assertEqual(freq_map_array[1], 0, "Index 2 should not be available")
432 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
434 if ele['tp-id'] == 'SRG1-PP1-TXRX':
435 self.assertIn({u'index': 1, u'frequency': 196.1,
437 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
438 self.assertNotIn({u'index': 2, u'frequency': 196.05,
440 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
441 if ele['tp-id'] == 'SRG1-PP2-TXRX':
442 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
443 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
444 self.assertNotIn({u'index': 1, u'frequency': 196.1,
446 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
447 if ele['tp-id'] == 'SRG1-PP3-TXRX':
448 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
451 def test_27_check_topo_ROADMA_DEG2(self):
452 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
453 self.assertEqual(response.status_code, requests.codes.ok)
454 res = response.json()
455 freq_map = base64.b64decode(
456 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
457 freq_map_array = [int(x) for x in freq_map]
458 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
459 self.assertEqual(freq_map_array[1], 0, "Index 2 should not be available")
460 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
462 if ele['tp-id'] == 'DEG2-CTP-TXRX':
463 self.assertIn({u'map-name': 'cband', u'freq-map-granularity': 6.25, u'start-edge-freq': 191.325,
464 u'effective-bits': 8, u'freq-map': INDEX_1_2_USED_FREQ_MAP},
465 ele['org-openroadm-network-topology:'
468 if ele['tp-id'] == 'DEG2-TTP-TXRX':
469 self.assertIn({u'index': 1, u'frequency': 196.1,
471 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
472 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
473 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
476 # creation service test on a non-available resource
477 def test_28_create_eth_service3(self):
478 self.cr_serv_sample_data["input"]["service-name"] = "service3"
479 response = test_utils.service_create_request(self.cr_serv_sample_data)
480 self.assertEqual(response.status_code, requests.codes.ok)
481 res = response.json()
482 self.assertIn('PCE calculation in progress',
483 res['output']['configuration-response-common']['response-message'])
484 self.assertIn('200', res['output']['configuration-response-common']['response-code'])
485 time.sleep(self.WAITING)
487 # add a test that check the openroadm-service-list still only contains 2 elements
488 def test_29_delete_eth_service3(self):
489 response = test_utils.service_delete_request("service3")
490 self.assertEqual(response.status_code, requests.codes.ok)
491 res = response.json()
492 self.assertIn('Service \'service3\' does not exist in datastore',
493 res['output']['configuration-response-common']['response-message'])
494 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
497 def test_30_delete_eth_service1(self):
498 response = test_utils.service_delete_request("service1")
499 self.assertEqual(response.status_code, requests.codes.ok)
500 res = response.json()
501 self.assertIn('Renderer service delete in progress',
502 res['output']['configuration-response-common']['response-message'])
505 def test_31_delete_eth_service2(self):
506 response = test_utils.service_delete_request("service2")
507 self.assertEqual(response.status_code, requests.codes.ok)
508 res = response.json()
509 self.assertIn('Renderer service delete in progress',
510 res['output']['configuration-response-common']['response-message'])
513 def test_32_check_no_xc_ROADMA(self):
514 response = test_utils.check_netconf_node_request("ROADM-A1", "")
515 res = response.json()
516 self.assertEqual(response.status_code, requests.codes.ok)
517 self.assertNotIn('roadm-connections', dict.keys(res['org-openroadm-device']))
520 def test_33_check_topo_XPDRA(self):
521 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
522 self.assertEqual(response.status_code, requests.codes.ok)
523 res = response.json()
524 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
526 if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
527 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
528 elif ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
529 self.assertIn(u'tail-equipment-id',
530 dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
531 self.assertNotIn('wavelength', dict.keys(
532 ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
535 def test_34_check_topo_ROADMA_SRG1(self):
536 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
537 self.assertEqual(response.status_code, requests.codes.ok)
538 res = response.json()
539 freq_map = base64.b64decode(
540 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
541 freq_map_array = [int(x) for x in freq_map]
542 self.assertEqual(freq_map_array[0], 255, "Index 1 should be available")
543 self.assertEqual(freq_map_array[1], 255, "Index 2 should be available")
544 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
546 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP1-TXRX':
547 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
549 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
552 def test_35_check_topo_ROADMA_DEG2(self):
553 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
554 self.assertEqual(response.status_code, requests.codes.ok)
555 res = response.json()
556 freq_map = base64.b64decode(
557 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
558 self.assertTrue(check_freq_map(freq_map), "Index 1 and 2 should be available")
559 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
561 if ele['tp-id'] == 'DEG2-CTP-TXRX':
562 self.assertEqual(ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'],
564 if ele['tp-id'] == 'DEG2-TTP-TXRX':
565 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
568 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
569 def test_36_create_oc_service1(self):
570 self.cr_serv_sample_data["input"]["service-name"] = "service1"
571 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
572 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
573 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
574 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
575 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
576 response = test_utils.service_create_request(self.cr_serv_sample_data)
577 self.assertEqual(response.status_code, requests.codes.ok)
578 res = response.json()
579 self.assertIn('PCE calculation in progress',
580 res['output']['configuration-response-common']['response-message'])
581 time.sleep(self.WAITING)
583 def test_37_get_oc_service1(self):
584 response = test_utils.get_service_list_request("services/service1")
585 self.assertEqual(response.status_code, requests.codes.ok)
586 res = response.json()
588 res['services'][0]['administrative-state'],
591 res['services'][0]['service-name'], 'service1')
593 res['services'][0]['connection-type'], 'roadm-line')
595 res['services'][0]['lifecycle-state'], 'planned')
598 def test_38_check_xc1_ROADMA(self):
599 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
600 self.assertEqual(response.status_code, requests.codes.ok)
601 res = response.json()
602 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
603 self.assertDictEqual(
605 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
606 'opticalControlMode': 'gainLoss',
607 'target-output-power': -3.0
608 }, **res['roadm-connections'][0]),
609 res['roadm-connections'][0]
611 self.assertDictEqual(
612 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
613 res['roadm-connections'][0]['source'])
614 self.assertDictEqual(
615 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
616 res['roadm-connections'][0]['destination'])
619 def test_39_check_xc1_ROADMC(self):
620 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
621 self.assertEqual(response.status_code, requests.codes.ok)
622 res = response.json()
623 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
624 self.assertDictEqual(
626 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
627 'opticalControlMode': 'gainLoss',
628 'target-output-power': -3.0
629 }, **res['roadm-connections'][0]),
630 res['roadm-connections'][0]
632 self.assertDictEqual(
633 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
634 res['roadm-connections'][0]['source'])
635 self.assertDictEqual(
636 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
637 res['roadm-connections'][0]['destination'])
640 def test_40_create_oc_service2(self):
641 self.cr_serv_sample_data["input"]["service-name"] = "service2"
642 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
643 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
644 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
645 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
646 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
647 response = test_utils.service_create_request(self.cr_serv_sample_data)
648 self.assertEqual(response.status_code, requests.codes.ok)
649 res = response.json()
650 self.assertIn('PCE calculation in progress',
651 res['output']['configuration-response-common']['response-message'])
652 time.sleep(self.WAITING)
654 def test_41_get_oc_service2(self):
655 response = test_utils.get_service_list_request("services/service2")
656 self.assertEqual(response.status_code, requests.codes.ok)
657 res = response.json()
659 res['services'][0]['administrative-state'],
662 res['services'][0]['service-name'], 'service2')
664 res['services'][0]['connection-type'], 'roadm-line')
666 res['services'][0]['lifecycle-state'], 'planned')
669 def test_42_check_xc2_ROADMA(self):
670 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2")
671 self.assertEqual(response.status_code, requests.codes.ok)
672 res = response.json()
673 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
674 self.assertDictEqual(
676 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
677 'opticalControlMode': 'gainLoss',
678 'target-output-power': -3.0
679 }, **res['roadm-connections'][0]),
680 res['roadm-connections'][0]
682 self.assertDictEqual(
683 {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
684 res['roadm-connections'][0]['source'])
685 self.assertDictEqual(
686 {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
687 res['roadm-connections'][0]['destination'])
690 def test_43_check_topo_ROADMA(self):
691 self.test_26_check_topo_ROADMA_SRG1()
692 self.test_27_check_topo_ROADMA_DEG2()
695 def test_44_delete_oc_service1(self):
696 response = test_utils.service_delete_request("service1")
697 self.assertEqual(response.status_code, requests.codes.ok)
698 res = response.json()
699 self.assertIn('Renderer service delete in progress',
700 res['output']['configuration-response-common']['response-message'])
703 def test_45_delete_oc_service2(self):
704 response = test_utils.service_delete_request("service2")
705 self.assertEqual(response.status_code, requests.codes.ok)
706 res = response.json()
707 self.assertIn('Renderer service delete in progress',
708 res['output']['configuration-response-common']['response-message'])
711 def test_46_get_no_oc_services(self):
713 response = test_utils.get_service_list_request("")
714 self.assertEqual(response.status_code, requests.codes.conflict)
715 res = response.json()
717 {"error-type": "application", "error-tag": "data-missing",
718 "error-message": "Request could not be completed because the relevant data model content does not exist"},
719 res['errors']['error'])
722 def test_47_get_no_xc_ROADMA(self):
723 response = test_utils.check_netconf_node_request("ROADM-A1", "")
724 self.assertEqual(response.status_code, requests.codes.ok)
725 res = response.json()
726 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
729 def test_48_check_topo_ROADMA(self):
730 self.test_34_check_topo_ROADMA_SRG1()
731 self.test_35_check_topo_ROADMA_DEG2()
733 def test_49_loop_create_eth_service(self):
734 for i in range(1, 6):
735 print("iteration number {}".format(i))
736 print("eth service creation")
737 self.test_11_create_eth_service1()
738 print("check xc in ROADM-A1")
739 self.test_13_check_xc1_ROADMA()
740 print("check xc in ROADM-C1")
741 self.test_14_check_xc1_ROADMC()
742 print("eth service deletion\n")
743 self.test_30_delete_eth_service1()
745 def test_50_loop_create_oc_service(self):
746 response = test_utils.get_service_list_request("services/service1")
747 if response.status_code != 404:
748 response = test_utils.service_delete_request("service1")
751 for i in range(1, 6):
752 print("iteration number {}".format(i))
753 print("oc service creation")
754 self.test_36_create_oc_service1()
755 print("check xc in ROADM-A1")
756 self.test_38_check_xc1_ROADMA()
757 print("check xc in ROADM-C1")
758 self.test_39_check_xc1_ROADMC()
759 print("oc service deletion\n")
760 self.test_44_delete_oc_service1()
762 def test_51_disconnect_XPDRA(self):
763 response = test_utils.unmount_device("XPDR-A1")
764 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
766 def test_52_disconnect_XPDRC(self):
767 response = test_utils.unmount_device("XPDR-C1")
768 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
770 def test_53_disconnect_ROADMA(self):
771 response = test_utils.unmount_device("ROADM-A1")
772 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
774 def test_54_disconnect_ROADMC(self):
775 response = test_utils.unmount_device("ROADM-C1")
776 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
779 if __name__ == "__main__":
780 unittest.main(verbosity=2)