2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 from common import test_utils
class TransportPCEFulltesting(unittest.TestCase):
    """End-to-end service scenario: shared fixtures used by the test methods.

    ``cr_serv_sample_data`` is the service-create payload. The tests mutate
    it in place (service name, connection type, end-point node id and
    service format) before posting it.
    """

    # NOTE(review): the container keys ("service-a-end"/"service-z-end",
    # "tx-direction"/"rx-direction", "port"/"lgx") and the "node-id",
    # "port-shelf" and "optic-type" entries were not visible in the reviewed
    # listing. They are restored by symmetry with the visible entries and with
    # the keys the tests access -- confirm against the service data model.
    cr_serv_sample_data = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-create",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-name": "service1",
        "common-id": "ASATT1234567",
        "connection-type": "service",
        "service-a-end": {
            "service-rate": "100",
            "node-id": "XPDR-A1",
            "service-format": "Ethernet",
            "clli": "SNJSCAMCJP8",
            "tx-direction": {
                "port": {
                    "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
                    "port-type": "router",
                    "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
                    "port-rack": "000000.00",
                    "port-shelf": "00"
                },
                "lgx": {
                    "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
                    "lgx-port-name": "LGX Back.3",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
                    "port-type": "router",
                    "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
                    "port-rack": "000000.00",
                    "port-shelf": "00"
                },
                "lgx": {
                    "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
                    "lgx-port-name": "LGX Back.4",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "service-z-end": {
            "service-rate": "100",
            "node-id": "XPDR-C1",
            "service-format": "Ethernet",
            "clli": "SNJSCAMCJT4",
            "tx-direction": {
                "port": {
                    "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
                    "port-type": "router",
                    "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
                    "port-rack": "000000.00",
                    "port-shelf": "00"
                },
                "lgx": {
                    "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
                    "lgx-port-name": "LGX Back.29",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "rx-direction": {
                "port": {
                    "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
                    "port-type": "router",
                    "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
                    "port-rack": "000000.00",
                    "port-shelf": "00"
                },
                "lgx": {
                    "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
                    "lgx-port-name": "LGX Back.30",
                    "lgx-port-rack": "000000.00",
                    "lgx-port-shelf": "00"
                }
            },
            "optic-type": "gray"
        },
        "due-date": "2016-11-28T00:00:01Z",
        "operator-contact": "pw1234"
    }}

    # Seconds slept after each service-create request.
    WAITING = 20  # nominal value is 300
@classmethod
def setUpClass(cls):
    """Start the controller and the four device simulators once per class."""
    cls.processes = test_utils.start_tpce()
    # NOTE(review): this rebinds cls.processes. It is only safe if start_sims
    # returns a collection that also holds the controller process -- confirm
    # in test_utils, otherwise tearDownClass never stops the controller.
    cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
@classmethod
def tearDownClass(cls):
    """Shut down every process recorded in ``cls.processes``."""
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):
    """Print the short name of the test method about to run."""
    # self.id() is dotted; the last component is the method name.
    method_name = self.id().split(".")[-1]
    print("execution of {}".format(method_name))
def test_01_connect_xpdrA(self):
    """Mount device XPDR-A1 (simulator 'xpdra') and expect a 'created' status."""
    mount_reply = test_utils.mount_device("XPDR-A1", 'xpdra')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_xpdrC(self):
    """Mount device XPDR-C1 (simulator 'xpdrc') and expect a 'created' status."""
    mount_reply = test_utils.mount_device("XPDR-C1", 'xpdrc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    """Mount device ROADM-A1 (simulator 'roadma') and expect a 'created' status."""
    mount_reply = test_utils.mount_device("ROADM-A1", 'roadma')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    """Mount device ROADM-C1 (simulator 'roadmc') and expect a 'created' status."""
    mount_reply = test_utils.mount_device("ROADM-C1", 'roadmc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    """Request the XPDR-A1 -> ROADM-A1 (SRG1-PP1-TXRX) link and check the result text."""
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Xponder Roadm Link created successfully', body["output"]["result"])
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """Request the ROADM-A1 (SRG1-PP1-TXRX) -> XPDR-A1 link and check the result text."""
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Roadm Xponder links created successfully', body["output"]["result"])
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    """Request the XPDR-C1 -> ROADM-C1 (SRG1-PP1-TXRX) link and check the result text."""
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Xponder Roadm Link created successfully', body["output"]["result"])
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    """Request the ROADM-C1 (SRG1-PP1-TXRX) -> XPDR-C1 link and check the result text."""
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Roadm Xponder links created successfully', body["output"]["result"])
def test_09_add_omsAttributes_ROADMA_ROADMC(self):
    """Configure OMS attributes on the ROADM-A1 -> ROADM-C1 link; expect 'created'."""
    # Config ROADMA-ROADMC oms-attributes
    # NOTE(review): the "span" wrapper and the "SRLG-Id", "fiber-type" and
    # "pmd" entries were not visible in the reviewed listing; they are
    # assumed here -- confirm against the OMS attributes model.
    data = {"span": {
        "auto-spanloss": "true",
        "spanloss-base": 11.4,
        "spanloss-current": 12,
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    response = test_utils.add_oms_attr_request(
        "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
    self.assertEqual(response.status_code, requests.codes.created)
def test_10_add_omsAttributes_ROADMC_ROADMA(self):
    """Configure OMS attributes on the ROADM-C1 -> ROADM-A1 link; expect 'created'."""
    # Config ROADMC-ROADMA oms-attributes
    # NOTE(review): the "span" wrapper and the "SRLG-Id", "fiber-type" and
    # "pmd" entries were not visible in the reviewed listing; they are
    # assumed here -- confirm against the OMS attributes model.
    data = {"span": {
        "auto-spanloss": "true",
        "spanloss-base": 11.4,
        "spanloss-current": 12,
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    response = test_utils.add_oms_attr_request(
        "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
    self.assertEqual(response.status_code, requests.codes.created)
207 # test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
    """Post the sample payload as 'service1', check the reply, then wait WAITING seconds."""
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service1"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    common = body['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_12_get_eth_service1(self):
    """Read back 'service1' and check its state, name, type and lifecycle."""
    response = test_utils.get_service_list_request("services/service1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    service = res['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_13_check_xc1_ROADMA(self):
    """Check the SRG1-PP1 -> DEG2 cross-connection configured on ROADM-A1."""
    response = test_utils.check_netconf_node_request(
        "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # The expected pairs are overlaid ON TOP of the actual dict, so the result
    # equals the actual dict only when every expected value matches. Overlaying
    # the other way round checks key presence only.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
        connection['destination'])
def test_14_check_xc1_ROADMC(self):
    """Check the SRG1-PP1 -> DEG1 cross-connection configured on ROADM-C1."""
    response = test_utils.check_netconf_node_request(
        "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # The expected pairs are overlaid ON TOP of the actual dict, so the result
    # equals the actual dict only when every expected value matches. Overlaying
    # the other way round checks key presence only.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
        connection['destination'])
def test_15_check_topo_XPDRA(self):
    """Check the termination points of XPDR-A1-XPDR1 after service1 creation."""
    response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'XPDR1-NETWORK1':
            # NOTE(review): only the 'frequency' entry of the expected dict
            # was visible in the reviewed listing; 'width': 40 is assumed --
            # confirm before relying on it.
            self.assertEqual({'frequency': 196.1,
                              'width': 40},
                             ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
        if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
            self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', ele)
        if ele['tp-id'] == 'XPDR1-NETWORK2':
            self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', ele)
def test_16_check_topo_ROADMA_SRG1(self):
    """Check that byte 95 of the ROADM-A1-SRG1 frequency maps is 0 after service1."""
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    freq_map = base64.b64decode(
        res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
    # Iterating bytes yields ints, so list() gives the per-byte values.
    freq_map_array = list(freq_map)
    self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'SRG1-PP1-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = list(freq_map)
            self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
        if ele['tp-id'] == 'SRG1-PP2-TXRX':
            self.assertNotIn('avail-freq-maps', ele)
def test_17_check_topo_ROADMA_DEG1(self):
    """Check that byte 95 of the ROADM-A1-DEG2 frequency maps is 0 after service1.

    The method name says DEG1, but the node queried is ROADM-A1-DEG2.
    """
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    freq_map = base64.b64decode(
        res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    # Iterating bytes yields ints, so list() gives the per-byte values.
    freq_map_array = list(freq_map)
    self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = list(freq_map)
            self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = list(freq_map)
            self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    """Request the XPDR-A1 -> ROADM-A1 (SRG1-PP2-TXRX) link and check the result text."""
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Xponder Roadm Link created successfully', body["output"]["result"])
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    """Request the ROADM-A1 (SRG1-PP2-TXRX) -> XPDR-A1 link and check the result text."""
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Roadm Xponder links created successfully', body["output"]["result"])
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    """Request the XPDR-C1 -> ROADM-C1 (SRG1-PP2-TXRX) link and check the result text."""
    link_args = ("XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Xponder Roadm Link created successfully', body["output"]["result"])
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    """Request the ROADM-C1 (SRG1-PP2-TXRX) -> XPDR-C1 link and check the result text."""
    link_args = ("XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Roadm Xponder links created successfully', body["output"]["result"])
def test_22_create_eth_service2(self):
    """Post the sample payload as 'service2', check the reply, then wait WAITING seconds."""
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service2"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    common = body['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_23_get_eth_service2(self):
    """Read back 'service2' and check its state, name, type and lifecycle."""
    response = test_utils.get_service_list_request("services/service2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    service = res['services'][0]
    # NOTE(review): the expected administrative-state value was not visible
    # in the reviewed listing; 'inService' is assumed from the sibling
    # service checks -- confirm.
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_24_check_xc2_ROADMA(self):
    """Check the DEG2 -> SRG1-PP2 cross-connection configured on ROADM-A1."""
    response = test_utils.check_netconf_node_request(
        "ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760',
        'opticalControlMode': 'power'
    }
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # The expected pairs are overlaid ON TOP of the actual dict, so the result
    # equals the actual dict only when every expected value matches. Overlaying
    # the other way round checks key presence only.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'DEG2-TTP-TXRX-nmc-753:760'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'SRG1-PP2-TXRX-nmc-753:760'},
        connection['destination'])
def test_25_check_topo_XPDRA(self):
    """Check the termination points of XPDR-A1-XPDR1 after service2 creation."""
    response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        # NOTE(review): only the 'frequency' entries of the expected dicts
        # were visible in the reviewed listing; 'width': 40 is assumed --
        # confirm before relying on it.
        if ele['tp-id'] == 'XPDR1-NETWORK1':
            self.assertEqual({'frequency': 196.1,
                              'width': 40},
                             ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
        if ele['tp-id'] == 'XPDR1-NETWORK2':
            self.assertEqual({'frequency': 196.05,
                              'width': 40},
                             ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
        if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
            self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', ele)
def test_26_check_topo_ROADMA_SRG1(self):
    """Check bytes 95 and 94 of the ROADM-A1-SRG1 frequency maps with both services up."""
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    freq_map = base64.b64decode(
        res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
    # Iterating bytes yields ints, so list() gives the per-byte values.
    freq_map_array = list(freq_map)
    self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
    self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'SRG1-PP1-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = list(freq_map)
            self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
            self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
        if ele['tp-id'] == 'SRG1-PP2-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = list(freq_map)
            self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
            self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
        if ele['tp-id'] == 'SRG1-PP3-TXRX':
            self.assertNotIn('org-openroadm-network-topology:pp-attributes', ele)
def test_27_check_topo_ROADMA_DEG2(self):
    """Check bytes 95 and 94 of the ROADM-A1-DEG2 frequency maps with both services up."""
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    freq_map = base64.b64decode(
        res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    # Iterating bytes yields ints, so list() gives the per-byte values.
    freq_map_array = list(freq_map)
    self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
    self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = list(freq_map)
            self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
            self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = list(freq_map)
            self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
            self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
475 # creation service test on a non-available resource
def test_28_create_eth_service3(self):
    """Post the sample payload as 'service3' and check the message and '200' response code."""
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service3"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('PCE calculation in progress',
                  body['output']['configuration-response-common']['response-message'])
    self.assertIn('200', body['output']['configuration-response-common']['response-code'])
    time.sleep(self.WAITING)
# TODO: add a test that checks the openroadm-service-list still contains only 2 elements
def test_29_delete_eth_service3(self):
    """Delete 'service3' and expect the 'does not exist' message with a '500' response code."""
    reply = test_utils.service_delete_request("service3")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Service \'service3\' does not exist in datastore',
                  body['output']['configuration-response-common']['response-message'])
    self.assertIn('500', body['output']['configuration-response-common']['response-code'])
def test_30_delete_eth_service1(self):
    """Delete 'service1' and check that the renderer reports the deletion in progress."""
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    common = body['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_31_delete_eth_service2(self):
    """Delete 'service2' and check that the renderer reports the deletion in progress."""
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    common = body['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_32_check_no_xc_ROADMA(self):
    """Check that the ROADM-A1 device data has no 'roadm-connections' entry."""
    reply = test_utils.check_netconf_node_request("ROADM-A1", "")
    # The body is decoded before the status check, as in the original order.
    body = reply.json()
    self.assertEqual(reply.status_code, requests.codes.ok)
    device = body['org-openroadm-device']
    self.assertNotIn('roadm-connections', device.keys())
def test_33_check_topo_XPDRA(self):
    """Check the XPDR-A1-XPDR1 termination points once the services are deleted."""
    response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        tp_type = ele['org-openroadm-common-network:tp-type']
        if tp_type == 'XPONDER-CLIENT':
            self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', ele)
        elif tp_type == 'XPONDER-NETWORK':
            network_attrs = ele['org-openroadm-network-topology:xpdr-network-attributes']
            self.assertIn('tail-equipment-id', network_attrs)
            self.assertNotIn('wavelength', network_attrs)
def test_34_check_topo_ROADMA_SRG1(self):
    """Check bytes 95 and 94 of the ROADM-A1-SRG1 frequency maps are 255 after deletion."""
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    freq_map = base64.b64decode(
        res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
    # Iterating bytes yields ints, so list() gives the per-byte values.
    freq_map_array = list(freq_map)
    self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
    self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = list(freq_map)
            self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
            self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
        elif ele['tp-id'] == 'SRG1-CP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = list(freq_map)
            self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
            self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
        else:
            self.assertNotIn('org-openroadm-network-topology:pp-attributes', ele)
def test_35_check_topo_ROADMA_DEG2(self):
    """Check bytes 95 and 94 of the ROADM-A1-DEG2 frequency maps are 255 after deletion."""
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    freq_map = base64.b64decode(
        res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    # Iterating bytes yields ints, so list() gives the per-byte values.
    freq_map_array = list(freq_map)
    self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
    self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = list(freq_map)
            self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
            self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = list(freq_map)
            self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
            self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
586 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
    """Rewrite the shared payload as an OC 'roadm-line' named 'service1' and post it."""
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service1"
    request["connection-type"] = "roadm-line"
    # Both end points move to the ROADM nodes and switch to the OC format.
    for end_key, node_id in (("service-a-end", "ROADM-A1"), ("service-z-end", "ROADM-C1")):
        request[end_key]["node-id"] = node_id
        request[end_key]["service-format"] = "OC"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('PCE calculation in progress',
                  body['output']['configuration-response-common']['response-message'])
    time.sleep(self.WAITING)
def test_37_get_oc_service1(self):
    """Read back OC 'service1' and check its state, name, type and lifecycle."""
    response = test_utils.get_service_list_request("services/service1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    service = res['services'][0]
    # NOTE(review): the expected administrative-state value was not visible
    # in the reviewed listing; 'inService' is assumed from the sibling
    # service checks -- confirm.
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1')
    self.assertEqual(service['connection-type'], 'roadm-line')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_38_check_xc1_ROADMA(self):
    """Check the SRG1-PP1 -> DEG2 cross-connection on ROADM-A1 for the OC service."""
    response = test_utils.check_netconf_node_request(
        "ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # The expected pairs are overlaid ON TOP of the actual dict, so the result
    # equals the actual dict only when every expected value matches. Overlaying
    # the other way round checks key presence only.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
        connection['destination'])
def test_39_check_xc1_ROADMC(self):
    """Check the SRG1-PP1 -> DEG1 cross-connection on ROADM-C1 for the OC service."""
    response = test_utils.check_netconf_node_request(
        "ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # The expected pairs are overlaid ON TOP of the actual dict, so the result
    # equals the actual dict only when every expected value matches. Overlaying
    # the other way round checks key presence only.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
        connection['destination'])
def test_40_create_oc_service2(self):
    """Rewrite the shared payload as an OC 'roadm-line' named 'service2' and post it."""
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service2"
    request["connection-type"] = "roadm-line"
    # Both end points move to the ROADM nodes and switch to the OC format.
    for end_key, node_id in (("service-a-end", "ROADM-A1"), ("service-z-end", "ROADM-C1")):
        request[end_key]["node-id"] = node_id
        request[end_key]["service-format"] = "OC"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('PCE calculation in progress',
                  body['output']['configuration-response-common']['response-message'])
    time.sleep(self.WAITING)
def test_41_get_oc_service2(self):
    """Read back OC 'service2' and check its state, name, type and lifecycle."""
    response = test_utils.get_service_list_request("services/service2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    service = res['services'][0]
    # NOTE(review): the expected administrative-state value was not visible
    # in the reviewed listing; 'inService' is assumed from the sibling
    # service checks -- confirm.
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2')
    self.assertEqual(service['connection-type'], 'roadm-line')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_42_check_xc2_ROADMA(self):
    """Check the SRG1-PP2 -> DEG2 cross-connection on ROADM-A1 for the OC service."""
    response = test_utils.check_netconf_node_request(
        "ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
    # The expected pairs are overlaid ON TOP of the actual dict, so the result
    # equals the actual dict only when every expected value matches. Overlaying
    # the other way round checks key presence only.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP2-TXRX-nmc-753:760'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-753:760'},
        connection['destination'])
def test_43_check_topo_ROADMA(self):
    """Re-run the SRG1 and DEG2 topology checks written for two active services."""
    for check in (self.test_26_check_topo_ROADMA_SRG1,
                  self.test_27_check_topo_ROADMA_DEG2):
        check()
def test_44_delete_oc_service1(self):
    """Delete OC 'service1' and check that the renderer reports the deletion in progress."""
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    common = body['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_45_delete_oc_service2(self):
    """Delete OC 'service2' and check that the renderer reports the deletion in progress."""
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    common = body['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_46_get_no_oc_services(self):
    """Query the service list and expect a 'conflict' status with a data-missing error."""
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.conflict)
    res = response.json()
    expected_error = {
        "error-type": "application", "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant data model content does not exist"}
    self.assertIn(expected_error, res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    """Check that the ROADM-A1 device data has no 'roadm-connections' entry."""
    reply = test_utils.check_netconf_node_request("ROADM-A1", "")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    # ['roadm-connections'][0] in the original is just this string.
    absent_key = 'roadm-connections'
    self.assertNotIn(absent_key, body['org-openroadm-device'])
def test_48_check_topo_ROADMA(self):
    """Re-run the SRG1 and DEG2 topology checks written for the no-service state."""
    for check in (self.test_34_check_topo_ROADMA_SRG1,
                  self.test_35_check_topo_ROADMA_DEG2):
        check()
def test_49_loop_create_eth_service(self):
    """Run create / check cross-connections / delete for the Ethernet service five times."""
    iteration = 1
    while iteration < 6:
        print("iteration number {}".format(iteration))
        print("eth service creation")
        self.test_11_create_eth_service1()
        print("check xc in ROADM-A1")
        self.test_13_check_xc1_ROADMA()
        print("check xc in ROADM-C1")
        self.test_14_check_xc1_ROADMC()
        print("eth service deletion\n")
        self.test_30_delete_eth_service1()
        iteration += 1
def test_50_loop_create_oc_service(self):
    """Run create / check cross-connections / delete for the OC service five times.

    If 'service1' is still present (lookup status other than 404), it is
    deleted before the loop starts.
    """
    lookup = test_utils.get_service_list_request("services/service1")
    if lookup.status_code != 404:
        test_utils.service_delete_request("service1")

    iteration = 1
    while iteration < 6:
        print("iteration number {}".format(iteration))
        print("oc service creation")
        self.test_36_create_oc_service1()
        print("check xc in ROADM-A1")
        self.test_38_check_xc1_ROADMA()
        print("check xc in ROADM-C1")
        self.test_39_check_xc1_ROADMC()
        print("oc service deletion\n")
        self.test_44_delete_oc_service1()
        iteration += 1
def test_51_disconnect_XPDRA(self):
    """Unmount device XPDR-A1 and expect an 'ok' status."""
    unmount_reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_52_disconnect_XPDRC(self):
    """Unmount device XPDR-C1 and expect an 'ok' status."""
    unmount_reply = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_53_disconnect_ROADMA(self):
    """Unmount device ROADM-A1 and expect an 'ok' status."""
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
def test_54_disconnect_ROADMC(self):
    """Unmount device ROADM-C1 and expect an 'ok' status."""
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        test_utils.CODE_SHOULD_BE_200,
    )
# Script entry point: run the suite with per-test output when executed directly.
if "__main__" == __name__:
    unittest.main(verbosity=2)