3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
26 import test_utils_rfc8040 # nopep8
29 class TransportPCEtesting(unittest.TestCase):
# Class-level settings shared by the test methods of this class.
# WAITING is the value handed to time.sleep() after service create/delete requests.
32 WAITING = 20 # nominal value is 300
# NODE_VERSION is passed to start_sims() and mount_device() together with each simulator name.
33 NODE_VERSION = '2.2.1'
# Service-create payload template. It is a single class attribute that test_23 and test_29
# modify in place before re-submitting it, so those tests depend on running in order.
# NOTE(review): lines are absent from this literal in the extract (the embedded numbering
# skips e.g. 40, 44 and 49-51), so its nesting cannot be read reliably from here.
35 cr_serv_sample_data = {"input": {
36 "sdnc-request-header": {
37 "request-id": "request-1",
38 "rpc-action": "service-create",
39 "request-system-id": "appname"
41 "service-name": "service1-OCH-OTU4",
42 "common-id": "commonId",
43 "connection-type": "infrastructure",
45 "service-rate": "100",
46 "node-id": "SPDR-SA1",
47 "service-format": "OTU",
48 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
52 "port-device-name": "SPDR-SA1-XPDR1",
54 "port-name": "XPDR1-NETWORK1",
55 "port-rack": "000000.00",
56 "port-shelf": "Chassis#1"
59 "lgx-device-name": "Some lgx-device-name",
60 "lgx-port-name": "Some lgx-port-name",
61 "lgx-port-rack": "000000.00",
62 "lgx-port-shelf": "00"
68 "port-device-name": "SPDR-SA1-XPDR1",
70 "port-name": "XPDR1-NETWORK1",
71 "port-rack": "000000.00",
72 "port-shelf": "Chassis#1"
75 "lgx-device-name": "Some lgx-device-name",
76 "lgx-port-name": "Some lgx-port-name",
77 "lgx-port-rack": "000000.00",
78 "lgx-port-shelf": "00"
85 "service-rate": "100",
86 "node-id": "SPDR-SC1",
87 "service-format": "OTU",
88 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
92 "port-device-name": "SPDR-SC1-XPDR1",
94 "port-name": "XPDR1-NETWORK1",
95 "port-rack": "000000.00",
96 "port-shelf": "Chassis#1"
99 "lgx-device-name": "Some lgx-device-name",
100 "lgx-port-name": "Some lgx-port-name",
101 "lgx-port-rack": "000000.00",
102 "lgx-port-shelf": "00"
108 "port-device-name": "SPDR-SC1-XPDR1",
109 "port-type": "fixed",
110 "port-name": "XPDR1-NETWORK1",
111 "port-rack": "000000.00",
112 "port-shelf": "Chassis#1"
115 "lgx-device-name": "Some lgx-device-name",
116 "lgx-port-name": "Some lgx-port-name",
117 "lgx-port-rack": "000000.00",
118 "lgx-port-shelf": "00"
124 "due-date": "2018-06-15T00:00:01Z",
125 "operator-contact": "pw1234"
# Class fixture body: calls start_tpce(), then start_sims() for the four simulators
# (spdra, roadma, roadmc, spdrc), each paired with NODE_VERSION.
# NOTE(review): cls.processes is assigned twice, so only the value returned by start_sims()
# is kept. Presumably that return value also covers what start_tpce() started; confirm,
# otherwise tearDownClass would never shut it down.
# NOTE(review): the def line that owns these statements is absent from this extract
# (the embedded numbering jumps from 125 to 131).
131 cls.processes = test_utils_rfc8040.start_tpce()
132 cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
133 ('roadma', cls.NODE_VERSION),
134 ('roadmc', cls.NODE_VERSION),
135 ('spdrc', cls.NODE_VERSION)])
# Class teardown: hands every entry of cls.processes to shutdown_process() and prints
# a confirmation message.
# NOTE(review): indentation is lost in this extract, so whether the print sits inside or
# after the loop cannot be read from here; the message text suggests after. Confirm.
138 def tearDownClass(cls):
139 # pylint: disable=not-an-iterable
140 for process in cls.processes:
141 test_utils_rfc8040.shutdown_process(process)
142 print("all processes killed")
def test_01_connect_spdrA(self):
    """Mount the 'spdra' simulator as node SPDR-SA1 and expect HTTP 201 (created)."""
    simulator = ('spdra', self.NODE_VERSION)
    mount_response = test_utils_rfc8040.mount_device("SPDR-SA1", simulator)
    self.assertEqual(
        mount_response.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
def test_02_connect_spdrC(self):
    """Mount the 'spdrc' simulator as node SPDR-SC1 and expect HTTP 201 (created)."""
    simulator = ('spdrc', self.NODE_VERSION)
    mount_response = test_utils_rfc8040.mount_device("SPDR-SC1", simulator)
    self.assertEqual(
        mount_response.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    """Mount the 'roadma' simulator as node ROADM-A1 and expect HTTP 201 (created)."""
    simulator = ('roadma', self.NODE_VERSION)
    mount_response = test_utils_rfc8040.mount_device("ROADM-A1", simulator)
    self.assertEqual(
        mount_response.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    """Mount the 'roadmc' simulator as node ROADM-C1 and expect HTTP 201 (created)."""
    simulator = ('roadmc', self.NODE_VERSION)
    mount_response = test_utils_rfc8040.mount_device("ROADM-C1", simulator)
    self.assertEqual(
        mount_response.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
    """Call init-xpdr-rdm-links for SPDR-SA1 / ROADM-A1 SRG1-PP1-TXRX; expect HTTP 200."""
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(rpc_result['status_code'], requests.codes.ok)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
    """Call init-rdm-xpdr-links for ROADM-A1 SRG1-PP1-TXRX / SPDR-SA1; expect HTTP 200."""
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(rpc_result['status_code'], requests.codes.ok)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
    """Call init-xpdr-rdm-links for SPDR-SC1 / ROADM-C1 SRG1-PP1-TXRX; expect HTTP 200."""
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(rpc_result['status_code'], requests.codes.ok)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
    """Call init-rdm-xpdr-links for ROADM-C1 SRG1-PP1-TXRX / SPDR-SC1; expect HTTP 200."""
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(rpc_result['status_code'], requests.codes.ok)
# Sends `data` through add_oms_attr_request() for the ROADM-A1 DEG2 -> ROADM-C1 DEG1 link
# and expects HTTP 201 (created).
# NOTE(review): lines 197, 203-204 and 206 of the original numbering are absent from this
# extract; presumably 197 is the assignment that opens the `data` literal. Confirm.
195 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
196 # Config ROADMA-ROADMC oms-attributes
198 "auto-spanloss": "true",
199 "spanloss-base": 11.4,
200 "spanloss-current": 12,
201 "engineered-spanloss": 12.2,
202 "link-concatenation": [{
205 "SRLG-length": 100000,
207 response = test_utils_rfc8040.add_oms_attr_request(
208 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
209 self.assertEqual(response.status_code, requests.codes.created)
# Sends `data` through add_oms_attr_request() for the ROADM-C1 DEG1 -> ROADM-A1 DEG2 link
# and expects HTTP 201 (created).
# NOTE(review): lines 213, 219-220 and 222 of the original numbering are absent from this
# extract; presumably 213 is the assignment that opens the `data` literal. Confirm.
211 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
212 # Config ROADMC-ROADMA oms-attributes
214 "auto-spanloss": "true",
215 "spanloss-base": 11.4,
216 "spanloss-current": 12,
217 "engineered-spanloss": 12.2,
218 "link-concatenation": [{
221 "SRLG-length": 100000,
223 response = test_utils_rfc8040.add_oms_attr_request(
224 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
225 self.assertEqual(response.status_code, requests.codes.created)
227 # test service-create for OCH-OTU4 service from spdr to spdr
def test_11_check_otn_topology(self):
    """Expect HTTP 200 from the OTN topology, 6 nodes in its first network and no link list."""
    topo_response = test_utils.get_otn_topo_request()
    self.assertEqual(topo_response.status_code, requests.codes.ok)
    otn_network = topo_response.json()['network'][0]
    node_count = len(otn_network['node'])
    self.assertEqual(node_count, 6, 'There should be 6 nodes')
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_12_create_OCH_OTU4_service(self):
    """Submit the shared service-create payload, check it is accepted, then wait.

    Expects HTTP 200 and a response message containing
    'PCE calculation in progress', then sleeps for ``WAITING``.
    """
    create_response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_response.status_code, requests.codes.ok)
    response_common = create_response.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', response_common['response-message'])
    time.sleep(self.WAITING)
# Requests "services/service1-OCH-OTU4" from the service list and expects HTTP 200.
# The remaining fragments pair fields of the first returned service with expected values
# (administrative-state, service-name, connection-type, lifecycle-state).
# NOTE(review): lines 249, 251, 253 and 255 of the original numbering are absent from this
# extract; presumably they open the assertion calls these fragments belong to. Confirm.
244 def test_13_get_OCH_OTU4_service1(self):
245 response = test_utils.get_service_list_request(
246 "services/service1-OCH-OTU4")
247 self.assertEqual(response.status_code, requests.codes.ok)
248 res = response.json()
250 res['services'][0]['administrative-state'], 'inService')
252 res['services'][0]['service-name'], 'service1-OCH-OTU4')
254 res['services'][0]['connection-type'], 'infrastructure')
256 res['services'][0]['lifecycle-state'], 'planned')
259 # Check correct configuration of devices
# Reads interface 'XPDR1-NETWORK1-761:768' on SPDR-SA1, expects HTTP 200, then checks the
# general attributes and the och rate, modulation-format and frequency of the first entry.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so the assertDictEqual below only proves the expected keys exist, not that their
# values match.
# NOTE(review): lines 277-278 of the original numbering are absent from this extract, so
# the expected transmit-power value is not visible here.
260 def test_14_check_interface_och_spdra(self):
261 response = test_utils_rfc8040.check_node_attribute_request(
262 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
263 self.assertEqual(response['status_code'], requests.codes.ok)
264 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
265 'administrative-state': 'inService',
266 'supporting-circuit-pack-name': 'CP1-CFP0',
267 'type': 'org-openroadm-interfaces:opticalChannel',
268 'supporting-port': 'CP1-CFP0-P1'
269 }, **response['interface'][0]),
270 response['interface'][0])
271 self.assertEqual('org-openroadm-common-types:R100G',
272 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
273 self.assertEqual('dp-qpsk',
274 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
275 self.assertEqual(196.1,
276 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
279 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
# Reads interface 'XPDR1-NETWORK1-OTU' on SPDR-SA1, expects HTTP 200, then compares the
# general attributes (input_dict_1) and the otu container (input_dict_2) of the first entry.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so both assertDictEqual calls only prove the expected keys exist; the SAPI/DAPI
# values listed in input_dict_2 are never actually compared.
# NOTE(review): lines 291 and 297-298 of the original numbering are absent from this extract.
281 def test_15_check_interface_OTU4_spdra(self):
282 response = test_utils_rfc8040.check_node_attribute_request(
283 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
284 self.assertEqual(response['status_code'], requests.codes.ok)
285 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
286 'administrative-state': 'inService',
287 'supporting-circuit-pack-name': 'CP1-CFP0',
288 'supporting-interface': 'XPDR1-NETWORK1-761:768',
289 'type': 'org-openroadm-interfaces:otnOtu',
290 'supporting-port': 'CP1-CFP0-P1'
292 input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
293 'expected-dapi': 'H/OelLynehI=',
294 'tx-dapi': 'AMf1n5hK6Xkk',
295 'expected-sapi': 'AMf1n5hK6Xkk',
296 'rate': 'org-openroadm-otn-common-types:OTU4',
299 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
300 response['interface'][0])
301 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
302 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
# Reads interface 'XPDR1-NETWORK1-761:768' on SPDR-SC1, expects HTTP 200, then checks the
# general attributes and the och rate, modulation-format and frequency of the first entry.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so the assertDictEqual below only proves the expected keys exist, not that their
# values match.
# NOTE(review): lines 321-322 of the original numbering are absent from this extract, so
# the expected transmit-power value is not visible here.
304 def test_16_check_interface_och_spdrc(self):
305 response = test_utils_rfc8040.check_node_attribute_request(
306 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
307 self.assertEqual(response['status_code'], requests.codes.ok)
308 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
309 'administrative-state': 'inService',
310 'supporting-circuit-pack-name': 'CP1-CFP0',
311 'type': 'org-openroadm-interfaces:opticalChannel',
312 'supporting-port': 'CP1-CFP0-P1'
313 }, **response['interface'][0]),
314 response['interface'][0])
315 self.assertEqual('org-openroadm-common-types:R100G',
316 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
317 self.assertEqual('dp-qpsk',
318 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
319 self.assertEqual(196.1,
320 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
323 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
# Reads interface 'XPDR1-NETWORK1-OTU' on SPDR-SC1, expects HTTP 200, then compares the
# general attributes (input_dict_1) and the otu container (input_dict_2) of the first entry.
# The SAPI/DAPI strings are the same two values as in test_15 with the roles swapped.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so both assertDictEqual calls only prove the expected keys exist; the SAPI/DAPI
# values listed in input_dict_2 are never actually compared.
# NOTE(review): lines 335 and 341-342 of the original numbering are absent from this extract.
325 def test_17_check_interface_OTU4_spdrc(self):
326 response = test_utils_rfc8040.check_node_attribute_request(
327 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
328 self.assertEqual(response['status_code'], requests.codes.ok)
329 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
330 'administrative-state': 'inService',
331 'supporting-circuit-pack-name': 'CP1-CFP0',
332 'supporting-interface': 'XPDR1-NETWORK1-761:768',
333 'type': 'org-openroadm-interfaces:otnOtu',
334 'supporting-port': 'CP1-CFP0-P1'
336 input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
337 'expected-sapi': 'H/OelLynehI=',
338 'tx-sapi': 'AMf1n5hK6Xkk',
339 'expected-dapi': 'AMf1n5hK6Xkk',
340 'rate': 'org-openroadm-otn-common-types:OTU4',
343 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
344 response['interface'][0])
345 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
346 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_18_check_no_interface_ODU4_spdra(self):
    """Expect HTTP 409 (conflict) when reading interface XPDR1-NETWORK1-ODU4 on SPDR-SA1."""
    lookup = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1',
        'interface',
        'XPDR1-NETWORK1-ODU4',
    )
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
# Reads node 'SPDR-SA1-XPDR1' of 'openroadm-topology' ('config'), expects HTTP 200 and
# checks that the first termination point is 'XPDR1-NETWORK1'. The trailing fragments read
# the wavelength frequency and width of that termination point.
# NOTE(review): lines 358-359 and 361-362 of the original numbering are absent from this
# extract, so the expected frequency and width values are not visible here.
353 def test_19_check_openroadm_topo_spdra(self):
354 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
355 self.assertEqual(response['status_code'], requests.codes.ok)
356 ele = response['node']['ietf-network-topology:termination-point'][0]
357 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
360 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
363 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
# Reads node 'ROADM-A1-SRG1' of 'openroadm-topology', expects HTTP 200, base64-decodes the
# SRG freq-map and asserts that byte 95 is 0. The same check is applied to termination
# point 'SRG1-PP1-TXRX', while 'SRG1-PP2-TXRX' must not carry 'avail-freq-maps'.
# NOTE(review): line 373 of the original numbering is absent from this extract; presumably
# it is the loop header that binds `ele` over liste_tp. Confirm.
# NOTE(review): dict.keys(ele) is an unusual spelling of ele.keys(); `in ele` would be the
# idiomatic membership test.
365 def test_20_check_openroadm_topo_ROADMA_SRG(self):
366 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
367 self.assertEqual(response['status_code'], requests.codes.ok)
368 freq_map = base64.b64decode(
369 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
370 freq_map_array = [int(x) for x in freq_map]
371 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
372 liste_tp = response['node']['ietf-network-topology:termination-point']
374 if ele['tp-id'] == 'SRG1-PP1-TXRX':
375 freq_map = base64.b64decode(
376 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
377 freq_map_array = [int(x) for x in freq_map]
378 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
379 if ele['tp-id'] == 'SRG1-PP2-TXRX':
380 self.assertNotIn('avail-freq-maps', dict.keys(ele))
# Reads node 'ROADM-A1-DEG2' of 'openroadm-topology', expects HTTP 200, base64-decodes the
# degree freq-map and asserts that byte 95 is 0. The same check is applied to termination
# points 'DEG2-CTP-TXRX' (ctp-attributes) and 'DEG2-TTP-TXRX' (tx-ttp-attributes).
# NOTE(review): line 390 of the original numbering is absent from this extract; presumably
# it is the loop header that binds `ele` over liste_tp. Confirm.
382 def test_21_check_openroadm_topo_ROADMA_DEG(self):
383 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
384 self.assertEqual(response['status_code'], requests.codes.ok)
385 freq_map = base64.b64decode(
386 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
387 freq_map_array = [int(x) for x in freq_map]
388 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
389 liste_tp = response['node']['ietf-network-topology:termination-point']
391 if ele['tp-id'] == 'DEG2-CTP-TXRX':
392 freq_map = base64.b64decode(
393 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
394 freq_map_array = [int(x) for x in freq_map]
395 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
396 if ele['tp-id'] == 'DEG2-TTP-TXRX':
397 freq_map = base64.b64decode(
398 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
399 freq_map_array = [int(x) for x in freq_map]
400 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
# Fetches the OTN topology, expects HTTP 200 and exactly 2 links, and checks that every
# link id is one of the two OTU4 SPDR-SA1 <-> SPDR-SC1 ids in listLinkId. The trailing
# fragments pair per-link fields with expected values (link type, bandwidth, opposite link).
# NOTE(review): lines 412, 414, 416, 418 and 420 of the original numbering are absent from
# this extract; presumably they open the assertion calls these fragments belong to. Confirm.
402 def test_22_check_otn_topo_otu4_links(self):
403 response = test_utils.get_otn_topo_request()
404 self.assertEqual(response.status_code, requests.codes.ok)
405 res = response.json()
406 nb_links = len(res['network'][0]['ietf-network-topology:link'])
407 self.assertEqual(nb_links, 2)
408 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
409 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
410 for link in res['network'][0]['ietf-network-topology:link']:
411 self.assertIn(link['link-id'], listLinkId)
413 link['transportpce-topology:otn-link-type'], 'OTU4')
415 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
417 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
419 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
421 link['org-openroadm-common-network:opposite-link'], listLinkId)
423 # test service-create for ODU4 service from spdr to spdr
def test_23_create_ODU4_service(self):
    """Turn the shared payload into an ODU4 request, submit it and wait.

    Edits the class-level ``cr_serv_sample_data`` in place: renames the
    service to 'service1-ODU4' and, on both endpoints, switches the format to
    ODU and replaces 'otu-service-rate' with an ODU4 'odu-service-rate'.
    Expects HTTP 200 and a message containing 'PCE calculation in progress'.
    """
    service_input = self.cr_serv_sample_data["input"]
    service_input["service-name"] = "service1-ODU4"
    # A-end is fully updated before Z-end, matching the original statement order.
    for end_key in ("service-a-end", "service-z-end"):
        endpoint = service_input[end_key]
        endpoint["service-format"] = "ODU"
        del endpoint["otu-service-rate"]
        endpoint["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"

    create_response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_response.status_code, requests.codes.ok)
    response_common = create_response.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', response_common['response-message'])
    time.sleep(self.WAITING)
# Requests "services/service1-ODU4" from the service list and expects HTTP 200.
# The remaining fragments pair fields of the first returned service with expected values
# (administrative-state, service-name, connection-type, lifecycle-state).
# NOTE(review): lines 445, 447, 449 and 451 of the original numbering are absent from this
# extract; presumably they open the assertion calls these fragments belong to. Confirm.
440 def test_24_get_ODU4_service1(self):
441 response = test_utils.get_service_list_request(
442 "services/service1-ODU4")
443 self.assertEqual(response.status_code, requests.codes.ok)
444 res = response.json()
446 res['services'][0]['administrative-state'], 'inService')
448 res['services'][0]['service-name'], 'service1-ODU4')
450 res['services'][0]['connection-type'], 'infrastructure')
452 res['services'][0]['lifecycle-state'], 'planned')
# Reads interface 'XPDR1-NETWORK1-ODU4' on SPDR-SA1, expects HTTP 200, then checks the
# general attributes (input_dict_1), the odu container (input_dict_2) and that the opu
# payload types are exactly '21'.
# NOTE(review): the two dict() merges use opposite argument orders. The input_dict_1 check,
# dict(expected, **actual), only proves the expected keys exist. The odu check starts from
# the returned dict; its second argument is on a line absent from this extract.
# NOTE(review): lines 465, 475, 479 and 481 of the original numbering are absent from this extract.
455 def test_25_check_interface_ODU4_spdra(self):
456 response = test_utils_rfc8040.check_node_attribute_request(
457 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
458 self.assertEqual(response['status_code'], requests.codes.ok)
459 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
460 'administrative-state': 'inService',
461 'supporting-circuit-pack-name': 'CP1-CFP0',
462 'supporting-interface': 'XPDR1-NETWORK1-OTU',
463 'type': 'org-openroadm-interfaces:otnOdu',
464 'supporting-port': 'CP1-CFP0-P1',
466 'description': 'TBD'}
467 # SAPI/DAPI are added in the Otu4 renderer
468 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
469 'rate': 'org-openroadm-otn-common-types:ODU4',
470 'monitoring-mode': 'terminated',
471 'expected-dapi': 'H/OelLynehI=',
472 'expected-sapi': 'AMf1n5hK6Xkk',
473 'tx-dapi': 'AMf1n5hK6Xkk',
474 'tx-sapi': 'H/OelLynehI='}
476 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
477 response['interface'][0])
478 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
480 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
482 self.assertDictEqual(
483 {'payload-type': '21', 'exp-payload-type': '21'},
484 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Reads interface 'XPDR1-NETWORK1-ODU4' on SPDR-SC1, expects HTTP 200, then checks the
# general attributes (input_dict_1), the odu container (input_dict_2) and that the opu
# payload types are exactly '21'.
# NOTE(review): the two dict() merges use opposite argument orders. The input_dict_1 check,
# dict(expected, **actual), only proves the expected keys exist. The odu check starts from
# the returned dict; its second argument is on a line absent from this extract.
# NOTE(review): lines 496, 506, 510 and 512 of the original numbering are absent from this extract.
486 def test_26_check_interface_ODU4_spdrc(self):
487 response = test_utils_rfc8040.check_node_attribute_request(
488 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
489 self.assertEqual(response['status_code'], requests.codes.ok)
490 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
491 'administrative-state': 'inService',
492 'supporting-circuit-pack-name': 'CP1-CFP0',
493 'supporting-interface': 'XPDR1-NETWORK1-OTU',
494 'type': 'org-openroadm-interfaces:otnOdu',
495 'supporting-port': 'CP1-CFP0-P1',
497 'description': 'TBD'}
498 # SAPI/DAPI are added in the Otu4 renderer
499 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
500 'rate': 'org-openroadm-otn-common-types:ODU4',
501 'monitoring-mode': 'terminated',
502 'tx-sapi': 'AMf1n5hK6Xkk',
503 'tx-dapi': 'H/OelLynehI=',
504 'expected-sapi': 'H/OelLynehI=',
505 'expected-dapi': 'AMf1n5hK6Xkk'
507 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
508 response['interface'][0])
509 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
511 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
513 self.assertDictEqual(
514 {'payload-type': '21', 'exp-payload-type': '21'},
515 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Fetches the OTN topology, expects HTTP 200 and exactly 4 links, then branches on the
# link id: the two OTU4 ids, the two ODTU4 ids, and a failure for anything else
# ("this link should not exist").
# NOTE(review): lines 527, 529, 533, 535, 537, 539 and 544 of the original numbering are
# absent from this extract; presumably they open the assertion calls and the final else
# branch. Confirm.
517 def test_27_check_otn_topo_links(self):
518 response = test_utils.get_otn_topo_request()
519 self.assertEqual(response.status_code, requests.codes.ok)
520 res = response.json()
521 nb_links = len(res['network'][0]['ietf-network-topology:link'])
522 self.assertEqual(nb_links, 4)
523 for link in res['network'][0]['ietf-network-topology:link']:
524 linkId = link['link-id']
525 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
526 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
528 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
530 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
531 elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
532 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
534 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
536 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
538 link['transportpce-topology:otn-link-type'], 'ODTU4')
540 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
541 self.assertIn(link['org-openroadm-common-network:opposite-link'],
542 ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
543 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
545 self.fail("this link should not exist")
# Fetches the OTN topology and, for nodes SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1, inspects
# termination point 'XPDR1-NETWORK1': ts-pool must have 80 entries and the first
# odtu-tpn-pool entry must be of type ODTU4.ts-Allocated.
# NOTE(review): unlike the other topology tests, the response status code is not checked
# before response.json() is used.
# NOTE(review): lines 553 and 557 of the original numbering are absent from this extract;
# presumably 553 is the loop header that binds `tp` over tpList. Confirm.
547 def test_28_check_otn_topo_tp(self):
548 response = test_utils.get_otn_topo_request()
549 res = response.json()
550 for node in res['network'][0]['node']:
551 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
552 tpList = node['ietf-network-topology:termination-point']
554 if tp['tp-id'] == 'XPDR1-NETWORK1':
555 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
556 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
558 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
559 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
560 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
562 # test service-create for 10GE service from spdr to spdr
def test_29_create_10GE_service(self):
    """Turn the shared payload into a 10GE request, submit it and wait.

    Edits the class-level ``cr_serv_sample_data`` in place: renames the
    service to 'service1-10GE', sets the connection type to 'service' and, on
    both endpoints, sets rate "10", format Ethernet with 10GBASE-R encoding,
    removes 'odu-service-rate' and points the first tx/rx direction entries
    at port XPDR1-CLIENT1. Expects HTTP 200 and a message containing
    'PCE calculation in progress'.
    """
    service_input = self.cr_serv_sample_data["input"]
    service_input["service-name"] = "service1-10GE"
    service_input["connection-type"] = "service"
    # A-end is fully updated before Z-end, matching the original statement order.
    for end_key in ("service-a-end", "service-z-end"):
        endpoint = service_input[end_key]
        endpoint["service-rate"] = "10"
        endpoint["service-format"] = "Ethernet"
        del endpoint["odu-service-rate"]
        endpoint["ethernet-encoding"] = "10GBASE-R"
        for direction in ("tx-direction", "rx-direction"):
            endpoint[direction][0]["port"]["port-name"] = "XPDR1-CLIENT1"

    create_response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_response.status_code, requests.codes.ok)
    response_common = create_response.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', response_common['response-message'])
    time.sleep(self.WAITING)
# Requests "services/service1-10GE" from the service list and expects HTTP 200.
# The remaining fragments pair fields of the first returned service with expected values
# (administrative-state, service-name, connection-type, lifecycle-state).
# NOTE(review): lines 590, 592, 594 and 596 of the original numbering are absent from this
# extract; presumably they open the assertion calls these fragments belong to. Confirm.
585 def test_30_get_10GE_service1(self):
586 response = test_utils.get_service_list_request(
587 "services/service1-10GE")
588 self.assertEqual(response.status_code, requests.codes.ok)
589 res = response.json()
591 res['services'][0]['administrative-state'], 'inService')
593 res['services'][0]['service-name'], 'service1-10GE')
595 res['services'][0]['connection-type'], 'service')
597 res['services'][0]['lifecycle-state'], 'planned')
# Reads interface 'XPDR1-CLIENT1-ETHERNET10G' on SPDR-SA1, expects HTTP 200 and compares
# its general attributes with input_dict; the last fragment reads the ethernet speed.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so the assertDictEqual only proves the expected keys exist, not their values.
# NOTE(review): lines 609 and 612-613 of the original numbering are absent from this
# extract, so the expected speed value is not visible here.
600 def test_31_check_interface_10GE_CLIENT_spdra(self):
601 response = test_utils_rfc8040.check_node_attribute_request(
602 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
603 self.assertEqual(response['status_code'], requests.codes.ok)
604 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
605 'administrative-state': 'inService',
606 'supporting-circuit-pack-name': 'CP1-SFP4',
607 'type': 'org-openroadm-interfaces:ethernetCsmacd',
608 'supporting-port': 'CP1-SFP4-P1'
610 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
611 response['interface'][0])
614 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
# Reads interface 'XPDR1-CLIENT1-ODU2e' on SPDR-SA1, expects HTTP 200, compares the general
# attributes (input_dict_1) and the odu container (input_dict_2), and requires the opu
# payload types to be exactly '03'.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so the first two assertDictEqual calls only prove the expected keys exist.
# NOTE(review): lines 626 and 630 of the original numbering are absent from this extract;
# presumably 626 opens the input_dict_2 literal. Confirm.
616 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
617 response = test_utils_rfc8040.check_node_attribute_request(
618 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
619 self.assertEqual(response['status_code'], requests.codes.ok)
620 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
621 'administrative-state': 'inService',
622 'supporting-circuit-pack-name': 'CP1-SFP4',
623 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
624 'type': 'org-openroadm-interfaces:otnOdu',
625 'supporting-port': 'CP1-SFP4-P1'}
627 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
628 'rate': 'org-openroadm-otn-common-types:ODU2e',
629 'monitoring-mode': 'terminated'}
631 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
632 response['interface'][0])
633 self.assertDictEqual(dict(input_dict_2,
634 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
635 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
636 self.assertDictEqual(
637 {'payload-type': '03', 'exp-payload-type': '03'},
638 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Reads interface 'XPDR1-NETWORK1-ODU2e' on SPDR-SA1, expects HTTP 200, then compares the
# general attributes (input_dict_1), the odu container (input_dict_2) and the
# parent-odu-allocation (input_dict_3); the last statement is an assertIn on value 1.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so the three assertDictEqual calls only prove the expected keys exist.
# NOTE(review): lines 650 and 655 of the original numbering are absent from this extract,
# as is the continuation of the final assertIn (the last visible line leaves its call
# open). Presumably 650 opens the input_dict_2 literal. Confirm.
640 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
641 response = test_utils_rfc8040.check_node_attribute_request(
642 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
643 self.assertEqual(response['status_code'], requests.codes.ok)
644 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
645 'administrative-state': 'inService',
646 'supporting-circuit-pack-name': 'CP1-CFP0',
647 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
648 'type': 'org-openroadm-interfaces:otnOdu',
649 'supporting-port': 'CP1-CFP0-P1'}
651 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
652 'rate': 'org-openroadm-otn-common-types:ODU2e',
653 'monitoring-mode': 'monitored'}
654 input_dict_3 = {'trib-port-number': 1}
656 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
657 response['interface'][0])
658 self.assertDictEqual(dict(input_dict_2,
659 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
660 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
661 self.assertDictEqual(dict(input_dict_3,
662 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
663 'parent-odu-allocation']),
664 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
665 self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
# Reads odu-connection 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e' on SPDR-SA1, expects
# HTTP 200, compares it with input_dict_1 and requires the destination and source to be
# exactly {'dst-if': 'XPDR1-NETWORK1-ODU2e'} and {'src-if': 'XPDR1-CLIENT1-ODU2e'}.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so the input_dict_1 comparison only proves the expected keys exist.
# NOTE(review): lines 672-673 and 676-677 of the original numbering are absent from this
# extract; presumably they open and close the input_dict_1 literal. Confirm.
668 def test_34_check_ODU2E_connection_spdra(self):
669 response = test_utils_rfc8040.check_node_attribute_request(
670 'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
671 self.assertEqual(response['status_code'], requests.codes.ok)
674 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
675 'direction': 'bidirectional'
678 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
679 response['odu-connection'][0])
680 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
681 response['odu-connection'][0]['destination'])
682 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
683 response['odu-connection'][0]['source'])
# Reads interface 'XPDR1-CLIENT1-ETHERNET10G' on SPDR-SC1, expects HTTP 200 and compares
# its general attributes with input_dict; the last fragment reads the ethernet speed.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so the assertDictEqual only proves the expected keys exist, not their values.
# NOTE(review): lines 694 and 697-698 of the original numbering are absent from this
# extract, so the expected speed value is not visible here.
685 def test_35_check_interface_10GE_CLIENT_spdrc(self):
686 response = test_utils_rfc8040.check_node_attribute_request(
687 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
688 self.assertEqual(response['status_code'], requests.codes.ok)
689 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
690 'administrative-state': 'inService',
691 'supporting-circuit-pack-name': 'CP1-SFP4',
692 'type': 'org-openroadm-interfaces:ethernetCsmacd',
693 'supporting-port': 'CP1-SFP4-P1'
695 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
696 response['interface'][0])
699 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
# Reads interface 'XPDR1-CLIENT1-ODU2e' on SPDR-SC1, expects HTTP 200, compares the general
# attributes (input_dict_1) and the odu container (input_dict_2), and requires the opu
# payload types to be exactly '03'.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so the first two assertDictEqual calls only prove the expected keys exist.
# NOTE(review): lines 711 and 715 of the original numbering are absent from this extract;
# presumably 711 opens the input_dict_2 literal. Confirm.
701 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
702 response = test_utils_rfc8040.check_node_attribute_request(
703 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
704 self.assertEqual(response['status_code'], requests.codes.ok)
705 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
706 'administrative-state': 'inService',
707 'supporting-circuit-pack-name': 'CP1-SFP4',
708 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
709 'type': 'org-openroadm-interfaces:otnOdu',
710 'supporting-port': 'CP1-SFP4-P1'}
712 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
713 'rate': 'org-openroadm-otn-common-types:ODU2e',
714 'monitoring-mode': 'terminated'}
716 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
717 response['interface'][0])
718 self.assertDictEqual(dict(input_dict_2,
719 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
720 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
721 self.assertDictEqual(
722 {'payload-type': '03', 'exp-payload-type': '03'},
723 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Reads interface 'XPDR1-NETWORK1-ODU2e' on SPDR-SC1, expects HTTP 200, then compares the
# general attributes (input_dict_1), the odu container (input_dict_2) and the
# parent-odu-allocation (input_dict_3); the last fragment reads its 'trib-slots'.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so the three assertDictEqual calls only prove the expected keys exist.
# NOTE(review): lines 735, 739, 741 and 752 of the original numbering are absent from this
# extract; presumably 735 opens the input_dict_2 literal and 752 opens the final assertion.
# Confirm.
725 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
726 response = test_utils_rfc8040.check_node_attribute_request(
727 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
728 self.assertEqual(response['status_code'], requests.codes.ok)
729 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
730 'administrative-state': 'inService',
731 'supporting-circuit-pack-name': 'CP1-CFP0',
732 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
733 'type': 'org-openroadm-interfaces:otnOdu',
734 'supporting-port': 'CP1-CFP0-P1'}
736 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
737 'rate': 'org-openroadm-otn-common-types:ODU2e',
738 'monitoring-mode': 'monitored'}
740 input_dict_3 = {'trib-port-number': 1}
742 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
743 response['interface'][0])
744 self.assertDictEqual(dict(input_dict_2,
745 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
746 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
747 self.assertDictEqual(dict(input_dict_3,
748 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
749 'parent-odu-allocation']),
750 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
751 'parent-odu-allocation'])
753 response['interface'][0][
754 'org-openroadm-otn-odu-interfaces:odu'][
755 'parent-odu-allocation']['trib-slots'])
# Reads odu-connection 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e' on SPDR-SC1, expects
# HTTP 200, compares it with input_dict_1 and requires the destination and source to be
# exactly {'dst-if': 'XPDR1-NETWORK1-ODU2e'} and {'src-if': 'XPDR1-CLIENT1-ODU2e'}.
# NOTE(review): dict(expected, **actual) lets the returned values override the expected
# ones, so the input_dict_1 comparison only proves the expected keys exist.
# NOTE(review): lines 761-762 and 765-766 of the original numbering are absent from this
# extract; presumably they open and close the input_dict_1 literal. Confirm.
757 def test_38_check_ODU2E_connection_spdrc(self):
758 response = test_utils_rfc8040.check_node_attribute_request(
759 'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
760 self.assertEqual(response['status_code'], requests.codes.ok)
763 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
764 'direction': 'bidirectional'
767 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
768 response['odu-connection'][0])
769 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
770 response['odu-connection'][0]['destination'])
771 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
772 response['odu-connection'][0]['source'])
# Fetches the OTN topology, expects HTTP 200 and exactly 4 links; for the two link ids
# starting with 'ODU4-' the fragments pair available-bandwidth with 90000 and
# used-bandwidth with 10000.
# NOTE(review): the ids tested here start with 'ODU4-', whereas test_27 matches 'OTU4-' and
# 'ODTU4-' ids in the same 4-link topology. If no link carries an 'ODU4-' id, the branch
# never runs and the bandwidth checks are skipped silently. Confirm the intended prefix.
# NOTE(review): lines 784 and 786 of the original numbering are absent from this extract;
# presumably they open the assertion calls these fragments belong to. Confirm.
774 def test_39_check_otn_topo_links(self):
775 response = test_utils.get_otn_topo_request()
776 self.assertEqual(response.status_code, requests.codes.ok)
777 res = response.json()
778 nb_links = len(res['network'][0]['ietf-network-topology:link'])
779 self.assertEqual(nb_links, 4)
780 for link in res['network'][0]['ietf-network-topology:link']:
781 linkId = link['link-id']
782 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
783 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
785 link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
787 link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
# Inspect the XPDR1-NETWORK1 termination point of SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1 in the
# OTN topology: 'ts-pool' must hold 72 entries and the first 'tpn-pool' is checked against 79.
# The HTTP status of the topology request is not asserted here.
789 def test_40_check_otn_topo_tp(self):
790 response = test_utils.get_otn_topo_request()
791 res = response.json()
792 for node in res['network'][0]['node']:
793 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
794 tpList = node['ietf-network-topology:termination-point']
# NOTE(review): listing line 795 is not visible; presumably it is the loop binding `tp`
# over tpList - confirm against the full file.
796 if tp['tp-id'] == 'XPDR1-NETWORK1':
797 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
798 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
799 tsPoolList = list(range(1, 9))
# NOTE(review): listing lines 800, 802 and 804 (the assertion openers) are not visible.
# tsPoolList is itself a list, so a membership test against 'ts-pool' would compare the
# whole list with each element - confirm this is the intended check.
801 tsPoolList, xpdrTpPortConAt['ts-pool'])
803 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
805 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_41_delete_10GE_service(self):
    """Request deletion of ``service1-10GE`` and pause for ``WAITING`` seconds."""
    reply = test_utils.service_delete_request("service1-10GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # The reply only reports the deletion as in progress, so wait before the next test.
    time.sleep(self.WAITING)
def test_42_check_service_list(self):
    """Assert that the service list currently holds exactly two services."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 2)
def test_43_check_no_ODU2e_connection_spdra(self):
    """Assert that the SPDR-SA1 device data has no ``odu-connection`` entry."""
    reply = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(reply.status_code, requests.codes.ok)
    device = reply.json()['org-openroadm-device']
    self.assertNotIn('odu-connection', device)
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
    """Expect the conflict status when reading XPDR1-NETWORK1-ODU2e-service1 on SPDR-SA1."""
    node_id, attribute = 'SPDR-SA1', 'interface'
    reply = test_utils_rfc8040.check_node_attribute_request(
        node_id, attribute, 'XPDR1-NETWORK1-ODU2e-service1')
    status = reply['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
    """Expect the conflict status when reading XPDR1-CLIENT1-ODU2e-service1 on SPDR-SA1."""
    node_id, attribute = 'SPDR-SA1', 'interface'
    reply = test_utils_rfc8040.check_node_attribute_request(
        node_id, attribute, 'XPDR1-CLIENT1-ODU2e-service1')
    status = reply['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
    """Expect the conflict status when reading XPDR1-CLIENT1-ETHERNET10G on SPDR-SA1."""
    node_id, attribute = 'SPDR-SA1', 'interface'
    reply = test_utils_rfc8040.check_node_attribute_request(
        node_id, attribute, 'XPDR1-CLIENT1-ETHERNET10G')
    status = reply['status_code']
    self.assertEqual(status, requests.codes.conflict)
# Fetch the OTN topology, expect exactly four links, and inspect the bandwidth attributes
# of the two ODU4 links between SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1 (values 100000 and 0).
844 def test_47_check_otn_topo_links(self):
845 response = test_utils.get_otn_topo_request()
846 self.assertEqual(response.status_code, requests.codes.ok)
847 res = response.json()
848 nb_links = len(res['network'][0]['ietf-network-topology:link'])
849 self.assertEqual(nb_links, 4)
850 for link in res['network'][0]['ietf-network-topology:link']:
851 linkId = link['link-id']
852 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
853 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
# NOTE(review): listing lines 854 and 856 are not visible; presumably each opens the
# assertion call whose arguments follow - confirm against the full file.
855 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
857 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
# Inspect the XPDR1-NETWORK1 termination point of SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1 in the
# OTN topology: 'ts-pool' must hold 80 entries and the first 'tpn-pool' length is checked
# against 80. The HTTP status of the topology request is not asserted here.
859 def test_48_check_otn_topo_tp(self):
860 response = test_utils.get_otn_topo_request()
861 res = response.json()
862 for node in res['network'][0]['node']:
863 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
864 tpList = node['ietf-network-topology:termination-point']
# NOTE(review): listing line 865 is not visible; presumably it is the loop binding `tp`
# over tpList - confirm against the full file.
866 if tp['tp-id'] == 'XPDR1-NETWORK1':
867 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
868 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
# NOTE(review): listing line 869 (the assertion opener) is not visible.
870 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
    """Request deletion of ``service1-ODU4`` and pause for ``WAITING`` seconds."""
    reply = test_utils.service_delete_request("service1-ODU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # The reply only reports the deletion as in progress, so wait before the next test.
    time.sleep(self.WAITING)
def test_50_check_service_list(self):
    """Assert that the service list currently holds exactly one service."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 1)
def test_51_check_no_interface_ODU4_spdra(self):
    """Expect the conflict status when reading XPDR1-NETWORK1-ODU4 on SPDR-SA1."""
    node_id, attribute = 'SPDR-SA1', 'interface'
    reply = test_utils_rfc8040.check_node_attribute_request(
        node_id, attribute, 'XPDR1-NETWORK1-ODU4')
    status = reply['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_52_check_otn_topo_links(self):
    """Re-run the checks of ``test_22_check_otn_topo_otu4_links`` at this point of the sequence."""
    # Delegates entirely to the earlier test method; nothing is asserted locally.
    self.test_22_check_otn_topo_otu4_links()
# Inspect the XPDR1-NETWORK1 termination point of SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1 in the
# OTN topology: its connection attributes must not contain 'ts-pool'; 'odtu-tpn-pool' is
# checked too. The HTTP status of the topology request is not asserted here.
895 def test_53_check_otn_topo_tp(self):
896 response = test_utils.get_otn_topo_request()
897 res = response.json()
898 for node in res['network'][0]['node']:
899 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
900 tpList = node['ietf-network-topology:termination-point']
# NOTE(review): listing line 901 is not visible; presumably it is the loop binding `tp`
# over tpList - confirm against the full file.
902 if tp['tp-id'] == 'XPDR1-NETWORK1':
903 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
904 self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
# NOTE(review): listing line 905 (the assertion opener for the next check) is not visible.
906 'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
def test_54_delete_OCH_OTU4_service(self):
    """Request deletion of ``service1-OCH-OTU4`` and pause for ``WAITING`` seconds."""
    reply = test_utils.service_delete_request("service1-OCH-OTU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # The reply only reports the deletion as in progress, so wait before the next test.
    time.sleep(self.WAITING)
# Request the service list and expect the conflict status, then check the returned
# error list against the expected 'data-missing' application error entry.
916 def test_55_get_no_service(self):
917 response = test_utils.get_service_list_request("")
918 self.assertEqual(response.status_code, requests.codes.conflict)
919 res = response.json()
# NOTE(review): listing line 920 is not visible; presumably it opens the assertion whose
# arguments are the expected error dict and res['errors']['error'] - confirm.
921 {"error-type": "application", "error-tag": "data-missing",
922 "error-message": "Request could not be completed because the relevant data model content does not exist"},
923 res['errors']['error'])
def test_56_check_no_interface_OTU4_spdra(self):
    """Expect the conflict status when reading XPDR1-NETWORK1-OTU on SPDR-SA1."""
    node_id, attribute = 'SPDR-SA1', 'interface'
    reply = test_utils_rfc8040.check_node_attribute_request(
        node_id, attribute, 'XPDR1-NETWORK1-OTU')
    status = reply['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_57_check_no_interface_OCH_spdra(self):
    """Expect the conflict status when reading XPDR1-NETWORK1-1 on SPDR-SA1."""
    node_id, attribute = 'SPDR-SA1', 'interface'
    reply = test_utils_rfc8040.check_node_attribute_request(
        node_id, attribute, 'XPDR1-NETWORK1-1')
    status = reply['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
    """Assert that the first OTN topology network carries no link list."""
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_59_check_openroadm_topo_spdra(self):
    """Check the first termination point of SPDR-SA1-XPDR1 in the openroadm topology.

    It must be XPDR1-NETWORK1 and its xpdr-network-attributes must not
    contain a ``wavelength`` key.
    """
    reply = test_utils_rfc8040.get_ietf_network_node_request(
        'openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    first_tp = reply['node']['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR1-NETWORK1', first_tp['tp-id'])
    network_attrs = first_tp['org-openroadm-network-topology:xpdr-network-attributes']
    self.assertNotIn('wavelength', network_attrs.keys())
# Read node ROADM-A1-SRG1 from the openroadm topology, base64-decode its first available
# frequency map and require the byte at index 95 to be 255; the same check is applied to
# the first frequency map of termination point SRG1-PP1-TXRX.
950 def test_60_check_openroadm_topo_ROADMA_SRG(self):
951 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
952 self.assertEqual(response['status_code'], requests.codes.ok)
953 freq_map = base64.b64decode(
954 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
# Iterating the decoded bytes yields one integer per byte.
955 freq_map_array = [int(x) for x in freq_map]
956 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
957 liste_tp = response['node']['ietf-network-topology:termination-point']
# NOTE(review): listing line 958 is not visible; presumably it is the loop binding `ele`
# over liste_tp - confirm against the full file.
959 if ele['tp-id'] == 'SRG1-PP1-TXRX':
960 freq_map = base64.b64decode(
961 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
962 freq_map_array = [int(x) for x in freq_map]
963 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
# Read node ROADM-A1-DEG2 from the openroadm topology, base64-decode its first available
# frequency map and require the byte at index 95 to be 255; the same check is applied to
# termination points DEG2-CTP-TXRX and DEG2-TTP-TXRX.
966 def test_61_check_openroadm_topo_ROADMA_DEG(self):
967 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
968 self.assertEqual(response['status_code'], requests.codes.ok)
969 freq_map = base64.b64decode(
970 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
# Iterating the decoded bytes yields one integer per byte.
971 freq_map_array = [int(x) for x in freq_map]
972 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
973 liste_tp = response['node']['ietf-network-topology:termination-point']
# NOTE(review): listing line 974 is not visible; presumably it is the loop binding `ele`
# over liste_tp - confirm against the full file.
975 if ele['tp-id'] == 'DEG2-CTP-TXRX':
976 freq_map = base64.b64decode(
977 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
978 freq_map_array = [int(x) for x in freq_map]
979 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
980 if ele['tp-id'] == 'DEG2-TTP-TXRX':
981 freq_map = base64.b64decode(
982 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
983 freq_map_array = [int(x) for x in freq_map]
984 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
    """Call ``init-xpdr-rdm-links`` for SPDR-SA1 XPDR3 network 1 and ROADM-A1 SRG1-PP2-TXRX."""
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '3',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
    """Call ``init-rdm-xpdr-links`` for ROADM-A1 SRG1-PP2-TXRX and SPDR-SA1 XPDR3 network 1."""
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '3',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
    """Call ``init-xpdr-rdm-links`` for SPDR-SC1 XPDR3 network 1 and ROADM-C1 SRG1-PP2-TXRX."""
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '3',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
    """Call ``init-rdm-xpdr-links`` for ROADM-C1 SRG1-PP2-TXRX and SPDR-SC1 XPDR3 network 1."""
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '3',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP2-TXRX',
    }
    reply = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(reply['status_code'], requests.codes.ok)
def test_66_create_OCH_OTU4_service_2(self):
    """Create ``service2-OCH-OTU4`` on the XPDR3-NETWORK1 ports of SPDR-SA1 and SPDR-SC1.

    The shared ``cr_serv_sample_data`` template is updated in place before
    being sent, so the values set here stay in the template afterwards.
    """
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service2-OCH-OTU4"
    request["connection-type"] = "infrastructure"
    # A-end first, then Z-end; within each end the fields are set in the same order.
    endpoints = (("service-a-end", "SPDR-SA1-XPDR3"), ("service-z-end", "SPDR-SC1-XPDR3"))
    for end_key, device_name in endpoints:
        end = request[end_key]
        end["service-rate"] = "100"
        end["service-format"] = "OTU"
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction][0]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-NETWORK1"
        end["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # The reply only reports the computation as in progress, so wait before the next test.
    time.sleep(self.WAITING)
# Fetch 'service2-OCH-OTU4' from the service list, expect an OK status, and check the
# administrative state, name, connection type and lifecycle state of the first entry.
1040 def test_67_get_OCH_OTU4_service2(self):
1041 response = test_utils.get_service_list_request(
1042 "services/service2-OCH-OTU4")
1043 self.assertEqual(response.status_code, requests.codes.ok)
1044 res = response.json()
# NOTE(review): listing lines 1045, 1047, 1049 and 1051 are not visible; presumably each
# opens the assertion call whose arguments follow - confirm against the full file.
1046 res['services'][0]['administrative-state'], 'inService')
1048 res['services'][0]['service-name'], 'service2-OCH-OTU4')
1050 res['services'][0]['connection-type'], 'infrastructure')
1052 res['services'][0]['lifecycle-state'], 'planned')
def test_68_create_ODU4_service_2(self):
    """Create ``service2-ODU4`` on the XPDR3-NETWORK1 ports of SPDR-SA1 and SPDR-SC1.

    The shared ``cr_serv_sample_data`` template is updated in place: both ends
    switch to the ODU format, gain an ``odu-service-rate`` and lose their
    ``otu-service-rate`` before the request is sent.
    """
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service2-ODU4"
    odu4_rate = "org-openroadm-otn-common-types:ODU4"
    directions = ("tx-direction", "rx-direction")

    a_end = request["service-a-end"]
    a_end["service-format"] = "ODU"
    for direction in directions:
        a_port = a_end[direction][0]["port"]
        a_port["port-device-name"] = "SPDR-SA1-XPDR3"
        a_port["port-name"] = "XPDR3-NETWORK1"
    a_end["odu-service-rate"] = odu4_rate
    del a_end["otu-service-rate"]

    z_end = request["service-z-end"]
    z_end["service-format"] = "ODU"
    z_end["odu-service-rate"] = odu4_rate
    for direction in directions:
        z_port = z_end[direction][0]["port"]
        z_port["port-device-name"] = "SPDR-SC1-XPDR3"
        z_port["port-name"] = "XPDR3-NETWORK1"
    del z_end["otu-service-rate"]

    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # The reply only reports the computation as in progress, so wait before the next test.
    time.sleep(self.WAITING)
# Fetch 'service2-ODU4' from the service list, expect an OK status, and check the
# administrative state, name, connection type and lifecycle state of the first entry.
1080 def test_69_get_ODU4_service2(self):
1081 response = test_utils.get_service_list_request(
1082 "services/service2-ODU4")
1083 self.assertEqual(response.status_code, requests.codes.ok)
1084 res = response.json()
# NOTE(review): listing lines 1085, 1087, 1089 and 1091 are not visible; presumably each
# opens the assertion call whose arguments follow - confirm against the full file.
1086 res['services'][0]['administrative-state'], 'inService')
1088 res['services'][0]['service-name'], 'service2-ODU4')
1090 res['services'][0]['connection-type'], 'infrastructure')
1092 res['services'][0]['lifecycle-state'], 'planned')
def test_70_create_1GE_service(self):
    """Create ``service1-1GE`` on the XPDR3-CLIENT1 ports of SPDR-SA1 and SPDR-SC1.

    The shared ``cr_serv_sample_data`` template is updated in place: both ends
    become rate "1" Ethernet endpoints and lose their ``odu-service-rate``
    before the request is sent.
    """
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service1-1GE"
    request["connection-type"] = "service"
    a_end = request["service-a-end"]
    a_end["service-rate"] = "1"
    a_end["service-format"] = "Ethernet"
    z_end = request["service-z-end"]
    z_end["service-rate"] = "1"
    z_end["service-format"] = "Ethernet"
    directions = ("tx-direction", "rx-direction")

    del a_end["odu-service-rate"]
    for direction in directions:
        a_port = a_end[direction][0]["port"]
        a_port["port-device-name"] = "SPDR-SA1-XPDR3"
        a_port["port-name"] = "XPDR3-CLIENT1"

    del z_end["odu-service-rate"]
    for direction in directions:
        z_port = z_end[direction][0]["port"]
        z_port["port-device-name"] = "SPDR-SC1-XPDR3"
        z_port["port-name"] = "XPDR3-CLIENT1"

    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # The reply only reports the computation as in progress, so wait before the next test.
    time.sleep(self.WAITING)
# Fetch 'service1-1GE' from the service list, expect an OK status, and check the
# administrative state, name, connection type and lifecycle state of the first entry.
1120 def test_71_get_1GE_service1(self):
1121 response = test_utils.get_service_list_request("services/service1-1GE")
1122 self.assertEqual(response.status_code, requests.codes.ok)
1123 res = response.json()
# NOTE(review): listing lines 1124, 1126, 1128 and 1130 are not visible; presumably each
# opens the assertion call whose arguments follow - confirm against the full file.
1125 res['services'][0]['administrative-state'], 'inService')
1127 res['services'][0]['service-name'], 'service1-1GE')
1129 res['services'][0]['connection-type'], 'service')
1131 res['services'][0]['lifecycle-state'], 'planned')
# Fetch interface 'XPDR3-CLIENT1-ETHERNET1G' from SPDR-SA1, expect an OK status, and
# compare it with the expected attributes; the Ethernet 'speed' leaf is checked as well.
1134 def test_72_check_interface_1GE_CLIENT_spdra(self):
1135 response = test_utils_rfc8040.check_node_attribute_request(
1136 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1137 self.assertEqual(response['status_code'], requests.codes.ok)
1138 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1139 'administrative-state': 'inService',
1140 'supporting-circuit-pack-name': 'CP1-SFP4',
1141 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1142 'supporting-port': 'CP1-SFP4-P1'
# NOTE(review): listing line 1143 is not visible; presumably it closes the dict literal.
# NOTE(review): dict(expected, **actual) lets the response values override the expected ones,
# so this comparison only fails when an expected key is missing from the response.
1144 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1145 response['interface'][0])
# NOTE(review): listing lines 1146-1147 are not visible; the assertion applied to the
# 'speed' value and the value it is compared with cannot be seen here.
1148 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
# Fetch interface 'XPDR3-CLIENT1-ODU0' from SPDR-SA1, expect an OK status, and compare the
# interface, its ODU container and its OPU payload types with the expected values.
1150 def test_73_check_interface_ODU0_CLIENT_spdra(self):
1151 response = test_utils_rfc8040.check_node_attribute_request(
1152 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
1153 self.assertEqual(response['status_code'], requests.codes.ok)
1154 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1155 'administrative-state': 'inService',
1156 'supporting-circuit-pack-name': 'CP3-SFP1',
1157 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1158 'type': 'org-openroadm-interfaces:otnOdu',
1159 'supporting-port': 'CP3-SFP1-P1'}
# NOTE(review): listing line 1160 is not visible; presumably it opens the dict bound to
# input_dict_2 - confirm against the full file.
1161 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1162 'rate': 'org-openroadm-otn-common-types:ODU0',
1163 'monitoring-mode': 'terminated'}
# NOTE(review): dict(expected, **actual) lets the response values override the expected ones,
# so the next two comparisons only fail when an expected key is missing from the response.
1164 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1165 response['interface'][0])
1166 self.assertDictEqual(dict(input_dict_2,
1167 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1168 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# The 'opu' sub-dict is compared exactly.
1169 self.assertDictEqual(
1170 {'payload-type': '07', 'exp-payload-type': '07'},
1171 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Fetch interface 'XPDR3-NETWORK1-ODU0' from SPDR-SA1, expect an OK status, and compare
# the interface, its ODU container and its parent-odu-allocation with the expected values.
1173 def test_74_check_interface_ODU0_NETWORK_spdra(self):
1174 response = test_utils_rfc8040.check_node_attribute_request(
1175 'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
1176 self.assertEqual(response['status_code'], requests.codes.ok)
1177 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1178 'administrative-state': 'inService',
1179 'supporting-circuit-pack-name': 'CP3-CFP0',
1180 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1181 'type': 'org-openroadm-interfaces:otnOdu',
1182 'supporting-port': 'CP3-CFP0-P1'}
# NOTE(review): listing line 1183 is not visible; presumably it opens the dict bound to
# input_dict_2 - confirm against the full file.
1184 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1185 'rate': 'org-openroadm-otn-common-types:ODU0',
1186 'monitoring-mode': 'monitored'}
1187 input_dict_3 = {'trib-port-number': 1}
# NOTE(review): dict(expected, **actual) lets the response values override the expected ones,
# so the next three comparisons only fail when an expected key is missing from the response.
1188 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1189 response['interface'][0])
1190 self.assertDictEqual(dict(input_dict_2,
1191 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1192 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1193 self.assertDictEqual(dict(input_dict_3,
1194 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1195 'parent-odu-allocation']),
1196 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
# NOTE(review): the call below is left open in this view; the continuation on listing
# line 1198 is not visible.
1197 self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
# Fetch the odu-connection 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0' from SPDR-SA1,
# expect an OK status, then compare its attributes and its source/destination interfaces.
1200 def test_75_check_ODU0_connection_spdra(self):
1201 response = test_utils_rfc8040.check_node_attribute_request(
1202 'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1203 self.assertEqual(response['status_code'], requests.codes.ok)
# NOTE(review): listing lines 1204-1205 and 1208 are not visible; presumably they open and
# close the expected dict bound to input_dict_1 - confirm against the full file.
1206 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1207 'direction': 'bidirectional'
# NOTE(review): dict(expected, **actual) lets the response values override the expected ones,
# so this comparison only fails when an expected key is missing from the response.
1209 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1210 response['odu-connection'][0])
# The destination and source sub-dicts are compared exactly.
1211 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1212 response['odu-connection'][0]['destination'])
1213 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1214 response['odu-connection'][0]['source'])
# Fetch interface 'XPDR3-CLIENT1-ETHERNET1G' from SPDR-SC1, expect an OK status, and
# compare it with the expected attributes; the Ethernet 'speed' leaf is checked as well.
1216 def test_76_check_interface_1GE_CLIENT_spdrc(self):
1217 response = test_utils_rfc8040.check_node_attribute_request(
1218 'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1219 self.assertEqual(response['status_code'], requests.codes.ok)
1220 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1221 'administrative-state': 'inService',
1222 'supporting-circuit-pack-name': 'CP3-SFP1',
1223 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1224 'supporting-port': 'CP3-SFP1-P1'
# NOTE(review): listing line 1225 is not visible; presumably it closes the dict literal.
# NOTE(review): dict(expected, **actual) lets the response values override the expected ones,
# so this comparison only fails when an expected key is missing from the response.
1226 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1227 response['interface'][0])
# NOTE(review): listing lines 1228-1229 are not visible; the assertion applied to the
# 'speed' value and the value it is compared with cannot be seen here.
1230 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
# Fetch interface 'XPDR3-CLIENT1-ODU0' from SPDR-SC1, expect an OK status, and compare the
# interface, its ODU container and its OPU payload types with the expected values.
1232 def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1233 response = test_utils_rfc8040.check_node_attribute_request(
1234 'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
1235 self.assertEqual(response['status_code'], requests.codes.ok)
1236 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1237 'administrative-state': 'inService',
1238 'supporting-circuit-pack-name': 'CP3-SFP1',
1239 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1240 'type': 'org-openroadm-interfaces:otnOdu',
1241 'supporting-port': 'CP3-SFP1-P1'}
# NOTE(review): listing line 1242 is not visible; presumably it opens the dict bound to
# input_dict_2 - confirm against the full file.
1243 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1244 'rate': 'org-openroadm-otn-common-types:ODU0',
1245 'monitoring-mode': 'terminated'}
# NOTE(review): dict(expected, **actual) lets the response values override the expected ones,
# so the next two comparisons only fail when an expected key is missing from the response.
1246 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1247 response['interface'][0])
1248 self.assertDictEqual(dict(input_dict_2,
1249 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1250 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
# The 'opu' sub-dict is compared exactly.
1251 self.assertDictEqual(
1252 {'payload-type': '07', 'exp-payload-type': '07'},
1253 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
# Fetch interface 'XPDR3-NETWORK1-ODU0' from SPDR-SC1, expect an OK status, and compare
# the interface, its ODU container and its parent-odu-allocation with the expected values.
1255 def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1256 response = test_utils_rfc8040.check_node_attribute_request(
1257 'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
1258 self.assertEqual(response['status_code'], requests.codes.ok)
1259 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1260 'administrative-state': 'inService',
1261 'supporting-circuit-pack-name': 'CP3-CFP0',
1262 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1263 'type': 'org-openroadm-interfaces:otnOdu',
1264 'supporting-port': 'CP3-CFP0-P1'}
# NOTE(review): listing line 1265 is not visible; presumably it opens the dict bound to
# input_dict_2 - confirm against the full file.
1266 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1267 'rate': 'org-openroadm-otn-common-types:ODU0',
1268 'monitoring-mode': 'monitored'}
1269 input_dict_3 = {'trib-port-number': 1}
# NOTE(review): dict(expected, **actual) lets the response values override the expected ones,
# so the next three comparisons only fail when an expected key is missing from the response.
1270 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1271 response['interface'][0])
1272 self.assertDictEqual(dict(input_dict_2,
1273 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1274 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1275 self.assertDictEqual(dict(input_dict_3,
1276 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1277 'parent-odu-allocation']),
1278 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1279 'parent-odu-allocation'])
# NOTE(review): listing line 1280 is not visible; the assertion applied to 'trib-slots'
# and its first argument cannot be seen here.
1281 response['interface'][0][
1282 'org-openroadm-otn-odu-interfaces:odu'][
1283 'parent-odu-allocation']['trib-slots'])
# Fetch the odu-connection 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0' from SPDR-SC1,
# expect an OK status, then compare its attributes and its source/destination interfaces.
1285 def test_79_check_ODU0_connection_spdrc(self):
1286 response = test_utils_rfc8040.check_node_attribute_request(
1287 'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1288 self.assertEqual(response['status_code'], requests.codes.ok)
# NOTE(review): listing lines 1289-1290 and 1293 are not visible; presumably they open and
# close the expected dict bound to input_dict_1 - confirm against the full file.
1291 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1292 'direction': 'bidirectional'
# NOTE(review): dict(expected, **actual) lets the response values override the expected ones,
# so this comparison only fails when an expected key is missing from the response.
1294 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1295 response['odu-connection'][0])
# The destination and source sub-dicts are compared exactly.
1296 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1297 response['odu-connection'][0]['destination'])
1298 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1299 response['odu-connection'][0]['source'])
# Fetch the OTN topology, expect exactly four links, and inspect the bandwidth attributes
# of the two ODU4 links between SPDR-SA1-XPDR3 and SPDR-SC1-XPDR3 (values 99000 and 1000).
1301 def test_80_check_otn_topo_links(self):
1302 response = test_utils.get_otn_topo_request()
1303 self.assertEqual(response.status_code, requests.codes.ok)
1304 res = response.json()
1305 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1306 self.assertEqual(nb_links, 4)
1307 for link in res['network'][0]['ietf-network-topology:link']:
1308 linkId = link['link-id']
1309 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1310 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
# NOTE(review): listing lines 1311 and 1313 are not visible; presumably each opens the
# assertion call whose arguments follow - confirm against the full file.
1312 link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1314 link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
# Inspect the XPDR3-NETWORK1 termination point of SPDR-SA1-XPDR3 and SPDR-SC1-XPDR3 in the
# OTN topology: 'ts-pool' must hold 79 entries and the first 'tpn-pool' is checked against 79.
# The HTTP status of the topology request is not asserted here.
1316 def test_81_check_otn_topo_tp(self):
1317 response = test_utils.get_otn_topo_request()
1318 res = response.json()
1319 for node in res['network'][0]['node']:
1320 if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1321 tpList = node['ietf-network-topology:termination-point']
# NOTE(review): listing line 1322 is not visible; presumably it is the loop binding `tp`
# over tpList - confirm against the full file.
1323 if tp['tp-id'] == 'XPDR3-NETWORK1':
1324 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1325 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1326 tsPoolList = list(range(1, 2))
# NOTE(review): listing lines 1327, 1329 and 1331 (the assertion openers) are not visible.
# tsPoolList is itself a list, so a membership test against 'ts-pool' would compare the
# whole list with each element - confirm this is the intended check.
1328 tsPoolList, xpdrTpPortConAt['ts-pool'])
1330 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1332 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_82_delete_1GE_service(self):
    """Request deletion of ``service1-1GE`` and pause for ``WAITING`` seconds."""
    reply = test_utils.service_delete_request("service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # The reply only reports the deletion as in progress, so wait before the next test.
    time.sleep(self.WAITING)
def test_83_check_service_list(self):
    """Assert that the service list currently holds exactly two services."""
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 2)
def test_84_check_no_ODU0_connection_spdra(self):
    """Assert that the SPDR-SA1 device data has no ``odu-connection`` entry."""
    reply = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(reply.status_code, requests.codes.ok)
    device = reply.json()['org-openroadm-device']
    self.assertNotIn('odu-connection', device)
