3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
28 class TransportPCEtesting(unittest.TestCase):
31 WAITING = 20 # nominal value is 300
32 NODE_VERSION = '2.2.1'
34 cr_serv_sample_data = {"input": {
35 "sdnc-request-header": {
36 "request-id": "request-1",
37 "rpc-action": "service-create",
38 "request-system-id": "appname"
40 "service-name": "service-OCH-OTU4-AB",
41 "common-id": "commonId",
42 "connection-type": "infrastructure",
44 "service-rate": "100",
45 "node-id": "SPDR-SA1",
46 "service-format": "OTU",
47 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
51 "port-device-name": "SPDR-SA1-XPDR2",
53 "port-name": "XPDR2-NETWORK1",
54 "port-rack": "000000.00",
55 "port-shelf": "Chassis#1"
58 "lgx-device-name": "Some lgx-device-name",
59 "lgx-port-name": "Some lgx-port-name",
60 "lgx-port-rack": "000000.00",
61 "lgx-port-shelf": "00"
67 "port-device-name": "SPDR-SA1-XPDR2",
69 "port-name": "XPDR2-NETWORK1",
70 "port-rack": "000000.00",
71 "port-shelf": "Chassis#1"
74 "lgx-device-name": "Some lgx-device-name",
75 "lgx-port-name": "Some lgx-port-name",
76 "lgx-port-rack": "000000.00",
77 "lgx-port-shelf": "00"
84 "service-rate": "100",
85 "node-id": "SPDR-SB1",
86 "service-format": "OTU",
87 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
91 "port-device-name": "SPDR-SB1-XPDR2",
93 "port-name": "XPDR2-NETWORK1",
94 "port-rack": "000000.00",
95 "port-shelf": "Chassis#1"
98 "lgx-device-name": "Some lgx-device-name",
99 "lgx-port-name": "Some lgx-port-name",
100 "lgx-port-rack": "000000.00",
101 "lgx-port-shelf": "00"
107 "port-device-name": "SPDR-SB1-XPDR2",
108 "port-type": "fixed",
109 "port-name": "XPDR2-NETWORK1",
110 "port-rack": "000000.00",
111 "port-shelf": "Chassis#1"
114 "lgx-device-name": "Some lgx-device-name",
115 "lgx-port-name": "Some lgx-port-name",
116 "lgx-port-rack": "000000.00",
117 "lgx-port-shelf": "00"
123 "due-date": "2018-06-15T00:00:01Z",
124 "operator-contact": "pw1234"
130 cls.processes = test_utils.start_tpce()
131 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
132 ('spdrb', cls.NODE_VERSION),
133 ('spdrc', cls.NODE_VERSION),
134 ('roadma', cls.NODE_VERSION),
135 ('roadmb', cls.NODE_VERSION),
136 ('roadmc', cls.NODE_VERSION)])
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes`` and report it."""
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def test_001_connect_spdrA(self):
    """Mount device SPDR-SA1 and expect the reply status code `created`."""
    sim = ('spdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SA1", sim)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_002_connect_spdrB(self):
    """Mount device SPDR-SB1 and expect the reply status code `created`."""
    sim = ('spdrb', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SB1", sim)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_003_connect_spdrC(self):
    """Mount device SPDR-SC1 and expect the reply status code `created`."""
    sim = ('spdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SC1", sim)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_004_connect_rdmA(self):
    """Mount device ROADM-A1 and expect the reply status code `created`."""
    sim = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-A1", sim)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_005_connect_rdmB(self):
    """Mount device ROADM-B1 and expect the reply status code `created`."""
    sim = ('roadmb', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-B1", sim)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_006_connect_rdmC(self):
    """Mount device ROADM-C1 and expect the reply status code `created`."""
    sim = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-C1", sim)
    self.assertEqual(mount_reply.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
178 def test_007_connect_sprdA_2_N1_to_roadmA_PP3(self):
179 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "2", "1",
180 "ROADM-A1", "1", "SRG1-PP3-TXRX")
181 self.assertEqual(response.status_code, requests.codes.ok)
182 res = response.json()
183 self.assertIn('Xponder Roadm Link created successfully',
184 res["output"]["result"])
187 def test_008_connect_roadmA_PP3_to_spdrA_2_N1(self):
188 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "2", "1",
189 "ROADM-A1", "1", "SRG1-PP3-TXRX")
190 self.assertEqual(response.status_code, requests.codes.ok)
191 res = response.json()
192 self.assertIn('Roadm Xponder links created successfully',
193 res["output"]["result"])
196 def test_009_connect_sprdC_2_N1_to_roadmC_PP3(self):
197 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "2", "1",
198 "ROADM-C1", "1", "SRG1-PP3-TXRX")
199 self.assertEqual(response.status_code, requests.codes.ok)
200 res = response.json()
201 self.assertIn('Xponder Roadm Link created successfully',
202 res["output"]["result"])
205 def test_010_connect_roadmC_PP3_to_spdrC_2_N1(self):
206 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "2", "1",
207 "ROADM-C1", "1", "SRG1-PP3-TXRX")
208 self.assertEqual(response.status_code, requests.codes.ok)
209 res = response.json()
210 self.assertIn('Roadm Xponder links created successfully',
211 res["output"]["result"])
214 def test_011_connect_sprdB_2_N1_to_roadmB_PP1(self):
215 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "1",
216 "ROADM-B1", "1", "SRG1-PP1-TXRX")
217 self.assertEqual(response.status_code, requests.codes.ok)
218 res = response.json()
219 self.assertIn('Xponder Roadm Link created successfully',
220 res["output"]["result"])
223 def test_012_connect_roadmB_PP1_to_spdrB_2_N1(self):
224 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "1",
225 "ROADM-B1", "1", "SRG1-PP1-TXRX")
226 self.assertEqual(response.status_code, requests.codes.ok)
227 res = response.json()
228 self.assertIn('Roadm Xponder links created successfully',
229 res["output"]["result"])
232 def test_013_connect_sprdB_2_N2_to_roadmB_PP2(self):
233 response = test_utils.connect_xpdr_to_rdm_request("SPDR-SB1", "2", "2",
234 "ROADM-B1", "1", "SRG1-PP2-TXRX")
235 self.assertEqual(response.status_code, requests.codes.ok)
236 res = response.json()
237 self.assertIn('Xponder Roadm Link created successfully',
238 res["output"]["result"])
241 def test_014_connect_roadmB_PP2_to_spdrB_2_N2(self):
242 response = test_utils.connect_rdm_to_xpdr_request("SPDR-SB1", "2", "2",
243 "ROADM-B1", "1", "SRG1-PP2-TXRX")
244 self.assertEqual(response.status_code, requests.codes.ok)
245 res = response.json()
246 self.assertIn('Roadm Xponder links created successfully',
247 res["output"]["result"])
250 def test_015_add_omsAttributes_ROADMA_ROADMB(self):
251 # Config ROADMA-ROADMB oms-attributes
253 "auto-spanloss": "true",
254 "spanloss-base": 11.4,
255 "spanloss-current": 12,
256 "engineered-spanloss": 12.2,
257 "link-concatenation": [{
260 "SRLG-length": 100000,
262 response = test_utils.add_oms_attr_request(
263 "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
264 self.assertEqual(response.status_code, requests.codes.created)
266 def test_016_add_omsAttributes_ROADMB_ROADMA(self):
267 # Config ROADMB-ROADMA oms-attributes
269 "auto-spanloss": "true",
270 "spanloss-base": 11.4,
271 "spanloss-current": 12,
272 "engineered-spanloss": 12.2,
273 "link-concatenation": [{
276 "SRLG-length": 100000,
278 response = test_utils.add_oms_attr_request(
279 "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
280 self.assertEqual(response.status_code, requests.codes.created)
282 def test_017_add_omsAttributes_ROADMB_ROADMC(self):
283 # Config ROADMB-ROADMC oms-attributes
285 "auto-spanloss": "true",
286 "spanloss-base": 11.4,
287 "spanloss-current": 12,
288 "engineered-spanloss": 12.2,
289 "link-concatenation": [{
292 "SRLG-length": 100000,
294 response = test_utils.add_oms_attr_request(
295 "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
296 self.assertEqual(response.status_code, requests.codes.created)
298 def test_018_add_omsAttributes_ROADMC_ROADMB(self):
299 # Config ROADMC-ROADMB oms-attributes
301 "auto-spanloss": "true",
302 "spanloss-base": 11.4,
303 "spanloss-current": 12,
304 "engineered-spanloss": 12.2,
305 "link-concatenation": [{
308 "SRLG-length": 100000,
310 response = test_utils.add_oms_attr_request(
311 "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
312 self.assertEqual(response.status_code, requests.codes.created)
314 def test_019_create_OTS_ROADMA_DEG1(self):
315 response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
317 self.assertEqual(response.status_code, requests.codes.ok)
318 res = response.json()
319 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
320 res["output"]["result"])
322 def test_020_create_OTS_ROADMB_DEG1(self):
323 response = test_utils.create_ots_oms_request("ROADM-B1", "DEG1-TTP-TXRX")
325 self.assertEqual(response.status_code, requests.codes.ok)
326 res = response.json()
327 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1',
328 res["output"]["result"])
330 def test_021_create_OTS_ROADMB_DEG2(self):
331 response = test_utils.create_ots_oms_request("ROADM-B1", "DEG2-TTP-TXRX")
333 self.assertEqual(response.status_code, requests.codes.ok)
334 res = response.json()
335 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1',
336 res["output"]["result"])
338 def test_022_create_OTS_ROADMC_DEG2(self):
339 response = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
341 self.assertEqual(response.status_code, requests.codes.ok)
342 res = response.json()
343 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
344 res["output"]["result"])
346 def test_023_calculate_span_loss_base_all(self):
347 url = "{}/operations/transportpce-olm:calculate-spanloss-base"
353 response = test_utils.post_request(url, data)
354 self.assertEqual(response.status_code, requests.codes.ok)
355 res = response.json()
356 self.assertIn('Success',
357 res["output"]["result"])
360 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
361 }, res["output"]["spans"])
364 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
365 }, res["output"]["spans"])
368 "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
369 }, res["output"]["spans"])
372 "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
373 }, res["output"]["spans"])
376 "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
377 }, res["output"]["spans"])
380 "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
381 }, res["output"]["spans"])
def test_024_check_otn_topology(self):
    """Fetch the OTN topology; expect 9 nodes and no link entry yet."""
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    # Only the first network of the reply is inspected.
    network = topo_reply.json()['network'][0]
    self.assertEqual(len(network['node']), 9, 'There should be 9 nodes')
    self.assertNotIn('ietf-network-topology:link', network,
                     'otn-topology should have no link')
393 # test service-create for OCH-OTU4 service from spdrA to spdrB
def test_025_create_OCH_OTU4_service_AB(self):
    """Submit the shared ``cr_serv_sample_data`` payload as a service-create.

    Checks that the reply status is OK and that its response message
    contains 'PCE calculation in progress', then sleeps for
    ``self.WAITING`` seconds.
    """
    create_reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_reply.status_code, requests.codes.ok)
    common = create_reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
402 def test_026_get_OCH_OTU4_service_AB(self):
403 response = test_utils.get_service_list_request(
404 "services/service-OCH-OTU4-AB")
405 self.assertEqual(response.status_code, requests.codes.ok)
406 res = response.json()
408 res['services'][0]['administrative-state'], 'inService')
410 res['services'][0]['service-name'], 'service-OCH-OTU4-AB')
412 res['services'][0]['connection-type'], 'infrastructure')
414 res['services'][0]['lifecycle-state'], 'planned')
417 # Check correct configuration of devices
418 def test_027_check_interface_och_spdra(self):
419 response = test_utils.check_netconf_node_request(
420 "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768")
421 self.assertEqual(response.status_code, requests.codes.ok)
422 res = response.json()
423 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
424 'administrative-state': 'inService',
425 'supporting-circuit-pack-name': 'CP5-CFP',
426 'type': 'org-openroadm-interfaces:opticalChannel',
427 'supporting-port': 'CP5-CFP-P1'
428 }, **res['interface'][0]),
431 self.assertDictEqual(
432 {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
433 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
434 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
436 def test_028_check_interface_OTU4_spdra(self):
437 response = test_utils.check_netconf_node_request(
438 "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
439 self.assertEqual(response.status_code, requests.codes.ok)
440 res = response.json()
441 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
442 'administrative-state': 'inService',
443 'supporting-circuit-pack-name': 'CP5-CFP',
444 'supporting-interface': 'XPDR2-NETWORK1-761:768',
445 'type': 'org-openroadm-interfaces:otnOtu',
446 'supporting-port': 'CP5-CFP-P1'
448 input_dict_2 = {'tx-sapi': 'AOQxIv+6nCD+',
449 'expected-dapi': 'AOQxIv+6nCD+',
450 'tx-dapi': 'X+8cRNi+HbE=',
451 'expected-sapi': 'X+8cRNi+HbE=',
452 'rate': 'org-openroadm-otn-common-types:OTU4',
455 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
458 self.assertDictEqual(input_dict_2,
460 ['org-openroadm-otn-otu-interfaces:otu'])
461 response2 = test_utils.check_netconf_node_request(
462 "SPDR-SB1", "interface/XPDR2-NETWORK1-OTU/otu")
463 self.assertEqual(response2.status_code, requests.codes.ok)
464 res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
465 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
466 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
467 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
468 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
470 def test_029_check_interface_och_spdrB(self):
471 response = test_utils.check_netconf_node_request(
472 "SPDR-SB1", "interface/XPDR2-NETWORK1-761:768")
473 self.assertEqual(response.status_code, requests.codes.ok)
474 res = response.json()
475 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
476 'administrative-state': 'inService',
477 'supporting-circuit-pack-name': 'CP5-CFP',
478 'type': 'org-openroadm-interfaces:opticalChannel',
479 'supporting-port': 'CP5-CFP-P1'
480 }, **res['interface'][0]),
483 self.assertDictEqual(
484 {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
485 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
486 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
488 def test_030_check_interface_OTU4_spdrB(self):
489 response = test_utils.check_netconf_node_request(
490 "SPDR-SB1", "interface/XPDR2-NETWORK1-OTU")
491 self.assertEqual(response.status_code, requests.codes.ok)
492 res = response.json()
493 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
494 'administrative-state': 'inService',
495 'supporting-circuit-pack-name': 'CP5-CFP',
496 'supporting-interface': 'XPDR2-NETWORK1-761:768',
497 'type': 'org-openroadm-interfaces:otnOtu',
498 'supporting-port': 'CP5-CFP-P1'
500 input_dict_2 = {'tx-dapi': 'AOQxIv+6nCD+',
501 'expected-sapi': 'AOQxIv+6nCD+',
502 'tx-sapi': 'X+8cRNi+HbE=',
503 'expected-dapi': 'X+8cRNi+HbE=',
504 'rate': 'org-openroadm-otn-common-types:OTU4',
508 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
511 self.assertDictEqual(input_dict_2,
513 ['org-openroadm-otn-otu-interfaces:otu'])
# Cross-check the peer end (SPDR-SA1): its OTU SAPI/DAPI fields must mirror
# the values held in input_dict_2 for SPDR-SB1.
response2 = test_utils.check_netconf_node_request(
    "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU/otu")
# Validate the status of this second request; the previous code re-checked
# `response`, so a failure of the peer request went unnoticed here.
self.assertEqual(response2.status_code, requests.codes.ok)
res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
523 def test_031_check_no_interface_ODU4_spdra(self):
524 response = test_utils.check_netconf_node_request(
525 "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
526 self.assertEqual(response.status_code, requests.codes.conflict)
527 res = response.json()
529 {"error-type": "application", "error-tag": "data-missing",
530 "error-message": "Request could not be completed because the relevant data model content does not exist"},
531 res['errors']['error'])
533 def test_032_check_openroadm_topo_spdra(self):
534 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
535 self.assertEqual(response.status_code, requests.codes.ok)
536 res = response.json()
537 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
538 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
539 self.assertEqual({'frequency': 196.1,
541 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
542 self.assertEqual('ROADM-A1-SRG1--SRG1-PP3-TXRX',
543 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
546 def test_033_check_openroadm_topo_ROADMA_SRG(self):
547 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
548 self.assertEqual(response.status_code, requests.codes.ok)
549 res = response.json()
550 freq_map = base64.b64decode(
551 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
552 freq_map_array = [int(x) for x in freq_map]
553 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
554 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
556 if ele['tp-id'] == 'SRG1-PP3-TXRX':
557 freq_map = base64.b64decode(
558 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
559 freq_map_array = [int(x) for x in freq_map]
560 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
561 if ele['tp-id'] == 'SRG1-PP2-TXRX':
562 self.assertNotIn('avail-freq-maps', dict.keys(ele))
565 def test_034_check_openroadm_topo_ROADMA_DEG1(self):
566 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG1")
567 self.assertEqual(response.status_code, requests.codes.ok)
568 res = response.json()
569 freq_map = base64.b64decode(
570 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
571 freq_map_array = [int(x) for x in freq_map]
572 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
573 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
575 if ele['tp-id'] == 'DEG1-CTP-TXRX':
576 freq_map = base64.b64decode(
577 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
578 freq_map_array = [int(x) for x in freq_map]
579 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
580 if ele['tp-id'] == 'DEG1-TTP-TXRX':
581 freq_map = base64.b64decode(
582 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
583 freq_map_array = [int(x) for x in freq_map]
584 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
587 def test_035_check_otn_topo_otu4_links(self):
588 response = test_utils.get_otn_topo_request()
589 self.assertEqual(response.status_code, requests.codes.ok)
590 res = response.json()
591 nb_links = len(res['network'][0]['ietf-network-topology:link'])
592 self.assertEqual(nb_links, 2)
593 listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
594 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1']
595 for link in res['network'][0]['ietf-network-topology:link']:
596 self.assertIn(link['link-id'], listLinkId)
598 link['transportpce-networkutils:otn-link-type'], 'OTU4')
600 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
602 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
604 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
606 link['org-openroadm-common-network:opposite-link'], listLinkId)
609 # test service-create for OCH-OTU4 service from spdrB to spdrC
611 def test_036_create_OCH_OTU4_service_BC(self):
612 # pylint: disable=line-too-long
613 self.cr_serv_sample_data["input"]["service-name"] = "service-OCH-OTU4-BC"
614 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "SPDR-SB1"
615 self.cr_serv_sample_data["input"]["service-a-end"]["clli"] = "NodeSB"
616 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
617 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK2"
618 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SB1-XPDR2"
619 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK2"
620 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "SPDR-SC1"
621 self.cr_serv_sample_data["input"]["service-z-end"]["clli"] = "NodeSC"
622 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
623 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
624 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
625 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-NETWORK1"
627 response = test_utils.service_create_request(self.cr_serv_sample_data)
628 self.assertEqual(response.status_code, requests.codes.ok)
629 res = response.json()
630 self.assertIn('PCE calculation in progress',
631 res['output']['configuration-response-common']['response-message'])
632 time.sleep(self.WAITING)
634 def test_037_get_OCH_OTU4_service_BC(self):
635 response = test_utils.get_service_list_request(
636 "services/service-OCH-OTU4-BC")
637 self.assertEqual(response.status_code, requests.codes.ok)
638 res = response.json()
640 res['services'][0]['administrative-state'], 'inService')
642 res['services'][0]['service-name'], 'service-OCH-OTU4-BC')
644 res['services'][0]['connection-type'], 'infrastructure')
646 res['services'][0]['lifecycle-state'], 'planned')
649 # Check correct configuration of devices
650 def test_038_check_interface_och_spdrB(self):
651 response = test_utils.check_netconf_node_request(
652 "SPDR-SB1", "interface/XPDR2-NETWORK2-753:760")
653 self.assertEqual(response.status_code, requests.codes.ok)
654 res = response.json()
655 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
656 'administrative-state': 'inService',
657 'supporting-circuit-pack-name': 'CP6-CFP',
658 'type': 'org-openroadm-interfaces:opticalChannel',
659 'supporting-port': 'CP1-CFP0-P1'
660 }, **res['interface'][0]),
663 self.assertDictEqual(
664 {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
665 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
666 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
668 def test_039_check_interface_OTU4_spdrB(self):
669 response = test_utils.check_netconf_node_request(
670 "SPDR-SB1", "interface/XPDR2-NETWORK2-OTU")
671 self.assertEqual(response.status_code, requests.codes.ok)
672 res = response.json()
673 input_dict_1 = {'name': 'XPDR2-NETWORK2-OTU',
674 'administrative-state': 'inService',
675 'supporting-circuit-pack-name': 'CP6-CFP',
676 'supporting-interface': 'XPDR2-NETWORK1-753:760',
677 'type': 'org-openroadm-interfaces:otnOtu',
678 'supporting-port': 'CP6-CFP-P1'
680 input_dict_2 = {'tx-sapi': 'X+8cRNi+HbI=',
681 'expected-dapi': 'X+8cRNi+HbI=',
682 'tx-dapi': 'ALvne1QI5jo4',
683 'expected-sapi': 'ALvne1QI5jo4',
684 'rate': 'org-openroadm-otn-common-types:OTU4',
687 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
690 self.assertDictEqual(input_dict_2,
692 ['org-openroadm-otn-otu-interfaces:otu'])
# Cross-check the peer end (SPDR-SC1): its OTU SAPI/DAPI fields must mirror
# the values held in input_dict_2 for SPDR-SB1.
response2 = test_utils.check_netconf_node_request(
    "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU/otu")
# Validate the status of this second request; the previous code re-checked
# `response`, so a failure of the peer request went unnoticed here.
self.assertEqual(response2.status_code, requests.codes.ok)
res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
702 def test_040_check_interface_och_spdrC(self):
703 response = test_utils.check_netconf_node_request(
704 "SPDR-SC1", "interface/XPDR2-NETWORK1-753:760")
705 self.assertEqual(response.status_code, requests.codes.ok)
706 res = response.json()
707 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
708 'administrative-state': 'inService',
709 'supporting-circuit-pack-name': 'CP5-CFP',
710 'type': 'org-openroadm-interfaces:opticalChannel',
711 'supporting-port': 'CP5-CFP-P1'
712 }, **res['interface'][0]),
715 self.assertDictEqual(
716 {'frequency': 196.05, 'rate': 'org-openroadm-common-types:R100G',
717 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
718 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
720 def test_041_check_interface_OTU4_spdrC(self):
721 response = test_utils.check_netconf_node_request(
722 "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU")
723 self.assertEqual(response.status_code, requests.codes.ok)
724 res = response.json()
725 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
726 'administrative-state': 'inService',
727 'supporting-circuit-pack-name': 'CP5-CFP',
728 'supporting-interface': 'XPDR2-NETWORK1-753:760',
729 'type': 'org-openroadm-interfaces:otnOtu',
730 'supporting-port': 'CP5-CFP-P1'
732 input_dict_2 = {'tx-dapi': 'X+8cRNi+HbI=',
733 'expected-sapi': 'X+8cRNi+HbI=',
734 'tx-sapi': 'ALvne1QI5jo4',
735 'expected-dapi': 'ALvne1QI5jo4',
736 'rate': 'org-openroadm-otn-common-types:OTU4',
740 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
743 self.assertDictEqual(input_dict_2,
745 ['org-openroadm-otn-otu-interfaces:otu'])
# Cross-check the peer end (SPDR-SB1, NETWORK2 port): its OTU SAPI/DAPI
# fields must mirror the values held in input_dict_2 for SPDR-SC1.
response2 = test_utils.check_netconf_node_request(
    "SPDR-SB1", "interface/XPDR2-NETWORK2-OTU/otu")
# Validate the status of this second request; the previous code re-checked
# `response`, so a failure of the peer request went unnoticed here.
self.assertEqual(response2.status_code, requests.codes.ok)
res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
755 def test_042_check_no_interface_ODU4_spdrB(self):
756 response = test_utils.check_netconf_node_request(
757 "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
758 self.assertEqual(response.status_code, requests.codes.conflict)
759 res = response.json()
761 {"error-type": "application", "error-tag": "data-missing",
762 "error-message": "Request could not be completed because the relevant data model content does not exist"},
763 res['errors']['error'])
765 def test_043_check_openroadm_topo_spdrB(self):
766 response = test_utils.get_ordm_topo_request("node/SPDR-SB1-XPDR2")
767 self.assertEqual(response.status_code, requests.codes.ok)
768 res = response.json()
769 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
770 # pylint: disable=consider-using-f-string
772 if ele['tp-id'] == 'XPDR2-NETWORK1':
773 self.assertEqual({'frequency': 196.1,
775 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
776 self.assertEqual('ROADM-B1-SRG1--SRG1-PP1-TXRX',
777 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
778 elif ele['tp-id'] == 'XPDR2-NETWORK2':
779 self.assertEqual({'frequency': 196.05,
781 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
782 self.assertEqual('ROADM-B1-SRG1--SRG1-PP2-TXRX',
783 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
785 print("ele = {}".format(ele))
786 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
789 def test_044_check_openroadm_topo_ROADMB_SRG1(self):
790 response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
791 self.assertEqual(response.status_code, requests.codes.ok)
792 res = response.json()
793 freq_map = base64.b64decode(
794 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
795 freq_map_array = [int(x) for x in freq_map]
796 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
797 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
798 self.assertEqual(freq_map_array[93], 255, "Lambda 3 should be available")
799 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
801 if ele['tp-id'] == 'SRG1-PP1-TXRX':
802 freq_map = base64.b64decode(
803 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
804 freq_map_array = [int(x) for x in freq_map]
805 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
806 if ele['tp-id'] == 'SRG1-PP2-TXRX':
807 freq_map = base64.b64decode(
808 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
809 freq_map_array = [int(x) for x in freq_map]
810 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
811 if ele['tp-id'] == 'SRG1-PP3-TXRX':
812 self.assertNotIn('avail-freq-maps', dict.keys(ele))
815 def test_045_check_openroadm_topo_ROADMB_DEG2(self):
816 response = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
817 self.assertEqual(response.status_code, requests.codes.ok)
818 res = response.json()
819 freq_map = base64.b64decode(
820 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
821 freq_map_array = [int(x) for x in freq_map]
822 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
823 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
825 if ele['tp-id'] == 'DEG2-CTP-TXRX':
826 freq_map = base64.b64decode(
827 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
828 freq_map_array = [int(x) for x in freq_map]
829 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
if ele['tp-id'] == 'DEG2-TTP-TXRX':
    # Decode the base64 frequency map of the TTP and inspect it byte by byte.
    freq_map = base64.b64decode(
        ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
    freq_map_array = [int(x) for x in freq_map]
    # Index 94 is reported as "Lambda 2" by the other assertions in this
    # file; the message previously said "Lambda 1" (copy-paste slip).
    self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
def test_046_check_otn_topo_otu4_links(self):
    """Check the four OTU4 links of the OTN topology while they are unused.

    Every link must have a known link-id, be an OTU4 OTN-LINK with 100000
    available and 0 used bandwidth, and reference an opposite link that is
    also one of the expected link-ids.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    expected_link_ids = [
        'OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
        'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1',
        'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR2-XPDR2-NETWORK1',
        'OTU4-SPDR-SC1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2',
    ]
    for link in links:
        self.assertIn(link['link-id'], expected_link_ids)
        self.assertEqual(
            link['transportpce-networkutils:otn-link-type'], 'OTU4')
        self.assertEqual(
            link['org-openroadm-common-network:link-type'], 'OTN-LINK')
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
        self.assertIn(
            link['org-openroadm-common-network:opposite-link'], expected_link_ids)
860 # test service-create for 100GE service from spdrA to spdrC via spdrB
def test_047_create_100GE_service_ABC(self):
    """Request creation of the 100GE service "service-100GE-ABC".

    The class-level request template is edited in place (later tests read
    the values left behind here), posted through
    test_utils.service_create_request, and the test then sleeps WAITING
    seconds.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service-100GE-ABC"
    request_input["connection-type"] = "service"
    endpoints = (
        ("service-a-end", "SPDR-SA1", "NodeSA", "SPDR-SA1-XPDR2"),
        ("service-z-end", "SPDR-SC1", "NodeSC", "SPDR-SC1-XPDR2"),
    )
    for end_key, node_id, clli, device_name in endpoints:
        endpoint = request_input[end_key]
        endpoint["service-format"] = "Ethernet"
        endpoint["node-id"] = node_id
        endpoint["clli"] = clli
        # pop() instead of del: the key may already have been removed by an
        # earlier Ethernet service creation on the same shared template
        endpoint.pop("otu-service-rate", None)
        for direction in ("tx-direction", "rx-direction"):
            port = endpoint[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR2-CLIENT1"

    response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('PCE calculation in progress',
                  res['output']['configuration-response-common']['response-message'])
    time.sleep(self.WAITING)
def test_048_get_100GE_service_ABC(self):
    """Check the stored attributes of service-100GE-ABC in the service list."""
    response = test_utils.get_service_list_request(
        "services/service-100GE-ABC")
    self.assertEqual(response.status_code, requests.codes.ok)
    service = response.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service-100GE-ABC')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
# Check the XPDR2-CLIENT1-ETHERNET interface read from node SPDR-SA1.
# Also invoked directly by test_090_check_configuration_spdra.
# NOTE(review): the embedded line numbers skip 914, 916, 918 and 919, so the
# closing of input_dict and part of both assertDictEqual calls are not visible
# in this view - confirm against the complete file before editing.
904 def test_049_check_interface_100GE_CLIENT_spdra(self):
905 response = test_utils.check_netconf_node_request(
906 "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET")
907 self.assertEqual(response.status_code, requests.codes.ok)
908 res = response.json()
# keys expected on the returned interface entry
909 input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
910 'administrative-state': 'inService',
911 'supporting-circuit-pack-name': 'CP2-QSFP1',
912 'type': 'org-openroadm-interfaces:ethernetCsmacd',
913 'supporting-port': 'CP2-QSFP1-P1'
# dict(expected, **actual) lets the values of the actual entry win the merge
915 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
# second comparison targets the ethernet container of the interface
917 self.assertDictEqual(
920 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_050_check_interface_ODU4_CLIENT_spdra(self):
    """Check the XPDR2-CLIENT1-ODU4 interface on SPDR-SA1.

    Verifies the interface and its odu container, the opu payload types,
    and that the SAPI/DAPI values listed here are mirrored by the same
    interface on SPDR-SC1.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP2-QSFP1',
                    'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP2-QSFP1-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU4',
        'monitoring-mode': 'terminated',
        'expected-dapi': 'AItaZ6nmyaKJ',
        'expected-sapi': 'AKFnJJaijWiz',
        'tx-dapi': 'AKFnJJaijWiz',
        'tx-sapi': 'AItaZ6nmyaKJ'}

    # dict(expected, **actual) lets the device values win the merge, so these
    # two comparisons hold when every expected key exists in the answer
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'},
        res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
    response2 = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR2-CLIENT1-ODU4")
    # check the status of the second request, not of the first one again
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
# Check the XPDR2-NETWORK1-ODU4 interface read from node SPDR-SA1 and make
# sure its odu container carries no 'opu' entry.
# Also invoked directly by test_090_check_configuration_spdra.
# NOTE(review): the embedded line numbers skip 969, 971 and 977 (one entry of
# input_dict_1, the opening of input_dict_2 and the second argument of the
# first assertDictEqual are not visible here) - confirm against the full file.
959 def test_051_check_interface_ODU4_NETWORK_spdra(self):
960 response = test_utils.check_netconf_node_request(
961 "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4")
962 self.assertEqual(response.status_code, requests.codes.ok)
963 res = response.json()
# keys expected on the interface entry itself
964 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
965 'administrative-state': 'inService',
966 'supporting-circuit-pack-name': 'CP5-CFP',
967 'type': 'org-openroadm-interfaces:otnOdu',
968 'supporting-port': 'CP5-CFP-P1',
970 'description': 'TBD'}
# keys expected inside the odu container (used below as input_dict_2)
972 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
973 'rate': 'org-openroadm-otn-common-types:ODU4',
974 'monitoring-mode': 'monitored'}
# dict(expected, **actual) lets the values of the actual entry win the merge
976 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
978 self.assertDictEqual(dict(input_dict_2,
979 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
980 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# the odu container must not contain an 'opu' key
981 self.assertNotIn('opu',
982 dict.keys(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
# Check the odu-connection XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4:
# expected keys, destination interface and source interface.
# Also invoked directly by test_090_check_configuration_spdra.
# NOTE(review): the embedded line numbers skip 986, 990, 991 and 994, so the
# first request argument and the opening/closing of input_dict_1 are not
# visible in this view - confirm against the complete file.
984 def test_052_check_ODU4_connection_spdra(self):
985 response = test_utils.check_netconf_node_request(
987 "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
988 self.assertEqual(response.status_code, requests.codes.ok)
989 res = response.json()
992 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
993 'direction': 'bidirectional'
# dict(expected, **actual) lets the values of the actual entry win the merge
996 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
997 res['odu-connection'][0])
# destination and source must match exactly
998 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
999 res['odu-connection'][0]['destination'])
1000 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1001 res['odu-connection'][0]['source'])
# Check the XPDR2-CLIENT1-ETHERNET interface read from node SPDR-SC1.
# Also invoked directly by test_091_check_configuration_spdrc.
# NOTE(review): the embedded line numbers skip 1013, 1017 and 1018, so the
# closing of input_dict and the expected ethernet dict are not visible in
# this view - confirm against the complete file before editing.
1003 def test_053_check_interface_100GE_CLIENT_spdrc(self):
1004 response = test_utils.check_netconf_node_request(
1005 "SPDR-SC1", "interface/XPDR2-CLIENT1-ETHERNET")
1006 self.assertEqual(response.status_code, requests.codes.ok)
1007 res = response.json()
# keys expected on the returned interface entry
1008 input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
1009 'administrative-state': 'inService',
1010 'supporting-circuit-pack-name': 'CP2-QSFP1',
1011 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1012 'supporting-port': 'CP2-QSFP1-P1'
# dict(expected, **actual) lets the values of the actual entry win the merge
1014 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
1015 res['interface'][0])
# second comparison targets the ethernet container of the interface
1016 self.assertDictEqual(
1019 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
def test_054_check_interface_ODU4_CLIENT_spdrc(self):
    """Check the XPDR2-CLIENT1-ODU4 interface on SPDR-SC1.

    Verifies the interface and its odu container, the opu payload types,
    and that the SAPI/DAPI values listed here are mirrored by the same
    interface on SPDR-SA1.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR2-CLIENT1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP2-QSFP1',
                    'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP2-QSFP1-P1',
                    'circuit-id': 'TBD',
                    'description': 'TBD'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
        'rate': 'org-openroadm-otn-common-types:ODU4',
        'monitoring-mode': 'terminated',
        'expected-dapi': 'AKFnJJaijWiz',
        'expected-sapi': 'AItaZ6nmyaKJ',
        'tx-dapi': 'AItaZ6nmyaKJ',
        'tx-sapi': 'AKFnJJaijWiz'}

    # dict(expected, **actual) lets the device values win the merge, so these
    # two comparisons hold when every expected key exists in the answer
    self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
                         res['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(
        {'payload-type': '21', 'exp-payload-type': '21'},
        res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
    response2 = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4")
    # check the status of the second request, not of the first one again
    self.assertEqual(response2.status_code, requests.codes.ok)
    res2 = response2.json()['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
    self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
    self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
    self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
def test_055_check_interface_ODU4_NETWORK_spdrc(self):
    """Check the XPDR2-NETWORK1-ODU4 interface on SPDR-SC1.

    Verifies the interface and its odu container and that the odu
    container carries no 'opu' entry.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR2-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP5-CFP',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP5-CFP-P1',
                    'circuit-id': 'TBD',
                    'description': 'TBD'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU4',
        'monitoring-mode': 'monitored'}

    # dict(expected, **actual) lets the device values win the merge, so these
    # comparisons hold when every expected key exists in the answer
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    self.assertNotIn('opu', odu)
# Check the odu-connection XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4:
# expected keys, destination interface and source interface.
# Also invoked directly by test_091_check_configuration_spdrc.
# NOTE(review): the embedded line numbers skip 1087, 1091, 1092 and 1095, so
# the first request argument and the opening/closing of input_dict_1 are not
# visible in this view - confirm against the complete file.
1085 def test_056_check_ODU4_connection_spdrc(self):
1086 response = test_utils.check_netconf_node_request(
1088 "odu-connection/XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4")
1089 self.assertEqual(response.status_code, requests.codes.ok)
1090 res = response.json()
1093 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1094 'direction': 'bidirectional'
# dict(expected, **actual) lets the values of the actual entry win the merge
1097 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1098 res['odu-connection'][0])
# destination and source must match exactly
1099 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
1100 res['odu-connection'][0]['destination'])
1101 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1102 res['odu-connection'][0]['source'])
def test_057_check_interface_ODU4_NETWORK1_spdrb(self):
    """Check the XPDR2-NETWORK1-ODU4 interface on SPDR-SB1.

    Verifies the interface and its odu container and that the odu
    container carries no 'opu' entry.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SB1", "interface/XPDR2-NETWORK1-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP5-CFP',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP5-CFP-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU4',
        'monitoring-mode': 'monitored'}

    # dict(expected, **actual) lets the device values win the merge, so these
    # comparisons hold when every expected key exists in the answer
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    self.assertNotIn('opu', odu)
def test_058_check_interface_ODU4_NETWORK2_spdrb(self):
    """Check the XPDR2-NETWORK2-ODU4 interface on SPDR-SB1.

    Verifies the interface and its odu container and that the odu
    container carries no 'opu' entry.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SB1", "interface/XPDR2-NETWORK2-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    # the name now matches the interface actually requested (NETWORK2)
    input_dict_1 = {'name': 'XPDR2-NETWORK2-ODU4',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP6-CFP',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP6-CFP-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU4',
        'monitoring-mode': 'monitored'}

    # dict(expected, **actual) lets the device values win the merge, so these
    # comparisons hold when every expected key exists in the answer
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    self.assertNotIn('opu', odu)
# Check the odu-connection XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4:
# expected keys, destination interface and source interface.
# NOTE(review): the embedded line numbers skip 1152, 1156, 1157 and 1160, so
# the first request argument and the opening/closing of input_dict_1 are not
# visible in this view - confirm against the complete file.
1150 def test_059_check_ODU4_connection_spdrb(self):
1151 response = test_utils.check_netconf_node_request(
1153 "odu-connection/XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4")
1154 self.assertEqual(response.status_code, requests.codes.ok)
1155 res = response.json()
1158 'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
1159 'direction': 'bidirectional'
# dict(expected, **actual) lets the values of the actual entry win the merge
1162 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
1163 res['odu-connection'][0])
# destination and source must match exactly
1164 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU4'},
1165 res['odu-connection'][0]['destination'])
1166 self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU4'},
1167 res['odu-connection'][0]['source'])
def test_060_check_otn_topo_links(self):
    """Check that the four OTN links are fully used (0 available, 100000 used)."""
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    for link in links:
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
def test_061_delete_service_100GE_ABC(self):
    """Request deletion of service-100GE-ABC, then sleep WAITING seconds."""
    deletion = test_utils.service_delete_request("service-100GE-ABC")
    self.assertEqual(deletion.status_code, requests.codes.ok)
    answer = deletion.json()
    message = answer['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_062_check_service_list(self):
    """Check that exactly two services remain in the service list."""
    listing = test_utils.get_service_list_request("")
    self.assertEqual(listing.status_code, requests.codes.ok)
    body = listing.json()
    remaining = body['service-list']['services']
    self.assertEqual(len(remaining), 2)
def test_063_check_no_ODU4_connection_spdra(self):
    """Check that SPDR-SA1 no longer exposes any 'odu-connection' entry."""
    response = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # plain key name instead of indexing a one-element list literal
    self.assertNotIn('odu-connection', res['org-openroadm-device'])
def test_064_check_no_interface_ODU4_NETWORK_spdra(self):
    """Expect a conflict status when reading XPDR2-NETWORK1-ODU4 on SPDR-SA1."""
    node, resource = "SPDR-SA1", "interface/XPDR2-NETWORK1-ODU4"
    lookup = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(lookup.status_code, requests.codes.conflict)
def test_065_check_no_interface_ODU4_CLIENT_spdra(self):
    """Expect a conflict status when reading XPDR2-CLIENT1-ODU4 on SPDR-SA1."""
    node, resource = "SPDR-SA1", "interface/XPDR2-CLIENT1-ODU4"
    lookup = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(lookup.status_code, requests.codes.conflict)
def test_066_check_no_interface_100GE_CLIENT_spdra(self):
    """Expect a conflict status when reading XPDR2-CLIENT1-ETHERNET on SPDR-SA1."""
    node, resource = "SPDR-SA1", "interface/XPDR2-CLIENT1-ETHERNET"
    lookup = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(lookup.status_code, requests.codes.conflict)
def test_067_check_otn_topo_links(self):
    """Re-run the OTU4 link checks of test_046 on the current OTN topology."""
    # same expectations as when the OTU4 links carried no service
    self.test_046_check_otn_topo_otu4_links()
def test_068_delete_OCH_OTU4_service_AB(self):
    """Request deletion of service-OCH-OTU4-AB, then sleep WAITING seconds."""
    deletion = test_utils.service_delete_request("service-OCH-OTU4-AB")
    self.assertEqual(deletion.status_code, requests.codes.ok)
    answer = deletion.json()
    message = answer['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_069_delete_OCH_OTU4_service_BC(self):
    """Request deletion of service-OCH-OTU4-BC, then sleep WAITING seconds."""
    deletion = test_utils.service_delete_request("service-OCH-OTU4-BC")
    self.assertEqual(deletion.status_code, requests.codes.ok)
    answer = deletion.json()
    message = answer['output']['configuration-response-common']['response-message']
    self.assertIn('Renderer service delete in progress', message)
    time.sleep(self.WAITING)
def test_070_get_no_service(self):
    """Check that reading the service list now yields a data-missing error."""
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.conflict)
    res = response.json()
    # the expected error entry must be one of the reported errors
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_071_check_no_interface_OTU4_spdra(self):
    """Expect a conflict status when reading XPDR2-NETWORK1-OTU on SPDR-SA1."""
    node, resource = "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU"
    lookup = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(lookup.status_code, requests.codes.conflict)
def test_072_check_no_interface_OCH_spdra(self):
    """Expect a conflict status when reading XPDR2-NETWORK1-761:768 on SPDR-SA1."""
    node, resource = "SPDR-SA1", "interface/XPDR2-NETWORK1-761:768"
    lookup = test_utils.check_netconf_node_request(node, resource)
    self.assertEqual(lookup.status_code, requests.codes.conflict)
def test_073_getLinks_OtnTopology(self):
    """Check that the OTN topology network no longer lists any link."""
    topology = test_utils.get_otn_topo_request()
    self.assertEqual(topology.status_code, requests.codes.ok)
    body = topology.json()
    otn_network = body['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_074_check_openroadm_topo_spdra(self):
    """Check that XPDR2-NETWORK1 of SPDR-SA1-XPDR2 carries no 'wavelength'."""
    topology = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR2")
    self.assertEqual(topology.status_code, requests.codes.ok)
    body = topology.json()
    # only the first termination point of the node is inspected
    first_tp = body['node'][0]['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR2-NETWORK1', first_tp['tp-id'])
    network_attributes = first_tp['org-openroadm-network-topology:xpdr-network-attributes']
    self.assertNotIn('wavelength', dict.keys(network_attributes))
def test_075_check_openroadm_topo_ROADMB_SRG1(self):
    """Check that lambdas 1 and 2 are available again on ROADM-B1 SRG1.

    Entries 95 (lambda 1) and 94 (lambda 2) of the base64-decoded frequency
    map must be 255 on the SRG attributes; entry 95 on SRG1-PP1-TXRX and
    entry 94 on SRG1-PP2-TXRX.
    """
    response = test_utils.get_ordm_topo_request("node/ROADM-B1-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]

    def decode_freq_map(attributes):
        # 'freq-map' is base64 text; the decoded bytes are read as a list of ints
        raw = base64.b64decode(attributes['avail-freq-maps'][0]['freq-map'])
        return [int(x) for x in raw]

    srg_map = decode_freq_map(node['org-openroadm-network-topology:srg-attributes'])
    self.assertEqual(srg_map[95], 255, "Lambda 1 should be available")
    # index 94 is lambda 2, so the message names lambda 2
    self.assertEqual(srg_map[94], 255, "Lambda 2 should be available")

    # (frequency-map index, failure message) checked for each port pair
    checks_by_tp = {
        'SRG1-PP1-TXRX': (95, "Lambda 1 should be available"),
        'SRG1-PP2-TXRX': (94, "Lambda 2 should be available"),
    }
    for ele in node['ietf-network-topology:termination-point']:
        check = checks_by_tp.get(ele['tp-id'])
        if check is None:
            continue
        index, message = check
        pp_map = decode_freq_map(ele['org-openroadm-network-topology:pp-attributes'])
        self.assertEqual(pp_map[index], 255, message)
def test_076_check_openroadm_topo_ROADMB_DEG1(self):
    """Check that lambda 1 (entry 95) is available again on ROADM-B1 DEG1.

    The degree attributes and the DEG1-CTP-TXRX / DEG1-TTP-TXRX termination
    points must all report 255 at index 95 of the decoded frequency map.
    """
    topology = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG1")
    self.assertEqual(topology.status_code, requests.codes.ok)
    node = topology.json()['node'][0]

    def decode_freq_map(attributes):
        # 'freq-map' is base64 text; the decoded bytes are read as a list of ints
        raw = base64.b64decode(attributes['avail-freq-maps'][0]['freq-map'])
        return [int(x) for x in raw]

    degree_map = decode_freq_map(node['org-openroadm-network-topology:degree-attributes'])
    self.assertEqual(degree_map[95], 255, "Lambda 1 should be available")
    for tp in node['ietf-network-topology:termination-point']:
        if tp['tp-id'] == 'DEG1-CTP-TXRX':
            ctp_map = decode_freq_map(tp['org-openroadm-network-topology:ctp-attributes'])
            self.assertEqual(ctp_map[95], 255, "Lambda 1 should be available")
        if tp['tp-id'] == 'DEG1-TTP-TXRX':
            ttp_map = decode_freq_map(tp['org-openroadm-network-topology:tx-ttp-attributes'])
            self.assertEqual(ttp_map[95], 255, "Lambda 1 should be available")
def test_077_check_openroadm_topo_ROADMB_DEG2(self):
    """Check that entry 95 of the frequency map is 255 on ROADM-B1 DEG2.

    The degree attributes and the DEG2-CTP-TXRX / DEG2-TTP-TXRX termination
    points must all report 255 at index 95 of the decoded frequency map.
    """
    # NOTE(review): test_045 checked index 94 on this degree while the service
    # was up; confirm whether index 94 should also be verified here.
    topology = test_utils.get_ordm_topo_request("node/ROADM-B1-DEG2")
    self.assertEqual(topology.status_code, requests.codes.ok)
    node = topology.json()['node'][0]

    def decode_freq_map(attributes):
        # 'freq-map' is base64 text; the decoded bytes are read as a list of ints
        raw = base64.b64decode(attributes['avail-freq-maps'][0]['freq-map'])
        return [int(x) for x in raw]

    degree_map = decode_freq_map(node['org-openroadm-network-topology:degree-attributes'])
    self.assertEqual(degree_map[95], 255, "Lambda 1 should be available")
    for tp in node['ietf-network-topology:termination-point']:
        if tp['tp-id'] == 'DEG2-CTP-TXRX':
            ctp_map = decode_freq_map(tp['org-openroadm-network-topology:ctp-attributes'])
            self.assertEqual(ctp_map[95], 255, "Lambda 1 should be available")
        if tp['tp-id'] == 'DEG2-TTP-TXRX':
            ttp_map = decode_freq_map(tp['org-openroadm-network-topology:tx-ttp-attributes'])
            self.assertEqual(ttp_map[95], 255, "Lambda 1 should be available")
def test_078_disconnect_xponders_from_roadm(self):
    """Delete the xponder links that touch SPDR-SB1 or ROADM-B1.

    Every XPONDER-OUTPUT / XPONDER-INPUT link of the openroadm topology
    whose link-id mentions one of these two nodes is deleted, and each
    deletion must answer with an ok status.
    """
    # the leading "{}" is left as is; presumably test_utils.delete_request
    # substitutes the controller base URL - confirm in test_utils
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    response = test_utils.get_ordm_topo_request("")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    for link in links:
        if (link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT')
                and ('SPDR-SB1' in link['link-id'] or 'ROADM-B1' in link['link-id'])):
            link_name = link["link-id"]
            response = test_utils.delete_request(url+link_name)
            self.assertEqual(response.status_code, requests.codes.ok)
def test_079_disconnect_spdrB(self):
    """Unmount device SPDR-SB1 and expect an ok status."""
    unmount = test_utils.unmount_device("SPDR-SB1")
    self.assertEqual(
        unmount.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_080_disconnect_roadmB(self):
    """Unmount device ROADM-B1 and expect an ok status."""
    unmount = test_utils.unmount_device("ROADM-B1")
    self.assertEqual(
        unmount.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_081_remove_roadm_to_roadm_links(self):
    """Delete every ROADM-TO-ROADM link whose link-id mentions ROADM-B1.

    Each deletion must answer with an ok status.
    """
    # the leading "{}" is left as is; presumably test_utils.delete_request
    # substitutes the controller base URL - confirm in test_utils
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    response = test_utils.get_ordm_topo_request("")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    for link in links:
        if (link["org-openroadm-common-network:link-type"] == "ROADM-TO-ROADM"
                and 'ROADM-B1' in link['link-id']):
            link_name = link["link-id"]
            response = test_utils.delete_request(url+link_name)
            self.assertEqual(response.status_code, requests.codes.ok)
# Add OMS attributes to the ROADM-A1 DEG2 -> ROADM-C1 DEG1 link and expect a
# 'created' status from test_utils.add_oms_attr_request.
# NOTE(review): the embedded line numbers skip 1378, 1384 and 1387, so the
# opening of the `data` payload, one link-concatenation entry and the closing
# brackets are not visible in this view - confirm against the complete file.
1376 def test_082_add_omsAttributes_ROADMA_ROADMC(self):
1377 # Config ROADMA-ROADMC oms-attributes
# span loss settings of the payload
1379 "auto-spanloss": "true",
1380 "spanloss-base": 11.4,
1381 "spanloss-current": 12,
1382 "engineered-spanloss": 12.2,
# fiber description, given as a one-element list
1383 "link-concatenation": [{
1385 "fiber-type": "smf",
1386 "SRLG-length": 100000,
# first argument is the link-id the attributes are attached to
1388 response = test_utils.add_oms_attr_request(
1389 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
1390 self.assertEqual(response.status_code, requests.codes.created)
# Add OMS attributes to the ROADM-C1 DEG1 -> ROADM-A1 DEG2 link and expect a
# 'created' status from test_utils.add_oms_attr_request.
# NOTE(review): the embedded line numbers skip 1394, 1400 and 1403, so the
# opening of the `data` payload, one link-concatenation entry and the closing
# brackets are not visible in this view - confirm against the complete file.
1392 def test_083_add_omsAttributes_ROADMC_ROADMA(self):
1393 # Config ROADMC-ROADMA oms-attributes
# span loss settings of the payload
1395 "auto-spanloss": "true",
1396 "spanloss-base": 11.4,
1397 "spanloss-current": 12,
1398 "engineered-spanloss": 12.2,
# fiber description, given as a one-element list
1399 "link-concatenation": [{
1401 "fiber-type": "smf",
1402 "SRLG-length": 100000,
# first argument is the link-id the attributes are attached to
1404 response = test_utils.add_oms_attr_request(
1405 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
1406 self.assertEqual(response.status_code, requests.codes.created)
def test_084_create_OCH_OTU4_service_AC(self):
    """Request creation of the OTU4 infrastructure service "service-OCH-OTU4-AC".

    The class-level request template is edited in place, posted through
    test_utils.service_create_request, and the test then sleeps WAITING
    seconds. node-id and clli of both ends are not set here, so they keep
    whatever value the template currently holds.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service-OCH-OTU4-AC"
    request_input["connection-type"] = "infrastructure"
    for end_key, device_name in (("service-a-end", "SPDR-SA1-XPDR2"),
                                 ("service-z-end", "SPDR-SC1-XPDR2")):
        endpoint = request_input[end_key]
        endpoint["service-rate"] = "100"
        endpoint["service-format"] = "OTU"
        for direction in ("tx-direction", "rx-direction"):
            port = endpoint[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR2-NETWORK1"
        endpoint["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    message = creation.json()['output']['configuration-response-common']['response-message']
    self.assertIn('PCE calculation in progress', message)
    time.sleep(self.WAITING)
def test_085_get_OCH_OTU4_service_AC(self):
    """Check the stored attributes of service-OCH-OTU4-AC in the service list."""
    response = test_utils.get_service_list_request(
        "services/service-OCH-OTU4-AC")
    self.assertEqual(response.status_code, requests.codes.ok)
    service = response.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service-OCH-OTU4-AC')
    self.assertEqual(service['connection-type'], 'infrastructure')
    self.assertEqual(service['lifecycle-state'], 'planned')
1448 # test service-create for 100GE service from spdrA to spdrC (spdrB and roadmB were disconnected above)
def test_086_create_100GE_service_AC(self):
    """Request creation of the 100GE service "service-100GE-AC".

    The class-level request template is edited in place, posted through
    test_utils.service_create_request, and the test then sleeps WAITING
    seconds.
    """
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service-100GE-AC"
    request_input["connection-type"] = "service"
    endpoints = (
        ("service-a-end", "SPDR-SA1", "NodeSA", "SPDR-SA1-XPDR2"),
        ("service-z-end", "SPDR-SC1", "NodeSC", "SPDR-SC1-XPDR2"),
    )
    for end_key, node_id, clli, device_name in endpoints:
        endpoint = request_input[end_key]
        endpoint["service-format"] = "Ethernet"
        endpoint["node-id"] = node_id
        endpoint["clli"] = clli
        # pop() instead of del: the key may already have been removed by an
        # earlier Ethernet service creation on the same shared template
        endpoint.pop("otu-service-rate", None)
        for direction in ("tx-direction", "rx-direction"):
            port = endpoint[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR2-CLIENT1"

    response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('PCE calculation in progress',
                  res['output']['configuration-response-common']['response-message'])
    time.sleep(self.WAITING)
def test_087_get_100GE_service_AC(self):
    """Check the stored attributes of service-100GE-AC in the service list."""
    response = test_utils.get_service_list_request("services/service-100GE-AC")
    self.assertEqual(response.status_code, requests.codes.ok)
    service = response.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service-100GE-AC')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
# Check the XPDR2-NETWORK1-OTU interface on SPDR-SA1 and verify that its
# SAPI/DAPI values are mirrored by the otu container of the same interface
# on SPDR-SC1.
# NOTE(review): the embedded line numbers skip 1502, 1508, 1509 and 1514, so
# the closing of both dicts, possibly one entry of input_dict_2 and part of
# the second assertDictEqual are not visible here - confirm against the file.
1491 def test_088_check_interface_OTU4_spdra(self):
1492 response = test_utils.check_netconf_node_request(
1493 "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU")
1494 self.assertEqual(response.status_code, requests.codes.ok)
1495 res = response.json()
# keys expected on the interface entry itself
1496 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
1497 'administrative-state': 'inService',
1498 'supporting-circuit-pack-name': 'CP5-CFP',
1499 'supporting-interface': 'XPDR2-NETWORK1-761:768',
1500 'type': 'org-openroadm-interfaces:otnOtu',
1501 'supporting-port': 'CP5-CFP-P1'
# content expected for the otu container (compared without merging below)
1503 input_dict_2 = {'tx-sapi': 'AOQxIv+6nCD+',
1504 'expected-dapi': 'AOQxIv+6nCD+',
1505 'tx-dapi': 'ALvne1QI5jo4',
1506 'expected-sapi': 'ALvne1QI5jo4',
1507 'rate': 'org-openroadm-otn-common-types:OTU4',
# dict(expected, **actual) lets the values of the actual entry win the merge
1510 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1511 res['interface'][0])
1513 self.assertDictEqual(input_dict_2,
1515 ['org-openroadm-otn-otu-interfaces:otu'])
# read the otu container of the peer node to cross-check SAPI/DAPI
1516 response2 = test_utils.check_netconf_node_request(
1517 "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU/otu")
# NOTE(review): this re-checks `response`; `response2` looks like the intended
# object - confirm and fix.
1518 self.assertEqual(response.status_code, requests.codes.ok)
1519 res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
# what this node transmits must be what the peer expects, and vice versa
1520 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
1521 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
1522 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
1523 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
# Check the XPDR2-NETWORK1-OTU interface on SPDR-SC1 and verify that its
# SAPI/DAPI values are mirrored by the otu container of the same interface
# on SPDR-SA1.
# NOTE(review): the embedded line numbers skip 1536, 1542-1544 and 1548, so
# the closing of both dicts, possibly entries of input_dict_2 and part of
# the second assertDictEqual are not visible here - confirm against the file.
1525 def test_089_check_interface_OTU4_spdrC(self):
1526 response = test_utils.check_netconf_node_request(
1527 "SPDR-SC1", "interface/XPDR2-NETWORK1-OTU")
1528 self.assertEqual(response.status_code, requests.codes.ok)
1529 res = response.json()
# keys expected on the interface entry itself
1530 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
1531 'administrative-state': 'inService',
1532 'supporting-circuit-pack-name': 'CP5-CFP',
1533 'supporting-interface': 'XPDR2-NETWORK1-753:760',
1534 'type': 'org-openroadm-interfaces:otnOtu',
1535 'supporting-port': 'CP5-CFP-P1'
# content expected for the otu container (compared without merging below)
1537 input_dict_2 = {'tx-dapi': 'AOQxIv+6nCD+',
1538 'expected-sapi': 'AOQxIv+6nCD+',
1539 'tx-sapi': 'ALvne1QI5jo4',
1540 'expected-dapi': 'ALvne1QI5jo4',
1541 'rate': 'org-openroadm-otn-common-types:OTU4',
# dict(expected, **actual) lets the values of the actual entry win the merge
1545 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
1546 res['interface'][0])
1547 self.assertDictEqual(input_dict_2,
1549 ['org-openroadm-otn-otu-interfaces:otu'])
# read the otu container of the peer node to cross-check SAPI/DAPI
1550 response2 = test_utils.check_netconf_node_request(
1551 "SPDR-SA1", "interface/XPDR2-NETWORK1-OTU/otu")
# NOTE(review): this re-checks `response`; `response2` looks like the intended
# object - confirm and fix.
1552 self.assertEqual(response.status_code, requests.codes.ok)
1553 res2 = response2.json()['org-openroadm-otn-otu-interfaces:otu']
# what this node transmits must be what the peer expects, and vice versa
1554 self.assertEqual(input_dict_2['tx-sapi'], res2['tx-dapi'])
1555 self.assertEqual(input_dict_2['tx-sapi'], res2['expected-sapi'])
1556 self.assertEqual(input_dict_2['tx-dapi'], res2['tx-sapi'])
1557 self.assertEqual(input_dict_2['tx-dapi'], res2['expected-dapi'])
def test_090_check_configuration_spdra(self):
    # Re-run, in order, the checks already defined by tests 049 to 052.
    earlier_checks = (
        self.test_049_check_interface_100GE_CLIENT_spdra,
        self.test_050_check_interface_ODU4_CLIENT_spdra,
        self.test_051_check_interface_ODU4_NETWORK_spdra,
        self.test_052_check_ODU4_connection_spdra,
    )
    for run_check in earlier_checks:
        run_check()
def test_091_check_configuration_spdrc(self):
    # Re-run, in order, the checks already defined by tests 053 to 056.
    earlier_checks = (
        self.test_053_check_interface_100GE_CLIENT_spdrc,
        self.test_054_check_interface_ODU4_CLIENT_spdrc,
        self.test_055_check_interface_ODU4_NETWORK_spdrc,
        self.test_056_check_ODU4_connection_spdrc,
    )
    for run_check in earlier_checks:
        run_check()
def test_092_check_otn_topo_links(self):
    # The OTN topology must hold exactly two links, each reporting no
    # available bandwidth and 100000 of used bandwidth.
    topo_resp = test_utils.get_otn_topo_request()
    self.assertEqual(topo_resp.status_code, requests.codes.ok)
    otn_links = topo_resp.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(otn_links), 2)
    for otn_link in otn_links:
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
def test_093_delete_100GE_service_AC(self):
    # Request deletion of service-100GE-AC, check the acknowledgement
    # message, then pause for WAITING seconds before the next test runs.
    delete_resp = test_utils.service_delete_request("service-100GE-AC")
    self.assertEqual(delete_resp.status_code, requests.codes.ok)
    common_part = delete_resp.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common_part['response-message'])
    time.sleep(self.WAITING)
def test_094_check_service_list(self):
    # Exactly one service must be listed at this point.
    list_resp = test_utils.get_service_list_request("")
    self.assertEqual(list_resp.status_code, requests.codes.ok)
    listed_services = list_resp.json()['service-list']['services']
    self.assertEqual(len(listed_services), 1)
def test_095_check_configuration_spdra(self):
    # Re-run, in order, the checks already defined by tests 063 to 066.
    earlier_checks = (
        self.test_063_check_no_ODU4_connection_spdra,
        self.test_064_check_no_interface_ODU4_NETWORK_spdra,
        self.test_065_check_no_interface_ODU4_CLIENT_spdra,
        self.test_066_check_no_interface_100GE_CLIENT_spdra,
    )
    for run_check in earlier_checks:
        run_check()
def test_096_check_otn_topo_links(self):
    # The OTN topology must hold exactly two links, each reporting 100000
    # of available bandwidth and no used bandwidth.
    topo_resp = test_utils.get_otn_topo_request()
    self.assertEqual(topo_resp.status_code, requests.codes.ok)
    otn_links = topo_resp.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(otn_links), 2)
    for otn_link in otn_links:
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            otn_link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_097_disconnect_xponders_from_roadm(self):
    # Delete every XPONDER-OUTPUT / XPONDER-INPUT link found in the
    # openroadm topology, checking each deletion's status code.
    # NOTE(review): the leading "{}" is presumably filled in by
    # test_utils.delete_request -- confirm against that helper.
    link_url_prefix = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    topo_resp = test_utils.get_ordm_topo_request("")
    self.assertEqual(topo_resp.status_code, requests.codes.ok)
    topo_links = topo_resp.json()['network'][0]['ietf-network-topology:link']
    for topo_link in topo_links:
        # Skip every link that does not attach a transponder.
        if topo_link["org-openroadm-common-network:link-type"] not in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
            continue
        delete_resp = test_utils.delete_request(link_url_prefix + topo_link["link-id"])
        self.assertEqual(delete_resp.status_code, requests.codes.ok)
def test_098_disconnect_spdrA(self):
    # Unmount device SPDR-SA1 and require an OK status code.
    unmount_resp = test_utils.unmount_device("SPDR-SA1")
    self.assertEqual(unmount_resp.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
def test_099_disconnect_spdrC(self):
    # Unmount device SPDR-SC1 and require an OK status code.
    unmount_resp = test_utils.unmount_device("SPDR-SC1")
    self.assertEqual(unmount_resp.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
def test_100_disconnect_roadmA(self):
    # Unmount device ROADM-A1 and require an OK status code.
    unmount_resp = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(unmount_resp.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
def test_101_disconnect_roadmC(self):
    # Unmount device ROADM-C1 and require an OK status code.
    unmount_resp = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(unmount_resp.status_code, requests.codes.ok,
                     msg=test_utils.CODE_SHOULD_BE_200)
# Run the tests with verbose output when this file is executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)