2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 from common import test_utils
21 class TransportPCEFulltesting(unittest.TestCase):
22 cr_serv_sample_data = {"input": {
23 "sdnc-request-header": {
24 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
25 "rpc-action": "service-create",
26 "request-system-id": "appname",
28 "http://localhost:8585/NotificationServer/notify"
30 "service-name": "service1",
31 "common-id": "ASATT1234567",
32 "connection-type": "service",
34 "service-rate": "100",
36 "service-format": "Ethernet",
37 "clli": "SNJSCAMCJP8",
41 "ROUTER_SNJSCAMCJP8_000000.00_00",
42 "port-type": "router",
43 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
44 "port-rack": "000000.00",
49 "LGX Panel_SNJSCAMCJP8_000000.00_00",
50 "lgx-port-name": "LGX Back.3",
51 "lgx-port-rack": "000000.00",
52 "lgx-port-shelf": "00"
58 "ROUTER_SNJSCAMCJP8_000000.00_00",
59 "port-type": "router",
60 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
61 "port-rack": "000000.00",
66 "LGX Panel_SNJSCAMCJP8_000000.00_00",
67 "lgx-port-name": "LGX Back.4",
68 "lgx-port-rack": "000000.00",
69 "lgx-port-shelf": "00"
75 "service-rate": "100",
77 "service-format": "Ethernet",
78 "clli": "SNJSCAMCJT4",
82 "ROUTER_SNJSCAMCJT4_000000.00_00",
83 "port-type": "router",
84 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
85 "port-rack": "000000.00",
90 "LGX Panel_SNJSCAMCJT4_000000.00_00",
91 "lgx-port-name": "LGX Back.29",
92 "lgx-port-rack": "000000.00",
93 "lgx-port-shelf": "00"
99 "ROUTER_SNJSCAMCJT4_000000.00_00",
100 "port-type": "router",
101 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
102 "port-rack": "000000.00",
107 "LGX Panel_SNJSCAMCJT4_000000.00_00",
108 "lgx-port-name": "LGX Back.30",
109 "lgx-port-rack": "000000.00",
110 "lgx-port-shelf": "00"
115 "due-date": "2016-11-28T00:00:01Z",
116 "operator-contact": "pw1234"
124 cls.processes = test_utils.start_tpce()
125 cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
128 def tearDownClass(cls):
129 # pylint: disable=not-an-iterable
130 for process in cls.processes:
131 test_utils.shutdown_process(process)
132 print("all processes killed")
134 def setUp(self): # instruction executed before each test method
135 print("execution of {}".format(self.id().split(".")[-1]))
137 # connect netconf devices
138 def test_01_connect_xpdrA(self):
139 response = test_utils.mount_device("XPDRA01", 'xpdra')
140 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
142 def test_02_connect_xpdrC(self):
143 response = test_utils.mount_device("XPDRC01", 'xpdrc')
144 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
146 def test_03_connect_rdmA(self):
147 response = test_utils.mount_device("ROADMA01", 'roadma-full')
148 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
150 def test_04_connect_rdmC(self):
151 response = test_utils.mount_device("ROADMC01", 'roadmc-full')
152 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
154 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
155 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
156 "ROADMA01", "1", "SRG1-PP1-TXRX")
157 self.assertEqual(response.status_code, requests.codes.ok)
158 res = response.json()
159 self.assertIn('Xponder Roadm Link created successfully',
160 res["output"]["result"])
163 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
164 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
165 "ROADMA01", "1", "SRG1-PP1-TXRX")
166 self.assertEqual(response.status_code, requests.codes.ok)
167 res = response.json()
168 self.assertIn('Roadm Xponder links created successfully',
169 res["output"]["result"])
172 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
173 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "1",
174 "ROADMC01", "1", "SRG1-PP1-TXRX")
175 self.assertEqual(response.status_code, requests.codes.ok)
176 res = response.json()
177 self.assertIn('Xponder Roadm Link created successfully',
178 res["output"]["result"])
181 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
182 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "1",
183 "ROADMC01", "1", "SRG1-PP1-TXRX")
184 self.assertEqual(response.status_code, requests.codes.ok)
185 res = response.json()
186 self.assertIn('Roadm Xponder links created successfully',
187 res["output"]["result"])
190 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
191 # Config ROADMA-ROADMC oms-attributes
193 "auto-spanloss": "true",
194 "spanloss-base": 11.4,
195 "spanloss-current": 12,
196 "engineered-spanloss": 12.2,
197 "link-concatenation": [{
200 "SRLG-length": 100000,
202 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
203 self.assertEqual(response.status_code, requests.codes.created)
205 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
206 # Config ROADMC-ROADMA oms-attributes
208 "auto-spanloss": "true",
209 "spanloss-base": 11.4,
210 "spanloss-current": 12,
211 "engineered-spanloss": 12.2,
212 "link-concatenation": [{
215 "SRLG-length": 100000,
217 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
218 self.assertEqual(response.status_code, requests.codes.created)
220 # test service-create for Eth service from xpdr to xpdr
221 def test_11_create_eth_service1(self):
222 self.cr_serv_sample_data["input"]["service-name"] = "service1"
223 response = test_utils.service_create_request(self.cr_serv_sample_data)
224 self.assertEqual(response.status_code, requests.codes.ok)
225 res = response.json()
226 self.assertIn('PCE calculation in progress',
227 res['output']['configuration-response-common'][
229 time.sleep(self.WAITING)
231 def test_12_get_eth_service1(self):
232 response = test_utils.get_service_list_request("services/service1")
233 self.assertEqual(response.status_code, requests.codes.ok)
234 res = response.json()
236 res['services'][0]['administrative-state'],
239 res['services'][0]['service-name'], 'service1')
241 res['services'][0]['connection-type'], 'service')
243 res['services'][0]['lifecycle-state'], 'planned')
246 def test_13_check_xc1_ROADMA(self):
247 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
248 self.assertEqual(response.status_code, requests.codes.ok)
249 res = response.json()
250 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
251 self.assertDictEqual(
253 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
254 'wavelength-number': 1,
255 'opticalControlMode': 'gainLoss',
256 'target-output-power': -3.0
257 }, **res['roadm-connections'][0]),
258 res['roadm-connections'][0]
260 self.assertDictEqual(
261 {'src-if': 'SRG1-PP1-TXRX-1'},
262 res['roadm-connections'][0]['source'])
263 self.assertDictEqual(
264 {'dst-if': 'DEG1-TTP-TXRX-1'},
265 res['roadm-connections'][0]['destination'])
268 def test_14_check_xc1_ROADMC(self):
269 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
270 self.assertEqual(response.status_code, requests.codes.ok)
271 res = response.json()
272 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
273 self.assertDictEqual(
275 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
276 'wavelength-number': 1,
277 'opticalControlMode': 'gainLoss',
278 'target-output-power': 2.0
279 }, **res['roadm-connections'][0]),
280 res['roadm-connections'][0]
282 self.assertDictEqual(
283 {'src-if': 'SRG1-PP1-TXRX-1'},
284 res['roadm-connections'][0]['source'])
285 self.assertDictEqual(
286 {'dst-if': 'DEG2-TTP-TXRX-1'},
287 res['roadm-connections'][0]['destination'])
290 def test_15_check_topo_XPDRA(self):
291 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
292 self.assertEqual(response.status_code, requests.codes.ok)
293 res = response.json()
294 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
296 if ele['tp-id'] == 'XPDR1-NETWORK1':
297 self.assertEqual({u'frequency': 196.1,
299 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
300 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
301 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
302 if ele['tp-id'] == 'XPDR1-NETWORK2':
303 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
306 def test_16_check_topo_ROADMA_SRG1(self):
307 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
308 self.assertEqual(response.status_code, requests.codes.ok)
309 res = response.json()
310 freq_map = base64.b64decode(
311 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
312 freq_map_array = [int(x) for x in freq_map]
313 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
314 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
316 if ele['tp-id'] == 'SRG1-PP1-TXRX':
317 freq_map = base64.b64decode(
318 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
319 freq_map_array = [int(x) for x in freq_map]
320 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
321 if ele['tp-id'] == 'SRG1-PP2-TXRX':
322 self.assertNotIn('avail-freq-maps', dict.keys(ele))
325 def test_17_check_topo_ROADMA_DEG1(self):
326 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
327 self.assertEqual(response.status_code, requests.codes.ok)
328 res = response.json()
329 freq_map = base64.b64decode(
330 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
331 freq_map_array = [int(x) for x in freq_map]
332 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
333 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
335 if ele['tp-id'] == 'DEG2-CTP-TXRX':
336 freq_map = base64.b64decode(
337 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
338 freq_map_array = [int(x) for x in freq_map]
339 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
340 if ele['tp-id'] == 'DEG2-TTP-TXRX':
341 freq_map = base64.b64decode(
342 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
343 freq_map_array = [int(x) for x in freq_map]
344 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
347 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
348 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "2",
349 "ROADMA01", "1", "SRG1-PP2-TXRX")
350 self.assertEqual(response.status_code, requests.codes.ok)
351 res = response.json()
352 self.assertIn('Xponder Roadm Link created successfully',
353 res["output"]["result"])
356 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
357 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "2",
358 "ROADMA01", "1", "SRG1-PP2-TXRX")
359 self.assertEqual(response.status_code, requests.codes.ok)
360 res = response.json()
361 self.assertIn('Roadm Xponder links created successfully',
362 res["output"]["result"])
365 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
366 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "2",
367 "ROADMC01", "1", "SRG1-PP2-TXRX")
368 self.assertEqual(response.status_code, requests.codes.ok)
369 res = response.json()
370 self.assertIn('Xponder Roadm Link created successfully',
371 res["output"]["result"])
374 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
375 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "2",
376 "ROADMC01", "1", "SRG1-PP2-TXRX")
377 self.assertEqual(response.status_code, requests.codes.ok)
378 res = response.json()
379 self.assertIn('Roadm Xponder links created successfully',
380 res["output"]["result"])
383 def test_22_create_eth_service2(self):
384 self.cr_serv_sample_data["input"]["service-name"] = "service2"
385 response = test_utils.service_create_request(self.cr_serv_sample_data)
386 self.assertEqual(response.status_code, requests.codes.ok)
387 res = response.json()
388 self.assertIn('PCE calculation in progress',
389 res['output']['configuration-response-common'][
391 time.sleep(self.WAITING)
393 def test_23_get_eth_service2(self):
394 response = test_utils.get_service_list_request("services/service2")
395 self.assertEqual(response.status_code, requests.codes.ok)
396 res = response.json()
398 res['services'][0]['administrative-state'],
401 res['services'][0]['service-name'], 'service2')
403 res['services'][0]['connection-type'], 'service')
405 res['services'][0]['lifecycle-state'], 'planned')
408 def test_24_check_xc2_ROADMA(self):
409 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2")
410 self.assertEqual(response.status_code, requests.codes.ok)
411 res = response.json()
412 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
413 self.assertDictEqual(
415 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
416 'wavelength-number': 2,
417 'opticalControlMode': 'power'
418 }, **res['roadm-connections'][0]),
419 res['roadm-connections'][0]
421 self.assertDictEqual(
422 {'src-if': 'DEG1-TTP-TXRX-2'},
423 res['roadm-connections'][0]['source'])
424 self.assertDictEqual(
425 {'dst-if': 'SRG1-PP2-TXRX-2'},
426 res['roadm-connections'][0]['destination'])
428 def test_25_check_topo_XPDRA(self):
429 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
430 self.assertEqual(response.status_code, requests.codes.ok)
431 res = response.json()
432 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
434 if ele['tp-id'] == 'XPDR1-NETWORK1':
435 self.assertEqual({u'frequency': 196.1,
437 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
438 if ele['tp-id'] == 'XPDR1-NETWORK2':
439 self.assertEqual({u'frequency': 196.05,
441 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
442 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
443 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
446 def test_26_check_topo_ROADMA_SRG1(self):
447 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
448 self.assertEqual(response.status_code, requests.codes.ok)
449 res = response.json()
450 freq_map = base64.b64decode(
451 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
452 freq_map_array = [int(x) for x in freq_map]
453 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
454 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
455 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
457 if ele['tp-id'] == 'SRG1-PP1-TXRX':
458 freq_map = base64.b64decode(
459 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
460 freq_map_array = [int(x) for x in freq_map]
461 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
462 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
463 if ele['tp-id'] == 'SRG1-PP2-TXRX':
464 freq_map = base64.b64decode(
465 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
466 freq_map_array = [int(x) for x in freq_map]
467 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
468 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
469 if ele['tp-id'] == 'SRG1-PP3-TXRX':
470 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
473 def test_27_check_topo_ROADMA_DEG1(self):
474 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
475 self.assertEqual(response.status_code, requests.codes.ok)
476 res = response.json()
477 freq_map = base64.b64decode(
478 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
479 freq_map_array = [int(x) for x in freq_map]
480 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
481 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
482 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
484 if ele['tp-id'] == 'DEG2-CTP-TXRX':
485 freq_map = base64.b64decode(
486 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
487 freq_map_array = [int(x) for x in freq_map]
488 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
489 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
490 if ele['tp-id'] == 'DEG2-TTP-TXRX':
491 freq_map = base64.b64decode(
492 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
493 freq_map_array = [int(x) for x in freq_map]
494 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
495 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
498 # creation service test on a non-available resource
499 def test_28_create_eth_service3(self):
500 self.cr_serv_sample_data["input"]["service-name"] = "service3"
501 response = test_utils.service_create_request(self.cr_serv_sample_data)
502 self.assertEqual(response.status_code, requests.codes.ok)
503 res = response.json()
504 self.assertIn('PCE calculation in progress',
505 res['output']['configuration-response-common'][
507 self.assertIn('200', res['output']['configuration-response-common'][
509 time.sleep(self.WAITING)
511 # add a test that check the openroadm-service-list still only
512 # contains 2 elements
514 def test_29_delete_eth_service3(self):
515 response = test_utils.service_delete_request("service3")
516 self.assertEqual(response.status_code, requests.codes.ok)
517 res = response.json()
518 self.assertIn('Service \'service3\' does not exist in datastore',
519 res['output']['configuration-response-common'][
521 self.assertIn('500', res['output']['configuration-response-common'][
525 def test_30_delete_eth_service1(self):
526 response = test_utils.service_delete_request("service1")
527 self.assertEqual(response.status_code, requests.codes.ok)
528 res = response.json()
529 self.assertIn('Renderer service delete in progress',
530 res['output']['configuration-response-common'][
534 def test_31_delete_eth_service2(self):
535 response = test_utils.service_delete_request("service2")
536 self.assertEqual(response.status_code, requests.codes.ok)
537 res = response.json()
538 self.assertIn('Renderer service delete in progress',
539 res['output']['configuration-response-common'][
543 def test_32_check_no_xc_ROADMA(self):
544 response = test_utils.check_netconf_node_request("ROADMA01", "")
545 res = response.json()
546 self.assertEqual(response.status_code, requests.codes.ok)
547 self.assertNotIn('roadm-connections',
548 dict.keys(res['org-openroadm-device']))
551 def test_33_check_topo_XPDRA(self):
552 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
553 self.assertEqual(response.status_code, requests.codes.ok)
554 res = response.json()
555 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
557 if ((ele[u'org-openroadm-common-network:tp-type'] ==
559 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
560 'tp-id'] == 'XPDR1-CLIENT3')):
562 'org-openroadm-network-topology:xpdr-client-attributes',
564 elif (ele[u'org-openroadm-common-network:tp-type'] ==
566 self.assertIn(u'tail-equipment-id', dict.keys(
567 ele[u'org-openroadm-network-topology:'
568 u'xpdr-network-attributes']))
569 self.assertNotIn('wavelength', dict.keys(
570 ele[u'org-openroadm-network-topology:'
571 u'xpdr-network-attributes']))
574 def test_34_check_topo_ROADMA_SRG1(self):
575 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
576 self.assertEqual(response.status_code, requests.codes.ok)
577 res = response.json()
578 freq_map = base64.b64decode(
579 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
580 freq_map_array = [int(x) for x in freq_map]
581 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
582 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
583 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
585 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
586 freq_map = base64.b64decode(
587 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
588 freq_map_array = [int(x) for x in freq_map]
589 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
590 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
591 elif ele['tp-id'] == 'SRG1-CP-TXRX':
592 freq_map = base64.b64decode(
593 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
594 freq_map_array = [int(x) for x in freq_map]
595 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
596 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
598 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
601 def test_35_check_topo_ROADMA_DEG1(self):
602 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
603 self.assertEqual(response.status_code, requests.codes.ok)
604 res = response.json()
605 freq_map = base64.b64decode(
606 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
607 freq_map_array = [int(x) for x in freq_map]
608 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
609 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
610 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
612 if ele['tp-id'] == 'DEG2-CTP-TXRX':
613 freq_map = base64.b64decode(
614 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
615 freq_map_array = [int(x) for x in freq_map]
616 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
617 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
618 if ele['tp-id'] == 'DEG2-TTP-TXRX':
619 freq_map = base64.b64decode(
620 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
621 freq_map_array = [int(x) for x in freq_map]
622 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
623 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
626 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
627 def test_36_create_oc_service1(self):
628 self.cr_serv_sample_data["input"]["service-name"] = "service1"
629 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
630 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
631 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
632 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
633 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
634 response = test_utils.service_create_request(self.cr_serv_sample_data)
635 self.assertEqual(response.status_code, requests.codes.ok)
636 res = response.json()
637 self.assertIn('PCE calculation in progress',
638 res['output']['configuration-response-common'][
640 time.sleep(self.WAITING)
642 def test_37_get_oc_service1(self):
643 response = test_utils.get_service_list_request("services/service1")
644 self.assertEqual(response.status_code, requests.codes.ok)
645 res = response.json()
647 res['services'][0]['administrative-state'],
650 res['services'][0]['service-name'], 'service1')
652 res['services'][0]['connection-type'], 'roadm-line')
654 res['services'][0]['lifecycle-state'], 'planned')
657 def test_38_check_xc1_ROADMA(self):
658 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
659 self.assertEqual(response.status_code, requests.codes.ok)
660 res = response.json()
661 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
662 self.assertDictEqual(
664 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
665 'wavelength-number': 1,
666 'opticalControlMode': 'gainLoss',
667 'target-output-power': -3.0
668 }, **res['roadm-connections'][0]),
669 res['roadm-connections'][0]
671 self.assertDictEqual(
672 {'src-if': 'SRG1-PP1-TXRX-1'},
673 res['roadm-connections'][0]['source'])
674 self.assertDictEqual(
675 {'dst-if': 'DEG1-TTP-TXRX-1'},
676 res['roadm-connections'][0]['destination'])
679 def test_39_check_xc1_ROADMC(self):
680 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
681 self.assertEqual(response.status_code, requests.codes.ok)
682 res = response.json()
683 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
684 self.assertDictEqual(
686 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
687 'wavelength-number': 1,
688 'opticalControlMode': 'gainLoss',
689 'target-output-power': 2.0
690 }, **res['roadm-connections'][0]),
691 res['roadm-connections'][0]
693 self.assertDictEqual(
694 {'src-if': 'SRG1-PP1-TXRX-1'},
695 res['roadm-connections'][0]['source'])
696 self.assertDictEqual(
697 {'dst-if': 'DEG2-TTP-TXRX-1'},
698 res['roadm-connections'][0]['destination'])
701 def test_40_create_oc_service2(self):
702 self.cr_serv_sample_data["input"]["service-name"] = "service2"
703 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
704 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
705 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
706 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
707 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
708 response = test_utils.service_create_request(self.cr_serv_sample_data)
709 self.assertEqual(response.status_code, requests.codes.ok)
710 res = response.json()
711 self.assertIn('PCE calculation in progress',
712 res['output']['configuration-response-common'][
714 time.sleep(self.WAITING)
716 def test_41_get_oc_service2(self):
717 response = test_utils.get_service_list_request("services/service2")
718 self.assertEqual(response.status_code, requests.codes.ok)
719 res = response.json()
721 res['services'][0]['administrative-state'],
724 res['services'][0]['service-name'], 'service2')
726 res['services'][0]['connection-type'], 'roadm-line')
728 res['services'][0]['lifecycle-state'], 'planned')
731 def test_42_check_xc2_ROADMA(self):
732 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2")
733 self.assertEqual(response.status_code, requests.codes.ok)
734 res = response.json()
735 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
736 self.assertDictEqual(
738 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
739 'wavelength-number': 2,
740 'opticalControlMode': 'gainLoss',
741 'target-output-power': -3.0
742 }, **res['roadm-connections'][0]),
743 res['roadm-connections'][0]
745 self.assertDictEqual(
746 {'src-if': 'SRG1-PP2-TXRX-2'},
747 res['roadm-connections'][0]['source'])
748 self.assertDictEqual(
749 {'dst-if': 'DEG1-TTP-TXRX-2'},
750 res['roadm-connections'][0]['destination'])
753 def test_43_check_topo_ROADMA(self):
754 self.test_26_check_topo_ROADMA_SRG1()
755 self.test_27_check_topo_ROADMA_DEG1()
758 def test_44_delete_oc_service1(self):
759 response = test_utils.service_delete_request("service1")
760 self.assertEqual(response.status_code, requests.codes.ok)
761 res = response.json()
762 self.assertIn('Renderer service delete in progress',
763 res['output']['configuration-response-common'][
767 def test_45_delete_oc_service2(self):
768 response = test_utils.service_delete_request("service2")
769 self.assertEqual(response.status_code, requests.codes.ok)
770 res = response.json()
771 self.assertIn('Renderer service delete in progress',
772 res['output']['configuration-response-common'][
776 def test_46_get_no_oc_services(self):
778 response = test_utils.get_service_list_request("")
779 self.assertEqual(response.status_code, requests.codes.conflict)
780 res = response.json()
783 "error-type": "application",
784 "error-tag": "data-missing",
786 "Request could not be completed because the relevant data "
787 "model content does not exist"
789 res['errors']['error'])
792 def test_47_get_no_xc_ROADMA(self):
793 response = test_utils.check_netconf_node_request("ROADMA01", "")
794 self.assertEqual(response.status_code, requests.codes.ok)
795 res = response.json()
796 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
799 def test_48_check_topo_ROADMA(self):
800 self.test_34_check_topo_ROADMA_SRG1()
801 self.test_35_check_topo_ROADMA_DEG1()
803 def test_49_loop_create_eth_service(self):
804 for i in range(1, 6):
805 print("iteration number {}".format(i))
806 print("eth service creation")
807 self.test_11_create_eth_service1()
808 print("check xc in ROADMA01")
809 self.test_13_check_xc1_ROADMA()
810 print("check xc in ROADMC01")
811 self.test_14_check_xc1_ROADMC()
812 print("eth service deletion\n")
813 self.test_30_delete_eth_service1()
815 def test_50_loop_create_oc_service(self):
816 response = test_utils.get_service_list_request("services/service1")
817 if response.status_code != 404:
818 response = test_utils.service_delete_request("service1")
821 for i in range(1, 6):
822 print("iteration number {}".format(i))
823 print("oc service creation")
824 self.test_36_create_oc_service1()
825 print("check xc in ROADMA01")
826 self.test_38_check_xc1_ROADMA()
827 print("check xc in ROADMC01")
828 self.test_39_check_xc1_ROADMC()
829 print("oc service deletion\n")
830 self.test_44_delete_oc_service1()
832 def test_51_disconnect_XPDRA(self):
833 response = test_utils.unmount_device("XPDRA01")
834 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
836 def test_52_disconnect_XPDRC(self):
837 response = test_utils.unmount_device("XPDRC01")
838 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
840 def test_53_disconnect_ROADMA(self):
841 response = test_utils.unmount_device("ROADMA01")
842 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
844 def test_54_disconnect_ROADMC(self):
845 response = test_utils.unmount_device("ROADMC01")
846 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
849 if __name__ == "__main__":
850 unittest.main(verbosity=2)