3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
26 import test_utils_rfc8040 # nopep8
29 class TransportPCEtesting(unittest.TestCase):
32 WAITING = 20 # nominal value is 300
33 NODE_VERSION = '2.2.1'
35 cr_serv_sample_data = {"input": {
36 "sdnc-request-header": {
37 "request-id": "request-1",
38 "rpc-action": "service-create",
39 "request-system-id": "appname"
41 "service-name": "service1-OCH-OTU4",
42 "common-id": "commonId",
43 "connection-type": "infrastructure",
45 "service-rate": "100",
46 "node-id": "SPDR-SA1",
47 "service-format": "OTU",
48 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
52 "port-device-name": "SPDR-SA1-XPDR1",
54 "port-name": "XPDR1-NETWORK1",
55 "port-rack": "000000.00",
56 "port-shelf": "Chassis#1"
59 "lgx-device-name": "Some lgx-device-name",
60 "lgx-port-name": "Some lgx-port-name",
61 "lgx-port-rack": "000000.00",
62 "lgx-port-shelf": "00"
68 "port-device-name": "SPDR-SA1-XPDR1",
70 "port-name": "XPDR1-NETWORK1",
71 "port-rack": "000000.00",
72 "port-shelf": "Chassis#1"
75 "lgx-device-name": "Some lgx-device-name",
76 "lgx-port-name": "Some lgx-port-name",
77 "lgx-port-rack": "000000.00",
78 "lgx-port-shelf": "00"
85 "service-rate": "100",
86 "node-id": "SPDR-SC1",
87 "service-format": "OTU",
88 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
92 "port-device-name": "SPDR-SC1-XPDR1",
94 "port-name": "XPDR1-NETWORK1",
95 "port-rack": "000000.00",
96 "port-shelf": "Chassis#1"
99 "lgx-device-name": "Some lgx-device-name",
100 "lgx-port-name": "Some lgx-port-name",
101 "lgx-port-rack": "000000.00",
102 "lgx-port-shelf": "00"
108 "port-device-name": "SPDR-SC1-XPDR1",
109 "port-type": "fixed",
110 "port-name": "XPDR1-NETWORK1",
111 "port-rack": "000000.00",
112 "port-shelf": "Chassis#1"
115 "lgx-device-name": "Some lgx-device-name",
116 "lgx-port-name": "Some lgx-port-name",
117 "lgx-port-rack": "000000.00",
118 "lgx-port-shelf": "00"
124 "due-date": "2018-06-15T00:00:01Z",
125 "operator-contact": "pw1234"
131 cls.processes = test_utils_rfc8040.start_tpce()
132 cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
133 ('roadma', cls.NODE_VERSION),
134 ('roadmc', cls.NODE_VERSION),
135 ('spdrc', cls.NODE_VERSION)])
138 def tearDownClass(cls):
139 # pylint: disable=not-an-iterable
140 for process in cls.processes:
141 test_utils_rfc8040.shutdown_process(process)
142 print("all processes killed")
147 def test_01_connect_spdrA(self):
148 response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
149 self.assertEqual(response.status_code,
150 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
152 def test_02_connect_spdrC(self):
153 response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
154 self.assertEqual(response.status_code,
155 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
157 def test_03_connect_rdmA(self):
158 response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
159 self.assertEqual(response.status_code,
160 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
162 def test_04_connect_rdmC(self):
163 response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
164 self.assertEqual(response.status_code,
165 requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
167 def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
168 response = test_utils_rfc8040.transportpce_api_rpc_request(
169 'transportpce-networkutils', 'init-xpdr-rdm-links',
170 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
171 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
172 self.assertEqual(response['status_code'], requests.codes.ok)
174 def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
175 response = test_utils_rfc8040.transportpce_api_rpc_request(
176 'transportpce-networkutils', 'init-rdm-xpdr-links',
177 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
178 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
179 self.assertEqual(response['status_code'], requests.codes.ok)
181 def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
182 response = test_utils_rfc8040.transportpce_api_rpc_request(
183 'transportpce-networkutils', 'init-xpdr-rdm-links',
184 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
185 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
186 self.assertEqual(response['status_code'], requests.codes.ok)
188 def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
189 response = test_utils_rfc8040.transportpce_api_rpc_request(
190 'transportpce-networkutils', 'init-rdm-xpdr-links',
191 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
192 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
193 self.assertEqual(response['status_code'], requests.codes.ok)
195 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
196 # Config ROADMA-ROADMC oms-attributes
198 "auto-spanloss": "true",
199 "spanloss-base": 11.4,
200 "spanloss-current": 12,
201 "engineered-spanloss": 12.2,
202 "link-concatenation": [{
205 "SRLG-length": 100000,
207 response = test_utils_rfc8040.add_oms_attr_request(
208 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
209 self.assertEqual(response.status_code, requests.codes.created)
211 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
212 # Config ROADMC-ROADMA oms-attributes
214 "auto-spanloss": "true",
215 "spanloss-base": 11.4,
216 "spanloss-current": 12,
217 "engineered-spanloss": 12.2,
218 "link-concatenation": [{
221 "SRLG-length": 100000,
223 response = test_utils_rfc8040.add_oms_attr_request(
224 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
225 self.assertEqual(response.status_code, requests.codes.created)
227 # test service-create for OCH-OTU4 service from spdr to spdr
228 def test_11_check_otn_topology(self):
229 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
230 self.assertEqual(response['status_code'], requests.codes.ok)
231 self.assertEqual(len(response['network'][0]['node']), 6)
232 self.assertNotIn('ietf-network-topology:link', response['network'][0])
234 def test_12_create_OCH_OTU4_service(self):
235 response = test_utils.service_create_request(self.cr_serv_sample_data)
236 self.assertEqual(response.status_code, requests.codes.ok)
237 res = response.json()
238 self.assertIn('PCE calculation in progress',
239 res['output']['configuration-response-common']['response-message'])
240 time.sleep(self.WAITING)
242 def test_13_get_OCH_OTU4_service1(self):
243 response = test_utils.get_service_list_request(
244 "services/service1-OCH-OTU4")
245 self.assertEqual(response.status_code, requests.codes.ok)
246 res = response.json()
248 res['services'][0]['administrative-state'], 'inService')
250 res['services'][0]['service-name'], 'service1-OCH-OTU4')
252 res['services'][0]['connection-type'], 'infrastructure')
254 res['services'][0]['lifecycle-state'], 'planned')
257 # Check correct configuration of devices
258 def test_14_check_interface_och_spdra(self):
259 response = test_utils_rfc8040.check_node_attribute_request(
260 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
261 self.assertEqual(response['status_code'], requests.codes.ok)
262 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
263 'administrative-state': 'inService',
264 'supporting-circuit-pack-name': 'CP1-CFP0',
265 'type': 'org-openroadm-interfaces:opticalChannel',
266 'supporting-port': 'CP1-CFP0-P1'
267 }, **response['interface'][0]),
268 response['interface'][0])
269 self.assertEqual('org-openroadm-common-types:R100G',
270 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
271 self.assertEqual('dp-qpsk',
272 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
273 self.assertEqual(196.1,
274 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
277 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
279 def test_15_check_interface_OTU4_spdra(self):
280 response = test_utils_rfc8040.check_node_attribute_request(
281 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
282 self.assertEqual(response['status_code'], requests.codes.ok)
283 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
284 'administrative-state': 'inService',
285 'supporting-circuit-pack-name': 'CP1-CFP0',
286 'supporting-interface': 'XPDR1-NETWORK1-761:768',
287 'type': 'org-openroadm-interfaces:otnOtu',
288 'supporting-port': 'CP1-CFP0-P1'
290 input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
291 'expected-dapi': 'H/OelLynehI=',
292 'tx-dapi': 'AMf1n5hK6Xkk',
293 'expected-sapi': 'AMf1n5hK6Xkk',
294 'rate': 'org-openroadm-otn-common-types:OTU4',
297 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
298 response['interface'][0])
299 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
300 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
302 def test_16_check_interface_och_spdrc(self):
303 response = test_utils_rfc8040.check_node_attribute_request(
304 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
305 self.assertEqual(response['status_code'], requests.codes.ok)
306 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
307 'administrative-state': 'inService',
308 'supporting-circuit-pack-name': 'CP1-CFP0',
309 'type': 'org-openroadm-interfaces:opticalChannel',
310 'supporting-port': 'CP1-CFP0-P1'
311 }, **response['interface'][0]),
312 response['interface'][0])
313 self.assertEqual('org-openroadm-common-types:R100G',
314 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
315 self.assertEqual('dp-qpsk',
316 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
317 self.assertEqual(196.1,
318 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
321 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
323 def test_17_check_interface_OTU4_spdrc(self):
324 response = test_utils_rfc8040.check_node_attribute_request(
325 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
326 self.assertEqual(response['status_code'], requests.codes.ok)
327 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
328 'administrative-state': 'inService',
329 'supporting-circuit-pack-name': 'CP1-CFP0',
330 'supporting-interface': 'XPDR1-NETWORK1-761:768',
331 'type': 'org-openroadm-interfaces:otnOtu',
332 'supporting-port': 'CP1-CFP0-P1'
334 input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
335 'expected-sapi': 'H/OelLynehI=',
336 'tx-sapi': 'AMf1n5hK6Xkk',
337 'expected-dapi': 'AMf1n5hK6Xkk',
338 'rate': 'org-openroadm-otn-common-types:OTU4',
341 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
342 response['interface'][0])
343 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
344 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
346 def test_18_check_no_interface_ODU4_spdra(self):
347 response = test_utils_rfc8040.check_node_attribute_request(
348 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
349 self.assertEqual(response['status_code'], requests.codes.conflict)
351 def test_19_check_openroadm_topo_spdra(self):
352 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
353 self.assertEqual(response['status_code'], requests.codes.ok)
354 ele = response['node']['ietf-network-topology:termination-point'][0]
355 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
358 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['frequency']))
361 float(ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength']['width']))
363 def test_20_check_openroadm_topo_ROADMA_SRG(self):
364 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
365 self.assertEqual(response['status_code'], requests.codes.ok)
366 freq_map = base64.b64decode(
367 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
368 freq_map_array = [int(x) for x in freq_map]
369 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
370 liste_tp = response['node']['ietf-network-topology:termination-point']
372 if ele['tp-id'] == 'SRG1-PP1-TXRX':
373 freq_map = base64.b64decode(
374 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
375 freq_map_array = [int(x) for x in freq_map]
376 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
377 if ele['tp-id'] == 'SRG1-PP2-TXRX':
378 self.assertNotIn('avail-freq-maps', dict.keys(ele))
380 def test_21_check_openroadm_topo_ROADMA_DEG(self):
381 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
382 self.assertEqual(response['status_code'], requests.codes.ok)
383 freq_map = base64.b64decode(
384 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
385 freq_map_array = [int(x) for x in freq_map]
386 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
387 liste_tp = response['node']['ietf-network-topology:termination-point']
389 if ele['tp-id'] == 'DEG2-CTP-TXRX':
390 freq_map = base64.b64decode(
391 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
392 freq_map_array = [int(x) for x in freq_map]
393 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
394 if ele['tp-id'] == 'DEG2-TTP-TXRX':
395 freq_map = base64.b64decode(
396 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
397 freq_map_array = [int(x) for x in freq_map]
398 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
400 def test_22_check_otn_topo_otu4_links(self):
401 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
402 self.assertEqual(response['status_code'], requests.codes.ok)
403 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 2)
404 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
405 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
406 for link in response['network'][0]['ietf-network-topology:link']:
407 self.assertIn(link['link-id'], listLinkId)
409 link['transportpce-topology:otn-link-type'], 'OTU4')
411 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
413 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
415 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
417 link['org-openroadm-common-network:opposite-link'], listLinkId)
419 # test service-create for ODU4 service from spdr to spdr
420 def test_23_create_ODU4_service(self):
421 self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
422 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
423 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
424 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
425 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
426 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
427 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
429 response = test_utils.service_create_request(self.cr_serv_sample_data)
430 self.assertEqual(response.status_code, requests.codes.ok)
431 res = response.json()
432 self.assertIn('PCE calculation in progress',
433 res['output']['configuration-response-common']['response-message'])
434 time.sleep(self.WAITING)
436 def test_24_get_ODU4_service1(self):
437 response = test_utils.get_service_list_request(
438 "services/service1-ODU4")
439 self.assertEqual(response.status_code, requests.codes.ok)
440 res = response.json()
442 res['services'][0]['administrative-state'], 'inService')
444 res['services'][0]['service-name'], 'service1-ODU4')
446 res['services'][0]['connection-type'], 'infrastructure')
448 res['services'][0]['lifecycle-state'], 'planned')
451 def test_25_check_interface_ODU4_spdra(self):
452 response = test_utils_rfc8040.check_node_attribute_request(
453 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
454 self.assertEqual(response['status_code'], requests.codes.ok)
455 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
456 'administrative-state': 'inService',
457 'supporting-circuit-pack-name': 'CP1-CFP0',
458 'supporting-interface': 'XPDR1-NETWORK1-OTU',
459 'type': 'org-openroadm-interfaces:otnOdu',
460 'supporting-port': 'CP1-CFP0-P1',
462 'description': 'TBD'}
463 # SAPI/DAPI are added in the Otu4 renderer
464 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
465 'rate': 'org-openroadm-otn-common-types:ODU4',
466 'monitoring-mode': 'terminated',
467 'expected-dapi': 'H/OelLynehI=',
468 'expected-sapi': 'AMf1n5hK6Xkk',
469 'tx-dapi': 'AMf1n5hK6Xkk',
470 'tx-sapi': 'H/OelLynehI='}
472 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
473 response['interface'][0])
474 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
476 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
478 self.assertDictEqual(
479 {'payload-type': '21', 'exp-payload-type': '21'},
480 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
482 def test_26_check_interface_ODU4_spdrc(self):
483 response = test_utils_rfc8040.check_node_attribute_request(
484 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
485 self.assertEqual(response['status_code'], requests.codes.ok)
486 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
487 'administrative-state': 'inService',
488 'supporting-circuit-pack-name': 'CP1-CFP0',
489 'supporting-interface': 'XPDR1-NETWORK1-OTU',
490 'type': 'org-openroadm-interfaces:otnOdu',
491 'supporting-port': 'CP1-CFP0-P1',
493 'description': 'TBD'}
494 # SAPI/DAPI are added in the Otu4 renderer
495 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
496 'rate': 'org-openroadm-otn-common-types:ODU4',
497 'monitoring-mode': 'terminated',
498 'tx-sapi': 'AMf1n5hK6Xkk',
499 'tx-dapi': 'H/OelLynehI=',
500 'expected-sapi': 'H/OelLynehI=',
501 'expected-dapi': 'AMf1n5hK6Xkk'
503 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
504 response['interface'][0])
505 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
507 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
509 self.assertDictEqual(
510 {'payload-type': '21', 'exp-payload-type': '21'},
511 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
513 def test_27_check_otn_topo_links(self):
514 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
515 self.assertEqual(response['status_code'], requests.codes.ok)
516 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
517 for link in response['network'][0]['ietf-network-topology:link']:
518 linkId = link['link-id']
519 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
520 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
522 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
524 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
525 elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
526 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
528 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
530 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
532 link['transportpce-topology:otn-link-type'], 'ODTU4')
534 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
535 self.assertIn(link['org-openroadm-common-network:opposite-link'],
536 ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
537 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
539 self.fail("this link should not exist")
541 def test_28_check_otn_topo_tp(self):
542 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
543 self.assertEqual(response['status_code'], requests.codes.ok)
544 for node in response['network'][0]['node']:
545 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
546 tpList = node['ietf-network-topology:termination-point']
548 if tp['tp-id'] == 'XPDR1-NETWORK1':
549 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
550 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
552 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
553 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
554 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
556 # test service-create for 10GE service from spdr to spdr
557 def test_29_create_10GE_service(self):
558 self.cr_serv_sample_data["input"]["service-name"] = "service1-10GE"
559 self.cr_serv_sample_data["input"]["connection-type"] = "service"
560 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "10"
561 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
562 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
563 self.cr_serv_sample_data["input"]["service-a-end"]["ethernet-encoding"] = "10GBASE-R"
564 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
565 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
566 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "10"
567 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
568 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
569 self.cr_serv_sample_data["input"]["service-z-end"]["ethernet-encoding"] = "10GBASE-R"
570 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
571 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
572 response = test_utils.service_create_request(self.cr_serv_sample_data)
573 self.assertEqual(response.status_code, requests.codes.ok)
574 res = response.json()
575 self.assertIn('PCE calculation in progress',
576 res['output']['configuration-response-common']['response-message'])
577 time.sleep(self.WAITING)
579 def test_30_get_10GE_service1(self):
580 response = test_utils.get_service_list_request(
581 "services/service1-10GE")
582 self.assertEqual(response.status_code, requests.codes.ok)
583 res = response.json()
585 res['services'][0]['administrative-state'], 'inService')
587 res['services'][0]['service-name'], 'service1-10GE')
589 res['services'][0]['connection-type'], 'service')
591 res['services'][0]['lifecycle-state'], 'planned')
594 def test_31_check_interface_10GE_CLIENT_spdra(self):
595 response = test_utils_rfc8040.check_node_attribute_request(
596 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
597 self.assertEqual(response['status_code'], requests.codes.ok)
598 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
599 'administrative-state': 'inService',
600 'supporting-circuit-pack-name': 'CP1-SFP4',
601 'type': 'org-openroadm-interfaces:ethernetCsmacd',
602 'supporting-port': 'CP1-SFP4-P1'
604 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
605 response['interface'][0])
608 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
610 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
611 response = test_utils_rfc8040.check_node_attribute_request(
612 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
613 self.assertEqual(response['status_code'], requests.codes.ok)
614 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
615 'administrative-state': 'inService',
616 'supporting-circuit-pack-name': 'CP1-SFP4',
617 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
618 'type': 'org-openroadm-interfaces:otnOdu',
619 'supporting-port': 'CP1-SFP4-P1'}
621 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
622 'rate': 'org-openroadm-otn-common-types:ODU2e',
623 'monitoring-mode': 'terminated'}
625 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
626 response['interface'][0])
627 self.assertDictEqual(dict(input_dict_2,
628 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
629 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
630 self.assertDictEqual(
631 {'payload-type': '03', 'exp-payload-type': '03'},
632 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
634 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
635 response = test_utils_rfc8040.check_node_attribute_request(
636 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
637 self.assertEqual(response['status_code'], requests.codes.ok)
638 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
639 'administrative-state': 'inService',
640 'supporting-circuit-pack-name': 'CP1-CFP0',
641 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
642 'type': 'org-openroadm-interfaces:otnOdu',
643 'supporting-port': 'CP1-CFP0-P1'}
645 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
646 'rate': 'org-openroadm-otn-common-types:ODU2e',
647 'monitoring-mode': 'monitored'}
648 input_dict_3 = {'trib-port-number': 1}
650 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
651 response['interface'][0])
652 self.assertDictEqual(dict(input_dict_2,
653 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
654 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
655 self.assertDictEqual(dict(input_dict_3,
656 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
657 'parent-odu-allocation']),
658 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
659 self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
662 def test_34_check_ODU2E_connection_spdra(self):
663 response = test_utils_rfc8040.check_node_attribute_request(
664 'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
665 self.assertEqual(response['status_code'], requests.codes.ok)
668 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
669 'direction': 'bidirectional'
672 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
673 response['odu-connection'][0])
674 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
675 response['odu-connection'][0]['destination'])
676 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
677 response['odu-connection'][0]['source'])
679 def test_35_check_interface_10GE_CLIENT_spdrc(self):
680 response = test_utils_rfc8040.check_node_attribute_request(
681 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
682 self.assertEqual(response['status_code'], requests.codes.ok)
683 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
684 'administrative-state': 'inService',
685 'supporting-circuit-pack-name': 'CP1-SFP4',
686 'type': 'org-openroadm-interfaces:ethernetCsmacd',
687 'supporting-port': 'CP1-SFP4-P1'
689 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
690 response['interface'][0])
693 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
695 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
696 response = test_utils_rfc8040.check_node_attribute_request(
697 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
698 self.assertEqual(response['status_code'], requests.codes.ok)
699 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
700 'administrative-state': 'inService',
701 'supporting-circuit-pack-name': 'CP1-SFP4',
702 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
703 'type': 'org-openroadm-interfaces:otnOdu',
704 'supporting-port': 'CP1-SFP4-P1'}
706 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
707 'rate': 'org-openroadm-otn-common-types:ODU2e',
708 'monitoring-mode': 'terminated'}
710 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
711 response['interface'][0])
712 self.assertDictEqual(dict(input_dict_2,
713 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
714 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
715 self.assertDictEqual(
716 {'payload-type': '03', 'exp-payload-type': '03'},
717 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
719 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
720 response = test_utils_rfc8040.check_node_attribute_request(
721 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
722 self.assertEqual(response['status_code'], requests.codes.ok)
723 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
724 'administrative-state': 'inService',
725 'supporting-circuit-pack-name': 'CP1-CFP0',
726 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
727 'type': 'org-openroadm-interfaces:otnOdu',
728 'supporting-port': 'CP1-CFP0-P1'}
730 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
731 'rate': 'org-openroadm-otn-common-types:ODU2e',
732 'monitoring-mode': 'monitored'}
734 input_dict_3 = {'trib-port-number': 1}
736 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
737 response['interface'][0])
738 self.assertDictEqual(dict(input_dict_2,
739 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
740 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
741 self.assertDictEqual(dict(input_dict_3,
742 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
743 'parent-odu-allocation']),
744 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
745 'parent-odu-allocation'])
747 response['interface'][0][
748 'org-openroadm-otn-odu-interfaces:odu'][
749 'parent-odu-allocation']['trib-slots'])
751 def test_38_check_ODU2E_connection_spdrc(self):
752 response = test_utils_rfc8040.check_node_attribute_request(
753 'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
754 self.assertEqual(response['status_code'], requests.codes.ok)
757 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
758 'direction': 'bidirectional'
761 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
762 response['odu-connection'][0])
763 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
764 response['odu-connection'][0]['destination'])
765 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
766 response['odu-connection'][0]['source'])
768 def test_39_check_otn_topo_links(self):
769 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
770 self.assertEqual(response['status_code'], requests.codes.ok)
771 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
772 for link in response['network'][0]['ietf-network-topology:link']:
773 linkId = link['link-id']
774 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
775 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
777 link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
779 link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
781 def test_40_check_otn_topo_tp(self):
782 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
783 self.assertEqual(response['status_code'], requests.codes.ok)
784 for node in response['network'][0]['node']:
785 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
786 tpList = node['ietf-network-topology:termination-point']
788 if tp['tp-id'] == 'XPDR1-NETWORK1':
789 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
790 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
791 tsPoolList = list(range(1, 9))
793 tsPoolList, xpdrTpPortConAt['ts-pool'])
795 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
797 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
799 def test_41_delete_10GE_service(self):
800 response = test_utils.service_delete_request("service1-10GE")
801 self.assertEqual(response.status_code, requests.codes.ok)
802 res = response.json()
803 self.assertIn('Renderer service delete in progress',
804 res['output']['configuration-response-common']['response-message'])
805 time.sleep(self.WAITING)
807 def test_42_check_service_list(self):
808 response = test_utils.get_service_list_request("")
809 self.assertEqual(response.status_code, requests.codes.ok)
810 res = response.json()
811 self.assertEqual(len(res['service-list']['services']), 2)
814 def test_43_check_no_ODU2e_connection_spdra(self):
815 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
816 self.assertEqual(response.status_code, requests.codes.ok)
817 res = response.json()
818 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
821 def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
822 response = test_utils_rfc8040.check_node_attribute_request(
823 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
824 self.assertEqual(response['status_code'], requests.codes.conflict)
826 def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
827 response = test_utils_rfc8040.check_node_attribute_request(
828 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
829 self.assertEqual(response['status_code'], requests.codes.conflict)
831 def test_46_check_no_interface_10GE_CLIENT_spdra(self):
832 response = test_utils_rfc8040.check_node_attribute_request(
833 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
834 self.assertEqual(response['status_code'], requests.codes.conflict)
836 def test_47_check_otn_topo_links(self):
837 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
838 self.assertEqual(response['status_code'], requests.codes.ok)
839 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
840 for link in response['network'][0]['ietf-network-topology:link']:
841 linkId = link['link-id']
842 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
843 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
845 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
847 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
849 def test_48_check_otn_topo_tp(self):
850 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
851 self.assertEqual(response['status_code'], requests.codes.ok)
852 for node in response['network'][0]['node']:
853 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
854 tpList = node['ietf-network-topology:termination-point']
856 if tp['tp-id'] == 'XPDR1-NETWORK1':
857 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
858 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
860 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
862 def test_49_delete_ODU4_service(self):
863 response = test_utils.service_delete_request("service1-ODU4")
864 self.assertEqual(response.status_code, requests.codes.ok)
865 res = response.json()
866 self.assertIn('Renderer service delete in progress',
867 res['output']['configuration-response-common']['response-message'])
868 time.sleep(self.WAITING)
870 def test_50_check_service_list(self):
871 response = test_utils.get_service_list_request("")
872 self.assertEqual(response.status_code, requests.codes.ok)
873 res = response.json()
874 self.assertEqual(len(res['service-list']['services']), 1)
877 def test_51_check_no_interface_ODU4_spdra(self):
878 response = test_utils_rfc8040.check_node_attribute_request(
879 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
880 self.assertEqual(response['status_code'], requests.codes.conflict)
882 def test_52_check_otn_topo_links(self):
883 self.test_22_check_otn_topo_otu4_links()
885 def test_53_check_otn_topo_tp(self):
886 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
887 self.assertEqual(response['status_code'], requests.codes.ok)
888 for node in response['network'][0]['node']:
889 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
890 tpList = node['ietf-network-topology:termination-point']
892 if tp['tp-id'] == 'XPDR1-NETWORK1':
893 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
894 self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
896 'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
898 def test_54_delete_OCH_OTU4_service(self):
899 response = test_utils.service_delete_request("service1-OCH-OTU4")
900 self.assertEqual(response.status_code, requests.codes.ok)
901 res = response.json()
902 self.assertIn('Renderer service delete in progress',
903 res['output']['configuration-response-common']['response-message'])
904 time.sleep(self.WAITING)
906 def test_55_get_no_service(self):
907 response = test_utils.get_service_list_request("")
908 self.assertEqual(response.status_code, requests.codes.conflict)
909 res = response.json()
911 {"error-type": "application", "error-tag": "data-missing",
912 "error-message": "Request could not be completed because the relevant data model content does not exist"},
913 res['errors']['error'])
916 def test_56_check_no_interface_OTU4_spdra(self):
917 response = test_utils_rfc8040.check_node_attribute_request(
918 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
919 self.assertEqual(response['status_code'], requests.codes.conflict)
921 def test_57_check_no_interface_OCH_spdra(self):
922 response = test_utils_rfc8040.check_node_attribute_request(
923 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
924 self.assertEqual(response['status_code'], requests.codes.conflict)
926 def test_58_getLinks_OtnTopology(self):
927 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
928 self.assertEqual(response['status_code'], requests.codes.ok)
929 self.assertNotIn('ietf-network-topology:link', response['network'][0])
931 def test_59_check_openroadm_topo_spdra(self):
932 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'SPDR-SA1-XPDR1', 'config')
933 self.assertEqual(response['status_code'], requests.codes.ok)
934 tp = response['node']['ietf-network-topology:termination-point'][0]
935 self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
936 self.assertNotIn('wavelength', dict.keys(
937 tp['org-openroadm-network-topology:xpdr-network-attributes']))
939 def test_60_check_openroadm_topo_ROADMA_SRG(self):
940 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-SRG1', 'config')
941 self.assertEqual(response['status_code'], requests.codes.ok)
942 freq_map = base64.b64decode(
943 response['node']['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
944 freq_map_array = [int(x) for x in freq_map]
945 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
946 liste_tp = response['node']['ietf-network-topology:termination-point']
948 if ele['tp-id'] == 'SRG1-PP1-TXRX':
949 freq_map = base64.b64decode(
950 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
951 freq_map_array = [int(x) for x in freq_map]
952 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
955 def test_61_check_openroadm_topo_ROADMA_DEG(self):
956 response = test_utils_rfc8040.get_ietf_network_node_request('openroadm-topology', 'ROADM-A1-DEG2', 'config')
957 self.assertEqual(response['status_code'], requests.codes.ok)
958 freq_map = base64.b64decode(
959 response['node']['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
960 freq_map_array = [int(x) for x in freq_map]
961 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
962 liste_tp = response['node']['ietf-network-topology:termination-point']
964 if ele['tp-id'] == 'DEG2-CTP-TXRX':
965 freq_map = base64.b64decode(
966 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
967 freq_map_array = [int(x) for x in freq_map]
968 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
969 if ele['tp-id'] == 'DEG2-TTP-TXRX':
970 freq_map = base64.b64decode(
971 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
972 freq_map_array = [int(x) for x in freq_map]
973 self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
976 def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
977 response = test_utils_rfc8040.transportpce_api_rpc_request(
978 'transportpce-networkutils', 'init-xpdr-rdm-links',
979 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
980 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
981 self.assertEqual(response['status_code'], requests.codes.ok)
983 def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
984 response = test_utils_rfc8040.transportpce_api_rpc_request(
985 'transportpce-networkutils', 'init-rdm-xpdr-links',
986 {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
987 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
988 self.assertEqual(response['status_code'], requests.codes.ok)
990 def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
991 response = test_utils_rfc8040.transportpce_api_rpc_request(
992 'transportpce-networkutils', 'init-xpdr-rdm-links',
993 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
994 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
995 self.assertEqual(response['status_code'], requests.codes.ok)
997 def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
998 response = test_utils_rfc8040.transportpce_api_rpc_request(
999 'transportpce-networkutils', 'init-rdm-xpdr-links',
1000 {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
1001 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
1002 self.assertEqual(response['status_code'], requests.codes.ok)
1004 def test_66_create_OCH_OTU4_service_2(self):
1005 # pylint: disable=line-too-long
1006 self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
1007 self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
1008 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
1009 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
1010 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1011 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1012 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1013 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1014 self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1015 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
1016 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
1017 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1018 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1019 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1020 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1021 self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
1022 response = test_utils.service_create_request(self.cr_serv_sample_data)
1023 self.assertEqual(response.status_code, requests.codes.ok)
1024 res = response.json()
1025 self.assertIn('PCE calculation in progress',
1026 res['output']['configuration-response-common']['response-message'])
1027 time.sleep(self.WAITING)
1029 def test_67_get_OCH_OTU4_service2(self):
1030 response = test_utils.get_service_list_request(
1031 "services/service2-OCH-OTU4")
1032 self.assertEqual(response.status_code, requests.codes.ok)
1033 res = response.json()
1035 res['services'][0]['administrative-state'], 'inService')
1037 res['services'][0]['service-name'], 'service2-OCH-OTU4')
1039 res['services'][0]['connection-type'], 'infrastructure')
1041 res['services'][0]['lifecycle-state'], 'planned')
1044 def test_68_create_ODU4_service_2(self):
1045 # pylint: disable=line-too-long
1046 self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
1047 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
1048 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1049 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1050 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1051 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1052 self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1053 del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
1054 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
1055 self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
1056 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1057 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1058 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1059 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-NETWORK1"
1060 del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
1062 response = test_utils.service_create_request(self.cr_serv_sample_data)
1063 self.assertEqual(response.status_code, requests.codes.ok)
1064 res = response.json()
1065 self.assertIn('PCE calculation in progress',
1066 res['output']['configuration-response-common']['response-message'])
1067 time.sleep(self.WAITING)
1069 def test_69_get_ODU4_service2(self):
1070 response = test_utils.get_service_list_request(
1071 "services/service2-ODU4")
1072 self.assertEqual(response.status_code, requests.codes.ok)
1073 res = response.json()
1075 res['services'][0]['administrative-state'], 'inService')
1077 res['services'][0]['service-name'], 'service2-ODU4')
1079 res['services'][0]['connection-type'], 'infrastructure')
1081 res['services'][0]['lifecycle-state'], 'planned')
1084 def test_70_create_1GE_service(self):
1085 # pylint: disable=line-too-long
1086 self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
1087 self.cr_serv_sample_data["input"]["connection-type"] = "service"
1088 self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
1089 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
1090 self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
1091 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
1092 del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
1093 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1094 self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1095 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
1096 self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1097 del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
1098 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1099 self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1100 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
1101 self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR3-CLIENT1"
1102 response = test_utils.service_create_request(self.cr_serv_sample_data)
1103 self.assertEqual(response.status_code, requests.codes.ok)
1104 res = response.json()
1105 self.assertIn('PCE calculation in progress',
1106 res['output']['configuration-response-common']['response-message'])
1107 time.sleep(self.WAITING)
1109 def test_71_get_1GE_service1(self):
1110 response = test_utils.get_service_list_request("services/service1-1GE")
1111 self.assertEqual(response.status_code, requests.codes.ok)
1112 res = response.json()
1114 res['services'][0]['administrative-state'], 'inService')
1116 res['services'][0]['service-name'], 'service1-1GE')
1118 res['services'][0]['connection-type'], 'service')
1120 res['services'][0]['lifecycle-state'], 'planned')
1123 def test_72_check_interface_1GE_CLIENT_spdra(self):
1124 response = test_utils_rfc8040.check_node_attribute_request(
1125 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1126 self.assertEqual(response['status_code'], requests.codes.ok)
1127 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1128 'administrative-state': 'inService',
1129 'supporting-circuit-pack-name': 'CP1-SFP4',
1130 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1131 'supporting-port': 'CP1-SFP4-P1'
1133 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1134 response['interface'][0])
1137 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1139 def test_73_check_interface_ODU0_CLIENT_spdra(self):
1140 response = test_utils_rfc8040.check_node_attribute_request(
1141 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
1142 self.assertEqual(response['status_code'], requests.codes.ok)
1143 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1144 'administrative-state': 'inService',
1145 'supporting-circuit-pack-name': 'CP3-SFP1',
1146 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1147 'type': 'org-openroadm-interfaces:otnOdu',
1148 'supporting-port': 'CP3-SFP1-P1'}
1150 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1151 'rate': 'org-openroadm-otn-common-types:ODU0',
1152 'monitoring-mode': 'terminated'}
1153 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1154 response['interface'][0])
1155 self.assertDictEqual(dict(input_dict_2,
1156 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1157 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1158 self.assertDictEqual(
1159 {'payload-type': '07', 'exp-payload-type': '07'},
1160 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1162 def test_74_check_interface_ODU0_NETWORK_spdra(self):
1163 response = test_utils_rfc8040.check_node_attribute_request(
1164 'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
1165 self.assertEqual(response['status_code'], requests.codes.ok)
1166 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1167 'administrative-state': 'inService',
1168 'supporting-circuit-pack-name': 'CP3-CFP0',
1169 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1170 'type': 'org-openroadm-interfaces:otnOdu',
1171 'supporting-port': 'CP3-CFP0-P1'}
1173 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1174 'rate': 'org-openroadm-otn-common-types:ODU0',
1175 'monitoring-mode': 'monitored'}
1176 input_dict_3 = {'trib-port-number': 1}
1177 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1178 response['interface'][0])
1179 self.assertDictEqual(dict(input_dict_2,
1180 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1181 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1182 self.assertDictEqual(dict(input_dict_3,
1183 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1184 'parent-odu-allocation']),
1185 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
1186 self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
1189 def test_75_check_ODU0_connection_spdra(self):
1190 response = test_utils_rfc8040.check_node_attribute_request(
1191 'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1192 self.assertEqual(response['status_code'], requests.codes.ok)
1195 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1196 'direction': 'bidirectional'
1198 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1199 response['odu-connection'][0])
1200 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1201 response['odu-connection'][0]['destination'])
1202 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1203 response['odu-connection'][0]['source'])
1205 def test_76_check_interface_1GE_CLIENT_spdrc(self):
1206 response = test_utils_rfc8040.check_node_attribute_request(
1207 'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1208 self.assertEqual(response['status_code'], requests.codes.ok)
1209 input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
1210 'administrative-state': 'inService',
1211 'supporting-circuit-pack-name': 'CP3-SFP1',
1212 'type': 'org-openroadm-interfaces:ethernetCsmacd',
1213 'supporting-port': 'CP3-SFP1-P1'
1215 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
1216 response['interface'][0])
1219 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
1221 def test_77_check_interface_ODU0_CLIENT_spdrc(self):
1222 response = test_utils_rfc8040.check_node_attribute_request(
1223 'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
1224 self.assertEqual(response['status_code'], requests.codes.ok)
1225 input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
1226 'administrative-state': 'inService',
1227 'supporting-circuit-pack-name': 'CP3-SFP1',
1228 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
1229 'type': 'org-openroadm-interfaces:otnOdu',
1230 'supporting-port': 'CP3-SFP1-P1'}
1232 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
1233 'rate': 'org-openroadm-otn-common-types:ODU0',
1234 'monitoring-mode': 'terminated'}
1235 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1236 response['interface'][0])
1237 self.assertDictEqual(dict(input_dict_2,
1238 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1239 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1240 self.assertDictEqual(
1241 {'payload-type': '07', 'exp-payload-type': '07'},
1242 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
1244 def test_78_check_interface_ODU0_NETWORK_spdrc(self):
1245 response = test_utils_rfc8040.check_node_attribute_request(
1246 'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
1247 self.assertEqual(response['status_code'], requests.codes.ok)
1248 input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
1249 'administrative-state': 'inService',
1250 'supporting-circuit-pack-name': 'CP3-CFP0',
1251 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
1252 'type': 'org-openroadm-interfaces:otnOdu',
1253 'supporting-port': 'CP3-CFP0-P1'}
1255 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
1256 'rate': 'org-openroadm-otn-common-types:ODU0',
1257 'monitoring-mode': 'monitored'}
1258 input_dict_3 = {'trib-port-number': 1}
1259 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
1260 response['interface'][0])
1261 self.assertDictEqual(dict(input_dict_2,
1262 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
1263 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
1264 self.assertDictEqual(dict(input_dict_3,
1265 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1266 'parent-odu-allocation']),
1267 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
1268 'parent-odu-allocation'])
1270 response['interface'][0][
1271 'org-openroadm-otn-odu-interfaces:odu'][
1272 'parent-odu-allocation']['trib-slots'])
1274 def test_79_check_ODU0_connection_spdrc(self):
1275 response = test_utils_rfc8040.check_node_attribute_request(
1276 'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
1277 self.assertEqual(response['status_code'], requests.codes.ok)
1280 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
1281 'direction': 'bidirectional'
1283 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
1284 response['odu-connection'][0])
1285 self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
1286 response['odu-connection'][0]['destination'])
1287 self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
1288 response['odu-connection'][0]['source'])
1290 def test_80_check_otn_topo_links(self):
1291 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1292 self.assertEqual(response['status_code'], requests.codes.ok)
1293 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1294 for link in response['network'][0]['ietf-network-topology:link']:
1295 linkId = link['link-id']
1296 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1297 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1299 link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
1301 link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
1303 def test_81_check_otn_topo_tp(self):
1304 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1305 self.assertEqual(response['status_code'], requests.codes.ok)
1306 for node in response['network'][0]['node']:
1307 if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1308 tpList = node['ietf-network-topology:termination-point']
1310 if tp['tp-id'] == 'XPDR3-NETWORK1':
1311 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1312 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
1313 tsPoolList = list(range(1, 2))
1315 tsPoolList, xpdrTpPortConAt['ts-pool'])
1317 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
1319 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
1321 def test_82_delete_1GE_service(self):
1322 response = test_utils.service_delete_request("service1-1GE")
1323 self.assertEqual(response.status_code, requests.codes.ok)
1324 res = response.json()
1325 self.assertIn('Renderer service delete in progress',
1326 res['output']['configuration-response-common']['response-message'])
1327 time.sleep(self.WAITING)
1329 def test_83_check_service_list(self):
1330 response = test_utils.get_service_list_request("")
1331 self.assertEqual(response.status_code, requests.codes.ok)
1332 res = response.json()
1333 self.assertEqual(len(res['service-list']['services']), 2)
1336 def test_84_check_no_ODU0_connection_spdra(self):
1337 response = test_utils.check_netconf_node_request("SPDR-SA1", "")
1338 self.assertEqual(response.status_code, requests.codes.ok)
1339 res = response.json()
1340 self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
1343 def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
1344 response = test_utils_rfc8040.check_node_attribute_request(
1345 'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0-service1')
1346 self.assertEqual(response['status_code'], requests.codes.conflict)
1348 def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
1349 response = test_utils_rfc8040.check_node_attribute_request(
1350 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0-service1')
1351 self.assertEqual(response['status_code'], requests.codes.conflict)
1353 def test_87_check_no_interface_10GE_CLIENT_spdra(self):
1354 response = test_utils_rfc8040.check_node_attribute_request(
1355 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
1356 self.assertEqual(response['status_code'], requests.codes.conflict)
1358 def test_88_check_otn_topo_links(self):
1359 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1360 self.assertEqual(response['status_code'], requests.codes.ok)
1361 self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
1362 for link in response['network'][0]['ietf-network-topology:link']:
1363 linkId = link['link-id']
1364 if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
1365 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
1367 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
1369 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
1371 def test_89_check_otn_topo_tp(self):
1372 response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
1373 self.assertEqual(response['status_code'], requests.codes.ok)
1374 for node in response['network'][0]['node']:
1375 if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
1376 tpList = node['ietf-network-topology:termination-point']
1378 if tp['tp-id'] == 'XPDR3-NETWORK1':
1379 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
1380 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
1382 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
1384 def test_90_delete_ODU4_service(self):
1385 response = test_utils.service_delete_request("service2-ODU4")
1386 self.assertEqual(response.status_code, requests.codes.ok)
1387 res = response.json()
1388 self.assertIn('Renderer service delete in progress',
1389 res['output']['configuration-response-common']['response-message'])
1390 time.sleep(self.WAITING)
1392 def test_91_delete_OCH_OTU4_service(self):
1393 response = test_utils.service_delete_request("service2-OCH-OTU4")
1394 self.assertEqual(response.status_code, requests.codes.ok)
1395 res = response.json()
1396 self.assertIn('Renderer service delete in progress',
1397 res['output']['configuration-response-common']['response-message'])
1398 time.sleep(self.WAITING)
1400 def test_92_disconnect_xponders_from_roadm(self):
1401 url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
1402 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1403 self.assertEqual(response['status_code'], requests.codes.ok)
1404 links = response['network'][0]['ietf-network-topology:link']
1406 if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
1407 link_name = link["link-id"]
1408 response = test_utils.delete_request(url+link_name)
1409 self.assertEqual(response.status_code, requests.codes.ok)
1411 def test_93_check_openroadm_topology(self):
1412 response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
1413 self.assertEqual(response['status_code'], requests.codes.ok)
1414 self.assertEqual(18,
1415 len(response['network'][0]['ietf-network-topology:link']),
1416 'Topology should contain 18 links')
1418 def test_94_disconnect_spdrA(self):
1419 response = test_utils_rfc8040.unmount_device("SPDR-SA1")
1420 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1422 def test_95_disconnect_spdrC(self):
1423 response = test_utils_rfc8040.unmount_device("SPDR-SC1")
1424 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1426 def test_96_disconnect_roadmA(self):
1427 response = test_utils_rfc8040.unmount_device("ROADM-A1")
1428 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1430 def test_97_disconnect_roadmC(self):
1431 response = test_utils_rfc8040.unmount_device("ROADM-C1")
1432 self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
1435 if __name__ == "__main__":
1436 unittest.main(verbosity=2)