3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
28 class TransportPCEtesting(unittest.TestCase):
# Seconds slept after each service-create / service-delete RPC
# (see the time.sleep(self.WAITING) calls in the tests below).
31 WAITING = 20 # nominal value is 300
# Simulator version handed to test_utils.start_sims and test_utils.mount_device.
32 NODE_VERSION = '2.2.1'
# Payload sent with the 'service-create' RPC. It is shared at class level and
# edited in place by test_23 (ODU4) and test_29 (10GE), so the tests depend on
# running in numeric order.
# NOTE(review): several structural lines of this literal (nested dict openers
# and closers) are not visible in this view.
34 cr_serv_input_data = {
35 "sdnc-request-header": {
36 "request-id": "request-1",
37 "rpc-action": "service-create",
38 "request-system-id": "appname"
40 "service-name": "service1-OCH-OTU4",
41 "common-id": "commonId",
42 "connection-type": "infrastructure",
44 "service-rate": "100",
45 "node-id": "SPDR-SA1",
46 "service-format": "OTU",
47 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
51 "port-device-name": "SPDR-SA1-XPDR1",
53 "port-name": "XPDR1-NETWORK1",
54 "port-rack": "000000.00",
55 "port-shelf": "Chassis#1"
58 "lgx-device-name": "Some lgx-device-name",
59 "lgx-port-name": "Some lgx-port-name",
60 "lgx-port-rack": "000000.00",
61 "lgx-port-shelf": "00"
67 "port-device-name": "SPDR-SA1-XPDR1",
69 "port-name": "XPDR1-NETWORK1",
70 "port-rack": "000000.00",
71 "port-shelf": "Chassis#1"
74 "lgx-device-name": "Some lgx-device-name",
75 "lgx-port-name": "Some lgx-port-name",
76 "lgx-port-rack": "000000.00",
77 "lgx-port-shelf": "00"
84 "service-rate": "100",
85 "node-id": "SPDR-SC1",
86 "service-format": "OTU",
87 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
91 "port-device-name": "SPDR-SC1-XPDR1",
93 "port-name": "XPDR1-NETWORK1",
94 "port-rack": "000000.00",
95 "port-shelf": "Chassis#1"
98 "lgx-device-name": "Some lgx-device-name",
99 "lgx-port-name": "Some lgx-port-name",
100 "lgx-port-rack": "000000.00",
101 "lgx-port-shelf": "00"
107 "port-device-name": "SPDR-SC1-XPDR1",
108 "port-type": "fixed",
109 "port-name": "XPDR1-NETWORK1",
110 "port-rack": "000000.00",
111 "port-shelf": "Chassis#1"
114 "lgx-device-name": "Some lgx-device-name",
115 "lgx-port-name": "Some lgx-port-name",
116 "lgx-port-rack": "000000.00",
117 "lgx-port-shelf": "00"
123 "due-date": "2018-06-15T00:00:01Z",
124 "operator-contact": "pw1234"
# Payload sent with the 'service-delete' RPC. The "TBD" service name is a
# placeholder that test_41 overwrites before issuing the request.
127 del_serv_input_data = {
128 "sdnc-request-header": {
129 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
130 "rpc-action": "service-delete",
131 "request-system-id": "appname",
132 "notification-url": "http://localhost:8585/NotificationServer/notify"},
133 "service-delete-req-info": {
134 "service-name": "TBD",
135 "tail-retention": "no"}
# Body of the class-level setup (its header is not visible in this view;
# presumably setUpClass). Starts the controller, then the four simulators
# at NODE_VERSION.
# NOTE(review): the second assignment replaces the value stored by the first.
# tearDownClass only stops what cls.processes holds, so confirm in test_utils
# that start_sims returns a collection that also contains the start_tpce process.
140 cls.processes = test_utils.start_tpce()
141 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
142 ('roadma', cls.NODE_VERSION),
143 ('roadmc', cls.NODE_VERSION),
144 ('spdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Shut down every process recorded on the class, then report completion.
    # pylint: disable=not-an-iterable
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
def test_01_connect_spdrA(self):
    # Mount the 'spdra' simulator as SPDR-SA1; the mount must answer 201.
    simulator = ('spdra', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SA1", simulator)
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_connect_spdrC(self):
    # Mount the 'spdrc' simulator as SPDR-SC1; the mount must answer 201.
    simulator = ('spdrc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("SPDR-SC1", simulator)
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    # Mount the 'roadma' simulator as ROADM-A1; the mount must answer 201.
    simulator = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-A1", simulator)
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    # Mount the 'roadmc' simulator as ROADM-C1; the mount must answer 201.
    simulator = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils.mount_device("ROADM-C1", simulator)
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
    # init-xpdr-rdm-links RPC: SPDR-SA1 XPDR1 network port 1 / ROADM-A1 SRG1-PP1-TXRX.
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
    # init-rdm-xpdr-links RPC: ROADM-A1 SRG1-PP1-TXRX / SPDR-SA1 XPDR1 network port 1.
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
    # init-xpdr-rdm-links RPC: SPDR-SC1 XPDR1 network port 1 / ROADM-C1 SRG1-PP1-TXRX.
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
    # init-rdm-xpdr-links RPC: ROADM-C1 SRG1-PP1-TXRX / SPDR-SC1 XPDR1 network port 1.
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
204 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
# Sends the `data` payload through test_utils.add_oms_attr_request for the
# ROADM-A1 DEG2 -> ROADM-C1 DEG1 link and expects HTTP 201 (created).
# NOTE(review): the line that opens the `data` literal and some of its
# entries and closers are not visible in this view.
205 # Config ROADMA-ROADMC oms-attributes
207 "auto-spanloss": "true",
208 "spanloss-base": 11.4,
209 "spanloss-current": 12,
210 "engineered-spanloss": 12.2,
211 "link-concatenation": [{
214 "SRLG-length": 100000,
216 response = test_utils.add_oms_attr_request(
217 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
218 self.assertEqual(response.status_code, requests.codes.created)
220 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
# Sends the `data` payload through test_utils.add_oms_attr_request for the
# ROADM-C1 DEG1 -> ROADM-A1 DEG2 link and expects HTTP 201 (created).
# NOTE(review): the line that opens the `data` literal and some of its
# entries and closers are not visible in this view.
221 # Config ROADMC-ROADMA oms-attributes
223 "auto-spanloss": "true",
224 "spanloss-base": 11.4,
225 "spanloss-current": 12,
226 "engineered-spanloss": 12.2,
227 "link-concatenation": [{
230 "SRLG-length": 100000,
232 response = test_utils.add_oms_attr_request(
233 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
234 self.assertEqual(response.status_code, requests.codes.created)
236 # test service-create for OCH-OTU4 service from spdr to spdr
def test_11_check_otn_topology(self):
    # Before any service exists the otn-topology holds 6 nodes and no link list.
    topo_reply = test_utils.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(topo_reply['status_code'], requests.codes.ok)
    node_count = len(topo_reply['network'][0]['node'])
    self.assertEqual(node_count, 6)
    self.assertNotIn('ietf-network-topology:link', topo_reply['network'][0])
def test_12_create_OCH_OTU4_service(self):
    # Submit the shared payload unchanged (service1-OCH-OTU4), then wait WAITING seconds.
    payload = self.cr_serv_input_data
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
250 def test_13_get_OCH_OTU4_service1(self):
# Reads "service1-OCH-OTU4" from the service list and, after the status check,
# pairs administrative-state, service-name, connection-type and lifecycle-state
# of the first entry with their expected values.
# NOTE(review): the call openers of those four checks are not visible in this view.
251 response = test_utils.get_ordm_serv_list_attr_request(
252 "services", "service1-OCH-OTU4")
253 self.assertEqual(response['status_code'], requests.codes.ok)
255 response['services'][0]['administrative-state'], 'inService')
257 response['services'][0]['service-name'], 'service1-OCH-OTU4')
259 response['services'][0]['connection-type'], 'infrastructure')
261 response['services'][0]['lifecycle-state'], 'planned')
263 # Check correct configuration of devices
264 def test_14_check_interface_och_spdra(self):
# Reads interface 'XPDR1-NETWORK1-761:768' on SPDR-SA1 and checks its generic
# attributes plus the och rate, modulation format and frequency.
# NOTE(review): dict(expected, **actual) lets the response values override the
# expected ones, so comparing the result with the response only proves that
# the expected keys are present; their values are never compared.
# NOTE(review): the opener of the transmit-power check is not visible in this view.
265 response = test_utils.check_node_attribute_request(
266 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
267 self.assertEqual(response['status_code'], requests.codes.ok)
268 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
269 'administrative-state': 'inService',
270 'supporting-circuit-pack-name': 'CP1-CFP0',
271 'type': 'org-openroadm-interfaces:opticalChannel',
272 'supporting-port': 'CP1-CFP0-P1'
273 }, **response['interface'][0]),
274 response['interface'][0])
275 self.assertEqual('org-openroadm-common-types:R100G',
276 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
277 self.assertEqual('dp-qpsk',
278 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
279 self.assertEqual(196.1,
280 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
283 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
285 def test_15_check_interface_OTU4_spdra(self):
# Reads interface 'XPDR1-NETWORK1-OTU' on SPDR-SA1; input_dict_1 covers the
# generic interface attributes, input_dict_2 the otu container (SAPI/DAPI, rate).
# NOTE(review): both comparisons use dict(expected, **actual), where the response
# values override the expected ones, so only key presence is verified.
# NOTE(review): the closers of both literals are not visible in this view.
286 response = test_utils.check_node_attribute_request(
287 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
288 self.assertEqual(response['status_code'], requests.codes.ok)
289 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
290 'administrative-state': 'inService',
291 'supporting-circuit-pack-name': 'CP1-CFP0',
292 'supporting-interface': 'XPDR1-NETWORK1-761:768',
293 'type': 'org-openroadm-interfaces:otnOtu',
294 'supporting-port': 'CP1-CFP0-P1'
296 input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
297 'expected-dapi': 'H/OelLynehI=',
298 'tx-dapi': 'AMf1n5hK6Xkk',
299 'expected-sapi': 'AMf1n5hK6Xkk',
300 'rate': 'org-openroadm-otn-common-types:OTU4',
303 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
304 response['interface'][0])
305 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
306 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
308 def test_16_check_interface_och_spdrc(self):
# Reads interface 'XPDR1-NETWORK1-761:768' on SPDR-SC1 and checks its generic
# attributes plus the och rate, modulation format and frequency.
# NOTE(review): dict(expected, **actual) lets the response values override the
# expected ones, so comparing the result with the response only proves that
# the expected keys are present; their values are never compared.
# NOTE(review): the opener of the transmit-power check is not visible in this view.
309 response = test_utils.check_node_attribute_request(
310 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
311 self.assertEqual(response['status_code'], requests.codes.ok)
312 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
313 'administrative-state': 'inService',
314 'supporting-circuit-pack-name': 'CP1-CFP0',
315 'type': 'org-openroadm-interfaces:opticalChannel',
316 'supporting-port': 'CP1-CFP0-P1'
317 }, **response['interface'][0]),
318 response['interface'][0])
319 self.assertEqual('org-openroadm-common-types:R100G',
320 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
321 self.assertEqual('dp-qpsk',
322 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
323 self.assertEqual(196.1,
324 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
327 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
329 def test_17_check_interface_OTU4_spdrc(self):
# Reads interface 'XPDR1-NETWORK1-OTU' on SPDR-SC1; input_dict_2 lists the
# SAPI/DAPI values swapped with respect to the SPDR-SA1 check in test_15.
# NOTE(review): both comparisons use dict(expected, **actual), where the response
# values override the expected ones, so only key presence is verified.
# NOTE(review): the closers of both literals are not visible in this view.
330 response = test_utils.check_node_attribute_request(
331 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
332 self.assertEqual(response['status_code'], requests.codes.ok)
333 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
334 'administrative-state': 'inService',
335 'supporting-circuit-pack-name': 'CP1-CFP0',
336 'supporting-interface': 'XPDR1-NETWORK1-761:768',
337 'type': 'org-openroadm-interfaces:otnOtu',
338 'supporting-port': 'CP1-CFP0-P1'
340 input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
341 'expected-sapi': 'H/OelLynehI=',
342 'tx-sapi': 'AMf1n5hK6Xkk',
343 'expected-dapi': 'AMf1n5hK6Xkk',
344 'rate': 'org-openroadm-otn-common-types:OTU4',
347 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
348 response['interface'][0])
349 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
350 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_18_check_no_interface_ODU4_spdra(self):
    # At this point the ODU4 interface lookup on SPDR-SA1 must answer 409 (conflict).
    lookup = test_utils.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
    expected_code = requests.codes.conflict
    self.assertEqual(lookup['status_code'], expected_code)
357 def test_19_check_openroadm_topo_spdra(self):
# Reads node SPDR-SA1-XPDR1 from the openroadm-topology and inspects its first
# termination point: tp-id must be XPDR1-NETWORK1, then the wavelength
# frequency and width of its xpdr-network-attributes are read as floats.
# NOTE(review): the openers and expected values of the two float checks are
# not visible in this view.
358 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
359 self.assertEqual(response['status_code'], requests.codes.ok)
360 ele = response['node']['ietf-network-topology:termination-point'][0]
361 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
364 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
367 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
369 def test_20_check_openroadm_topo_ROADMA_SRG(self):
# Base64-decodes the frequency map of ROADM-A1-SRG1 and expects byte 95 to be 0
# ("Lambda 1 should not be available"). The same check is applied to the
# pp-attributes map of SRG1-PP1-TXRX, while SRG1-PP2-TXRX must carry no
# 'avail-freq-maps' key.
# NOTE(review): the loop header that binds `ele` is not visible in this view.
370 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
371 self.assertEqual(response['status_code'], requests.codes.ok)
372 freq_map = base64.b64decode(
373 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
374 freq_map_array = [int(x) for x in freq_map]
375 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
376 liste_tp = response['node']['ietf-network-topology:termination-point']
378 if ele['tp-id'] == 'SRG1-PP1-TXRX':
379 freq_map = base64.b64decode(
380 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
381 freq_map_array = [int(x) for x in freq_map]
382 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
383 if ele['tp-id'] == 'SRG1-PP2-TXRX':
384 self.assertNotIn('avail-freq-maps', dict.keys(ele))
386 def test_21_check_openroadm_topo_ROADMA_DEG(self):
# Base64-decodes the frequency map of ROADM-A1-DEG2 and expects byte 95 to be 0
# ("Lambda 1 should not be available"). The same check is applied to the
# ctp-attributes map of DEG2-CTP-TXRX and the tx-ttp-attributes map of
# DEG2-TTP-TXRX.
# NOTE(review): the loop header that binds `ele` is not visible in this view.
387 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
388 self.assertEqual(response['status_code'], requests.codes.ok)
389 freq_map = base64.b64decode(
390 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
391 freq_map_array = [int(x) for x in freq_map]
392 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
393 liste_tp = response['node']['ietf-network-topology:termination-point']
395 if ele['tp-id'] == 'DEG2-CTP-TXRX':
396 freq_map = base64.b64decode(
397 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
398 freq_map_array = [int(x) for x in freq_map]
399 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
400 if ele['tp-id'] == 'DEG2-TTP-TXRX':
401 freq_map = base64.b64decode(
402 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
403 freq_map_array = [int(x) for x in freq_map]
404 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
406 def test_22_check_otn_topo_otu4_links(self):
# Expects exactly 2 links in the otn-topology, both with an id listed in
# listLinkId. For each link the otn-link-type, link-type, available and used
# bandwidth and opposite-link are paired with their expected values.
# NOTE(review): the call openers of those per-link checks are not visible in this view.
407 response = test_utils.get_ietf_network_request('otn-topology', 'config')
408 self.assertEqual(response['status_code'], requests.codes.ok)
409 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
410 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
411 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
412 for link in response['network'][0]['ietf-network-topology:link']:
413 self.assertIn(link['link-id'], listLinkId)
415 link['transportpce-networkutils:otn-link-type'], 'OTU4')
417 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
419 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
421 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
423 link['org-openroadm-common-network:opposite-link'], listLinkId)
425 # test service-create for ODU4 service from spdr to spdr
def test_23_create_ODU4_service(self):
    # Turn the shared class-level payload into an ODU4 request (service1-ODU4),
    # submit it, then wait WAITING seconds.
    # The payload is edited in place, so later tests see these changes.
    self.cr_serv_input_data["service-name"] = "service1-ODU4"
    for end_key in ("service-a-end", "service-z-end"):
        endpoint = self.cr_serv_input_data[end_key]
        endpoint["service-format"] = "ODU"
        # pop() instead of del: the key is already gone if this test runs
        # twice against the same shared payload, and del would raise KeyError.
        endpoint.pop("otu-service-rate", None)
        endpoint["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"

    response = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create',
        self.cr_serv_input_data)
    self.assertEqual(response['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
441 def test_24_get_ODU4_service1(self):
# Reads "service1-ODU4" from the service list and, after the status check,
# pairs administrative-state, service-name, connection-type and lifecycle-state
# of the first entry with their expected values.
# NOTE(review): the call openers of those four checks are not visible in this view.
442 response = test_utils.get_ordm_serv_list_attr_request(
443 "services", "service1-ODU4")
444 self.assertEqual(response['status_code'], requests.codes.ok)
446 response['services'][0]['administrative-state'], 'inService')
448 response['services'][0]['service-name'], 'service1-ODU4')
450 response['services'][0]['connection-type'], 'infrastructure')
452 response['services'][0]['lifecycle-state'], 'planned')
454 def test_25_check_interface_ODU4_spdra(self):
# Reads interface 'XPDR1-NETWORK1-ODU4' on SPDR-SA1; input_dict_1 covers the
# generic attributes, input_dict_2 the odu container, and the opu container
# must equal exactly payload-type '21' / exp-payload-type '21'.
# NOTE(review): dict(input_dict_1, **response['interface'][0]) lets the response
# values override the expected ones, so that comparison only verifies key presence.
# NOTE(review): one entry of input_dict_1 and the tail of the odu comparison
# are not visible in this view.
455 response = test_utils.check_node_attribute_request(
456 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
457 self.assertEqual(response['status_code'], requests.codes.ok)
458 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
459 'administrative-state': 'inService',
460 'supporting-circuit-pack-name': 'CP1-CFP0',
461 'supporting-interface': 'XPDR1-NETWORK1-OTU',
462 'type': 'org-openroadm-interfaces:otnOdu',
463 'supporting-port': 'CP1-CFP0-P1',
465 'description': 'TBD'}
466 # SAPI/DAPI are added in the Otu4 renderer
467 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
468 'rate': 'org-openroadm-otn-common-types:ODU4',
469 'monitoring-mode': 'terminated',
470 'expected-dapi': 'H/OelLynehI=',
471 'expected-sapi': 'AMf1n5hK6Xkk',
472 'tx-dapi': 'AMf1n5hK6Xkk',
473 'tx-sapi': 'H/OelLynehI='}
475 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
476 response['interface'][0])
477 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
479 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
481 self.assertDictEqual(
482 {'payload-type': '21', 'exp-payload-type': '21'},
483 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
485 def test_26_check_interface_ODU4_spdrc(self):
# Reads interface 'XPDR1-NETWORK1-ODU4' on SPDR-SC1; input_dict_2 lists the
# SAPI/DAPI values swapped with respect to the SPDR-SA1 check in test_25, and
# the opu container must equal exactly payload-type '21' / exp-payload-type '21'.
# NOTE(review): dict(input_dict_1, **response['interface'][0]) lets the response
# values override the expected ones, so that comparison only verifies key presence.
# NOTE(review): one entry of input_dict_1, the closer of input_dict_2 and the
# tail of the odu comparison are not visible in this view.
486 response = test_utils.check_node_attribute_request(
487 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
488 self.assertEqual(response['status_code'], requests.codes.ok)
489 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
490 'administrative-state': 'inService',
491 'supporting-circuit-pack-name': 'CP1-CFP0',
492 'supporting-interface': 'XPDR1-NETWORK1-OTU',
493 'type': 'org-openroadm-interfaces:otnOdu',
494 'supporting-port': 'CP1-CFP0-P1',
496 'description': 'TBD'}
497 # SAPI/DAPI are added in the Otu4 renderer
498 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
499 'rate': 'org-openroadm-otn-common-types:ODU4',
500 'monitoring-mode': 'terminated',
501 'tx-sapi': 'AMf1n5hK6Xkk',
502 'tx-dapi': 'H/OelLynehI=',
503 'expected-sapi': 'H/OelLynehI=',
504 'expected-dapi': 'AMf1n5hK6Xkk'
506 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
507 response['interface'][0])
508 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
510 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
512 self.assertDictEqual(
513 {'payload-type': '21', 'exp-payload-type': '21'},
514 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
516 def test_27_check_otn_topo_links(self):
# Expects 4 links in the otn-topology. OTU4 links are paired with available
# bandwidth 0 / used 100000; ODTU4 links with available 100000 / used 0, link
# types 'ODTU4' and 'OTN-LINK', and an opposite-link that is one of the two
# ODTU4 ids. The final self.fail flags any other link id.
# NOTE(review): the openers of the bandwidth/type checks and the branch header
# in front of self.fail are not visible in this view.
517 response = test_utils.get_ietf_network_request('otn-topology', 'config')
518 self.assertEqual(response['status_code'], requests.codes.ok)
519 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
520 for link in response['network'][0]['ietf-network-topology:link']:
521 linkId = link['link-id']
522 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
523 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
525 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
527 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
528 elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
529 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
531 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
533 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
535 link['transportpce-networkutils:otn-link-type'], 'ODTU4')
537 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
538 self.assertIn(link['org-openroadm-common-network:opposite-link'],
539 ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
540 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
542 self.fail("this link should not exist")
544 def test_28_check_otn_topo_tp(self):
# For nodes SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1, inspects termination point
# XPDR1-NETWORK1: ts-pool must hold 80 entries, the first odtu-tpn-pool is
# paired with a tpn-pool length of 80 and must be of type ODTU4.ts-Allocated.
# NOTE(review): the loop header that binds `tp` and the opener of the tpn-pool
# length check are not visible in this view.
545 response = test_utils.get_ietf_network_request('otn-topology', 'config')
546 self.assertEqual(response['status_code'], requests.codes.ok)
547 for node in response['network'][0]['node']:
548 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
549 tpList = node['ietf-network-topology:termination-point']
551 if tp['tp-id'] == 'XPDR1-NETWORK1':
552 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
553 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
555 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
556 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
557 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
559 # test service-create for 10GE service from spdr to spdr
def test_29_create_10GE_service(self):
    # Turn the shared class-level payload into a 10GE Ethernet request
    # (service1-10GE) on the XPDR1-CLIENT1 ports, submit it, then wait
    # WAITING seconds. The payload is edited in place.
    self.cr_serv_input_data["service-name"] = "service1-10GE"
    self.cr_serv_input_data["connection-type"] = "service"
    for end_key in ("service-a-end", "service-z-end"):
        endpoint = self.cr_serv_input_data[end_key]
        endpoint["service-rate"] = "10"
        endpoint["service-format"] = "Ethernet"
        # pop() instead of del: the key is absent when test_23 has not run or
        # when this test is repeated, and del would raise KeyError.
        endpoint.pop("odu-service-rate", None)
        endpoint["ethernet-encoding"] = "10GBASE-R"
        for direction in ("tx-direction", "rx-direction"):
            endpoint[direction][0]["port"]["port-name"] = "XPDR1-CLIENT1"
    response = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create',
        self.cr_serv_input_data)
    self.assertEqual(response['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
581 def test_30_get_10GE_service1(self):
# Reads "service1-10GE" from the service list and, after the status check,
# pairs administrative-state, service-name, connection-type ('service') and
# lifecycle-state of the first entry with their expected values.
# NOTE(review): the call openers of those four checks are not visible in this view.
582 response = test_utils.get_ordm_serv_list_attr_request(
583 "services", "service1-10GE")
584 self.assertEqual(response['status_code'], requests.codes.ok)
586 response['services'][0]['administrative-state'], 'inService')
588 response['services'][0]['service-name'], 'service1-10GE')
590 response['services'][0]['connection-type'], 'service')
592 response['services'][0]['lifecycle-state'], 'planned')
594 def test_31_check_interface_10GE_CLIENT_spdra(self):
# Reads interface 'XPDR1-CLIENT1-ETHERNET10G' on SPDR-SA1, checks its generic
# attributes and then reads the ethernet 'speed' leaf.
# NOTE(review): dict(input_dict, **actual) lets the response values override
# the expected ones, so that comparison only verifies key presence.
# NOTE(review): the closer of input_dict and the opener/expected value of the
# speed check are not visible in this view.
595 response = test_utils.check_node_attribute_request(
596 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
597 self.assertEqual(response['status_code'], requests.codes.ok)
598 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
599 'administrative-state': 'inService',
600 'supporting-circuit-pack-name': 'CP1-SFP4',
601 'type': 'org-openroadm-interfaces:ethernetCsmacd',
602 'supporting-port': 'CP1-SFP4-P1'
604 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
605 response['interface'][0])
608 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
610 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
# Reads interface 'XPDR1-CLIENT1-ODU2e:service1-10GE' on SPDR-SA1; input_dict_1
# covers the generic attributes, input_dict_2 the odu container, and the opu
# container must equal exactly payload-type '03' / exp-payload-type '03'.
# NOTE(review): both dict(expected, **actual) comparisons let the response
# values override the expected ones, so they only verify key presence.
# NOTE(review): the line that opens input_dict_2 is not visible in this view.
611 response = test_utils.check_node_attribute_request(
612 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e:service1-10GE')
613 self.assertEqual(response['status_code'], requests.codes.ok)
614 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e:service1-10GE',
615 'administrative-state': 'inService',
616 'supporting-circuit-pack-name': 'CP1-SFP4',
617 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
618 'type': 'org-openroadm-interfaces:otnOdu',
619 'supporting-port': 'CP1-SFP4-P1'}
621 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
622 'rate': 'org-openroadm-otn-common-types:ODU2e',
623 'monitoring-mode': 'terminated'}
625 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
626 response['interface'][0])
627 self.assertDictEqual(dict(input_dict_2,
628 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
629 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
630 self.assertDictEqual(
631 {'payload-type': '03', 'exp-payload-type': '03'},
632 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
634 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
# Reads interface 'XPDR1-NETWORK1-ODU2e:service1-10GE' on SPDR-SA1;
# input_dict_1 covers the generic attributes, input_dict_2 the odu container
# and input_dict_3 the parent-odu-allocation (trib-port-number 1).
# NOTE(review): the three dict(expected, **actual) comparisons let the response
# values override the expected ones, so they only verify key presence.
# NOTE(review): the line that opens input_dict_2 and the tail of the final
# assertIn are not visible in this view.
635 response = test_utils.check_node_attribute_request(
636 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e:service1-10GE')
637 self.assertEqual(response['status_code'], requests.codes.ok)
638 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e:service1-10GE',
639 'administrative-state': 'inService',
640 'supporting-circuit-pack-name': 'CP1-CFP0',
641 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
642 'type': 'org-openroadm-interfaces:otnOdu',
643 'supporting-port': 'CP1-CFP0-P1'}
645 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
646 'rate': 'org-openroadm-otn-common-types:ODU2e',
647 'monitoring-mode': 'monitored'}
648 input_dict_3 = {'trib-port-number': 1}
650 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
651 response['interface'][0])
652 self.assertDictEqual(dict(input_dict_2,
653 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
654 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
655 self.assertDictEqual(dict(input_dict_3,
656 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
657 'parent-odu-allocation']),
658 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
659 self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
662 def test_34_check_ODU2E_connection_spdra(self):
# Reads odu-connection 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e' on
# SPDR-SA1; its destination must equal exactly the NETWORK1 ODU2e interface
# and its source exactly the CLIENT1 ODU2e interface.
# NOTE(review): dict(input_dict_1, **actual) lets the response values override
# the expected ones, so that comparison only verifies key presence.
# NOTE(review): the opener and closer of input_dict_1 are not visible in this view.
663 response = test_utils.check_node_attribute_request(
664 'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
665 self.assertEqual(response['status_code'], requests.codes.ok)
668 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
669 'direction': 'bidirectional'
672 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
673 response['odu-connection'][0])
674 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e:service1-10GE'},
675 response['odu-connection'][0]['destination'])
676 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e:service1-10GE'},
677 response['odu-connection'][0]['source'])
679 def test_35_check_interface_10GE_CLIENT_spdrc(self):
# Reads interface 'XPDR1-CLIENT1-ETHERNET10G' on SPDR-SC1, checks its generic
# attributes and then reads the ethernet 'speed' leaf.
# NOTE(review): dict(input_dict, **actual) lets the response values override
# the expected ones, so that comparison only verifies key presence.
# NOTE(review): the closer of input_dict and the opener/expected value of the
# speed check are not visible in this view.
680 response = test_utils.check_node_attribute_request(
681 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
682 self.assertEqual(response['status_code'], requests.codes.ok)
683 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
684 'administrative-state': 'inService',
685 'supporting-circuit-pack-name': 'CP1-SFP4',
686 'type': 'org-openroadm-interfaces:ethernetCsmacd',
687 'supporting-port': 'CP1-SFP4-P1'
689 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
690 response['interface'][0])
693 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
695 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
# Reads interface 'XPDR1-CLIENT1-ODU2e:service1-10GE' on SPDR-SC1; input_dict_1
# covers the generic attributes, input_dict_2 the odu container, and the opu
# container must equal exactly payload-type '03' / exp-payload-type '03'.
# NOTE(review): both dict(expected, **actual) comparisons let the response
# values override the expected ones, so they only verify key presence.
# NOTE(review): the line that opens input_dict_2 is not visible in this view.
696 response = test_utils.check_node_attribute_request(
697 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e:service1-10GE')
698 self.assertEqual(response['status_code'], requests.codes.ok)
699 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e:service1-10GE',
700 'administrative-state': 'inService',
701 'supporting-circuit-pack-name': 'CP1-SFP4',
702 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
703 'type': 'org-openroadm-interfaces:otnOdu',
704 'supporting-port': 'CP1-SFP4-P1'}
706 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
707 'rate': 'org-openroadm-otn-common-types:ODU2e',
708 'monitoring-mode': 'terminated'}
710 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
711 response['interface'][0])
712 self.assertDictEqual(dict(input_dict_2,
713 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
714 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
715 self.assertDictEqual(
716 {'payload-type': '03', 'exp-payload-type': '03'},
717 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
719 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
# Reads interface 'XPDR1-NETWORK1-ODU2e:service1-10GE' on SPDR-SC1;
# input_dict_1 covers the generic attributes, input_dict_2 the odu container
# and input_dict_3 the parent-odu-allocation (trib-port-number 1). The last
# statement reads the allocation's 'trib-slots'.
# NOTE(review): the three dict(expected, **actual) comparisons let the response
# values override the expected ones, so they only verify key presence.
# NOTE(review): the line that opens input_dict_2 and the opener of the
# trib-slots check are not visible in this view.
720 response = test_utils.check_node_attribute_request(
721 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e:service1-10GE')
722 self.assertEqual(response['status_code'], requests.codes.ok)
723 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e:service1-10GE',
724 'administrative-state': 'inService',
725 'supporting-circuit-pack-name': 'CP1-CFP0',
726 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
727 'type': 'org-openroadm-interfaces:otnOdu',
728 'supporting-port': 'CP1-CFP0-P1'}
730 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
731 'rate': 'org-openroadm-otn-common-types:ODU2e',
732 'monitoring-mode': 'monitored'}
734 input_dict_3 = {'trib-port-number': 1}
736 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
737 response['interface'][0])
738 self.assertDictEqual(dict(input_dict_2,
739 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
740 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
741 self.assertDictEqual(dict(input_dict_3,
742 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
743 'parent-odu-allocation']),
744 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
745 'parent-odu-allocation'])
747 response['interface'][0][
748 'org-openroadm-otn-odu-interfaces:odu'][
749 'parent-odu-allocation']['trib-slots'])
751 def test_38_check_ODU2E_connection_spdrc(self):
# Reads odu-connection 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e' on
# SPDR-SC1; its destination must equal exactly the NETWORK1 ODU2e interface
# and its source exactly the CLIENT1 ODU2e interface.
# NOTE(review): dict(input_dict_1, **actual) lets the response values override
# the expected ones, so that comparison only verifies key presence.
# NOTE(review): the opener and closer of input_dict_1 are not visible in this view.
752 response = test_utils.check_node_attribute_request(
753 'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
754 self.assertEqual(response['status_code'], requests.codes.ok)
757 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
758 'direction': 'bidirectional'
761 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
762 response['odu-connection'][0])
763 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e:service1-10GE'},
764 response['odu-connection'][0]['destination'])
765 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e:service1-10GE'},
766 response['odu-connection'][0]['source'])
768 def test_39_check_otn_topo_links(self):
# Expects 4 links in the otn-topology and, for link ids starting with 'ODU4-',
# pairs available bandwidth with 90000 and used bandwidth with 10000.
# NOTE(review): test_27 matches these links under 'ODTU4-...' ids; if no link
# id starts with 'ODU4-' the branch below never runs and nothing is checked.
# Confirm which prefix the controller emits.
# NOTE(review): the openers of the two bandwidth checks are not visible in this view.
769 response = test_utils.get_ietf_network_request('otn-topology', 'config')
770 self.assertEqual(response['status_code'], requests.codes.ok)
771 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
772 for link in response['network'][0]['ietf-network-topology:link']:
773 linkId = link['link-id']
774 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
775 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
777 link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
779 link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
781 def test_40_check_otn_topo_tp(self):
782 response = test_utils.get_ietf_network_request('otn-topology', 'config')
783 self.assertEqual(response['status_code'], requests.codes.ok)
784 for node in response['network'][0]['node']:
785 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
786 tpList = node['ietf-network-topology:termination-point']
788 if tp['tp-id'] == 'XPDR1-NETWORK1':
789 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
790 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
791 tsPoolList = list(range(1, 9))
793 tsPoolList, xpdrTpPortConAt['ts-pool'])
795 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
797 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_41_delete_10GE_service(self):
    # Send a service-delete RPC for "service1-10GE", expect an OK status,
    # then sleep for self.WAITING seconds.
    delete_payload = self.del_serv_input_data
    delete_payload["service-delete-req-info"]["service-name"] = "service1-10GE"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', delete_payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_42_check_service_list(self):
    # The service list must be readable and hold exactly two services.
    reply = test_utils.get_ordm_serv_list_request()
    self.assertEqual(reply['status_code'], requests.codes.ok)
    remaining_services = reply['service-list']['services']
    self.assertEqual(len(remaining_services), 2)
def test_43_check_no_ODU2e_connection_spdra(self):
    # Read the device data of SPDR-SA1 and verify that it holds no
    # 'odu-connection' entry.
    response = test_utils.check_node_request("SPDR-SA1")
    self.assertEqual(response['status_code'], requests.codes.ok)
    # The key is given as a plain string; indexing a one-element list literal
    # to obtain the same string only obscured what is being looked up.
    self.assertNotIn('odu-connection', response['org-openroadm-device'])
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
    # The network-side ODU2e interface of the 10GE service is looked up on
    # SPDR-SA1 and the request is expected to answer with the conflict status.
    # NOTE(review): the name below follows the '<tp>-ODU2e:service1-10GE' form
    # that this suite asserts for the odu-connection end points of
    # service1-10GE. The previous 'XPDR1-NETWORK1-ODU2e-service1' spelling does
    # not appear to match any interface checked elsewhere in the suite, which
    # would make this test pass regardless of the deletion outcome - confirm
    # against the names generated on SPDR-SA1.
    response = test_utils.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e:service1-10GE')
    self.assertEqual(response['status_code'], requests.codes.conflict)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
    # The client-side ODU2e interface of the 10GE service is looked up on
    # SPDR-SA1 and the request is expected to answer with the conflict status.
    # NOTE(review): the name below follows the '<tp>-ODU2e:service1-10GE' form
    # that this suite asserts for the odu-connection end points of
    # service1-10GE. The previous 'XPDR1-CLIENT1-ODU2e-service1' spelling does
    # not appear to match any interface checked elsewhere in the suite, which
    # would make this test pass regardless of the deletion outcome - confirm
    # against the names generated on SPDR-SA1.
    response = test_utils.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e:service1-10GE')
    self.assertEqual(response['status_code'], requests.codes.conflict)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
    # Looking up the 10G Ethernet client interface on SPDR-SA1 must answer
    # with the conflict status.
    interface_name = 'XPDR1-CLIENT1-ETHERNET10G'
    lookup = test_utils.check_node_attribute_request(
        'SPDR-SA1', 'interface', interface_name)
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
832 def test_47_check_otn_topo_links(self):
833 response = test_utils.get_ietf_network_request('otn-topology', 'config')
834 self.assertEqual(response['status_code'], requests.codes.ok)
835 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
836 for link in response['network'][0]['ietf-network-topology:link']:
837 linkId = link['link-id']
838 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
839 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
841 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
843 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
845 def test_48_check_otn_topo_tp(self):
846 response = test_utils.get_ietf_network_request('otn-topology', 'config')
847 self.assertEqual(response['status_code'], requests.codes.ok)
848 for node in response['network'][0]['node']:
849 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
850 tpList = node['ietf-network-topology:termination-point']
852 if tp['tp-id'] == 'XPDR1-NETWORK1':
853 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
854 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
856 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
    # Send a service-delete RPC for "service1-ODU4", expect an OK status,
    # then sleep for self.WAITING seconds.
    delete_payload = self.del_serv_input_data
    delete_payload["service-delete-req-info"]["service-name"] = "service1-ODU4"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', delete_payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_50_check_service_list(self):
    # The service list must be readable and hold exactly one service.
    reply = test_utils.get_ordm_serv_list_request()
    self.assertEqual(reply['status_code'], requests.codes.ok)
    remaining_services = reply['service-list']['services']
    self.assertEqual(len(remaining_services), 1)
def test_51_check_no_interface_ODU4_spdra(self):
    # Looking up the ODU4 network interface on SPDR-SA1 must answer with
    # the conflict status.
    interface_name = 'XPDR1-NETWORK1-ODU4'
    lookup = test_utils.check_node_attribute_request(
        'SPDR-SA1', 'interface', interface_name)
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_52_check_otn_topo_links(self):
    # Re-run the assertions of test 22 by delegating to that method.
    otu4_links_check = self.test_22_check_otn_topo_otu4_links
    otu4_links_check()
879 def test_53_check_otn_topo_tp(self):
880 response = test_utils.get_ietf_network_request('otn-topology', 'config')
881 self.assertEqual(response['status_code'], requests.codes.ok)
882 for node in response['network'][0]['node']:
883 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
884 tpList = node['ietf-network-topology:termination-point']
886 if tp['tp-id'] == 'XPDR1-NETWORK1':
887 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
888 self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
890 'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
def test_54_delete_OCH_OTU4_service(self):
    # Send a service-delete RPC for "service1-OCH-OTU4", expect an OK status,
    # then sleep for self.WAITING seconds.
    delete_payload = self.del_serv_input_data
    delete_payload["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', delete_payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_55_get_no_service(self):
    # The service-list request is expected to answer with the conflict status.
    listing = test_utils.get_ordm_serv_list_request()
    status = listing['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_56_check_no_interface_OTU4_spdra(self):
    # Looking up the OTU network interface on SPDR-SA1 must answer with
    # the conflict status.
    interface_name = 'XPDR1-NETWORK1-OTU'
    lookup = test_utils.check_node_attribute_request(
        'SPDR-SA1', 'interface', interface_name)
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_57_check_no_interface_OCH_spdra(self):
    # Looking up interface 'XPDR1-NETWORK1-1' on SPDR-SA1 must answer with
    # the conflict status.
    interface_name = 'XPDR1-NETWORK1-1'
    lookup = test_utils.check_node_attribute_request(
        'SPDR-SA1', 'interface', interface_name)
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
    # The otn-topology must be readable and its first network entry must
    # not contain any link list.
    topology = test_utils.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(topology['status_code'], requests.codes.ok)
    otn_network = topology['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_59_check_openroadm_topo_spdra(self):
    # The first termination point of node SPDR-SA1-XPDR1 in the
    # openroadm-topology must be XPDR1-NETWORK1 and its xpdr-network-attributes
    # must not carry a 'wavelength' key.
    node_reply = test_utils.get_ietf_network_node_request(
        'openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
    self.assertEqual(node_reply['status_code'], requests.codes.ok)
    first_tp = node_reply['node']['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR1-NETWORK1', first_tp['tp-id'])
    network_attributes = first_tp['org-openroadm-network-topology:xpdr-network-attributes']
    self.assertNotIn('wavelength', dict.keys(network_attributes))
927 def test_60_check_openroadm_topo_ROADMA_SRG(self):
928 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
929 self.assertEqual(response['status_code'], requests.codes.ok)
930 freq_map = base64.b64decode(
931 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
932 freq_map_array = [int(x) for x in freq_map]
933 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
934 liste_tp = response['node']['ietf-network-topology:termination-point']
936 if ele['tp-id'] == 'SRG1-PP1-TXRX':
937 freq_map = base64.b64decode(
938 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
939 freq_map_array = [int(x) for x in freq_map]
940 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
943 def test_61_check_openroadm_topo_ROADMA_DEG(self):
944 response = test_utils.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
945 self.assertEqual(response['status_code'], requests.codes.ok)
946 freq_map = base64.b64decode(
947 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
948 freq_map_array = [int(x) for x in freq_map]
949 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
950 liste_tp = response['node']['ietf-network-topology:termination-point']
952 if ele['tp-id'] == 'DEG2-CTP-TXRX':
953 freq_map = base64.b64decode(
954 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
955 freq_map_array = [int(x) for x in freq_map]
956 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
957 if ele['tp-id'] == 'DEG2-TTP-TXRX':
958 freq_map = base64.b64decode(
959 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
960 freq_map_array = [int(x) for x in freq_map]
961 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
    # Call init-xpdr-rdm-links for SPDR-SA1 (xpdr 3, network 1) and
    # ROADM-A1 (SRG 1, SRG1-PP2-TXRX); the RPC must answer OK.
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '3',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
    # Call init-rdm-xpdr-links for ROADM-A1 (SRG 1, SRG1-PP2-TXRX) and
    # SPDR-SA1 (xpdr 3, network 1); the RPC must answer OK.
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '3',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
    # Call init-xpdr-rdm-links for SPDR-SC1 (xpdr 3, network 1) and
    # ROADM-C1 (SRG 1, SRG1-PP2-TXRX); the RPC must answer OK.
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '3',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
    # Call init-rdm-xpdr-links for ROADM-C1 (SRG 1, SRG1-PP2-TXRX) and
    # SPDR-SC1 (xpdr 3, network 1); the RPC must answer OK.
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '3',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
def test_66_create_OCH_OTU4_service_2(self):
    # Rewrite the shared service-create request for "service2-OCH-OTU4"
    # (OTU4 infrastructure between XPDR3-NETWORK1 of SPDR-SA1 and SPDR-SC1),
    # send it, expect an OK status and sleep for self.WAITING seconds.
    request = self.cr_serv_input_data
    request["service-name"] = "service2-OCH-OTU4"
    request["connection-type"] = "infrastructure"
    end_devices = (("service-a-end", "SPDR-SA1-XPDR3"),
                   ("service-z-end", "SPDR-SC1-XPDR3"))
    for end_key, device_name in end_devices:
        service_end = request[end_key]
        service_end["service-rate"] = "100"
        service_end["service-format"] = "OTU"
        # Both directions of an end point use the same device and port.
        for direction in ("tx-direction", "rx-direction"):
            port = service_end[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-NETWORK1"
        service_end["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', request)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
1016 def test_67_get_OCH_OTU4_service2(self):
1017 response = test_utils.get_ordm_serv_list_attr_request(
1018 "services", "service2-OCH-OTU4")
1019 self.assertEqual(response['status_code'], requests.codes.ok)
1021 response['services'][0]['administrative-state'], 'inService')
1023 response['services'][0]['service-name'], 'service2-OCH-OTU4')
1025 response['services'][0]['connection-type'], 'infrastructure')
1027 response['services'][0]['lifecycle-state'], 'planned')
def test_68_create_ODU4_service_2(self):
    # Rewrite the shared service-create request for "service2-ODU4"
    # (ODU4 between XPDR3-NETWORK1 of SPDR-SA1 and SPDR-SC1), send it,
    # expect an OK status and sleep for self.WAITING seconds.
    request = self.cr_serv_input_data
    request["service-name"] = "service2-ODU4"
    end_devices = (("service-a-end", "SPDR-SA1-XPDR3"),
                   ("service-z-end", "SPDR-SC1-XPDR3"))
    for end_key, device_name in end_devices:
        service_end = request[end_key]
        service_end["service-format"] = "ODU"
        # Both directions of an end point use the same device and port.
        for direction in ("tx-direction", "rx-direction"):
            port = service_end[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-NETWORK1"
        service_end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
        # The request dict is shared between tests, so the OTU rate may
        # already be absent (subset run, re-run); pop() with a default removes
        # it when present instead of failing with KeyError before the RPC.
        service_end.pop("otu-service-rate", None)
    response = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', request)
    self.assertEqual(response['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
1052 def test_69_get_ODU4_service2(self):
1053 response = test_utils.get_ordm_serv_list_attr_request(
1054 "services", "service2-ODU4")
1055 self.assertEqual(response['status_code'], requests.codes.ok)
1057 response['services'][0]['administrative-state'], 'inService')
1059 response['services'][0]['service-name'], 'service2-ODU4')
1061 response['services'][0]['connection-type'], 'infrastructure')
1063 response['services'][0]['lifecycle-state'], 'planned')
def test_70_create_1GE_service(self):
    # Rewrite the shared service-create request for "service1-1GE"
    # (1G Ethernet service between XPDR3-CLIENT1 of SPDR-SA1 and SPDR-SC1),
    # send it, expect an OK status and sleep for self.WAITING seconds.
    request = self.cr_serv_input_data
    request["service-name"] = "service1-1GE"
    request["connection-type"] = "service"
    end_devices = (("service-a-end", "SPDR-SA1-XPDR3"),
                   ("service-z-end", "SPDR-SC1-XPDR3"))
    for end_key, device_name in end_devices:
        service_end = request[end_key]
        service_end["service-rate"] = "1"
        service_end["service-format"] = "Ethernet"
        # The request dict is shared between tests, so the ODU rate may
        # already be absent (subset run, re-run); pop() with a default removes
        # it when present instead of failing with KeyError before the RPC.
        service_end.pop("odu-service-rate", None)
        # Both directions of an end point use the same device and port.
        for direction in ("tx-direction", "rx-direction"):
            port = service_end[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-CLIENT1"
    response = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', request)
    self.assertEqual(response['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
1089 def test_71_get_1GE_service1(self):
1090 response = test_utils.get_ordm_serv_list_attr_request(
1091 "services", "service1-1GE")
1092 self.assertEqual(response['status_code'], requests.codes.ok)
1094 response['services'][0]['administrative-state'], 'inService')
1096 response['services'][0]['service-name'], 'service1-1GE')
1098 response['services'][0]['connection-type'], 'service')
1100 response['services'][0]['lifecycle-state'], 'planned')
1102 def test_72_check_interface_1GE_CLIENT_spdra(self):
1103 response = test_utils.check_node_attribute_request(
1104 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1105 self.assertEqual(response['status_code'], requests.codes.ok)
1106 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1107 'administrative-state': 'inService',
1108 'supporting-circuit-pack-name': 'CP1-SFP4',
1109 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1110 'supporting-port': 'CP1-SFP4-P1'
1112 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1113 response['interface'][0])
1116 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1118 def test_73_check_interface_ODU0_CLIENT_spdra(self):
1119 response = test_utils.check_node_attribute_request(
1120 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0:service1-1GE')
1121 self.assertEqual(response['status_code'], requests.codes.ok)
1122 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0:service1-1GE',
1123 'administrative-state': 'inService',
1124 'supporting-circuit-pack-name': 'CP3-SFP1',
1125 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1126 'type': 'org-openroadm-interfaces:otnOdu',
1127 'supporting-port': 'CP3-SFP1-P1'}
1129 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1130 'rate': 'org-openroadm-otn-common-types:ODU0',
1131 'monitoring-mode': 'terminated'}
1132 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1133 response['interface'][0])
1134 self.assertDictEqual(dict(input_dict_2,
1135 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1136 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1137 self.assertDictEqual(
1138 {'payload-type': '07', 'exp-payload-type': '07'},
1139 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1141 def test_74_check_interface_ODU0_NETWORK_spdra(self):
1142 response = test_utils.check_node_attribute_request(
1143 'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0:service1-1GE')
1144 self.assertEqual(response['status_code'], requests.codes.ok)
1145 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0:service1-1GE',
1146 'administrative-state': 'inService',
1147 'supporting-circuit-pack-name': 'CP3-CFP0',
1148 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1149 'type': 'org-openroadm-interfaces:otnOdu',
1150 'supporting-port': 'CP3-CFP0-P1'}
1152 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1153 'rate': 'org-openroadm-otn-common-types:ODU0',
1154 'monitoring-mode': 'monitored'}
1155 input_dict_3 = {'trib-port-number': 1}
1156 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1157 response['interface'][0])
1158 self.assertDictEqual(dict(input_dict_2,
1159 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1160 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1161 self.assertDictEqual(dict(input_dict_3,
1162 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1163 'parent-odu-allocation']),
1164 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1165 self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1168 def test_75_check_ODU0_connection_spdra(self):
1169 response = test_utils.check_node_attribute_request(
1170 'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1171 self.assertEqual(response['status_code'], requests.codes.ok)
1174 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1175 'direction': 'bidirectional'
1177 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1178 response['odu-connection'][0])
1179 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0:service1-1GE'},
1180 response['odu-connection'][0]['destination'])
1181 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0:service1-1GE'},
1182 response['odu-connection'][0]['source'])
1184 def test_76_check_interface_1GE_CLIENT_spdrc(self):
1185 response = test_utils.check_node_attribute_request(
1186 'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1187 self.assertEqual(response['status_code'], requests.codes.ok)
1188 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1189 'administrative-state': 'inService',
1190 'supporting-circuit-pack-name': 'CP3-SFP1',
1191 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1192 'supporting-port': 'CP3-SFP1-P1'
1194 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1195 response['interface'][0])
1198 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1200 def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1201 response = test_utils.check_node_attribute_request(
1202 'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0:service1-1GE')
1203 self.assertEqual(response['status_code'], requests.codes.ok)
1204 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0:service1-1GE',
1205 'administrative-state': 'inService',
1206 'supporting-circuit-pack-name': 'CP3-SFP1',
1207 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1208 'type': 'org-openroadm-interfaces:otnOdu',
1209 'supporting-port': 'CP3-SFP1-P1'}
1211 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1212 'rate': 'org-openroadm-otn-common-types:ODU0',
1213 'monitoring-mode': 'terminated'}
1214 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1215 response['interface'][0])
1216 self.assertDictEqual(dict(input_dict_2,
1217 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1218 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1219 self.assertDictEqual(
1220 {'payload-type': '07', 'exp-payload-type': '07'},
1221 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1223 def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1224 response = test_utils.check_node_attribute_request(
1225 'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0:service1-1GE')
1226 self.assertEqual(response['status_code'], requests.codes.ok)
1227 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0:service1-1GE',
1228 'administrative-state': 'inService',
1229 'supporting-circuit-pack-name': 'CP3-CFP0',
1230 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1231 'type': 'org-openroadm-interfaces:otnOdu',
1232 'supporting-port': 'CP3-CFP0-P1'}
1234 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1235 'rate': 'org-openroadm-otn-common-types:ODU0',
1236 'monitoring-mode': 'monitored'}
1237 input_dict_3 = {'trib-port-number': 1}
1238 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1239 response['interface'][0])
1240 self.assertDictEqual(dict(input_dict_2,
1241 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1242 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1243 self.assertDictEqual(dict(input_dict_3,
1244 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1245 'parent-odu-allocation']),
1246 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1247 'parent-odu-allocation'])
1249 response['interface'][0][
1250 'org-openroadm-otn-odu-interfaces:odu'][
1251 'parent-odu-allocation']['trib-slots'])
1253 def test_79_check_ODU0_connection_spdrc(self):
1254 response = test_utils.check_node_attribute_request(
1255 'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1256 self.assertEqual(response['status_code'], requests.codes.ok)
1259 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1260 'direction': 'bidirectional'
1262 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1263 response['odu-connection'][0])
1264 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0:service1-1GE'},
1265 response['odu-connection'][0]['destination'])
1266 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0:service1-1GE'},
1267 response['odu-connection'][0]['source'])
1269 def test_80_check_otn_topo_links(self):
1270 response = test_utils.get_ietf_network_request('otn-topology', 'config')
1271 self.assertEqual(response['status_code'], requests.codes.ok)
1272 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1273 for link in response['network'][0]['ietf-network-topology:link']:
1274 linkId = link['link-id']
1275 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1276 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1278 link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1280 link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1282 def test_81_check_otn_topo_tp(self):
1283 response = test_utils.get_ietf_network_request('otn-topology', 'config')
1284 self.assertEqual(response['status_code'], requests.codes.ok)
1285 for node in response['network'][0]['node']:
1286 if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1287 tpList = node['ietf-network-topology:termination-point']
1289 if tp['tp-id'] == 'XPDR3-NETWORK1':
1290 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1291 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1292 tsPoolList = list(range(1, 2))
1294 tsPoolList, xpdrTpPortConAt['ts-pool'])
1296 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1298 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_82_delete_1GE_service(self):
    # Send a service-delete RPC for "service1-1GE", expect an OK status,
    # then sleep for self.WAITING seconds.
    delete_payload = self.del_serv_input_data
    delete_payload["service-delete-req-info"]["service-name"] = "service1-1GE"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', delete_payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_83_check_service_list(self):
    # The service list must be readable and hold exactly two services.
    reply = test_utils.get_ordm_serv_list_request()
    self.assertEqual(reply['status_code'], requests.codes.ok)
    remaining_services = reply['service-list']['services']
    self.assertEqual(len(remaining_services), 2)
