3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=invalid-name
13 # pylint: disable=no-member
14 # pylint: disable=too-many-public-methods
15 # pylint: disable=too-many-lines
21 # pylint: disable=wrong-import-order
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils_rfc8040 # nopep8
29 class TransportPCEtesting(unittest.TestCase):
32 WAITING = 20 # nominal value is 300
33 NODE_VERSION_221 = '2.2.1'
34 NODE_VERSION_71 = '7.1'
36 cr_serv_input_data = {
37 "sdnc-request-header": {
38 "request-id": "request-1",
39 "rpc-action": "service-create",
40 "request-system-id": "appname"
42 "service-name": "service1-OTUC4",
43 "common-id": "commonId",
44 "connection-type": "infrastructure",
46 "service-rate": "400",
48 "service-format": "OTU",
49 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
53 "port-device-name": "XPDR-A2-XPDR2",
55 "port-name": "XPDR2-NETWORK1",
56 "port-rack": "000000.00",
57 "port-shelf": "Chassis#1"
60 "lgx-device-name": "Some lgx-device-name",
61 "lgx-port-name": "Some lgx-port-name",
62 "lgx-port-rack": "000000.00",
63 "lgx-port-shelf": "00"
69 "port-device-name": "XPDR-A2-XPDR2",
71 "port-name": "XPDR2-NETWORK1",
72 "port-rack": "000000.00",
73 "port-shelf": "Chassis#1"
76 "lgx-device-name": "Some lgx-device-name",
77 "lgx-port-name": "Some lgx-port-name",
78 "lgx-port-rack": "000000.00",
79 "lgx-port-shelf": "00"
86 "service-rate": "400",
88 "service-format": "OTU",
89 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
93 "port-device-name": "XPDR-C2-XPDR2",
95 "port-name": "XPDR2-NETWORK1",
96 "port-rack": "000000.00",
97 "port-shelf": "Chassis#1"
100 "lgx-device-name": "Some lgx-device-name",
101 "lgx-port-name": "Some lgx-port-name",
102 "lgx-port-rack": "000000.00",
103 "lgx-port-shelf": "00"
109 "port-device-name": "XPDR-C2-XPDR2",
110 "port-type": "fixed",
111 "port-name": "XPDR2-NETWORK1",
112 "port-rack": "000000.00",
113 "port-shelf": "Chassis#1"
116 "lgx-device-name": "Some lgx-device-name",
117 "lgx-port-name": "Some lgx-port-name",
118 "lgx-port-rack": "000000.00",
119 "lgx-port-shelf": "00"
125 "due-date": "2018-06-15T00:00:01Z",
126 "operator-contact": "pw1234"
129 del_serv_input_data = {
130 "sdnc-request-header": {
131 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
132 "rpc-action": "service-delete",
133 "request-system-id": "appname",
134 "notification-url": "http://localhost:8585/NotificationServer/notify"},
135 "service-delete-req-info": {
136 "service-name": "TBD",
137 "tail-retention": "no"}
142 cls.processes = test_utils_rfc8040.start_tpce()
143 cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION_71),
144 ('roadma', cls.NODE_VERSION_221),
145 ('roadmc', cls.NODE_VERSION_221),
146 ('xpdrc2', cls.NODE_VERSION_71)])
149 def tearDownClass(cls):
150 # pylint: disable=not-an-iterable
151 for process in cls.processes:
152 test_utils_rfc8040.shutdown_process(process)
153 print("all processes killed")
158 def test_01_connect_xpdra2(self):
159 response = test_utils_rfc8040.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
160 self.assertEqual(response.status_code,
161 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
163 def test_02_connect_xpdrc2(self):
164 response = test_utils_rfc8040.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
165 self.assertEqual(response.status_code,
166 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
168 def test_03_connect_rdma(self):
169 response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
170 self.assertEqual(response.status_code,
171 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
173 def test_04_connect_rdmc(self):
174 response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
175 self.assertEqual(response.status_code,
176 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
178 def test_05_connect_xprda2_2_N1_to_roadma_PP2(self):
179 response = test_utils_rfc8040.transportpce_api_rpc_request(
180 'transportpce-networkutils', 'init-xpdr-rdm-links',
181 {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '2', 'network-num': '1',
182 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
183 self.assertEqual(response['status_code'], requests.codes.ok)
184 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
186 def test_06_connect_roadma_PP2_to_xpdra2_2_N1(self):
187 response = test_utils_rfc8040.transportpce_api_rpc_request(
188 'transportpce-networkutils', 'init-rdm-xpdr-links',
189 {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '2', 'network-num': '1',
190 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
191 self.assertEqual(response['status_code'], requests.codes.ok)
192 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
194 def test_07_connect_xprdc2_2_N1_to_roadmc_PP2(self):
195 response = test_utils_rfc8040.transportpce_api_rpc_request(
196 'transportpce-networkutils', 'init-xpdr-rdm-links',
197 {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '2', 'network-num': '1',
198 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
199 self.assertEqual(response['status_code'], requests.codes.ok)
200 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
202 def test_08_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
203 response = test_utils_rfc8040.transportpce_api_rpc_request(
204 'transportpce-networkutils', 'init-rdm-xpdr-links',
205 {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '2', 'network-num': '1',
206 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
207 self.assertEqual(response['status_code'], requests.codes.ok)
208 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
210 def test_09_add_omsAttributes_roadma_roadmc(self):
211 # Config ROADMA-ROADMC oms-attributes
213 "auto-spanloss": "true",
214 "spanloss-base": 11.4,
215 "spanloss-current": 12,
216 "engineered-spanloss": 12.2,
217 "link-concatenation": [{
220 "SRLG-length": 100000,
222 response = test_utils_rfc8040.add_oms_attr_request(
223 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
224 self.assertEqual(response.status_code, requests.codes.created)
226 def test_10_add_omsAttributes_roadmc_roadma(self):
227 # Config ROADMC-ROADMA oms-attributes
229 "auto-spanloss": "true",
230 "spanloss-base": 11.4,
231 "spanloss-current": 12,
232 "engineered-spanloss": 12.2,
233 "link-concatenation": [{
236 "SRLG-length": 100000,
238 response = test_utils_rfc8040.add_oms_attr_request(
239 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
240 self.assertEqual(response.status_code, requests.codes.created)
242 # test service-create for OCH-OTU4 service from xpdra2 to xpdrc2
243 def test_11_check_otn_topology(self):
244 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
245 self.assertEqual(response['status_code'], requests.codes.ok)
246 self.assertEqual(len(response['network'][0]['node']), 6, 'There should be 6 nodes')
247 self.assertNotIn('ietf-network-topology:link', response['network'][0])
249 def test_12_create_OTUC4_service(self):
250 response = test_utils_rfc8040.transportpce_api_rpc_request(
251 'org-openroadm-service', 'service-create',
252 self.cr_serv_input_data)
253 self.assertEqual(response['status_code'], requests.codes.ok)
254 self.assertIn('PCE calculation in progress',
255 response['output']['configuration-response-common']['response-message'])
256 time.sleep(self.WAITING)
258 def test_13_get_OTUC4_service1(self):
259 response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
260 "services", "service1-OTUC4")
261 self.assertEqual(response['status_code'], requests.codes.ok)
262 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
263 self.assertEqual(response['services'][0]['service-name'], 'service1-OTUC4')
264 self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
265 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
266 self.assertEqual(response['services'][0]['service-layer'], 'wdm')
267 self.assertEqual(response['services'][0]['service-a-end']['service-rate'], 400)
268 self.assertEqual(response['services'][0]['service-a-end']['otu-service-rate'],
269 'org-openroadm-otn-common-types:OTUCn')
270 self.assertEqual(response['services'][0]['service-z-end']['otu-service-rate'],
271 'org-openroadm-otn-common-types:OTUCn')
273 # Check correct configuration of devices
274 def test_14_check_interface_otsi_xpdra2(self):
275 response = test_utils_rfc8040.check_node_attribute_request(
276 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-755:768')
277 self.assertEqual(response['status_code'], requests.codes.ok)
278 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
279 'administrative-state': 'inService',
280 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
281 'type': 'org-openroadm-interfaces:otsi',
282 'supporting-port': 'L1'
283 }, **response['interface'][0]),
284 response['interface'][0])
285 self.assertDictEqual(
286 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
287 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
288 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
289 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
291 def test_15_check_interface_OTSI_GROUP_xpdra2(self):
292 response = test_utils_rfc8040.check_node_attribute_request(
293 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
294 self.assertEqual(response['status_code'], requests.codes.ok)
295 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
296 'administrative-state': 'inService',
297 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
298 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
299 'type': 'org-openroadm-interfaces:otsi-group',
300 'supporting-port': 'L1'
302 input_dict_2 = {"group-id": 1,
303 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
305 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
306 response['interface'][0])
307 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]
308 ['org-openroadm-otsi-group-interfaces:otsi-group']),
309 response['interface'][0]
310 ['org-openroadm-otsi-group-interfaces:otsi-group'])
312 def test_16_check_interface_OTUC4_xpdra2(self):
313 response = test_utils_rfc8040.check_node_attribute_request(
314 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTUC4')
315 self.assertEqual(response['status_code'], requests.codes.ok)
316 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
317 'administrative-state': 'inService',
318 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
319 'supporting-interface-list': 'XPDR2-NETWORK1-OTSIGROUP-400G',
320 'type': 'org-openroadm-interfaces:otnOtu',
321 'supporting-port': 'L1'
323 input_dict_2 = {'tx-sapi': 'G54UFNImtOE=',
324 'expected-dapi': 'G54UFNImtOE=',
325 'tx-dapi': 'J/FIUzQc+4M=',
326 'expected-sapi': 'J/FIUzQc+4M=',
327 'rate': 'org-openroadm-otn-common-types:OTUCn',
328 'degthr-percentage': 100,
332 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
333 response['interface'][0])
334 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
335 response['interface'][0]
336 ['org-openroadm-otn-otu-interfaces:otu'])
338 def test_17_check_interface_otsi_xpdrc2(self):
339 response = test_utils_rfc8040.check_node_attribute_request(
340 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-755:768')
341 self.assertEqual(response['status_code'], requests.codes.ok)
342 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
343 'administrative-state': 'inService',
344 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
345 'type': 'org-openroadm-interfaces:otsi',
346 'supporting-port': 'L1'
347 }, **response['interface'][0]),
348 response['interface'][0])
350 self.assertDictEqual(
351 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
352 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
353 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
354 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
356 def test_18_check_interface_OTSI_GROUP_xpdrc2(self):
357 response = test_utils_rfc8040.check_node_attribute_request(
358 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
359 self.assertEqual(response['status_code'], requests.codes.ok)
360 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
361 'administrative-state': 'inService',
362 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
363 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
364 'type': 'org-openroadm-interfaces:otsi-group',
365 'supporting-port': 'L1'
367 input_dict_2 = {"group-id": 1,
368 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
370 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
371 response['interface'][0])
372 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]
373 ['org-openroadm-otsi-group-interfaces:otsi-group']),
374 response['interface'][0]
375 ['org-openroadm-otsi-group-interfaces:otsi-group'])
377 def test_19_check_interface_OTUC4_xpdrc2(self):
378 response = test_utils_rfc8040.check_node_attribute_request(
379 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-OTUC4')
380 self.assertEqual(response['status_code'], requests.codes.ok)
381 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
382 'administrative-state': 'inService',
383 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
384 'supporting-interface-list': 'XPDR2-NETWORK1-OTSIGROUP-400G',
385 'type': 'org-openroadm-interfaces:otnOtu',
386 'supporting-port': 'L1'
388 input_dict_2 = {'tx-dapi': 'G54UFNImtOE=',
389 'expected-sapi': 'G54UFNImtOE=',
390 'tx-sapi': 'J/FIUzQc+4M=',
391 'expected-dapi': 'J/FIUzQc+4M=',
392 'rate': 'org-openroadm-otn-common-types:OTUCn',
393 'degthr-percentage': 100,
398 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
399 response['interface'][0])
401 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
402 response['interface'][0]
403 ['org-openroadm-otn-otu-interfaces:otu'])
405 def test_20_check_no_interface_ODUC4_xpdra2(self):
406 response = test_utils_rfc8040.check_node_attribute_request(
407 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
408 self.assertEqual(response['status_code'], requests.codes.conflict)
410 def test_21_check_openroadm_topo_xpdra2(self):
411 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
412 self.assertEqual(response['status_code'], requests.codes.ok)
413 ele = response['node']['ietf-network-topology:termination-point'][0]
414 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
417 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
420 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
422 def test_22_check_otn_topo_OTUC4_links(self):
423 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
424 self.assertEqual(response['status_code'], requests.codes.ok)
425 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
426 listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
427 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
428 for link in response['network'][0]['ietf-network-topology:link']:
429 self.assertIn(link['link-id'], listLinkId)
430 self.assertEqual(link['transportpce-networkutils:otn-link-type'], 'OTUC4')
431 self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
432 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
433 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
434 self.assertIn(link['org-openroadm-common-network:opposite-link'], listLinkId)
436 # test service-create for ODU4 service from xpdra2 to xpdrc2
437 def test_23_create_ODUC4_service(self):
438 self.cr_serv_input_data["service-name"] = "service1-ODUC4"
439 self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
440 del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
441 self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
442 self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
443 del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
444 self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
445 response = test_utils_rfc8040.transportpce_api_rpc_request(
446 'org-openroadm-service', 'service-create',
447 self.cr_serv_input_data)
448 self.assertEqual(response['status_code'], requests.codes.ok)
449 self.assertIn('PCE calculation in progress',
450 response['output']['configuration-response-common']['response-message'])
451 time.sleep(self.WAITING)
453 def test_24_get_ODUC4_service1(self):
454 response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
455 "services", "service1-ODUC4")
456 self.assertEqual(response['status_code'], requests.codes.ok)
457 self.assertEqual(response['services'][0]['service-name'], 'service1-ODUC4')
458 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
459 self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
460 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
461 self.assertEqual(response['services'][0]['service-layer'], 'wdm')
462 self.assertEqual(response['services'][0]['service-a-end']['service-rate'], 400)
463 self.assertEqual(response['services'][0]['service-a-end']['odu-service-rate'],
464 'org-openroadm-otn-common-types:ODUCn')
465 self.assertEqual(response['services'][0]['service-z-end']['odu-service-rate'],
466 'org-openroadm-otn-common-types:ODUCn')
468 def test_25_check_interface_ODUC4_xpdra2(self):
469 response = test_utils_rfc8040.check_node_attribute_request(
470 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
471 self.assertEqual(response['status_code'], requests.codes.ok)
472 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
473 'administrative-state': 'inService',
474 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
475 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
476 'type': 'org-openroadm-interfaces:otnOdu',
477 'supporting-port': 'L1'}
478 # SAPI/DAPI are added in the Otu4 renderer
479 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
480 'rate': 'org-openroadm-otn-common-types:ODUCn',
481 'expected-dapi': 'Nmbu2MNHvc4=',
482 'expected-sapi': 'Nmbu2MNHvc4=',
483 'tx-dapi': 'Nmbu2MNHvc4=',
484 'tx-sapi': 'LY9PxYJqUbw='}
486 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
487 response['interface'][0])
488 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
489 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
490 self.assertDictEqual(
491 {'payload-type': '22', 'exp-payload-type': '22'},
492 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
494 def test_26_check_interface_ODUC4_xpdrc2(self):
495 response = test_utils_rfc8040.check_node_attribute_request(
496 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODUC4')
497 self.assertEqual(response['status_code'], requests.codes.ok)
498 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
499 'administrative-state': 'inService',
500 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
501 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
502 'type': 'org-openroadm-interfaces:otnOdu',
503 'supporting-port': 'L1'}
504 # SAPI/DAPI are added in the Otu4 renderer
505 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
506 'rate': 'org-openroadm-otn-common-types:ODUCn',
507 'tx-sapi': 'Nmbu2MNHvc4=',
508 'tx-dapi': 'LY9PxYJqUbw=',
509 'expected-sapi': 'LY9PxYJqUbw=',
510 'expected-dapi': 'LY9PxYJqUbw='
512 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
513 response['interface'][0])
514 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
515 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
516 self.assertDictEqual(
517 {'payload-type': '22', 'exp-payload-type': '22'},
518 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
520 def test_27_check_otn_topo_links(self):
521 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
522 self.assertEqual(response['status_code'], requests.codes.ok)
523 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
524 for link in response['network'][0]['ietf-network-topology:link']:
525 linkId = link['link-id']
526 if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
527 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
529 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
531 link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
532 elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
533 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
535 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
537 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
539 link['transportpce-networkutils:otn-link-type'], 'ODUC4')
541 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
542 self.assertIn(link['org-openroadm-common-network:opposite-link'],
543 ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
544 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
546 self.fail("this link should not exist")
548 def test_28_check_otn_topo_tp(self):
549 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
550 self.assertEqual(response['status_code'], requests.codes.ok)
551 for node in response['network'][0]['node']:
552 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
553 tpList = node['ietf-network-topology:termination-point']
555 if tp['tp-id'] == 'XPDR2-NETWORK1':
556 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
557 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
559 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
560 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
561 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
563 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
564 def test_29_create_100GE_service_1(self):
565 self.cr_serv_input_data["service-name"] = "service-100GE"
566 self.cr_serv_input_data["connection-type"] = "service"
567 self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
568 self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
569 del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
570 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
571 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
572 self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
573 self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
574 del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
575 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
576 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
577 response = test_utils_rfc8040.transportpce_api_rpc_request(
578 'org-openroadm-service', 'service-create',
579 self.cr_serv_input_data)
580 self.assertEqual(response['status_code'], requests.codes.ok)
581 self.assertIn('PCE calculation in progress',
582 response['output']['configuration-response-common']['response-message'])
583 time.sleep(self.WAITING)
585 def test_30_get_100GE_service_1(self):
586 response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
587 "services", "service-100GE")
588 self.assertEqual(response['status_code'], requests.codes.ok)
589 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
590 self.assertEqual(response['services'][0]['service-name'], 'service-100GE')
591 self.assertEqual(response['services'][0]['connection-type'], 'service')
592 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
594 def test_31_check_interface_100GE_CLIENT_xpdra2(self):
595 response = test_utils_rfc8040.check_node_attribute_request(
596 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
597 self.assertEqual(response['status_code'], requests.codes.ok)
598 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
599 'administrative-state': 'inService',
600 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
601 'type': 'org-openroadm-interfaces:ethernetCsmacd',
602 'supporting-port': 'C1'
604 input_dict_2 = {'speed': 100000}
605 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
606 response['interface'][0])
607 self.assertDictEqual(
608 dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
609 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
611 def test_32_check_interface_ODU4_CLIENT_xpdra2(self):
612 response = test_utils_rfc8040.check_node_attribute_request(
613 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4')
614 self.assertEqual(response['status_code'], requests.codes.ok)
615 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
616 'administrative-state': 'inService',
617 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
618 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
619 'type': 'org-openroadm-interfaces:otnOdu',
620 'supporting-port': 'C1'}
622 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
623 'rate': 'org-openroadm-otn-common-types:ODU4',
624 'monitoring-mode': 'terminated'}
626 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
627 response['interface'][0])
628 self.assertDictEqual(dict(input_dict_2,
629 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
630 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
631 self.assertDictEqual(
632 {'payload-type': '07', 'exp-payload-type': '07'},
633 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
635 def test_33_check_interface_ODU4_NETWORK_xpdra2(self):
636 response = test_utils_rfc8040.check_node_attribute_request(
637 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4')
638 self.assertEqual(response['status_code'], requests.codes.ok)
639 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
640 'administrative-state': 'inService',
641 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
642 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
643 'type': 'org-openroadm-interfaces:otnOdu',
644 'supporting-port': 'L1'}
646 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
647 'rate': 'org-openroadm-otn-common-types:ODU4',
648 'monitoring-mode': 'not-terminated'}
649 input_dict_3 = {'trib-port-number': 1}
651 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
652 response['interface'][0])
653 self.assertDictEqual(dict(input_dict_2,
654 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
655 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
656 self.assertDictEqual(dict(input_dict_3,
657 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
658 'parent-odu-allocation']),
659 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
660 self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
661 ['opucn-trib-slots'])
662 self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
663 ['opucn-trib-slots'])
665 def test_34_check_ODU4_connection_xpdra2(self):
666 response = test_utils_rfc8040.check_node_attribute_request(
667 'XPDR-A2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
668 self.assertEqual(response['status_code'], requests.codes.ok)
671 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
672 'direction': 'bidirectional'
675 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
676 response['odu-connection'][0])
677 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
678 response['odu-connection'][0]['destination'])
679 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
680 response['odu-connection'][0]['source'])
682 def test_35_check_interface_100GE_CLIENT_xpdrc2(self):
683 response = test_utils_rfc8040.check_node_attribute_request(
684 'XPDR-C2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
685 self.assertEqual(response['status_code'], requests.codes.ok)
686 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
687 'administrative-state': 'inService',
688 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
689 'type': 'org-openroadm-interfaces:ethernetCsmacd',
690 'supporting-port': 'C1'
692 input_dict_2 = {'speed': 100000}
693 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
694 response['interface'][0])
695 self.assertDictEqual(
696 dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
697 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
699 def test_36_check_interface_ODU4_CLIENT_xpdrc2(self):
700 response = test_utils_rfc8040.check_node_attribute_request(
701 'XPDR-C2', 'interface', 'XPDR2-CLIENT1-ODU4')
702 self.assertEqual(response['status_code'], requests.codes.ok)
703 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4',
704 'administrative-state': 'inService',
705 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
706 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
707 'type': 'org-openroadm-interfaces:otnOdu',
708 'supporting-port': 'C1'}
710 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
711 'rate': 'org-openroadm-otn-common-types:ODU4',
712 'monitoring-mode': 'terminated'}
714 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
715 response['interface'][0])
716 self.assertDictEqual(dict(input_dict_2,
717 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
718 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
719 self.assertDictEqual(
720 {'payload-type': '07', 'exp-payload-type': '07'},
721 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
723 def test_37_check_interface_ODU4_NETWORK_xpdrc2(self):
724 response = test_utils_rfc8040.check_node_attribute_request(
725 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODU4')
726 self.assertEqual(response['status_code'], requests.codes.ok)
727 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4',
728 'administrative-state': 'inService',
729 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
730 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
731 'type': 'org-openroadm-interfaces:otnOdu',
732 'supporting-port': 'C1'}
734 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
735 'rate': 'org-openroadm-otn-common-types:ODU4',
736 'monitoring-mode': 'not-terminated'}
738 input_dict_3 = {'trib-port-number': 1}
740 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
741 response['interface'][0])
742 self.assertDictEqual(dict(input_dict_2,
743 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
744 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
745 self.assertDictEqual(dict(input_dict_3,
746 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
747 'parent-odu-allocation']),
748 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
749 'parent-odu-allocation'])
751 response['interface'][0][
752 'org-openroadm-otn-odu-interfaces:odu'][
753 'parent-odu-allocation']['opucn-trib-slots'])
754 self.assertIn('1.20',
755 response['interface'][0][
756 'org-openroadm-otn-odu-interfaces:odu'][
757 'parent-odu-allocation']['opucn-trib-slots'])
759 def test_38_check_ODU4_connection_xpdrc2(self):
760 response = test_utils_rfc8040.check_node_attribute_request(
761 'XPDR-C2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
762 self.assertEqual(response['status_code'], requests.codes.ok)
765 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
766 'direction': 'bidirectional'
769 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
770 response['odu-connection'][0])
771 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4'},
772 response['odu-connection'][0]['destination'])
773 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4'},
774 response['odu-connection'][0]['source'])
776 def test_39_check_otn_topo_links(self):
777 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
778 self.assertEqual(response['status_code'], requests.codes.ok)
779 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
780 for link in response['network'][0]['ietf-network-topology:link']:
781 linkId = link['link-id']
782 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
783 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
785 link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
787 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
789 def test_40_check_otn_topo_tp(self):
790 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
791 self.assertEqual(response['status_code'], requests.codes.ok)
792 for node in response['network'][0]['node']:
793 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
794 tpList = node['ietf-network-topology:termination-point']
796 if tp['tp-id'] == 'XPDR2-NETWORK1':
797 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
798 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
799 tsPoolList = list(range(1, 20))
801 tsPoolList, xpdrTpPortConAt['ts-pool'])
803 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
805 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
807 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
808 def test_41_create_100GE_service_2(self):
809 self.cr_serv_input_data["service-name"] = "service-100GE2"
810 self.cr_serv_input_data["connection-type"] = "service"
811 self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
812 self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
813 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
814 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
815 self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
816 self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
817 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
818 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
819 response = test_utils_rfc8040.transportpce_api_rpc_request(
820 'org-openroadm-service', 'service-create',
821 self.cr_serv_input_data)
822 self.assertEqual(response['status_code'], requests.codes.ok)
823 self.assertIn('PCE calculation in progress',
824 response['output']['configuration-response-common']['response-message'])
825 time.sleep(self.WAITING)
827 def test_42_get_100GE_service_2(self):
828 response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-100GE2")
829 self.assertEqual(response['status_code'], requests.codes.ok)
830 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
831 self.assertEqual(response['services'][0]['service-name'], 'service-100GE2')
832 self.assertEqual(response['services'][0]['connection-type'], 'service')
833 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
836 def test_43_check_service_list(self):
837 response = test_utils_rfc8040.get_ordm_serv_list_request()
838 self.assertEqual(response['status_code'], requests.codes.ok)
839 self.assertEqual(len(response['service-list']['services']), 4)
841 def test_44_check_otn_topo_links(self):
842 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
843 self.assertEqual(response['status_code'], requests.codes.ok)
844 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
845 for link in response['network'][0]['ietf-network-topology:link']:
846 linkId = link['link-id']
847 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
848 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
850 link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
852 link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
854 def test_45_check_otn_topo_tp(self):
855 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
856 self.assertEqual(response['status_code'], requests.codes.ok)
857 for node in response['network'][0]['node']:
858 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
859 tpList = node['ietf-network-topology:termination-point']
861 if tp['tp-id'] == 'XPDR2-NETWORK1':
862 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
863 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
864 tsPoolList = list(range(1, 40))
866 tsPoolList, xpdrTpPortConAt['ts-pool'])
868 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
870 2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
872 def test_46_delete_100GE_service_2(self):
873 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE2"
874 response = test_utils_rfc8040.transportpce_api_rpc_request(
875 'org-openroadm-service', 'service-delete',
876 self.del_serv_input_data)
877 self.assertEqual(response['status_code'], requests.codes.ok)
878 self.assertIn('Renderer service delete in progress',
879 response['output']['configuration-response-common']['response-message'])
880 time.sleep(self.WAITING)
882 def test_47_delete_100GE_service_1(self):
883 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE"
884 response = test_utils_rfc8040.transportpce_api_rpc_request(
885 'org-openroadm-service', 'service-delete',
886 self.del_serv_input_data)
887 self.assertEqual(response['status_code'], requests.codes.ok)
888 self.assertIn('Renderer service delete in progress',
889 response['output']['configuration-response-common']['response-message'])
890 time.sleep(self.WAITING)
892 def test_48_check_service_list(self):
893 response = test_utils_rfc8040.get_ordm_serv_list_request()
894 self.assertEqual(response['status_code'], requests.codes.ok)
895 self.assertEqual(len(response['service-list']['services']), 2)
897 def test_49_check_no_ODU4_connection_xpdra2(self):
898 response = test_utils_rfc8040.check_node_request("XPDR-A2")
899 self.assertEqual(response['status_code'], requests.codes.ok)
900 self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
902 def test_50_check_no_interface_ODU4_NETWORK_xpdra2(self):
903 response = test_utils_rfc8040.check_node_attribute_request(
904 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4')
905 self.assertEqual(response['status_code'], requests.codes.conflict)
907 def test_51_check_no_interface_ODU4_CLIENT_xpdra2(self):
908 response = test_utils_rfc8040.check_node_attribute_request(
909 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4')
910 self.assertEqual(response['status_code'], requests.codes.conflict)
912 def test_52_check_no_interface_100GE_CLIENT_xpdra2(self):
913 response = test_utils_rfc8040.check_node_attribute_request(
914 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
915 self.assertEqual(response['status_code'], requests.codes.conflict)
917 def test_53_check_otn_topo_links(self):
918 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
919 self.assertEqual(response['status_code'], requests.codes.ok)
920 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
921 for link in response['network'][0]['ietf-network-topology:link']:
922 linkId = link['link-id']
923 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
924 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
926 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
928 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
930 def test_54_check_otn_topo_tp(self):
931 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
932 self.assertEqual(response['status_code'], requests.codes.ok)
933 for node in response['network'][0]['node']:
934 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
935 tpList = node['ietf-network-topology:termination-point']
937 if tp['tp-id'] == 'XPDR2-NETWORK1':
938 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
939 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
941 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
943 def test_55_delete_ODUC4_service(self):
944 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODUC4"
945 response = test_utils_rfc8040.transportpce_api_rpc_request(
946 'org-openroadm-service', 'service-delete',
947 self.del_serv_input_data)
948 self.assertEqual(response['status_code'], requests.codes.ok)
949 self.assertIn('Renderer service delete in progress',
950 response['output']['configuration-response-common']['response-message'])
951 time.sleep(self.WAITING)
953 def test_56_check_service_list(self):
954 response = test_utils_rfc8040.get_ordm_serv_list_request()
955 self.assertEqual(response['status_code'], requests.codes.ok)
956 self.assertEqual(len(response['service-list']['services']), 1)
958 def test_57_check_no_interface_ODU4_xpdra2(self):
959 response = test_utils_rfc8040.check_node_attribute_request(
960 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
961 self.assertEqual(response['status_code'], requests.codes.conflict)
963 def test_58_check_otn_topo_links(self):
964 self.test_22_check_otn_topo_OTUC4_links()
966 def test_59_check_otn_topo_tp(self):
967 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
968 self.assertEqual(response['status_code'], requests.codes.ok)
969 for node in response['network'][0]['node']:
970 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
971 tpList = node['ietf-network-topology:termination-point']
973 if tp['tp-id'] == 'XPDR2-NETWORK1':
974 self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
977 def test_60_delete_OTUC4_service(self):
978 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OTUC4"
979 response = test_utils_rfc8040.transportpce_api_rpc_request(
980 'org-openroadm-service', 'service-delete',
981 self.del_serv_input_data)
982 self.assertEqual(response['status_code'], requests.codes.ok)
983 self.assertIn('Renderer service delete in progress',
984 response['output']['configuration-response-common']['response-message'])
985 time.sleep(self.WAITING)
987 def test_61_get_no_service(self):
988 response = test_utils_rfc8040.get_ordm_serv_list_request()
989 self.assertEqual(response['status_code'], requests.codes.conflict)
990 self.assertIn(response['service-list'], (
992 "error-type": "protocol",
993 "error-tag": "data-missing",
995 "Request could not be completed because the relevant data "
996 "model content does not exist"
998 "error-type": "application",
999 "error-tag": "data-missing",
1001 "Request could not be completed because the relevant data "
1002 "model content does not exist"
1005 def test_62_check_no_interface_OTUC4_xpdra2(self):
1006 response = test_utils_rfc8040.check_node_attribute_request(
1007 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTUC4')
1008 self.assertEqual(response['status_code'], requests.codes.conflict)
1010 def test_63_check_no_interface_OTSI_xpdra2(self):
1011 response = test_utils_rfc8040.check_node_attribute_request(
1012 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-755:768')
1013 self.assertEqual(response['status_code'], requests.codes.conflict)
1015 def test_64_check_no_interface_OTSIG_xpdra2(self):
1016 response = test_utils_rfc8040.check_node_attribute_request(
1017 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
1018 self.assertEqual(response['status_code'], requests.codes.conflict)
1020 def test_65_getLinks_OtnTopology(self):
1021 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1022 self.assertEqual(response['status_code'], requests.codes.ok)
1023 self.assertNotIn('ietf-network-topology:link', response['network'][0])
1025 def test_66_check_openroadm_topo_xpdra2(self):
1026 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
1027 self.assertEqual(response['status_code'], requests.codes.ok)
1028 tp = response['node']['ietf-network-topology:termination-point'][0]
1029 self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1030 self.assertNotIn('wavelength', dict.keys(
1031 tp['org-openroadm-network-topology:xpdr-network-attributes']))
1033 def test_67_check_openroadm_topology(self):
1034 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1035 self.assertEqual(response['status_code'], requests.codes.ok)
1036 links = response['network'][0]['ietf-network-topology:link']
1037 self.assertEqual(22, len(links), 'Topology should contain 22 links')
1039 def test_68_connect_xprda2_1_N1_to_roadma_PP2(self):
1040 response = test_utils_rfc8040.transportpce_api_rpc_request(
1041 'transportpce-networkutils', 'init-xpdr-rdm-links',
1042 {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
1043 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1044 self.assertEqual(response['status_code'], requests.codes.ok)
1045 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
1047 def test_69_connect_roadma_PP2_to_xpdra2_1_N1(self):
1048 response = test_utils_rfc8040.transportpce_api_rpc_request(
1049 'transportpce-networkutils', 'init-rdm-xpdr-links',
1050 {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
1051 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1052 self.assertEqual(response['status_code'], requests.codes.ok)
1053 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
1055 def test_70_connect_xprdc2_1_N1_to_roadmc_PP2(self):
1056 response = test_utils_rfc8040.transportpce_api_rpc_request(
1057 'transportpce-networkutils', 'init-xpdr-rdm-links',
1058 {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
1059 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1060 self.assertEqual(response['status_code'], requests.codes.ok)
1061 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
1063 def test_71_connect_roadmc_PP2_to_xpdrc2_1_N1(self):
1064 response = test_utils_rfc8040.transportpce_api_rpc_request(
1065 'transportpce-networkutils', 'init-rdm-xpdr-links',
1066 {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
1067 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1068 self.assertEqual(response['status_code'], requests.codes.ok)
1069 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
1071 # test service-create for 400GE service from xpdra2 to xpdrc2
1072 def test_72_create_400GE_service(self):
1073 self.cr_serv_input_data["service-name"] = "service-400GE"
1074 self.cr_serv_input_data["service-a-end"]["service-rate"] = "400"
1075 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
1076 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
1077 self.cr_serv_input_data["service-z-end"]["service-rate"] = "400"
1078 del self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"]
1079 del self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"]
1080 del self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"]
1081 del self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"]
1082 del self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"]
1083 del self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"]
1084 del self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"]
1085 del self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"]
1086 response = test_utils_rfc8040.transportpce_api_rpc_request(
1087 'org-openroadm-service', 'service-create',
1088 self.cr_serv_input_data)
1089 self.assertEqual(response['status_code'], requests.codes.ok)
1090 time.sleep(self.WAITING)
1091 self.assertIn('PCE calculation in progress',
1092 response['output']['configuration-response-common']['response-message'])
1093 time.sleep(self.WAITING)
1095 def test_73_get_400GE_service(self):
1096 response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", "service-400GE")
1097 self.assertEqual(response['status_code'], requests.codes.ok)
1098 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
1099 self.assertEqual(response['services'][0]['service-name'], 'service-400GE')
1100 self.assertEqual(response['services'][0]['connection-type'], 'service')
1101 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
1104 def test_74_check_xc1_roadma(self):
1105 response = test_utils_rfc8040.check_node_attribute_request(
1106 "ROADM-A1", "roadm-connections", "SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1107 self.assertEqual(response['status_code'], requests.codes.ok)
1108 self.assertDictEqual(
1110 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1111 'opticalControlMode': 'gainLoss',
1112 'target-output-power': -3.0
1113 }, **response['roadm-connections'][0]), response['roadm-connections'][0])
1114 self.assertDictEqual({'src-if': 'SRG1-PP1-TXRX-nmc-755:768'}, response['roadm-connections'][0]['source'])
1115 self.assertDictEqual({'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'}, response['roadm-connections'][0]['destination'])
1118 def test_75_check_topo_xpdra2(self):
1119 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR1', 'config')
1120 self.assertEqual(response['status_code'], requests.codes.ok)
1121 liste_tp = response['node']['ietf-network-topology:termination-point']
1122 for ele in liste_tp:
1123 if ele['tp-id'] == 'XPDR1-NETWORK1':
1126 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
1129 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
1130 if ele['tp-id'] == 'XPDR1-CLIENT1':
1131 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1134 def test_76_check_topo_roadma_SRG1(self):
1135 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
1136 self.assertEqual(response['status_code'], requests.codes.ok)
1137 freq_map = base64.b64decode(
1138 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1139 freq_map_array = [int(x) for x in freq_map]
1140 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1141 liste_tp = response['node']['ietf-network-topology:termination-point']
1142 for ele in liste_tp:
1143 if ele['tp-id'] == 'SRG1-PP1-TXRX':
1144 freq_map = base64.b64decode(
1145 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1146 freq_map_array = [int(x) for x in freq_map]
1147 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1148 if ele['tp-id'] == 'SRG1-PP2-TXRX':
1149 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1152 def test_77_check_topo_roadma_DEG1(self):
1153 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
1154 self.assertEqual(response['status_code'], requests.codes.ok)
1155 freq_map = base64.b64decode(
1156 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1157 freq_map_array = [int(x) for x in freq_map]
1158 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1159 liste_tp = response['node']['ietf-network-topology:termination-point']
1160 for ele in liste_tp:
1161 if ele['tp-id'] == 'DEG2-CTP-TXRX':
1162 freq_map = base64.b64decode(
1163 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1164 freq_map_array = [int(x) for x in freq_map]
1165 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1166 if ele['tp-id'] == 'DEG2-TTP-TXRX':
1167 freq_map = base64.b64decode(
1168 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1169 freq_map_array = [int(x) for x in freq_map]
1170 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1173 def test_78_check_interface_400GE_CLIENT_xpdra2(self):
1174 response = test_utils_rfc8040.check_node_attribute_request(
1175 'XPDR-A2', 'interface', 'XPDR1-CLIENT1-ETHERNET')
1176 self.assertEqual(response['status_code'], requests.codes.ok)
1177 input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1178 'administrative-state': 'inService',
1179 'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1180 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1181 'supporting-port': 'C1'
1183 input_dict_2 = {'speed': 400000}
1184 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1185 response['interface'][0])
1186 self.assertDictEqual(
1187 dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1188 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1190 def test_79_check_interface_OTSI_xpdra2(self):
1191 response = test_utils_rfc8040.check_node_attribute_request(
1192 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-755:768')
1193 self.assertEqual(response['status_code'], requests.codes.ok)
1194 input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1195 'administrative-state': 'inService',
1196 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1197 'type': 'org-openroadm-interfaces:otsi',
1198 'supporting-port': 'L1'}
1200 "frequency": 196.0812,
1201 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1202 "fec": "org-openroadm-common-types:ofec",
1203 "transmit-power": -5,
1204 "provision-mode": "explicit",
1205 "modulation-format": "dp-qam16"}
1207 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1208 response['interface'][0])
1209 self.assertDictEqual(
1210 dict(input_dict_2, **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1211 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1212 self.assertDictEqual(
1213 {"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1214 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1216 def test_80_check_interface_OTSI_GROUP_xpdra2(self):
1217 response = test_utils_rfc8040.check_node_attribute_request(
1218 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTSIGROUP-400G')
1219 self.assertEqual(response['status_code'], requests.codes.ok)
1220 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTSIGROUP-400G',
1221 'administrative-state': 'inService',
1222 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1223 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1224 'type': 'org-openroadm-interfaces:otsi-group',
1225 'supporting-port': 'L1'}
1226 input_dict_2 = {"group-id": 1,
1227 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1229 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1230 response['interface'][0])
1231 self.assertDictEqual(dict(input_dict_2,
1232 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1233 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1235 def test_81_check_interface_OTUC4_xpdra2(self):
1236 response = test_utils_rfc8040.check_node_attribute_request(
1237 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTUC4')
1238 self.assertEqual(response['status_code'], requests.codes.ok)
1239 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1240 'administrative-state': 'inService',
1241 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1242 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSIGROUP-400G',
1243 'type': 'org-openroadm-interfaces:otnOtu',
1244 'supporting-port': 'L1'}
1245 input_dict_2 = {"tx-sapi": "ANeUjNzWtDLV",
1246 "expected-dapi": "ANeUjNzWtDLV",
1247 'tx-dapi': 'AKsqPmWceByv',
1248 'expected-sapi': 'AKsqPmWceByv',
1249 "rate": "org-openroadm-otn-common-types:OTUCn",
1250 "degthr-percentage": 100,
1251 "tim-detect-mode": "Disabled",
1253 "degm-intervals": 2}
1255 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1256 response['interface'][0])
1257 self.assertDictEqual(dict(input_dict_2,
1258 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1259 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1261 def test_82_check_interface_ODUC4_xpdra2(self):
1262 response = test_utils_rfc8040.check_node_attribute_request(
1263 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUC4')
1264 self.assertEqual(response['status_code'], requests.codes.ok)
1265 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1266 'administrative-state': 'inService',
1267 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1268 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTUC4',
1269 'type': 'org-openroadm-interfaces:otnOdu',
1270 'supporting-port': 'L1',
1271 'circuit-id': 'TBD',
1272 'description': 'TBD'}
1273 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1274 "tim-detect-mode": "Disabled",
1275 "degm-intervals": 2,
1276 "degthr-percentage": 100,
1277 "monitoring-mode": "terminated",
1278 "rate": "org-openroadm-otn-common-types:ODUCn",
1280 input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1282 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1283 response['interface'][0])
1284 self.assertDictEqual(dict(input_dict_2,
1285 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1286 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1287 self.assertDictEqual(dict(input_dict_3,
1288 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1289 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1290 self.assertEqual('XPDR1-NETWORK1-OTUC4', response['interface'][0]['supporting-interface-list'][0])
1292 def test_82a_check_interface_ODUFLEX_xpdra2(self):
1293 response = test_utils_rfc8040.check_node_attribute_request(
1294 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUFLEX')
1295 self.assertEqual(response['status_code'], requests.codes.ok)
1296 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUFLEX',
1297 'administrative-state': 'inService',
1298 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1299 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-ODUC4',
1300 'type': 'org-openroadm-interfaces:otnOdu',
1301 'supporting-port': 'L1',
1302 'circuit-id': 'TBD',
1303 'description': 'TBD'}
1304 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
1305 "tim-detect-mode": "Disabled",
1306 "degm-intervals": 2,
1307 "degthr-percentage": 100,
1308 "monitoring-mode": "terminated",
1309 "rate": "org-openroadm-otn-common-types:ODUflex-cbr",
1310 "oduflex-cbr-service": "org-openroadm-otn-common-types:ODUflex-cbr-400G"
1312 input_dict_3 = {"exp-payload-type": "32", "payload-type": "32"}
1313 input_dict_4 = {'trib-port-number': 1}
1315 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1316 response['interface'][0])
1317 self.assertDictEqual(dict(input_dict_2,
1318 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1319 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1320 self.assertDictEqual(dict(input_dict_3,
1321 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1322 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1323 self.assertDictEqual(dict(input_dict_4,
1324 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1325 'parent-odu-allocation']),
1326 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1327 self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1328 ['opucn-trib-slots'])
1329 self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1330 ['opucn-trib-slots'])
1331 self.assertIn('2.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1332 ['opucn-trib-slots'])
1333 self.assertIn('2.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1334 ['opucn-trib-slots'])
1335 self.assertIn('3.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1336 ['opucn-trib-slots'])
1337 self.assertIn('3.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1338 ['opucn-trib-slots'])
1339 self.assertIn('4.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1340 ['opucn-trib-slots'])
1341 self.assertIn('4.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1342 ['opucn-trib-slots'])
1343 self.assertEqual('XPDR1-NETWORK1-ODUC4', response['interface'][0]['supporting-interface-list'][0])
1345 def test_83_delete_400GE_service(self):
1346 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-400GE"
1347 response = test_utils_rfc8040.transportpce_api_rpc_request(
1348 'org-openroadm-service', 'service-delete',
1349 self.del_serv_input_data)
1350 self.assertEqual(response['status_code'], requests.codes.ok)
1351 self.assertIn('Renderer service delete in progress',
1352 response['output']['configuration-response-common']['response-message'])
1353 time.sleep(self.WAITING)
1355 def test_84_get_no_service(self):
1356 response = test_utils_rfc8040.get_ordm_serv_list_request()
1357 self.assertEqual(response['status_code'], requests.codes.conflict)
1358 self.assertIn(response['service-list'], (
1360 "error-type": "protocol",
1361 "error-tag": "data-missing",
1363 "Request could not be completed because the relevant data "
1364 "model content does not exist"
1366 "error-type": "application",
1367 "error-tag": "data-missing",
1369 "Request could not be completed because the relevant data "
1370 "model content does not exist"
1374 def test_85_check_no_interface_ODUC4_xpdra2(self):
1375 response = test_utils_rfc8040.check_node_attribute_request(
1376 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUC4')
1377 self.assertEqual(response['status_code'], requests.codes.conflict)
1379 def test_86_check_no_interface_OTUC4_xpdra2(self):
1380 response = test_utils_rfc8040.check_node_attribute_request(
1381 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTUC4')
1382 self.assertEqual(response['status_code'], requests.codes.conflict)
1384 def test_87_check_no_interface_OTSI_GROUP_xpdra2(self):
1385 response = test_utils_rfc8040.check_node_attribute_request(
1386 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTSIGROUP-400G')
1387 self.assertEqual(response['status_code'], requests.codes.conflict)
1389 def test_88_check_no_interface_OTSI_xpdra2(self):
1390 response = test_utils_rfc8040.check_node_attribute_request(
1391 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-755:768')
1392 self.assertEqual(response['status_code'], requests.codes.conflict)
1394 def test_89_check_no_interface_400GE_CLIENT_xpdra2(self):
1395 response = test_utils_rfc8040.check_node_attribute_request(
1396 'XPDR-A2', 'interface', 'XPDR1-CLIENT1-ETHERNET')
1397 self.assertEqual(response['status_code'], requests.codes.conflict)
1399 def test_90_disconnect_xponders_from_roadm(self):
1400 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1401 self.assertEqual(response['status_code'], requests.codes.ok)
1402 links = response['network'][0]['ietf-network-topology:link']
1404 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1405 response = test_utils_rfc8040.del_ietf_network_link_request(
1406 'openroadm-topology', link['link-id'], 'config')
1407 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1409 def test_91_disconnect_xpdra2(self):
1410 response = test_utils_rfc8040.unmount_device("XPDR-A2")
1411 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1413 def test_92_disconnect_xpdrc2(self):
1414 response = test_utils_rfc8040.unmount_device("XPDR-C2")
1415 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1417 def test_93_disconnect_roadmA(self):
1418 response = test_utils_rfc8040.unmount_device("ROADM-A1")
1419 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1421 def test_94_disconnect_roadmC(self):
1422 response = test_utils_rfc8040.unmount_device("ROADM-C1")
1423 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1426 if __name__ == "__main__":
1427 unittest.main(verbosity=2)