3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
26 import test_utils_rfc8040 # nopep8
29 class TransportPCEtesting(unittest.TestCase):
32 WAITING = 20 # nominal value is 300
33 NODE_VERSION = '2.2.1'
35 cr_serv_input_data = {
36 "sdnc-request-header": {
37 "request-id": "request-1",
38 "rpc-action": "service-create",
39 "request-system-id": "appname"
41 "service-name": "service1-OCH-OTU4",
42 "common-id": "commonId",
43 "connection-type": "infrastructure",
45 "service-rate": "100",
46 "node-id": "SPDR-SA1",
47 "service-format": "OTU",
48 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
52 "port-device-name": "SPDR-SA1-XPDR1",
54 "port-name": "XPDR1-NETWORK1",
55 "port-rack": "000000.00",
56 "port-shelf": "Chassis#1"
59 "lgx-device-name": "Some lgx-device-name",
60 "lgx-port-name": "Some lgx-port-name",
61 "lgx-port-rack": "000000.00",
62 "lgx-port-shelf": "00"
68 "port-device-name": "SPDR-SA1-XPDR1",
70 "port-name": "XPDR1-NETWORK1",
71 "port-rack": "000000.00",
72 "port-shelf": "Chassis#1"
75 "lgx-device-name": "Some lgx-device-name",
76 "lgx-port-name": "Some lgx-port-name",
77 "lgx-port-rack": "000000.00",
78 "lgx-port-shelf": "00"
85 "service-rate": "100",
86 "node-id": "SPDR-SC1",
87 "service-format": "OTU",
88 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
92 "port-device-name": "SPDR-SC1-XPDR1",
94 "port-name": "XPDR1-NETWORK1",
95 "port-rack": "000000.00",
96 "port-shelf": "Chassis#1"
99 "lgx-device-name": "Some lgx-device-name",
100 "lgx-port-name": "Some lgx-port-name",
101 "lgx-port-rack": "000000.00",
102 "lgx-port-shelf": "00"
108 "port-device-name": "SPDR-SC1-XPDR1",
109 "port-type": "fixed",
110 "port-name": "XPDR1-NETWORK1",
111 "port-rack": "000000.00",
112 "port-shelf": "Chassis#1"
115 "lgx-device-name": "Some lgx-device-name",
116 "lgx-port-name": "Some lgx-port-name",
117 "lgx-port-rack": "000000.00",
118 "lgx-port-shelf": "00"
124 "due-date": "2018-06-15T00:00:01Z",
125 "operator-contact": "pw1234"
128 del_serv_input_data = {
129 "sdnc-request-header": {
130 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
131 "rpc-action": "service-delete",
132 "request-system-id": "appname",
133 "notification-url": "http://localhost:8585/NotificationServer/notify"},
134 "service-delete-req-info": {
135 "service-name": "TBD",
136 "tail-retention": "no"}
141 cls.processes = test_utils_rfc8040.start_tpce()
142 cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
143 ('roadma', cls.NODE_VERSION),
144 ('roadmc', cls.NODE_VERSION),
145 ('spdrc', cls.NODE_VERSION)])
148 def tearDownClass(cls):
149 # pylint: disable=not-an-iterable
150 for process in cls.processes:
151 test_utils_rfc8040.shutdown_process(process)
152 print("all processes killed")
157 def test_01_connect_spdrA(self):
158 response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
159 self.assertEqual(response.status_code,
160 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
162 def test_02_connect_spdrC(self):
163 response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
164 self.assertEqual(response.status_code,
165 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
167 def test_03_connect_rdmA(self):
168 response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
169 self.assertEqual(response.status_code,
170 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
172 def test_04_connect_rdmC(self):
173 response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
174 self.assertEqual(response.status_code,
175 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
177 def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
178 response = test_utils_rfc8040.transportpce_api_rpc_request(
179 'transportpce-networkutils', 'init-xpdr-rdm-links',
180 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
181 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
182 self.assertEqual(response['status_code'], requests.codes.ok)
184 def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
185 response = test_utils_rfc8040.transportpce_api_rpc_request(
186 'transportpce-networkutils', 'init-rdm-xpdr-links',
187 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
188 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
189 self.assertEqual(response['status_code'], requests.codes.ok)
191 def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
192 response = test_utils_rfc8040.transportpce_api_rpc_request(
193 'transportpce-networkutils', 'init-xpdr-rdm-links',
194 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
195 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
196 self.assertEqual(response['status_code'], requests.codes.ok)
198 def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
199 response = test_utils_rfc8040.transportpce_api_rpc_request(
200 'transportpce-networkutils', 'init-rdm-xpdr-links',
201 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
202 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
203 self.assertEqual(response['status_code'], requests.codes.ok)
205 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
206 # Config ROADMA-ROADMC oms-attributes
208 "auto-spanloss": "true",
209 "spanloss-base": 11.4,
210 "spanloss-current": 12,
211 "engineered-spanloss": 12.2,
212 "link-concatenation": [{
215 "SRLG-length": 100000,
217 response = test_utils_rfc8040.add_oms_attr_request(
218 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
219 self.assertEqual(response.status_code, requests.codes.created)
221 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
222 # Config ROADMC-ROADMA oms-attributes
224 "auto-spanloss": "true",
225 "spanloss-base": 11.4,
226 "spanloss-current": 12,
227 "engineered-spanloss": 12.2,
228 "link-concatenation": [{
231 "SRLG-length": 100000,
233 response = test_utils_rfc8040.add_oms_attr_request(
234 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
235 self.assertEqual(response.status_code, requests.codes.created)
237 # test service-create for OCH-OTU4 service from spdr to spdr
238 def test_11_check_otn_topology(self):
239 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
240 self.assertEqual(response['status_code'], requests.codes.ok)
241 self.assertEqual(len(response['network'][0]['node']), 6)
242 self.assertNotIn('ietf-network-topology:link', response['network'][0])
244 def test_12_create_OCH_OTU4_service(self):
245 response = test_utils_rfc8040.transportpce_api_rpc_request(
246 'org-openroadm-service', 'service-create',
247 self.cr_serv_input_data)
248 self.assertEqual(response['status_code'], requests.codes.ok)
249 time.sleep(self.WAITING)
251 def test_13_get_OCH_OTU4_service1(self):
252 response = test_utils.get_service_list_request(
253 "services/service1-OCH-OTU4")
254 self.assertEqual(response.status_code, requests.codes.ok)
255 res = response.json()
257 res['services'][0]['administrative-state'], 'inService')
259 res['services'][0]['service-name'], 'service1-OCH-OTU4')
261 res['services'][0]['connection-type'], 'infrastructure')
263 res['services'][0]['lifecycle-state'], 'planned')
266 # Check correct configuration of devices
267 def test_14_check_interface_och_spdra(self):
268 response = test_utils_rfc8040.check_node_attribute_request(
269 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
270 self.assertEqual(response['status_code'], requests.codes.ok)
271 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
272 'administrative-state': 'inService',
273 'supporting-circuit-pack-name': 'CP1-CFP0',
274 'type': 'org-openroadm-interfaces:opticalChannel',
275 'supporting-port': 'CP1-CFP0-P1'
276 }, **response['interface'][0]),
277 response['interface'][0])
278 self.assertEqual('org-openroadm-common-types:R100G',
279 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
280 self.assertEqual('dp-qpsk',
281 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
282 self.assertEqual(196.1,
283 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
286 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
288 def test_15_check_interface_OTU4_spdra(self):
289 response = test_utils_rfc8040.check_node_attribute_request(
290 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
291 self.assertEqual(response['status_code'], requests.codes.ok)
292 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
293 'administrative-state': 'inService',
294 'supporting-circuit-pack-name': 'CP1-CFP0',
295 'supporting-interface': 'XPDR1-NETWORK1-761:768',
296 'type': 'org-openroadm-interfaces:otnOtu',
297 'supporting-port': 'CP1-CFP0-P1'
299 input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
300 'expected-dapi': 'H/OelLynehI=',
301 'tx-dapi': 'AMf1n5hK6Xkk',
302 'expected-sapi': 'AMf1n5hK6Xkk',
303 'rate': 'org-openroadm-otn-common-types:OTU4',
306 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
307 response['interface'][0])
308 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
309 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
311 def test_16_check_interface_och_spdrc(self):
312 response = test_utils_rfc8040.check_node_attribute_request(
313 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
314 self.assertEqual(response['status_code'], requests.codes.ok)
315 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
316 'administrative-state': 'inService',
317 'supporting-circuit-pack-name': 'CP1-CFP0',
318 'type': 'org-openroadm-interfaces:opticalChannel',
319 'supporting-port': 'CP1-CFP0-P1'
320 }, **response['interface'][0]),
321 response['interface'][0])
322 self.assertEqual('org-openroadm-common-types:R100G',
323 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
324 self.assertEqual('dp-qpsk',
325 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
326 self.assertEqual(196.1,
327 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
330 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
332 def test_17_check_interface_OTU4_spdrc(self):
333 response = test_utils_rfc8040.check_node_attribute_request(
334 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
335 self.assertEqual(response['status_code'], requests.codes.ok)
336 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
337 'administrative-state': 'inService',
338 'supporting-circuit-pack-name': 'CP1-CFP0',
339 'supporting-interface': 'XPDR1-NETWORK1-761:768',
340 'type': 'org-openroadm-interfaces:otnOtu',
341 'supporting-port': 'CP1-CFP0-P1'
343 input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
344 'expected-sapi': 'H/OelLynehI=',
345 'tx-sapi': 'AMf1n5hK6Xkk',
346 'expected-dapi': 'AMf1n5hK6Xkk',
347 'rate': 'org-openroadm-otn-common-types:OTU4',
350 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
351 response['interface'][0])
352 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
353 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
355 def test_18_check_no_interface_ODU4_spdra(self):
356 response = test_utils_rfc8040.check_node_attribute_request(
357 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
358 self.assertEqual(response['status_code'], requests.codes.conflict)
360 def test_19_check_openroadm_topo_spdra(self):
361 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
362 self.assertEqual(response['status_code'], requests.codes.ok)
363 ele = response['node']['ietf-network-topology:termination-point'][0]
364 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
367 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
370 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
372 def test_20_check_openroadm_topo_ROADMA_SRG(self):
373 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
374 self.assertEqual(response['status_code'], requests.codes.ok)
375 freq_map = base64.b64decode(
376 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
377 freq_map_array = [int(x) for x in freq_map]
378 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
379 liste_tp = response['node']['ietf-network-topology:termination-point']
381 if ele['tp-id'] == 'SRG1-PP1-TXRX':
382 freq_map = base64.b64decode(
383 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
384 freq_map_array = [int(x) for x in freq_map]
385 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
386 if ele['tp-id'] == 'SRG1-PP2-TXRX':
387 self.assertNotIn('avail-freq-maps', dict.keys(ele))
389 def test_21_check_openroadm_topo_ROADMA_DEG(self):
390 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
391 self.assertEqual(response['status_code'], requests.codes.ok)
392 freq_map = base64.b64decode(
393 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
394 freq_map_array = [int(x) for x in freq_map]
395 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
396 liste_tp = response['node']['ietf-network-topology:termination-point']
398 if ele['tp-id'] == 'DEG2-CTP-TXRX':
399 freq_map = base64.b64decode(
400 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
401 freq_map_array = [int(x) for x in freq_map]
402 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
403 if ele['tp-id'] == 'DEG2-TTP-TXRX':
404 freq_map = base64.b64decode(
405 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
406 freq_map_array = [int(x) for x in freq_map]
407 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
409 def test_22_check_otn_topo_otu4_links(self):
410 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
411 self.assertEqual(response['status_code'], requests.codes.ok)
412 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
413 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
414 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
415 for link in response['network'][0]['ietf-network-topology:link']:
416 self.assertIn(link['link-id'], listLinkId)
418 link['transportpce-networkutils:otn-link-type'], 'OTU4')
420 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
422 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
424 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
426 link['org-openroadm-common-network:opposite-link'], listLinkId)
428 # test service-create for ODU4 service from spdr to spdr
429 def test_23_create_ODU4_service(self):
430 self.cr_serv_input_data["service-name"] = "service1-ODU4"
431 self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
432 del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
433 self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
434 self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
435 del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
436 self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
438 response = test_utils_rfc8040.transportpce_api_rpc_request(
439 'org-openroadm-service', 'service-create',
440 self.cr_serv_input_data)
441 self.assertEqual(response['status_code'], requests.codes.ok)
442 time.sleep(self.WAITING)
444 def test_24_get_ODU4_service1(self):
445 response = test_utils.get_service_list_request(
446 "services/service1-ODU4")
447 self.assertEqual(response.status_code, requests.codes.ok)
448 res = response.json()
450 res['services'][0]['administrative-state'], 'inService')
452 res['services'][0]['service-name'], 'service1-ODU4')
454 res['services'][0]['connection-type'], 'infrastructure')
456 res['services'][0]['lifecycle-state'], 'planned')
459 def test_25_check_interface_ODU4_spdra(self):
460 response = test_utils_rfc8040.check_node_attribute_request(
461 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
462 self.assertEqual(response['status_code'], requests.codes.ok)
463 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
464 'administrative-state': 'inService',
465 'supporting-circuit-pack-name': 'CP1-CFP0',
466 'supporting-interface': 'XPDR1-NETWORK1-OTU',
467 'type': 'org-openroadm-interfaces:otnOdu',
468 'supporting-port': 'CP1-CFP0-P1',
470 'description': 'TBD'}
471 # SAPI/DAPI are added in the Otu4 renderer
472 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
473 'rate': 'org-openroadm-otn-common-types:ODU4',
474 'monitoring-mode': 'terminated',
475 'expected-dapi': 'H/OelLynehI=',
476 'expected-sapi': 'AMf1n5hK6Xkk',
477 'tx-dapi': 'AMf1n5hK6Xkk',
478 'tx-sapi': 'H/OelLynehI='}
480 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
481 response['interface'][0])
482 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
484 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
486 self.assertDictEqual(
487 {'payload-type': '21', 'exp-payload-type': '21'},
488 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
490 def test_26_check_interface_ODU4_spdrc(self):
491 response = test_utils_rfc8040.check_node_attribute_request(
492 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
493 self.assertEqual(response['status_code'], requests.codes.ok)
494 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
495 'administrative-state': 'inService',
496 'supporting-circuit-pack-name': 'CP1-CFP0',
497 'supporting-interface': 'XPDR1-NETWORK1-OTU',
498 'type': 'org-openroadm-interfaces:otnOdu',
499 'supporting-port': 'CP1-CFP0-P1',
501 'description': 'TBD'}
502 # SAPI/DAPI are added in the Otu4 renderer
503 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
504 'rate': 'org-openroadm-otn-common-types:ODU4',
505 'monitoring-mode': 'terminated',
506 'tx-sapi': 'AMf1n5hK6Xkk',
507 'tx-dapi': 'H/OelLynehI=',
508 'expected-sapi': 'H/OelLynehI=',
509 'expected-dapi': 'AMf1n5hK6Xkk'
511 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
512 response['interface'][0])
513 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
515 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
517 self.assertDictEqual(
518 {'payload-type': '21', 'exp-payload-type': '21'},
519 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
521 def test_27_check_otn_topo_links(self):
522 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
523 self.assertEqual(response['status_code'], requests.codes.ok)
524 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
525 for link in response['network'][0]['ietf-network-topology:link']:
526 linkId = link['link-id']
527 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
528 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
530 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
532 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
533 elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
534 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
536 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
538 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
540 link['transportpce-networkutils:otn-link-type'], 'ODTU4')
542 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
543 self.assertIn(link['org-openroadm-common-network:opposite-link'],
544 ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
545 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
547 self.fail("this link should not exist")
549 def test_28_check_otn_topo_tp(self):
550 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
551 self.assertEqual(response['status_code'], requests.codes.ok)
552 for node in response['network'][0]['node']:
553 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
554 tpList = node['ietf-network-topology:termination-point']
556 if tp['tp-id'] == 'XPDR1-NETWORK1':
557 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
558 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
560 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
561 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
562 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
564 # test service-create for 10GE service from spdr to spdr
565 def test_29_create_10GE_service(self):
566 self.cr_serv_input_data["service-name"] = "service1-10GE"
567 self.cr_serv_input_data["connection-type"] = "service"
568 self.cr_serv_input_data["service-a-end"]["service-rate"] = "10"
569 self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
570 del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
571 self.cr_serv_input_data["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
572 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
573 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
574 self.cr_serv_input_data["service-z-end"]["service-rate"] = "10"
575 self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
576 del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
577 self.cr_serv_input_data["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
578 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
579 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
580 response = test_utils_rfc8040.transportpce_api_rpc_request(
581 'org-openroadm-service', 'service-create',
582 self.cr_serv_input_data)
583 self.assertEqual(response['status_code'], requests.codes.ok)
584 time.sleep(self.WAITING)
586 def test_30_get_10GE_service1(self):
587 response = test_utils.get_service_list_request(
588 "services/service1-10GE")
589 self.assertEqual(response.status_code, requests.codes.ok)
590 res = response.json()
592 res['services'][0]['administrative-state'], 'inService')
594 res['services'][0]['service-name'], 'service1-10GE')
596 res['services'][0]['connection-type'], 'service')
598 res['services'][0]['lifecycle-state'], 'planned')
601 def test_31_check_interface_10GE_CLIENT_spdra(self):
602 response = test_utils_rfc8040.check_node_attribute_request(
603 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
604 self.assertEqual(response['status_code'], requests.codes.ok)
605 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
606 'administrative-state': 'inService',
607 'supporting-circuit-pack-name': 'CP1-SFP4',
608 'type': 'org-openroadm-interfaces:ethernetCsmacd',
609 'supporting-port': 'CP1-SFP4-P1'
611 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
612 response['interface'][0])
615 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
617 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
618 response = test_utils_rfc8040.check_node_attribute_request(
619 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
620 self.assertEqual(response['status_code'], requests.codes.ok)
621 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
622 'administrative-state': 'inService',
623 'supporting-circuit-pack-name': 'CP1-SFP4',
624 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
625 'type': 'org-openroadm-interfaces:otnOdu',
626 'supporting-port': 'CP1-SFP4-P1'}
628 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
629 'rate': 'org-openroadm-otn-common-types:ODU2e',
630 'monitoring-mode': 'terminated'}
632 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
633 response['interface'][0])
634 self.assertDictEqual(dict(input_dict_2,
635 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
636 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
637 self.assertDictEqual(
638 {'payload-type': '03', 'exp-payload-type': '03'},
639 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
641 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
642 response = test_utils_rfc8040.check_node_attribute_request(
643 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
644 self.assertEqual(response['status_code'], requests.codes.ok)
645 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
646 'administrative-state': 'inService',
647 'supporting-circuit-pack-name': 'CP1-CFP0',
648 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
649 'type': 'org-openroadm-interfaces:otnOdu',
650 'supporting-port': 'CP1-CFP0-P1'}
652 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
653 'rate': 'org-openroadm-otn-common-types:ODU2e',
654 'monitoring-mode': 'monitored'}
655 input_dict_3 = {'trib-port-number': 1}
657 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
658 response['interface'][0])
659 self.assertDictEqual(dict(input_dict_2,
660 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
661 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
662 self.assertDictEqual(dict(input_dict_3,
663 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
664 'parent-odu-allocation']),
665 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
666 self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
669 def test_34_check_ODU2E_connection_spdra(self):
670 response = test_utils_rfc8040.check_node_attribute_request(
671 'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
672 self.assertEqual(response['status_code'], requests.codes.ok)
675 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
676 'direction': 'bidirectional'
679 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
680 response['odu-connection'][0])
681 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
682 response['odu-connection'][0]['destination'])
683 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
684 response['odu-connection'][0]['source'])
686 def test_35_check_interface_10GE_CLIENT_spdrc(self):
687 response = test_utils_rfc8040.check_node_attribute_request(
688 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
689 self.assertEqual(response['status_code'], requests.codes.ok)
690 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
691 'administrative-state': 'inService',
692 'supporting-circuit-pack-name': 'CP1-SFP4',
693 'type': 'org-openroadm-interfaces:ethernetCsmacd',
694 'supporting-port': 'CP1-SFP4-P1'
696 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
697 response['interface'][0])
700 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
702 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
703 response = test_utils_rfc8040.check_node_attribute_request(
704 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
705 self.assertEqual(response['status_code'], requests.codes.ok)
706 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
707 'administrative-state': 'inService',
708 'supporting-circuit-pack-name': 'CP1-SFP4',
709 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
710 'type': 'org-openroadm-interfaces:otnOdu',
711 'supporting-port': 'CP1-SFP4-P1'}
713 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
714 'rate': 'org-openroadm-otn-common-types:ODU2e',
715 'monitoring-mode': 'terminated'}
717 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
718 response['interface'][0])
719 self.assertDictEqual(dict(input_dict_2,
720 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
721 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
722 self.assertDictEqual(
723 {'payload-type': '03', 'exp-payload-type': '03'},
724 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
726 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
727 response = test_utils_rfc8040.check_node_attribute_request(
728 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
729 self.assertEqual(response['status_code'], requests.codes.ok)
730 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
731 'administrative-state': 'inService',
732 'supporting-circuit-pack-name': 'CP1-CFP0',
733 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
734 'type': 'org-openroadm-interfaces:otnOdu',
735 'supporting-port': 'CP1-CFP0-P1'}
737 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
738 'rate': 'org-openroadm-otn-common-types:ODU2e',
739 'monitoring-mode': 'monitored'}
741 input_dict_3 = {'trib-port-number': 1}
743 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
744 response['interface'][0])
745 self.assertDictEqual(dict(input_dict_2,
746 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
747 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
748 self.assertDictEqual(dict(input_dict_3,
749 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
750 'parent-odu-allocation']),
751 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
752 'parent-odu-allocation'])
754 response['interface'][0][
755 'org-openroadm-otn-odu-interfaces:odu'][
756 'parent-odu-allocation']['trib-slots'])
758 def test_38_check_ODU2E_connection_spdrc(self):
759 response = test_utils_rfc8040.check_node_attribute_request(
760 'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
761 self.assertEqual(response['status_code'], requests.codes.ok)
764 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
765 'direction': 'bidirectional'
768 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
769 response['odu-connection'][0])
770 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
771 response['odu-connection'][0]['destination'])
772 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
773 response['odu-connection'][0]['source'])
775 def test_39_check_otn_topo_links(self):
776 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
777 self.assertEqual(response['status_code'], requests.codes.ok)
778 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
779 for link in response['network'][0]['ietf-network-topology:link']:
780 linkId = link['link-id']
781 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
782 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
784 link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
786 link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
788 def test_40_check_otn_topo_tp(self):
789 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
790 self.assertEqual(response['status_code'], requests.codes.ok)
791 for node in response['network'][0]['node']:
792 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
793 tpList = node['ietf-network-topology:termination-point']
795 if tp['tp-id'] == 'XPDR1-NETWORK1':
796 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
797 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
798 tsPoolList = list(range(1, 9))
800 tsPoolList, xpdrTpPortConAt['ts-pool'])
802 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
804 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
806 def test_41_delete_10GE_service(self):
807 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-10GE"
808 response = test_utils_rfc8040.transportpce_api_rpc_request(
809 'org-openroadm-service', 'service-delete',
810 self.del_serv_input_data)
811 self.assertEqual(response['status_code'], requests.codes.ok)
812 time.sleep(self.WAITING)
814 def test_42_check_service_list(self):
815 response = test_utils.get_service_list_request("")
816 self.assertEqual(response.status_code, requests.codes.ok)
817 res = response.json()
818 self.assertEqual(len(res['service-list']['services']), 2)
821 def test_43_check_no_ODU2e_connection_spdra(self):
822 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
823 self.assertEqual(response.status_code, requests.codes.ok)
824 res = response.json()
825 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
828 def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
829 response = test_utils_rfc8040.check_node_attribute_request(
830 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
831 self.assertEqual(response['status_code'], requests.codes.conflict)
833 def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
834 response = test_utils_rfc8040.check_node_attribute_request(
835 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
836 self.assertEqual(response['status_code'], requests.codes.conflict)
838 def test_46_check_no_interface_10GE_CLIENT_spdra(self):
839 response = test_utils_rfc8040.check_node_attribute_request(
840 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
841 self.assertEqual(response['status_code'], requests.codes.conflict)
843 def test_47_check_otn_topo_links(self):
844 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
845 self.assertEqual(response['status_code'], requests.codes.ok)
846 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
847 for link in response['network'][0]['ietf-network-topology:link']:
848 linkId = link['link-id']
849 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
850 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
852 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
854 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
856 def test_48_check_otn_topo_tp(self):
857 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
858 self.assertEqual(response['status_code'], requests.codes.ok)
859 for node in response['network'][0]['node']:
860 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
861 tpList = node['ietf-network-topology:termination-point']
863 if tp['tp-id'] == 'XPDR1-NETWORK1':
864 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
865 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
867 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
869 def test_49_delete_ODU4_service(self):
870 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4"
871 response = test_utils_rfc8040.transportpce_api_rpc_request(
872 'org-openroadm-service', 'service-delete',
873 self.del_serv_input_data)
874 self.assertEqual(response['status_code'], requests.codes.ok)
875 time.sleep(self.WAITING)
877 def test_50_check_service_list(self):
878 response = test_utils.get_service_list_request("")
879 self.assertEqual(response.status_code, requests.codes.ok)
880 res = response.json()
881 self.assertEqual(len(res['service-list']['services']), 1)
884 def test_51_check_no_interface_ODU4_spdra(self):
885 response = test_utils_rfc8040.check_node_attribute_request(
886 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
887 self.assertEqual(response['status_code'], requests.codes.conflict)
889 def test_52_check_otn_topo_links(self):
890 self.test_22_check_otn_topo_otu4_links()
892 def test_53_check_otn_topo_tp(self):
893 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
894 self.assertEqual(response['status_code'], requests.codes.ok)
895 for node in response['network'][0]['node']:
896 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
897 tpList = node['ietf-network-topology:termination-point']
899 if tp['tp-id'] == 'XPDR1-NETWORK1':
900 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
901 self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
903 'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
905 def test_54_delete_OCH_OTU4_service(self):
906 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
907 response = test_utils_rfc8040.transportpce_api_rpc_request(
908 'org-openroadm-service', 'service-delete',
909 self.del_serv_input_data)
910 self.assertEqual(response['status_code'], requests.codes.ok)
911 time.sleep(self.WAITING)
913 def test_55_get_no_service(self):
914 response = test_utils.get_service_list_request("")
915 self.assertEqual(response.status_code, requests.codes.conflict)
916 res = response.json()
918 {"error-type": "application", "error-tag": "data-missing",
919 "error-message": "Request could not be completed because the relevant data model content does not exist"},
920 res['errors']['error'])
923 def test_56_check_no_interface_OTU4_spdra(self):
924 response = test_utils_rfc8040.check_node_attribute_request(
925 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
926 self.assertEqual(response['status_code'], requests.codes.conflict)
928 def test_57_check_no_interface_OCH_spdra(self):
929 response = test_utils_rfc8040.check_node_attribute_request(
930 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
931 self.assertEqual(response['status_code'], requests.codes.conflict)
933 def test_58_getLinks_OtnTopology(self):
934 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
935 self.assertEqual(response['status_code'], requests.codes.ok)
936 self.assertNotIn('ietf-network-topology:link', response['network'][0])
938 def test_59_check_openroadm_topo_spdra(self):
939 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
940 self.assertEqual(response['status_code'], requests.codes.ok)
941 tp = response['node']['ietf-network-topology:termination-point'][0]
942 self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
943 self.assertNotIn('wavelength', dict.keys(
944 tp['org-openroadm-network-topology:xpdr-network-attributes']))
946 def test_60_check_openroadm_topo_ROADMA_SRG(self):
947 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
948 self.assertEqual(response['status_code'], requests.codes.ok)
949 freq_map = base64.b64decode(
950 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
951 freq_map_array = [int(x) for x in freq_map]
952 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
953 liste_tp = response['node']['ietf-network-topology:termination-point']
955 if ele['tp-id'] == 'SRG1-PP1-TXRX':
956 freq_map = base64.b64decode(
957 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
958 freq_map_array = [int(x) for x in freq_map]
959 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
962 def test_61_check_openroadm_topo_ROADMA_DEG(self):
963 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
964 self.assertEqual(response['status_code'], requests.codes.ok)
965 freq_map = base64.b64decode(
966 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
967 freq_map_array = [int(x) for x in freq_map]
968 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
969 liste_tp = response['node']['ietf-network-topology:termination-point']
971 if ele['tp-id'] == 'DEG2-CTP-TXRX':
972 freq_map = base64.b64decode(
973 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
974 freq_map_array = [int(x) for x in freq_map]
975 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
976 if ele['tp-id'] == 'DEG2-TTP-TXRX':
977 freq_map = base64.b64decode(
978 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
979 freq_map_array = [int(x) for x in freq_map]
980 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
983 def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
984 response = test_utils_rfc8040.transportpce_api_rpc_request(
985 'transportpce-networkutils', 'init-xpdr-rdm-links',
986 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
987 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
988 self.assertEqual(response['status_code'], requests.codes.ok)
990 def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
991 response = test_utils_rfc8040.transportpce_api_rpc_request(
992 'transportpce-networkutils', 'init-rdm-xpdr-links',
993 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
994 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
995 self.assertEqual(response['status_code'], requests.codes.ok)
997 def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
998 response = test_utils_rfc8040.transportpce_api_rpc_request(
999 'transportpce-networkutils', 'init-xpdr-rdm-links',
1000 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
1001 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
1002 self.assertEqual(response['status_code'], requests.codes.ok)
1004 def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
1005 response = test_utils_rfc8040.transportpce_api_rpc_request(
1006 'transportpce-networkutils', 'init-rdm-xpdr-links',
1007 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
1008 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
1009 self.assertEqual(response['status_code'], requests.codes.ok)
1011 def test_66_create_OCH_OTU4_service_2(self):
1012 # pylint: disable=line-too-long
1013 self.cr_serv_input_data["service-name"] = "service2-OCH-OTU4"
1014 self.cr_serv_input_data["connection-type"] = "infrastructure"
1015 self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
1016 self.cr_serv_input_data["service-a-end"]["service-format"] = "OTU"
1017 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1018 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1019 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1020 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1021 self.cr_serv_input_data["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1022 self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
1023 self.cr_serv_input_data["service-z-end"]["service-format"] = "OTU"
1024 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1025 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1026 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1027 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1028 self.cr_serv_input_data["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1029 response = test_utils_rfc8040.transportpce_api_rpc_request(
1030 'org-openroadm-service', 'service-create',
1031 self.cr_serv_input_data)
1032 self.assertEqual(response['status_code'], requests.codes.ok)
1033 time.sleep(self.WAITING)
1035 def test_67_get_OCH_OTU4_service2(self):
1036 response = test_utils.get_service_list_request(
1037 "services/service2-OCH-OTU4")
1038 self.assertEqual(response.status_code, requests.codes.ok)
1039 res = response.json()
1041 res['services'][0]['administrative-state'], 'inService')
1043 res['services'][0]['service-name'], 'service2-OCH-OTU4')
1045 res['services'][0]['connection-type'], 'infrastructure')
1047 res['services'][0]['lifecycle-state'], 'planned')
1050 def test_68_create_ODU4_service_2(self):
1051 # pylint: disable=line-too-long
1052 self.cr_serv_input_data["service-name"] = "service2-ODU4"
1053 self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
1054 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1055 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1056 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1057 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1058 self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1059 del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
1060 self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
1061 self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1062 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1063 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1064 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1065 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1066 del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
1067 response = test_utils_rfc8040.transportpce_api_rpc_request(
1068 'org-openroadm-service', 'service-create',
1069 self.cr_serv_input_data)
1070 self.assertEqual(response['status_code'], requests.codes.ok)
1071 time.sleep(self.WAITING)
1073 def test_69_get_ODU4_service2(self):
1074 response = test_utils.get_service_list_request(
1075 "services/service2-ODU4")
1076 self.assertEqual(response.status_code, requests.codes.ok)
1077 res = response.json()
1079 res['services'][0]['administrative-state'], 'inService')
1081 res['services'][0]['service-name'], 'service2-ODU4')
1083 res['services'][0]['connection-type'], 'infrastructure')
1085 res['services'][0]['lifecycle-state'], 'planned')
1088 def test_70_create_1GE_service(self):
1089 # pylint: disable=line-too-long
1090 self.cr_serv_input_data["service-name"] = "service1-1GE"
1091 self.cr_serv_input_data["connection-type"] = "service"
1092 self.cr_serv_input_data["service-a-end"]["service-rate"] = "1"
1093 self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
1094 self.cr_serv_input_data["service-z-end"]["service-rate"] = "1"
1095 self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
1096 del self.cr_serv_input_data["service-a-end"]["odu-service-rate"]
1097 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1098 self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1099 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1100 self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1101 del self.cr_serv_input_data["service-z-end"]["odu-service-rate"]
1102 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1103 self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1104 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1105 self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1106 response = test_utils_rfc8040.transportpce_api_rpc_request(
1107 'org-openroadm-service', 'service-create',
1108 self.cr_serv_input_data)
1109 self.assertEqual(response['status_code'], requests.codes.ok)
1110 time.sleep(self.WAITING)
1112 def test_71_get_1GE_service1(self):
1113 response = test_utils.get_service_list_request("services/service1-1GE")
1114 self.assertEqual(response.status_code, requests.codes.ok)
1115 res = response.json()
1117 res['services'][0]['administrative-state'], 'inService')
1119 res['services'][0]['service-name'], 'service1-1GE')
1121 res['services'][0]['connection-type'], 'service')
1123 res['services'][0]['lifecycle-state'], 'planned')
1126 def test_72_check_interface_1GE_CLIENT_spdra(self):
1127 response = test_utils_rfc8040.check_node_attribute_request(
1128 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1129 self.assertEqual(response['status_code'], requests.codes.ok)
1130 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1131 'administrative-state': 'inService',
1132 'supporting-circuit-pack-name': 'CP1-SFP4',
1133 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1134 'supporting-port': 'CP1-SFP4-P1'
1136 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1137 response['interface'][0])
1140 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1142 def test_73_check_interface_ODU0_CLIENT_spdra(self):
1143 response = test_utils_rfc8040.check_node_attribute_request(
1144 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
1145 self.assertEqual(response['status_code'], requests.codes.ok)
1146 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1147 'administrative-state': 'inService',
1148 'supporting-circuit-pack-name': 'CP3-SFP1',
1149 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1150 'type': 'org-openroadm-interfaces:otnOdu',
1151 'supporting-port': 'CP3-SFP1-P1'}
1153 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1154 'rate': 'org-openroadm-otn-common-types:ODU0',
1155 'monitoring-mode': 'terminated'}
1156 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1157 response['interface'][0])
1158 self.assertDictEqual(dict(input_dict_2,
1159 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1160 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1161 self.assertDictEqual(
1162 {'payload-type': '07', 'exp-payload-type': '07'},
1163 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1165 def test_74_check_interface_ODU0_NETWORK_spdra(self):
1166 response = test_utils_rfc8040.check_node_attribute_request(
1167 'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
1168 self.assertEqual(response['status_code'], requests.codes.ok)
1169 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1170 'administrative-state': 'inService',
1171 'supporting-circuit-pack-name': 'CP3-CFP0',
1172 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1173 'type': 'org-openroadm-interfaces:otnOdu',
1174 'supporting-port': 'CP3-CFP0-P1'}
1176 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1177 'rate': 'org-openroadm-otn-common-types:ODU0',
1178 'monitoring-mode': 'monitored'}
1179 input_dict_3 = {'trib-port-number': 1}
1180 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1181 response['interface'][0])
1182 self.assertDictEqual(dict(input_dict_2,
1183 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1184 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1185 self.assertDictEqual(dict(input_dict_3,
1186 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1187 'parent-odu-allocation']),
1188 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1189 self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1192 def test_75_check_ODU0_connection_spdra(self):
1193 response = test_utils_rfc8040.check_node_attribute_request(
1194 'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1195 self.assertEqual(response['status_code'], requests.codes.ok)
1198 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1199 'direction': 'bidirectional'
1201 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1202 response['odu-connection'][0])
1203 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1204 response['odu-connection'][0]['destination'])
1205 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1206 response['odu-connection'][0]['source'])
1208 def test_76_check_interface_1GE_CLIENT_spdrc(self):
1209 response = test_utils_rfc8040.check_node_attribute_request(
1210 'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1211 self.assertEqual(response['status_code'], requests.codes.ok)
1212 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1213 'administrative-state': 'inService',
1214 'supporting-circuit-pack-name': 'CP3-SFP1',
1215 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1216 'supporting-port': 'CP3-SFP1-P1'
1218 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1219 response['interface'][0])
1222 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1224 def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1225 response = test_utils_rfc8040.check_node_attribute_request(
1226 'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
1227 self.assertEqual(response['status_code'], requests.codes.ok)
1228 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1229 'administrative-state': 'inService',
1230 'supporting-circuit-pack-name': 'CP3-SFP1',
1231 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1232 'type': 'org-openroadm-interfaces:otnOdu',
1233 'supporting-port': 'CP3-SFP1-P1'}
1235 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1236 'rate': 'org-openroadm-otn-common-types:ODU0',
1237 'monitoring-mode': 'terminated'}
1238 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1239 response['interface'][0])
1240 self.assertDictEqual(dict(input_dict_2,
1241 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1242 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1243 self.assertDictEqual(
1244 {'payload-type': '07', 'exp-payload-type': '07'},
1245 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1247 def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1248 response = test_utils_rfc8040.check_node_attribute_request(
1249 'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
1250 self.assertEqual(response['status_code'], requests.codes.ok)
1251 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1252 'administrative-state': 'inService',
1253 'supporting-circuit-pack-name': 'CP3-CFP0',
1254 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1255 'type': 'org-openroadm-interfaces:otnOdu',
1256 'supporting-port': 'CP3-CFP0-P1'}
1258 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1259 'rate': 'org-openroadm-otn-common-types:ODU0',
1260 'monitoring-mode': 'monitored'}
1261 input_dict_3 = {'trib-port-number': 1}
1262 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1263 response['interface'][0])
1264 self.assertDictEqual(dict(input_dict_2,
1265 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1266 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1267 self.assertDictEqual(dict(input_dict_3,
1268 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1269 'parent-odu-allocation']),
1270 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1271 'parent-odu-allocation'])
1273 response['interface'][0][
1274 'org-openroadm-otn-odu-interfaces:odu'][
1275 'parent-odu-allocation']['trib-slots'])
1277 def test_79_check_ODU0_connection_spdrc(self):
1278 response = test_utils_rfc8040.check_node_attribute_request(
1279 'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1280 self.assertEqual(response['status_code'], requests.codes.ok)
1283 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1284 'direction': 'bidirectional'
1286 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1287 response['odu-connection'][0])
1288 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1289 response['odu-connection'][0]['destination'])
1290 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1291 response['odu-connection'][0]['source'])
1293 def test_80_check_otn_topo_links(self):
1294 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1295 self.assertEqual(response['status_code'], requests.codes.ok)
1296 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1297 for link in response['network'][0]['ietf-network-topology:link']:
1298 linkId = link['link-id']
1299 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1300 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1302 link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1304 link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1306 def test_81_check_otn_topo_tp(self):
1307 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1308 self.assertEqual(response['status_code'], requests.codes.ok)
1309 for node in response['network'][0]['node']:
1310 if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1311 tpList = node['ietf-network-topology:termination-point']
1313 if tp['tp-id'] == 'XPDR3-NETWORK1':
1314 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1315 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1316 tsPoolList = list(range(1, 2))
1318 tsPoolList, xpdrTpPortConAt['ts-pool'])
1320 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1322 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1324 def test_82_delete_1GE_service(self):
1325 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-1GE"
1326 response = test_utils_rfc8040.transportpce_api_rpc_request(
1327 'org-openroadm-service', 'service-delete',
1328 self.del_serv_input_data)
1329 self.assertEqual(response['status_code'], requests.codes.ok)
1330 time.sleep(self.WAITING)
1332 def test_83_check_service_list(self):
1333 response = test_utils.get_service_list_request("")
1334 self.assertEqual(response.status_code, requests.codes.ok)
1335 res = response.json()
1336 self.assertEqual(len(res['service-list']['services']), 2)
1339 def test_84_check_no_ODU0_connection_spdra(self):
1340 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1341 self.assertEqual(response.status_code, requests.codes.ok)
1342 res = response.json()
1343 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1346 def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1347 response = test_utils_rfc8040.check_node_attribute_request(
1348 'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0-service1')
1349 self.assertEqual(response['status_code'], requests.codes.conflict)
1351 def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1352 response = test_utils_rfc8040.check_node_attribute_request(
1353 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0-service1')
1354 self.assertEqual(response['status_code'], requests.codes.conflict)
1356 def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1357 response = test_utils_rfc8040.check_node_attribute_request(
1358 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1359 self.assertEqual(response['status_code'], requests.codes.conflict)
1361 def test_88_check_otn_topo_links(self):
1362 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1363 self.assertEqual(response['status_code'], requests.codes.ok)
1364 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1365 for link in response['network'][0]['ietf-network-topology:link']:
1366 linkId = link['link-id']
1367 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1368 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1370 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1372 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1374 def test_89_check_otn_topo_tp(self):
1375 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1376 self.assertEqual(response['status_code'], requests.codes.ok)
1377 for node in response['network'][0]['node']:
1378 if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1379 tpList = node['ietf-network-topology:termination-point']
1381 if tp['tp-id'] == 'XPDR3-NETWORK1':
1382 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1383 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1385 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1387 def test_90_delete_ODU4_service(self):
1388 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-ODU4"
1389 response = test_utils_rfc8040.transportpce_api_rpc_request(
1390 'org-openroadm-service', 'service-delete',
1391 self.del_serv_input_data)
1392 self.assertEqual(response['status_code'], requests.codes.ok)
1393 time.sleep(self.WAITING)
1395 def test_91_delete_OCH_OTU4_service(self):
1396 self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service2-OCH-OTU4"
1397 response = test_utils_rfc8040.transportpce_api_rpc_request(
1398 'org-openroadm-service', 'service-delete',
1399 self.del_serv_input_data)
1400 self.assertEqual(response['status_code'], requests.codes.ok)
1401 time.sleep(self.WAITING)
1403 def test_92_disconnect_xponders_from_roadm(self):
1404 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1405 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1406 self.assertEqual(response['status_code'], requests.codes.ok)
1407 links = response['network'][0]['ietf-network-topology:link']
1409 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1410 link_name = link["link-id"]
1411 response = test_utils.delete_request(url+link_name)
1412 self.assertEqual(response.status_code, requests.codes.ok)
1414 def test_93_check_openroadm_topology(self):
1415 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1416 self.assertEqual(response['status_code'], requests.codes.ok)
1417 self.assertEqual(18,
1418 len(response['network'][0]['ietf-network-topology:link']),
1419 'Topology should contain 18 links')
1421 def test_94_disconnect_spdrA(self):
1422 response = test_utils_rfc8040.unmount_device("SPDR-SA1")
1423 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1425 def test_95_disconnect_spdrC(self):
1426 response = test_utils_rfc8040.unmount_device("SPDR-SC1")
1427 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1429 def test_96_disconnect_roadmA(self):
1430 response = test_utils_rfc8040.unmount_device("ROADM-A1")
1431 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1433 def test_97_disconnect_roadmC(self):
1434 response = test_utils_rfc8040.unmount_device("ROADM-C1")
1435 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1438 if __name__ == "__main__":
1439 unittest.main(verbosity=2)