2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 sys.path.append('transportpce_tests/common/')
22 class TransportPCEFulltesting(unittest.TestCase):
23 cr_serv_sample_data = {"input": {
24 "sdnc-request-header": {
25 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
26 "rpc-action": "service-create",
27 "request-system-id": "appname",
29 "http://localhost:8585/NotificationServer/notify"
31 "service-name": "service1",
32 "common-id": "ASATT1234567",
33 "connection-type": "service",
35 "service-rate": "100",
37 "service-format": "Ethernet",
38 "clli": "SNJSCAMCJP8",
42 "ROUTER_SNJSCAMCJP8_000000.00_00",
43 "port-type": "router",
44 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
45 "port-rack": "000000.00",
50 "LGX Panel_SNJSCAMCJP8_000000.00_00",
51 "lgx-port-name": "LGX Back.3",
52 "lgx-port-rack": "000000.00",
53 "lgx-port-shelf": "00"
59 "ROUTER_SNJSCAMCJP8_000000.00_00",
60 "port-type": "router",
61 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
62 "port-rack": "000000.00",
67 "LGX Panel_SNJSCAMCJP8_000000.00_00",
68 "lgx-port-name": "LGX Back.4",
69 "lgx-port-rack": "000000.00",
70 "lgx-port-shelf": "00"
76 "service-rate": "100",
78 "service-format": "Ethernet",
79 "clli": "SNJSCAMCJT4",
83 "ROUTER_SNJSCAMCJT4_000000.00_00",
84 "port-type": "router",
85 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
86 "port-rack": "000000.00",
91 "LGX Panel_SNJSCAMCJT4_000000.00_00",
92 "lgx-port-name": "LGX Back.29",
93 "lgx-port-rack": "000000.00",
94 "lgx-port-shelf": "00"
100 "ROUTER_SNJSCAMCJT4_000000.00_00",
101 "port-type": "router",
102 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
103 "port-rack": "000000.00",
108 "LGX Panel_SNJSCAMCJT4_000000.00_00",
109 "lgx-port-name": "LGX Back.30",
110 "lgx-port-rack": "000000.00",
111 "lgx-port-shelf": "00"
116 "due-date": "2016-11-28T00:00:01Z",
117 "operator-contact": "pw1234"
122 NODE_VERSION = '1.2.1'
126 cls.processes = test_utils.start_tpce()
127 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
128 ('roadma-full', cls.NODE_VERSION),
129 ('roadmc-full', cls.NODE_VERSION),
130 ('xpdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Shut down every process held in cls.processes, then report completion.
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):
    # Runs before each test method: print the last dotted component of the
    # test id so the console shows which test is executing.
    test_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(test_name))
142 # connect netconf devices
def test_01_connect_xpdrA(self):
    # Mount device "XPDRA01" backed by the 'xpdra' node at NODE_VERSION;
    # the mount request must answer with the "created" status.
    node = ('xpdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDRA01", node)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    # Mount device "XPDRC01" backed by the 'xpdrc' node at NODE_VERSION;
    # the mount request must answer with the "created" status.
    node = ('xpdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDRC01", node)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    # Mount device "ROADMA01" backed by the 'roadma-full' node at
    # NODE_VERSION; the mount request must answer with the "created" status.
    node = ('roadma-full', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADMA01", node)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    # Mount device "ROADMC01" backed by the 'roadmc-full' node at
    # NODE_VERSION; the mount request must answer with the "created" status.
    node = ('roadmc-full', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADMC01", node)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    # Request the transponder-to-ROADM link between XPDRA01 and ROADMA01
    # (SRG1-PP1-TXRX); the reply must be OK and its result text must report
    # that the link was created.
    link_args = ("XPDRA01", "1", "1", "ROADMA01", "1", "SRG1-PP1-TXRX")
    response = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(response.status_code, requests.codes.ok)
    result_text = response.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    # Request the ROADM-to-transponder link between ROADMA01 (SRG1-PP1-TXRX)
    # and XPDRA01; the reply must be OK and its result text must report that
    # the links were created.
    link_args = ("XPDRA01", "1", "1", "ROADMA01", "1", "SRG1-PP1-TXRX")
    response = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(response.status_code, requests.codes.ok)
    result_text = response.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    # Request the transponder-to-ROADM link between XPDRC01 and ROADMC01
    # (SRG1-PP1-TXRX); the reply must be OK and its result text must report
    # that the link was created.
    link_args = ("XPDRC01", "1", "1", "ROADMC01", "1", "SRG1-PP1-TXRX")
    response = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(response.status_code, requests.codes.ok)
    result_text = response.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    # Request the ROADM-to-transponder link between ROADMC01 (SRG1-PP1-TXRX)
    # and XPDRC01; the reply must be OK and its result text must report that
    # the links were created.
    link_args = ("XPDRC01", "1", "1", "ROADMC01", "1", "SRG1-PP1-TXRX")
    response = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(response.status_code, requests.codes.ok)
    result_text = response.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
195 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
196 # Config ROADMA-ROADMC oms-attributes
198 "auto-spanloss": "true",
199 "spanloss-base": 11.4,
200 "spanloss-current": 12,
201 "engineered-spanloss": 12.2,
202 "link-concatenation": [{
205 "SRLG-length": 100000,
207 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
208 self.assertEqual(response.status_code, requests.codes.created)
210 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
211 # Config ROADMC-ROADMA oms-attributes
213 "auto-spanloss": "true",
214 "spanloss-base": 11.4,
215 "spanloss-current": 12,
216 "engineered-spanloss": 12.2,
217 "link-concatenation": [{
220 "SRLG-length": 100000,
222 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
223 self.assertEqual(response.status_code, requests.codes.created)
225 # test service-create for Eth service from xpdr to xpdr
226 def test_11_create_eth_service1(self):
227 self.cr_serv_sample_data["input"]["service-name"] = "service1"
228 response = test_utils.service_create_request(self.cr_serv_sample_data)
229 self.assertEqual(response.status_code, requests.codes.ok)
230 res = response.json()
231 self.assertIn('PCE calculation in progress',
232 res['output']['configuration-response-common'][
234 time.sleep(self.WAITING)
236 def test_12_get_eth_service1(self):
237 response = test_utils.get_service_list_request("services/service1")
238 self.assertEqual(response.status_code, requests.codes.ok)
239 res = response.json()
241 res['services'][0]['administrative-state'],
244 res['services'][0]['service-name'], 'service1')
246 res['services'][0]['connection-type'], 'service')
248 res['services'][0]['lifecycle-state'], 'planned')
def test_13_check_xc1_ROADMA(self):
    # Read roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768 from ROADMA01
    # and check its attributes, source interface and destination interface.
    response = test_utils.check_netconf_node_request(
        "ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Replacement for assertDictContainsSubset (deprecated in python 3.2):
    # overlay the EXPECTED pairs on top of the actual entry and require the
    # result to equal the actual entry. Overlaying the other way round lets
    # the actual values win, which only proves the keys exist.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-761:768'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-761:768'},
        connection['destination'])
def test_14_check_xc1_ROADMC(self):
    # Read roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768 from ROADMC01
    # and check its attributes, source interface and destination interface.
    response = test_utils.check_netconf_node_request(
        "ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss',
        'target-output-power': 2.0
    }
    # Replacement for assertDictContainsSubset (deprecated in python 3.2):
    # overlay the EXPECTED pairs on top of the actual entry and require the
    # result to equal the actual entry. Overlaying the other way round lets
    # the actual values win, which only proves the keys exist.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-761:768'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-761:768'},
        connection['destination'])
295 def test_15_check_topo_XPDRA(self):
296 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
297 self.assertEqual(response.status_code, requests.codes.ok)
298 res = response.json()
299 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
301 if ele['tp-id'] == 'XPDR1-NETWORK1':
302 self.assertEqual({u'frequency': 196.1,
304 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
305 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
306 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
307 if ele['tp-id'] == 'XPDR1-NETWORK2':
308 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
def test_16_check_topo_ROADMA_SRG1(self):
    # Read node ROADMA01-SRG1 from the topology and check that byte 95 of the
    # decoded frequency map ("Lambda 1") is 0 on the SRG itself and on
    # SRG1-PP1-TXRX, and that SRG1-PP2-TXRX has no 'avail-freq-maps' key.
    response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    freq_map = base64.b64decode(
        node['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
    # Indexing a bytes object already yields an int, so no per-byte
    # conversion list is needed.
    self.assertEqual(freq_map[95], 0, "Lambda 1 should not be available")
    # The checks below read `ele`, so they have to run once per termination
    # point; without this loop `ele` is unbound.
    for ele in node['ietf-network-topology:termination-point']:
        if ele['tp-id'] == 'SRG1-PP1-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(freq_map[95], 0, "Lambda 1 should not be available")
        if ele['tp-id'] == 'SRG1-PP2-TXRX':
            self.assertNotIn('avail-freq-maps', ele)
def test_17_check_topo_ROADMA_DEG1(self):
    # Read node ROADMA01-DEG1 from the topology and check that byte 95 of the
    # decoded frequency map ("Lambda 1") is 0 on the degree itself and on the
    # CTP / TTP termination points matched below.
    response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    freq_map = base64.b64decode(
        node['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    # Indexing a bytes object already yields an int, so no per-byte
    # conversion list is needed.
    self.assertEqual(freq_map[95], 0, "Lambda 1 should not be available")
    # The checks below read `ele`, so they have to run once per termination
    # point; without this loop `ele` is unbound.
    # NOTE(review): the node queried is ROADMA01-DEG1 while the tp-ids tested
    # are DEG2-*; confirm these branches can actually match.
    for ele in node['ietf-network-topology:termination-point']:
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(freq_map[95], 0, "Lambda 1 should not be available")
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(freq_map[95], 0, "Lambda 1 should not be available")
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    # Request the transponder-to-ROADM link between XPDRA01 (second network
    # argument "2") and ROADMA01 SRG1-PP2-TXRX; the reply must be OK and its
    # result text must report that the link was created.
    link_args = ("XPDRA01", "1", "2", "ROADMA01", "1", "SRG1-PP2-TXRX")
    response = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(response.status_code, requests.codes.ok)
    result_text = response.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    # Request the ROADM-to-transponder link between ROADMA01 SRG1-PP2-TXRX
    # and XPDRA01; the reply must be OK and its result text must report that
    # the links were created.
    link_args = ("XPDRA01", "1", "2", "ROADMA01", "1", "SRG1-PP2-TXRX")
    response = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(response.status_code, requests.codes.ok)
    result_text = response.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    # Request the transponder-to-ROADM link between XPDRC01 and ROADMC01
    # SRG1-PP2-TXRX; the reply must be OK and its result text must report
    # that the link was created.
    link_args = ("XPDRC01", "1", "2", "ROADMC01", "1", "SRG1-PP2-TXRX")
    response = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(response.status_code, requests.codes.ok)
    result_text = response.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    # Request the ROADM-to-transponder link between ROADMC01 SRG1-PP2-TXRX
    # and XPDRC01; the reply must be OK and its result text must report that
    # the links were created.
    link_args = ("XPDRC01", "1", "2", "ROADMC01", "1", "SRG1-PP2-TXRX")
    response = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(response.status_code, requests.codes.ok)
    result_text = response.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
388 def test_22_create_eth_service2(self):
389 self.cr_serv_sample_data["input"]["service-name"] = "service2"
390 response = test_utils.service_create_request(self.cr_serv_sample_data)
391 self.assertEqual(response.status_code, requests.codes.ok)
392 res = response.json()
393 self.assertIn('PCE calculation in progress',
394 res['output']['configuration-response-common'][
396 time.sleep(self.WAITING)
398 def test_23_get_eth_service2(self):
399 response = test_utils.get_service_list_request("services/service2")
400 self.assertEqual(response.status_code, requests.codes.ok)
401 res = response.json()
403 res['services'][0]['administrative-state'],
406 res['services'][0]['service-name'], 'service2')
408 res['services'][0]['connection-type'], 'service')
410 res['services'][0]['lifecycle-state'], 'planned')
def test_24_check_xc2_ROADMA(self):
    # Read roadm-connection DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760 from ROADMA01
    # and check its attributes, source interface and destination interface.
    response = test_utils.check_netconf_node_request(
        "ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760")
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760',
        'wavelength-number': 2,
        'opticalControlMode': 'power'
    }
    # Replacement for assertDictContainsSubset (deprecated in python 3.2):
    # overlay the EXPECTED pairs on top of the actual entry and require the
    # result to equal the actual entry. Overlaying the other way round lets
    # the actual values win, which only proves the keys exist.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'DEG1-TTP-TXRX-753:760'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'SRG1-PP2-TXRX-753:760'},
        connection['destination'])
433 def test_25_check_topo_XPDRA(self):
434 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
435 self.assertEqual(response.status_code, requests.codes.ok)
436 res = response.json()
437 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
439 if ele['tp-id'] == 'XPDR1-NETWORK1':
440 self.assertEqual({u'frequency': 196.1,
442 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
443 if ele['tp-id'] == 'XPDR1-NETWORK2':
444 self.assertEqual({u'frequency': 196.05,
446 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
447 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
448 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
def test_26_check_topo_ROADMA_SRG1(self):
    # Read node ROADMA01-SRG1 from the topology and check bytes 95
    # ("Lambda 1") and 94 ("Lambda 2") of the decoded frequency maps: both 0
    # on the SRG, 0/255 on SRG1-PP1-TXRX, 255/0 on SRG1-PP2-TXRX, and no
    # pp-attributes at all on SRG1-PP3-TXRX.
    response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    freq_map = base64.b64decode(
        node['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
    # Indexing a bytes object already yields an int, so no per-byte
    # conversion list is needed.
    self.assertEqual(freq_map[95], 0, "Lambda 1 should not be available")
    self.assertEqual(freq_map[94], 0, "Lambda 2 should not be available")
    # The checks below read `ele`, so they have to run once per termination
    # point; without this loop `ele` is unbound.
    for ele in node['ietf-network-topology:termination-point']:
        if ele['tp-id'] == 'SRG1-PP1-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(freq_map[95], 0, "Lambda 1 should not be available")
            self.assertEqual(freq_map[94], 255, "Lambda 2 should be available")
        if ele['tp-id'] == 'SRG1-PP2-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(freq_map[95], 255, "Lambda 1 should be available")
            self.assertEqual(freq_map[94], 0, "Lambda 2 should not be available")
        if ele['tp-id'] == 'SRG1-PP3-TXRX':
            self.assertNotIn('org-openroadm-network-topology:pp-attributes', ele)
def test_27_check_topo_ROADMA_DEG1(self):
    # Read node ROADMA01-DEG1 from the topology and check that bytes 95
    # ("Lambda 1") and 94 ("Lambda 2") of the decoded frequency maps are 0 on
    # the degree itself and on the CTP / TTP termination points matched below.
    response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    freq_map = base64.b64decode(
        node['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    # Indexing a bytes object already yields an int, so no per-byte
    # conversion list is needed.
    self.assertEqual(freq_map[95], 0, "Lambda 1 should not be available")
    self.assertEqual(freq_map[94], 0, "Lambda 2 should not be available")
    # The checks below read `ele`, so they have to run once per termination
    # point; without this loop `ele` is unbound.
    # NOTE(review): the node queried is ROADMA01-DEG1 while the tp-ids tested
    # are DEG2-*; confirm these branches can actually match.
    for ele in node['ietf-network-topology:termination-point']:
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(freq_map[95], 0, "Lambda 1 should not be available")
            self.assertEqual(freq_map[94], 0, "Lambda 2 should not be available")
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(freq_map[95], 0, "Lambda 1 should not be available")
            self.assertEqual(freq_map[94], 0, "Lambda 2 should not be available")
503 # creation service test on a non-available resource
504 def test_28_create_eth_service3(self):
505 self.cr_serv_sample_data["input"]["service-name"] = "service3"
506 response = test_utils.service_create_request(self.cr_serv_sample_data)
507 self.assertEqual(response.status_code, requests.codes.ok)
508 res = response.json()
509 self.assertIn('PCE calculation in progress',
510 res['output']['configuration-response-common'][
512 self.assertIn('200', res['output']['configuration-response-common'][
514 time.sleep(self.WAITING)
516 # add a test that check the openroadm-service-list still only
517 # contains 2 elements
519 def test_29_delete_eth_service3(self):
520 response = test_utils.service_delete_request("service3")
521 self.assertEqual(response.status_code, requests.codes.ok)
522 res = response.json()
523 self.assertIn('Service \'service3\' does not exist in datastore',
524 res['output']['configuration-response-common'][
526 self.assertIn('500', res['output']['configuration-response-common'][
530 def test_30_delete_eth_service1(self):
531 response = test_utils.service_delete_request("service1")
532 self.assertEqual(response.status_code, requests.codes.ok)
533 res = response.json()
534 self.assertIn('Renderer service delete in progress',
535 res['output']['configuration-response-common'][
539 def test_31_delete_eth_service2(self):
540 response = test_utils.service_delete_request("service2")
541 self.assertEqual(response.status_code, requests.codes.ok)
542 res = response.json()
543 self.assertIn('Renderer service delete in progress',
544 res['output']['configuration-response-common'][
def test_32_check_no_xc_ROADMA(self):
    # Fetch the ROADMA01 device data and check that it no longer lists any
    # 'roadm-connections' entry.
    reply = test_utils.check_netconf_node_request("ROADMA01", "")
    # The body is parsed before the status check, as in the original order.
    payload = reply.json()
    self.assertEqual(reply.status_code, requests.codes.ok)
    device_keys = dict.keys(payload['org-openroadm-device'])
    self.assertNotIn('roadm-connections', device_keys)
556 def test_33_check_topo_XPDRA(self):
557 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
558 self.assertEqual(response.status_code, requests.codes.ok)
559 res = response.json()
560 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
562 if ((ele[u'org-openroadm-common-network:tp-type'] ==
564 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
565 'tp-id'] == 'XPDR1-CLIENT3')):
567 'org-openroadm-network-topology:xpdr-client-attributes',
569 elif (ele[u'org-openroadm-common-network:tp-type'] ==
571 self.assertIn(u'tail-equipment-id', dict.keys(
572 ele[u'org-openroadm-network-topology:'
573 u'xpdr-network-attributes']))
574 self.assertNotIn('wavelength', dict.keys(
575 ele[u'org-openroadm-network-topology:'
576 u'xpdr-network-attributes']))
def test_34_check_topo_ROADMA_SRG1(self):
    # Read node ROADMA01-SRG1 from the topology and check that bytes 95
    # ("Lambda 1") and 94 ("Lambda 2") of the decoded frequency maps are 255
    # on the SRG, on SRG1-PP1-TXRX / SRG1-PP2-TXRX and on SRG1-CP-TXRX; every
    # other termination point must carry no pp-attributes.
    response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    freq_map = base64.b64decode(
        node['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
    # Indexing a bytes object already yields an int, so no per-byte
    # conversion list is needed.
    self.assertEqual(freq_map[95], 255, "Lambda 1 should be available")
    self.assertEqual(freq_map[94], 255, "Lambda 2 should be available")
    # The checks below read `ele`, so they have to run once per termination
    # point; without this loop `ele` is unbound.
    for ele in node['ietf-network-topology:termination-point']:
        if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(freq_map[95], 255, "Lambda 1 should be available")
            self.assertEqual(freq_map[94], 255, "Lambda 2 should be available")
        elif ele['tp-id'] == 'SRG1-CP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(freq_map[95], 255, "Lambda 1 should be available")
            self.assertEqual(freq_map[94], 255, "Lambda 2 should be available")
        else:
            # Fallback branch for every termination point not matched above.
            self.assertNotIn('org-openroadm-network-topology:pp-attributes', ele)
def test_35_check_topo_ROADMA_DEG1(self):
    # Read node ROADMA01-DEG1 from the topology and check that bytes 95
    # ("Lambda 1") and 94 ("Lambda 2") of the decoded frequency maps are 255
    # on the degree itself and on the CTP / TTP termination points matched
    # below.
    response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    freq_map = base64.b64decode(
        node['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    # Indexing a bytes object already yields an int, so no per-byte
    # conversion list is needed.
    self.assertEqual(freq_map[95], 255, "Lambda 1 should be available")
    self.assertEqual(freq_map[94], 255, "Lambda 2 should be available")
    # The checks below read `ele`, so they have to run once per termination
    # point; without this loop `ele` is unbound.
    # NOTE(review): the node queried is ROADMA01-DEG1 while the tp-ids tested
    # are DEG2-*; confirm these branches can actually match.
    for ele in node['ietf-network-topology:termination-point']:
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(freq_map[95], 255, "Lambda 1 should be available")
            self.assertEqual(freq_map[94], 255, "Lambda 2 should be available")
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(freq_map[95], 255, "Lambda 1 should be available")
            self.assertEqual(freq_map[94], 255, "Lambda 2 should be available")
631 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
632 def test_36_create_oc_service1(self):
633 self.cr_serv_sample_data["input"]["service-name"] = "service1"
634 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
635 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
636 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
637 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
638 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
639 response = test_utils.service_create_request(self.cr_serv_sample_data)
640 self.assertEqual(response.status_code, requests.codes.ok)
641 res = response.json()
642 self.assertIn('PCE calculation in progress',
643 res['output']['configuration-response-common'][
645 time.sleep(self.WAITING)
647 def test_37_get_oc_service1(self):
648 response = test_utils.get_service_list_request("services/service1")
649 self.assertEqual(response.status_code, requests.codes.ok)
650 res = response.json()
652 res['services'][0]['administrative-state'],
655 res['services'][0]['service-name'], 'service1')
657 res['services'][0]['connection-type'], 'roadm-line')
659 res['services'][0]['lifecycle-state'], 'planned')
def test_38_check_xc1_ROADMA(self):
    # Read roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768 from ROADMA01
    # and check its attributes, source interface and destination interface.
    response = test_utils.check_netconf_node_request(
        "ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Replacement for assertDictContainsSubset (deprecated in python 3.2):
    # overlay the EXPECTED pairs on top of the actual entry and require the
    # result to equal the actual entry. Overlaying the other way round lets
    # the actual values win, which only proves the keys exist.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-761:768'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-761:768'},
        connection['destination'])
def test_39_check_xc1_ROADMC(self):
    # Read roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768 from ROADMC01
    # and check its attributes, source interface and destination interface.
    response = test_utils.check_netconf_node_request(
        "ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
        'wavelength-number': 1,
        'opticalControlMode': 'gainLoss',
        'target-output-power': 2.0
    }
    # Replacement for assertDictContainsSubset (deprecated in python 3.2):
    # overlay the EXPECTED pairs on top of the actual entry and require the
    # result to equal the actual entry. Overlaying the other way round lets
    # the actual values win, which only proves the keys exist.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-761:768'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-761:768'},
        connection['destination'])
706 def test_40_create_oc_service2(self):
707 self.cr_serv_sample_data["input"]["service-name"] = "service2"
708 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
709 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
710 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
711 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
712 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
713 response = test_utils.service_create_request(self.cr_serv_sample_data)
714 self.assertEqual(response.status_code, requests.codes.ok)
715 res = response.json()
716 self.assertIn('PCE calculation in progress',
717 res['output']['configuration-response-common'][
719 time.sleep(self.WAITING)
721 def test_41_get_oc_service2(self):
722 response = test_utils.get_service_list_request("services/service2")
723 self.assertEqual(response.status_code, requests.codes.ok)
724 res = response.json()
726 res['services'][0]['administrative-state'],
729 res['services'][0]['service-name'], 'service2')
731 res['services'][0]['connection-type'], 'roadm-line')
733 res['services'][0]['lifecycle-state'], 'planned')
def test_42_check_xc2_ROADMA(self):
    # Read roadm-connection SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760 from ROADMA01
    # and check its attributes, source interface and destination interface.
    response = test_utils.check_netconf_node_request(
        "ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760")
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760',
        'wavelength-number': 2,
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Replacement for assertDictContainsSubset (deprecated in python 3.2):
    # overlay the EXPECTED pairs on top of the actual entry and require the
    # result to equal the actual entry. Overlaying the other way round lets
    # the actual values win, which only proves the keys exist.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP2-TXRX-753:760'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-753:760'},
        connection['destination'])
def test_43_check_topo_ROADMA(self):
    # Re-run the SRG1 and DEG1 topology checks of tests 26 and 27, in that
    # order.
    rechecks = (
        self.test_26_check_topo_ROADMA_SRG1,
        self.test_27_check_topo_ROADMA_DEG1,
    )
    for recheck in rechecks:
        recheck()
763 def test_44_delete_oc_service1(self):
764 response = test_utils.service_delete_request("service1")
765 self.assertEqual(response.status_code, requests.codes.ok)
766 res = response.json()
767 self.assertIn('Renderer service delete in progress',
768 res['output']['configuration-response-common'][
772 def test_45_delete_oc_service2(self):
773 response = test_utils.service_delete_request("service2")
774 self.assertEqual(response.status_code, requests.codes.ok)
775 res = response.json()
776 self.assertIn('Renderer service delete in progress',
777 res['output']['configuration-response-common'][
781 def test_46_get_no_oc_services(self):
783 response = test_utils.get_service_list_request("")
784 self.assertEqual(response.status_code, requests.codes.conflict)
785 res = response.json()
788 "error-type": "application",
789 "error-tag": "data-missing",
791 "Request could not be completed because the relevant data "
792 "model content does not exist"
794 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    # Fetch the ROADMA01 device data and check that it holds no
    # 'roadm-connections' entry.
    reply = test_utils.check_netconf_node_request("ROADMA01", "")
    self.assertEqual(reply.status_code, requests.codes.ok)
    device = reply.json()['org-openroadm-device']
    self.assertNotIn('roadm-connections', device)
def test_48_check_topo_ROADMA(self):
    # Re-run the SRG1 and DEG1 topology checks of tests 34 and 35, in that
    # order.
    rechecks = (
        self.test_34_check_topo_ROADMA_SRG1,
        self.test_35_check_topo_ROADMA_DEG1,
    )
    for recheck in rechecks:
        recheck()
def test_49_loop_create_eth_service(self):
    # Five times in a row: create the Ethernet service, check the
    # cross-connections on ROADMA01 and ROADMC01, then delete the service.
    # Each step is announced on stdout before it runs.
    steps = (
        ("eth service creation", self.test_11_create_eth_service1),
        ("check xc in ROADMA01", self.test_13_check_xc1_ROADMA),
        ("check xc in ROADMC01", self.test_14_check_xc1_ROADMC),
        ("eth service deletion\n", self.test_30_delete_eth_service1),
    )
    for iteration in range(1, 6):
        print("iteration number {}".format(iteration))
        for banner, step in steps:
            print(banner)
            step()
def test_50_loop_create_oc_service(self):
    # Delete "service1" first unless the lookup answers 404. Then, five times
    # in a row: create the OC service, check the cross-connections on
    # ROADMA01 and ROADMC01, and delete the service again. Each step is
    # announced on stdout before it runs.
    lookup = test_utils.get_service_list_request("services/service1")
    if lookup.status_code != 404:
        test_utils.service_delete_request("service1")
    steps = (
        ("oc service creation", self.test_36_create_oc_service1),
        ("check xc in ROADMA01", self.test_38_check_xc1_ROADMA),
        ("check xc in ROADMC01", self.test_39_check_xc1_ROADMC),
        ("oc service deletion\n", self.test_44_delete_oc_service1),
    )
    for iteration in range(1, 6):
        print("iteration number {}".format(iteration))
        for banner, step in steps:
            print(banner)
            step()
def test_51_disconnect_XPDRA(self):
    # Unmount device "XPDRA01"; the request must answer with the OK status.
    unmount_reply = test_utils.unmount_device("XPDRA01")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_52_disconnect_XPDRC(self):
    # Unmount device "XPDRC01"; the request must answer with the OK status.
    unmount_reply = test_utils.unmount_device("XPDRC01")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_53_disconnect_ROADMA(self):
    # Unmount device "ROADMA01"; the request must answer with the OK status.
    unmount_reply = test_utils.unmount_device("ROADMA01")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_54_disconnect_ROADMC(self):
    # Unmount device "ROADMC01"; the request must answer with the OK status.
    unmount_reply = test_utils.unmount_device("ROADMC01")
    self.assertEqual(unmount_reply.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
# Script entry point: when this file is executed directly, hand control to
# the unittest runner with verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)