2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 # pylint: disable=wrong-import-order
20 sys.path.append('transportpce_tests/common/')
21 # pylint: disable=wrong-import-position
22 # pylint: disable=import-error
23 import test_utils # nopep8
26 class TransportPCEFulltesting(unittest.TestCase):
27 cr_serv_sample_data = {"input": {
28 "sdnc-request-header": {
29 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
30 "rpc-action": "service-create",
31 "request-system-id": "appname",
33 "http://localhost:8585/NotificationServer/notify"
35 "service-name": "service1",
36 "common-id": "ASATT1234567",
37 "connection-type": "service",
39 "service-rate": "100",
41 "service-format": "Ethernet",
42 "clli": "SNJSCAMCJP8",
43 "tx-direction": [{"index": 0}],
44 "rx-direction": [{"index": 0}],
48 "service-rate": "100",
50 "service-format": "Ethernet",
51 "clli": "SNJSCAMCJT4",
52 "tx-direction": [{"index": 0}],
53 "rx-direction": [{"index": 0}],
56 "due-date": "2016-11-28T00:00:01Z",
57 "operator-contact": "pw1234"
62 NODE_VERSION = '1.2.1'
66 cls.processes = test_utils.start_tpce()
67 cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
68 ('roadma-full', cls.NODE_VERSION),
69 ('roadmc-full', cls.NODE_VERSION),
70 ('xpdrc', cls.NODE_VERSION)])
73 def tearDownClass(cls):
74 # pylint: disable=not-an-iterable
75 for process in cls.processes:
76 test_utils.shutdown_process(process)
77 print("all processes killed")
79 def setUp(self): # instruction executed before each test method
80 # pylint: disable=consider-using-f-string
81 print("execution of {}".format(self.id().split(".")[-1]))
83 # connect netconf devices
84 def test_01_connect_xpdrA(self):
85 response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
86 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
88 def test_02_connect_xpdrC(self):
89 response = test_utils.mount_device("XPDRC01", ('xpdrc', self.NODE_VERSION))
90 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
92 def test_03_connect_rdmA(self):
93 response = test_utils.mount_device("ROADMA01", ('roadma-full', self.NODE_VERSION))
94 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
96 def test_04_connect_rdmC(self):
97 response = test_utils.mount_device("ROADMC01", ('roadmc-full', self.NODE_VERSION))
98 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
100 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
101 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
102 "ROADMA01", "1", "SRG1-PP1-TXRX")
103 self.assertEqual(response.status_code, requests.codes.ok)
104 res = response.json()
105 self.assertIn('Xponder Roadm Link created successfully',
106 res["output"]["result"])
109 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
110 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
111 "ROADMA01", "1", "SRG1-PP1-TXRX")
112 self.assertEqual(response.status_code, requests.codes.ok)
113 res = response.json()
114 self.assertIn('Roadm Xponder links created successfully',
115 res["output"]["result"])
118 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
119 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "1",
120 "ROADMC01", "1", "SRG1-PP1-TXRX")
121 self.assertEqual(response.status_code, requests.codes.ok)
122 res = response.json()
123 self.assertIn('Xponder Roadm Link created successfully',
124 res["output"]["result"])
127 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
128 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "1",
129 "ROADMC01", "1", "SRG1-PP1-TXRX")
130 self.assertEqual(response.status_code, requests.codes.ok)
131 res = response.json()
132 self.assertIn('Roadm Xponder links created successfully',
133 res["output"]["result"])
136 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
137 # Config ROADMA-ROADMC oms-attributes
139 "auto-spanloss": "true",
140 "spanloss-base": 11.4,
141 "spanloss-current": 12,
142 "engineered-spanloss": 12.2,
143 "link-concatenation": [{
146 "SRLG-length": 100000,
148 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
149 self.assertEqual(response.status_code, requests.codes.created)
151 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
152 # Config ROADMC-ROADMA oms-attributes
154 "auto-spanloss": "true",
155 "spanloss-base": 11.4,
156 "spanloss-current": 12,
157 "engineered-spanloss": 12.2,
158 "link-concatenation": [{
161 "SRLG-length": 100000,
163 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
164 self.assertEqual(response.status_code, requests.codes.created)
166 # test service-create for Eth service from xpdr to xpdr
167 def test_11_create_eth_service1(self):
168 self.cr_serv_sample_data["input"]["service-name"] = "service1"
169 response = test_utils.service_create_request(self.cr_serv_sample_data)
170 self.assertEqual(response.status_code, requests.codes.ok)
171 res = response.json()
172 self.assertIn('PCE calculation in progress',
173 res['output']['configuration-response-common'][
175 time.sleep(self.WAITING)
177 def test_12_get_eth_service1(self):
178 response = test_utils.get_service_list_request("services/service1")
179 self.assertEqual(response.status_code, requests.codes.ok)
180 res = response.json()
182 res['services'][0]['administrative-state'],
185 res['services'][0]['service-name'], 'service1')
187 res['services'][0]['connection-type'], 'service')
189 res['services'][0]['lifecycle-state'], 'planned')
192 def test_13_check_xc1_ROADMA(self):
193 response = test_utils.check_netconf_node_request(
194 "ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
195 self.assertEqual(response.status_code, requests.codes.ok)
196 res = response.json()
197 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
198 self.assertDictEqual(
200 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
201 'wavelength-number': 1,
202 'opticalControlMode': 'gainLoss',
203 'target-output-power': -3.0
204 }, **res['roadm-connections'][0]),
205 res['roadm-connections'][0]
207 self.assertDictEqual(
208 {'src-if': 'SRG1-PP1-TXRX-761:768'},
209 res['roadm-connections'][0]['source'])
210 self.assertDictEqual(
211 {'dst-if': 'DEG1-TTP-TXRX-761:768'},
212 res['roadm-connections'][0]['destination'])
215 def test_14_check_xc1_ROADMC(self):
216 response = test_utils.check_netconf_node_request(
217 "ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
218 self.assertEqual(response.status_code, requests.codes.ok)
219 res = response.json()
220 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
221 self.assertDictEqual(
223 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
224 'wavelength-number': 1,
225 'opticalControlMode': 'gainLoss',
226 'target-output-power': 2.0
227 }, **res['roadm-connections'][0]),
228 res['roadm-connections'][0]
230 self.assertDictEqual(
231 {'src-if': 'SRG1-PP1-TXRX-761:768'},
232 res['roadm-connections'][0]['source'])
233 self.assertDictEqual(
234 {'dst-if': 'DEG2-TTP-TXRX-761:768'},
235 res['roadm-connections'][0]['destination'])
238 def test_15_check_topo_XPDRA(self):
239 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
240 self.assertEqual(response.status_code, requests.codes.ok)
241 res = response.json()
242 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
244 if ele['tp-id'] == 'XPDR1-NETWORK1':
245 self.assertEqual({'frequency': 196.1,
247 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
248 elif ele['tp-id'] in ('XPDR1-CLIENT2', 'XPDR1-CLIENT1'):
249 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
250 elif ele['tp-id'] == 'XPDR1-NETWORK2':
251 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
254 def test_16_check_topo_ROADMA_SRG1(self):
255 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
256 self.assertEqual(response.status_code, requests.codes.ok)
257 res = response.json()
258 freq_map = base64.b64decode(
259 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
260 freq_map_array = [int(x) for x in freq_map]
261 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
262 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
264 if ele['tp-id'] == 'SRG1-PP1-TXRX':
265 freq_map = base64.b64decode(
266 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
267 freq_map_array = [int(x) for x in freq_map]
268 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
269 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
270 self.assertNotIn('avail-freq-maps', dict.keys(ele))
273 def test_17_check_topo_ROADMA_DEG1(self):
274 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
275 self.assertEqual(response.status_code, requests.codes.ok)
276 res = response.json()
277 freq_map = base64.b64decode(
278 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
279 freq_map_array = [int(x) for x in freq_map]
280 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
281 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
283 if ele['tp-id'] == 'DEG2-CTP-TXRX':
284 freq_map = base64.b64decode(
285 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
286 freq_map_array = [int(x) for x in freq_map]
287 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
288 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
289 freq_map = base64.b64decode(
290 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
291 freq_map_array = [int(x) for x in freq_map]
292 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
295 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
296 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "2",
297 "ROADMA01", "1", "SRG1-PP2-TXRX")
298 self.assertEqual(response.status_code, requests.codes.ok)
299 res = response.json()
300 self.assertIn('Xponder Roadm Link created successfully',
301 res["output"]["result"])
304 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
305 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "2",
306 "ROADMA01", "1", "SRG1-PP2-TXRX")
307 self.assertEqual(response.status_code, requests.codes.ok)
308 res = response.json()
309 self.assertIn('Roadm Xponder links created successfully',
310 res["output"]["result"])
313 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
314 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "2",
315 "ROADMC01", "1", "SRG1-PP2-TXRX")
316 self.assertEqual(response.status_code, requests.codes.ok)
317 res = response.json()
318 self.assertIn('Xponder Roadm Link created successfully',
319 res["output"]["result"])
322 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
323 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "2",
324 "ROADMC01", "1", "SRG1-PP2-TXRX")
325 self.assertEqual(response.status_code, requests.codes.ok)
326 res = response.json()
327 self.assertIn('Roadm Xponder links created successfully',
328 res["output"]["result"])
331 def test_22_create_eth_service2(self):
332 self.cr_serv_sample_data["input"]["service-name"] = "service2"
333 response = test_utils.service_create_request(self.cr_serv_sample_data)
334 self.assertEqual(response.status_code, requests.codes.ok)
335 res = response.json()
336 self.assertIn('PCE calculation in progress',
337 res['output']['configuration-response-common'][
339 time.sleep(self.WAITING)
341 def test_23_get_eth_service2(self):
342 response = test_utils.get_service_list_request("services/service2")
343 self.assertEqual(response.status_code, requests.codes.ok)
344 res = response.json()
346 res['services'][0]['administrative-state'],
349 res['services'][0]['service-name'], 'service2')
351 res['services'][0]['connection-type'], 'service')
353 res['services'][0]['lifecycle-state'], 'planned')
356 def test_24_check_xc2_ROADMA(self):
357 response = test_utils.check_netconf_node_request(
358 "ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760")
359 self.assertEqual(response.status_code, requests.codes.ok)
360 res = response.json()
361 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
362 self.assertDictEqual(
364 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-753:760',
365 'wavelength-number': 2,
366 'opticalControlMode': 'power'
367 }, **res['roadm-connections'][0]),
368 res['roadm-connections'][0]
370 self.assertDictEqual(
371 {'src-if': 'DEG1-TTP-TXRX-753:760'},
372 res['roadm-connections'][0]['source'])
373 self.assertDictEqual(
374 {'dst-if': 'SRG1-PP2-TXRX-753:760'},
375 res['roadm-connections'][0]['destination'])
377 def test_25_check_topo_XPDRA(self):
378 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
379 self.assertEqual(response.status_code, requests.codes.ok)
380 res = response.json()
381 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
383 if ele['tp-id'] == 'XPDR1-NETWORK1':
384 self.assertEqual({'frequency': 196.1,
386 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
387 elif ele['tp-id'] == 'XPDR1-NETWORK2':
388 self.assertEqual({'frequency': 196.05,
390 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
391 elif ele['tp-id'] in ('XPDR1-CLIENT1', 'XPDR1-CLIENT2'):
392 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
395 def test_26_check_topo_ROADMA_SRG1(self):
396 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
397 self.assertEqual(response.status_code, requests.codes.ok)
398 res = response.json()
399 freq_map = base64.b64decode(
400 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
401 freq_map_array = [int(x) for x in freq_map]
402 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
403 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
404 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
406 if ele['tp-id'] == 'SRG1-PP1-TXRX':
407 freq_map = base64.b64decode(
408 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
409 freq_map_array = [int(x) for x in freq_map]
410 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
411 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
412 elif ele['tp-id'] == 'SRG1-PP2-TXRX':
413 freq_map = base64.b64decode(
414 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
415 freq_map_array = [int(x) for x in freq_map]
416 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
417 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
418 elif ele['tp-id'] == 'SRG1-PP3-TXRX':
419 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
422 def test_27_check_topo_ROADMA_DEG1(self):
423 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
424 self.assertEqual(response.status_code, requests.codes.ok)
425 res = response.json()
426 freq_map = base64.b64decode(
427 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
428 freq_map_array = [int(x) for x in freq_map]
429 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
430 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
431 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
433 if ele['tp-id'] == 'DEG2-CTP-TXRX':
434 freq_map = base64.b64decode(
435 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
436 freq_map_array = [int(x) for x in freq_map]
437 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
438 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
439 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
440 freq_map = base64.b64decode(
441 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
442 freq_map_array = [int(x) for x in freq_map]
443 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
444 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
447 # creation service test on a non-available resource
448 def test_28_create_eth_service3(self):
449 self.cr_serv_sample_data["input"]["service-name"] = "service3"
450 response = test_utils.service_create_request(self.cr_serv_sample_data)
451 self.assertEqual(response.status_code, requests.codes.ok)
452 res = response.json()
453 self.assertIn('PCE calculation in progress',
454 res['output']['configuration-response-common'][
456 self.assertIn('200', res['output']['configuration-response-common'][
458 time.sleep(self.WAITING)
460 # add a test that check the openroadm-service-list still only
461 # contains 2 elements
463 def test_29_delete_eth_service3(self):
464 response = test_utils.service_delete_request("service3")
465 self.assertEqual(response.status_code, requests.codes.ok)
466 res = response.json()
467 self.assertIn('Service \'service3\' does not exist in datastore',
468 res['output']['configuration-response-common'][
470 self.assertIn('500', res['output']['configuration-response-common'][
474 def test_30_delete_eth_service1(self):
475 response = test_utils.service_delete_request("service1")
476 self.assertEqual(response.status_code, requests.codes.ok)
477 res = response.json()
478 self.assertIn('Renderer service delete in progress',
479 res['output']['configuration-response-common'][
483 def test_31_delete_eth_service2(self):
484 response = test_utils.service_delete_request("service2")
485 self.assertEqual(response.status_code, requests.codes.ok)
486 res = response.json()
487 self.assertIn('Renderer service delete in progress',
488 res['output']['configuration-response-common'][
492 def test_32_check_no_xc_ROADMA(self):
493 response = test_utils.check_netconf_node_request("ROADMA01", "")
494 res = response.json()
495 self.assertEqual(response.status_code, requests.codes.ok)
496 self.assertNotIn('roadm-connections',
497 dict.keys(res['org-openroadm-device']))
500 def test_33_check_topo_XPDRA(self):
501 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
502 self.assertEqual(response.status_code, requests.codes.ok)
503 res = response.json()
504 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
506 if (ele['org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT'
507 and ele['tp-id'] in ('XPDR1-CLIENT1', 'XPDR1-CLIENT3')):
509 'org-openroadm-network-topology:xpdr-client-attributes',
511 elif ele['org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
512 self.assertIn('tail-equipment-id', dict.keys(
513 ele['org-openroadm-network-topology:'
514 'xpdr-network-attributes']))
515 self.assertNotIn('wavelength', dict.keys(
516 ele['org-openroadm-network-topology:'
517 'xpdr-network-attributes']))
520 def test_34_check_topo_ROADMA_SRG1(self):
521 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
522 self.assertEqual(response.status_code, requests.codes.ok)
523 res = response.json()
524 freq_map = base64.b64decode(
525 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
526 freq_map_array = [int(x) for x in freq_map]
527 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
528 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
529 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
531 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
532 freq_map = base64.b64decode(
533 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
534 freq_map_array = [int(x) for x in freq_map]
535 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
536 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
537 elif ele['tp-id'] == 'SRG1-CP-TXRX':
538 freq_map = base64.b64decode(
539 ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
540 freq_map_array = [int(x) for x in freq_map]
541 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
542 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
544 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
547 def test_35_check_topo_ROADMA_DEG1(self):
548 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
549 self.assertEqual(response.status_code, requests.codes.ok)
550 res = response.json()
551 freq_map = base64.b64decode(
552 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
553 freq_map_array = [int(x) for x in freq_map]
554 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
555 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
556 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
558 if ele['tp-id'] == 'DEG2-CTP-TXRX':
559 freq_map = base64.b64decode(
560 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
561 freq_map_array = [int(x) for x in freq_map]
562 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
563 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
564 elif ele['tp-id'] == 'DEG2-TTP-TXRX':
565 freq_map = base64.b64decode(
566 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
567 freq_map_array = [int(x) for x in freq_map]
568 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
569 self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
572 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
573 def test_36_create_oc_service1(self):
574 self.cr_serv_sample_data["input"]["service-name"] = "service1"
575 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
576 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
577 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
578 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
579 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
580 response = test_utils.service_create_request(self.cr_serv_sample_data)
581 self.assertEqual(response.status_code, requests.codes.ok)
582 res = response.json()
583 self.assertIn('PCE calculation in progress',
584 res['output']['configuration-response-common'][
586 time.sleep(self.WAITING)
588 def test_37_get_oc_service1(self):
589 response = test_utils.get_service_list_request("services/service1")
590 self.assertEqual(response.status_code, requests.codes.ok)
591 res = response.json()
593 res['services'][0]['administrative-state'],
596 res['services'][0]['service-name'], 'service1')
598 res['services'][0]['connection-type'], 'roadm-line')
600 res['services'][0]['lifecycle-state'], 'planned')
603 def test_38_check_xc1_ROADMA(self):
604 response = test_utils.check_netconf_node_request(
605 "ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
606 self.assertEqual(response.status_code, requests.codes.ok)
607 res = response.json()
608 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
609 self.assertDictEqual(
611 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
612 'wavelength-number': 1,
613 'opticalControlMode': 'gainLoss',
614 'target-output-power': -3.0
615 }, **res['roadm-connections'][0]),
616 res['roadm-connections'][0]
618 self.assertDictEqual(
619 {'src-if': 'SRG1-PP1-TXRX-761:768'},
620 res['roadm-connections'][0]['source'])
621 self.assertDictEqual(
622 {'dst-if': 'DEG1-TTP-TXRX-761:768'},
623 res['roadm-connections'][0]['destination'])
626 def test_39_check_xc1_ROADMC(self):
627 response = test_utils.check_netconf_node_request(
628 "ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
629 self.assertEqual(response.status_code, requests.codes.ok)
630 res = response.json()
631 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
632 self.assertDictEqual(
634 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
635 'wavelength-number': 1,
636 'opticalControlMode': 'gainLoss',
637 'target-output-power': 2.0
638 }, **res['roadm-connections'][0]),
639 res['roadm-connections'][0]
641 self.assertDictEqual(
642 {'src-if': 'SRG1-PP1-TXRX-761:768'},
643 res['roadm-connections'][0]['source'])
644 self.assertDictEqual(
645 {'dst-if': 'DEG2-TTP-TXRX-761:768'},
646 res['roadm-connections'][0]['destination'])
649 def test_40_create_oc_service2(self):
650 self.cr_serv_sample_data["input"]["service-name"] = "service2"
651 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
652 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
653 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
654 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
655 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
656 response = test_utils.service_create_request(self.cr_serv_sample_data)
657 self.assertEqual(response.status_code, requests.codes.ok)
658 res = response.json()
659 self.assertIn('PCE calculation in progress',
660 res['output']['configuration-response-common'][
662 time.sleep(self.WAITING)
664 def test_41_get_oc_service2(self):
665 response = test_utils.get_service_list_request("services/service2")
666 self.assertEqual(response.status_code, requests.codes.ok)
667 res = response.json()
669 res['services'][0]['administrative-state'],
672 res['services'][0]['service-name'], 'service2')
674 res['services'][0]['connection-type'], 'roadm-line')
676 res['services'][0]['lifecycle-state'], 'planned')
679 def test_42_check_xc2_ROADMA(self):
680 response = test_utils.check_netconf_node_request(
681 "ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760")
682 self.assertEqual(response.status_code, requests.codes.ok)
683 res = response.json()
684 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
685 self.assertDictEqual(
687 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-753:760',
688 'wavelength-number': 2,
689 'opticalControlMode': 'gainLoss',
690 'target-output-power': -3.0
691 }, **res['roadm-connections'][0]),
692 res['roadm-connections'][0]
694 self.assertDictEqual(
695 {'src-if': 'SRG1-PP2-TXRX-753:760'},
696 res['roadm-connections'][0]['source'])
697 self.assertDictEqual(
698 {'dst-if': 'DEG1-TTP-TXRX-753:760'},
699 res['roadm-connections'][0]['destination'])
702 def test_43_check_topo_ROADMA(self):
703 self.test_26_check_topo_ROADMA_SRG1()
704 self.test_27_check_topo_ROADMA_DEG1()
707 def test_44_delete_oc_service1(self):
708 response = test_utils.service_delete_request("service1")
709 self.assertEqual(response.status_code, requests.codes.ok)
710 res = response.json()
711 self.assertIn('Renderer service delete in progress',
712 res['output']['configuration-response-common'][
716 def test_45_delete_oc_service2(self):
717 response = test_utils.service_delete_request("service2")
718 self.assertEqual(response.status_code, requests.codes.ok)
719 res = response.json()
720 self.assertIn('Renderer service delete in progress',
721 res['output']['configuration-response-common'][
725 def test_46_get_no_oc_services(self):
727 response = test_utils.get_service_list_request("")
728 self.assertEqual(response.status_code, requests.codes.conflict)
729 res = response.json()
732 "error-type": "application",
733 "error-tag": "data-missing",
735 "Request could not be completed because the relevant data "
736 "model content does not exist"
738 res['errors']['error'])
741 def test_47_get_no_xc_ROADMA(self):
742 response = test_utils.check_netconf_node_request("ROADMA01", "")
743 self.assertEqual(response.status_code, requests.codes.ok)
744 res = response.json()
745 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
748 def test_48_check_topo_ROADMA(self):
749 self.test_34_check_topo_ROADMA_SRG1()
750 self.test_35_check_topo_ROADMA_DEG1()
752 def test_49_loop_create_eth_service(self):
753 # pylint: disable=consider-using-f-string
754 for i in range(1, 6):
755 print("iteration number {}".format(i))
756 print("eth service creation")
757 self.test_11_create_eth_service1()
758 print("check xc in ROADMA01")
759 self.test_13_check_xc1_ROADMA()
760 print("check xc in ROADMC01")
761 self.test_14_check_xc1_ROADMC()
762 print("eth service deletion\n")
763 self.test_30_delete_eth_service1()
765 def test_50_loop_create_oc_service(self):
766 response = test_utils.get_service_list_request("services/service1")
767 if response.status_code != 404:
768 response = test_utils.service_delete_request("service1")
771 # pylint: disable=consider-using-f-string
772 for i in range(1, 6):
773 print("iteration number {}".format(i))
774 print("oc service creation")
775 self.test_36_create_oc_service1()
776 print("check xc in ROADMA01")
777 self.test_38_check_xc1_ROADMA()
778 print("check xc in ROADMC01")
779 self.test_39_check_xc1_ROADMC()
780 print("oc service deletion\n")
781 self.test_44_delete_oc_service1()
783 def test_51_disconnect_XPDRA(self):
784 response = test_utils.unmount_device("XPDRA01")
785 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
787 def test_52_disconnect_XPDRC(self):
788 response = test_utils.unmount_device("XPDRC01")
789 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
791 def test_53_disconnect_ROADMA(self):
792 response = test_utils.unmount_device("ROADMA01")
793 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
795 def test_54_disconnect_ROADMC(self):
796 response = test_utils.unmount_device("ROADMC01")
797 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
800 if __name__ == "__main__":
801 unittest.main(verbosity=2)