3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=invalid-name
13 # pylint: disable=no-member
14 # pylint: disable=too-many-public-methods
15 # pylint: disable=too-many-lines
21 # pylint: disable=wrong-import-order
23 sys.path.append('transportpce_tests/common/')
24 # pylint: disable=wrong-import-position
25 # pylint: disable=import-error
26 import test_utils # nopep8
29 class TransportPCEtesting(unittest.TestCase):
32 WAITING = 20 # nominal value is 300
33 NODE_VERSION_221 = '2.2.1'
34 NODE_VERSION_71 = '7.1'
36 cr_serv_input_data = {
37 "sdnc-request-header": {
38 "request-id": "request-1",
39 "rpc-action": "service-create",
40 "request-system-id": "appname"
42 "service-name": "service1-OTUC4",
43 "common-id": "commonId",
44 "connection-type": "infrastructure",
46 "service-rate": "400",
48 "service-format": "OTU",
49 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
53 "port-device-name": "XPDR-A2-XPDR2",
55 "port-name": "XPDR2-NETWORK1",
56 "port-rack": "000000.00",
57 "port-shelf": "Chassis#1"
60 "lgx-device-name": "Some lgx-device-name",
61 "lgx-port-name": "Some lgx-port-name",
62 "lgx-port-rack": "000000.00",
63 "lgx-port-shelf": "00"
69 "port-device-name": "XPDR-A2-XPDR2",
71 "port-name": "XPDR2-NETWORK1",
72 "port-rack": "000000.00",
73 "port-shelf": "Chassis#1"
76 "lgx-device-name": "Some lgx-device-name",
77 "lgx-port-name": "Some lgx-port-name",
78 "lgx-port-rack": "000000.00",
79 "lgx-port-shelf": "00"
86 "service-rate": "400",
88 "service-format": "OTU",
89 "otu-service-rate": "org-openroadm-otn-common-types:OTUCn",
93 "port-device-name": "XPDR-C2-XPDR2",
95 "port-name": "XPDR2-NETWORK1",
96 "port-rack": "000000.00",
97 "port-shelf": "Chassis#1"
100 "lgx-device-name": "Some lgx-device-name",
101 "lgx-port-name": "Some lgx-port-name",
102 "lgx-port-rack": "000000.00",
103 "lgx-port-shelf": "00"
109 "port-device-name": "XPDR-C2-XPDR2",
110 "port-type": "fixed",
111 "port-name": "XPDR2-NETWORK1",
112 "port-rack": "000000.00",
113 "port-shelf": "Chassis#1"
116 "lgx-device-name": "Some lgx-device-name",
117 "lgx-port-name": "Some lgx-port-name",
118 "lgx-port-rack": "000000.00",
119 "lgx-port-shelf": "00"
125 "due-date": "2018-06-15T00:00:01Z",
126 "operator-contact": "pw1234"
129 del_serv_input_data = {
130 "sdnc-request-header": {
131 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
132 "rpc-action": "service-delete",
133 "request-system-id": "appname",
134 "notification-url": "http://localhost:8585/NotificationServer/notify"},
135 "service-delete-req-info": {
136 "service-name": "TBD",
137 "tail-retention": "no"}
142 cls.processes = test_utils.start_tpce()
143 cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION_71),
144 ('roadma', cls.NODE_VERSION_221),
145 ('roadmc', cls.NODE_VERSION_221),
146 ('xpdrc2', cls.NODE_VERSION_71)])
149 def tearDownClass(cls):
150 # pylint: disable=not-an-iterable
151 for process in cls.processes:
152 test_utils.shutdown_process(process)
153 print("all processes killed")
158 def test_001_connect_xpdra2(self):
159 response = test_utils.mount_device("XPDR-A2", ('xpdra2', self.NODE_VERSION_71))
160 self.assertEqual(response.status_code,
161 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
163 def test_002_connect_xpdrc2(self):
164 response = test_utils.mount_device("XPDR-C2", ('xpdrc2', self.NODE_VERSION_71))
165 self.assertEqual(response.status_code,
166 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
168 def test_003_connect_rdma(self):
169 response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
170 self.assertEqual(response.status_code,
171 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
173 def test_004_connect_rdmc(self):
174 response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
175 self.assertEqual(response.status_code,
176 requests.codes.created, test_utils.CODE_SHOULD_BE_201)
178 def test_005_connect_xpdra2_2_N1_to_roadma_PP2(self):
179 response = test_utils.transportpce_api_rpc_request(
180 'transportpce-networkutils', 'init-xpdr-rdm-links',
181 {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '2', 'network-num': '1',
182 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
183 self.assertEqual(response['status_code'], requests.codes.ok)
184 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
186 def test_006_connect_roadma_PP2_to_xpdra2_2_N1(self):
187 response = test_utils.transportpce_api_rpc_request(
188 'transportpce-networkutils', 'init-rdm-xpdr-links',
189 {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '2', 'network-num': '1',
190 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
191 self.assertEqual(response['status_code'], requests.codes.ok)
192 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
194 def test_007_connect_xpdrc2_2_N1_to_roadmc_PP2(self):
195 response = test_utils.transportpce_api_rpc_request(
196 'transportpce-networkutils', 'init-xpdr-rdm-links',
197 {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '2', 'network-num': '1',
198 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
199 self.assertEqual(response['status_code'], requests.codes.ok)
200 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
202 def test_008_connect_roadmc_PP2_to_xpdrc2_2_N1(self):
203 response = test_utils.transportpce_api_rpc_request(
204 'transportpce-networkutils', 'init-rdm-xpdr-links',
205 {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '2', 'network-num': '1',
206 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
207 self.assertEqual(response['status_code'], requests.codes.ok)
208 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
210 def test_009_add_omsAttributes_roadma_roadmc(self):
211 # Config ROADMA-ROADMC oms-attributes
213 "auto-spanloss": "true",
214 "spanloss-base": 11.4,
215 "spanloss-current": 12,
216 "engineered-spanloss": 12.2,
217 "link-concatenation": [{
220 "SRLG-length": 100000,
222 response = test_utils.add_oms_attr_request(
223 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
224 self.assertEqual(response.status_code, requests.codes.created)
226 def test_010_add_omsAttributes_roadmc_roadma(self):
227 # Config ROADMC-ROADMA oms-attributes
229 "auto-spanloss": "true",
230 "spanloss-base": 11.4,
231 "spanloss-current": 12,
232 "engineered-spanloss": 12.2,
233 "link-concatenation": [{
236 "SRLG-length": 100000,
238 response = test_utils.add_oms_attr_request(
239 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
240 self.assertEqual(response.status_code, requests.codes.created)
242 # test service-create for OCH-OTU4 service from xpdra2 to xpdrc2
243 def test_011_check_otn_topology(self):
244 response = test_utils.get_ietf_network_request('otn-topology', 'config')
245 self.assertEqual(response['status_code'], requests.codes.ok)
246 self.assertEqual(len(response['network'][0]['node']), 7, 'There should be 7 nodes')
247 self.assertNotIn('ietf-network-topology:link', response['network'][0])
249 def test_012_create_OTUC4_service(self):
250 response = test_utils.transportpce_api_rpc_request(
251 'org-openroadm-service', 'service-create',
252 self.cr_serv_input_data)
253 self.assertEqual(response['status_code'], requests.codes.ok)
254 self.assertIn('PCE calculation in progress',
255 response['output']['configuration-response-common']['response-message'])
256 time.sleep(self.WAITING)
258 def test_013_get_OTUC4_service1(self):
259 response = test_utils.get_ordm_serv_list_attr_request(
260 "services", "service1-OTUC4")
261 self.assertEqual(response['status_code'], requests.codes.ok)
262 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
263 self.assertEqual(response['services'][0]['service-name'], 'service1-OTUC4')
264 self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
265 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
266 self.assertEqual(response['services'][0]['service-layer'], 'wdm')
267 self.assertEqual(response['services'][0]['service-a-end']['service-rate'], 400)
268 self.assertEqual(response['services'][0]['service-a-end']['otu-service-rate'],
269 'org-openroadm-otn-common-types:OTUCn')
270 self.assertEqual(response['services'][0]['service-z-end']['otu-service-rate'],
271 'org-openroadm-otn-common-types:OTUCn')
273 # Check correct configuration of devices
274 def test_014_check_interface_otsi_xpdra2(self):
275 response = test_utils.check_node_attribute_request(
276 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-755:768')
277 self.assertEqual(response['status_code'], requests.codes.ok)
278 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
279 'administrative-state': 'inService',
280 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
281 'type': 'org-openroadm-interfaces:otsi',
282 'supporting-port': 'L1'
283 }, **response['interface'][0]),
284 response['interface'][0])
285 self.assertDictEqual(
286 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
287 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
288 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
289 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
291 def test_015_check_interface_OTSI_GROUP_xpdra2(self):
292 response = test_utils.check_node_attribute_request(
293 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
294 self.assertEqual(response['status_code'], requests.codes.ok)
295 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
296 'administrative-state': 'inService',
297 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
298 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
299 'type': 'org-openroadm-interfaces:otsi-group',
300 'supporting-port': 'L1'
302 input_dict_2 = {"group-id": 1,
303 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
305 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
306 response['interface'][0])
307 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]
308 ['org-openroadm-otsi-group-interfaces:otsi-group']),
309 response['interface'][0]
310 ['org-openroadm-otsi-group-interfaces:otsi-group'])
312 def test_016_check_interface_OTUC4_xpdra2(self):
313 response = test_utils.check_node_attribute_request(
314 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTUC4')
315 self.assertEqual(response['status_code'], requests.codes.ok)
316 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTUC4',
317 'administrative-state': 'inService',
318 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
319 'supporting-interface-list': 'XPDR2-NETWORK1-OTSIGROUP-400G',
320 'type': 'org-openroadm-interfaces:otnOtu',
321 'supporting-port': 'L1'
323 input_dict_2 = {'tx-sapi': 'G54UFNImtOE=',
324 'expected-dapi': 'G54UFNImtOE=',
325 'tx-dapi': 'J/FIUzQc+4M=',
326 'expected-sapi': 'J/FIUzQc+4M=',
327 'rate': 'org-openroadm-otn-common-types:OTUCn',
328 'degthr-percentage': 100,
332 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
333 response['interface'][0])
334 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
335 response['interface'][0]
336 ['org-openroadm-otn-otu-interfaces:otu'])
338 def test_017_check_interface_otsi_xpdrc2(self):
339 response = test_utils.check_node_attribute_request(
340 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-755:768')
341 self.assertEqual(response['status_code'], requests.codes.ok)
342 self.assertDictEqual(dict({'name': 'XPDR2-NETWORK1-755:768',
343 'administrative-state': 'inService',
344 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
345 'type': 'org-openroadm-interfaces:otsi',
346 'supporting-port': 'L1'
347 }, **response['interface'][0]),
348 response['interface'][0])
350 self.assertDictEqual(
351 dict({'frequency': 196.0812, 'otsi-rate': 'org-openroadm-common-optical-channel-types:R400G-otsi',
352 'transmit-power': -5, 'modulation-format': 'dp-qam16'},
353 **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
354 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
356 def test_018_check_interface_OTSI_GROUP_xpdrc2(self):
357 response = test_utils.check_node_attribute_request(
358 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
359 self.assertEqual(response['status_code'], requests.codes.ok)
360 input_dict_1 = {'name': 'XPDR2-NETWORK1-OTSIGROUP-400G',
361 'administrative-state': 'inService',
362 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
363 'supporting-interface-list': 'XPDR2-NETWORK1-755:768',
364 'type': 'org-openroadm-interfaces:otsi-group',
365 'supporting-port': 'L1'
367 input_dict_2 = {"group-id": 1,
368 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"
370 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
371 response['interface'][0])
372 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]
373 ['org-openroadm-otsi-group-interfaces:otsi-group']),
374 response['interface'][0]
375 ['org-openroadm-otsi-group-interfaces:otsi-group'])
377 def test_019_check_interface_OTUC4_xpdrc2(self):
378 response = test_utils.check_node_attribute_request(
379 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-OTUC4')
380 self.assertEqual(response['status_code'], requests.codes.ok)
381 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
382 'administrative-state': 'inService',
383 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
384 'supporting-interface-list': 'XPDR2-NETWORK1-OTSIGROUP-400G',
385 'type': 'org-openroadm-interfaces:otnOtu',
386 'supporting-port': 'L1'
388 input_dict_2 = {'tx-dapi': 'G54UFNImtOE=',
389 'expected-sapi': 'G54UFNImtOE=',
390 'tx-sapi': 'J/FIUzQc+4M=',
391 'expected-dapi': 'J/FIUzQc+4M=',
392 'rate': 'org-openroadm-otn-common-types:OTUCn',
393 'degthr-percentage': 100,
398 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
399 response['interface'][0])
401 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
402 response['interface'][0]
403 ['org-openroadm-otn-otu-interfaces:otu'])
405 def test_020_check_no_interface_ODUC4_xpdra2(self):
406 response = test_utils.check_node_attribute_request(
407 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
408 self.assertEqual(response['status_code'], requests.codes.conflict)
410 def test_021_check_openroadm_topo_xpdra2(self):
411 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
412 self.assertEqual(response['status_code'], requests.codes.ok)
413 ele = response['node']['ietf-network-topology:termination-point'][0]
414 self.assertEqual('XPDR2-NETWORK1', ele['tp-id'])
417 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
420 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
422 def test_022_check_otn_topo_OTUC4_links(self):
423 response = test_utils.get_ietf_network_request('otn-topology', 'config')
424 self.assertEqual(response['status_code'], requests.codes.ok)
425 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
426 listLinkId = ['OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
427 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1']
428 for link in response['network'][0]['ietf-network-topology:link']:
429 self.assertIn(link['link-id'], listLinkId)
430 self.assertEqual(link['transportpce-networkutils:otn-link-type'], 'OTUC4')
431 self.assertEqual(link['org-openroadm-common-network:link-type'], 'OTN-LINK')
432 self.assertEqual(link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
433 self.assertEqual(link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
434 self.assertIn(link['org-openroadm-common-network:opposite-link'], listLinkId)
436 # test service-create for ODU4 service from xpdra2 to xpdrc2
437 def test_023_create_ODUC4_service(self):
438 self.cr_serv_input_data["service-name"] = "service1-ODUC4"
439 self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
440 del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
441 self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
442 self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
443 del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
444 self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODUCn"
445 response = test_utils.transportpce_api_rpc_request(
446 'org-openroadm-service', 'service-create',
447 self.cr_serv_input_data)
448 self.assertEqual(response['status_code'], requests.codes.ok)
449 self.assertIn('PCE calculation in progress',
450 response['output']['configuration-response-common']['response-message'])
451 time.sleep(self.WAITING)
453 def test_024_get_ODUC4_service1(self):
454 response = test_utils.get_ordm_serv_list_attr_request(
455 "services", "service1-ODUC4")
456 self.assertEqual(response['status_code'], requests.codes.ok)
457 self.assertEqual(response['services'][0]['service-name'], 'service1-ODUC4')
458 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
459 self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
460 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
461 self.assertEqual(response['services'][0]['service-layer'], 'wdm')
462 self.assertEqual(response['services'][0]['service-a-end']['service-rate'], 400)
463 self.assertEqual(response['services'][0]['service-a-end']['odu-service-rate'],
464 'org-openroadm-otn-common-types:ODUCn')
465 self.assertEqual(response['services'][0]['service-z-end']['odu-service-rate'],
466 'org-openroadm-otn-common-types:ODUCn')
468 def test_025_check_interface_ODUC4_xpdra2(self):
469 response = test_utils.check_node_attribute_request(
470 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
471 self.assertEqual(response['status_code'], requests.codes.ok)
472 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
473 'administrative-state': 'inService',
474 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
475 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
476 'type': 'org-openroadm-interfaces:otnOdu',
477 'supporting-port': 'L1'}
478 # SAPI/DAPI are added in the Otu4 renderer
479 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
480 'rate': 'org-openroadm-otn-common-types:ODUCn',
481 'expected-dapi': 'Nmbu2MNHvc4=',
482 'expected-sapi': 'Nmbu2MNHvc4=',
483 'tx-dapi': 'Nmbu2MNHvc4=',
484 'tx-sapi': 'LY9PxYJqUbw='}
486 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
487 response['interface'][0])
488 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
489 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
490 self.assertDictEqual(
491 {'payload-type': '22', 'exp-payload-type': '22'},
492 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
494 def test_026_check_interface_ODUC4_xpdrc2(self):
495 response = test_utils.check_node_attribute_request(
496 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODUC4')
497 self.assertEqual(response['status_code'], requests.codes.ok)
498 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODUC4',
499 'administrative-state': 'inService',
500 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
501 'supporting-interface-list': 'XPDR2-NETWORK1-OTUC4',
502 'type': 'org-openroadm-interfaces:otnOdu',
503 'supporting-port': 'L1'}
504 # SAPI/DAPI are added in the Otu4 renderer
505 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
506 'rate': 'org-openroadm-otn-common-types:ODUCn',
507 'tx-sapi': 'Nmbu2MNHvc4=',
508 'tx-dapi': 'LY9PxYJqUbw=',
509 'expected-sapi': 'LY9PxYJqUbw=',
510 'expected-dapi': 'LY9PxYJqUbw='
512 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
513 response['interface'][0])
514 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
515 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
516 self.assertDictEqual(
517 {'payload-type': '22', 'exp-payload-type': '22'},
518 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
520 def test_027_check_otn_topo_links(self):
521 response = test_utils.get_ietf_network_request('otn-topology', 'config')
522 self.assertEqual(response['status_code'], requests.codes.ok)
523 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
524 for link in response['network'][0]['ietf-network-topology:link']:
525 linkId = link['link-id']
526 if (linkId in ('OTUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
527 'OTUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
529 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
531 link['org-openroadm-otn-network-topology:used-bandwidth'], 400000)
532 elif (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
533 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
535 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
537 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
539 link['transportpce-networkutils:otn-link-type'], 'ODUC4')
541 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
542 self.assertIn(link['org-openroadm-common-network:opposite-link'],
543 ['ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
544 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1'])
546 self.fail("this link should not exist")
548 def test_028_check_otn_topo_tp(self):
549 response = test_utils.get_ietf_network_request('otn-topology', 'config')
550 self.assertEqual(response['status_code'], requests.codes.ok)
551 for node in response['network'][0]['node']:
552 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
553 tpList = node['ietf-network-topology:termination-point']
555 if tp['tp-id'] == 'XPDR2-NETWORK1':
556 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
557 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
559 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
560 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
561 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
563 # test service-create for 100GE service 1 from xpdra2 to xpdrc2
564 def test_029_create_100GE_service_1(self):
565 self.cr_serv_input_data["service-name"] = "service-100GE"
566 self.cr_serv_input_data["connection-type"] = "service"
567 self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
568 self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
569 del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
570 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
571 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
572 self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
573 self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
574 del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
575 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
576 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
577 response = test_utils.transportpce_api_rpc_request(
578 'org-openroadm-service', 'service-create',
579 self.cr_serv_input_data)
580 self.assertEqual(response['status_code'], requests.codes.ok)
581 self.assertIn('PCE calculation in progress',
582 response['output']['configuration-response-common']['response-message'])
583 time.sleep(self.WAITING)
585 def test_030_get_100GE_service_1(self):
586 response = test_utils.get_ordm_serv_list_attr_request(
587 "services", "service-100GE")
588 self.assertEqual(response['status_code'], requests.codes.ok)
589 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
590 self.assertEqual(response['services'][0]['service-name'], 'service-100GE')
591 self.assertEqual(response['services'][0]['connection-type'], 'service')
592 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
594 def test_031_check_interface_100GE_CLIENT_xpdra2(self):
595 response = test_utils.check_node_attribute_request(
596 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
597 self.assertEqual(response['status_code'], requests.codes.ok)
598 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
599 'administrative-state': 'inService',
600 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
601 'type': 'org-openroadm-interfaces:ethernetCsmacd',
602 'supporting-port': 'C1'
604 input_dict_2 = {'speed': 100000}
605 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
606 response['interface'][0])
607 self.assertDictEqual(
608 dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
609 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
611 def test_032_check_interface_ODU4_CLIENT_xpdra2(self):
612 response = test_utils.check_node_attribute_request(
613 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE')
614 self.assertEqual(response['status_code'], requests.codes.ok)
615 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE',
616 'administrative-state': 'inService',
617 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
618 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
619 'type': 'org-openroadm-interfaces:otnOdu',
620 'supporting-port': 'C1'}
622 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
623 'rate': 'org-openroadm-otn-common-types:ODU4',
624 'monitoring-mode': 'terminated'}
626 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
627 response['interface'][0])
628 self.assertDictEqual(dict(input_dict_2,
629 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
630 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
631 self.assertDictEqual(
632 {'payload-type': '07', 'exp-payload-type': '07'},
633 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
635 def test_033_check_interface_ODU4_NETWORK_xpdra2(self):
636 response = test_utils.check_node_attribute_request(
637 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE')
638 self.assertEqual(response['status_code'], requests.codes.ok)
639 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE',
640 'administrative-state': 'inService',
641 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
642 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
643 'type': 'org-openroadm-interfaces:otnOdu',
644 'supporting-port': 'L1'}
646 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
647 'rate': 'org-openroadm-otn-common-types:ODU4',
648 'monitoring-mode': 'not-terminated'}
649 input_dict_3 = {'trib-port-number': 1}
651 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
652 response['interface'][0])
653 self.assertDictEqual(dict(input_dict_2,
654 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
655 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
656 self.assertDictEqual(dict(input_dict_3,
657 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
658 'parent-odu-allocation']),
659 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
660 self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
661 ['opucn-trib-slots'])
662 self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
663 ['opucn-trib-slots'])
665 def test_034_check_ODU4_connection_xpdra2(self):
666 response = test_utils.check_node_attribute_request(
667 'XPDR-A2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
668 self.assertEqual(response['status_code'], requests.codes.ok)
671 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
672 'direction': 'bidirectional'
675 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
676 response['odu-connection'][0])
677 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE'},
678 response['odu-connection'][0]['destination'])
679 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4:service-100GE'},
680 response['odu-connection'][0]['source'])
682 def test_035_check_interface_100GE_CLIENT_xpdrc2(self):
683 response = test_utils.check_node_attribute_request(
684 'XPDR-C2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
685 self.assertEqual(response['status_code'], requests.codes.ok)
686 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
687 'administrative-state': 'inService',
688 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
689 'type': 'org-openroadm-interfaces:ethernetCsmacd',
690 'supporting-port': 'C1'
692 input_dict_2 = {'speed': 100000}
693 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
694 response['interface'][0])
695 self.assertDictEqual(
696 dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
697 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
699 def test_036_check_interface_ODU4_CLIENT_xpdrc2(self):
700 response = test_utils.check_node_attribute_request(
701 'XPDR-C2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE')
702 self.assertEqual(response['status_code'], requests.codes.ok)
703 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE',
704 'administrative-state': 'inService',
705 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
706 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100G',
707 'type': 'org-openroadm-interfaces:otnOdu',
708 'supporting-port': 'C1'}
710 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
711 'rate': 'org-openroadm-otn-common-types:ODU4',
712 'monitoring-mode': 'terminated'}
714 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
715 response['interface'][0])
716 self.assertDictEqual(dict(input_dict_2,
717 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
718 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
719 self.assertDictEqual(
720 {'payload-type': '07', 'exp-payload-type': '07'},
721 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
723 def test_037_check_interface_ODU4_NETWORK_xpdrc2(self):
724 response = test_utils.check_node_attribute_request(
725 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE')
726 self.assertEqual(response['status_code'], requests.codes.ok)
727 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE',
728 'administrative-state': 'inService',
729 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
730 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
731 'type': 'org-openroadm-interfaces:otnOdu',
732 'supporting-port': 'C1'}
734 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
735 'rate': 'org-openroadm-otn-common-types:ODU4',
736 'monitoring-mode': 'not-terminated'}
738 input_dict_3 = {'trib-port-number': 1}
740 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
741 response['interface'][0])
742 self.assertDictEqual(dict(input_dict_2,
743 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
744 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
745 self.assertDictEqual(dict(input_dict_3,
746 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
747 'parent-odu-allocation']),
748 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
749 'parent-odu-allocation'])
751 response['interface'][0][
752 'org-openroadm-otn-odu-interfaces:odu'][
753 'parent-odu-allocation']['opucn-trib-slots'])
754 self.assertIn('1.20',
755 response['interface'][0][
756 'org-openroadm-otn-odu-interfaces:odu'][
757 'parent-odu-allocation']['opucn-trib-slots'])
759 def test_038_check_ODU4_connection_xpdrc2(self):
760 response = test_utils.check_node_attribute_request(
761 'XPDR-C2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
762 self.assertEqual(response['status_code'], requests.codes.ok)
765 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
766 'direction': 'bidirectional'
769 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
770 response['odu-connection'][0])
771 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE'},
772 response['odu-connection'][0]['destination'])
773 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4:service-100GE'},
774 response['odu-connection'][0]['source'])
776 def test_039_check_otn_topo_links(self):
777 response = test_utils.get_ietf_network_request('otn-topology', 'config')
778 self.assertEqual(response['status_code'], requests.codes.ok)
779 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
780 for link in response['network'][0]['ietf-network-topology:link']:
781 linkId = link['link-id']
782 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
783 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
785 link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
787 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
789 def test_040_check_otn_topo_tp(self):
790 response = test_utils.get_ietf_network_request('otn-topology', 'config')
791 self.assertEqual(response['status_code'], requests.codes.ok)
792 for node in response['network'][0]['node']:
793 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
794 tpList = node['ietf-network-topology:termination-point']
796 if tp['tp-id'] == 'XPDR2-NETWORK1':
797 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
798 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
799 tsPoolList = list(range(1, 20))
801 tsPoolList, xpdrTpPortConAt['ts-pool'])
803 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
805 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
807 # test service-create for 100GE service 2 from xpdra2 to xpdrc2
808 def test_041_create_100GE_service_2(self):
809 self.cr_serv_input_data["service-name"] = "service-100GE2"
810 self.cr_serv_input_data["connection-type"] = "service"
811 self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
812 self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
813 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
814 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
815 self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
816 self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
817 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
818 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
819 response = test_utils.transportpce_api_rpc_request(
820 'org-openroadm-service', 'service-create',
821 self.cr_serv_input_data)
822 self.assertEqual(response['status_code'], requests.codes.ok)
823 self.assertIn('PCE calculation in progress',
824 response['output']['configuration-response-common']['response-message'])
825 time.sleep(self.WAITING)
827 def test_042_get_100GE_service_2(self):
828 response = test_utils.get_ordm_serv_list_attr_request("services", "service-100GE2")
829 self.assertEqual(response['status_code'], requests.codes.ok)
830 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
831 self.assertEqual(response['services'][0]['service-name'], 'service-100GE2')
832 self.assertEqual(response['services'][0]['connection-type'], 'service')
833 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
836 def test_043_check_service_list(self):
837 response = test_utils.get_ordm_serv_list_request()
838 self.assertEqual(response['status_code'], requests.codes.ok)
839 self.assertEqual(len(response['service-list']['services']), 4)
841 def test_044_check_otn_topo_links(self):
842 response = test_utils.get_ietf_network_request('otn-topology', 'config')
843 self.assertEqual(response['status_code'], requests.codes.ok)
844 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
845 for link in response['network'][0]['ietf-network-topology:link']:
846 linkId = link['link-id']
847 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
848 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
850 link['org-openroadm-otn-network-topology:available-bandwidth'], 200000)
852 link['org-openroadm-otn-network-topology:used-bandwidth'], 200000)
854 def test_045_check_otn_topo_tp(self):
855 response = test_utils.get_ietf_network_request('otn-topology', 'config')
856 self.assertEqual(response['status_code'], requests.codes.ok)
857 for node in response['network'][0]['node']:
858 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
859 tpList = node['ietf-network-topology:termination-point']
861 if tp['tp-id'] == 'XPDR2-NETWORK1':
862 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
863 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 40)
864 tsPoolList = list(range(1, 40))
866 tsPoolList, xpdrTpPortConAt['ts-pool'])
868 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 2)
870 2, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
872 def test_046_delete_100GE_service_2(self):
873 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE2"
874 response = test_utils.transportpce_api_rpc_request(
875 'org-openroadm-service', 'service-delete',
876 self.del_serv_input_data)
877 self.assertEqual(response['status_code'], requests.codes.ok)
878 self.assertIn('Renderer service delete in progress',
879 response['output']['configuration-response-common']['response-message'])
880 time.sleep(self.WAITING)
882 def test_047_delete_100GE_service_1(self):
883 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE"
884 response = test_utils.transportpce_api_rpc_request(
885 'org-openroadm-service', 'service-delete',
886 self.del_serv_input_data)
887 self.assertEqual(response['status_code'], requests.codes.ok)
888 self.assertIn('Renderer service delete in progress',
889 response['output']['configuration-response-common']['response-message'])
890 time.sleep(self.WAITING)
892 def test_048_check_service_list(self):
893 response = test_utils.get_ordm_serv_list_request()
894 self.assertEqual(response['status_code'], requests.codes.ok)
895 self.assertEqual(len(response['service-list']['services']), 2)
897 def test_049_check_no_ODU4_connection_xpdra2(self):
898 response = test_utils.check_node_request("XPDR-A2")
899 self.assertEqual(response['status_code'], requests.codes.ok)
900 self.assertNotIn(['odu-connection'][0], response['org-openroadm-device'])
902 def test_050_check_no_interface_ODU4_NETWORK_xpdra2(self):
903 response = test_utils.check_node_attribute_request(
904 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE')
905 self.assertEqual(response['status_code'], requests.codes.conflict)
907 def test_051_check_no_interface_ODU4_CLIENT_xpdra2(self):
908 response = test_utils.check_node_attribute_request(
909 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE')
910 self.assertEqual(response['status_code'], requests.codes.conflict)
912 def test_052_check_no_interface_100GE_CLIENT_xpdra2(self):
913 response = test_utils.check_node_attribute_request(
914 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
915 self.assertEqual(response['status_code'], requests.codes.conflict)
917 def test_053_check_otn_topo_links(self):
918 response = test_utils.get_ietf_network_request('otn-topology', 'config')
919 self.assertEqual(response['status_code'], requests.codes.ok)
920 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
921 for link in response['network'][0]['ietf-network-topology:link']:
922 linkId = link['link-id']
923 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
924 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
926 link['org-openroadm-otn-network-topology:available-bandwidth'], 400000)
928 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
930 def test_054_check_otn_topo_tp(self):
931 response = test_utils.get_ietf_network_request('otn-topology', 'config')
932 self.assertEqual(response['status_code'], requests.codes.ok)
933 for node in response['network'][0]['node']:
934 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
935 tpList = node['ietf-network-topology:termination-point']
937 if tp['tp-id'] == 'XPDR2-NETWORK1':
938 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
939 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
941 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 4)
943 # test service-create for 100GE service 3 from xpdra2 client1 to xpdrc2 client2
944 def test_055_create_100GE_service_3(self):
945 self.cr_serv_input_data["service-name"] = "service-100GE3"
946 self.cr_serv_input_data["connection-type"] = "service"
947 self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
948 self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
949 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
950 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
951 self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
952 self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
953 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
954 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
955 response = test_utils.transportpce_api_rpc_request(
956 'org-openroadm-service', 'service-create',
957 self.cr_serv_input_data)
958 self.assertEqual(response['status_code'], requests.codes.ok)
959 self.assertIn('PCE calculation in progress',
960 response['output']['configuration-response-common']['response-message'])
961 time.sleep(self.WAITING)
963 def test_056_get_100GE_service_3(self):
964 response = test_utils.get_ordm_serv_list_attr_request("services", "service-100GE3")
965 self.assertEqual(response['status_code'], requests.codes.ok)
966 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
967 self.assertEqual(response['services'][0]['service-name'], 'service-100GE3')
968 self.assertEqual(response['services'][0]['connection-type'], 'service')
969 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
972 def test_057_check_service_list(self):
973 response = test_utils.get_ordm_serv_list_request()
974 self.assertEqual(response['status_code'], requests.codes.ok)
975 self.assertEqual(len(response['service-list']['services']), 3)
977 def test_058_check_interface_100GE_CLIENT_xpdra2(self):
978 response = test_utils.check_node_attribute_request(
979 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
980 self.assertEqual(response['status_code'], requests.codes.ok)
981 input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
982 'administrative-state': 'inService',
983 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
984 'type': 'org-openroadm-interfaces:ethernetCsmacd',
985 'supporting-port': 'C1'
987 input_dict_2 = {'speed': 100000}
988 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
989 response['interface'][0])
990 self.assertDictEqual(
991 dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
992 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
994 def test_059_check_interface_ODU4_CLIENT_xpdra2(self):
995 response = test_utils.check_node_attribute_request(
996 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE3')
997 self.assertEqual(response['status_code'], requests.codes.ok)
998 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE3',
999 'administrative-state': 'inService',
1000 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
1001 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100GE3',
1002 'type': 'org-openroadm-interfaces:otnOdu',
1003 'supporting-port': 'C1'}
1005 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1006 'rate': 'org-openroadm-otn-common-types:ODU4',
1007 'monitoring-mode': 'terminated'}
1009 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1010 response['interface'][0])
1011 self.assertDictEqual(dict(input_dict_2,
1012 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1013 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1014 self.assertDictEqual(
1015 {'payload-type': '07', 'exp-payload-type': '07'},
1016 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1018 def test_060_check_interface_ODU4_NETWORK_xpdra2(self):
1019 response = test_utils.check_node_attribute_request(
1020 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE3')
1021 self.assertEqual(response['status_code'], requests.codes.ok)
1022 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE3',
1023 'administrative-state': 'inService',
1024 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
1025 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
1026 'type': 'org-openroadm-interfaces:otnOdu',
1027 'supporting-port': 'L1'}
1029 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1030 'rate': 'org-openroadm-otn-common-types:ODU4',
1031 'monitoring-mode': 'not-terminated'}
1032 input_dict_3 = {'trib-port-number': 1}
1034 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1035 response['interface'][0])
1036 self.assertDictEqual(dict(input_dict_2,
1037 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1038 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1039 self.assertDictEqual(dict(input_dict_3,
1040 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1041 'parent-odu-allocation']),
1042 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1043 self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1044 ['opucn-trib-slots'])
1045 self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1046 ['opucn-trib-slots'])
1048 def test_061_check_ODU4_connection_xpdra2(self):
1049 response = test_utils.check_node_attribute_request(
1050 'XPDR-A2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
1051 self.assertEqual(response['status_code'], requests.codes.ok)
1054 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
1055 'direction': 'bidirectional'
1058 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1059 response['odu-connection'][0])
1060 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE3'},
1061 response['odu-connection'][0]['destination'])
1062 self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4:service-100GE3'},
1063 response['odu-connection'][0]['source'])
1065 def test_062_check_interface_100GE_CLIENT_xpdrc2(self):
1066 response = test_utils.check_node_attribute_request(
1067 'XPDR-C2', 'interface', 'XPDR2-CLIENT2-ETHERNET-100G')
1068 self.assertEqual(response['status_code'], requests.codes.ok)
1069 input_dict_1 = {'name': 'XPDR2-CLIENT2-ETHERNET-100G',
1070 'administrative-state': 'inService',
1071 'supporting-circuit-pack-name': '1/2/1/2-PLUG-CLIENT',
1072 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1073 'supporting-port': 'C1'
1075 input_dict_2 = {'speed': 100000}
1076 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1077 response['interface'][0])
1078 self.assertDictEqual(
1079 dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1080 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1082 def test_063_check_interface_ODU4_CLIENT_xpdrc2(self):
1083 response = test_utils.check_node_attribute_request(
1084 'XPDR-C2', 'interface', 'XPDR2-CLIENT2-ODU4:service-100GE3')
1085 self.assertEqual(response['status_code'], requests.codes.ok)
1086 input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE3',
1087 'administrative-state': 'inService',
1088 'supporting-circuit-pack-name': '1/2/1/2-PLUG-CLIENT',
1089 'supporting-interface-list': 'XPDR2-CLIENT2-ETHERNET-100GE3',
1090 'type': 'org-openroadm-interfaces:otnOdu',
1091 'supporting-port': 'C1'}
1093 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1094 'rate': 'org-openroadm-otn-common-types:ODU4',
1095 'monitoring-mode': 'terminated'}
1097 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1098 response['interface'][0])
1099 self.assertDictEqual(dict(input_dict_2,
1100 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1101 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1102 self.assertDictEqual(
1103 {'payload-type': '07', 'exp-payload-type': '07'},
1104 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1106 def test_064_check_interface_ODU4_NETWORK_xpdrc2(self):
1107 response = test_utils.check_node_attribute_request(
1108 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE3')
1109 self.assertEqual(response['status_code'], requests.codes.ok)
1110 input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE3',
1111 'administrative-state': 'inService',
1112 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
1113 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
1114 'type': 'org-openroadm-interfaces:otnOdu',
1115 'supporting-port': 'C1'}
1117 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1118 'rate': 'org-openroadm-otn-common-types:ODU4',
1119 'monitoring-mode': 'not-terminated'}
1121 input_dict_3 = {'trib-port-number': 1}
1123 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1124 response['interface'][0])
1125 self.assertDictEqual(dict(input_dict_2,
1126 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1127 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1128 self.assertDictEqual(dict(input_dict_3,
1129 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1130 'parent-odu-allocation']),
1131 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1132 'parent-odu-allocation'])
1133 self.assertIn('1.1',
1134 response['interface'][0][
1135 'org-openroadm-otn-odu-interfaces:odu'][
1136 'parent-odu-allocation']['opucn-trib-slots'])
1137 self.assertIn('1.20',
1138 response['interface'][0][
1139 'org-openroadm-otn-odu-interfaces:odu'][
1140 'parent-odu-allocation']['opucn-trib-slots'])
1142 def test_065_check_ODU4_connection_xpdrc2(self):
1143 response = test_utils.check_node_attribute_request(
1144 'XPDR-C2', 'odu-connection', 'XPDR2-CLIENT2-ODU4-x-XPDR2-NETWORK1-ODU4')
1145 self.assertEqual(response['status_code'], requests.codes.ok)
1148 'XPDR2-CLIENT2-ODU4-x-XPDR2-NETWORK1-ODU4',
1149 'direction': 'bidirectional'
1152 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1153 response['odu-connection'][0])
1154 self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE3'},
1155 response['odu-connection'][0]['destination'])
1156 self.assertDictEqual({'src-if': 'XPDR2-CLIENT2-ODU4:service-100GE3'},
1157 response['odu-connection'][0]['source'])
1159 def test_066_check_otn_topo_links(self):
1160 response = test_utils.get_ietf_network_request('otn-topology', 'config')
1161 self.assertEqual(response['status_code'], requests.codes.ok)
1162 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1163 for link in response['network'][0]['ietf-network-topology:link']:
1164 linkId = link['link-id']
1165 if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
1166 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
1168 link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
1170 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
1172 def test_067_check_otn_topo_tp(self):
1173 response = test_utils.get_ietf_network_request('otn-topology', 'config')
1174 self.assertEqual(response['status_code'], requests.codes.ok)
1175 for node in response['network'][0]['node']:
1176 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
1177 tpList = node['ietf-network-topology:termination-point']
1179 if tp['tp-id'] == 'XPDR2-NETWORK1':
1180 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1181 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
1182 tsPoolList = list(range(1, 20))
1183 self.assertNotIn(tsPoolList, xpdrTpPortConAt['ts-pool'])
1184 self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
1185 self.assertNotIn(1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1187 def test_068_delete_100GE_service_3(self):
1188 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE3"
1189 response = test_utils.transportpce_api_rpc_request(
1190 'org-openroadm-service', 'service-delete',
1191 self.del_serv_input_data)
1192 self.assertEqual(response['status_code'], requests.codes.ok)
1193 self.assertIn('Renderer service delete in progress',
1194 response['output']['configuration-response-common']['response-message'])
1195 time.sleep(self.WAITING)
1197 def test_069_delete_ODUC4_service(self):
1198 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODUC4"
1199 response = test_utils.transportpce_api_rpc_request(
1200 'org-openroadm-service', 'service-delete',
1201 self.del_serv_input_data)
1202 self.assertEqual(response['status_code'], requests.codes.ok)
1203 self.assertIn('Renderer service delete in progress',
1204 response['output']['configuration-response-common']['response-message'])
1205 time.sleep(self.WAITING)
1207 def test_070_check_service_list(self):
1208 response = test_utils.get_ordm_serv_list_request()
1209 self.assertEqual(response['status_code'], requests.codes.ok)
1210 self.assertEqual(len(response['service-list']['services']), 1)
1212 def test_071_check_no_interface_ODU4_xpdra2(self):
1213 response = test_utils.check_node_attribute_request(
1214 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODUC4')
1215 self.assertEqual(response['status_code'], requests.codes.conflict)
1217 def test_072_check_otn_topo_links(self):
1218 self.test_022_check_otn_topo_OTUC4_links()
1220 def test_073_check_otn_topo_tp(self):
1221 response = test_utils.get_ietf_network_request('otn-topology', 'config')
1222 self.assertEqual(response['status_code'], requests.codes.ok)
1223 for node in response['network'][0]['node']:
1224 if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
1225 tpList = node['ietf-network-topology:termination-point']
1227 if tp['tp-id'] == 'XPDR2-NETWORK1':
1228 self.assertNotIn('org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes',
1231 def test_074_delete_OTUC4_service(self):
1232 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OTUC4"
1233 response = test_utils.transportpce_api_rpc_request(
1234 'org-openroadm-service', 'service-delete',
1235 self.del_serv_input_data)
1236 self.assertEqual(response['status_code'], requests.codes.ok)
1237 self.assertIn('Renderer service delete in progress',
1238 response['output']['configuration-response-common']['response-message'])
1239 time.sleep(self.WAITING)
1241 def test_075_get_no_service(self):
1242 response = test_utils.get_ordm_serv_list_request()
1243 self.assertEqual(response['status_code'], requests.codes.conflict)
1244 self.assertIn(response['service-list'], (
1246 "error-type": "protocol",
1247 "error-tag": "data-missing",
1249 "Request could not be completed because the relevant data "
1250 "model content does not exist"
1252 "error-type": "application",
1253 "error-tag": "data-missing",
1255 "Request could not be completed because the relevant data "
1256 "model content does not exist"
1259 def test_076_check_no_interface_OTUC4_xpdra2(self):
1260 response = test_utils.check_node_attribute_request(
1261 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTUC4')
1262 self.assertEqual(response['status_code'], requests.codes.conflict)
1264 def test_077_check_no_interface_OTSI_xpdra2(self):
1265 response = test_utils.check_node_attribute_request(
1266 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-755:768')
1267 self.assertEqual(response['status_code'], requests.codes.conflict)
1269 def test_078_check_no_interface_OTSIG_xpdra2(self):
1270 response = test_utils.check_node_attribute_request(
1271 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-OTSIGROUP-400G')
1272 self.assertEqual(response['status_code'], requests.codes.conflict)
1274 def test_079_getLinks_OtnTopology(self):
1275 response = test_utils.get_ietf_network_request('otn-topology', 'config')
1276 self.assertEqual(response['status_code'], requests.codes.ok)
1277 self.assertNotIn('ietf-network-topology:link', response['network'][0])
1279 def test_080_check_openroadm_topo_xpdra2(self):
1280 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR2', 'config')
1281 self.assertEqual(response['status_code'], requests.codes.ok)
1282 tp = response['node']['ietf-network-topology:termination-point'][0]
1283 self.assertEqual('XPDR2-NETWORK1', tp['tp-id'])
1284 self.assertNotIn('wavelength', dict.keys(
1285 tp['org-openroadm-network-topology:xpdr-network-attributes']))
1287 def test_081_check_openroadm_topology(self):
1288 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1289 self.assertEqual(response['status_code'], requests.codes.ok)
1290 links = response['network'][0]['ietf-network-topology:link']
1291 self.assertEqual(22, len(links), 'Topology should contain 22 links')
1293 def test_082_connect_xpdra2_1_N1_to_roadma_PP2(self):
1294 response = test_utils.transportpce_api_rpc_request(
1295 'transportpce-networkutils', 'init-xpdr-rdm-links',
1296 {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
1297 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1298 self.assertEqual(response['status_code'], requests.codes.ok)
1299 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
1301 def test_083_connect_roadma_PP2_to_xpdra2_1_N1(self):
1302 response = test_utils.transportpce_api_rpc_request(
1303 'transportpce-networkutils', 'init-rdm-xpdr-links',
1304 {'links-input': {'xpdr-node': 'XPDR-A2', 'xpdr-num': '1', 'network-num': '1',
1305 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1306 self.assertEqual(response['status_code'], requests.codes.ok)
1307 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
1309 def test_084_connect_xpdrc2_1_N1_to_roadmc_PP2(self):
1310 response = test_utils.transportpce_api_rpc_request(
1311 'transportpce-networkutils', 'init-xpdr-rdm-links',
1312 {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
1313 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1314 self.assertEqual(response['status_code'], requests.codes.ok)
1315 self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
1317 def test_085_connect_roadmc_PP2_to_xpdrc2_1_N1(self):
1318 response = test_utils.transportpce_api_rpc_request(
1319 'transportpce-networkutils', 'init-rdm-xpdr-links',
1320 {'links-input': {'xpdr-node': 'XPDR-C2', 'xpdr-num': '1', 'network-num': '1',
1321 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
1322 self.assertEqual(response['status_code'], requests.codes.ok)
1323 self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
1325 # test service-create for 400GE service from xpdra2 to xpdrc2
1326 def test_086_create_400GE_service(self):
1327 self.cr_serv_input_data["service-name"] = "service-400GE"
1328 self.cr_serv_input_data["service-a-end"]["service-rate"] = "400"
1329 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
1330 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
1331 self.cr_serv_input_data["service-z-end"]["service-rate"] = "400"
1332 del self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"]
1333 del self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"]
1334 del self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"]
1335 del self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"]
1336 del self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"]
1337 del self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"]
1338 del self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"]
1339 del self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"]
1340 response = test_utils.transportpce_api_rpc_request(
1341 'org-openroadm-service', 'service-create',
1342 self.cr_serv_input_data)
1343 self.assertEqual(response['status_code'], requests.codes.ok)
1344 time.sleep(self.WAITING)
1345 self.assertIn('PCE calculation in progress',
1346 response['output']['configuration-response-common']['response-message'])
1347 time.sleep(self.WAITING)
1349 def test_087_get_400GE_service(self):
1350 response = test_utils.get_ordm_serv_list_attr_request("services", "service-400GE")
1351 self.assertEqual(response['status_code'], requests.codes.ok)
1352 self.assertEqual(response['services'][0]['administrative-state'], 'inService')
1353 self.assertEqual(response['services'][0]['service-name'], 'service-400GE')
1354 self.assertEqual(response['services'][0]['connection-type'], 'service')
1355 self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
1358 def test_088_check_xc1_roadma(self):
1359 response = test_utils.check_node_attribute_request(
1360 "ROADM-A1", "roadm-connections", "SRG1-PP1-TXRX-DEG2-TTP-TXRX-755:768")
1361 self.assertEqual(response['status_code'], requests.codes.ok)
1362 self.assertDictEqual(
1364 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
1365 'opticalControlMode': 'gainLoss',
1366 'target-output-power': -3.0
1367 }, **response['roadm-connections'][0]), response['roadm-connections'][0])
1368 self.assertDictEqual({'src-if': 'SRG1-PP1-TXRX-nmc-755:768'}, response['roadm-connections'][0]['source'])
1369 self.assertDictEqual({'dst-if': 'DEG2-TTP-TXRX-nmc-755:768'}, response['roadm-connections'][0]['destination'])
1372 def test_089_check_topo_xpdra2(self):
1373 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'XPDR-A2-XPDR1', 'config')
1374 self.assertEqual(response['status_code'], requests.codes.ok)
1375 liste_tp = response['node']['ietf-network-topology:termination-point']
1376 for ele in liste_tp:
1377 if ele['tp-id'] == 'XPDR1-NETWORK1':
1380 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
1383 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
1384 if ele['tp-id'] == 'XPDR1-CLIENT1':
1385 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
1388 def test_090_check_topo_roadma_SRG1(self):
1389 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
1390 self.assertEqual(response['status_code'], requests.codes.ok)
1391 freq_map = base64.b64decode(
1392 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
1393 freq_map_array = [int(x) for x in freq_map]
1394 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1395 liste_tp = response['node']['ietf-network-topology:termination-point']
1396 for ele in liste_tp:
1397 if ele['tp-id'] == 'SRG1-PP1-TXRX':
1398 freq_map = base64.b64decode(
1399 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
1400 freq_map_array = [int(x) for x in freq_map]
1401 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1402 if ele['tp-id'] == 'SRG1-PP2-TXRX':
1403 self.assertNotIn('avail-freq-maps', dict.keys(ele))
1406 def test_091_check_topo_roadma_DEG1(self):
1407 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
1408 self.assertEqual(response['status_code'], requests.codes.ok)
1409 freq_map = base64.b64decode(
1410 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
1411 freq_map_array = [int(x) for x in freq_map]
1412 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1413 liste_tp = response['node']['ietf-network-topology:termination-point']
1414 for ele in liste_tp:
1415 if ele['tp-id'] == 'DEG2-CTP-TXRX':
1416 freq_map = base64.b64decode(
1417 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
1418 freq_map_array = [int(x) for x in freq_map]
1419 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1420 if ele['tp-id'] == 'DEG2-TTP-TXRX':
1421 freq_map = base64.b64decode(
1422 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
1423 freq_map_array = [int(x) for x in freq_map]
1424 self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
1427 def test_092_check_interface_400GE_CLIENT_xpdra2(self):
1428 response = test_utils.check_node_attribute_request(
1429 'XPDR-A2', 'interface', 'XPDR1-CLIENT1-ETHERNET')
1430 self.assertEqual(response['status_code'], requests.codes.ok)
1431 input_dict_1 = {'name': 'XPDR1-CLIENT1-ETHERNET',
1432 'administrative-state': 'inService',
1433 'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
1434 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1435 'supporting-port': 'C1'
1437 input_dict_2 = {'speed': 400000}
1438 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1439 response['interface'][0])
1440 self.assertDictEqual(
1441 dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
1442 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
1444 def test_093_check_interface_OTSI_xpdra2(self):
1445 response = test_utils.check_node_attribute_request(
1446 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-755:768')
1447 self.assertEqual(response['status_code'], requests.codes.ok)
1448 input_dict_1 = {'name': 'XPDR1-NETWORK1-755:768',
1449 'administrative-state': 'inService',
1450 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1451 'type': 'org-openroadm-interfaces:otsi',
1452 'supporting-port': 'L1'}
1454 "frequency": 196.0812,
1455 "otsi-rate": "org-openroadm-common-optical-channel-types:R400G-otsi",
1456 "fec": "org-openroadm-common-types:ofec",
1457 "transmit-power": -5,
1458 "provision-mode": "explicit",
1459 "modulation-format": "dp-qam16"}
1461 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1462 response['interface'][0])
1463 self.assertDictEqual(
1464 dict(input_dict_2, **response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']),
1465 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi'])
1466 self.assertDictEqual(
1467 {"foic-type": "org-openroadm-common-optical-channel-types:foic4.8", "iid": [1, 2, 3, 4]},
1468 response['interface'][0]['org-openroadm-optical-tributary-signal-interfaces:otsi']['flexo'])
1470 def test_094_check_interface_OTSI_GROUP_xpdra2(self):
1471 response = test_utils.check_node_attribute_request(
1472 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTSIGROUP-400G')
1473 self.assertEqual(response['status_code'], requests.codes.ok)
1474 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTSIGROUP-400G',
1475 'administrative-state': 'inService',
1476 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1477 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-755:768',
1478 'type': 'org-openroadm-interfaces:otsi-group',
1479 'supporting-port': 'L1'}
1480 input_dict_2 = {"group-id": 1,
1481 "group-rate": "org-openroadm-common-optical-channel-types:R400G-otsi"}
1483 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1484 response['interface'][0])
1485 self.assertDictEqual(dict(input_dict_2,
1486 **response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group']),
1487 response['interface'][0]['org-openroadm-otsi-group-interfaces:otsi-group'])
1489 def test_095_check_interface_OTUC4_xpdra2(self):
1490 response = test_utils.check_node_attribute_request(
1491 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTUC4')
1492 self.assertEqual(response['status_code'], requests.codes.ok)
1493 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTUC4',
1494 'administrative-state': 'inService',
1495 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1496 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTSIGROUP-400G',
1497 'type': 'org-openroadm-interfaces:otnOtu',
1498 'supporting-port': 'L1'}
1499 input_dict_2 = {"tx-sapi": "ANeUjNzWtDLV",
1500 "expected-dapi": "ANeUjNzWtDLV",
1501 'tx-dapi': 'AKsqPmWceByv',
1502 'expected-sapi': 'AKsqPmWceByv',
1503 "rate": "org-openroadm-otn-common-types:OTUCn",
1504 "degthr-percentage": 100,
1505 "tim-detect-mode": "Disabled",
1507 "degm-intervals": 2}
1509 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1510 response['interface'][0])
1511 self.assertDictEqual(dict(input_dict_2,
1512 **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
1513 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
1515 def test_096_check_interface_ODUC4_xpdra2(self):
1516 response = test_utils.check_node_attribute_request(
1517 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUC4')
1518 self.assertEqual(response['status_code'], requests.codes.ok)
1519 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUC4',
1520 'administrative-state': 'inService',
1521 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1522 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-OTUC4',
1523 'type': 'org-openroadm-interfaces:otnOdu',
1524 'supporting-port': 'L1',
1525 'circuit-id': 'TBD',
1526 'description': 'TBD'}
1527 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP",
1528 "tim-detect-mode": "Disabled",
1529 "degm-intervals": 2,
1530 "degthr-percentage": 100,
1531 "monitoring-mode": "terminated",
1532 "rate": "org-openroadm-otn-common-types:ODUCn",
1534 input_dict_3 = {"exp-payload-type": "22", "payload-type": "22"}
1536 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1537 response['interface'][0])
1538 self.assertDictEqual(dict(input_dict_2,
1539 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1540 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1541 self.assertDictEqual(dict(input_dict_3,
1542 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1543 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1544 self.assertEqual('XPDR1-NETWORK1-OTUC4', response['interface'][0]['supporting-interface-list'][0])
1546 def test_097_check_interface_ODUFLEX_xpdra2(self):
1547 response = test_utils.check_node_attribute_request(
1548 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUFLEX')
1549 self.assertEqual(response['status_code'], requests.codes.ok)
1550 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODUFLEX',
1551 'administrative-state': 'inService',
1552 'supporting-circuit-pack-name': '1/1/2-PLUG-NET',
1553 ['supporting-interface-list'][0]: 'XPDR1-NETWORK1-ODUC4',
1554 'type': 'org-openroadm-interfaces:otnOdu',
1555 'supporting-port': 'L1',
1556 'circuit-id': 'TBD',
1557 'description': 'TBD'}
1558 input_dict_2 = {"odu-function": "org-openroadm-otn-common-types:ODU-TTP-CTP",
1559 "tim-detect-mode": "Disabled",
1560 "degm-intervals": 2,
1561 "degthr-percentage": 100,
1562 "monitoring-mode": "terminated",
1563 "rate": "org-openroadm-otn-common-types:ODUflex-cbr",
1564 "oduflex-cbr-service": "org-openroadm-otn-common-types:ODUflex-cbr-400G"
1566 input_dict_3 = {"exp-payload-type": "32", "payload-type": "32"}
1567 input_dict_4 = {'trib-port-number': 1}
1569 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1570 response['interface'][0])
1571 self.assertDictEqual(dict(input_dict_2,
1572 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1573 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1574 self.assertDictEqual(dict(input_dict_3,
1575 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu']),
1576 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1577 self.assertDictEqual(dict(input_dict_4,
1578 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1579 'parent-odu-allocation']),
1580 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1581 self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1582 ['opucn-trib-slots'])
1583 self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1584 ['opucn-trib-slots'])
1585 self.assertIn('2.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1586 ['opucn-trib-slots'])
1587 self.assertIn('2.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1588 ['opucn-trib-slots'])
1589 self.assertIn('3.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1590 ['opucn-trib-slots'])
1591 self.assertIn('3.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1592 ['opucn-trib-slots'])
1593 self.assertIn('4.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1594 ['opucn-trib-slots'])
1595 self.assertIn('4.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1596 ['opucn-trib-slots'])
1597 self.assertEqual('XPDR1-NETWORK1-ODUC4', response['interface'][0]['supporting-interface-list'][0])
1599 def test_098_delete_400GE_service(self):
1600 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-400GE"
1601 response = test_utils.transportpce_api_rpc_request(
1602 'org-openroadm-service', 'service-delete',
1603 self.del_serv_input_data)
1604 self.assertEqual(response['status_code'], requests.codes.ok)
1605 self.assertIn('Renderer service delete in progress',
1606 response['output']['configuration-response-common']['response-message'])
1607 time.sleep(self.WAITING)
1609 def test_099_get_no_service(self):
1610 response = test_utils.get_ordm_serv_list_request()
1611 self.assertEqual(response['status_code'], requests.codes.conflict)
1612 self.assertIn(response['service-list'], (
1614 "error-type": "protocol",
1615 "error-tag": "data-missing",
1617 "Request could not be completed because the relevant data "
1618 "model content does not exist"
1620 "error-type": "application",
1621 "error-tag": "data-missing",
1623 "Request could not be completed because the relevant data "
1624 "model content does not exist"
1628 def test_100_check_no_interface_ODUC4_xpdra2(self):
1629 response = test_utils.check_node_attribute_request(
1630 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-ODUC4')
1631 self.assertEqual(response['status_code'], requests.codes.conflict)
1633 def test_101_check_no_interface_OTUC4_xpdra2(self):
1634 response = test_utils.check_node_attribute_request(
1635 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTUC4')
1636 self.assertEqual(response['status_code'], requests.codes.conflict)
1638 def test_102_check_no_interface_OTSI_GROUP_xpdra2(self):
1639 response = test_utils.check_node_attribute_request(
1640 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-OTSIGROUP-400G')
1641 self.assertEqual(response['status_code'], requests.codes.conflict)
1643 def test_103_check_no_interface_OTSI_xpdra2(self):
1644 response = test_utils.check_node_attribute_request(
1645 'XPDR-A2', 'interface', 'XPDR1-NETWORK1-755:768')
1646 self.assertEqual(response['status_code'], requests.codes.conflict)
1648 def test_104_check_no_interface_400GE_CLIENT_xpdra2(self):
1649 response = test_utils.check_node_attribute_request(
1650 'XPDR-A2', 'interface', 'XPDR1-CLIENT1-ETHERNET')
1651 self.assertEqual(response['status_code'], requests.codes.conflict)
1653 def test_105_disconnect_xponders_from_roadm(self):
1654 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1655 self.assertEqual(response['status_code'], requests.codes.ok)
1656 links = response['network'][0]['ietf-network-topology:link']
1658 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1659 response = test_utils.del_ietf_network_link_request(
1660 'openroadm-topology', link['link-id'], 'config')
1661 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1663 def test_106_disconnect_xpdra2(self):
1664 response = test_utils.unmount_device("XPDR-A2")
1665 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1667 def test_107_disconnect_xpdrc2(self):
1668 response = test_utils.unmount_device("XPDR-C2")
1669 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1671 def test_108_disconnect_roadmA(self):
1672 response = test_utils.unmount_device("ROADM-A1")
1673 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1675 def test_109_disconnect_roadmC(self):
1676 response = test_utils.unmount_device("ROADM-C1")
1677 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1680 if __name__ == "__main__":
1681 unittest.main(verbosity=2)