2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 sys.path.append('transportpce_tests/common/')
# End-to-end test case: the methods visible in this file call test_utils helpers and
# assert on the HTTP replies they return.
23 class TransportPCEFulltesting(unittest.TestCase):
# Class-level request payload passed to test_utils.service_create_request by the
# service-creation tests. Those tests overwrite entries such as "service-name" in place,
# so a change made by one test is still present when the next test runs.
# NOTE(review): several structural lines of this literal (nested keys, closing braces)
# are absent from this listing - confirm against the upstream file.
24 cr_serv_sample_data = {"input": {
25 "sdnc-request-header": {
26 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
27 "rpc-action": "service-create",
28 "request-system-id": "appname",
30 "http://localhost:8585/NotificationServer/notify"
32 "service-name": "service1",
33 "common-id": "ASATT1234567",
34 "connection-type": "service",
36 "service-rate": "100",
38 "service-format": "Ethernet",
39 "clli": "SNJSCAMCJP8",
43 "ROUTER_SNJSCAMCJP8_000000.00_00",
44 "port-type": "router",
45 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
46 "port-rack": "000000.00",
51 "LGX Panel_SNJSCAMCJP8_000000.00_00",
52 "lgx-port-name": "LGX Back.3",
53 "lgx-port-rack": "000000.00",
54 "lgx-port-shelf": "00"
60 "ROUTER_SNJSCAMCJP8_000000.00_00",
61 "port-type": "router",
62 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
63 "port-rack": "000000.00",
68 "LGX Panel_SNJSCAMCJP8_000000.00_00",
69 "lgx-port-name": "LGX Back.4",
70 "lgx-port-rack": "000000.00",
71 "lgx-port-shelf": "00"
77 "service-rate": "100",
79 "service-format": "Ethernet",
80 "clli": "SNJSCAMCJT4",
84 "ROUTER_SNJSCAMCJT4_000000.00_00",
85 "port-type": "router",
86 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
87 "port-rack": "000000.00",
92 "LGX Panel_SNJSCAMCJT4_000000.00_00",
93 "lgx-port-name": "LGX Back.29",
94 "lgx-port-rack": "000000.00",
95 "lgx-port-shelf": "00"
101 "ROUTER_SNJSCAMCJT4_000000.00_00",
102 "port-type": "router",
103 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
104 "port-rack": "000000.00",
109 "LGX Panel_SNJSCAMCJT4_000000.00_00",
110 "lgx-port-name": "LGX Back.30",
111 "lgx-port-rack": "000000.00",
112 "lgx-port-shelf": "00"
117 "due-date": "2016-11-28T00:00:01Z",
118 "operator-contact": "pw1234"
# Version string paired with each simulator name in the start-up and mount calls.
123 NODE_VERSION = '1.2.1'
# NOTE(review): the enclosing method header for the statements below is not visible here.
# cls.processes is bound twice: the start_sims result replaces the start_tpce result, and
# tearDownClass iterates only the final value - confirm start_sims returns every process
# that has to be shut down.
127 cls.processes = test_utils.start_tpce()
128 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
129 ('roadma-full', cls.NODE_VERSION),
130 ('roadmc-full', cls.NODE_VERSION),
131 ('xpdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    """Pass every entry of cls.processes to test_utils.shutdown_process, then report it."""
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):
    """Print the short name of the test about to run (executed before each test method)."""
    # self.id() is a dotted path; only the last component is printed.
    short_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(short_name))
143 # connect netconf devices
def test_01_connect_xpdrA(self):
    """Call test_utils.mount_device for "XPDRA01" with the 'xpdra' pair; require 201 (created)."""
    sim = ('xpdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDRA01", sim)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    """Call test_utils.mount_device for "XPDRC01" with the 'xpdrc' pair; require 201 (created)."""
    sim = ('xpdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("XPDRC01", sim)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    """Call test_utils.mount_device for "ROADMA01" with the 'roadma-full' pair; require 201."""
    sim = ('roadma-full', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADMA01", sim)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    """Call test_utils.mount_device for "ROADMC01" with the 'roadmc-full' pair; require 201."""
    sim = ('roadmc-full', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADMC01", sim)
    self.assertEqual(mount_reply.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    """Issue the xpdr-to-roadm link request XPDRA01 (1, 1) -> ROADMA01 SRG1-PP1-TXRX.

    Requires an ok status and the success message in output/result.
    """
    link_args = ("XPDRA01", "1", "1", "ROADMA01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """Issue the roadm-to-xpdr link request for XPDRA01 (1, 1) and ROADMA01 SRG1-PP1-TXRX.

    Requires an ok status and the success message in output/result.
    """
    link_args = ("XPDRA01", "1", "1", "ROADMA01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    """Issue the xpdr-to-roadm link request XPDRC01 (1, 1) -> ROADMC01 SRG1-PP1-TXRX.

    Requires an ok status and the success message in output/result.
    """
    link_args = ("XPDRC01", "1", "1", "ROADMC01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    """Issue the roadm-to-xpdr link request for XPDRC01 (1, 1) and ROADMC01 SRG1-PP1-TXRX.

    Requires an ok status and the success message in output/result.
    """
    link_args = ("XPDRC01", "1", "1", "ROADMC01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
# Passes span attributes to test_utils.add_oms_attr_request for the
# ROADMA01-DEG1 to ROADMC01-DEG2 link and expects requests.codes.created.
# NOTE(review): the statement that opens the `data` payload (and some of its entries and
# closers) is not present in this listing - confirm against the upstream file.
196 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
197 # Config ROADMA-ROADMC oms-attributes
199 "auto-spanloss": "true",
200 "spanloss-base": 11.4,
201 "spanloss-current": 12,
202 "engineered-spanloss": 12.2,
203 "link-concatenation": [{
206 "SRLG-length": 100000,
208 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
209 self.assertEqual(response.status_code, requests.codes.created)
# Passes span attributes to test_utils.add_oms_attr_request for the
# ROADMC01-DEG2 to ROADMA01-DEG1 link and expects requests.codes.created.
# NOTE(review): the statement that opens the `data` payload (and some of its entries and
# closers) is not present in this listing - confirm against the upstream file.
211 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
212 # Config ROADMC-ROADMA oms-attributes
214 "auto-spanloss": "true",
215 "spanloss-base": 11.4,
216 "spanloss-current": 12,
217 "engineered-spanloss": 12.2,
218 "link-concatenation": [{
221 "SRLG-length": 100000,
223 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
224 self.assertEqual(response.status_code, requests.codes.created)
226 # test service-create for Eth service from xpdr to xpdr
# Sets the shared payload's service-name to "service1", submits it through
# test_utils.service_create_request, expects an ok status and the text
# 'PCE calculation in progress' in the reply, then sleeps for self.WAITING.
# The payload is mutated in place, so the new name persists for later tests.
# NOTE(review): the end of the assertIn call is missing from this listing, and WAITING
# is defined outside the lines shown here.
227 def test_11_create_eth_service1(self):
228 self.cr_serv_sample_data["input"]["service-name"] = "service1"
229 response = test_utils.service_create_request(self.cr_serv_sample_data)
230 self.assertEqual(response.status_code, requests.codes.ok)
231 res = response.json()
232 self.assertIn('PCE calculation in progress',
233 res['output']['configuration-response-common'][
235 time.sleep(self.WAITING)
# Fetches "services/service1", expects an ok status, and inspects the first returned
# entry's administrative-state, service-name, connection-type and lifecycle-state.
# NOTE(review): the openers of the four comparison calls (and the expected
# administrative-state value) are missing from this listing - confirm upstream.
237 def test_12_get_eth_service1(self):
238 response = test_utils.get_service_list_request("services/service1")
239 self.assertEqual(response.status_code, requests.codes.ok)
240 res = response.json()
242 res['services'][0]['administrative-state'],
245 res['services'][0]['service-name'], 'service1')
247 res['services'][0]['connection-type'], 'service')
249 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768 from ROADMA01, expects an ok
# status, then checks the first returned connection and its source/destination interfaces.
# NOTE(review): the opener of the first assertDictEqual argument is missing here. If it is
# `dict({`, the trailing `**res[...]` lets the actual entry override the expected values,
# so that assertion would only prove the keys exist, not that their values match - confirm.
252 def test_13_check_xc1_ROADMA(self):
253 response = test_utils.check_netconf_node_request(
254 "ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
255 self.assertEqual(response.status_code, requests.codes.ok)
256 res = response.json()
257 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
258 self.assertDictEqual(
260 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
261 'wavelength-number': 1,
262 'opticalControlMode': 'gainLoss',
263 'target-output-power': -3.0
264 }, **res['roadm-connections'][0]),
265 res['roadm-connections'][0]
267 self.assertDictEqual(
268 {'src-if': 'SRG1-PP1-TXRX-761:768'},
269 res['roadm-connections'][0]['source'])
270 self.assertDictEqual(
271 {'dst-if': 'DEG1-TTP-TXRX-761:768'},
272 res['roadm-connections'][0]['destination'])
# Reads roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768 from ROADMC01, expects an ok
# status, then checks the first returned connection and its source/destination interfaces.
# NOTE(review): the opener of the first assertDictEqual argument is missing here. If it is
# `dict({`, the trailing `**res[...]` lets the actual entry override the expected values,
# so that assertion would only prove the keys exist, not that their values match - confirm.
275 def test_14_check_xc1_ROADMC(self):
276 response = test_utils.check_netconf_node_request(
277 "ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
278 self.assertEqual(response.status_code, requests.codes.ok)
279 res = response.json()
280 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
281 self.assertDictEqual(
283 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
284 'wavelength-number': 1,
285 'opticalControlMode': 'gainLoss',
286 'target-output-power': 2.0
287 }, **res['roadm-connections'][0]),
288 res['roadm-connections'][0]
290 self.assertDictEqual(
291 {'src-if': 'SRG1-PP1-TXRX-761:768'},
292 res['roadm-connections'][0]['source'])
293 self.assertDictEqual(
294 {'dst-if': 'DEG2-TTP-TXRX-761:768'},
295 res['roadm-connections'][0]['destination'])
# Reads topology node XPDRA01-XPDR1 and checks termination points by tp-id: a wavelength
# on XPDR1-NETWORK1, no xpdr-client-attributes on the two client ports, and no
# xpdr-network-attributes on XPDR1-NETWORK2.
# NOTE(review): the loop header binding `ele` and part of the expected wavelength dict
# are missing from this listing - confirm upstream.
298 def test_15_check_topo_XPDRA(self):
299 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
300 self.assertEqual(response.status_code, requests.codes.ok)
301 res = response.json()
302 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
304 if ele['tp-id'] == 'XPDR1-NETWORK1':
305 self.assertEqual({u'frequency': 196.1,
307 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
308 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
309 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
310 if ele['tp-id'] == 'XPDR1-NETWORK2':
311 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
# Reads topology node ROADMA01-SRG1, base64-decodes its first freq-map into a list of byte
# values and expects index 95 to be 0 ("Lambda 1 should not be available"). The same check
# is applied to SRG1-PP1-TXRX; SRG1-PP2-TXRX must not carry 'avail-freq-maps'.
# NOTE(review): the loop header binding `ele` is missing from this listing - confirm upstream.
314 def test_16_check_topo_ROADMA_SRG1(self):
315 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
316 self.assertEqual(response.status_code, requests.codes.ok)
317 res = response.json()
318 freq_map = base64.b64decode(
319 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
320 freq_map_array = [int(x) for x in freq_map]
321 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
322 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
324 if ele['tp-id'] == 'SRG1-PP1-TXRX':
325 freq_map = base64.b64decode(
326 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
327 freq_map_array = [int(x) for x in freq_map]
328 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
329 if ele['tp-id'] == 'SRG1-PP2-TXRX':
330 self.assertNotIn('avail-freq-maps', dict.keys(ele))
# Reads topology node ROADMA01-DEG1, base64-decodes its first freq-map into a list of byte
# values and expects index 95 to be 0 ("Lambda 1 should not be available"). The same check
# is applied to the DEG2-CTP-TXRX and DEG2-TTP-TXRX termination points.
# NOTE(review): the loop header binding `ele` is missing from this listing, and the tp-ids
# say DEG2 although the node queried is DEG1 - confirm upstream.
333 def test_17_check_topo_ROADMA_DEG1(self):
334 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
335 self.assertEqual(response.status_code, requests.codes.ok)
336 res = response.json()
337 freq_map = base64.b64decode(
338 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
339 freq_map_array = [int(x) for x in freq_map]
340 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
341 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
343 if ele['tp-id'] == 'DEG2-CTP-TXRX':
344 freq_map = base64.b64decode(
345 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
346 freq_map_array = [int(x) for x in freq_map]
347 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
348 if ele['tp-id'] == 'DEG2-TTP-TXRX':
349 freq_map = base64.b64decode(
350 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
351 freq_map_array = [int(x) for x in freq_map]
352 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    """Issue the xpdr-to-roadm link request XPDRA01 (1, 2) -> ROADMA01 SRG1-PP2-TXRX.

    Requires an ok status and the success message in output/result.
    """
    link_args = ("XPDRA01", "1", "2", "ROADMA01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    """Issue the roadm-to-xpdr link request for XPDRA01 (1, 2) and ROADMA01 SRG1-PP2-TXRX.

    Requires an ok status and the success message in output/result.
    """
    link_args = ("XPDRA01", "1", "2", "ROADMA01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    """Issue the xpdr-to-roadm link request XPDRC01 (1, 2) -> ROADMC01 SRG1-PP2-TXRX.

    Requires an ok status and the success message in output/result.
    """
    link_args = ("XPDRC01", "1", "2", "ROADMC01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    """Issue the roadm-to-xpdr link request for XPDRC01 (1, 2) and ROADMC01 SRG1-PP2-TXRX.

    Requires an ok status and the success message in output/result.
    """
    link_args = ("XPDRC01", "1", "2", "ROADMC01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
# Sets the shared payload's service-name to "service2", submits it through
# test_utils.service_create_request, expects an ok status and the text
# 'PCE calculation in progress' in the reply, then sleeps for self.WAITING.
# NOTE(review): the end of the assertIn call is missing from this listing - confirm upstream.
391 def test_22_create_eth_service2(self):
392 self.cr_serv_sample_data["input"]["service-name"] = "service2"
393 response = test_utils.service_create_request(self.cr_serv_sample_data)
394 self.assertEqual(response.status_code, requests.codes.ok)
395 res = response.json()
396 self.assertIn('PCE calculation in progress',
397 res['output']['configuration-response-common'][
399 time.sleep(self.WAITING)
# Fetches "services/service2", expects an ok status, and inspects the first returned
# entry's administrative-state, service-name, connection-type and lifecycle-state.
# NOTE(review): the openers of the four comparison calls (and the expected
# administrative-state value) are missing from this listing - confirm upstream.
401 def test_23_get_eth_service2(self):
402 response = test_utils.get_service_list_request("services/service2")
403 self.assertEqual(response.status_code, requests.codes.ok)
404 res = response.json()
406 res['services'][0]['administrative-state'],
409 res['services'][0]['service-name'], 'service2')
411 res['services'][0]['connection-type'], 'service')
413 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760 from ROADMA01, expects an ok
# status, then checks the first returned connection and its source/destination interfaces.
# NOTE(review): the opener of the first assertDictEqual argument is missing here. If it is
# `dict({`, the trailing `**res[...]` lets the actual entry override the expected values,
# so that assertion would only prove the keys exist, not that their values match - confirm.
416 def test_24_check_xc2_ROADMA(self):
417 response = test_utils.check_netconf_node_request(
418 "ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760")
419 self.assertEqual(response.status_code, requests.codes.ok)
420 res = response.json()
421 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
422 self.assertDictEqual(
424 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760',
425 'wavelength-number': 2,
426 'opticalControlMode': 'power'
427 }, **res['roadm-connections'][0]),
428 res['roadm-connections'][0]
430 self.assertDictEqual(
431 {'src-if': 'DEG1-TTP-TXRX-753:760'},
432 res['roadm-connections'][0]['source'])
433 self.assertDictEqual(
434 {'dst-if': 'SRG1-PP2-TXRX-753:760'},
435 res['roadm-connections'][0]['destination'])
# Reads topology node XPDRA01-XPDR1 and checks termination points by tp-id: a wavelength
# on XPDR1-NETWORK1 and on XPDR1-NETWORK2, and no xpdr-client-attributes on the client ports.
# NOTE(review): the loop header binding `ele` and part of each expected wavelength dict
# are missing from this listing - confirm upstream.
437 def test_25_check_topo_XPDRA(self):
438 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
439 self.assertEqual(response.status_code, requests.codes.ok)
440 res = response.json()
441 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
443 if ele['tp-id'] == 'XPDR1-NETWORK1':
444 self.assertEqual({u'frequency': 196.1,
446 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
447 if ele['tp-id'] == 'XPDR1-NETWORK2':
448 self.assertEqual({u'frequency': 196.05,
450 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
451 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
452 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
# Reads topology node ROADMA01-SRG1 and checks decoded freq-map bytes: on the node, indexes
# 95 and 94 must both be 0; on SRG1-PP1-TXRX index 95 is 0 and 94 is 255; on SRG1-PP2-TXRX
# index 95 is 255 and 94 is 0; SRG1-PP3-TXRX must not carry pp-attributes.
# Per the assertion messages, 0 reads as "not available" and 255 as "available".
# NOTE(review): the loop header binding `ele` is missing from this listing - confirm upstream.
455 def test_26_check_topo_ROADMA_SRG1(self):
456 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
457 self.assertEqual(response.status_code, requests.codes.ok)
458 res = response.json()
459 freq_map = base64.b64decode(
460 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
461 freq_map_array = [int(x) for x in freq_map]
462 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
463 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
464 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
466 if ele['tp-id'] == 'SRG1-PP1-TXRX':
467 freq_map = base64.b64decode(
468 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
469 freq_map_array = [int(x) for x in freq_map]
470 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
471 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
472 if ele['tp-id'] == 'SRG1-PP2-TXRX':
473 freq_map = base64.b64decode(
474 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
475 freq_map_array = [int(x) for x in freq_map]
476 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
477 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
478 if ele['tp-id'] == 'SRG1-PP3-TXRX':
479 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
# Reads topology node ROADMA01-DEG1 and expects decoded freq-map indexes 95 and 94 to be 0
# on the node itself and on the DEG2-CTP-TXRX and DEG2-TTP-TXRX termination points.
# NOTE(review): the loop header binding `ele` is missing from this listing, and the tp-ids
# say DEG2 although the node queried is DEG1 - confirm upstream.
482 def test_27_check_topo_ROADMA_DEG1(self):
483 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
484 self.assertEqual(response.status_code, requests.codes.ok)
485 res = response.json()
486 freq_map = base64.b64decode(
487 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
488 freq_map_array = [int(x) for x in freq_map]
489 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
490 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
491 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
493 if ele['tp-id'] == 'DEG2-CTP-TXRX':
494 freq_map = base64.b64decode(
495 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
496 freq_map_array = [int(x) for x in freq_map]
497 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
498 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
499 if ele['tp-id'] == 'DEG2-TTP-TXRX':
500 freq_map = base64.b64decode(
501 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
502 freq_map_array = [int(x) for x in freq_map]
503 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
504 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
507 # creation service test on a non-available resource
# Sets the shared payload's service-name to "service3" and submits it; the immediate reply
# is still expected to be ok, to contain 'PCE calculation in progress' and to contain '200'.
# The test then sleeps for self.WAITING.
# NOTE(review): the ends of both assertIn calls are missing from this listing - confirm upstream.
508 def test_28_create_eth_service3(self):
509 self.cr_serv_sample_data["input"]["service-name"] = "service3"
510 response = test_utils.service_create_request(self.cr_serv_sample_data)
511 self.assertEqual(response.status_code, requests.codes.ok)
512 res = response.json()
513 self.assertIn('PCE calculation in progress',
514 res['output']['configuration-response-common'][
516 self.assertIn('200', res['output']['configuration-response-common'][
518 time.sleep(self.WAITING)
520 # TODO: add a test that checks the openroadm-service-list still
521 # contains only 2 elements
# Requests deletion of "service3": the HTTP status must be ok, while the reply body must
# contain "Service 'service3' does not exist in datastore" and '500'.
# NOTE(review): the ends of both assertIn calls are missing from this listing - confirm upstream.
523 def test_29_delete_eth_service3(self):
524 response = test_utils.service_delete_request("service3")
525 self.assertEqual(response.status_code, requests.codes.ok)
526 res = response.json()
527 self.assertIn('Service \'service3\' does not exist in datastore',
528 res['output']['configuration-response-common'][
530 self.assertIn('500', res['output']['configuration-response-common'][
# Requests deletion of "service1" and expects an ok status plus the text
# 'Renderer service delete in progress' in the reply.
# NOTE(review): the end of the assertIn call is missing from this listing - confirm upstream.
534 def test_30_delete_eth_service1(self):
535 response = test_utils.service_delete_request("service1")
536 self.assertEqual(response.status_code, requests.codes.ok)
537 res = response.json()
538 self.assertIn('Renderer service delete in progress',
539 res['output']['configuration-response-common'][
# Requests deletion of "service2" and expects an ok status plus the text
# 'Renderer service delete in progress' in the reply.
# NOTE(review): the end of the assertIn call is missing from this listing - confirm upstream.
543 def test_31_delete_eth_service2(self):
544 response = test_utils.service_delete_request("service2")
545 self.assertEqual(response.status_code, requests.codes.ok)
546 res = response.json()
547 self.assertIn('Renderer service delete in progress',
548 res['output']['configuration-response-common'][
def test_32_check_no_xc_ROADMA(self):
    """Check that the ROADMA01 device data no longer holds any 'roadm-connections' entry.

    Fetches the device through test_utils.check_netconf_node_request with an empty
    sub-path, requires an ok status, and asserts that 'roadm-connections' is not a
    key of the returned 'org-openroadm-device' container.
    """
    response = test_utils.check_netconf_node_request("ROADMA01", "")
    # Check the status before decoding: a failed request then reports the status
    # mismatch instead of a JSON decoding error on an unexpected body.
    self.assertEqual(response.status_code, requests.codes.ok)
    device = response.json()['org-openroadm-device']
    # Membership on the mapping itself tests its keys; no dict.keys() detour needed.
    self.assertNotIn('roadm-connections', device)
# Reads topology node XPDRA01-XPDR1 and branches on each termination point's tp-type:
# one branch concerns XPDR1-CLIENT1 / XPDR1-CLIENT3 and xpdr-client-attributes, the other
# requires 'tail-equipment-id' and forbids 'wavelength' inside xpdr-network-attributes.
# NOTE(review): the loop header binding `ele`, the tp-type values being compared and the
# assertion opener of the first branch are missing from this listing - confirm upstream.
560 def test_33_check_topo_XPDRA(self):
561 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
562 self.assertEqual(response.status_code, requests.codes.ok)
563 res = response.json()
564 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
566 if ((ele[u'org-openroadm-common-network:tp-type'] ==
568 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
569 'tp-id'] == 'XPDR1-CLIENT3')):
571 'org-openroadm-network-topology:xpdr-client-attributes',
573 elif (ele[u'org-openroadm-common-network:tp-type'] ==
575 self.assertIn(u'tail-equipment-id', dict.keys(
576 ele[u'org-openroadm-network-topology:'
577 u'xpdr-network-attributes']))
578 self.assertNotIn('wavelength', dict.keys(
579 ele[u'org-openroadm-network-topology:'
580 u'xpdr-network-attributes']))
# Reads topology node ROADMA01-SRG1 and expects decoded freq-map indexes 95 and 94 to be
# 255 ("should be available") on the node, on SRG1-PP1-TXRX / SRG1-PP2-TXRX and on
# SRG1-CP-TXRX; a final assertion forbids pp-attributes on the element it applies to.
# NOTE(review): the loop header binding `ele` and the branch header guarding the final
# assertNotIn are missing from this listing - confirm upstream.
583 def test_34_check_topo_ROADMA_SRG1(self):
584 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
585 self.assertEqual(response.status_code, requests.codes.ok)
586 res = response.json()
587 freq_map = base64.b64decode(
588 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
589 freq_map_array = [int(x) for x in freq_map]
590 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
591 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
592 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
594 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
595 freq_map = base64.b64decode(
596 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
597 freq_map_array = [int(x) for x in freq_map]
598 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
599 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
600 elif ele['tp-id'] == 'SRG1-CP-TXRX':
601 freq_map = base64.b64decode(
602 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
603 freq_map_array = [int(x) for x in freq_map]
604 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
605 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
607 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
# Reads topology node ROADMA01-DEG1 and expects decoded freq-map indexes 95 and 94 to be
# 255 ("should be available") on the node and on DEG2-CTP-TXRX and DEG2-TTP-TXRX.
# NOTE(review): the loop header binding `ele` is missing from this listing, and the tp-ids
# say DEG2 although the node queried is DEG1 - confirm upstream.
610 def test_35_check_topo_ROADMA_DEG1(self):
611 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
612 self.assertEqual(response.status_code, requests.codes.ok)
613 res = response.json()
614 freq_map = base64.b64decode(
615 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
616 freq_map_array = [int(x) for x in freq_map]
617 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
618 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
619 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
621 if ele['tp-id'] == 'DEG2-CTP-TXRX':
622 freq_map = base64.b64decode(
623 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
624 freq_map_array = [int(x) for x in freq_map]
625 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
626 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
627 if ele['tp-id'] == 'DEG2-TTP-TXRX':
628 freq_map = base64.b64decode(
629 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
630 freq_map_array = [int(x) for x in freq_map]
631 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
632 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
635 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
# Rewrites the shared payload for an OC "roadm-line" service named "service1" between
# ROADMA01 and ROADMC01, submits it, expects an ok status and 'PCE calculation in progress',
# then sleeps for self.WAITING. The payload edits are made in place and therefore remain
# in effect for every test that runs afterwards.
# NOTE(review): the end of the assertIn call is missing from this listing - confirm upstream.
636 def test_36_create_oc_service1(self):
637 self.cr_serv_sample_data["input"]["service-name"] = "service1"
638 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
639 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
640 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
641 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
642 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
643 response = test_utils.service_create_request(self.cr_serv_sample_data)
644 self.assertEqual(response.status_code, requests.codes.ok)
645 res = response.json()
646 self.assertIn('PCE calculation in progress',
647 res['output']['configuration-response-common'][
649 time.sleep(self.WAITING)
# Fetches "services/service1", expects an ok status, and inspects the first returned
# entry's administrative-state, service-name, connection-type ('roadm-line') and
# lifecycle-state.
# NOTE(review): the openers of the four comparison calls (and the expected
# administrative-state value) are missing from this listing - confirm upstream.
651 def test_37_get_oc_service1(self):
652 response = test_utils.get_service_list_request("services/service1")
653 self.assertEqual(response.status_code, requests.codes.ok)
654 res = response.json()
656 res['services'][0]['administrative-state'],
659 res['services'][0]['service-name'], 'service1')
661 res['services'][0]['connection-type'], 'roadm-line')
663 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768 from ROADMA01, expects an ok
# status, then checks the first returned connection and its source/destination interfaces.
# NOTE(review): the opener of the first assertDictEqual argument is missing here. If it is
# `dict({`, the trailing `**res[...]` lets the actual entry override the expected values,
# so that assertion would only prove the keys exist, not that their values match - confirm.
666 def test_38_check_xc1_ROADMA(self):
667 response = test_utils.check_netconf_node_request(
668 "ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
669 self.assertEqual(response.status_code, requests.codes.ok)
670 res = response.json()
671 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
672 self.assertDictEqual(
674 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
675 'wavelength-number': 1,
676 'opticalControlMode': 'gainLoss',
677 'target-output-power': -3.0
678 }, **res['roadm-connections'][0]),
679 res['roadm-connections'][0]
681 self.assertDictEqual(
682 {'src-if': 'SRG1-PP1-TXRX-761:768'},
683 res['roadm-connections'][0]['source'])
684 self.assertDictEqual(
685 {'dst-if': 'DEG1-TTP-TXRX-761:768'},
686 res['roadm-connections'][0]['destination'])
# Reads roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768 from ROADMC01, expects an ok
# status, then checks the first returned connection and its source/destination interfaces.
# NOTE(review): the opener of the first assertDictEqual argument is missing here. If it is
# `dict({`, the trailing `**res[...]` lets the actual entry override the expected values,
# so that assertion would only prove the keys exist, not that their values match - confirm.
689 def test_39_check_xc1_ROADMC(self):
690 response = test_utils.check_netconf_node_request(
691 "ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
692 self.assertEqual(response.status_code, requests.codes.ok)
693 res = response.json()
694 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
695 self.assertDictEqual(
697 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
698 'wavelength-number': 1,
699 'opticalControlMode': 'gainLoss',
700 'target-output-power': 2.0
701 }, **res['roadm-connections'][0]),
702 res['roadm-connections'][0]
704 self.assertDictEqual(
705 {'src-if': 'SRG1-PP1-TXRX-761:768'},
706 res['roadm-connections'][0]['source'])
707 self.assertDictEqual(
708 {'dst-if': 'DEG2-TTP-TXRX-761:768'},
709 res['roadm-connections'][0]['destination'])
# Rewrites the shared payload for an OC "roadm-line" service named "service2" between
# ROADMA01 and ROADMC01, submits it, expects an ok status and 'PCE calculation in progress',
# then sleeps for self.WAITING. The payload edits are made in place.
# NOTE(review): the end of the assertIn call is missing from this listing - confirm upstream.
712 def test_40_create_oc_service2(self):
713 self.cr_serv_sample_data["input"]["service-name"] = "service2"
714 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
715 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
716 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
717 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
718 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
719 response = test_utils.service_create_request(self.cr_serv_sample_data)
720 self.assertEqual(response.status_code, requests.codes.ok)
721 res = response.json()
722 self.assertIn('PCE calculation in progress',
723 res['output']['configuration-response-common'][
725 time.sleep(self.WAITING)
# Fetches "services/service2", expects an ok status, and inspects the first returned
# entry's administrative-state, service-name, connection-type ('roadm-line') and
# lifecycle-state.
# NOTE(review): the openers of the four comparison calls (and the expected
# administrative-state value) are missing from this listing - confirm upstream.
727 def test_41_get_oc_service2(self):
728 response = test_utils.get_service_list_request("services/service2")
729 self.assertEqual(response.status_code, requests.codes.ok)
730 res = response.json()
732 res['services'][0]['administrative-state'],
735 res['services'][0]['service-name'], 'service2')
737 res['services'][0]['connection-type'], 'roadm-line')
739 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760 from ROADMA01, expects an ok
# status, then checks the first returned connection and its source/destination interfaces.
# NOTE(review): the opener of the first assertDictEqual argument is missing here. If it is
# `dict({`, the trailing `**res[...]` lets the actual entry override the expected values,
# so that assertion would only prove the keys exist, not that their values match - confirm.
742 def test_42_check_xc2_ROADMA(self):
743 response = test_utils.check_netconf_node_request(
744 "ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760")
745 self.assertEqual(response.status_code, requests.codes.ok)
746 res = response.json()
747 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
748 self.assertDictEqual(
750 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760',
751 'wavelength-number': 2,
752 'opticalControlMode': 'gainLoss',
753 'target-output-power': -3.0
754 }, **res['roadm-connections'][0]),
755 res['roadm-connections'][0]
757 self.assertDictEqual(
758 {'src-if': 'SRG1-PP2-TXRX-753:760'},
759 res['roadm-connections'][0]['source'])
760 self.assertDictEqual(
761 {'dst-if': 'DEG1-TTP-TXRX-753:760'},
762 res['roadm-connections'][0]['destination'])
765 def test_43_check_topo_ROADMA(self):
766 self.test_26_check_topo_ROADMA_SRG1()
767 self.test_27_check_topo_ROADMA_DEG1()
# Requests deletion of "service1" and expects an ok status plus the text
# 'Renderer service delete in progress' in the reply.
# NOTE(review): the end of the assertIn call is missing from this listing - confirm upstream.
770 def test_44_delete_oc_service1(self):
771 response = test_utils.service_delete_request("service1")
772 self.assertEqual(response.status_code, requests.codes.ok)
773 res = response.json()
774 self.assertIn('Renderer service delete in progress',
775 res['output']['configuration-response-common'][
# Requests deletion of "service2" and expects an ok status plus the text
# 'Renderer service delete in progress' in the reply.
# NOTE(review): the end of the assertIn call is missing from this listing - confirm upstream.
779 def test_45_delete_oc_service2(self):
780 response = test_utils.service_delete_request("service2")
781 self.assertEqual(response.status_code, requests.codes.ok)
782 res = response.json()
783 self.assertIn('Renderer service delete in progress',
784 res['output']['configuration-response-common'][
# Requests the service list with an empty sub-path and expects requests.codes.conflict;
# the reply's errors/error content is compared with an application / data-missing error
# whose message says the relevant data model content does not exist.
# NOTE(review): the assertion opener and parts of the expected error dict are missing
# from this listing - confirm upstream.
788 def test_46_get_no_oc_services(self):
790 response = test_utils.get_service_list_request("")
791 self.assertEqual(response.status_code, requests.codes.conflict)
792 res = response.json()
795 "error-type": "application",
796 "error-tag": "data-missing",
798 "Request could not be completed because the relevant data "
799 "model content does not exist"
801 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    """Check that the ROADMA01 device data holds no 'roadm-connections' entry.

    Fetches the device through test_utils.check_netconf_node_request with an empty
    sub-path, requires an ok status, and asserts that 'roadm-connections' is not
    contained in the returned 'org-openroadm-device' container.
    """
    response = test_utils.check_netconf_node_request("ROADMA01", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    device = response.json()['org-openroadm-device']
    # The original spelled the key as ['roadm-connections'][0], which is simply
    # the string itself; use the literal directly so the intent is readable.
    self.assertNotIn('roadm-connections', device)
811 def test_48_check_topo_ROADMA(self):
812 self.test_34_check_topo_ROADMA_SRG1()
813 self.test_35_check_topo_ROADMA_DEG1()
815 def test_49_loop_create_eth_service(self):
816 for i in range(1, 6):
817 print("iteration number {}".format(i))
818 print("eth service creation")
819 self.test_11_create_eth_service1()
820 print("check xc in ROADMA01")
821 self.test_13_check_xc1_ROADMA()
822 print("check xc in ROADMC01")
823 self.test_14_check_xc1_ROADMC()
824 print("eth service deletion\n")
825 self.test_30_delete_eth_service1()
def test_50_loop_create_oc_service(self):
    """Run five create / check / delete cycles of the OC service "service1".

    Unless the lookup of "services/service1" answers 404, a delete request for
    "service1" is sent before the loop starts. Each cycle then reuses test_36,
    test_38, test_39 and test_44 in that order, so a failing assertion in any
    of them fails this test.
    """
    lookup = test_utils.get_service_list_request("services/service1")
    # Named status constant instead of a bare 404, matching the rest of the file.
    if lookup.status_code != requests.codes.not_found:
        # Best-effort cleanup: the delete reply was never inspected, so it is
        # no longer bound to a name.
        test_utils.service_delete_request("service1")

    for i in range(1, 6):
        print("iteration number {}".format(i))
        print("oc service creation")
        self.test_36_create_oc_service1()
        print("check xc in ROADMA01")
        self.test_38_check_xc1_ROADMA()
        print("check xc in ROADMC01")
        self.test_39_check_xc1_ROADMC()
        print("oc service deletion\n")
        self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    """Call test_utils.unmount_device for "XPDRA01" and require a 200 (ok) status."""
    unmount_reply = test_utils.unmount_device("XPDRA01")
    status = unmount_reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_52_disconnect_XPDRC(self):
    """Call test_utils.unmount_device for "XPDRC01" and require a 200 (ok) status."""
    unmount_reply = test_utils.unmount_device("XPDRC01")
    status = unmount_reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_53_disconnect_ROADMA(self):
    """Call test_utils.unmount_device for "ROADMA01" and require a 200 (ok) status."""
    unmount_reply = test_utils.unmount_device("ROADMA01")
    status = unmount_reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_54_disconnect_ROADMC(self):
    """Call test_utils.unmount_device for "ROADMC01" and require a 200 (ok) status."""
    unmount_reply = test_utils.unmount_device("ROADMC01")
    status = unmount_reply.status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
# When the file is executed as a script, run the tests through unittest with verbosity 2.
861 if __name__ == "__main__":
862 unittest.main(verbosity=2)