3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
26 import test_utils_rfc8040 # nopep8
29 class TransportPCEtesting(unittest.TestCase):
32 WAITING = 20 # nominal value is 300
33 NODE_VERSION = '2.2.1'
35 cr_serv_input_data = {
36 "sdnc-request-header": {
37 "request-id": "request-1",
38 "rpc-action": "service-create",
39 "request-system-id": "appname"
41 "service-name": "service1-OCH-OTU4",
42 "common-id": "commonId",
43 "connection-type": "infrastructure",
45 "service-rate": "100",
46 "node-id": "SPDR-SA1",
47 "service-format": "OTU",
48 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
52 "port-device-name": "SPDR-SA1-XPDR1",
54 "port-name": "XPDR1-NETWORK1",
55 "port-rack": "000000.00",
56 "port-shelf": "Chassis#1"
59 "lgx-device-name": "Some lgx-device-name",
60 "lgx-port-name": "Some lgx-port-name",
61 "lgx-port-rack": "000000.00",
62 "lgx-port-shelf": "00"
68 "port-device-name": "SPDR-SA1-XPDR1",
70 "port-name": "XPDR1-NETWORK1",
71 "port-rack": "000000.00",
72 "port-shelf": "Chassis#1"
75 "lgx-device-name": "Some lgx-device-name",
76 "lgx-port-name": "Some lgx-port-name",
77 "lgx-port-rack": "000000.00",
78 "lgx-port-shelf": "00"
85 "service-rate": "100",
86 "node-id": "SPDR-SC1",
87 "service-format": "OTU",
88 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
92 "port-device-name": "SPDR-SC1-XPDR1",
94 "port-name": "XPDR1-NETWORK1",
95 "port-rack": "000000.00",
96 "port-shelf": "Chassis#1"
99 "lgx-device-name": "Some lgx-device-name",
100 "lgx-port-name": "Some lgx-port-name",
101 "lgx-port-rack": "000000.00",
102 "lgx-port-shelf": "00"
108 "port-device-name": "SPDR-SC1-XPDR1",
109 "port-type": "fixed",
110 "port-name": "XPDR1-NETWORK1",
111 "port-rack": "000000.00",
112 "port-shelf": "Chassis#1"
115 "lgx-device-name": "Some lgx-device-name",
116 "lgx-port-name": "Some lgx-port-name",
117 "lgx-port-rack": "000000.00",
118 "lgx-port-shelf": "00"
124 "due-date": "2018-06-15T00:00:01Z",
125 "operator-contact": "pw1234"
128 del_serv_input_data = {
129 "sdnc-request-header": {
130 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
131 "rpc-action": "service-delete",
132 "request-system-id": "appname",
133 "notification-url": "http://localhost:8585/NotificationServer/notify"},
134 "service-delete-req-info": {
135 "service-name": "TBD",
136 "tail-retention": "no"}
141 cls.processes = test_utils_rfc8040.start_tpce()
142 cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
143 ('roadma', cls.NODE_VERSION),
144 ('roadmc', cls.NODE_VERSION),
145 ('spdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Shut down every process recorded in cls.processes, one by one,
    # then report completion on stdout.
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils_rfc8040.shutdown_process(running)
    print("all processes killed")
def test_01_connect_spdrA(self):
    # Call mount_device for node "SPDR-SA1" with the ('spdra', NODE_VERSION)
    # tuple; the reply status must be 201 (created).
    sim_ref = ('spdra', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("SPDR-SA1", sim_ref)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_02_connect_spdrC(self):
    # Call mount_device for node "SPDR-SC1" with the ('spdrc', NODE_VERSION)
    # tuple; the reply status must be 201 (created).
    sim_ref = ('spdrc', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("SPDR-SC1", sim_ref)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    # Call mount_device for node "ROADM-A1" with the ('roadma', NODE_VERSION)
    # tuple; the reply status must be 201 (created).
    sim_ref = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("ROADM-A1", sim_ref)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    # Call mount_device for node "ROADM-C1" with the ('roadmc', NODE_VERSION)
    # tuple; the reply status must be 201 (created).
    sim_ref = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("ROADM-C1", sim_ref)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
    # Send the transportpce-networkutils 'init-xpdr-rdm-links' RPC for
    # SPDR-SA1 (xpdr 1, network 1) and ROADM-A1 (SRG 1, SRG1-PP1-TXRX);
    # the reply must carry status 200.
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils',
        'init-xpdr-rdm-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
    # Send the transportpce-networkutils 'init-rdm-xpdr-links' RPC for
    # ROADM-A1 (SRG 1, SRG1-PP1-TXRX) and SPDR-SA1 (xpdr 1, network 1);
    # the reply must carry status 200.
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils',
        'init-rdm-xpdr-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
    # Send the transportpce-networkutils 'init-xpdr-rdm-links' RPC for
    # SPDR-SC1 (xpdr 1, network 1) and ROADM-C1 (SRG 1, SRG1-PP1-TXRX);
    # the reply must carry status 200.
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils',
        'init-xpdr-rdm-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
    # Send the transportpce-networkutils 'init-rdm-xpdr-links' RPC for
    # ROADM-C1 (SRG 1, SRG1-PP1-TXRX) and SPDR-SC1 (xpdr 1, network 1);
    # the reply must carry status 200.
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils',
        'init-rdm-xpdr-links',
        {'links-input': links_input})
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
205 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
206 # Config ROADMA-ROADMC oms-attributes
208 "auto-spanloss": "true",
209 "spanloss-base": 11.4,
210 "spanloss-current": 12,
211 "engineered-spanloss": 12.2,
212 "link-concatenation": [{
215 "SRLG-length": 100000,
217 response = test_utils_rfc8040.add_oms_attr_request(
218 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
219 self.assertEqual(response.status_code, requests.codes.created)
221 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
222 # Config ROADMC-ROADMA oms-attributes
224 "auto-spanloss": "true",
225 "spanloss-base": 11.4,
226 "spanloss-current": 12,
227 "engineered-spanloss": 12.2,
228 "link-concatenation": [{
231 "SRLG-length": 100000,
233 response = test_utils_rfc8040.add_oms_attr_request(
234 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
235 self.assertEqual(response.status_code, requests.codes.created)
237 # test service-create for OCH-OTU4 service from spdr to spdr
def test_11_check_otn_topology(self):
    # Read the 'otn-topology' network ('config'): the request must succeed,
    # the first network entry must list 6 nodes and must not yet contain
    # an 'ietf-network-topology:link' key.
    topo_reply = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(topo_reply['status_code'], requests.codes.ok)
    otn_network = topo_reply['network'][0]
    node_count = len(otn_network['node'])
    self.assertEqual(node_count, 6)
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_12_create_OCH_OTU4_service(self):
    # Submit the class-level service-create payload as is, expect status 200,
    # then pause for WAITING seconds before the next test runs.
    request_body = self.cr_serv_input_data
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', request_body)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
251 def test_13_get_OCH_OTU4_service1(self):
252 response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
253 "services", "service1-OCH-OTU4")
254 self.assertEqual(response['status_code'], requests.codes.ok)
256 response['services'][0]['administrative-state'], 'inService')
258 response['services'][0]['service-name'], 'service1-OCH-OTU4')
260 response['services'][0]['connection-type'], 'infrastructure')
262 response['services'][0]['lifecycle-state'], 'planned')
264 # Check correct configuration of devices
265 def test_14_check_interface_och_spdra(self):
266 response = test_utils_rfc8040.check_node_attribute_request(
267 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
268 self.assertEqual(response['status_code'], requests.codes.ok)
269 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
270 'administrative-state': 'inService',
271 'supporting-circuit-pack-name': 'CP1-CFP0',
272 'type': 'org-openroadm-interfaces:opticalChannel',
273 'supporting-port': 'CP1-CFP0-P1'
274 }, **response['interface'][0]),
275 response['interface'][0])
276 self.assertEqual('org-openroadm-common-types:R100G',
277 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
278 self.assertEqual('dp-qpsk',
279 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
280 self.assertEqual(196.1,
281 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
284 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
286 def test_15_check_interface_OTU4_spdra(self):
287 response = test_utils_rfc8040.check_node_attribute_request(
288 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
289 self.assertEqual(response['status_code'], requests.codes.ok)
290 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
291 'administrative-state': 'inService',
292 'supporting-circuit-pack-name': 'CP1-CFP0',
293 'supporting-interface': 'XPDR1-NETWORK1-761:768',
294 'type': 'org-openroadm-interfaces:otnOtu',
295 'supporting-port': 'CP1-CFP0-P1'
297 input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
298 'expected-dapi': 'H/OelLynehI=',
299 'tx-dapi': 'AMf1n5hK6Xkk',
300 'expected-sapi': 'AMf1n5hK6Xkk',
301 'rate': 'org-openroadm-otn-common-types:OTU4',
304 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
305 response['interface'][0])
306 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
307 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
309 def test_16_check_interface_och_spdrc(self):
310 response = test_utils_rfc8040.check_node_attribute_request(
311 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
312 self.assertEqual(response['status_code'], requests.codes.ok)
313 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
314 'administrative-state': 'inService',
315 'supporting-circuit-pack-name': 'CP1-CFP0',
316 'type': 'org-openroadm-interfaces:opticalChannel',
317 'supporting-port': 'CP1-CFP0-P1'
318 }, **response['interface'][0]),
319 response['interface'][0])
320 self.assertEqual('org-openroadm-common-types:R100G',
321 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
322 self.assertEqual('dp-qpsk',
323 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
324 self.assertEqual(196.1,
325 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
328 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
330 def test_17_check_interface_OTU4_spdrc(self):
331 response = test_utils_rfc8040.check_node_attribute_request(
332 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
333 self.assertEqual(response['status_code'], requests.codes.ok)
334 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
335 'administrative-state': 'inService',
336 'supporting-circuit-pack-name': 'CP1-CFP0',
337 'supporting-interface': 'XPDR1-NETWORK1-761:768',
338 'type': 'org-openroadm-interfaces:otnOtu',
339 'supporting-port': 'CP1-CFP0-P1'
341 input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
342 'expected-sapi': 'H/OelLynehI=',
343 'tx-sapi': 'AMf1n5hK6Xkk',
344 'expected-dapi': 'AMf1n5hK6Xkk',
345 'rate': 'org-openroadm-otn-common-types:OTU4',
348 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
349 response['interface'][0])
350 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
351 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_18_check_no_interface_ODU4_spdra(self):
    # Query interface 'XPDR1-NETWORK1-ODU4' on SPDR-SA1; at this point the
    # request is expected to answer with the 'conflict' status code.
    node_reply = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
    returned_status = node_reply['status_code']
    self.assertEqual(returned_status, requests.codes.conflict)
358 def test_19_check_openroadm_topo_spdra(self):
359 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
360 self.assertEqual(response['status_code'], requests.codes.ok)
361 ele = response['node']['ietf-network-topology:termination-point'][0]
362 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
365 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
368 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
370 def test_20_check_openroadm_topo_ROADMA_SRG(self):
371 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
372 self.assertEqual(response['status_code'], requests.codes.ok)
373 freq_map = base64.b64decode(
374 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
375 freq_map_array = [int(x) for x in freq_map]
376 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
377 liste_tp = response['node']['ietf-network-topology:termination-point']
379 if ele['tp-id'] == 'SRG1-PP1-TXRX':
380 freq_map = base64.b64decode(
381 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
382 freq_map_array = [int(x) for x in freq_map]
383 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
384 if ele['tp-id'] == 'SRG1-PP2-TXRX':
385 self.assertNotIn('avail-freq-maps', dict.keys(ele))
387 def test_21_check_openroadm_topo_ROADMA_DEG(self):
388 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
389 self.assertEqual(response['status_code'], requests.codes.ok)
390 freq_map = base64.b64decode(
391 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
392 freq_map_array = [int(x) for x in freq_map]
393 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
394 liste_tp = response['node']['ietf-network-topology:termination-point']
396 if ele['tp-id'] == 'DEG2-CTP-TXRX':
397 freq_map = base64.b64decode(
398 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
399 freq_map_array = [int(x) for x in freq_map]
400 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
401 if ele['tp-id'] == 'DEG2-TTP-TXRX':
402 freq_map = base64.b64decode(
403 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
404 freq_map_array = [int(x) for x in freq_map]
405 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
407 def test_22_check_otn_topo_otu4_links(self):
408 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
409 self.assertEqual(response['status_code'], requests.codes.ok)
410 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
411 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
412 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
413 for link in response['network'][0]['ietf-network-topology:link']:
414 self.assertIn(link['link-id'], listLinkId)
416 link['transportpce-topology:otn-link-type'], 'OTU4')
418 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
420 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
422 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
424 link['org-openroadm-common-network:opposite-link'], listLinkId)
426 # test service-create for ODU4 service from spdr to spdr
def test_23_create_ODU4_service(self):
    # Rework the shared class-level service-create payload in place for an
    # ODU4 service: rename it, then on each end switch the format to ODU and
    # swap the otu-service-rate entry for an odu-service-rate one.
    request_body = self.cr_serv_input_data
    request_body["service-name"] = "service1-ODU4"
    for end_key in ("service-a-end", "service-z-end"):
        service_end = request_body[end_key]
        service_end["service-format"] = "ODU"
        del service_end["otu-service-rate"]
        service_end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"

    # Submit the request, expect status 200, then wait WAITING seconds.
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', request_body)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
442 def test_24_get_ODU4_service1(self):
443 response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
444 "services", "service1-ODU4")
445 self.assertEqual(response['status_code'], requests.codes.ok)
447 response['services'][0]['administrative-state'], 'inService')
449 response['services'][0]['service-name'], 'service1-ODU4')
451 response['services'][0]['connection-type'], 'infrastructure')
453 response['services'][0]['lifecycle-state'], 'planned')
455 def test_25_check_interface_ODU4_spdra(self):
456 response = test_utils_rfc8040.check_node_attribute_request(
457 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
458 self.assertEqual(response['status_code'], requests.codes.ok)
459 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
460 'administrative-state': 'inService',
461 'supporting-circuit-pack-name': 'CP1-CFP0',
462 'supporting-interface': 'XPDR1-NETWORK1-OTU',
463 'type': 'org-openroadm-interfaces:otnOdu',
464 'supporting-port': 'CP1-CFP0-P1',
466 'description': 'TBD'}
467 # SAPI/DAPI are added in the Otu4 renderer
468 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
469 'rate': 'org-openroadm-otn-common-types:ODU4',
470 'monitoring-mode': 'terminated',
471 'expected-dapi': 'H/OelLynehI=',
472 'expected-sapi': 'AMf1n5hK6Xkk',
473 'tx-dapi': 'AMf1n5hK6Xkk',
474 'tx-sapi': 'H/OelLynehI='}
476 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
477 response['interface'][0])
478 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
480 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
482 self.assertDictEqual(
483 {'payload-type': '21', 'exp-payload-type': '21'},
484 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
486 def test_26_check_interface_ODU4_spdrc(self):
487 response = test_utils_rfc8040.check_node_attribute_request(
488 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
489 self.assertEqual(response['status_code'], requests.codes.ok)
490 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
491 'administrative-state': 'inService',
492 'supporting-circuit-pack-name': 'CP1-CFP0',
493 'supporting-interface': 'XPDR1-NETWORK1-OTU',
494 'type': 'org-openroadm-interfaces:otnOdu',
495 'supporting-port': 'CP1-CFP0-P1',
497 'description': 'TBD'}
498 # SAPI/DAPI are added in the Otu4 renderer
499 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
500 'rate': 'org-openroadm-otn-common-types:ODU4',
501 'monitoring-mode': 'terminated',
502 'tx-sapi': 'AMf1n5hK6Xkk',
503 'tx-dapi': 'H/OelLynehI=',
504 'expected-sapi': 'H/OelLynehI=',
505 'expected-dapi': 'AMf1n5hK6Xkk'
507 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
508 response['interface'][0])
509 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
511 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
513 self.assertDictEqual(
514 {'payload-type': '21', 'exp-payload-type': '21'},
515 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
517 def test_27_check_otn_topo_links(self):
518 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
519 self.assertEqual(response['status_code'], requests.codes.ok)
520 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
521 for link in response['network'][0]['ietf-network-topology:link']:
522 linkId = link['link-id']
523 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
524 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
526 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
528 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
529 elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
530 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
532 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
534 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
536 link['transportpce-topology:otn-link-type'], 'ODTU4')
538 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
539 self.assertIn(link['org-openroadm-common-network:opposite-link'],
540 ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
541 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
543 self.fail("this link should not exist")
545 def test_28_check_otn_topo_tp(self):
546 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
547 self.assertEqual(response['status_code'], requests.codes.ok)
548 for node in response['network'][0]['node']:
549 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
550 tpList = node['ietf-network-topology:termination-point']
552 if tp['tp-id'] == 'XPDR1-NETWORK1':
553 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
554 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
556 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
557 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
558 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
560 # test service-create for 10GE service from spdr to spdr
def test_29_create_10GE_service(self):
    # Rework the shared class-level service-create payload in place for a
    # 10GE service: new name and connection type, then on each end set the
    # rate/format, drop odu-service-rate, add the Ethernet encoding and point
    # both directions at the XPDR1-CLIENT1 port.
    request_body = self.cr_serv_input_data
    request_body["service-name"] = "service1-10GE"
    request_body["connection-type"] = "service"
    for end_key in ("service-a-end", "service-z-end"):
        service_end = request_body[end_key]
        service_end["service-rate"] = "10"
        service_end["service-format"] = "Ethernet"
        del service_end["odu-service-rate"]
        service_end["ethernet-encoding"] = "10GBASE-R"
        for direction_key in ("tx-direction", "rx-direction"):
            service_end[direction_key][0]["port"]["port-name"] = "XPDR1-CLIENT1"
    # Submit the request, expect status 200, then wait WAITING seconds.
    rpc_reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', request_body)
    self.assertEqual(rpc_reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
582 def test_30_get_10GE_service1(self):
583 response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
584 "services", "service1-10GE")
585 self.assertEqual(response['status_code'], requests.codes.ok)
587 response['services'][0]['administrative-state'], 'inService')
589 response['services'][0]['service-name'], 'service1-10GE')
591 response['services'][0]['connection-type'], 'service')
593 response['services'][0]['lifecycle-state'], 'planned')
595 def test_31_check_interface_10GE_CLIENT_spdra(self):
596 response = test_utils_rfc8040.check_node_attribute_request(
597 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
598 self.assertEqual(response['status_code'], requests.codes.ok)
599 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
600 'administrative-state': 'inService',
601 'supporting-circuit-pack-name': 'CP1-SFP4',
602 'type': 'org-openroadm-interfaces:ethernetCsmacd',
603 'supporting-port': 'CP1-SFP4-P1'
605 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
606 response['interface'][0])
609 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
611 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
612 response = test_utils_rfc8040.check_node_attribute_request(
613 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
614 self.assertEqual(response['status_code'], requests.codes.ok)
615 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
616 'administrative-state': 'inService',
617 'supporting-circuit-pack-name': 'CP1-SFP4',
618 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
619 'type': 'org-openroadm-interfaces:otnOdu',
620 'supporting-port': 'CP1-SFP4-P1'}
622 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
623 'rate': 'org-openroadm-otn-common-types:ODU2e',
624 'monitoring-mode': 'terminated'}
626 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
627 response['interface'][0])
628 self.assertDictEqual(dict(input_dict_2,
629 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
630 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
631 self.assertDictEqual(
632 {'payload-type': '03', 'exp-payload-type': '03'},
633 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
635 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
636 response = test_utils_rfc8040.check_node_attribute_request(
637 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
638 self.assertEqual(response['status_code'], requests.codes.ok)
639 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
640 'administrative-state': 'inService',
641 'supporting-circuit-pack-name': 'CP1-CFP0',
642 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
643 'type': 'org-openroadm-interfaces:otnOdu',
644 'supporting-port': 'CP1-CFP0-P1'}
646 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
647 'rate': 'org-openroadm-otn-common-types:ODU2e',
648 'monitoring-mode': 'monitored'}
649 input_dict_3 = {'trib-port-number': 1}
651 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
652 response['interface'][0])
653 self.assertDictEqual(dict(input_dict_2,
654 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
655 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
656 self.assertDictEqual(dict(input_dict_3,
657 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
658 'parent-odu-allocation']),
659 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
660 self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
663 def test_34_check_ODU2E_connection_spdra(self):
664 response = test_utils_rfc8040.check_node_attribute_request(
665 'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
666 self.assertEqual(response['status_code'], requests.codes.ok)
669 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
670 'direction': 'bidirectional'
673 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
674 response['odu-connection'][0])
675 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
676 response['odu-connection'][0]['destination'])
677 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
678 response['odu-connection'][0]['source'])
680 def test_35_check_interface_10GE_CLIENT_spdrc(self):
681 response = test_utils_rfc8040.check_node_attribute_request(
682 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
683 self.assertEqual(response['status_code'], requests.codes.ok)
684 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
685 'administrative-state': 'inService',
686 'supporting-circuit-pack-name': 'CP1-SFP4',
687 'type': 'org-openroadm-interfaces:ethernetCsmacd',
688 'supporting-port': 'CP1-SFP4-P1'
690 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
691 response['interface'][0])
694 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
696 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
697 response = test_utils_rfc8040.check_node_attribute_request(
698 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
699 self.assertEqual(response['status_code'], requests.codes.ok)
700 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
701 'administrative-state': 'inService',
702 'supporting-circuit-pack-name': 'CP1-SFP4',
703 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
704 'type': 'org-openroadm-interfaces:otnOdu',
705 'supporting-port': 'CP1-SFP4-P1'}
707 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
708 'rate': 'org-openroadm-otn-common-types:ODU2e',
709 'monitoring-mode': 'terminated'}
711 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
712 response['interface'][0])
713 self.assertDictEqual(dict(input_dict_2,
714 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
715 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
716 self.assertDictEqual(
717 {'payload-type': '03', 'exp-payload-type': '03'},
718 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
720 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
721 response = test_utils_rfc8040.check_node_attribute_request(
722 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
723 self.assertEqual(response['status_code'], requests.codes.ok)
724 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
725 'administrative-state': 'inService',
726 'supporting-circuit-pack-name': 'CP1-CFP0',
727 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
728 'type': 'org-openroadm-interfaces:otnOdu',
729 'supporting-port': 'CP1-CFP0-P1'}
731 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
732 'rate': 'org-openroadm-otn-common-types:ODU2e',
733 'monitoring-mode': 'monitored'}
735 input_dict_3 = {'trib-port-number': 1}
737 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
738 response['interface'][0])
739 self.assertDictEqual(dict(input_dict_2,
740 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
741 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
742 self.assertDictEqual(dict(input_dict_3,
743 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
744 'parent-odu-allocation']),
745 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
746 'parent-odu-allocation'])
748 response['interface'][0][
749 'org-openroadm-otn-odu-interfaces:odu'][
750 'parent-odu-allocation']['trib-slots'])
752 def test_38_check_ODU2E_connection_spdrc(self):
753 response = test_utils_rfc8040.check_node_attribute_request(
754 'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
755 self.assertEqual(response['status_code'], requests.codes.ok)
758 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
759 'direction': 'bidirectional'
762 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
763 response['odu-connection'][0])
764 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
765 response['odu-connection'][0]['destination'])
766 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
767 response['odu-connection'][0]['source'])
def test_39_check_otn_topo_links(self):
    # The OTN topology must expose 4 links; both directions of the XPDR1 ODU4 link must
    # report available-bandwidth 90000 and used-bandwidth 10000.
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    for link in links:
        # Only the two ODU4 links are checked; other links are ignored.
        if link['link-id'] in odu4_link_ids:
            self.assertEqual(
                link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
            self.assertEqual(
                link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
def test_40_check_otn_topo_tp(self):
    # Checks the pools advertised by the XPDR1-NETWORK1 termination point of both SPDR
    # XPDR1 nodes in the OTN topology: 72 entries in 'ts-pool' with slots 1..8 absent,
    # 79 entries in the first 'tpn-pool' with port number 1 absent.
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    for node in response['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            ts_pool = xpdrTpPortConAt['ts-pool']
            self.assertEqual(len(ts_pool), 72)
            # Test each slot on its own: handing the whole list to assertNotIn would only
            # ask whether the list object itself is an element of the pool, which can
            # never be the case.
            for slot in range(1, 9):
                self.assertNotIn(slot, ts_pool)
            tpn_pool = xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']
            self.assertEqual(len(tpn_pool), 79)
            self.assertNotIn(1, tpn_pool)
def test_41_delete_10GE_service(self):
    # Issue the service-delete RPC for "service1-10GE", expect OK, then pause for
    # self.WAITING seconds.
    delete_info = self.del_serv_input_data["service-delete-req-info"]
    delete_info["service-name"] = "service1-10GE"
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', self.del_serv_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_42_check_service_list(self):
    # The service list must be retrievable and hold exactly two services.
    reply = test_utils_rfc8040.get_ordm_serv_list_request()
    self.assertEqual(reply['status_code'], requests.codes.ok)
    services = reply['service-list']['services']
    self.assertEqual(len(services), 2)
def test_43_check_no_ODU2e_connection_spdra(self):
    # The 'org-openroadm-device' content returned for SPDR-SA1 must not contain an
    # 'odu-connection' key.
    reply = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    self.assertNotIn('odu-connection', payload['org-openroadm-device'])
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
    # Looking up XPDR1-NETWORK1-ODU2e-service1 on SPDR-SA1 must answer 409 Conflict.
    lookup = ('SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
    reply = test_utils_rfc8040.check_node_attribute_request(*lookup)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
    # Looking up XPDR1-CLIENT1-ODU2e-service1 on SPDR-SA1 must answer 409 Conflict.
    lookup = ('SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
    reply = test_utils_rfc8040.check_node_attribute_request(*lookup)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
    # Looking up XPDR1-CLIENT1-ETHERNET10G on SPDR-SA1 must answer 409 Conflict.
    lookup = ('SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
    reply = test_utils_rfc8040.check_node_attribute_request(*lookup)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_47_check_otn_topo_links(self):
    # The OTN topology must expose 4 links; both directions of the XPDR1 ODU4 link must
    # report available-bandwidth 100000 and used-bandwidth 0.
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    for link in links:
        # Only the two ODU4 links are checked; other links are ignored.
        if link['link-id'] in odu4_link_ids:
            self.assertEqual(
                link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
            self.assertEqual(
                link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_48_check_otn_topo_tp(self):
    # The XPDR1-NETWORK1 termination point of both SPDR XPDR1 nodes must advertise 80
    # entries in 'ts-pool' and 80 entries in its first 'tpn-pool'.
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    for node in response['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
    # Issue the service-delete RPC for "service1-ODU4", expect OK, then pause for
    # self.WAITING seconds.
    delete_info = self.del_serv_input_data["service-delete-req-info"]
    delete_info["service-name"] = "service1-ODU4"
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', self.del_serv_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_50_check_service_list(self):
    # The service list must be retrievable and hold exactly one service.
    reply = test_utils_rfc8040.get_ordm_serv_list_request()
    self.assertEqual(reply['status_code'], requests.codes.ok)
    services = reply['service-list']['services']
    self.assertEqual(len(services), 1)
def test_51_check_no_interface_ODU4_spdra(self):
    # Looking up XPDR1-NETWORK1-ODU4 on SPDR-SA1 must answer 409 Conflict.
    lookup = ('SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
    reply = test_utils_rfc8040.check_node_attribute_request(*lookup)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_52_check_otn_topo_links(self):
    # Same expectations as test_22: re-run its link checks at this point of the scenario.
    self.test_22_check_otn_topo_otu4_links()
def test_53_check_otn_topo_tp(self):
    # The XPDR1-NETWORK1 termination point of both SPDR XPDR1 nodes must no longer carry
    # a 'ts-pool' nor an 'odtu-tpn-pool' attribute.
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    for node in response['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
            self.assertNotIn(
                'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
def test_54_delete_OCH_OTU4_service(self):
    # Issue the service-delete RPC for "service1-OCH-OTU4", expect OK, then pause for
    # self.WAITING seconds.
    delete_info = self.del_serv_input_data["service-delete-req-info"]
    delete_info["service-name"] = "service1-OCH-OTU4"
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', self.del_serv_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_55_get_no_service(self):
    # Reading the service list is expected to answer 409 Conflict here.
    reply = test_utils_rfc8040.get_ordm_serv_list_request()
    status = reply['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_56_check_no_interface_OTU4_spdra(self):
    # Looking up XPDR1-NETWORK1-OTU on SPDR-SA1 must answer 409 Conflict.
    lookup = ('SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
    reply = test_utils_rfc8040.check_node_attribute_request(*lookup)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_57_check_no_interface_OCH_spdra(self):
    # Looking up XPDR1-NETWORK1-1 on SPDR-SA1 must answer 409 Conflict.
    lookup = ('SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
    reply = test_utils_rfc8040.check_node_attribute_request(*lookup)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
    # The first network entry of the OTN topology must not contain any link list.
    reply = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    otn_network = reply['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_59_check_openroadm_topo_spdra(self):
    # The first termination point of SPDR-SA1-XPDR1 in openroadm-topology must be
    # XPDR1-NETWORK1 and its xpdr-network-attributes must not hold a 'wavelength' key.
    reply = test_utils_rfc8040.get_ietf_network_node_request(
        'openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    first_tp = reply['node']['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR1-NETWORK1', first_tp['tp-id'])
    network_attrs = first_tp['org-openroadm-network-topology:xpdr-network-attributes']
    self.assertNotIn('wavelength', dict.keys(network_attrs))
def test_60_check_openroadm_topo_ROADMA_SRG(self):
    # Decodes the base64 'freq-map' of ROADM-A1-SRG1 (node level, then the SRG1-PP1-TXRX
    # termination point) and expects byte 95 to be 255 in both.
    response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    freq_map = base64.b64decode(
        response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
    # Iterating bytes yields ints, so the list holds one integer per decoded byte.
    freq_map_array = [int(x) for x in freq_map]
    self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
    liste_tp = response['node']['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'SRG1-PP1-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = [int(x) for x in freq_map]
            self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
def test_61_check_openroadm_topo_ROADMA_DEG(self):
    # Decodes the base64 'freq-map' of ROADM-A1-DEG2 (node level, then the DEG2-CTP-TXRX
    # and DEG2-TTP-TXRX termination points) and expects byte 95 to be 255 in each.
    response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    freq_map = base64.b64decode(
        response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    # Iterating bytes yields ints, so the list holds one integer per decoded byte.
    freq_map_array = [int(x) for x in freq_map]
    self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
    liste_tp = response['node']['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = [int(x) for x in freq_map]
            self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = [int(x) for x in freq_map]
            self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
    # Call init-xpdr-rdm-links for SPDR-SA1 (xpdr 3, network 1) and ROADM-A1 SRG1-PP2-TXRX.
    links_input = {
        'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
        'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
    # Call init-rdm-xpdr-links for ROADM-A1 SRG1-PP2-TXRX and SPDR-SA1 (xpdr 3, network 1).
    links_input = {
        'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
        'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
    # Call init-xpdr-rdm-links for SPDR-SC1 (xpdr 3, network 1) and ROADM-C1 SRG1-PP2-TXRX.
    links_input = {
        'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
        'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
    # Call init-rdm-xpdr-links for ROADM-C1 SRG1-PP2-TXRX and SPDR-SC1 (xpdr 3, network 1).
    links_input = {
        'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
        'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
def test_66_create_OCH_OTU4_service_2(self):
    # Rewrites the shared service-create payload for "service2-OCH-OTU4" (OTU format,
    # rate 100, XPDR3-NETWORK1 on both ends), sends it, expects OK and then pauses for
    # self.WAITING seconds.
    request = self.cr_serv_input_data
    request["service-name"] = "service2-OCH-OTU4"
    request["connection-type"] = "infrastructure"
    end_devices = (("service-a-end", "SPDR-SA1-XPDR3"),
                   ("service-z-end", "SPDR-SC1-XPDR3"))
    for end_key, device_name in end_devices:
        end = request[end_key]
        end["service-rate"] = "100"
        end["service-format"] = "OTU"
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-NETWORK1"
        end["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', request)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_67_get_OCH_OTU4_service2(self):
    # Reads "service2-OCH-OTU4" from the service list and checks its administrative
    # state, name, connection type and lifecycle state.
    response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
        "services", "service2-OCH-OTU4")
    self.assertEqual(response['status_code'], requests.codes.ok)
    service = response['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2-OCH-OTU4')
    self.assertEqual(service['connection-type'], 'infrastructure')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_68_create_ODU4_service_2(self):
    # Rewrites the shared service-create payload for "service2-ODU4": both ends switch
    # to ODU format with an ODU4 rate and drop their 'otu-service-rate' entry. The
    # request is then sent, OK is expected and the test pauses for self.WAITING seconds.
    request = self.cr_serv_input_data
    request["service-name"] = "service2-ODU4"
    end_devices = (("service-a-end", "SPDR-SA1-XPDR3"),
                   ("service-z-end", "SPDR-SC1-XPDR3"))
    for end_key, device_name in end_devices:
        end = request[end_key]
        end["service-format"] = "ODU"
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-NETWORK1"
        end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
        del end["otu-service-rate"]
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', request)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_69_get_ODU4_service2(self):
    # Reads "service2-ODU4" from the service list and checks its administrative state,
    # name, connection type and lifecycle state.
    response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
        "services", "service2-ODU4")
    self.assertEqual(response['status_code'], requests.codes.ok)
    service = response['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2-ODU4')
    self.assertEqual(service['connection-type'], 'infrastructure')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_70_create_1GE_service(self):
    # Rewrites the shared service-create payload for "service1-1GE": Ethernet format,
    # rate 1, XPDR3-CLIENT1 ports, and no 'odu-service-rate' on either end. The request
    # is then sent, OK is expected and the test pauses for self.WAITING seconds.
    request = self.cr_serv_input_data
    request["service-name"] = "service1-1GE"
    request["connection-type"] = "service"
    end_devices = (("service-a-end", "SPDR-SA1-XPDR3"),
                   ("service-z-end", "SPDR-SC1-XPDR3"))
    # First pass: rate and format of both ends.
    for end_key, _ in end_devices:
        request[end_key]["service-rate"] = "1"
        request[end_key]["service-format"] = "Ethernet"
    # Second pass: drop the ODU rate and point both directions at the client port.
    for end_key, device_name in end_devices:
        end = request[end_key]
        del end["odu-service-rate"]
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-CLIENT1"
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-create', request)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_71_get_1GE_service1(self):
    # Reads "service1-1GE" from the service list and checks its administrative state,
    # name, connection type and lifecycle state.
    response = test_utils_rfc8040.get_ordm_serv_list_attr_request(
        "services", "service1-1GE")
    self.assertEqual(response['status_code'], requests.codes.ok)
    service = response['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1-1GE')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
# Fetches interface XPDR3-CLIENT1-ETHERNET1G from SPDR-SA1, expects an OK status and
# compares it against the expected attributes listed in input_dict.
1105 def test_72_check_interface_1GE_CLIENT_spdra(self):
1106 response = test_utils_rfc8040.check_node_attribute_request(
1107 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1108 self.assertEqual(response['status_code'], requests.codes.ok)
# NOTE(review): this method lists circuit pack CP1-SFP4 for XPDR3-CLIENT1 on SPDR-SA1 while
# test_73 lists CP3-SFP1 for the ODU0 interface of the same port - confirm which is right.
1109 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1110 'administrative-state': 'inService',
1111 'supporting-circuit-pack-name': 'CP1-SFP4',
1112 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1113 'supporting-port': 'CP1-SFP4-P1'
# NOTE(review): the closing brace of input_dict is not visible in this excerpt.
# NOTE(review): dict(expected, **actual) lets the response values override the expected
# ones, so the comparison below only proves that the expected keys are present in the
# returned interface, not that their values match.
1115 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1116 response['interface'][0])
# NOTE(review): the opening of the assertion on the ethernet 'speed' value (method name and
# expected value) is not visible in this excerpt - confirm against the complete file.
1119 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_73_check_interface_ODU0_CLIENT_spdra(self):
    # Fetches interface XPDR3-CLIENT1-ODU0 from SPDR-SA1 and checks its top-level
    # attributes, its ODU container and the OPU payload types.
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-SFP1',
                    'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-SFP1-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'terminated'}
    # NOTE(review): dict(expected, **actual) lets the response values override the
    # expected ones, so the two merged comparisons below only prove that the expected
    # keys are present, not that their values match.
    interface = response['interface'][0]
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    # This comparison is exact: 'opu' must hold exactly these two entries.
    self.assertDictEqual(
        {'payload-type': '07', 'exp-payload-type': '07'},
        odu['opu'])
def test_74_check_interface_ODU0_NETWORK_spdra(self):
    # Fetches interface XPDR3-NETWORK1-ODU0 from SPDR-SA1 and checks its top-level
    # attributes, its ODU container and the parent ODU allocation (trib port 1, slot 1).
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-CFP0',
                    'supporting-interface': 'XPDR3-NETWORK1-ODU4',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-CFP0-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'monitored'}
    input_dict_3 = {'trib-port-number': 1}
    # NOTE(review): dict(expected, **actual) lets the response values override the
    # expected ones, so the three merged comparisons below only prove that the expected
    # keys are present, not that their values match.
    interface = response['interface'][0]
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    allocation = odu['parent-odu-allocation']
    self.assertDictEqual(dict(input_dict_3, **allocation), allocation)
    self.assertIn(1, allocation['trib-slots'])
# Fetches the 'odu-connection' named XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0 from SPDR-SA1,
# expects an OK status and checks the connection's 'destination' and 'source' entries.
1171 def test_75_check_ODU0_connection_spdra(self):
1172 response = test_utils_rfc8040.check_node_attribute_request(
1173 'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1174 self.assertEqual(response['status_code'], requests.codes.ok)
# NOTE(review): the lines that open input_dict_1 and name the key paired with the value
# below are not visible in this excerpt (nor is the closing brace) - confirm against the
# complete file before editing this literal.
1177 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1178 'direction': 'bidirectional'
# NOTE(review): dict(expected, **actual) lets the response values override the expected
# ones, so the comparison below only proves that the expected keys are present in the
# returned connection, not that their values match.
1180 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1181 response['odu-connection'][0])
# These two comparisons are exact: each sub-dict must hold exactly the one listed entry.
1182 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1183 response['odu-connection'][0]['destination'])
1184 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1185 response['odu-connection'][0]['source'])
# Fetches interface XPDR3-CLIENT1-ETHERNET1G from SPDR-SC1, expects an OK status and
# compares it against the expected attributes listed in input_dict.
1187 def test_76_check_interface_1GE_CLIENT_spdrc(self):
1188 response = test_utils_rfc8040.check_node_attribute_request(
1189 'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1190 self.assertEqual(response['status_code'], requests.codes.ok)
1191 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1192 'administrative-state': 'inService',
1193 'supporting-circuit-pack-name': 'CP3-SFP1',
1194 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1195 'supporting-port': 'CP3-SFP1-P1'
# NOTE(review): the closing brace of input_dict is not visible in this excerpt.
# NOTE(review): dict(expected, **actual) lets the response values override the expected
# ones, so the comparison below only proves that the expected keys are present in the
# returned interface, not that their values match.
1197 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1198 response['interface'][0])
# NOTE(review): the opening of the assertion on the ethernet 'speed' value (method name and
# expected value) is not visible in this excerpt - confirm against the complete file.
1201 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_77_check_interface_ODU0_CLIENT_spdrc(self):
    # Fetches interface XPDR3-CLIENT1-ODU0 from SPDR-SC1 and checks its top-level
    # attributes, its ODU container and the OPU payload types.
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-SFP1',
                    'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-SFP1-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'terminated'}
    # NOTE(review): dict(expected, **actual) lets the response values override the
    # expected ones, so the two merged comparisons below only prove that the expected
    # keys are present, not that their values match.
    interface = response['interface'][0]
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    # This comparison is exact: 'opu' must hold exactly these two entries.
    self.assertDictEqual(
        {'payload-type': '07', 'exp-payload-type': '07'},
        odu['opu'])
def test_78_check_interface_ODU0_NETWORK_spdrc(self):
    # Fetches interface XPDR3-NETWORK1-ODU0 from SPDR-SC1 and checks its top-level
    # attributes, its ODU container and the parent ODU allocation (trib port 1, slot 1).
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-CFP0',
                    'supporting-interface': 'XPDR3-NETWORK1-ODU4',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-CFP0-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'monitored'}
    input_dict_3 = {'trib-port-number': 1}
    # NOTE(review): dict(expected, **actual) lets the response values override the
    # expected ones, so the three merged comparisons below only prove that the expected
    # keys are present, not that their values match.
    interface = response['interface'][0]
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    allocation = odu['parent-odu-allocation']
    self.assertDictEqual(dict(input_dict_3, **allocation), allocation)
    self.assertIn(1, allocation['trib-slots'])
# Fetches the 'odu-connection' named XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0 from SPDR-SC1,
# expects an OK status and checks the connection's 'destination' and 'source' entries.
1256 def test_79_check_ODU0_connection_spdrc(self):
1257 response = test_utils_rfc8040.check_node_attribute_request(
1258 'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1259 self.assertEqual(response['status_code'], requests.codes.ok)
# NOTE(review): the lines that open input_dict_1 and name the key paired with the value
# below are not visible in this excerpt (nor is the closing brace) - confirm against the
# complete file before editing this literal.
1262 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1263 'direction': 'bidirectional'
# NOTE(review): dict(expected, **actual) lets the response values override the expected
# ones, so the comparison below only proves that the expected keys are present in the
# returned connection, not that their values match.
1265 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1266 response['odu-connection'][0])
# These two comparisons are exact: each sub-dict must hold exactly the one listed entry.
1267 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1268 response['odu-connection'][0]['destination'])
1269 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1270 response['odu-connection'][0]['source'])
def test_80_check_otn_topo_links(self):
    # The OTN topology must expose 4 links; both directions of the XPDR3 ODU4 link must
    # report available-bandwidth 99000 and used-bandwidth 1000.
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
        'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    for link in links:
        # Only the two ODU4 links are checked; other links are ignored.
        if link['link-id'] in odu4_link_ids:
            self.assertEqual(
                link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
            self.assertEqual(
                link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
def test_81_check_otn_topo_tp(self):
    # Checks the pools advertised by the XPDR3-NETWORK1 termination point of both SPDR
    # XPDR3 nodes in the OTN topology: 79 entries in 'ts-pool' with slot 1 absent,
    # 79 entries in the first 'tpn-pool' with port number 1 absent.
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    for node in response['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR3-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            ts_pool = xpdrTpPortConAt['ts-pool']
            self.assertEqual(len(ts_pool), 79)
            # Test each slot on its own: handing the whole list to assertNotIn would only
            # ask whether the list object itself is an element of the pool, which can
            # never be the case.
            for slot in range(1, 2):
                self.assertNotIn(slot, ts_pool)
            tpn_pool = xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']
            self.assertEqual(len(tpn_pool), 79)
            self.assertNotIn(1, tpn_pool)
def test_82_delete_1GE_service(self):
    # Issue the service-delete RPC for "service1-1GE", expect OK, then pause for
    # self.WAITING seconds.
    delete_info = self.del_serv_input_data["service-delete-req-info"]
    delete_info["service-name"] = "service1-1GE"
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', self.del_serv_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_83_check_service_list(self):
    # The service list must be retrievable and hold exactly two services.
    reply = test_utils_rfc8040.get_ordm_serv_list_request()
    self.assertEqual(reply['status_code'], requests.codes.ok)
    services = reply['service-list']['services']
    self.assertEqual(len(services), 2)
def test_84_check_no_ODU0_connection_spdra(self):
    # The 'org-openroadm-device' content returned for SPDR-SA1 must not contain an
    # 'odu-connection' key.
    reply = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(reply.status_code, requests.codes.ok)
    payload = reply.json()
    self.assertNotIn('odu-connection', payload['org-openroadm-device'])
def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
    # Looking up XPDR3-NETWORK1-ODU0-service1 on SPDR-SA1 must answer 409 Conflict.
    lookup = ('SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0-service1')
    reply = test_utils_rfc8040.check_node_attribute_request(*lookup)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
    # Looking up XPDR3-CLIENT1-ODU0-service1 on SPDR-SA1 must answer 409 Conflict.
    lookup = ('SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0-service1')
    reply = test_utils_rfc8040.check_node_attribute_request(*lookup)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_87_check_no_interface_10GE_CLIENT_spdra(self):
    # Looking up XPDR3-CLIENT1-ETHERNET1G on SPDR-SA1 must answer 409 Conflict.
    # NOTE(review): the method name says 10GE but the interface queried is the 1G one.
    lookup = ('SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
    reply = test_utils_rfc8040.check_node_attribute_request(*lookup)
    self.assertEqual(reply['status_code'], requests.codes.conflict)
def test_88_check_otn_topo_links(self):
    # The OTN topology must expose 4 links; both directions of the XPDR3 ODU4 link must
    # report available-bandwidth 100000 and used-bandwidth 0.
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
        'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    for link in links:
        # Only the two ODU4 links are checked; other links are ignored.
        if link['link-id'] in odu4_link_ids:
            self.assertEqual(
                link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
            self.assertEqual(
                link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_89_check_otn_topo_tp(self):
    # The XPDR3-NETWORK1 termination point of both SPDR XPDR3 nodes must advertise 80
    # entries in 'ts-pool' and 80 entries in its first 'tpn-pool'.
    response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    for node in response['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR3-NETWORK1':
                continue
            xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
            self.assertEqual(
                len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_90_delete_ODU4_service(self):
    # Issue the service-delete RPC for "service2-ODU4", expect OK, then pause for
    # self.WAITING seconds.
    delete_info = self.del_serv_input_data["service-delete-req-info"]
    delete_info["service-name"] = "service2-ODU4"
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', self.del_serv_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_91_delete_OCH_OTU4_service(self):
    # Issue the service-delete RPC for "service2-OCH-OTU4", expect OK, then pause for
    # self.WAITING seconds.
    delete_info = self.del_serv_input_data["service-delete-req-info"]
    delete_info["service-name"] = "service2-OCH-OTU4"
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'org-openroadm-service', 'service-delete', self.del_serv_input_data)
    self.assertEqual(reply['status_code'], requests.codes.ok)
    time.sleep(self.WAITING)
def test_92_disconnect_xponders_from_roadm(self):
    # Deletes every openroadm-topology link whose type is XPONDER-OUTPUT or
    # XPONDER-INPUT and expects each delete request to answer OK.
    # NOTE(review): the '{}' placeholder is left unformatted here; presumably
    # test_utils.delete_request fills it in - verify against that helper.
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(response['status_code'], requests.codes.ok)
    links = response['network'][0]['ietf-network-topology:link']
    for link in links:
        if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
            link_name = link["link-id"]
            response = test_utils.delete_request(url+link_name)
            self.assertEqual(response.status_code, requests.codes.ok)
def test_93_check_openroadm_topology(self):
    # openroadm-topology must be retrievable and list exactly 18 links.
    reply = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    link_count = len(reply['network'][0]['ietf-network-topology:link'])
    self.assertEqual(18, link_count, 'Topology should contain 18 links')
def test_94_disconnect_spdrA(self):
    # Unmounting SPDR-SA1 may answer either 200 OK or 204 No Content.
    accepted = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("SPDR-SA1")
    self.assertIn(reply.status_code, accepted)
def test_95_disconnect_spdrC(self):
    # Unmounting SPDR-SC1 may answer either 200 OK or 204 No Content.
    accepted = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("SPDR-SC1")
    self.assertIn(reply.status_code, accepted)
def test_96_disconnect_roadmA(self):
    # Unmounting ROADM-A1 may answer either 200 OK or 204 No Content.
    accepted = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("ROADM-A1")
    self.assertIn(reply.status_code, accepted)
def test_97_disconnect_roadmC(self):
    # Unmounting ROADM-C1 may answer either 200 OK or 204 No Content.
    accepted = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("ROADM-C1")
    self.assertIn(reply.status_code, accepted)
# Script entry point: run every test of this module with verbose reporting.
if __name__ == "__main__":
    unittest.main(
        verbosity=2,
    )