def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
    """Expect the conflict status when reading XPDR3-NETWORK1-ODU0-service1 on SPDR-SA1."""
    node_id, attribute = 'SPDR-SA1', 'interface'
    reply = test_utils_rfc8040.check_node_attribute_request(
        node_id, attribute, 'XPDR3-NETWORK1-ODU0-service1')
    status = reply['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
    """Expect the conflict status when reading XPDR3-CLIENT1-ODU0-service1 on SPDR-SA1."""
    node_id, attribute = 'SPDR-SA1', 'interface'
    reply = test_utils_rfc8040.check_node_attribute_request(
        node_id, attribute, 'XPDR3-CLIENT1-ODU0-service1')
    status = reply['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_87_check_no_interface_10GE_CLIENT_spdra(self):
    """Expect the conflict status when reading XPDR3-CLIENT1-ETHERNET1G on SPDR-SA1.

    Despite the ``10GE`` in the method name, the interface queried is the
    1G Ethernet one.
    """
    node_id, attribute = 'SPDR-SA1', 'interface'
    reply = test_utils_rfc8040.check_node_attribute_request(
        node_id, attribute, 'XPDR3-CLIENT1-ETHERNET1G')
    status = reply['status_code']
    self.assertEqual(status, requests.codes.conflict)
# Fetch the OTN topology, expect exactly four links, and inspect the bandwidth attributes
# of the two ODU4 links between SPDR-SA1-XPDR3 and SPDR-SC1-XPDR3 (values 100000 and 0).
1371 def test_88_check_otn_topo_links(self):
1372 response = test_utils.get_otn_topo_request()
1373 self.assertEqual(response.status_code, requests.codes.ok)
1374 res = response.json()
1375 nb_links = len(res['network'][0]['ietf-network-topology:link'])
1376 self.assertEqual(nb_links, 4)
1377 for link in res['network'][0]['ietf-network-topology:link']:
1378 linkId = link['link-id']
1379 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1380 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
# NOTE(review): listing lines 1381 and 1383 are not visible; presumably each opens the
# assertion call whose arguments follow - confirm against the full file.
1382 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1384 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
# Inspect the XPDR3-NETWORK1 termination point of SPDR-SA1-XPDR3 and SPDR-SC1-XPDR3 in the
# OTN topology: 'ts-pool' must hold 80 entries and the first 'tpn-pool' length is checked
# against 80. The HTTP status of the topology request is not asserted here.
1386 def test_89_check_otn_topo_tp(self):
1387 response = test_utils.get_otn_topo_request()
1388 res = response.json()
1389 for node in res['network'][0]['node']:
1390 if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1391 tpList = node['ietf-network-topology:termination-point']
# NOTE(review): listing line 1392 is not visible; presumably it is the loop binding `tp`
# over tpList - confirm against the full file.
1393 if tp['tp-id'] == 'XPDR3-NETWORK1':
1394 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1395 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
# NOTE(review): listing line 1396 (the assertion opener) is not visible.
1397 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_90_delete_ODU4_service(self):
    """Request deletion of ``service2-ODU4`` and pause for ``WAITING`` seconds."""
    reply = test_utils.service_delete_request("service2-ODU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # The reply only reports the deletion as in progress, so wait before the next test.
    time.sleep(self.WAITING)
def test_91_delete_OCH_OTU4_service(self):
    """Request deletion of ``service2-OCH-OTU4`` and pause for ``WAITING`` seconds."""
    reply = test_utils.service_delete_request("service2-OCH-OTU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # The reply only reports the deletion as in progress, so wait before the next test.
    time.sleep(self.WAITING)
# Read the openroadm topology and issue a delete request for every link whose link-type
# is XPONDER-OUTPUT or XPONDER-INPUT, expecting an OK status for each deletion.
1415 def test_92_disconnect_xponders_from_roadm(self):
# The URL keeps a '{}' placeholder; presumably test_utils.delete_request fills it in
# (e.g. with the controller base URL) - verify against that helper.
1416 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1417 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1418 self.assertEqual(response['status_code'], requests.codes.ok)
1419 links = response['network'][0]['ietf-network-topology:link']
# NOTE(review): listing line 1420 is not visible; presumably it is the loop binding `link`
# over links - confirm against the full file.
1421 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1422 link_name = link["link-id"]
# `response` is rebound here to the reply of the delete request.
1423 response = test_utils.delete_request(url+link_name)
1424 self.assertEqual(response.status_code, requests.codes.ok)
def test_93_check_openroadm_topology(self):
    """Assert that the openroadm topology contains exactly 18 links."""
    reply = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
    self.assertEqual(reply['status_code'], requests.codes.ok)
    topo_links = reply['network'][0]['ietf-network-topology:link']
    self.assertEqual(18, len(topo_links), 'Topology should contain 18 links')
def test_94_disconnect_spdrA(self):
    """Unmount SPDR-SA1 and accept either an OK or a No Content status."""
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("SPDR-SA1")
    self.assertIn(reply.status_code, accepted_codes)
def test_95_disconnect_spdrC(self):
    """Unmount SPDR-SC1 and accept either an OK or a No Content status."""
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("SPDR-SC1")
    self.assertIn(reply.status_code, accepted_codes)
def test_96_disconnect_roadmA(self):
    """Unmount ROADM-A1 and accept either an OK or a No Content status."""
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("ROADM-A1")
    self.assertIn(reply.status_code, accepted_codes)
def test_97_disconnect_roadmC(self):
    """Unmount ROADM-C1 and accept either an OK or a No Content status."""
    accepted_codes = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("ROADM-C1")
    self.assertIn(reply.status_code, accepted_codes)
# Run the test suite with verbose (per-test) output when the file is executed as a script.
1450 if __name__ == "__main__":
1451 unittest.main(verbosity=2)