def test_84_check_no_ODU0_connection_spdra(self):
    # Read the device data of SPDR-SA1 and verify that it holds no
    # 'odu-connection' entry.
    response = test_utils.check_node_request("SPDR-SA1")
    self.assertEqual(response['status_code'], requests.codes.ok)
    # The key is given as a plain string; indexing a one-element list literal
    # to obtain the same string only obscured what is being looked up.
    self.assertNotIn('odu-connection', response['org-openroadm-device'])
def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
    # Looking up the network-side ODU0 interface of service1-1GE on SPDR-SA1
    # must answer with the conflict status.
    interface_name = 'XPDR3-NETWORK1-ODU0:service1-1GE'
    lookup = test_utils.check_node_attribute_request(
        'SPDR-SA1', 'interface', interface_name)
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
    # Looking up the client-side ODU0 interface of service1-1GE on SPDR-SA1
    # must answer with the conflict status.
    interface_name = 'XPDR3-CLIENT1-ODU0:service1-1GE'
    lookup = test_utils.check_node_attribute_request(
        'SPDR-SA1', 'interface', interface_name)
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
def test_87_check_no_interface_10GE_CLIENT_spdra(self):
    # Looking up the Ethernet client interface on SPDR-SA1 must answer with
    # the conflict status. Despite "10GE" in the method name, the interface
    # queried here is the 1G one (XPDR3-CLIENT1-ETHERNET1G).
    interface_name = 'XPDR3-CLIENT1-ETHERNET1G'
    lookup = test_utils.check_node_attribute_request(
        'SPDR-SA1', 'interface', interface_name)
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
1333 def test_88_check_otn_topo_links(self):
1334 response = test_utils.get_ietf_network_request('otn-topology', 'config')
1335 self.assertEqual(response['status_code'], requests.codes.ok)
1336 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1337 for link in response['network'][0]['ietf-network-topology:link']:
1338 linkId = link['link-id']
1339 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1340 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1342 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1344 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1346 def test_89_check_otn_topo_tp(self):
1347 response = test_utils.get_ietf_network_request('otn-topology', 'config')
1348 self.assertEqual(response['status_code'], requests.codes.ok)
1349 for node in response['network'][0]['node']:
1350 if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1351 tpList = node['ietf-network-topology:termination-point']
1353 if tp['tp-id'] == 'XPDR3-NETWORK1':
1354 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1355 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1357 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_90_delete_ODU4_service(self):
    # Send a service-delete RPC for "service2-ODU4", expect an OK status,
    # then sleep for self.WAITING seconds.
    delete_payload = self.del_serv_input_data
    delete_payload["service-delete-req-info"]["service-name"] = "service2-ODU4"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', delete_payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_91_delete_OCH_OTU4_service(self):
    # Send a service-delete RPC for "service2-OCH-OTU4", expect an OK status,
    # then sleep for self.WAITING seconds.
    delete_payload = self.del_serv_input_data
    delete_payload["service-delete-req-info"]["service-name"] = "service2-OCH-OTU4"
    rpc_reply = test_utils.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', delete_payload)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
