3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
28 class TransportPCEtesting(unittest.TestCase):
31 WAITING = 20 # nominal value is 300
32 NODE_VERSION = '2.2.1'
34 cr_serv_sample_data = {"input": {
35 "sdnc-request-header": {
36 "request-id": "request-1",
37 "rpc-action": "service-create",
38 "request-system-id": "appname"
40 "service-name": "service-OCH-OTU4-AB",
41 "common-id": "commonId",
42 "connection-type": "infrastructure",
44 "service-rate": "100",
45 "node-id": "SPDR-SA1",
46 "service-format": "OTU",
47 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
51 "committed-info-rate": "100000",
52 "committed-burst-size": "64"
57 "port-device-name": "SPDR-SA1-XPDR2",
59 "port-name": "XPDR2-NETWORK1",
60 "port-rack": "000000.00",
61 "port-shelf": "Chassis#1"
64 "lgx-device-name": "Some lgx-device-name",
65 "lgx-port-name": "Some lgx-port-name",
66 "lgx-port-rack": "000000.00",
67 "lgx-port-shelf": "00"
72 "port-device-name": "SPDR-SA1-XPDR2",
74 "port-name": "XPDR2-NETWORK1",
75 "port-rack": "000000.00",
76 "port-shelf": "Chassis#1"
79 "lgx-device-name": "Some lgx-device-name",
80 "lgx-port-name": "Some lgx-port-name",
81 "lgx-port-rack": "000000.00",
82 "lgx-port-shelf": "00"
88 "service-rate": "100",
89 "node-id": "SPDR-SB1",
90 "service-format": "OTU",
91 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
95 "committed-info-rate": "100000",
96 "committed-burst-size": "64"
101 "port-device-name": "SPDR-SB1-XPDR2",
102 "port-type": "fixed",
103 "port-name": "XPDR2-NETWORK1",
104 "port-rack": "000000.00",
105 "port-shelf": "Chassis#1"
108 "lgx-device-name": "Some lgx-device-name",
109 "lgx-port-name": "Some lgx-port-name",
110 "lgx-port-rack": "000000.00",
111 "lgx-port-shelf": "00"
116 "port-device-name": "SPDR-SB1-XPDR2",
117 "port-type": "fixed",
118 "port-name": "XPDR2-NETWORK1",
119 "port-rack": "000000.00",
120 "port-shelf": "Chassis#1"
123 "lgx-device-name": "Some lgx-device-name",
124 "lgx-port-name": "Some lgx-port-name",
125 "lgx-port-rack": "000000.00",
126 "lgx-port-shelf": "00"
131 "due-date": "2018-06-15T00:00:01Z",
132 "operator-contact": "pw1234"
138 cls.processes = test_utils.start_tpce()
139 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
140 ('spdrb', cls.NODE_VERSION),
141 ('spdrc', cls.NODE_VERSION),
142 ('roadma', cls.NODE_VERSION),
143 ('roadmb', cls.NODE_VERSION),
144 ('roadmc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Shut down every entry of cls.processes, one by one, then report it.
    # pylint: disable=not-an-iterable
    stop = test_utils.shutdown_process
    for running in cls.processes:
        stop(running)
    print("all processes killed")
def test_001_connect_spdrA(self):
    # Mount SPDR-SA1 from the ('spdra', NODE_VERSION) descriptor; the
    # request must answer with the "created" status code.
    device = ('spdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SA1", device)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_002_connect_spdrB(self):
    # Mount SPDR-SB1 from the ('spdrb', NODE_VERSION) descriptor; the
    # request must answer with the "created" status code.
    device = ('spdrb', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SB1", device)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_003_connect_spdrC(self):
    # Mount SPDR-SC1 from the ('spdrc', NODE_VERSION) descriptor; the
    # request must answer with the "created" status code.
    device = ('spdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SC1", device)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_004_connect_rdmA(self):
    # Mount ROADM-A1 from the ('roadma', NODE_VERSION) descriptor; the
    # request must answer with the "created" status code.
    device = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-A1", device)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_005_connect_rdmB(self):
    # Mount ROADM-B1 from the ('roadmb', NODE_VERSION) descriptor; the
    # request must answer with the "created" status code.
    device = ('roadmb', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-B1", device)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_006_connect_rdmC(self):
    # Mount ROADM-C1 from the ('roadmc', NODE_VERSION) descriptor; the
    # request must answer with the "created" status code.
    device = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-C1", device)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_007_connect_sprdA_2_N1_to_roadmA_PP3(self):
    # Request the xponder-to-ROADM link between SPDR-SA1 and ROADM-A1
    # (SRG1-PP3-TXRX). The positional arguments are forwarded unchanged;
    # their meaning is defined by test_utils.
    link_args = ("SPDR-SA1", "2", "1", "ROADM-A1", "1", "SRG1-PP3-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_008_connect_roadmA_PP3_to_spdrA_2_N1(self):
    # Request the ROADM-to-xponder link between ROADM-A1 (SRG1-PP3-TXRX)
    # and SPDR-SA1. The positional arguments are forwarded unchanged;
    # their meaning is defined by test_utils.
    link_args = ("SPDR-SA1", "2", "1", "ROADM-A1", "1", "SRG1-PP3-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_009_connect_sprdC_2_N1_to_roadmC_PP3(self):
    # Request the xponder-to-ROADM link between SPDR-SC1 and ROADM-C1
    # (SRG1-PP3-TXRX). The positional arguments are forwarded unchanged;
    # their meaning is defined by test_utils.
    link_args = ("SPDR-SC1", "2", "1", "ROADM-C1", "1", "SRG1-PP3-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_010_connect_roadmC_PP3_to_spdrC_2_N1(self):
    # Request the ROADM-to-xponder link between ROADM-C1 (SRG1-PP3-TXRX)
    # and SPDR-SC1. The positional arguments are forwarded unchanged;
    # their meaning is defined by test_utils.
    link_args = ("SPDR-SC1", "2", "1", "ROADM-C1", "1", "SRG1-PP3-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_011_connect_sprdB_2_N1_to_roadmB_PP1(self):
    # Request the xponder-to-ROADM link between SPDR-SB1 and ROADM-B1
    # (SRG1-PP1-TXRX). The positional arguments are forwarded unchanged;
    # their meaning is defined by test_utils.
    link_args = ("SPDR-SB1", "2", "1", "ROADM-B1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_012_connect_roadmB_PP1_to_spdrB_2_N1(self):
    # Request the ROADM-to-xponder link between ROADM-B1 (SRG1-PP1-TXRX)
    # and SPDR-SB1. The positional arguments are forwarded unchanged;
    # their meaning is defined by test_utils.
    link_args = ("SPDR-SB1", "2", "1", "ROADM-B1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_013_connect_sprdB_2_N2_to_roadmB_PP2(self):
    # Request the xponder-to-ROADM link between SPDR-SB1 and ROADM-B1
    # (SRG1-PP2-TXRX). The positional arguments are forwarded unchanged;
    # their meaning is defined by test_utils.
    link_args = ("SPDR-SB1", "2", "2", "ROADM-B1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_014_connect_roadmB_PP2_to_spdrB_2_N2(self):
    # Request the ROADM-to-xponder link between ROADM-B1 (SRG1-PP2-TXRX)
    # and SPDR-SB1. The positional arguments are forwarded unchanged;
    # their meaning is defined by test_utils.
    link_args = ("SPDR-SB1", "2", "2", "ROADM-B1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
258 def test_015_add_omsAttributes_ROADMA_ROADMB(self):
259 # Config ROADMA-ROADMB oms-attributes
261 "auto-spanloss": "true",
262 "spanloss-base": 11.4,
263 "spanloss-current": 12,
264 "engineered-spanloss": 12.2,
265 "link-concatenation": [{
268 "SRLG-length": 100000,
270 response = test_utils.add_oms_attr_request(
271 "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
272 self.assertEqual(response.status_code, requests.codes.created)
274 def test_016_add_omsAttributes_ROADMB_ROADMA(self):
275 # Config ROADMB-ROADMA oms-attributes
277 "auto-spanloss": "true",
278 "spanloss-base": 11.4,
279 "spanloss-current": 12,
280 "engineered-spanloss": 12.2,
281 "link-concatenation": [{
284 "SRLG-length": 100000,
286 response = test_utils.add_oms_attr_request(
287 "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
288 self.assertEqual(response.status_code, requests.codes.created)
290 def test_017_add_omsAttributes_ROADMB_ROADMC(self):
291 # Config ROADMB-ROADMC oms-attributes
293 "auto-spanloss": "true",
294 "spanloss-base": 11.4,
295 "spanloss-current": 12,
296 "engineered-spanloss": 12.2,
297 "link-concatenation": [{
300 "SRLG-length": 100000,
302 response = test_utils.add_oms_attr_request(
303 "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
304 self.assertEqual(response.status_code, requests.codes.created)
306 def test_018_add_omsAttributes_ROADMC_ROADMB(self):
307 # Config ROADMC-ROADMB oms-attributes
309 "auto-spanloss": "true",
310 "spanloss-base": 11.4,
311 "spanloss-current": 12,
312 "engineered-spanloss": 12.2,
313 "link-concatenation": [{
316 "SRLG-length": 100000,
318 response = test_utils.add_oms_attr_request(
319 "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
320 self.assertEqual(response.status_code, requests.codes.created)
def test_019_create_OTS_ROADMA_DEG1(self):
    # Ask for OTS/OMS interface creation on ROADM-A1 / DEG1-TTP-TXRX and
    # check both the HTTP status and the textual result.
    reply = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    expected = 'Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1'
    self.assertIn(expected, reply.json()["output"]["result"])
def test_020_create_OTS_ROADMB_DEG1(self):
    # Ask for OTS/OMS interface creation on ROADM-B1 / DEG1-TTP-TXRX and
    # check both the HTTP status and the textual result.
    reply = test_utils.create_ots_oms_request("ROADM-B1", "DEG1-TTP-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    expected = 'Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1'
    self.assertIn(expected, reply.json()["output"]["result"])
def test_021_create_OTS_ROADMB_DEG2(self):
    # Ask for OTS/OMS interface creation on ROADM-B1 / DEG2-TTP-TXRX and
    # check both the HTTP status and the textual result.
    reply = test_utils.create_ots_oms_request("ROADM-B1", "DEG2-TTP-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    expected = 'Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1'
    self.assertIn(expected, reply.json()["output"]["result"])
def test_022_create_OTS_ROADMC_DEG2(self):
    # Ask for OTS/OMS interface creation on ROADM-C1 / DEG2-TTP-TXRX and
    # check both the HTTP status and the textual result.
    reply = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    expected = 'Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1'
    self.assertIn(expected, reply.json()["output"]["result"])
354 def test_023_calculate_span_loss_base_all(self):
355 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
361 response = test_utils.post_request(url, data)
362 self.assertEqual(response.status_code, requests.codes.ok)
363 res = response.json()
364 self.assertIn('Success',
365 res["output"]["result"])
368 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
369 }, res["output"]["spans"])
372 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
373 }, res["output"]["spans"])
376 "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
377 }, res["output"]["spans"])
380 "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
381 }, res["output"]["spans"])
384 "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
385 }, res["output"]["spans"])
388 "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
389 }, res["output"]["spans"])
def test_024_check_otn_topology(self):
    # Before any service exists, the first network of the OTN topology
    # must list 9 nodes and carry no 'ietf-network-topology:link' key.
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    node_count = len(otn_network['node'])
    self.assertEqual(node_count, 9, 'There should be 9 nodes')
    self.assertNotIn(
        'ietf-network-topology:link',
        otn_network,
        'otn-topology should have no link',
    )
401 # test service-create for OCH-OTU4 service from spdrA to spdrB
def test_025_create_OCH_OTU4_service_AB(self):
    # Submit the class-level sample payload as-is, check the immediate
    # acknowledgement, then pause for WAITING seconds before the next test.
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
410 def test_026_get_OCH_OTU4_service_AB(self):
411 response = test_utils.get_service_list_request(
412 "services/service-OCH-OTU4-AB")
413 self.assertEqual(response.status_code, requests.codes.ok)
414 res = response.json()
416 res['services'][0]['administrative-state'], 'inService')
418 res['services'][0]['service-name'], 'service-OCH-OTU4-AB')
420 res['services'][0]['connection-type'], 'infrastructure')
422 res['services'][0]['lifecycle-state'], 'planned')
425 # Check correct configuration of devices
426 def test_027_check_interface_och_spdra(self):
427 response = test_utils.check_netconf_node_request(
428 "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
429 self.assertEqual(response.status_code, requests.codes.ok)
430 res = response.json()
431 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
432 'administrative-state': 'inService',
433 'supporting-circuit-pack-name': 'CP5-CFP',
434 'type': 'org-openroadm-interfaces:opticalChannel',
435 'supporting-port': 'CP5-CFP-P1'
436 }, **res['interface'][0]),
439 self.assertDictEqual(
440 {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
441 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
442 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
444 def test_028_check_interface_OTU4_spdra(self):
445 response = test_utils.check_netconf_node_request(
446 "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
447 self.assertEqual(response.status_code, requests.codes.ok)
448 res = response.json()
449 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
450 'administrative-state': 'inService',
451 'supporting-circuit-pack-name': 'CP5-CFP',
452 'supporting-interface': 'XPDR2-NETWORK1-761:768',
453 'type': 'org-openroadm-interfaces:otnOtu',
454 'supporting-port': 'CP5-CFP-P1'
456 input_dict_2 = {'tx-sapi': 'AOQxIv+6nCD+',
457 'expected-dapi': 'AOQxIv+6nCD+',
458 'tx-dapi': 'X+8cRNi+HbE=',
459 'expected-sapi': 'X+8cRNi+HbE=',
460 'rate': 'org-openroadm-otn-common-types:OTU4',
463 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
466 self.assertDictEqual(input_dict_2,
468 ['org-openroadm-otn-otu-interfaces:otu'])
469 response2 = test_utils.check_netconf_node_request(
470 "SPDR-SB1", "interface/XPDR2-NETWORK1-OTU/otu")
471 self.assertEqual(response2.status_code, requests.codes.ok)
472 res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
473 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
474 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
475 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
476 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
478 def test_029_check_interface_och_spdrB(self):
479 response = test_utils.check_netconf_node_request(
480 "SPDR-SB1", "interface/XPDR2-NETWORK1-761:768")
481 self.assertEqual(response.status_code, requests.codes.ok)
482 res = response.json()
483 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
484 'administrative-state': 'inService',
485 'supporting-circuit-pack-name': 'CP5-CFP',
486 'type': 'org-openroadm-interfaces:opticalChannel',
487 'supporting-port': 'CP5-CFP-P1'
488 }, **res['interface'][0]),
491 self.assertDictEqual(
492 {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
493 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
494 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
496 def test_030_check_interface_OTU4_spdrB(self):
497 response = test_utils.check_netconf_node_request(
498 "SPDR-SB1", "interface/XPDR2-NETWORK1-OTU")
499 self.assertEqual(response.status_code, requests.codes.ok)
500 res = response.json()
501 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
502 'administrative-state': 'inService',
503 'supporting-circuit-pack-name': 'CP5-CFP',
504 'supporting-interface': 'XPDR2-NETWORK1-761:768',
505 'type': 'org-openroadm-interfaces:otnOtu',
506 'supporting-port': 'CP5-CFP-P1'
508 input_dict_2 = {'tx-dapi': 'AOQxIv+6nCD+',
509 'expected-sapi': 'AOQxIv+6nCD+',
510 'tx-sapi': 'X+8cRNi+HbE=',
511 'expected-dapi': 'X+8cRNi+HbE=',
512 'rate': 'org-openroadm-otn-common-types:OTU4',
516 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
519 self.assertDictEqual(input_dict_2,
521 ['org-openroadm-otn-otu-interfaces:otu'])
# Cross-check with the other end: read the OTU container of the same
# interface on SPDR-SA1.
response2 = test_utils.check_netconf_node_request(
    "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU/otu")
# Fix: assert on the status of this second request. The original line
# re-checked `response`, which was already validated at the top of the
# test, so a failure of the second request was never reported here.
self.assertEqual(response2.status_code, requests.codes.ok)
res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
# The SAPI/DAPI identifiers expected on this node must appear swapped on
# the other end.
self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
531 def test_031_check_no_interface_ODU4_spdra(self):
532 response = test_utils.check_netconf_node_request(
533 "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
534 self.assertEqual(response.status_code, requests.codes.conflict)
535 res = response.json()
537 {"error-type": "application", "error-tag": "data-missing",
538 "error-message": "Request could not be completed because the relevant data model content does not exist"},
539 res['errors']['error'])
541 def test_032_check_openroadm_topo_spdra(self):
542 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
543 self.assertEqual(response.status_code, requests.codes.ok)
544 res = response.json()
545 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
546 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
547 self.assertEqual({'frequency': 196.1,
549 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
550 self.assertEqual('ROADM-A1-SRG1--SRG1-PP3-TXRX',
551 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
554 def test_033_check_openroadm_topo_ROADMA_SRG(self):
555 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
556 self.assertEqual(response.status_code, requests.codes.ok)
557 res = response.json()
558 freq_map = base64.b64decode(
559 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
560 freq_map_array = [int(x) for x in freq_map]
561 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
562 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
564 if ele['tp-id'] == 'SRG1-PP3-TXRX':
565 freq_map = base64.b64decode(
566 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
567 freq_map_array = [int(x) for x in freq_map]
568 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
569 if ele['tp-id'] == 'SRG1-PP2-TXRX':
570 self.assertNotIn('avail-freq-maps', dict.keys(ele))
573 def test_034_check_openroadm_topo_ROADMA_DEG1(self):
574 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG1")
575 self.assertEqual(response.status_code, requests.codes.ok)
576 res = response.json()
577 freq_map = base64.b64decode(
578 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
579 freq_map_array = [int(x) for x in freq_map]
580 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
581 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
583 if ele['tp-id'] == 'DEG1-CTP-TXRX':
584 freq_map = base64.b64decode(
585 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
586 freq_map_array = [int(x) for x in freq_map]
587 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
588 if ele['tp-id'] == 'DEG1-TTP-TXRX':
589 freq_map = base64.b64decode(
590 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
591 freq_map_array = [int(x) for x in freq_map]
592 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
595 def test_035_check_otn_topo_otu4_links(self):
596 response = test_utils.get_otn_topo_request()
597 self.assertEqual(response.status_code, requests.codes.ok)
598 res = response.json()
599 nb_links = len(res['network'][0]['ietf-network-topology:link'])
600 self.assertEqual(nb_links, 2)
601 listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
602 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1']
603 for link in res['network'][0]['ietf-network-topology:link']:
604 self.assertIn(link['link-id'], listLinkId)
606 link['transportpce-topology:otn-link-type'], 'OTU4')
608 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
610 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
612 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
614 link['org-openroadm-common-network:opposite-link'], listLinkId)
617 # test service-create for OCH-OTU4 service from spdrB to spdrC
def test_036_create_OCH_OTU4_service_BC(self):
    # Re-target the shared class-level payload (mutated in place, as the
    # following tests rely on) to a service between SPDR-SB1 and SPDR-SC1,
    # then submit it.
    payload = self.cr_serv_sample_data["input"]
    payload["service-name"] = "service-OCH-OTU4-BC"

    # (end key, node-id, clli, port-device-name, port-name)
    endpoints = (
        ("service-a-end", "SPDR-SB1", "NodeSB", "SPDR-SB1-XPDR2", "XPDR2-NETWORK2"),
        ("service-z-end", "SPDR-SC1", "NodeSC", "SPDR-SC1-XPDR2", "XPDR2-NETWORK1"),
    )
    for end_key, node_id, clli, device_name, port_name in endpoints:
        end = payload[end_key]
        end["node-id"] = node_id
        end["clli"] = clli
        # Both directions of one end use the same device and port.
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = port_name

    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
642 def test_037_get_OCH_OTU4_service_BC(self):
643 response = test_utils.get_service_list_request(
644 "services/service-OCH-OTU4-BC")
645 self.assertEqual(response.status_code, requests.codes.ok)
646 res = response.json()
648 res['services'][0]['administrative-state'], 'inService')
650 res['services'][0]['service-name'], 'service-OCH-OTU4-BC')
652 res['services'][0]['connection-type'], 'infrastructure')
654 res['services'][0]['lifecycle-state'], 'planned')
657 # Check correct configuration of devices
658 def test_038_check_interface_och_spdrB(self):
659 response = test_utils.check_netconf_node_request(
660 "SPDR-SB1", "interface/XPDR2-NETWORK2-753:760")
661 self.assertEqual(response.status_code, requests.codes.ok)
662 res = response.json()
663 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
664 'administrative-state': 'inService',
665 'supporting-circuit-pack-name': 'CP6-CFP',
666 'type': 'org-openroadm-interfaces:opticalChannel',
667 'supporting-port': 'CP1-CFP0-P1'
668 }, **res['interface'][0]),
671 self.assertDictEqual(
672 {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
673 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
674 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
676 def test_039_check_interface_OTU4_spdrB(self):
677 response = test_utils.check_netconf_node_request(
678 "SPDR-SB1", "interface/XPDR2-NETWORK2-OTU")
679 self.assertEqual(response.status_code, requests.codes.ok)
680 res = response.json()
681 input_dict_1 = {'name': 'XPDR2-NETWORK2-OTU',
682 'administrative-state': 'inService',
683 'supporting-circuit-pack-name': 'CP6-CFP',
684 'supporting-interface': 'XPDR2-NETWORK1-753:760',
685 'type': 'org-openroadm-interfaces:otnOtu',
686 'supporting-port': 'CP6-CFP-P1'
688 input_dict_2 = {'tx-sapi': 'X+8cRNi+HbI=',
689 'expected-dapi': 'X+8cRNi+HbI=',
690 'tx-dapi': 'ALvne1QI5jo4',
691 'expected-sapi': 'ALvne1QI5jo4',
692 'rate': 'org-openroadm-otn-common-types:OTU4',
695 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
698 self.assertDictEqual(input_dict_2,
700 ['org-openroadm-otn-otu-interfaces:otu'])
# Cross-check with the other end: read the OTU container of the
# XPDR2-NETWORK1-OTU interface on SPDR-SC1.
response2 = test_utils.check_netconf_node_request(
    "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU/otu")
# Fix: assert on the status of this second request. The original line
# re-checked `response`, which was already validated at the top of the
# test, so a failure of the second request was never reported here.
self.assertEqual(response2.status_code, requests.codes.ok)
res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
# The SAPI/DAPI identifiers expected on this node must appear swapped on
# the other end.
self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
710 def test_040_check_interface_och_spdrC(self):
711 response = test_utils.check_netconf_node_request(
712 "SPDR-SC1", "interface/XPDR2-NETWORK1-753:760")
713 self.assertEqual(response.status_code, requests.codes.ok)
714 res = response.json()
715 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
716 'administrative-state': 'inService',
717 'supporting-circuit-pack-name': 'CP5-CFP',
718 'type': 'org-openroadm-interfaces:opticalChannel',
719 'supporting-port': 'CP5-CFP-P1'
720 }, **res['interface'][0]),
723 self.assertDictEqual(
724 {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
725 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
726 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
728 def test_041_check_interface_OTU4_spdrC(self):
729 response = test_utils.check_netconf_node_request(
730 "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU")
731 self.assertEqual(response.status_code, requests.codes.ok)
732 res = response.json()
733 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
734 'administrative-state': 'inService',
735 'supporting-circuit-pack-name': 'CP5-CFP',
736 'supporting-interface': 'XPDR2-NETWORK1-753:760',
737 'type': 'org-openroadm-interfaces:otnOtu',
738 'supporting-port': 'CP5-CFP-P1'
740 input_dict_2 = {'tx-dapi': 'X+8cRNi+HbI=',
741 'expected-sapi': 'X+8cRNi+HbI=',
742 'tx-sapi': 'ALvne1QI5jo4',
743 'expected-dapi': 'ALvne1QI5jo4',
744 'rate': 'org-openroadm-otn-common-types:OTU4',
748 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
751 self.assertDictEqual(input_dict_2,
753 ['org-openroadm-otn-otu-interfaces:otu'])
# Cross-check with the other end: read the OTU container of the
# XPDR2-NETWORK2-OTU interface on SPDR-SB1.
response2 = test_utils.check_netconf_node_request(
    "SPDR-SB1", "interface/XPDR2-NETWORK2-OTU/otu")
# Fix: assert on the status of this second request. The original line
# re-checked `response`, which was already validated at the top of the
# test, so a failure of the second request was never reported here.
self.assertEqual(response2.status_code, requests.codes.ok)
res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
# The SAPI/DAPI identifiers expected on this node must appear swapped on
# the other end.
self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
763 def test_042_check_no_interface_ODU4_spdrB(self):
764 response = test_utils.check_netconf_node_request(
765 "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
766 self.assertEqual(response.status_code, requests.codes.conflict)
767 res = response.json()
769 {"error-type": "application", "error-tag": "data-missing",
770 "error-message": "Request could not be completed because the relevant data model content does not exist"},
771 res['errors']['error'])
773 def test_043_check_openroadm_topo_spdrB(self):
774 response = test_utils.get_ordm_topo_request("node/SPDR-SB1-XPDR2")
775 self.assertEqual(response.status_code, requests.codes.ok)
776 res = response.json()
777 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
778 # pylint: disable=consider-using-f-string
780 if ele['tp-id'] == 'XPDR2-NETWORK1':
781 self.assertEqual({'frequency': 196.1,
783 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
784 self.assertEqual('ROADM-B1-SRG1--SRG1-PP1-TXRX',
785 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
786 elif ele['tp-id'] == 'XPDR2-NETWORK2':
787 self.assertEqual({'frequency': 196.05,
789 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
790 self.assertEqual('ROADM-B1-SRG1--SRG1-PP2-TXRX',
791 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
793 print("ele = {}".format(ele))
794 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
797 def test_044_check_openroadm_topo_ROADMB_SRG1(self):
798 response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
799 self.assertEqual(response.status_code, requests.codes.ok)
800 res = response.json()
801 freq_map = base64.b64decode(
802 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
803 freq_map_array = [int(x) for x in freq_map]
804 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
805 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
806 self.assertEqual(freq_map_array[93], 255, "Lambda 3 should be available")
807 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
809 if ele['tp-id'] == 'SRG1-PP1-TXRX':
810 freq_map = base64.b64decode(
811 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
812 freq_map_array = [int(x) for x in freq_map]
813 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
814 if ele['tp-id'] == 'SRG1-PP2-TXRX':
815 freq_map = base64.b64decode(
816 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
817 freq_map_array = [int(x) for x in freq_map]
818 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
819 if ele['tp-id'] == 'SRG1-PP3-TXRX':
820 self.assertNotIn('avail-freq-maps', dict.keys(ele))
823 def test_045_check_openroadm_topo_ROADMB_DEG2(self):
824 response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
825 self.assertEqual(response.status_code, requests.codes.ok)
826 res = response.json()
827 freq_map = base64.b64decode(
828 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
829 freq_map_array = [int(x) for x in freq_map]
830 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
831 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
833 if ele['tp-id'] == 'DEG2-CTP-TXRX':
834 freq_map = base64.b64decode(
835 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
836 freq_map_array = [int(x) for x in freq_map]
837 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
if ele['tp-id'] == 'DEG2-TTP-TXRX':
    freq_map = base64.b64decode(
        ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
    freq_map_array = [int(x) for x in freq_map]
    # Fix: the failure message said "Lambda 1" although index 94 is
    # checked; every other assertion on index 94 in this file labels
    # it "Lambda 2".
    self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
def test_046_check_otn_topo_otu4_links(self):
    # The OTN topology must list exactly four OTU4 links (SA1<->SB1 and
    # SB1<->SC1, one per direction), each typed OTN-LINK with 100000
    # available and 0 used bandwidth, and each pointing at a known opposite.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(len(res['network'][0]['ietf-network-topology:link']), 4)
    expected_link_ids = [
        'OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
        'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1',
        'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR2-XPDR2-NETWORK1',
        'OTU4-SPDR-SC1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2',
    ]
    for otn_link in res['network'][0]['ietf-network-topology:link']:
        self.assertIn(otn_link['link-id'], expected_link_ids)
        self.assertEqual(
            otn_link['transportpce-topology:otn-link-type'], 'OTU4')
        self.assertEqual(
            otn_link['org-openroadm-common-network:link-type'], 'OTN-LINK')
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
        self.assertIn(
            otn_link['org-openroadm-common-network:opposite-link'], expected_link_ids)
868 # test service-create for 100GE service from spdrA to spdrC via spdrB
def test_047_create_100GE_service_ABC(self):
    # Rework the shared service-create payload into a 100GE Ethernet
    # "service" between the CLIENT1 ports of SPDR-SA1 and SPDR-SC1, submit
    # it, then wait WAITING seconds.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service-100GE-ABC"
    request_input["connection-type"] = "service"
    endpoints = (
        ("service-a-end", "SPDR-SA1", "NodeSA", "SPDR-SA1-XPDR2"),
        ("service-z-end", "SPDR-SC1", "NodeSC", "SPDR-SC1-XPDR2"),
    )
    for end_key, node_id, clli, device_name in endpoints:
        end = request_input[end_key]
        end["service-format"] = "Ethernet"
        end["node-id"] = node_id
        end["clli"] = clli
        # The OTU rate only applies to the infrastructure requests.
        del end["otu-service-rate"]
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR2-CLIENT1"

    response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(response.status_code, requests.codes.ok)
    message = response.json()['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', message)
    time.sleep(self.WAITING)
def test_048_get_100GE_service_ABC(self):
    # The service list must report service-100GE-ABC as an in-service
    # "service" connection whose lifecycle state is still "planned".
    response = test_utils.get_service_list_request(
        "services/service-100GE-ABC")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    expected_fields = (
        ('administrative-state', 'inService'),
        ('service-name', 'service-100GE-ABC'),
        ('connection-type', 'service'),
        ('lifecycle-state', 'planned'),
    )
    for field, expected_value in expected_fields:
        self.assertEqual(res['services'][0][field], expected_value)
912 def test_049_check_interface_100GE_CLIENT_spdra(self):
# Read interface XPDR2-CLIENT1-ETHERNET back from node SPDR-SA1 and compare
# it with the attributes expected for the 100GE client port.
913 response = test_utils.check_netconf_node_request(
914 "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
# The interface must exist: HTTP 200 is required before the body is parsed.
915 self.assertEqual(response.status_code, requests.codes.ok)
916 res = response.json()
# Attributes expected on the interface entry itself.
917 input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
918 'administrative-state': 'inService',
919 'supporting-circuit-pack-name': 'CP2-QSFP1',
920 'type': 'org-openroadm-interfaces:ethernetCsmacd',
921 'supporting-port': 'CP2-QSFP1-P1'
# NOTE(review): in dict(input_dict, **returned) the returned values override
# the expected ones; if the result is compared with the returned dict itself
# (as the sibling tests do), only key presence is verified - confirm intent.
923 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
# Second check targets the 'org-openroadm-ethernet-interfaces:ethernet'
# container of the same interface.
925 self.assertDictEqual(
928 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_050_check_interface_ODU4_CLIENT_spdra(self):
    # Read the client-side ODU4 interface on SPDR-SA1, check its expected
    # attributes and OPU payload types, then check that the SAPI/DAPI values
    # reported by SPDR-SC1 for the same interface mirror the expected ones.
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP2-QSFP1',
                    'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP2-QSFP1-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU4',
        'monitoring-mode': 'terminated',
        'expected-dapi': 'AItaZ6nmyaKJ',
        'expected-sapi': 'AKFnJJaijWiz',
        'tx-dapi': 'AKFnJJaijWiz',
        'tx-sapi': 'AItaZ6nmyaKJ'}

    # dict(expected, **returned) lets returned values win the merge, so
    # these two comparisons pass when every expected key is present.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'},
        res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
    response2 = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR2-CLIENT1-ODU4")
    # Validate the status of the SECOND request; checking `response` here
    # would let a failed SPDR-SC1 read slip through to the JSON parsing.
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    # What this end transmits as SAPI, the far end must use as DAPI, and
    # vice versa.
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
967 def test_051_check_interface_ODU4_NETWORK_spdra(self):
# Read interface XPDR2-NETWORK1-ODU4 back from node SPDR-SA1 and check the
# attributes expected on the network side of the 100GE service.
968 response = test_utils.check_netconf_node_request(
969 "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
970 self.assertEqual(response.status_code, requests.codes.ok)
971 res = response.json()
# Attributes expected on the interface entry itself.
972 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
973 'administrative-state': 'inService',
974 'supporting-circuit-pack-name': 'CP5-CFP',
975 'type': 'org-openroadm-interfaces:otnOdu',
976 'supporting-port': 'CP5-CFP-P1',
978 'description': 'TBD'}
# Attributes expected inside the 'org-openroadm-otn-odu-interfaces:odu'
# container: a monitored ODU-CTP at ODU4 rate.
980 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
981 'rate': 'org-openroadm-otn-common-types:ODU4',
982 'monitoring-mode': 'monitored'}
# In dict(expected, **returned) the returned values override the expected
# ones before the comparison is made.
984 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
986 self.assertDictEqual(dict(input_dict_2,
987 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
988 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# Unlike the client-side ODU4 interface, no 'opu' entry may be present here.
989 self.assertNotIn('opu',
990 dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
992 def test_052_check_ODU4_connection_spdra(self):
# Read the odu-connection XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4 and check
# its direction and both endpoints.
# NOTE(review): the node argument is not visible in this listing;
# presumably SPDR-SA1 given the test name - confirm.
993 response = test_utils.check_netconf_node_request(
995 "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
996 self.assertEqual(response.status_code, requests.codes.ok)
997 res = response.json()
1000 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1001 'direction': 'bidirectional'
# In dict(expected, **returned) the returned values override the expected
# ones, so this comparison passes when every expected key is present.
1004 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1005 res['odu-connection'][0])
# Destination and source are compared exactly: network-side ODU4 as
# destination, client-side ODU4 as source.
1006 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
1007 res['odu-connection'][0]['destination'])
1008 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1009 res['odu-connection'][0]['source'])
1011 def test_053_check_interface_100GE_CLIENT_spdrc(self):
# Read interface XPDR2-CLIENT1-ETHERNET back from node SPDR-SC1 and compare
# it with the attributes expected for the 100GE client port.
1012 response = test_utils.check_netconf_node_request(
1013 "SPDR-SC1", "interface/XPDR2-CLIENT1-ETHERNET")
# The interface must exist: HTTP 200 is required before the body is parsed.
1014 self.assertEqual(response.status_code, requests.codes.ok)
1015 res = response.json()
# Attributes expected on the interface entry itself.
1016 input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
1017 'administrative-state': 'inService',
1018 'supporting-circuit-pack-name': 'CP2-QSFP1',
1019 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1020 'supporting-port': 'CP2-QSFP1-P1'
# In dict(input_dict, **returned) the returned values override the expected
# ones, so this comparison passes when every expected key is present.
1022 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1023 res['interface'][0])
# Second check targets the 'org-openroadm-ethernet-interfaces:ethernet'
# container of the same interface.
1024 self.assertDictEqual(
1027 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_054_check_interface_ODU4_CLIENT_spdrc(self):
    # Read the client-side ODU4 interface on SPDR-SC1, check its expected
    # attributes and OPU payload types, then check that the SAPI/DAPI values
    # reported by SPDR-SA1 for the same interface mirror the expected ones.
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR2-CLIENT1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP2-QSFP1',
                    'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP2-QSFP1-P1',
                    'circuit-id': 'TBD',
                    'description': 'TBD'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
        'rate': 'org-openroadm-otn-common-types:ODU4',
        'monitoring-mode': 'terminated',
        'expected-dapi': 'AKFnJJaijWiz',
        'expected-sapi': 'AItaZ6nmyaKJ',
        'tx-dapi': 'AItaZ6nmyaKJ',
        'tx-sapi': 'AKFnJJaijWiz'}

    # dict(expected, **returned) lets returned values win the merge, so
    # these two comparisons pass when every expected key is present.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'},
        res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
    response2 = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
    # Validate the status of the SECOND request; checking `response` here
    # would let a failed SPDR-SA1 read slip through to the JSON parsing.
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    # What this end transmits as SAPI, the far end must use as DAPI, and
    # vice versa.
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
def test_055_check_interface_ODU4_NETWORK_spdrc(self):
    # Read the network-side ODU4 interface on SPDR-SC1 and check that the
    # expected attributes are reported and that it carries no 'opu' entry.
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR2-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    expected_interface = {
        'name': 'XPDR2-NETWORK1-ODU4',
        'administrative-state': 'inService',
        'supporting-circuit-pack-name': 'CP5-CFP',
        'type': 'org-openroadm-interfaces:otnOdu',
        'supporting-port': 'CP5-CFP-P1',
        'circuit-id': 'TBD',
        'description': 'TBD',
    }
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU4',
        'monitoring-mode': 'monitored',
    }

    # Returned values win the merge, so each comparison passes when every
    # expected key is present in the returned data.
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    self.assertNotIn('opu', dict.keys(odu))
1093 def test_056_check_ODU4_connection_spdrc(self):
# Read the odu-connection XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4 and check
# its direction and both endpoints.
# NOTE(review): the node argument is not visible in this listing;
# presumably SPDR-SC1 given the test name - confirm.
1094 response = test_utils.check_netconf_node_request(
1096 "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
1097 self.assertEqual(response.status_code, requests.codes.ok)
1098 res = response.json()
1101 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1102 'direction': 'bidirectional'
# In dict(expected, **returned) the returned values override the expected
# ones, so this comparison passes when every expected key is present.
1105 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1106 res['odu-connection'][0])
# Destination and source are compared exactly: network-side ODU4 as
# destination, client-side ODU4 as source.
1107 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
1108 res['odu-connection'][0]['destination'])
1109 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1110 res['odu-connection'][0]['source'])
def test_057_check_interface_ODU4_NETWORK1_spdrb(self):
    # Read the ODU4 interface on the NETWORK1 port of SPDR-SB1 and check
    # that the expected attributes are reported and that it carries no
    # 'opu' entry.
    response = test_utils.check_netconf_node_request(
        "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    expected_interface = {
        'name': 'XPDR2-NETWORK1-ODU4',
        'administrative-state': 'inService',
        'supporting-circuit-pack-name': 'CP5-CFP',
        'type': 'org-openroadm-interfaces:otnOdu',
        'supporting-port': 'CP5-CFP-P1',
    }
    expected_odu = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU4',
        'monitoring-mode': 'monitored',
    }

    # Returned values win the merge, so each comparison passes when every
    # expected key is present in the returned data.
    self.assertDictEqual(dict(expected_interface, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(expected_odu, **odu), odu)
    self.assertNotIn('opu', dict.keys(odu))
def test_058_check_interface_ODU4_NETWORK2_spdrb(self):
    # Read the ODU4 interface on the NETWORK2 port of SPDR-SB1 and check
    # that the expected attributes are reported and that it carries no
    # 'opu' entry.
    response = test_utils.check_netconf_node_request(
        "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The expected name must match the interface being read (NETWORK2);
    # it previously carried the NETWORK1 name copied from test_057.
    input_dict_1 = {'name': 'XPDR2-NETWORK2-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP6-CFP',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP6-CFP-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU4',
        'monitoring-mode': 'monitored'}

    # dict(expected, **returned) lets returned values win the merge, so
    # these comparisons pass when every expected key is present.
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertNotIn('opu',
                     res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1158 def test_059_check_ODU4_connection_spdrb(self):
# Read the odu-connection XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4 and check
# its direction and both endpoints.
# NOTE(review): the node argument is not visible in this listing;
# presumably SPDR-SB1 given the test name - confirm.
1159 response = test_utils.check_netconf_node_request(
1161 "odu-connection/XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4")
1162 self.assertEqual(response.status_code, requests.codes.ok)
1163 res = response.json()
1166 'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
1167 'direction': 'bidirectional'
# In dict(expected, **returned) the returned values override the expected
# ones, so this comparison passes when every expected key is present.
1170 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1171 res['odu-connection'][0])
# Destination and source are compared exactly: the connection goes from the
# NETWORK1 ODU4 interface to the NETWORK2 ODU4 interface.
1172 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU4'},
1173 res['odu-connection'][0]['destination'])
1174 self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU4'},
1175 res['odu-connection'][0]['source'])
def test_060_check_otn_topo_links(self):
    # With the 100GE service in place, each of the four OTN links must
    # report its whole bandwidth as used (100000) and none as available.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(len(res['network'][0]['ietf-network-topology:link']), 4)
    for otn_link in res['network'][0]['ietf-network-topology:link']:
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
def test_061_delete_service_100GE_ABC(self):
    # Request deletion of service-100GE-ABC, expect the "in progress"
    # acknowledgement, then wait WAITING seconds.
    response = test_utils.service_delete_request("service-100GE-ABC")
    self.assertEqual(response.status_code, requests.codes.ok)
    message = response.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_062_check_service_list(self):
    # Exactly two services must remain in the service list.
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.ok)
    remaining_services = response.json()['service-list']['services']
    self.assertEqual(len(remaining_services), 2)
def test_063_check_no_ODU4_connection_spdra(self):
    # The device data of SPDR-SA1 must no longer contain any
    # 'odu-connection' entry.
    response = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    device_data = response.json()['org-openroadm-device']
    self.assertNotIn('odu-connection', device_data)
def test_064_check_no_interface_ODU4_NETWORK_spdra(self):
    # Reading the network-side ODU4 interface on SPDR-SA1 is expected to
    # be answered with the "conflict" status code.
    node_id, resource = "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4"
    response = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_065_check_no_interface_ODU4_CLIENT_spdra(self):
    # Reading the client-side ODU4 interface on SPDR-SA1 is expected to be
    # answered with the "conflict" status code.
    node_id, resource = "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4"
    response = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_066_check_no_interface_100GE_CLIENT_spdra(self):
    # Reading the client Ethernet interface on SPDR-SA1 is expected to be
    # answered with the "conflict" status code.
    node_id, resource = "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET"
    response = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_067_check_otn_topo_links(self):
    # Re-run the OTU4 link checks of test_046 unchanged.
    check_otu4_links = self.test_046_check_otn_topo_otu4_links
    check_otu4_links()
def test_068_delete_OCH_OTU4_service_AB(self):
    # Request deletion of service-OCH-OTU4-AB, expect the "in progress"
    # acknowledgement, then wait WAITING seconds.
    response = test_utils.service_delete_request("service-OCH-OTU4-AB")
    self.assertEqual(response.status_code, requests.codes.ok)
    message = response.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_069_delete_OCH_OTU4_service_BC(self):
    # Request deletion of service-OCH-OTU4-BC, expect the "in progress"
    # acknowledgement, then wait WAITING seconds.
    response = test_utils.service_delete_request("service-OCH-OTU4-BC")
    self.assertEqual(response.status_code, requests.codes.ok)
    message = response.json()['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_070_get_no_service(self):
    # With no service left, the service-list request must answer with the
    # "conflict" status and a data-missing application error.
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.conflict)
    res = response.json()
    expected_error = {
        "error-type": "application",
        "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant data model content does not exist",
    }
    self.assertIn(expected_error, res['errors']['error'])
def test_071_check_no_interface_OTU4_spdra(self):
    # Reading the OTU interface on SPDR-SA1 is expected to be answered
    # with the "conflict" status code.
    node_id, resource = "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU"
    response = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_072_check_no_interface_OCH_spdra(self):
    # Reading interface XPDR2-NETWORK1-761:768 on SPDR-SA1 is expected to
    # be answered with the "conflict" status code.
    node_id, resource = "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768"
    response = test_utils.check_netconf_node_request(node_id, resource)
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_073_getLinks_OtnTopology(self):
    # The OTN topology must no longer carry any link list.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    otn_network = response.json()['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_074_check_openroadm_topo_spdra(self):
    # The first termination point of SPDR-SA1-XPDR2 must be XPDR2-NETWORK1
    # and its xpdr-network-attributes must hold no 'wavelength' entry.
    response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    first_tp = res['node'][0]['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR2-NETWORK1', first_tp['tp-id'])
    network_attributes = first_tp['org-openroadm-network-topology:xpdr-network-attributes']
    self.assertNotIn('wavelength', dict.keys(network_attributes))
def test_075_check_openroadm_topo_ROADMB_SRG1(self):
    # Check on ROADM-B1-SRG1 that frequency-map bytes 95 ("Lambda 1") and
    # 94 ("Lambda 2") read 255 at node level, and that SRG1-PP1-TXRX and
    # SRG1-PP2-TXRX report 255 for byte 95 and byte 94 respectively.
    response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    freq_map = base64.b64decode(
        res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
    freq_map_array = [int(x) for x in freq_map]
    self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
    # Index 94 is the "Lambda 2" slot, so its failure message names Lambda 2.
    self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
    # Byte index and failure message to use for each port-pair of interest.
    pp_checks = {
        'SRG1-PP1-TXRX': (95, "Lambda 1 should be available"),
        'SRG1-PP2-TXRX': (94, "Lambda 2 should be available"),
    }
    for ele in res['node'][0]['ietf-network-topology:termination-point']:
        check = pp_checks.get(ele['tp-id'])
        if check is None:
            continue
        index, message = check
        freq_map = base64.b64decode(
            ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
        freq_map_array = [int(x) for x in freq_map]
        self.assertEqual(freq_map_array[index], 255, message)
def test_076_check_openroadm_topo_ROADMB_DEG1(self):
    # Frequency-map byte 95 ("Lambda 1") must read 255 on ROADM-B1-DEG1 at
    # node level and on its DEG1-CTP-TXRX and DEG1-TTP-TXRX termination
    # points.
    response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    node_map = base64.b64decode(
        node['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    node_bytes = [int(value) for value in node_map]
    self.assertEqual(node_bytes[95], 255, "Lambda 1 should be available")
    # Attribute container holding the frequency map of each checked point.
    containers = {
        'DEG1-CTP-TXRX': 'org-openroadm-network-topology:ctp-attributes',
        'DEG1-TTP-TXRX': 'org-openroadm-network-topology:tx-ttp-attributes',
    }
    for tp in node['ietf-network-topology:termination-point']:
        container = containers.get(tp['tp-id'])
        if container is None:
            continue
        tp_map = base64.b64decode(
            tp[container]['avail-freq-maps'][0]['freq-map'])
        tp_bytes = [int(value) for value in tp_map]
        self.assertEqual(tp_bytes[95], 255, "Lambda 1 should be available")
def test_077_check_openroadm_topo_ROADMB_DEG2(self):
    # Frequency-map byte 95 ("Lambda 1") must read 255 on ROADM-B1-DEG2 at
    # node level and on its DEG2-CTP-TXRX and DEG2-TTP-TXRX termination
    # points.
    response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    node_map = base64.b64decode(
        node['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    node_bytes = [int(value) for value in node_map]
    self.assertEqual(node_bytes[95], 255, "Lambda 1 should be available")
    # Attribute container holding the frequency map of each checked point.
    containers = {
        'DEG2-CTP-TXRX': 'org-openroadm-network-topology:ctp-attributes',
        'DEG2-TTP-TXRX': 'org-openroadm-network-topology:tx-ttp-attributes',
    }
    for tp in node['ietf-network-topology:termination-point']:
        container = containers.get(tp['tp-id'])
        if container is None:
            continue
        tp_map = base64.b64decode(
            tp[container]['avail-freq-maps'][0]['freq-map'])
        tp_bytes = [int(value) for value in tp_map]
        self.assertEqual(tp_bytes[95], 255, "Lambda 1 should be available")
def test_078_disconnect_xponders_from_roadm(self):
    # Delete every XPONDER-OUTPUT / XPONDER-INPUT link of the openroadm
    # topology whose id mentions SPDR-SB1 or ROADM-B1.
    # The "{}" prefix is left in place for delete_request to handle
    # (presumably filled with the controller base URL - confirm).
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    response = test_utils.get_ordm_topo_request("")
    self.assertEqual(response.status_code, requests.codes.ok)
    topology_links = response.json()['network'][0]['ietf-network-topology:link']
    xponder_link_types = ('XPONDER-OUTPUT', 'XPONDER-INPUT')
    for link in topology_links:
        if link["org-openroadm-common-network:link-type"] not in xponder_link_types:
            continue
        if 'SPDR-SB1' not in link['link-id'] and 'ROADM-B1' not in link['link-id']:
            continue
        response = test_utils.delete_request(url + link["link-id"])
        self.assertEqual(response.status_code, requests.codes.ok)
def test_079_disconnect_spdrB(self):
    # Unmount SPDR-SB1 from the controller; the call must return HTTP 200.
    unmount_response = test_utils.unmount_device("SPDR-SB1")
    self.assertEqual(unmount_response.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_080_disconnect_roadmB(self):
    # Unmount ROADM-B1 from the controller; the call must return HTTP 200.
    unmount_response = test_utils.unmount_device("ROADM-B1")
    self.assertEqual(unmount_response.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_081_remove_roadm_to_roadm_links(self):
    # Delete every ROADM-TO-ROADM link of the openroadm topology whose id
    # mentions ROADM-B1.
    # The "{}" prefix is left in place for delete_request to handle
    # (presumably filled with the controller base URL - confirm).
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    response = test_utils.get_ordm_topo_request("")
    self.assertEqual(response.status_code, requests.codes.ok)
    topology_links = response.json()['network'][0]['ietf-network-topology:link']
    for link in topology_links:
        if link["org-openroadm-common-network:link-type"] != "ROADM-TO-ROADM":
            continue
        if 'ROADM-B1' not in link['link-id']:
            continue
        response = test_utils.delete_request(url + link["link-id"])
        self.assertEqual(response.status_code, requests.codes.ok)
1384 def test_082_add_omsAttributes_ROADMA_ROADMC(self):
1385 # Config ROADMA-ROADMC oms-attributes
# Span description sent for the link: span-loss figures plus one
# link-concatenation entry describing the fiber.
1387 "auto-spanloss": "true",
1388 "spanloss-base": 11.4,
1389 "spanloss-current": 12,
1390 "engineered-spanloss": 12.2,
1391 "link-concatenation": [{
1393 "fiber-type": "smf",
1394 "SRLG-length": 100000,
# Attach the attributes to the ROADM-A1 DEG2 -> ROADM-C1 DEG1 link.
1396 response = test_utils.add_oms_attr_request(
1397 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
# The request must be answered with the "created" status code.
1398 self.assertEqual(response.status_code, requests.codes.created)
1400 def test_083_add_omsAttributes_ROADMC_ROADMA(self):
1401 # Config ROADMC-ROADMA oms-attributes
# Span description sent for the link: span-loss figures plus one
# link-concatenation entry describing the fiber.
1403 "auto-spanloss": "true",
1404 "spanloss-base": 11.4,
1405 "spanloss-current": 12,
1406 "engineered-spanloss": 12.2,
1407 "link-concatenation": [{
1409 "fiber-type": "smf",
1410 "SRLG-length": 100000,
# Attach the attributes to the ROADM-C1 DEG1 -> ROADM-A1 DEG2 link.
1412 response = test_utils.add_oms_attr_request(
1413 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
# The request must be answered with the "created" status code.
1414 self.assertEqual(response.status_code, requests.codes.created)
def test_084_create_OCH_OTU4_service_AC(self):
    # Rework the shared service-create payload into an OTU4
    # "infrastructure" request between the NETWORK1 ports of SPDR-SA1-XPDR2
    # and SPDR-SC1-XPDR2, submit it, then wait WAITING seconds.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service-OCH-OTU4-AC"
    request_input["connection-type"] = "infrastructure"
    endpoints = (
        ("service-a-end", "SPDR-SA1-XPDR2"),
        ("service-z-end", "SPDR-SC1-XPDR2"),
    )
    for end_key, device_name in endpoints:
        end = request_input[end_key]
        end["service-rate"] = "100"
        end["service-format"] = "OTU"
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR2-NETWORK1"
        end["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
    response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(response.status_code, requests.codes.ok)
    message = response.json()['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', message)
    time.sleep(self.WAITING)
def test_085_get_OCH_OTU4_service_AC(self):
    # The service list must report service-OCH-OTU4-AC as an in-service
    # "infrastructure" connection whose lifecycle state is still "planned".
    response = test_utils.get_service_list_request(
        "services/service-OCH-OTU4-AC")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    expected_fields = (
        ('administrative-state', 'inService'),
        ('service-name', 'service-OCH-OTU4-AC'),
        ('connection-type', 'infrastructure'),
        ('lifecycle-state', 'planned'),
    )
    for field, expected_value in expected_fields:
        self.assertEqual(res['services'][0][field], expected_value)
1456 # test service-create for 100GE service from spdrA to spdrC (no longer via spdrB, which is unmounted in test_079)
def test_086_create_100GE_service_AC(self):
    # Rework the shared service-create payload into a 100GE Ethernet
    # "service" between the CLIENT1 ports of SPDR-SA1 and SPDR-SC1, submit
    # it, then wait WAITING seconds.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service-100GE-AC"
    request_input["connection-type"] = "service"
    endpoints = (
        ("service-a-end", "SPDR-SA1", "NodeSA", "SPDR-SA1-XPDR2"),
        ("service-z-end", "SPDR-SC1", "NodeSC", "SPDR-SC1-XPDR2"),
    )
    for end_key, node_id, clli, device_name in endpoints:
        end = request_input[end_key]
        end["service-format"] = "Ethernet"
        end["node-id"] = node_id
        end["clli"] = clli
        # The OTU rate only applies to the infrastructure requests.
        del end["otu-service-rate"]
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR2-CLIENT1"

    response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(response.status_code, requests.codes.ok)
    message = response.json()['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', message)
    time.sleep(self.WAITING)
def test_087_get_100GE_service_AC(self):
    # The service list must report service-100GE-AC as an in-service
    # "service" connection whose lifecycle state is still "planned".
    response = test_utils.get_service_list_request("services/service-100GE-AC")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    expected_fields = (
        ('administrative-state', 'inService'),
        ('service-name', 'service-100GE-AC'),
        ('connection-type', 'service'),
        ('lifecycle-state', 'planned'),
    )
    for field, expected_value in expected_fields:
        self.assertEqual(res['services'][0][field], expected_value)
1499 def test_088_check_interface_OTU4_spdra(self):
# Read interface XPDR2-NETWORK1-OTU back from SPDR-SA1, check its attributes
# and OTU container, then cross-check the SAPI/DAPI values against SPDR-SC1.
1500 response = test_utils.check_netconf_node_request(
1501 "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
1502 self.assertEqual(response.status_code, requests.codes.ok)
1503 res = response.json()
# Attributes expected on the interface entry itself.
1504 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
1505 'administrative-state': 'inService',
1506 'supporting-circuit-pack-name': 'CP5-CFP',
1507 'supporting-interface': 'XPDR2-NETWORK1-761:768',
1508 'type': 'org-openroadm-interfaces:otnOtu',
1509 'supporting-port': 'CP5-CFP-P1'
# Values expected inside the 'org-openroadm-otn-otu-interfaces:otu' container.
1511 input_dict_2 = {'tx-sapi': 'AOQxIv+6nCD+',
1512 'expected-dapi': 'AOQxIv+6nCD+',
1513 'tx-dapi': 'ALvne1QI5jo4',
1514 'expected-sapi': 'ALvne1QI5jo4',
1515 'rate': 'org-openroadm-otn-common-types:OTU4',
# In dict(expected, **returned) the returned values override the expected
# ones, so this comparison passes when every expected key is present.
1518 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1519 res['interface'][0])
# The OTU container, by contrast, is compared exactly with input_dict_2.
1521 self.assertDictEqual(input_dict_2,
1523 ['org-openroadm-otn-otu-interfaces:otu'])
# Second read: the OTU container of the same interface on SPDR-SC1.
1524 response2 = test_utils.check_netconf_node_request(
1525 "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU/otu")
# NOTE(review): this re-checks `response`, not `response2`, so a failed
# second request is not caught here - looks like a copy/paste slip.
1526 self.assertEqual(response.status_code, requests.codes.ok)
1527 res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
# What this end transmits as SAPI must appear as DAPI on the other end, and
# vice versa.
1528 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
1529 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
1530 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
1531 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
1533 def test_089_check_interface_OTU4_spdrC(self):
# Read interface XPDR2-NETWORK1-OTU back from SPDR-SC1, check its attributes
# and OTU container, then cross-check the SAPI/DAPI values against SPDR-SA1.
1534 response = test_utils.check_netconf_node_request(
1535 "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU")
1536 self.assertEqual(response.status_code, requests.codes.ok)
1537 res = response.json()
# Attributes expected on the interface entry itself.
1538 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
1539 'administrative-state': 'inService',
1540 'supporting-circuit-pack-name': 'CP5-CFP',
1541 'supporting-interface': 'XPDR2-NETWORK1-753:760',
1542 'type': 'org-openroadm-interfaces:otnOtu',
1543 'supporting-port': 'CP5-CFP-P1'
# Values expected inside the 'org-openroadm-otn-otu-interfaces:otu'
# container; SAPI and DAPI are swapped relative to the SPDR-SA1 check.
1545 input_dict_2 = {'tx-dapi': 'AOQxIv+6nCD+',
1546 'expected-sapi': 'AOQxIv+6nCD+',
1547 'tx-sapi': 'ALvne1QI5jo4',
1548 'expected-dapi': 'ALvne1QI5jo4',
1549 'rate': 'org-openroadm-otn-common-types:OTU4',
# In dict(expected, **returned) the returned values override the expected
# ones, so this comparison passes when every expected key is present.
1553 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1554 res['interface'][0])
# The OTU container, by contrast, is compared exactly with input_dict_2.
1555 self.assertDictEqual(input_dict_2,
1557 ['org-openroadm-otn-otu-interfaces:otu'])
# Second read: the OTU container of the same interface on SPDR-SA1.
1558 response2 = test_utils.check_netconf_node_request(
1559 "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU/otu")
# NOTE(review): this re-checks `response`, not `response2`, so a failed
# second request is not caught here - looks like a copy/paste slip.
1560 self.assertEqual(response.status_code, requests.codes.ok)
1561 res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
# What this end transmits as SAPI must appear as DAPI on the other end, and
# vice versa.
1562 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
1563 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
1564 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
1565 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
def test_090_check_configuration_spdra(self):
    """Re-run, in order, the SPDR-A checks of tests 049 to 052.

    These cover the 100GE client interface, the ODU4 client interface,
    the ODU4 network interface and the ODU4 connection.
    """
    spdra_checks = (
        self.test_049_check_interface_100GE_CLIENT_spdra,
        self.test_050_check_interface_ODU4_CLIENT_spdra,
        self.test_051_check_interface_ODU4_NETWORK_spdra,
        self.test_052_check_ODU4_connection_spdra,
    )
    for run_check in spdra_checks:
        run_check()
def test_091_check_configuration_spdrc(self):
    """Re-run, in order, the SPDR-C checks of tests 053 to 056.

    These cover the 100GE client interface, the ODU4 client interface,
    the ODU4 network interface and the ODU4 connection.
    """
    spdrc_checks = (
        self.test_053_check_interface_100GE_CLIENT_spdrc,
        self.test_054_check_interface_ODU4_CLIENT_spdrc,
        self.test_055_check_interface_ODU4_NETWORK_spdrc,
        self.test_056_check_ODU4_connection_spdrc,
    )
    for run_check in spdrc_checks:
        run_check()
def test_092_check_otn_topo_links(self):
    """Check that the OTN topology holds two links with no bandwidth left.

    Each link must report an available bandwidth of 0 and a used
    bandwidth of 100000.
    """
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    otn_links = topo_reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(otn_links), 2)
    for otn_link in otn_links:
        available = otn_link['org-openroadm-otn-network-topology:available-bandwidth']
        self.assertEqual(available, 0)
        used = otn_link['org-openroadm-otn-network-topology:used-bandwidth']
        self.assertEqual(used, 100000)
def test_093_delete_100GE_service_AC(self):
    """Request the deletion of service-100GE-AC, then pause for WAITING seconds.

    The reply must have an OK status and its response message must
    contain 'Renderer service delete in progress'.
    """
    delete_reply = test_utils.service_delete_request("service-100GE-AC")
    self.assertEqual(delete_reply.status_code, requests.codes.ok)
    common_part = delete_reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common_part['response-message'])
    time.sleep(self.WAITING)
def test_094_check_service_list(self):
    """Check that the service list holds exactly one service."""
    list_reply = test_utils.get_service_list_request("")
    self.assertEqual(list_reply.status_code, requests.codes.ok)
    listed_services = list_reply.json()['service-list']['services']
    self.assertEqual(len(listed_services), 1)
def test_095_check_configuration_spdra(self):
    """Re-run, in order, the SPDR-A absence checks of tests 063 to 066.

    These cover the ODU4 connection, the ODU4 network interface, the
    ODU4 client interface and the 100GE client interface.
    """
    absence_checks = (
        self.test_063_check_no_ODU4_connection_spdra,
        self.test_064_check_no_interface_ODU4_NETWORK_spdra,
        self.test_065_check_no_interface_ODU4_CLIENT_spdra,
        self.test_066_check_no_interface_100GE_CLIENT_spdra,
    )
    for run_check in absence_checks:
        run_check()
def test_096_check_otn_topo_links(self):
    """Check that the OTN topology holds two links with all bandwidth free.

    Each link must report an available bandwidth of 100000 and a used
    bandwidth of 0.
    """
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    otn_links = topo_reply.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(otn_links), 2)
    for otn_link in otn_links:
        available = otn_link['org-openroadm-otn-network-topology:available-bandwidth']
        self.assertEqual(available, 100000)
        used = otn_link['org-openroadm-otn-network-topology:used-bandwidth']
        self.assertEqual(used, 0)
def test_097_disconnect_xponders_from_roadm(self):
    """Delete every XPONDER-OUTPUT and XPONDER-INPUT link of the topology.

    The links are read through test_utils.get_ordm_topo_request and each
    matching one is deleted by its link-id; every deletion must answer
    with an OK status.
    """
    # The leading "{}" is kept verbatim - presumably test_utils.delete_request
    # fills it in with the controller base URL (not visible from here).
    link_url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    topo_reply = test_utils.get_ordm_topo_request("")
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    xponder_types = ('XPONDER-OUTPUT', 'XPONDER-INPUT')
    for topo_link in topo_reply.json()['network'][0]['ietf-network-topology:link']:
        if topo_link["org-openroadm-common-network:link-type"] not in xponder_types:
            continue
        delete_reply = test_utils.delete_request(link_url + topo_link["link-id"])
        self.assertEqual(delete_reply.status_code, requests.codes.ok)
def test_098_disconnect_spdrA(self):
    """Unmount device SPDR-SA1 and require an OK status code in the reply."""
    unmount_reply = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_099_disconnect_spdrC(self):
    """Unmount device SPDR-SC1 and require an OK status code in the reply."""
    unmount_reply = test_utils.unmount_device("SPDR-SC1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_100_disconnect_roadmA(self):
    """Unmount device ROADM-A1 and require an OK status code in the reply."""
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_101_disconnect_roadmC(self):
    """Unmount device ROADM-C1 and require an OK status code in the reply."""
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
# Run the tests of this module with verbose output when the file is
# executed directly rather than imported.
if "__main__" == __name__:
    unittest.main(verbosity=2)