3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils_rfc8040 # nopep8
28 class TransportPCEtesting(unittest.TestCase):
31 WAITING = 20 # nominal value is 300
32 NODE_VERSION = '2.2.1'
34 cr_serv_input_data = {
35 "sdnc-request-header": {
36 "request-id": "request-1",
37 "rpc-action": "service-create",
38 "request-system-id": "appname"
40 "service-name": "service-OCH-OTU4-AB",
41 "common-id": "commonId",
42 "connection-type": "infrastructure",
44 "service-rate": "100",
45 "node-id": "SPDR-SA1",
46 "service-format": "OTU",
47 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
51 "port-device-name": "SPDR-SA1-XPDR2",
53 "port-name": "XPDR2-NETWORK1",
54 "port-rack": "000000.00",
55 "port-shelf": "Chassis#1"
58 "lgx-device-name": "Some lgx-device-name",
59 "lgx-port-name": "Some lgx-port-name",
60 "lgx-port-rack": "000000.00",
61 "lgx-port-shelf": "00"
67 "port-device-name": "SPDR-SA1-XPDR2",
69 "port-name": "XPDR2-NETWORK1",
70 "port-rack": "000000.00",
71 "port-shelf": "Chassis#1"
74 "lgx-device-name": "Some lgx-device-name",
75 "lgx-port-name": "Some lgx-port-name",
76 "lgx-port-rack": "000000.00",
77 "lgx-port-shelf": "00"
84 "service-rate": "100",
85 "node-id": "SPDR-SB1",
86 "service-format": "OTU",
87 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
91 "port-device-name": "SPDR-SB1-XPDR2",
93 "port-name": "XPDR2-NETWORK1",
94 "port-rack": "000000.00",
95 "port-shelf": "Chassis#1"
98 "lgx-device-name": "Some lgx-device-name",
99 "lgx-port-name": "Some lgx-port-name",
100 "lgx-port-rack": "000000.00",
101 "lgx-port-shelf": "00"
107 "port-device-name": "SPDR-SB1-XPDR2",
108 "port-type": "fixed",
109 "port-name": "XPDR2-NETWORK1",
110 "port-rack": "000000.00",
111 "port-shelf": "Chassis#1"
114 "lgx-device-name": "Some lgx-device-name",
115 "lgx-port-name": "Some lgx-port-name",
116 "lgx-port-rack": "000000.00",
117 "lgx-port-shelf": "00"
123 "due-date": "2018-06-15T00:00:01Z",
124 "operator-contact": "pw1234"
127 del_serv_input_data = {
128 "sdnc-request-header": {
129 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
130 "rpc-action": "service-delete",
131 "request-system-id": "appname",
132 "notification-url": "http://localhost:8585/NotificationServer/notify"},
133 "service-delete-req-info": {
134 "service-name": "TBD",
135 "tail-retention": "no"}
140 cls.processes = test_utils_rfc8040.start_tpce()
141 cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
142 ('spdrb', cls.NODE_VERSION),
143 ('spdrc', cls.NODE_VERSION),
144 ('roadma', cls.NODE_VERSION),
145 ('roadmb', cls.NODE_VERSION),
146 ('roadmc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Class-level cleanup: hand every entry of cls.processes to the shared
    # shutdown helper, then print a one-line confirmation.
    # pylint: disable=not-an-iterable
    for running_process in cls.processes:
        test_utils_rfc8040.shutdown_process(running_process)
    print("all processes killed")
def test_001_connect_spdrA(self):
    # Mount node "SPDR-SA1" using the ('spdra', NODE_VERSION) device entry
    # and require the "created" status code in the reply.
    device = ('spdra', self.NODE_VERSION)
    response = test_utils_rfc8040.mount_device("SPDR-SA1", device)
    self.assertEqual(
        response.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_002_connect_spdrB(self):
    # Mount node "SPDR-SB1" using the ('spdrb', NODE_VERSION) device entry
    # and require the "created" status code in the reply.
    device = ('spdrb', self.NODE_VERSION)
    response = test_utils_rfc8040.mount_device("SPDR-SB1", device)
    self.assertEqual(
        response.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_003_connect_spdrC(self):
    # Mount node "SPDR-SC1" using the ('spdrc', NODE_VERSION) device entry
    # and require the "created" status code in the reply.
    device = ('spdrc', self.NODE_VERSION)
    response = test_utils_rfc8040.mount_device("SPDR-SC1", device)
    self.assertEqual(
        response.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_004_connect_rdmA(self):
    # Mount node "ROADM-A1" using the ('roadma', NODE_VERSION) device entry
    # and require the "created" status code in the reply.
    device = ('roadma', self.NODE_VERSION)
    response = test_utils_rfc8040.mount_device("ROADM-A1", device)
    self.assertEqual(
        response.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_005_connect_rdmB(self):
    # Mount node "ROADM-B1" using the ('roadmb', NODE_VERSION) device entry
    # and require the "created" status code in the reply.
    device = ('roadmb', self.NODE_VERSION)
    response = test_utils_rfc8040.mount_device("ROADM-B1", device)
    self.assertEqual(
        response.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_006_connect_rdmC(self):
    # Mount node "ROADM-C1" using the ('roadmc', NODE_VERSION) device entry
    # and require the "created" status code in the reply.
    device = ('roadmc', self.NODE_VERSION)
    response = test_utils_rfc8040.mount_device("ROADM-C1", device)
    self.assertEqual(
        response.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_007_connect_sprdA_2_N1_to_roadmA_PP3(self):
    # Call the 'init-xpdr-rdm-links' RPC of 'transportpce-networkutils' for
    # SPDR-SA1 (xpdr 2, network 1) and ROADM-A1 (srg 1, SRG1-PP3-TXRX), then
    # check both the status code and the result message.
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '2',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP3-TXRX',
    }
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links',
        {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_message = response["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_message)
def test_008_connect_roadmA_PP3_to_spdrA_2_N1(self):
    # Call the 'init-rdm-xpdr-links' RPC of 'transportpce-networkutils' for
    # ROADM-A1 (srg 1, SRG1-PP3-TXRX) and SPDR-SA1 (xpdr 2, network 1), then
    # check both the status code and the result message.
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '2',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP3-TXRX',
    }
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links',
        {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_message = response["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_message)
def test_009_connect_sprdC_2_N1_to_roadmC_PP3(self):
    # Call the 'init-xpdr-rdm-links' RPC of 'transportpce-networkutils' for
    # SPDR-SC1 (xpdr 2, network 1) and ROADM-C1 (srg 1, SRG1-PP3-TXRX), then
    # check both the status code and the result message.
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '2',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP3-TXRX',
    }
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links',
        {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_message = response["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_message)
def test_010_connect_roadmC_PP3_to_spdrC_2_N1(self):
    # Call the 'init-rdm-xpdr-links' RPC of 'transportpce-networkutils' for
    # ROADM-C1 (srg 1, SRG1-PP3-TXRX) and SPDR-SC1 (xpdr 2, network 1), then
    # check both the status code and the result message.
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '2',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP3-TXRX',
    }
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links',
        {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_message = response["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_message)
def test_011_connect_sprdB_2_N1_to_roadmB_PP1(self):
    # Call the 'init-xpdr-rdm-links' RPC of 'transportpce-networkutils' for
    # SPDR-SB1 (xpdr 2, network 1) and ROADM-B1 (srg 1, SRG1-PP1-TXRX), then
    # check both the status code and the result message.
    links_input = {
        'xpdr-node': 'SPDR-SB1',
        'xpdr-num': '2',
        'network-num': '1',
        'rdm-node': 'ROADM-B1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links',
        {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_message = response["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_message)
def test_012_connect_roadmB_PP1_to_spdrB_2_N1(self):
    # Call the 'init-rdm-xpdr-links' RPC of 'transportpce-networkutils' for
    # ROADM-B1 (srg 1, SRG1-PP1-TXRX) and SPDR-SB1 (xpdr 2, network 1), then
    # check both the status code and the result message.
    links_input = {
        'xpdr-node': 'SPDR-SB1',
        'xpdr-num': '2',
        'network-num': '1',
        'rdm-node': 'ROADM-B1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links',
        {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_message = response["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_message)
def test_013_connect_sprdB_2_N2_to_roadmB_PP2(self):
    # Call the 'init-xpdr-rdm-links' RPC of 'transportpce-networkutils' for
    # SPDR-SB1 (xpdr 2, network 2) and ROADM-B1 (srg 1, SRG1-PP2-TXRX), then
    # check both the status code and the result message.
    links_input = {
        'xpdr-node': 'SPDR-SB1',
        'xpdr-num': '2',
        'network-num': '2',
        'rdm-node': 'ROADM-B1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links',
        {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_message = response["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_message)
def test_014_connect_roadmB_PP2_to_spdrB_2_N2(self):
    # Call the 'init-rdm-xpdr-links' RPC of 'transportpce-networkutils' for
    # ROADM-B1 (srg 1, SRG1-PP2-TXRX) and SPDR-SB1 (xpdr 2, network 2), then
    # check both the status code and the result message.
    links_input = {
        'xpdr-node': 'SPDR-SB1',
        'xpdr-num': '2',
        'network-num': '2',
        'rdm-node': 'ROADM-B1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links',
        {'links-input': links_input})
    self.assertEqual(response['status_code'], requests.codes.ok)
    result_message = response["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_message)
260 def test_015_add_omsAttributes_ROADMA_ROADMB(self):
261 # Config ROADMA-ROADMB oms-attributes
263 "auto-spanloss": "true",
264 "spanloss-base": 11.4,
265 "spanloss-current": 12,
266 "engineered-spanloss": 12.2,
267 "link-concatenation": [{
270 "SRLG-length": 100000,
272 response = test_utils_rfc8040.add_oms_attr_request("ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX",
274 self.assertEqual(response.status_code, requests.codes.created)
276 def test_016_add_omsAttributes_ROADMB_ROADMA(self):
277 # Config ROADMB-ROADMA oms-attributes
279 "auto-spanloss": "true",
280 "spanloss-base": 11.4,
281 "spanloss-current": 12,
282 "engineered-spanloss": 12.2,
283 "link-concatenation": [{
286 "SRLG-length": 100000,
288 response = test_utils_rfc8040.add_oms_attr_request("ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX",
290 self.assertEqual(response.status_code, requests.codes.created)
292 def test_017_add_omsAttributes_ROADMB_ROADMC(self):
293 # Config ROADMB-ROADMC oms-attributes
295 "auto-spanloss": "true",
296 "spanloss-base": 11.4,
297 "spanloss-current": 12,
298 "engineered-spanloss": 12.2,
299 "link-concatenation": [{
302 "SRLG-length": 100000,
304 response = test_utils_rfc8040.add_oms_attr_request("ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX",
306 self.assertEqual(response.status_code, requests.codes.created)
308 def test_018_add_omsAttributes_ROADMC_ROADMB(self):
309 # Config ROADMC-ROADMB oms-attributes
311 "auto-spanloss": "true",
312 "spanloss-base": 11.4,
313 "spanloss-current": 12,
314 "engineered-spanloss": 12.2,
315 "link-concatenation": [{
318 "SRLG-length": 100000,
320 response = test_utils_rfc8040.add_oms_attr_request("ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX",
322 self.assertEqual(response.status_code, requests.codes.created)
324 def test_019_create_OTS_ROADMA_DEG1(self):
325 response = test_utils_rfc8040.transportpce_api_rpc_request(
326 'transportpce-device-renderer', 'create-ots-oms',
328 'node-id': 'ROADM-A1',
329 'logical-connection-point': 'DEG1-TTP-TXRX'
332 self.assertEqual(response['status_code'], requests.codes.ok)
333 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-A1',
334 response["output"]["result"])
336 def test_020_create_OTS_ROADMB_DEG1(self):
337 response = test_utils_rfc8040.transportpce_api_rpc_request(
338 'transportpce-device-renderer', 'create-ots-oms',
340 'node-id': 'ROADM-B1',
341 'logical-connection-point': 'DEG1-TTP-TXRX'
344 self.assertEqual(response['status_code'], requests.codes.ok)
345 self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADM-B1',
346 response["output"]["result"])
348 def test_021_create_OTS_ROADMB_DEG2(self):
349 response = test_utils_rfc8040.transportpce_api_rpc_request(
350 'transportpce-device-renderer', 'create-ots-oms',
352 'node-id': 'ROADM-B1',
353 'logical-connection-point': 'DEG2-TTP-TXRX'
356 self.assertEqual(response['status_code'], requests.codes.ok)
357 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-B1',
358 response["output"]["result"])
360 def test_022_create_OTS_ROADMC_DEG2(self):
361 response = test_utils_rfc8040.transportpce_api_rpc_request(
362 'transportpce-device-renderer', 'create-ots-oms',
364 'node-id': 'ROADM-C1',
365 'logical-connection-point': 'DEG2-TTP-TXRX'
368 self.assertEqual(response['status_code'], requests.codes.ok)
369 self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
370 response["output"]["result"])
372 def test_023_calculate_span_loss_base_all(self):
373 response = test_utils_rfc8040.transportpce_api_rpc_request(
374 'transportpce-olm', 'calculate-spanloss-base',
378 self.assertEqual(response['status_code'], requests.codes.ok)
379 self.assertIn('Success', response["output"]["result"])
382 "link-id": "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX"
383 }, response["output"]["spans"])
386 "link-id": "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX"
387 }, response["output"]["spans"])
390 "link-id": "ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX"
391 }, response["output"]["spans"])
394 "link-id": "ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX"
395 }, response["output"]["spans"])
398 "link-id": "ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX"
399 }, response["output"]["spans"])
402 "link-id": "ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX"
403 }, response["output"]["spans"])
def test_024_check_otn_topology(self):
    # Read the 'otn-topology' network from the 'config' datastore and check
    # that it holds 9 nodes and no link list yet.
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    # Only dig into the payload once the status code has been validated.
    topology = response['network'][0]
    node_count = len(topology['node'])
    self.assertEqual(node_count, 9, 'There should be 9 nodes')
    self.assertNotIn(
        'ietf-network-topology:link',
        topology,
        'otn-topology should have no link')
413 # test service-create for OCH-OTU4 service from spdrA to spdrB
def test_025_create_OCH_OTU4_service_AB(self):
    # Submit the class-level service-create payload, check that the RPC is
    # accepted with the "PCE calculation in progress" message, then pause for
    # WAITING seconds.
    payload = self.cr_serv_input_data
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', payload)
    self.assertEqual(response['status_code'], requests.codes.ok)
    common_response = response['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common_response['response-message'])
    time.sleep(self.WAITING)
def test_026_get_OCH_OTU4_service_AB(self):
    # Fetch "service-OCH-OTU4-AB" from the service list and compare a fixed
    # set of its attributes with the expected values, in a fixed order.
    response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-AB")
    self.assertEqual(response['status_code'], requests.codes.ok)
    service = response['services'][0]
    expected_attributes = (
        ('administrative-state', 'inService'),
        ('service-name', 'service-OCH-OTU4-AB'),
        ('connection-type', 'infrastructure'),
        ('lifecycle-state', 'planned'),
    )
    for attribute, expected_value in expected_attributes:
        self.assertEqual(service[attribute], expected_value)
432 # Check correct configuration of devices
433 def test_027_check_interface_och_spdra(self):
434 response = test_utils_rfc8040.check_node_attribute_request(
435 'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-761:768')
436 self.assertEqual(response['status_code'], requests.codes.ok)
437 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
438 'administrative-state': 'inService',
439 'supporting-circuit-pack-name': 'CP5-CFP',
440 'type': 'org-openroadm-interfaces:opticalChannel',
441 'supporting-port': 'CP5-CFP-P1'
442 }, **response['interface'][0]),
443 response['interface'][0])
445 self.assertEqual('org-openroadm-common-types:R100G',
446 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
447 self.assertEqual('dp-qpsk',
448 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
449 self.assertEqual(196.1,
450 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
453 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
455 def test_028_check_interface_OTU4_spdra(self):
456 response = test_utils_rfc8040.check_node_attribute_request(
457 'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU')
458 self.assertEqual(response['status_code'], requests.codes.ok)
459 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
460 'administrative-state': 'inService',
461 'supporting-circuit-pack-name': 'CP5-CFP',
462 'supporting-interface': 'XPDR2-NETWORK1-761:768',
463 'type': 'org-openroadm-interfaces:otnOtu',
464 'supporting-port': 'CP5-CFP-P1'
466 input_dict_2 = {'tx-sapi': 'AOQxIv+6nCD+',
467 'expected-dapi': 'AOQxIv+6nCD+',
468 'tx-dapi': 'X+8cRNi+HbE=',
469 'expected-sapi': 'X+8cRNi+HbE=',
470 'rate': 'org-openroadm-otn-common-types:OTU4',
473 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
474 response['interface'][0])
475 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
476 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
478 response2 = test_utils_rfc8040.check_node_attribute2_request(
479 'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
480 self.assertEqual(response2['status_code'], requests.codes.ok)
481 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
482 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
483 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
484 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
486 def test_029_check_interface_och_spdrB(self):
487 response = test_utils_rfc8040.check_node_attribute_request(
488 'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-761:768')
489 self.assertEqual(response['status_code'], requests.codes.ok)
490 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-761:768',
491 'administrative-state': 'inService',
492 'supporting-circuit-pack-name': 'CP5-CFP',
493 'type': 'org-openroadm-interfaces:opticalChannel',
494 'supporting-port': 'CP5-CFP-P1'
495 }, **response['interface'][0]),
496 response['interface'][0])
498 self.assertEqual('org-openroadm-common-types:R100G',
499 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
500 self.assertEqual('dp-qpsk',
501 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
502 self.assertEqual(196.1,
503 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
506 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
508 def test_030_check_interface_OTU4_spdrB(self):
509 response = test_utils_rfc8040.check_node_attribute_request(
510 'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-OTU')
511 self.assertEqual(response['status_code'], requests.codes.ok)
512 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
513 'administrative-state': 'inService',
514 'supporting-circuit-pack-name': 'CP5-CFP',
515 'supporting-interface': 'XPDR2-NETWORK1-761:768',
516 'type': 'org-openroadm-interfaces:otnOtu',
517 'supporting-port': 'CP5-CFP-P1'
519 input_dict_2 = {'tx-dapi': 'AOQxIv+6nCD+',
520 'expected-sapi': 'AOQxIv+6nCD+',
521 'tx-sapi': 'X+8cRNi+HbE=',
522 'expected-dapi': 'X+8cRNi+HbE=',
523 'rate': 'org-openroadm-otn-common-types:OTU4',
526 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
527 response['interface'][0])
528 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
529 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
531 response2 = test_utils_rfc8040.check_node_attribute2_request(
532 'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
533 self.assertEqual(response2['status_code'], requests.codes.ok)
534 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
535 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
536 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
537 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
539 def test_031_check_no_interface_ODU4_spdra(self):
540 response = test_utils_rfc8040.check_node_attribute_request(
541 'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-ODU4')
542 self.assertEqual(response['status_code'], requests.codes.conflict)
543 self.assertIn(response['interface'], (
545 "error-type": "protocol",
546 "error-tag": "data-missing",
548 "Request could not be completed because the relevant data "
549 "model content does not exist"
551 "error-type": "application",
552 "error-tag": "data-missing",
554 "Request could not be completed because the relevant data "
555 "model content does not exist"
558 def test_032_check_openroadm_topo_spdra(self):
559 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR2', 'config')
560 self.assertEqual(response['status_code'], requests.codes.ok)
561 ele = response['node']['ietf-network-topology:termination-point'][0]
562 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
565 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
568 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
569 self.assertEqual('ROADM-A1-SRG1--SRG1-PP3-TXRX',
570 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
573 def test_033_check_openroadm_topo_ROADMA_SRG(self):
574 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
575 self.assertEqual(response['status_code'], requests.codes.ok)
576 freq_map = base64.b64decode(
577 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
578 freq_map_array = [int(x) for x in freq_map]
579 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
580 liste_tp = response['node']['ietf-network-topology:termination-point']
582 if ele['tp-id'] == 'SRG1-PP3-TXRX':
583 freq_map = base64.b64decode(
584 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
585 freq_map_array = [int(x) for x in freq_map]
586 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
587 if ele['tp-id'] == 'SRG1-PP2-TXRX':
588 self.assertNotIn('avail-freq-maps', dict.keys(ele))
591 def test_034_check_openroadm_topo_ROADMA_DEG1(self):
592 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG1', 'config')
593 self.assertEqual(response['status_code'], requests.codes.ok)
594 freq_map = base64.b64decode(
595 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
596 freq_map_array = [int(x) for x in freq_map]
597 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
598 liste_tp = response['node']['ietf-network-topology:termination-point']
600 if ele['tp-id'] == 'DEG1-CTP-TXRX':
601 freq_map = base64.b64decode(
602 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
603 freq_map_array = [int(x) for x in freq_map]
604 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
605 if ele['tp-id'] == 'DEG1-TTP-TXRX':
606 freq_map = base64.b64decode(
607 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
608 freq_map_array = [int(x) for x in freq_map]
609 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
612 def test_035_check_otn_topo_otu4_links(self):
613 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
614 self.assertEqual(response['status_code'], requests.codes.ok)
615 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
616 listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
617 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1']
618 for link in response['network'][0]['ietf-network-topology:link']:
619 self.assertIn(link['link-id'], listLinkId)
621 link['transportpce-networkutils:otn-link-type'], 'OTU4')
623 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
625 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
627 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
629 link['org-openroadm-common-network:opposite-link'], listLinkId)
632 # test service-create for OCH-OTU4 service from spdrB to spdrC
def test_036_create_OCH_OTU4_service_BC(self):
    # Rewrite the shared service-create payload in place so it describes the
    # "service-OCH-OTU4-BC" service, submit it, check the acknowledgement and
    # pause for WAITING seconds.
    payload = self.cr_serv_input_data
    payload["service-name"] = "service-OCH-OTU4-BC"
    # (end key, node-id, clli, port-device-name, port-name); a-end first,
    # then z-end, each updated in the order node-id, clli, tx port, rx port.
    endpoints = (
        ("service-a-end", "SPDR-SB1", "NodeSB", "SPDR-SB1-XPDR2", "XPDR2-NETWORK2"),
        ("service-z-end", "SPDR-SC1", "NodeSC", "SPDR-SC1-XPDR2", "XPDR2-NETWORK1"),
    )
    for end_key, node_id, clli, device_name, port_name in endpoints:
        service_end = payload[end_key]
        service_end["node-id"] = node_id
        service_end["clli"] = clli
        for direction in ("tx-direction", "rx-direction"):
            port = service_end[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = port_name
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', payload)
    self.assertEqual(response['status_code'], requests.codes.ok)
    common_response = response['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common_response['response-message'])
    time.sleep(self.WAITING)
def test_037_get_OCH_OTU4_service_BC(self):
    # Fetch "service-OCH-OTU4-BC" from the service list and compare a fixed
    # set of its attributes with the expected values, in a fixed order.
    response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-BC")
    self.assertEqual(response['status_code'], requests.codes.ok)
    service = response['services'][0]
    expected_attributes = (
        ('administrative-state', 'inService'),
        ('service-name', 'service-OCH-OTU4-BC'),
        ('connection-type', 'infrastructure'),
        ('lifecycle-state', 'planned'),
    )
    for attribute, expected_value in expected_attributes:
        self.assertEqual(service[attribute], expected_value)
666 # Check correct configuration of devices
667 def test_038_check_interface_och_spdrB(self):
668 response = test_utils_rfc8040.check_node_attribute_request(
669 'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-753:760')
670 self.assertEqual(response['status_code'], requests.codes.ok)
671 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
672 'administrative-state': 'inService',
673 'supporting-circuit-pack-name': 'CP6-CFP',
674 'type': 'org-openroadm-interfaces:opticalChannel',
675 'supporting-port': 'CP1-CFP0-P1'
676 }, **response['interface'][0]),
677 response['interface'][0])
679 self.assertEqual('org-openroadm-common-types:R100G',
680 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
681 self.assertEqual('dp-qpsk',
682 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
683 self.assertEqual(196.05,
684 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
687 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
689 def test_039_check_interface_OTU4_spdrB(self):
690 response = test_utils_rfc8040.check_node_attribute_request(
691 'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-OTU')
692 self.assertEqual(response['status_code'], requests.codes.ok)
693 input_dict_1 = {'name': 'XPDR2-NETWORK2-OTU',
694 'administrative-state': 'inService',
695 'supporting-circuit-pack-name': 'CP6-CFP',
696 'supporting-interface': 'XPDR2-NETWORK1-753:760',
697 'type': 'org-openroadm-interfaces:otnOtu',
698 'supporting-port': 'CP6-CFP-P1'
700 input_dict_2 = {'tx-sapi': 'X+8cRNi+HbI=',
701 'expected-dapi': 'X+8cRNi+HbI=',
702 'tx-dapi': 'ALvne1QI5jo4',
703 'expected-sapi': 'ALvne1QI5jo4',
704 'rate': 'org-openroadm-otn-common-types:OTU4',
707 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
708 response['interface'][0])
709 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
710 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
712 response2 = test_utils_rfc8040.check_node_attribute2_request(
713 'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
714 self.assertEqual(response2['status_code'], requests.codes.ok)
715 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
716 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
717 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
718 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
720 def test_040_check_interface_och_spdrC(self):
721 response = test_utils_rfc8040.check_node_attribute_request(
722 'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-753:760')
723 self.assertEqual(response['status_code'], requests.codes.ok)
724 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-753:760',
725 'administrative-state': 'inService',
726 'supporting-circuit-pack-name': 'CP5-CFP',
727 'type': 'org-openroadm-interfaces:opticalChannel',
728 'supporting-port': 'CP5-CFP-P1'
729 }, **response['interface'][0]),
730 response['interface'][0])
732 self.assertEqual('org-openroadm-common-types:R100G',
733 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
734 self.assertEqual('dp-qpsk',
735 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
736 self.assertEqual(196.05,
737 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
740 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
742 def test_041_check_interface_OTU4_spdrC(self):
743 response = test_utils_rfc8040.check_node_attribute_request(
744 'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU')
745 self.assertEqual(response['status_code'], requests.codes.ok)
746 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
747 'administrative-state': 'inService',
748 'supporting-circuit-pack-name': 'CP5-CFP',
749 'supporting-interface': 'XPDR2-NETWORK1-753:760',
750 'type': 'org-openroadm-interfaces:otnOtu',
751 'supporting-port': 'CP5-CFP-P1'
753 input_dict_2 = {'tx-dapi': 'X+8cRNi+HbI=',
754 'expected-sapi': 'X+8cRNi+HbI=',
755 'tx-sapi': 'ALvne1QI5jo4',
756 'expected-dapi': 'ALvne1QI5jo4',
757 'rate': 'org-openroadm-otn-common-types:OTU4',
760 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
761 response['interface'][0])
762 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
763 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
765 response2 = test_utils_rfc8040.check_node_attribute2_request(
766 'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-OTU', 'org-openroadm-otn-otu-interfaces:otu')
767 self.assertEqual(response2['status_code'], requests.codes.ok)
768 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
769 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
770 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
771 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
773 def test_042_check_no_interface_ODU4_spdrB(self):
774 response = test_utils_rfc8040.check_node_attribute_request(
775 'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-ODU4')
776 self.assertEqual(response['status_code'], requests.codes.conflict)
777 self.assertIn(response['interface'], (
779 "error-type": "protocol",
780 "error-tag": "data-missing",
782 "Request could not be completed because the relevant data "
783 "model content does not exist"
785 "error-type": "application",
786 "error-tag": "data-missing",
788 "Request could not be completed because the relevant data "
789 "model content does not exist"
792 def test_043_check_openroadm_topo_spdrB(self):
793 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SB1-XPDR2', 'config')
794 self.assertEqual(response['status_code'], requests.codes.ok)
795 liste_tp = response['node']['ietf-network-topology:termination-point']
796 # pylint: disable=consider-using-f-string
798 if ele['tp-id'] == 'XPDR2-NETWORK1':
801 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
804 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
805 self.assertEqual('ROADM-B1-SRG1--SRG1-PP1-TXRX',
806 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
807 elif ele['tp-id'] == 'XPDR2-NETWORK2':
810 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
813 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
814 self.assertEqual('ROADM-B1-SRG1--SRG1-PP2-TXRX',
815 ele['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'])
817 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
820 def test_044_check_openroadm_topo_ROADMB_SRG1(self):
821 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-SRG1', 'config')
822 self.assertEqual(response['status_code'], requests.codes.ok)
823 freq_map = base64.b64decode(
824 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
825 freq_map_array = [int(x) for x in freq_map]
826 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
827 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
828 self.assertEqual(freq_map_array[93], 255, "Lambda 3 should be available")
829 liste_tp = response['node']['ietf-network-topology:termination-point']
831 if ele['tp-id'] == 'SRG1-PP1-TXRX':
832 freq_map = base64.b64decode(
833 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
834 freq_map_array = [int(x) for x in freq_map]
835 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
836 if ele['tp-id'] == 'SRG1-PP2-TXRX':
837 freq_map = base64.b64decode(
838 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
839 freq_map_array = [int(x) for x in freq_map]
840 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
841 if ele['tp-id'] == 'SRG1-PP3-TXRX':
842 self.assertNotIn('avail-freq-maps', dict.keys(ele))
845 def test_045_check_openroadm_topo_ROADMB_DEG2(self):
846 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG2', 'config')
847 self.assertEqual(response['status_code'], requests.codes.ok)
848 freq_map = base64.b64decode(
849 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
850 freq_map_array = [int(x) for x in freq_map]
851 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
852 liste_tp = response['node']['ietf-network-topology:termination-point']
854 if ele['tp-id'] == 'DEG2-CTP-TXRX':
855 freq_map = base64.b64decode(
856 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
857 freq_map_array = [int(x) for x in freq_map]
858 self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
859 if ele['tp-id'] == 'DEG2-TTP-TXRX':
860 freq_map = base64.b64decode(
861 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
862 freq_map_array = [int(x) for x in freq_map]
863 self.assertEqual(freq_map_array[94], 0, "Lambda 1 should not be available")
866 def test_046_check_otn_topo_otu4_links(self):
867 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
868 self.assertEqual(response['status_code'], requests.codes.ok)
869 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
870 listLinkId = ['OTU4-SPDR-SA1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK1',
871 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK1toSPDR-SA1-XPDR2-XPDR2-NETWORK1',
872 'OTU4-SPDR-SB1-XPDR2-XPDR2-NETWORK2toSPDR-SC1-XPDR2-XPDR2-NETWORK1',
873 'OTU4-SPDR-SC1-XPDR2-XPDR2-NETWORK1toSPDR-SB1-XPDR2-XPDR2-NETWORK2']
874 for link in response['network'][0]['ietf-network-topology:link']:
875 self.assertIn(link['link-id'], listLinkId)
877 link['transportpce-networkutils:otn-link-type'], 'OTU4')
879 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
881 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
883 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
885 link['org-openroadm-common-network:opposite-link'], listLinkId)
888 # test service-create for 100GE service from spdrA to spdrC via spdrB
890 def test_047_create_100GE_service_ABC(self):
891 self.cr_serv_input_data["service-name"] = "service-100GE-ABC"
892 self.cr_serv_input_data["connection-type"] = "service"
893 self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
894 self.cr_serv_input_data["service-a-end"]["node-id"] = "SPDR-SA1"
895 self.cr_serv_input_data["service-a-end"]["clli"] = "NodeSA"
896 del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
897 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
898 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
899 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR2"
900 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
901 self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
902 self.cr_serv_input_data["service-z-end"]["node-id"] = "SPDR-SC1"
903 self.cr_serv_input_data["service-z-end"]["clli"] = "NodeSC"
904 del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
905 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
906 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
907 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR2"
908 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
909 response = test_utils_rfc8040.transportpce_api_rpc_request(
910 'org-openroadm-service', 'service-create',
911 self.cr_serv_input_data)
912 self.assertEqual(response['status_code'], requests.codes.ok)
913 self.assertIn('PCE calculation in progress',
914 response['output']['configuration-response-common']['response-message'])
915 time.sleep(self.WAITING)
917 def test_048_get_100GE_service_ABC(self):
918 response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-100GE-ABC")
919 self.assertEqual(response['status_code'], requests.codes.ok)
920 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
921 self.assertEqual(response['services'][0]['service-name'], 'service-100GE-ABC')
922 self.assertEqual(response['services'][0]['connection-type'], 'service')
923 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
926 def test_049_check_interface_100GE_CLIENT_spdra(self):
927 response = test_utils_rfc8040.check_node_attribute_request(
928 'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ETHERNET')
929 self.assertEqual(response['status_code'], requests.codes.ok)
930 input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
931 'administrative-state': 'inService',
932 'supporting-circuit-pack-name': 'CP2-QSFP1',
933 'type': 'org-openroadm-interfaces:ethernetCsmacd',
934 'supporting-port': 'CP2-QSFP1-P1'
936 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
937 response['interface'][0])
938 self.assertEqual(100000, response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
939 self.assertEqual('off', response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['fec'])
941 def test_050_check_interface_ODU4_CLIENT_spdra(self):
942 response = test_utils_rfc8040.check_node_attribute_request(
943 'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ODU4')
944 self.assertEqual(response['status_code'], requests.codes.ok)
945 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
946 'administrative-state': 'inService',
947 'supporting-circuit-pack-name': 'CP2-QSFP1',
948 'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
949 'type': 'org-openroadm-interfaces:otnOdu',
950 'supporting-port': 'CP2-QSFP1-P1'}
951 # SAPI/DAPI are added in the Otu4 renderer
953 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
954 'rate': 'org-openroadm-otn-common-types:ODU4',
955 'monitoring-mode': 'terminated',
956 'expected-dapi': 'AItaZ6nmyaKJ',
957 'expected-sapi': 'AKFnJJaijWiz',
958 'tx-dapi': 'AKFnJJaijWiz',
959 'tx-sapi': 'AItaZ6nmyaKJ'}
961 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
962 response['interface'][0])
963 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
965 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
967 self.assertDictEqual(
968 {'payload-type': '21', 'exp-payload-type': '21'},
969 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
971 response2 = test_utils_rfc8040.check_node_attribute_request(
972 'SPDR-SC1', 'interface', 'XPDR2-CLIENT1-ODU4')
973 self.assertEqual(response['status_code'], requests.codes.ok)
974 self.assertEqual(input_dict_2['tx-sapi'],
975 response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['tx-dapi'])
976 self.assertEqual(input_dict_2['tx-sapi'],
977 response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['expected-sapi'])
978 self.assertEqual(input_dict_2['tx-dapi'],
979 response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['tx-sapi'])
980 self.assertEqual(input_dict_2['tx-dapi'],
981 response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['expected-dapi'])
983 def test_051_check_interface_ODU4_NETWORK_spdra(self):
984 response = test_utils_rfc8040.check_node_attribute_request(
985 'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-ODU4')
986 self.assertEqual(response['status_code'], requests.codes.ok)
987 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
988 'administrative-state': 'inService',
989 'supporting-circuit-pack-name': 'CP5-CFP',
990 'type': 'org-openroadm-interfaces:otnOdu',
991 'supporting-port': 'CP5-CFP-P1',
993 'description': 'TBD'}
995 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
996 'rate': 'org-openroadm-otn-common-types:ODU4',
997 'monitoring-mode': 'monitored'}
999 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1000 response['interface'][0])
1001 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1003 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1005 self.assertNotIn('opu',
1006 dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1008 def test_052_check_ODU4_connection_spdra(self):
1009 response = test_utils_rfc8040.check_node_attribute_request(
1010 'SPDR-SA1', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
1011 self.assertEqual(response['status_code'], requests.codes.ok)
1014 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1015 'direction': 'bidirectional'
1018 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1019 response['odu-connection'][0])
1020 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
1021 response['odu-connection'][0]['destination'])
1022 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1023 response['odu-connection'][0]['source'])
1025 def test_053_check_interface_100GE_CLIENT_spdrc(self):
1026 response = test_utils_rfc8040.check_node_attribute_request(
1027 'SPDR-SC1', 'interface', 'XPDR2-CLIENT1-ETHERNET')
1028 self.assertEqual(response['status_code'], requests.codes.ok)
1029 input_dict = {'name': 'XPDR2-CLIENT1-ETHERNET',
1030 'administrative-state': 'inService',
1031 'supporting-circuit-pack-name': 'CP2-QSFP1',
1032 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1033 'supporting-port': 'CP2-QSFP1-P1'
1035 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1036 response['interface'][0])
1037 self.assertEqual(100000, response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1038 self.assertEqual('off', response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['fec'])
1040 def test_054_check_interface_ODU4_CLIENT_spdrc(self):
1041 response = test_utils_rfc8040.check_node_attribute_request(
1042 'SPDR-SC1', 'interface', 'XPDR2-CLIENT1-ODU4')
1043 self.assertEqual(response['status_code'], requests.codes.ok)
1044 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
1045 'administrative-state': 'inService',
1046 'supporting-circuit-pack-name': 'CP2-QSFP1',
1047 'supporting-interface': 'XPDR2-CLIENT1-ETHERNET',
1048 'type': 'org-openroadm-interfaces:otnOdu',
1049 'supporting-port': 'CP2-QSFP1-P1',
1050 'circuit-id': 'TBD',
1051 'description': 'TBD'}
1053 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
1054 'rate': 'org-openroadm-otn-common-types:ODU4',
1055 'monitoring-mode': 'terminated',
1056 'expected-dapi': 'AKFnJJaijWiz',
1057 'expected-sapi': 'AItaZ6nmyaKJ',
1058 'tx-dapi': 'AItaZ6nmyaKJ',
1059 'tx-sapi': 'AKFnJJaijWiz'}
1061 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1062 response['interface'][0])
1063 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1065 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1067 self.assertDictEqual(
1068 {'payload-type': '21', 'exp-payload-type': '21'},
1069 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1071 response2 = test_utils_rfc8040.check_node_attribute_request(
1072 'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ODU4')
1073 self.assertEqual(response['status_code'], requests.codes.ok)
1074 self.assertEqual(input_dict_2['tx-sapi'],
1075 response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['tx-dapi'])
1076 self.assertEqual(input_dict_2['tx-sapi'],
1077 response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['expected-sapi'])
1078 self.assertEqual(input_dict_2['tx-dapi'],
1079 response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['tx-sapi'])
1080 self.assertEqual(input_dict_2['tx-dapi'],
1081 response2['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['expected-dapi'])
1083 def test_055_check_interface_ODU4_NETWORK_spdrc(self):
1084 response = test_utils_rfc8040.check_node_attribute_request(
1085 'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-ODU4')
1086 self.assertEqual(response['status_code'], requests.codes.ok)
1087 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1088 'administrative-state': 'inService',
1089 'supporting-circuit-pack-name': 'CP5-CFP',
1090 'type': 'org-openroadm-interfaces:otnOdu',
1091 'supporting-port': 'CP5-CFP-P1',
1092 'circuit-id': 'TBD',
1093 'description': 'TBD'}
1095 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1096 'rate': 'org-openroadm-otn-common-types:ODU4',
1097 'monitoring-mode': 'monitored'}
1099 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1100 response['interface'][0])
1101 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1103 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1105 self.assertNotIn('opu',
1106 dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1108 def test_056_check_ODU4_connection_spdrc(self):
1109 response = test_utils_rfc8040.check_node_attribute_request(
1110 'SPDR-SC1', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
1111 self.assertEqual(response['status_code'], requests.codes.ok)
1114 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1115 'direction': 'bidirectional'
1118 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1119 response['odu-connection'][0])
1120 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
1121 response['odu-connection'][0]['destination'])
1122 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
1123 response['odu-connection'][0]['source'])
1125 def test_057_check_interface_ODU4_NETWORK1_spdrb(self):
1126 response = test_utils_rfc8040.check_node_attribute_request(
1127 'SPDR-SB1', 'interface', 'XPDR2-NETWORK1-ODU4')
1128 self.assertEqual(response['status_code'], requests.codes.ok)
1129 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
1130 'administrative-state': 'inService',
1131 'supporting-circuit-pack-name': 'CP5-CFP',
1132 'type': 'org-openroadm-interfaces:otnOdu',
1133 'supporting-port': 'CP5-CFP-P1'}
1135 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1136 'rate': 'org-openroadm-otn-common-types:ODU4',
1137 'monitoring-mode': 'monitored'}
1139 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1140 response['interface'][0])
1141 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1143 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1145 self.assertNotIn('opu',
1146 dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1148 def test_058_check_interface_ODU4_NETWORK2_spdrb(self):
1149 response = test_utils_rfc8040.check_node_attribute_request(
1150 'SPDR-SB1', 'interface', 'XPDR2-NETWORK2-ODU4')
1151 self.assertEqual(response['status_code'], requests.codes.ok)
1152 input_dict_1 = {'name': 'XPDR2-NETWORK2-ODU4',
1153 'administrative-state': 'inService',
1154 'supporting-circuit-pack-name': 'CP6-CFP',
1155 'type': 'org-openroadm-interfaces:otnOdu',
1156 'supporting-port': 'CP6-CFP-P1'}
1158 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1159 'rate': 'org-openroadm-otn-common-types:ODU4',
1160 'monitoring-mode': 'monitored'}
1162 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1163 response['interface'][0])
1164 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
1166 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
1168 self.assertNotIn('opu',
1169 dict.keys(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']))
1171 def test_059_check_ODU4_connection_spdrb(self):
1172 response = test_utils_rfc8040.check_node_attribute_request(
1173 'SPDR-SB1', 'odu-connection', 'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4')
1174 self.assertEqual(response['status_code'], requests.codes.ok)
1177 'XPDR2-NETWORK1-ODU4-x-XPDR2-NETWORK2-ODU4',
1178 'direction': 'bidirectional'
1181 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1182 response['odu-connection'][0])
1183 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK2-ODU4'},
1184 response['odu-connection'][0]['destination'])
1185 self.assertDictEqual({'src-if': 'XPDR2-NETWORK1-ODU4'},
1186 response['odu-connection'][0]['source'])
1188 def test_060_check_otn_topo_links(self):
1189 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1190 self.assertEqual(response['status_code'], requests.codes.ok)
1191 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1192 for link in response['network'][0]['ietf-network-topology:link']:
1194 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
1196 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1198 def test_061_delete_service_100GE_ABC(self):
1199 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE-ABC"
1200 response = test_utils_rfc8040.transportpce_api_rpc_request(
1201 'org-openroadm-service', 'service-delete',
1202 self.del_serv_input_data)
1203 self.assertEqual(response['status_code'], requests.codes.ok)
1204 self.assertIn('Renderer service delete in progress',
1205 response['output']['configuration-response-common']['response-message'])
1206 time.sleep(self.WAITING)
1208 def test_062_check_service_list(self):
1209 response = test_utils_rfc8040.get_ordm_serv_list_request()
1210 self.assertEqual(response['status_code'], requests.codes.ok)
1211 self.assertEqual(len(response['service-list']['services']), 2)
1214 def test_063_check_no_ODU4_connection_spdra(self):
1215 response = test_utils_rfc8040.check_node_request("SPDR-SA1")
1216 self.assertEqual(response['status_code'], requests.codes.ok)
1217 self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
1220 def test_064_check_no_interface_ODU4_NETWORK_spdra(self):
1221 response = test_utils_rfc8040.check_node_attribute_request(
1222 'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-ODU4')
1223 self.assertEqual(response['status_code'], requests.codes.conflict)
1225 def test_065_check_no_interface_ODU4_CLIENT_spdra(self):
1226 response = test_utils_rfc8040.check_node_attribute_request(
1227 'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ODU4')
1228 self.assertEqual(response['status_code'], requests.codes.conflict)
1230 def test_066_check_no_interface_100GE_CLIENT_spdra(self):
1231 response = test_utils_rfc8040.check_node_attribute_request(
1232 'SPDR-SA1', 'interface', 'XPDR2-CLIENT1-ETHERNET')
1233 self.assertEqual(response['status_code'], requests.codes.conflict)
1235 def test_067_check_otn_topo_links(self):
1236 self.test_046_check_otn_topo_otu4_links()
1238 def test_068_delete_OCH_OTU4_service_AB(self):
1239 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-OCH-OTU4-AB"
1240 response = test_utils_rfc8040.transportpce_api_rpc_request(
1241 'org-openroadm-service', 'service-delete',
1242 self.del_serv_input_data)
1243 self.assertEqual(response['status_code'], requests.codes.ok)
1244 self.assertIn('Renderer service delete in progress',
1245 response['output']['configuration-response-common']['response-message'])
1246 time.sleep(self.WAITING)
1248 def test_069_delete_OCH_OTU4_service_BC(self):
1249 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-OCH-OTU4-BC"
1250 response = test_utils_rfc8040.transportpce_api_rpc_request(
1251 'org-openroadm-service', 'service-delete',
1252 self.del_serv_input_data)
1253 self.assertEqual(response['status_code'], requests.codes.ok)
1254 self.assertIn('Renderer service delete in progress',
1255 response['output']['configuration-response-common']['response-message'])
1256 time.sleep(self.WAITING)
1258 def test_070_get_no_service(self):
1259 response = test_utils_rfc8040.get_ordm_serv_list_request()
1260 self.assertEqual(response['status_code'], requests.codes.conflict)
1261 self.assertIn(response['service-list'], (
1263 "error-type": "protocol",
1264 "error-tag": "data-missing",
1266 "Request could not be completed because the relevant data "
1267 "model content does not exist"
1269 "error-type": "application",
1270 "error-tag": "data-missing",
1272 "Request could not be completed because the relevant data "
1273 "model content does not exist"
1277 def test_071_check_no_interface_OTU4_spdra(self):
1278 response = test_utils_rfc8040.check_node_attribute_request(
1279 'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU')
1280 self.assertEqual(response['status_code'], requests.codes.conflict)
1282 def test_072_check_no_interface_OCH_spdra(self):
1283 response = test_utils_rfc8040.check_node_attribute_request(
1284 'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-761:768')
1285 self.assertEqual(response['status_code'], requests.codes.conflict)
1287 def test_073_getLinks_OtnTopology(self):
1288 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1289 self.assertEqual(response['status_code'], requests.codes.ok)
1290 self.assertNotIn('ietf-network-topology:link', response['network'][0])
1292 def test_074_check_openroadm_topo_spdra(self):
1293 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR2', 'config')
1294 self.assertEqual(response['status_code'], requests.codes.ok)
1295 tp = response['node']['ietf-network-topology:termination-point'][0]
1296 self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1297 self.assertNotIn('wavelength', dict.keys(
1298 tp['org-openroadm-network-topology:xpdr-network-attributes']))
1301 def test_075_check_openroadm_topo_ROADMB_SRG1(self):
1302 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-SRG1', 'config')
1303 self.assertEqual(response['status_code'], requests.codes.ok)
1304 freq_map = base64.b64decode(
1305 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1306 freq_map_array = [int(x) for x in freq_map]
1307 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1308 self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1309 liste_tp = response['node']['ietf-network-topology:termination-point']
1310 for ele in liste_tp:
1311 if ele['tp-id'] == 'SRG1-PP1-TXRX':
1312 freq_map = base64.b64decode(
1313 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1314 freq_map_array = [int(x) for x in freq_map]
1315 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1316 if ele['tp-id'] == 'SRG1-PP2-TXRX':
1317 freq_map = base64.b64decode(
1318 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1319 freq_map_array = [int(x) for x in freq_map]
1320 self.assertEqual(freq_map_array[94], 255, "Lambda 1 should be available")
1323 def test_076_check_openroadm_topo_ROADMB_DEG1(self):
1324 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG1', 'config')
1325 self.assertEqual(response['status_code'], requests.codes.ok)
1326 freq_map = base64.b64decode(
1327 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1328 freq_map_array = [int(x) for x in freq_map]
1329 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1330 liste_tp = response['node']['ietf-network-topology:termination-point']
1331 for ele in liste_tp:
1332 if ele['tp-id'] == 'DEG1-CTP-TXRX':
1333 freq_map = base64.b64decode(
1334 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1335 freq_map_array = [int(x) for x in freq_map]
1336 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1337 if ele['tp-id'] == 'DEG1-TTP-TXRX':
1338 freq_map = base64.b64decode(
1339 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1340 freq_map_array = [int(x) for x in freq_map]
1341 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1344 def test_077_check_openroadm_topo_ROADMB_DEG2(self):
1345 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-B1-DEG2', 'config')
1346 self.assertEqual(response['status_code'], requests.codes.ok)
1347 freq_map = base64.b64decode(
1348 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1349 freq_map_array = [int(x) for x in freq_map]
1350 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1351 liste_tp = response['node']['ietf-network-topology:termination-point']
1352 for ele in liste_tp:
1353 if ele['tp-id'] == 'DEG2-CTP-TXRX':
1354 freq_map = base64.b64decode(
1355 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1356 freq_map_array = [int(x) for x in freq_map]
1357 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1358 if ele['tp-id'] == 'DEG2-TTP-TXRX':
1359 freq_map = base64.b64decode(
1360 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1361 freq_map_array = [int(x) for x in freq_map]
1362 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
1365 def test_078_disconnect_xponders_from_roadm(self):
1366 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1367 self.assertEqual(response['status_code'], requests.codes.ok)
1368 links = response['network'][0]['ietf-network-topology:link']
1370 if (link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT')
1371 and ('SPDR-SB1' in link['link-id'] or 'ROADM-B1' in link['link-id'])):
1372 response = test_utils_rfc8040.del_ietf_network_link_request(
1373 'openroadm-topology', link['link-id'], 'config')
1374 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1376 def test_079_disconnect_spdrB(self):
1377 response = test_utils_rfc8040.unmount_device("SPDR-SB1")
1378 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1380 def test_080_disconnect_roadmB(self):
1381 response = test_utils_rfc8040.unmount_device("ROADM-B1")
1382 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1384 def test_081_remove_roadm_to_roadm_links(self):
1385 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1386 self.assertEqual(response['status_code'], requests.codes.ok)
1387 links = response['network'][0]['ietf-network-topology:link']
1389 if (link["org-openroadm-common-network:link-type"] == "ROADM-TO-ROADM"
1390 and 'ROADM-B1' in link['link-id']):
1391 response = test_utils_rfc8040.del_ietf_network_link_request(
1392 'openroadm-topology', link['link-id'], 'config')
1393 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1395 def test_082_add_omsAttributes_ROADMA_ROADMC(self):
1396 # Config ROADMA-ROADMC oms-attributes
1398 "auto-spanloss": "true",
1399 "spanloss-base": 11.4,
1400 "spanloss-current": 12,
1401 "engineered-spanloss": 12.2,
1402 "link-concatenation": [{
1404 "fiber-type": "smf",
1405 "SRLG-length": 100000,
1407 response = test_utils_rfc8040.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX",
1409 self.assertEqual(response.status_code, requests.codes.created)
1411 def test_083_add_omsAttributes_ROADMC_ROADMA(self):
1412 # Config ROADMC-ROADMA oms-attributes
1414 "auto-spanloss": "true",
1415 "spanloss-base": 11.4,
1416 "spanloss-current": 12,
1417 "engineered-spanloss": 12.2,
1418 "link-concatenation": [{
1420 "fiber-type": "smf",
1421 "SRLG-length": 100000,
1423 response = test_utils_rfc8040.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX",
1425 self.assertEqual(response.status_code, requests.codes.created)
def test_084_create_OCH_OTU4_service_AC(self):
    # Rewrite the shared service-create payload in place into the OTU4
    # infrastructure request "service-OCH-OTU4-AC", submit it, then pause for
    # WAITING seconds.
    payload = self.cr_serv_input_data
    payload["service-name"] = "service-OCH-OTU4-AC"
    payload["connection-type"] = "infrastructure"
    # Both ends receive the same settings; only the port device name differs.
    for end_key, device_name in (("service-a-end", "SPDR-SA1-XPDR2"),
                                 ("service-z-end", "SPDR-SC1-XPDR2")):
        end_point = payload[end_key]
        end_point["service-rate"] = "100"
        end_point["service-format"] = "OTU"
        for direction in ("tx-direction", "rx-direction"):
            port = end_point[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR2-NETWORK1"
        end_point["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', payload)
    self.assertEqual(response['status_code'], requests.codes.ok)
    common = response['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_085_get_OCH_OTU4_service_AC(self):
    # Fetch "service-OCH-OTU4-AC" from the services list and check its
    # administrative state, name, connection type and lifecycle state.
    response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-OCH-OTU4-AC")
    self.assertEqual(response['status_code'], requests.codes.ok)
    service = response['services'][0]
    expected_attributes = (
        ('administrative-state', 'inService'),
        ('service-name', 'service-OCH-OTU4-AC'),
        ('connection-type', 'infrastructure'),
        ('lifecycle-state', 'planned'),
    )
    for attribute, expected_value in expected_attributes:
        self.assertEqual(service[attribute], expected_value)
1461 # test service-create for 100GE service from spdrA to spdrC via spdrB
def test_086_create_100GE_service_AC(self):
    # Rewrite the shared service-create payload in place into the Ethernet
    # request "service-100GE-AC" on the XPDR2-CLIENT1 ports, submit it, then
    # pause for WAITING seconds.
    payload = self.cr_serv_input_data
    payload["service-name"] = "service-100GE-AC"
    payload["connection-type"] = "service"
    end_settings = (
        ("service-a-end", "SPDR-SA1", "NodeSA", "SPDR-SA1-XPDR2"),
        ("service-z-end", "SPDR-SC1", "NodeSC", "SPDR-SC1-XPDR2"),
    )
    for end_key, node_id, clli, device_name in end_settings:
        end_point = payload[end_key]
        end_point["service-format"] = "Ethernet"
        end_point["node-id"] = node_id
        end_point["clli"] = clli
        # The OTU rate entry is removed from the payload for this request.
        del end_point["otu-service-rate"]
        for direction in ("tx-direction", "rx-direction"):
            port = end_point[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR2-CLIENT1"
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', payload)
    self.assertEqual(response['status_code'], requests.codes.ok)
    common = response['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_087_get_100GE_service_AC(self):
    # Fetch "service-100GE-AC" from the services list and check its
    # administrative state, name, connection type and lifecycle state.
    response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-100GE-AC")
    self.assertEqual(response['status_code'], requests.codes.ok)
    service = response['services'][0]
    expected_attributes = (
        ('administrative-state', 'inService'),
        ('service-name', 'service-100GE-AC'),
        ('connection-type', 'service'),
        ('lifecycle-state', 'planned'),
    )
    for attribute, expected_value in expected_attributes:
        self.assertEqual(service[attribute], expected_value)
1499 def test_088_check_interface_OTU4_spdra(self):
1500 response = test_utils_rfc8040.check_node_attribute_request(
1501 'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU')
1502 self.assertEqual(response['status_code'], requests.codes.ok)
1503 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
1504 'administrative-state': 'inService',
1505 'supporting-circuit-pack-name': 'CP5-CFP',
1506 'supporting-interface': 'XPDR2-NETWORK1-761:768',
1507 'type': 'org-openroadm-interfaces:otnOtu',
1508 'supporting-port': 'CP5-CFP-P1'
1510 input_dict_2 = {'tx-sapi': 'AOQxIv+6nCD+',
1511 'expected-dapi': 'AOQxIv+6nCD+',
1512 'tx-dapi': 'ALvne1QI5jo4',
1513 'expected-sapi': 'ALvne1QI5jo4',
1514 'rate': 'org-openroadm-otn-common-types:OTU4',
1517 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1518 response['interface'][0])
1519 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1520 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1522 response2 = test_utils_rfc8040.check_node_attribute2_request(
1523 'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
1524 self.assertEqual(response2['status_code'], requests.codes.ok)
1525 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
1526 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
1527 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
1528 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
1530 def test_089_check_interface_OTU4_spdrC(self):
1531 response = test_utils_rfc8040.check_node_attribute_request(
1532 'SPDR-SC1', 'interface', 'XPDR2-NETWORK1-OTU')
1533 self.assertEqual(response['status_code'], requests.codes.ok)
1534 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTU',
1535 'administrative-state': 'inService',
1536 'supporting-circuit-pack-name': 'CP5-CFP',
1537 'supporting-interface': 'XPDR2-NETWORK1-753:760',
1538 'type': 'org-openroadm-interfaces:otnOtu',
1539 'supporting-port': 'CP5-CFP-P1'
1541 input_dict_2 = {'tx-dapi': 'AOQxIv+6nCD+',
1542 'expected-sapi': 'AOQxIv+6nCD+',
1543 'tx-sapi': 'ALvne1QI5jo4',
1544 'expected-dapi': 'ALvne1QI5jo4',
1545 'rate': 'org-openroadm-otn-common-types:OTU4',
1548 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1549 response['interface'][0])
1550 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1551 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1553 response2 = test_utils_rfc8040.check_node_attribute2_request(
1554 'SPDR-SA1', 'interface', 'XPDR2-NETWORK1-OTU', 'org-openroadm-otn-otu-interfaces:otu')
1555 self.assertEqual(response2['status_code'], requests.codes.ok)
1556 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-dapi'])
1557 self.assertEqual(input_dict_2['tx-sapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-sapi'])
1558 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['tx-sapi'])
1559 self.assertEqual(input_dict_2['tx-dapi'], response2['org-openroadm-otn-otu-interfaces:otu']['expected-dapi'])
def test_090_check_configuration_spdra(self):
    # Run the earlier spdra test methods 049 to 052 again, in their original
    # order, as one combined check.
    earlier_checks = (
        self.test_049_check_interface_100GE_CLIENT_spdra,
        self.test_050_check_interface_ODU4_CLIENT_spdra,
        self.test_051_check_interface_ODU4_NETWORK_spdra,
        self.test_052_check_ODU4_connection_spdra,
    )
    for check in earlier_checks:
        check()
def test_091_check_configuration_spdrc(self):
    # Run the earlier spdrc test methods 053 to 056 again, in their original
    # order, as one combined check.
    earlier_checks = (
        self.test_053_check_interface_100GE_CLIENT_spdrc,
        self.test_054_check_interface_ODU4_CLIENT_spdrc,
        self.test_055_check_interface_ODU4_NETWORK_spdrc,
        self.test_056_check_ODU4_connection_spdrc,
    )
    for check in earlier_checks:
        check()
def test_092_check_otn_topo_links(self):
    # The 'config' view of otn-topology must hold exactly two links, each
    # reporting 0 available bandwidth and 100000 used bandwidth.
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    otn_links = response['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(otn_links), 2)
    expected_bandwidth = (
        ('org-openroadm-otn-network-topology:available-bandwidth', 0),
        ('org-openroadm-otn-network-topology:used-bandwidth', 100000),
    )
    for otn_link in otn_links:
        for leaf, expected_value in expected_bandwidth:
            self.assertEqual(otn_link[leaf], expected_value)
def test_093_delete_100GE_service_AC(self):
    # Point the service-delete payload at "service-100GE-AC", submit it, then
    # pause for WAITING seconds.
    delete_request = self.del_serv_input_data
    delete_request["service-delete-req-info"]["service-name"] = "service-100GE-AC"
    response = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', delete_request)
    self.assertEqual(response['status_code'], requests.codes.ok)
    common = response['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_094_check_service_list(self):
    # Exactly one service must be present in the service list.
    response = test_utils_rfc8040.get_ordm_serv_list_request()
    self.assertEqual(response['status_code'], requests.codes.ok)
    listed_services = response['service-list']['services']
    self.assertEqual(len(listed_services), 1)
def test_095_check_configuration_spdra(self):
    # Run the earlier spdra "no ..." test methods 063 to 066 again, in their
    # original order, as one combined check.
    earlier_checks = (
        self.test_063_check_no_ODU4_connection_spdra,
        self.test_064_check_no_interface_ODU4_NETWORK_spdra,
        self.test_065_check_no_interface_ODU4_CLIENT_spdra,
        self.test_066_check_no_interface_100GE_CLIENT_spdra,
    )
    for check in earlier_checks:
        check()
def test_096_check_otn_topo_links(self):
    # The 'config' view of otn-topology must hold exactly two links, each
    # reporting 100000 available bandwidth and 0 used bandwidth.
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    otn_links = response['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(otn_links), 2)
    expected_bandwidth = (
        ('org-openroadm-otn-network-topology:available-bandwidth', 100000),
        ('org-openroadm-otn-network-topology:used-bandwidth', 0),
    )
    for otn_link in otn_links:
        for leaf, expected_value in expected_bandwidth:
            self.assertEqual(otn_link[leaf], expected_value)
def test_097_disconnect_xponders_from_roadm(self):
    # Read the 'config' view of openroadm-topology and delete each link whose
    # type is XPONDER-OUTPUT or XPONDER-INPUT.
    topology = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(topology['status_code'], requests.codes.ok)
    xponder_link_types = ('XPONDER-OUTPUT', 'XPONDER-INPUT')
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    for link in topology['network'][0]['ietf-network-topology:link']:
        if link["org-openroadm-common-network:link-type"] not in xponder_link_types:
            continue
        # Every deletion must be acknowledged with ok or no_content.
        deletion = test_utils_rfc8040.del_ietf_network_link_request(
            'openroadm-topology', link['link-id'], 'config')
        self.assertIn(deletion.status_code, accepted_codes)
def test_098_disconnect_spdrA(self):
    # Unmount SPDR-SA1; the reply status must be ok or no_content.
    unmount_reply = test_utils_rfc8040.unmount_device("SPDR-SA1")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
def test_099_disconnect_spdrC(self):
    # Unmount SPDR-SC1; the reply status must be ok or no_content.
    unmount_reply = test_utils_rfc8040.unmount_device("SPDR-SC1")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
def test_100_disconnect_roadmA(self):
    # Unmount ROADM-A1; the reply status must be ok or no_content.
    unmount_reply = test_utils_rfc8040.unmount_device("ROADM-A1")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
def test_101_disconnect_roadmC(self):
    # Unmount ROADM-C1; the reply status must be ok or no_content.
    unmount_reply = test_utils_rfc8040.unmount_device("ROADM-C1")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
if __name__ == "__main__":
    # Executed as a script: hand control to unittest with verbose reporting.
    unittest.main(verbosity=2)