1375 def test_92_disconnect_xponders_from_roadm(self):
1376 response = test_utils.get_ietf_network_request('openroadm-topology', 'config')
1377 self.assertEqual(response['status_code'], requests.codes.ok)
1378 links = response['network'][0]['ietf-network-topology:link']
1380 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1381 response = test_utils.del_ietf_network_link_request(
1382 'openroadm-topology', link['link-id'], 'config')
1383 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_93_check_openroadm_topology(self):
    # The openroadm-topology must be readable and its first network entry
    # must list exactly 18 links.
    topology = test_utils.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(topology['status_code'], requests.codes.ok)
    link_count = len(topology['network'][0]['ietf-network-topology:link'])
    self.assertEqual(18, link_count, 'Topology should contain 18 links')
def test_94_disconnect_spdrA(self):
    # Unmount SPDR-SA1; either "ok" or "no content" is an accepted answer.
    unmount_reply = test_utils.unmount_device("SPDR-SA1")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
def test_95_disconnect_spdrC(self):
    # Unmount SPDR-SC1; either "ok" or "no content" is an accepted answer.
    unmount_reply = test_utils.unmount_device("SPDR-SC1")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
def test_96_disconnect_roadmA(self):
    # Unmount ROADM-A1; either "ok" or "no content" is an accepted answer.
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
def test_97_disconnect_roadmC(self):
    # Unmount ROADM-C1; either "ok" or "no content" is an accepted answer.
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    self.assertIn(unmount_reply.status_code, accepted_codes)
# Script entry point: when the file is executed directly, run the tests of
# this module through unittest with verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)