3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
26 import test_utils_rfc8040 # nopep8
29 class TransportPCEtesting(unittest.TestCase):
32 WAITING = 20 # nominal value is 300
33 NODE_VERSION = '2.2.1'
35 cr_serv_sample_data = {"input": {
36 "sdnc-request-header": {
37 "request-id": "request-1",
38 "rpc-action": "service-create",
39 "request-system-id": "appname"
41 "service-name": "service1-OCH-OTU4",
42 "common-id": "commonId",
43 "connection-type": "infrastructure",
45 "service-rate": "100",
46 "node-id": "SPDR-SA1",
47 "service-format": "OTU",
48 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
52 "port-device-name": "SPDR-SA1-XPDR1",
54 "port-name": "XPDR1-NETWORK1",
55 "port-rack": "000000.00",
56 "port-shelf": "Chassis#1"
59 "lgx-device-name": "Some lgx-device-name",
60 "lgx-port-name": "Some lgx-port-name",
61 "lgx-port-rack": "000000.00",
62 "lgx-port-shelf": "00"
68 "port-device-name": "SPDR-SA1-XPDR1",
70 "port-name": "XPDR1-NETWORK1",
71 "port-rack": "000000.00",
72 "port-shelf": "Chassis#1"
75 "lgx-device-name": "Some lgx-device-name",
76 "lgx-port-name": "Some lgx-port-name",
77 "lgx-port-rack": "000000.00",
78 "lgx-port-shelf": "00"
85 "service-rate": "100",
86 "node-id": "SPDR-SC1",
87 "service-format": "OTU",
88 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
92 "port-device-name": "SPDR-SC1-XPDR1",
94 "port-name": "XPDR1-NETWORK1",
95 "port-rack": "000000.00",
96 "port-shelf": "Chassis#1"
99 "lgx-device-name": "Some lgx-device-name",
100 "lgx-port-name": "Some lgx-port-name",
101 "lgx-port-rack": "000000.00",
102 "lgx-port-shelf": "00"
108 "port-device-name": "SPDR-SC1-XPDR1",
109 "port-type": "fixed",
110 "port-name": "XPDR1-NETWORK1",
111 "port-rack": "000000.00",
112 "port-shelf": "Chassis#1"
115 "lgx-device-name": "Some lgx-device-name",
116 "lgx-port-name": "Some lgx-port-name",
117 "lgx-port-rack": "000000.00",
118 "lgx-port-shelf": "00"
124 "due-date": "2018-06-15T00:00:01Z",
125 "operator-contact": "pw1234"
131 cls.processes = test_utils_rfc8040.start_tpce()
132 cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
133 ('roadma', cls.NODE_VERSION),
134 ('roadmc', cls.NODE_VERSION),
135 ('spdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Shut down each process recorded in cls.processes, then print a
    # confirmation line.
    stop = test_utils_rfc8040.shutdown_process
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        stop(running)
    print("all processes killed")
def test_01_connect_spdrA(self):
    # Mount node "SPDR-SA1" from the ('spdra', NODE_VERSION) entry and
    # expect the `created` status code.
    sim_entry = ('spdra', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("SPDR-SA1", sim_entry)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
def test_02_connect_spdrC(self):
    # Mount node "SPDR-SC1" from the ('spdrc', NODE_VERSION) entry and
    # expect the `created` status code.
    sim_entry = ('spdrc', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("SPDR-SC1", sim_entry)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    # Mount node "ROADM-A1" from the ('roadma', NODE_VERSION) entry and
    # expect the `created` status code.
    sim_entry = ('roadma', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("ROADM-A1", sim_entry)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    # Mount node "ROADM-C1" from the ('roadmc', NODE_VERSION) entry and
    # expect the `created` status code.
    sim_entry = ('roadmc', self.NODE_VERSION)
    mount_reply = test_utils_rfc8040.mount_device("ROADM-C1", sim_entry)
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        test_utils_rfc8040.CODE_SHOULD_BE_201,
    )
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
    # Call the 'init-xpdr-rdm-links' RPC of 'transportpce-networkutils'
    # for SPDR-SA1 (xpdr 1, network 1) and ROADM-A1 SRG1-PP1-TXRX, and
    # expect the `ok` status code.
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils',
        'init-xpdr-rdm-links',
        {'links-input': links_input},
    )
    self.assertEqual(rpc_result['status_code'], requests.codes.ok)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
    # Call the 'init-rdm-xpdr-links' RPC of 'transportpce-networkutils'
    # for ROADM-A1 SRG1-PP1-TXRX and SPDR-SA1 (xpdr 1, network 1), and
    # expect the `ok` status code.
    links_input = {
        'xpdr-node': 'SPDR-SA1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-A1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils',
        'init-rdm-xpdr-links',
        {'links-input': links_input},
    )
    self.assertEqual(rpc_result['status_code'], requests.codes.ok)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
    # Call the 'init-xpdr-rdm-links' RPC of 'transportpce-networkutils'
    # for SPDR-SC1 (xpdr 1, network 1) and ROADM-C1 SRG1-PP1-TXRX, and
    # expect the `ok` status code.
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils',
        'init-xpdr-rdm-links',
        {'links-input': links_input},
    )
    self.assertEqual(rpc_result['status_code'], requests.codes.ok)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
    # Call the 'init-rdm-xpdr-links' RPC of 'transportpce-networkutils'
    # for ROADM-C1 SRG1-PP1-TXRX and SPDR-SC1 (xpdr 1, network 1), and
    # expect the `ok` status code.
    links_input = {
        'xpdr-node': 'SPDR-SC1',
        'xpdr-num': '1',
        'network-num': '1',
        'rdm-node': 'ROADM-C1',
        'srg-num': '1',
        'termination-point-num': 'SRG1-PP1-TXRX',
    }
    rpc_result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils',
        'init-rdm-xpdr-links',
        {'links-input': links_input},
    )
    self.assertEqual(rpc_result['status_code'], requests.codes.ok)
195 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
196 # Config ROADMA-ROADMC oms-attributes
198 "auto-spanloss": "true",
199 "spanloss-base": 11.4,
200 "spanloss-current": 12,
201 "engineered-spanloss": 12.2,
202 "link-concatenation": [{
205 "SRLG-length": 100000,
207 response = test_utils_rfc8040.add_oms_attr_request(
208 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
209 self.assertEqual(response.status_code, requests.codes.created)
211 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
212 # Config ROADMC-ROADMA oms-attributes
214 "auto-spanloss": "true",
215 "spanloss-base": 11.4,
216 "spanloss-current": 12,
217 "engineered-spanloss": 12.2,
218 "link-concatenation": [{
221 "SRLG-length": 100000,
223 response = test_utils_rfc8040.add_oms_attr_request(
224 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
225 self.assertEqual(response.status_code, requests.codes.created)
227 # test service-create for OCH-OTU4 service from spdr to spdr
def test_11_check_otn_topology(self):
    # The first network of the OTN topology must list 6 nodes and must
    # not contain any 'ietf-network-topology:link' entry.
    topo_reply = test_utils.get_otn_topo_request()
    self.assertEqual(topo_reply.status_code, requests.codes.ok)
    otn_network = topo_reply.json()['network'][0]
    node_count = len(otn_network['node'])
    self.assertEqual(node_count, 6, 'There should be 6 nodes')
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_12_create_OCH_OTU4_service(self):
    # Send the class-level service-create payload, check that the reply
    # reports the PCE calculation as in progress, then pause for WAITING
    # seconds.
    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    common = creation.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
244 def test_13_get_OCH_OTU4_service1(self):
245 response = test_utils.get_service_list_request(
246 "services/service1-OCH-OTU4")
247 self.assertEqual(response.status_code, requests.codes.ok)
248 res = response.json()
250 res['services'][0]['administrative-state'], 'inService')
252 res['services'][0]['service-name'], 'service1-OCH-OTU4')
254 res['services'][0]['connection-type'], 'infrastructure')
256 res['services'][0]['lifecycle-state'], 'planned')
259 # Check correct configuration of devices
260 def test_14_check_interface_och_spdra(self):
261 response = test_utils_rfc8040.check_node_attribute_request(
262 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
263 self.assertEqual(response['status_code'], requests.codes.ok)
264 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
265 'administrative-state': 'inService',
266 'supporting-circuit-pack-name': 'CP1-CFP0',
267 'type': 'org-openroadm-interfaces:opticalChannel',
268 'supporting-port': 'CP1-CFP0-P1'
269 }, **response['interface'][0]),
270 response['interface'][0])
271 self.assertEqual('org-openroadm-common-types:R100G',
272 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
273 self.assertEqual('dp-qpsk',
274 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
275 self.assertEqual(196.1,
276 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
279 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
281 def test_15_check_interface_OTU4_spdra(self):
282 response = test_utils_rfc8040.check_node_attribute_request(
283 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
284 self.assertEqual(response['status_code'], requests.codes.ok)
285 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
286 'administrative-state': 'inService',
287 'supporting-circuit-pack-name': 'CP1-CFP0',
288 'supporting-interface': 'XPDR1-NETWORK1-761:768',
289 'type': 'org-openroadm-interfaces:otnOtu',
290 'supporting-port': 'CP1-CFP0-P1'
292 input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
293 'expected-dapi': 'H/OelLynehI=',
294 'tx-dapi': 'AMf1n5hK6Xkk',
295 'expected-sapi': 'AMf1n5hK6Xkk',
296 'rate': 'org-openroadm-otn-common-types:OTU4',
299 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
300 response['interface'][0])
301 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
302 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
304 def test_16_check_interface_och_spdrc(self):
305 response = test_utils_rfc8040.check_node_attribute_request(
306 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
307 self.assertEqual(response['status_code'], requests.codes.ok)
308 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
309 'administrative-state': 'inService',
310 'supporting-circuit-pack-name': 'CP1-CFP0',
311 'type': 'org-openroadm-interfaces:opticalChannel',
312 'supporting-port': 'CP1-CFP0-P1'
313 }, **response['interface'][0]),
314 response['interface'][0])
315 self.assertEqual('org-openroadm-common-types:R100G',
316 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
317 self.assertEqual('dp-qpsk',
318 response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
319 self.assertEqual(196.1,
320 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
323 float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
325 def test_17_check_interface_OTU4_spdrc(self):
326 response = test_utils_rfc8040.check_node_attribute_request(
327 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
328 self.assertEqual(response['status_code'], requests.codes.ok)
329 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
330 'administrative-state': 'inService',
331 'supporting-circuit-pack-name': 'CP1-CFP0',
332 'supporting-interface': 'XPDR1-NETWORK1-761:768',
333 'type': 'org-openroadm-interfaces:otnOtu',
334 'supporting-port': 'CP1-CFP0-P1'
336 input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
337 'expected-sapi': 'H/OelLynehI=',
338 'tx-sapi': 'AMf1n5hK6Xkk',
339 'expected-dapi': 'AMf1n5hK6Xkk',
340 'rate': 'org-openroadm-otn-common-types:OTU4',
343 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
344 response['interface'][0])
345 self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
346 response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_18_check_no_interface_ODU4_spdra(self):
    # Looking up interface 'XPDR1-NETWORK1-ODU4' on SPDR-SA1 must return
    # the `conflict` status code.
    lookup = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1',
        'interface',
        'XPDR1-NETWORK1-ODU4',
    )
    self.assertEqual(lookup['status_code'], requests.codes.conflict)
353 def test_19_check_openroadm_topo_spdra(self):
354 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
355 self.assertEqual(response.status_code, requests.codes.ok)
356 res = response.json()
357 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
358 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
359 self.assertEqual({'frequency': 196.1,
361 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
364 def test_20_check_openroadm_topo_ROADMA_SRG(self):
365 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
366 self.assertEqual(response.status_code, requests.codes.ok)
367 res = response.json()
368 freq_map = base64.b64decode(
369 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
370 freq_map_array = [int(x) for x in freq_map]
371 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
372 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
374 if ele['tp-id'] == 'SRG1-PP1-TXRX':
375 freq_map = base64.b64decode(
376 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
377 freq_map_array = [int(x) for x in freq_map]
378 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
379 if ele['tp-id'] == 'SRG1-PP2-TXRX':
380 self.assertNotIn('avail-freq-maps', dict.keys(ele))
383 def test_21_check_openroadm_topo_ROADMA_DEG(self):
384 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
385 self.assertEqual(response.status_code, requests.codes.ok)
386 res = response.json()
387 freq_map = base64.b64decode(
388 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
389 freq_map_array = [int(x) for x in freq_map]
390 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
391 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
393 if ele['tp-id'] == 'DEG2-CTP-TXRX':
394 freq_map = base64.b64decode(
395 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
396 freq_map_array = [int(x) for x in freq_map]
397 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
398 if ele['tp-id'] == 'DEG2-TTP-TXRX':
399 freq_map = base64.b64decode(
400 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
401 freq_map_array = [int(x) for x in freq_map]
402 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
405 def test_22_check_otn_topo_otu4_links(self):
406 response = test_utils.get_otn_topo_request()
407 self.assertEqual(response.status_code, requests.codes.ok)
408 res = response.json()
409 nb_links = len(res['network'][0]['ietf-network-topology:link'])
410 self.assertEqual(nb_links, 2)
411 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
412 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
413 for link in res['network'][0]['ietf-network-topology:link']:
414 self.assertIn(link['link-id'], listLinkId)
416 link['transportpce-topology:otn-link-type'], 'OTU4')
418 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
420 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
422 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
424 link['org-openroadm-common-network:opposite-link'], listLinkId)
426 # test service-create for ODU4 service from spdr to spdr
def test_23_create_ODU4_service(self):
    # Rewrite the shared class-level payload in place into an ODU4 request:
    # new service name, and on both ends the ODU format with the
    # 'otu-service-rate' key replaced by 'odu-service-rate'.
    payload = self.cr_serv_sample_data["input"]
    payload["service-name"] = "service1-ODU4"
    for side in ("service-a-end", "service-z-end"):
        endpoint = payload[side]
        endpoint["service-format"] = "ODU"
        del endpoint["otu-service-rate"]
        endpoint["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"

    # Submit the request, check the acknowledgement, then pause for
    # WAITING seconds.
    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    common = creation.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
443 def test_24_get_ODU4_service1(self):
444 response = test_utils.get_service_list_request(
445 "services/service1-ODU4")
446 self.assertEqual(response.status_code, requests.codes.ok)
447 res = response.json()
449 res['services'][0]['administrative-state'], 'inService')
451 res['services'][0]['service-name'], 'service1-ODU4')
453 res['services'][0]['connection-type'], 'infrastructure')
455 res['services'][0]['lifecycle-state'], 'planned')
458 def test_25_check_interface_ODU4_spdra(self):
459 response = test_utils_rfc8040.check_node_attribute_request(
460 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
461 self.assertEqual(response['status_code'], requests.codes.ok)
462 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
463 'administrative-state': 'inService',
464 'supporting-circuit-pack-name': 'CP1-CFP0',
465 'supporting-interface': 'XPDR1-NETWORK1-OTU',
466 'type': 'org-openroadm-interfaces:otnOdu',
467 'supporting-port': 'CP1-CFP0-P1',
469 'description': 'TBD'}
470 # SAPI/DAPI are added in the Otu4 renderer
471 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
472 'rate': 'org-openroadm-otn-common-types:ODU4',
473 'monitoring-mode': 'terminated',
474 'expected-dapi': 'H/OelLynehI=',
475 'expected-sapi': 'AMf1n5hK6Xkk',
476 'tx-dapi': 'AMf1n5hK6Xkk',
477 'tx-sapi': 'H/OelLynehI='}
479 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
480 response['interface'][0])
481 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
483 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
485 self.assertDictEqual(
486 {'payload-type': '21', 'exp-payload-type': '21'},
487 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
489 def test_26_check_interface_ODU4_spdrc(self):
490 response = test_utils_rfc8040.check_node_attribute_request(
491 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
492 self.assertEqual(response['status_code'], requests.codes.ok)
493 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
494 'administrative-state': 'inService',
495 'supporting-circuit-pack-name': 'CP1-CFP0',
496 'supporting-interface': 'XPDR1-NETWORK1-OTU',
497 'type': 'org-openroadm-interfaces:otnOdu',
498 'supporting-port': 'CP1-CFP0-P1',
500 'description': 'TBD'}
501 # SAPI/DAPI are added in the Otu4 renderer
502 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
503 'rate': 'org-openroadm-otn-common-types:ODU4',
504 'monitoring-mode': 'terminated',
505 'tx-sapi': 'AMf1n5hK6Xkk',
506 'tx-dapi': 'H/OelLynehI=',
507 'expected-sapi': 'H/OelLynehI=',
508 'expected-dapi': 'AMf1n5hK6Xkk'
510 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
511 response['interface'][0])
512 self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
514 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
516 self.assertDictEqual(
517 {'payload-type': '21', 'exp-payload-type': '21'},
518 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
520 def test_27_check_otn_topo_links(self):
521 response = test_utils.get_otn_topo_request()
522 self.assertEqual(response.status_code, requests.codes.ok)
523 res = response.json()
524 nb_links = len(res['network'][0]['ietf-network-topology:link'])
525 self.assertEqual(nb_links, 4)
526 for link in res['network'][0]['ietf-network-topology:link']:
527 linkId = link['link-id']
528 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
529 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
531 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
533 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
534 elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
535 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
537 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
539 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
541 link['transportpce-topology:otn-link-type'], 'ODTU4')
543 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
544 self.assertIn(link['org-openroadm-common-network:opposite-link'],
545 ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
546 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
548 self.fail("this link should not exist")
550 def test_28_check_otn_topo_tp(self):
551 response = test_utils.get_otn_topo_request()
552 res = response.json()
553 for node in res['network'][0]['node']:
554 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
555 tpList = node['ietf-network-topology:termination-point']
557 if tp['tp-id'] == 'XPDR1-NETWORK1':
558 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
559 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
561 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
562 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
563 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
565 # test service-create for 10GE service from spdr to spdr
def test_29_create_10GE_service(self):
    # Rewrite the shared class-level payload in place into a 10GE request:
    # new service name and connection type, and on both ends an Ethernet
    # 10GBASE-R service at rate "10" on port XPDR1-CLIENT1, with the
    # 'odu-service-rate' key removed.
    payload = self.cr_serv_sample_data["input"]
    payload["service-name"] = "service1-10GE"
    payload["connection-type"] = "service"
    for side in ("service-a-end", "service-z-end"):
        endpoint = payload[side]
        endpoint["service-rate"] = "10"
        endpoint["service-format"] = "Ethernet"
        del endpoint["odu-service-rate"]
        endpoint["ethernet-encoding"] = "10GBASE-R"
        endpoint["tx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
        endpoint["rx-direction"][0]["port"]["port-name"] = "XPDR1-CLIENT1"
    # Submit the request, check the acknowledgement, then pause for
    # WAITING seconds.
    creation = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(creation.status_code, requests.codes.ok)
    common = creation.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
588 def test_30_get_10GE_service1(self):
589 response = test_utils.get_service_list_request(
590 "services/service1-10GE")
591 self.assertEqual(response.status_code, requests.codes.ok)
592 res = response.json()
594 res['services'][0]['administrative-state'], 'inService')
596 res['services'][0]['service-name'], 'service1-10GE')
598 res['services'][0]['connection-type'], 'service')
600 res['services'][0]['lifecycle-state'], 'planned')
603 def test_31_check_interface_10GE_CLIENT_spdra(self):
604 response = test_utils_rfc8040.check_node_attribute_request(
605 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
606 self.assertEqual(response['status_code'], requests.codes.ok)
607 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
608 'administrative-state': 'inService',
609 'supporting-circuit-pack-name': 'CP1-SFP4',
610 'type': 'org-openroadm-interfaces:ethernetCsmacd',
611 'supporting-port': 'CP1-SFP4-P1'
613 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
614 response['interface'][0])
617 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
619 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
620 response = test_utils_rfc8040.check_node_attribute_request(
621 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
622 self.assertEqual(response['status_code'], requests.codes.ok)
623 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
624 'administrative-state': 'inService',
625 'supporting-circuit-pack-name': 'CP1-SFP4',
626 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
627 'type': 'org-openroadm-interfaces:otnOdu',
628 'supporting-port': 'CP1-SFP4-P1'}
630 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
631 'rate': 'org-openroadm-otn-common-types:ODU2e',
632 'monitoring-mode': 'terminated'}
634 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
635 response['interface'][0])
636 self.assertDictEqual(dict(input_dict_2,
637 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
638 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
639 self.assertDictEqual(
640 {'payload-type': '03', 'exp-payload-type': '03'},
641 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
643 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
644 response = test_utils_rfc8040.check_node_attribute_request(
645 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
646 self.assertEqual(response['status_code'], requests.codes.ok)
647 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
648 'administrative-state': 'inService',
649 'supporting-circuit-pack-name': 'CP1-CFP0',
650 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
651 'type': 'org-openroadm-interfaces:otnOdu',
652 'supporting-port': 'CP1-CFP0-P1'}
654 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
655 'rate': 'org-openroadm-otn-common-types:ODU2e',
656 'monitoring-mode': 'monitored'}
657 input_dict_3 = {'trib-port-number': 1}
659 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
660 response['interface'][0])
661 self.assertDictEqual(dict(input_dict_2,
662 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
663 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
664 self.assertDictEqual(dict(input_dict_3,
665 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
666 'parent-odu-allocation']),
667 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
668 self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
671 def test_34_check_ODU2E_connection_spdra(self):
672 response = test_utils_rfc8040.check_node_attribute_request(
673 'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
674 self.assertEqual(response['status_code'], requests.codes.ok)
677 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
678 'direction': 'bidirectional'
681 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
682 response['odu-connection'][0])
683 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
684 response['odu-connection'][0]['destination'])
685 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
686 response['odu-connection'][0]['source'])
688 def test_35_check_interface_10GE_CLIENT_spdrc(self):
689 response = test_utils_rfc8040.check_node_attribute_request(
690 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
691 self.assertEqual(response['status_code'], requests.codes.ok)
692 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
693 'administrative-state': 'inService',
694 'supporting-circuit-pack-name': 'CP1-SFP4',
695 'type': 'org-openroadm-interfaces:ethernetCsmacd',
696 'supporting-port': 'CP1-SFP4-P1'
698 self.assertDictEqual(dict(input_dict, **response['interface'][0]),
699 response['interface'][0])
702 response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
704 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
705 response = test_utils_rfc8040.check_node_attribute_request(
706 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
707 self.assertEqual(response['status_code'], requests.codes.ok)
708 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
709 'administrative-state': 'inService',
710 'supporting-circuit-pack-name': 'CP1-SFP4',
711 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
712 'type': 'org-openroadm-interfaces:otnOdu',
713 'supporting-port': 'CP1-SFP4-P1'}
715 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
716 'rate': 'org-openroadm-otn-common-types:ODU2e',
717 'monitoring-mode': 'terminated'}
719 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
720 response['interface'][0])
721 self.assertDictEqual(dict(input_dict_2,
722 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
723 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
724 self.assertDictEqual(
725 {'payload-type': '03', 'exp-payload-type': '03'},
726 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
728 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
729 response = test_utils_rfc8040.check_node_attribute_request(
730 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
731 self.assertEqual(response['status_code'], requests.codes.ok)
732 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
733 'administrative-state': 'inService',
734 'supporting-circuit-pack-name': 'CP1-CFP0',
735 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
736 'type': 'org-openroadm-interfaces:otnOdu',
737 'supporting-port': 'CP1-CFP0-P1'}
739 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
740 'rate': 'org-openroadm-otn-common-types:ODU2e',
741 'monitoring-mode': 'monitored'}
743 input_dict_3 = {'trib-port-number': 1}
745 self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
746 response['interface'][0])
747 self.assertDictEqual(dict(input_dict_2,
748 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
749 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
750 self.assertDictEqual(dict(input_dict_3,
751 **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
752 'parent-odu-allocation']),
753 response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
754 'parent-odu-allocation'])
756 response['interface'][0][
757 'org-openroadm-otn-odu-interfaces:odu'][
758 'parent-odu-allocation']['trib-slots'])
760 def test_38_check_ODU2E_connection_spdrc(self):
761 response = test_utils_rfc8040.check_node_attribute_request(
762 'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
763 self.assertEqual(response['status_code'], requests.codes.ok)
766 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
767 'direction': 'bidirectional'
770 self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
771 response['odu-connection'][0])
772 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
773 response['odu-connection'][0]['destination'])
774 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
775 response['odu-connection'][0]['source'])
def test_39_check_otn_topo_links(self):
    # The OTN topology must expose 4 links; both directions of the XPDR1 ODU4
    # link must report 90000 available and 10000 used bandwidth.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nb_links = len(res['network'][0]['ietf-network-topology:link'])
    self.assertEqual(nb_links, 4)
    for link in res['network'][0]['ietf-network-topology:link']:
        linkId = link['link-id']
        if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
                       'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
            self.assertEqual(
                link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
            self.assertEqual(
                link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
def test_40_check_otn_topo_tp(self):
    # Inspect the XPDR1-NETWORK1 termination point of both SPDR XPDR1 nodes:
    # 72 entries are expected in 'ts-pool' and 79 in the first 'tpn-pool'.
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            tpList = node['ietf-network-topology:termination-point']
            for tp in tpList:
                if tp['tp-id'] == 'XPDR1-NETWORK1':
                    xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
                    self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
                    tsPoolList = list(range(1, 9))
                    # NOTE(review): this checks that the list object itself is
                    # not an element of the pool, not that each slot 1-8 is
                    # absent - confirm the intent.
                    self.assertNotIn(
                        tsPoolList, xpdrTpPortConAt['ts-pool'])
                    self.assertEqual(
                        len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
                    self.assertNotIn(
                        1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_41_delete_10GE_service(self):
    # Ask for deletion of service1-10GE and check the acknowledgement message.
    reply = test_utils.service_delete_request("service1-10GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Presumably leaves time for the deletion to complete before the next test.
    time.sleep(self.WAITING)
def test_42_check_service_list(self):
    # Two services are expected to remain in the service list.
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 2)
def test_43_check_no_ODU2e_connection_spdra(self):
    # The SPDR-SA1 device configuration must no longer carry any
    # 'odu-connection' entry.
    response = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The original spelled the key as ['odu-connection'][0], which is just
    # the string literal itself.
    self.assertNotIn('odu-connection', res['org-openroadm-device'])
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
    # The network-side ODU2e interface lookup on SPDR-SA1 must answer 'conflict'.
    # NOTE(review): other checks in this file use 'XPDR1-NETWORK1-ODU2e'
    # without the '-service1' suffix - confirm the interface name.
    result = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
    status = result['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
    # The client-side ODU2e interface lookup on SPDR-SA1 must answer 'conflict'.
    # NOTE(review): other checks in this file use 'XPDR1-CLIENT1-ODU2e'
    # without the '-service1' suffix - confirm the interface name.
    result = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
    status = result['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
    # The 10G Ethernet client interface lookup on SPDR-SA1 must answer 'conflict'.
    result = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
    status = result['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_47_check_otn_topo_links(self):
    # The OTN topology must still expose 4 links; both directions of the XPDR1
    # ODU4 link must be back to 100000 available and 0 used bandwidth.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nb_links = len(res['network'][0]['ietf-network-topology:link'])
    self.assertEqual(nb_links, 4)
    for link in res['network'][0]['ietf-network-topology:link']:
        linkId = link['link-id']
        if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
                       'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
            self.assertEqual(
                link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
            self.assertEqual(
                link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_48_check_otn_topo_tp(self):
    # On the XPDR1-NETWORK1 termination point of both SPDR XPDR1 nodes, the
    # 'ts-pool' and the first 'tpn-pool' must each hold 80 entries.
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            tpList = node['ietf-network-topology:termination-point']
            for tp in tpList:
                if tp['tp-id'] == 'XPDR1-NETWORK1':
                    xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
                    self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
                    self.assertEqual(
                        len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
    # Ask for deletion of service1-ODU4 and check the acknowledgement message.
    reply = test_utils.service_delete_request("service1-ODU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Presumably leaves time for the deletion to complete before the next test.
    time.sleep(self.WAITING)
def test_50_check_service_list(self):
    # A single service is expected to remain in the service list.
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 1)
def test_51_check_no_interface_ODU4_spdra(self):
    # The XPDR1-NETWORK1-ODU4 interface lookup on SPDR-SA1 must answer 'conflict'.
    result = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
    status = result['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_52_check_otn_topo_links(self):
    # Re-run the OTU4 link checks of test_22 on the current topology.
    self.test_22_check_otn_topo_otu4_links()
def test_53_check_otn_topo_tp(self):
    # On the XPDR1-NETWORK1 termination point of both SPDR XPDR1 nodes, neither
    # 'ts-pool' nor 'odtu-tpn-pool' may be present any more.
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            tpList = node['ietf-network-topology:termination-point']
            for tp in tpList:
                if tp['tp-id'] == 'XPDR1-NETWORK1':
                    xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
                    self.assertNotIn('ts-pool', dict.keys(xpdrTpPortConAt))
                    # NOTE(review): this assertion opener was restored to match
                    # the 'ts-pool' check above - confirm it is assertNotIn.
                    self.assertNotIn(
                        'odtu-tpn-pool', dict.keys(xpdrTpPortConAt))
def test_54_delete_OCH_OTU4_service(self):
    # Ask for deletion of service1-OCH-OTU4 and check the acknowledgement message.
    reply = test_utils.service_delete_request("service1-OCH-OTU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Presumably leaves time for the deletion to complete before the next test.
    time.sleep(self.WAITING)
def test_55_get_no_service(self):
    # With no service left, the service-list request must answer 'conflict'
    # and report a 'data-missing' application error.
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.conflict)
    res = response.json()
    self.assertIn(
        {"error-type": "application", "error-tag": "data-missing",
         "error-message": "Request could not be completed because the relevant data model content does not exist"},
        res['errors']['error'])
def test_56_check_no_interface_OTU4_spdra(self):
    # The XPDR1-NETWORK1-OTU interface lookup on SPDR-SA1 must answer 'conflict'.
    result = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
    status = result['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_57_check_no_interface_OCH_spdra(self):
    # The XPDR1-NETWORK1-1 interface lookup on SPDR-SA1 must answer 'conflict'.
    result = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
    status = result['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
    # The first network of the OTN topology must not contain any link list.
    reply = test_utils.get_otn_topo_request()
    self.assertEqual(reply.status_code, requests.codes.ok)
    otn_network = reply.json()['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_59_check_openroadm_topo_spdra(self):
    # The first termination point of SPDR-SA1-XPDR1 must be XPDR1-NETWORK1 and
    # its xpdr-network-attributes must not carry a 'wavelength' key.
    reply = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    first_tp = reply.json()['node'][0]['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR1-NETWORK1', first_tp['tp-id'])
    network_attrs = first_tp['org-openroadm-network-topology:xpdr-network-attributes']
    self.assertNotIn('wavelength', dict.keys(network_attrs))
def test_60_check_openroadm_topo_ROADMA_SRG(self):
    # Decode the base64 frequency maps of ROADM-A1-SRG1 (node level and
    # SRG1-PP1-TXRX port) and check that byte 95 equals 255.
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    freq_map = base64.b64decode(
        res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
    # Iterating over bytes yields one int per byte.
    freq_map_array = [int(x) for x in freq_map]
    self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'SRG1-PP1-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = [int(x) for x in freq_map]
            self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
def test_61_check_openroadm_topo_ROADMA_DEG(self):
    # Decode the base64 frequency maps of ROADM-A1-DEG2 (node level, CTP and
    # TTP ports) and check that byte 95 equals 255.
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    freq_map = base64.b64decode(
        res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    # Iterating over bytes yields one int per byte.
    freq_map_array = [int(x) for x in freq_map]
    self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = [int(x) for x in freq_map]
            self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            freq_map = base64.b64decode(
                ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
            freq_map_array = [int(x) for x in freq_map]
            self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
    # Invoke init-xpdr-rdm-links for SPDR-SA1 XPDR3 network 1 and ROADM-A1 SRG1-PP2.
    links_input = {
        'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
        'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(result['status_code'], requests.codes.ok)
def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
    # Invoke init-rdm-xpdr-links for ROADM-A1 SRG1-PP2 and SPDR-SA1 XPDR3 network 1.
    links_input = {
        'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
        'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(result['status_code'], requests.codes.ok)
def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
    # Invoke init-xpdr-rdm-links for SPDR-SC1 XPDR3 network 1 and ROADM-C1 SRG1-PP2.
    links_input = {
        'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
        'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-xpdr-rdm-links', {'links-input': links_input})
    self.assertEqual(result['status_code'], requests.codes.ok)
def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
    # Invoke init-rdm-xpdr-links for ROADM-C1 SRG1-PP2 and SPDR-SC1 XPDR3 network 1.
    links_input = {
        'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
        'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX',
    }
    result = test_utils_rfc8040.transportpce_api_rpc_request(
        'transportpce-networkutils', 'init-rdm-xpdr-links', {'links-input': links_input})
    self.assertEqual(result['status_code'], requests.codes.ok)
def test_66_create_OCH_OTU4_service_2(self):
    # Rewrite the shared service-create payload in place for an OTU4
    # infrastructure service on the XPDR3 network ports, then submit it.
    payload = self.cr_serv_sample_data["input"]
    payload["service-name"] = "service2-OCH-OTU4"
    payload["connection-type"] = "infrastructure"
    for end_key, device in (("service-a-end", "SPDR-SA1-XPDR3"),
                            ("service-z-end", "SPDR-SC1-XPDR3")):
        end = payload[end_key]
        end["service-rate"] = "100"
        end["service-format"] = "OTU"
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction][0]["port"]
            port["port-device-name"] = device
            port["port-name"] = "XPDR3-NETWORK1"
        end["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Presumably leaves time for the service creation to complete.
    time.sleep(self.WAITING)
def test_67_get_OCH_OTU4_service2(self):
    # Fetch service2-OCH-OTU4 and check its state, name and connection type.
    response = test_utils.get_service_list_request(
        "services/service2-OCH-OTU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(
        res['services'][0]['administrative-state'], 'inService')
    self.assertEqual(
        res['services'][0]['service-name'], 'service2-OCH-OTU4')
    self.assertEqual(
        res['services'][0]['connection-type'], 'infrastructure')
    self.assertEqual(
        res['services'][0]['lifecycle-state'], 'planned')
def test_68_create_ODU4_service_2(self):
    # Rewrite the shared service-create payload in place for an ODU4 service
    # on the XPDR3 network ports (the OTU rate is replaced by an ODU rate),
    # then submit it.
    payload = self.cr_serv_sample_data["input"]
    payload["service-name"] = "service2-ODU4"
    directions = ("tx-direction", "rx-direction")

    a_end = payload["service-a-end"]
    a_end["service-format"] = "ODU"
    for direction in directions:
        a_port = a_end[direction][0]["port"]
        a_port["port-device-name"] = "SPDR-SA1-XPDR3"
        a_port["port-name"] = "XPDR3-NETWORK1"
    a_end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
    del a_end["otu-service-rate"]

    z_end = payload["service-z-end"]
    z_end["service-format"] = "ODU"
    z_end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
    for direction in directions:
        z_port = z_end[direction][0]["port"]
        z_port["port-device-name"] = "SPDR-SC1-XPDR3"
        z_port["port-name"] = "XPDR3-NETWORK1"
    del z_end["otu-service-rate"]

    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Presumably leaves time for the service creation to complete.
    time.sleep(self.WAITING)
def test_69_get_ODU4_service2(self):
    # Fetch service2-ODU4 and check its state, name and connection type.
    response = test_utils.get_service_list_request(
        "services/service2-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(
        res['services'][0]['administrative-state'], 'inService')
    self.assertEqual(
        res['services'][0]['service-name'], 'service2-ODU4')
    self.assertEqual(
        res['services'][0]['connection-type'], 'infrastructure')
    self.assertEqual(
        res['services'][0]['lifecycle-state'], 'planned')
def test_70_create_1GE_service(self):
    # Rewrite the shared service-create payload in place for a 1GE Ethernet
    # service between the XPDR3 client ports, then submit it.
    payload = self.cr_serv_sample_data["input"]
    payload["service-name"] = "service1-1GE"
    payload["connection-type"] = "service"
    for end_key in ("service-a-end", "service-z-end"):
        payload[end_key]["service-rate"] = "1"
        payload[end_key]["service-format"] = "Ethernet"
    for end_key, device in (("service-a-end", "SPDR-SA1-XPDR3"),
                            ("service-z-end", "SPDR-SC1-XPDR3")):
        end = payload[end_key]
        # The ODU rate set for the previous service is removed from each end.
        del end["odu-service-rate"]
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction][0]["port"]
            port["port-device-name"] = device
            port["port-name"] = "XPDR3-CLIENT1"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Presumably leaves time for the service creation to complete.
    time.sleep(self.WAITING)
def test_71_get_1GE_service1(self):
    # Fetch service1-1GE and check its state, name and connection type.
    response = test_utils.get_service_list_request("services/service1-1GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertEqual(
        res['services'][0]['administrative-state'], 'inService')
    self.assertEqual(
        res['services'][0]['service-name'], 'service1-1GE')
    self.assertEqual(
        res['services'][0]['connection-type'], 'service')
    self.assertEqual(
        res['services'][0]['lifecycle-state'], 'planned')
def test_72_check_interface_1GE_CLIENT_spdra(self):
    # Read the 1G Ethernet client interface on SPDR-SA1 and check its
    # attributes and speed.
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
                  'administrative-state': 'inService',
                  'supporting-circuit-pack-name': 'CP1-SFP4',
                  'type': 'org-openroadm-interfaces:ethernetCsmacd',
                  'supporting-port': 'CP1-SFP4-P1'
                  }
    # NOTE(review): the response entries override the expected ones in this
    # merge, so the comparison only fails when an expected key is absent
    # from the response.
    self.assertDictEqual(dict(input_dict, **response['interface'][0]),
                         response['interface'][0])
    # NOTE(review): the expected speed literal was not visible and is
    # assumed to be 1000 - confirm against the device data.
    self.assertEqual(
        1000,
        response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_73_check_interface_ODU0_CLIENT_spdra(self):
    # Read the client-side ODU0 interface on SPDR-SA1 and check its generic
    # attributes, its ODU attributes and its OPU payload types.
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-SFP1',
                    'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-SFP1-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'terminated'}
    # NOTE(review): in both merges below the response entries override the
    # expected ones, so the comparisons only fail when an expected key is
    # absent from the response.
    self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
                         response['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(
        {'payload-type': '07', 'exp-payload-type': '07'},
        response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_74_check_interface_ODU0_NETWORK_spdra(self):
    # Read the network-side ODU0 interface on SPDR-SA1 and check its generic
    # attributes, its ODU attributes and its parent ODU allocation.
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-CFP0',
                    'supporting-interface': 'XPDR3-NETWORK1-ODU4',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-CFP0-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'monitored'}
    input_dict_3 = {'trib-port-number': 1}
    # NOTE(review): in the three merges below the response entries override
    # the expected ones, so the comparisons only fail when an expected key is
    # absent from the response.
    self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
                         response['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(dict(input_dict_3,
                              **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
                                  'parent-odu-allocation']),
                         response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
    # The trailing ['trib-slots'] subscript was restored by analogy with
    # the equivalent SPDR-SC1 check.
    self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
                  ['trib-slots'])
def test_75_check_ODU0_connection_spdra(self):
    # Read the ODU0 cross-connection on SPDR-SA1 and check its direction and
    # its source/destination interfaces.
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
    self.assertEqual(response['status_code'], requests.codes.ok)
    # NOTE(review): the 'connection-name' key was restored from context -
    # confirm against the device model.
    input_dict_1 = {
        'connection-name':
        'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
        'direction': 'bidirectional'
    }
    # NOTE(review): the response entries override the expected ones in this
    # merge, so the comparison only fails when an expected key is absent
    # from the response.
    self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
                         response['odu-connection'][0])
    self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
                         response['odu-connection'][0]['destination'])
    self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
                         response['odu-connection'][0]['source'])
def test_76_check_interface_1GE_CLIENT_spdrc(self):
    # Read the 1G Ethernet client interface on SPDR-SC1 and check its
    # attributes and speed.
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
                  'administrative-state': 'inService',
                  'supporting-circuit-pack-name': 'CP3-SFP1',
                  'type': 'org-openroadm-interfaces:ethernetCsmacd',
                  'supporting-port': 'CP3-SFP1-P1'
                  }
    # NOTE(review): the response entries override the expected ones in this
    # merge, so the comparison only fails when an expected key is absent
    # from the response.
    self.assertDictEqual(dict(input_dict, **response['interface'][0]),
                         response['interface'][0])
    # NOTE(review): the expected speed literal was not visible and is
    # assumed to be 1000 - confirm against the device data.
    self.assertEqual(
        1000,
        response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_77_check_interface_ODU0_CLIENT_spdrc(self):
    # Read the client-side ODU0 interface on SPDR-SC1 and check its generic
    # attributes, its ODU attributes and its OPU payload types.
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-SFP1',
                    'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-SFP1-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'terminated'}
    # NOTE(review): in both merges below the response entries override the
    # expected ones, so the comparisons only fail when an expected key is
    # absent from the response.
    self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
                         response['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(
        {'payload-type': '07', 'exp-payload-type': '07'},
        response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_78_check_interface_ODU0_NETWORK_spdrc(self):
    # Read the network-side ODU0 interface on SPDR-SC1 and check its generic
    # attributes, its ODU attributes and its parent ODU allocation.
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
    self.assertEqual(response['status_code'], requests.codes.ok)
    input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-CFP0',
                    'supporting-interface': 'XPDR3-NETWORK1-ODU4',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-CFP0-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'monitored'}
    input_dict_3 = {'trib-port-number': 1}
    # NOTE(review): in the three merges below the response entries override
    # the expected ones, so the comparisons only fail when an expected key is
    # absent from the response.
    self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
                         response['interface'][0])
    self.assertDictEqual(dict(input_dict_2,
                              **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
                         response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
    self.assertDictEqual(dict(input_dict_3,
                              **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
                                  'parent-odu-allocation']),
                         response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
                             'parent-odu-allocation'])
    # The assertIn(1, ...) opener was restored by analogy with the
    # equivalent SPDR-SA1 check.
    self.assertIn(1,
                  response['interface'][0][
                      'org-openroadm-otn-odu-interfaces:odu'][
                      'parent-odu-allocation']['trib-slots'])
def test_79_check_ODU0_connection_spdrc(self):
    # Read the ODU0 cross-connection on SPDR-SC1 and check its direction and
    # its source/destination interfaces.
    response = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
    self.assertEqual(response['status_code'], requests.codes.ok)
    # NOTE(review): the 'connection-name' key was restored from context -
    # confirm against the device model.
    input_dict_1 = {
        'connection-name':
        'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
        'direction': 'bidirectional'
    }
    # NOTE(review): the response entries override the expected ones in this
    # merge, so the comparison only fails when an expected key is absent
    # from the response.
    self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
                         response['odu-connection'][0])
    self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
                         response['odu-connection'][0]['destination'])
    self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
                         response['odu-connection'][0]['source'])
def test_80_check_otn_topo_links(self):
    # The OTN topology must expose 4 links; both directions of the XPDR3 ODU4
    # link must report 99000 available and 1000 used bandwidth.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nb_links = len(res['network'][0]['ietf-network-topology:link'])
    self.assertEqual(nb_links, 4)
    for link in res['network'][0]['ietf-network-topology:link']:
        linkId = link['link-id']
        if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
                       'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
            self.assertEqual(
                link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
            self.assertEqual(
                link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
def test_81_check_otn_topo_tp(self):
    # Inspect the XPDR3-NETWORK1 termination point of both SPDR XPDR3 nodes:
    # 79 entries are expected in 'ts-pool' and 79 in the first 'tpn-pool'.
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
            tpList = node['ietf-network-topology:termination-point']
            for tp in tpList:
                if tp['tp-id'] == 'XPDR3-NETWORK1':
                    xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
                    self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
                    tsPoolList = list(range(1, 2))
                    # NOTE(review): this checks that the list object itself is
                    # not an element of the pool, not that slot 1 is absent -
                    # confirm the intent.
                    self.assertNotIn(
                        tsPoolList, xpdrTpPortConAt['ts-pool'])
                    self.assertEqual(
                        len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
                    self.assertNotIn(
                        1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_82_delete_1GE_service(self):
    # Ask for deletion of service1-1GE and check the acknowledgement message.
    reply = test_utils.service_delete_request("service1-1GE")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Presumably leaves time for the deletion to complete before the next test.
    time.sleep(self.WAITING)
def test_83_check_service_list(self):
    # Two services are expected to remain in the service list.
    reply = test_utils.get_service_list_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    services = reply.json()['service-list']['services']
    self.assertEqual(len(services), 2)
def test_84_check_no_ODU0_connection_spdra(self):
    # The SPDR-SA1 device configuration must no longer carry any
    # 'odu-connection' entry.
    response = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The original spelled the key as ['odu-connection'][0], which is just
    # the string literal itself.
    self.assertNotIn('odu-connection', res['org-openroadm-device'])
def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
    # The network-side ODU0 interface lookup on SPDR-SA1 must answer 'conflict'.
    # NOTE(review): the earlier positive check queries 'XPDR3-NETWORK1-ODU0'
    # without the '-service1' suffix - confirm the interface name.
    result = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0-service1')
    status = result['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
    # The client-side ODU0 interface lookup on SPDR-SA1 must answer 'conflict'.
    # NOTE(review): the earlier positive check queries 'XPDR3-CLIENT1-ODU0'
    # without the '-service1' suffix - confirm the interface name.
    result = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0-service1')
    status = result['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_87_check_no_interface_10GE_CLIENT_spdra(self):
    # The 1G Ethernet client interface lookup on SPDR-SA1 must answer 'conflict'.
    # NOTE(review): the method name says 10GE but the interface queried is
    # the ETHERNET1G one.
    result = test_utils_rfc8040.check_node_attribute_request(
        'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
    status = result['status_code']
    self.assertEqual(status, requests.codes.conflict)
def test_88_check_otn_topo_links(self):
    # The OTN topology must still expose 4 links; both directions of the XPDR3
    # ODU4 link must be back to 100000 available and 0 used bandwidth.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    nb_links = len(res['network'][0]['ietf-network-topology:link'])
    self.assertEqual(nb_links, 4)
    for link in res['network'][0]['ietf-network-topology:link']:
        linkId = link['link-id']
        if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
                       'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
            self.assertEqual(
                link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
            self.assertEqual(
                link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_89_check_otn_topo_tp(self):
    # On the XPDR3-NETWORK1 termination point of both SPDR XPDR3 nodes, the
    # 'ts-pool' and the first 'tpn-pool' must each hold 80 entries.
    response = test_utils.get_otn_topo_request()
    res = response.json()
    for node in res['network'][0]['node']:
        if node['node-id'] in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
            tpList = node['ietf-network-topology:termination-point']
            for tp in tpList:
                if tp['tp-id'] == 'XPDR3-NETWORK1':
                    xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
                    self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
                    self.assertEqual(
                        len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_90_delete_ODU4_service(self):
    # Ask for deletion of service2-ODU4 and check the acknowledgement message.
    reply = test_utils.service_delete_request("service2-ODU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Presumably leaves time for the deletion to complete before the next test.
    time.sleep(self.WAITING)
def test_91_delete_OCH_OTU4_service(self):
    # Ask for deletion of service2-OCH-OTU4 and check the acknowledgement message.
    reply = test_utils.service_delete_request("service2-OCH-OTU4")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
    # Presumably leaves time for the deletion to complete before the next test.
    time.sleep(self.WAITING)
def test_92_disconnect_xponders_from_roadm(self):
    # Delete every XPONDER-OUTPUT / XPONDER-INPUT link found in the
    # openroadm topology and check each deletion succeeds.
    # The '{}' placeholder is passed through unformatted; presumably
    # test_utils.delete_request fills it in - confirm.
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    response = test_utils.get_ordm_topo_request("")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    links = res['network'][0]['ietf-network-topology:link']
    for link in links:
        if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
            link_name = link["link-id"]
            response = test_utils.delete_request(url+link_name)
            self.assertEqual(response.status_code, requests.codes.ok)
def test_93_check_openroadm_topology(self):
    # The openroadm topology must now contain exactly 18 links.
    reply = test_utils.get_ordm_topo_request("")
    self.assertEqual(reply.status_code, requests.codes.ok)
    topo_links = reply.json()['network'][0]['ietf-network-topology:link']
    link_count = len(topo_links)
    self.assertEqual(18, link_count, 'Topology should contain 18 links')
def test_94_disconnect_spdrA(self):
    # Unmount SPDR-SA1; either 'ok' or 'no_content' is accepted.
    accepted = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("SPDR-SA1")
    self.assertIn(reply.status_code, accepted)
def test_95_disconnect_spdrC(self):
    # Unmount SPDR-SC1; either 'ok' or 'no_content' is accepted.
    accepted = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("SPDR-SC1")
    self.assertIn(reply.status_code, accepted)
def test_96_disconnect_roadmA(self):
    # Unmount ROADM-A1; either 'ok' or 'no_content' is accepted.
    accepted = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("ROADM-A1")
    self.assertIn(reply.status_code, accepted)
def test_97_disconnect_roadmC(self):
    # Unmount ROADM-C1; either 'ok' or 'no_content' is accepted.
    accepted = (requests.codes.ok, requests.codes.no_content)
    reply = test_utils_rfc8040.unmount_device("ROADM-C1")
    self.assertIn(reply.status_code, accepted)
# Run the test suite with verbose per-test output when executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)