3 ##############################################################################
4 # Copyright (c) 2020 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # pylint: disable=no-member
13 # pylint: disable=too-many-public-methods
14 # pylint: disable=too-many-lines
20 # pylint: disable=wrong-import-order
22 sys.path.append('transportpce_tests/common/')
23 # pylint: disable=wrong-import-position
24 # pylint: disable=import-error
25 import test_utils # nopep8
28 class TransportPCEtesting(unittest.TestCase):
31 WAITING = 20 # nominal value is 300
32 NODE_VERSION = '2.2.1'
34 cr_serv_sample_data = {"input": {
35 "sdnc-request-header": {
36 "request-id": "request-1",
37 "rpc-action": "service-create",
38 "request-system-id": "appname"
40 "service-name": "service1-OCH-OTU4",
41 "common-id": "commonId",
42 "connection-type": "infrastructure",
44 "service-rate": "100",
45 "node-id": "SPDR-SA1",
46 "service-format": "OTU",
47 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
51 "committed-info-rate": "100000",
52 "committed-burst-size": "64"
57 "port-device-name": "SPDR-SA1-XPDR1",
59 "port-name": "XPDR1-NETWORK1",
60 "port-rack": "000000.00",
61 "port-shelf": "Chassis#1"
64 "lgx-device-name": "Some lgx-device-name",
65 "lgx-port-name": "Some lgx-port-name",
66 "lgx-port-rack": "000000.00",
67 "lgx-port-shelf": "00"
72 "port-device-name": "SPDR-SA1-XPDR1",
74 "port-name": "XPDR1-NETWORK1",
75 "port-rack": "000000.00",
76 "port-shelf": "Chassis#1"
79 "lgx-device-name": "Some lgx-device-name",
80 "lgx-port-name": "Some lgx-port-name",
81 "lgx-port-rack": "000000.00",
82 "lgx-port-shelf": "00"
88 "service-rate": "100",
89 "node-id": "SPDR-SC1",
90 "service-format": "OTU",
91 "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
95 "committed-info-rate": "100000",
96 "committed-burst-size": "64"
101 "port-device-name": "SPDR-SC1-XPDR1",
102 "port-type": "fixed",
103 "port-name": "XPDR1-NETWORK1",
104 "port-rack": "000000.00",
105 "port-shelf": "Chassis#1"
108 "lgx-device-name": "Some lgx-device-name",
109 "lgx-port-name": "Some lgx-port-name",
110 "lgx-port-rack": "000000.00",
111 "lgx-port-shelf": "00"
116 "port-device-name": "SPDR-SC1-XPDR1",
117 "port-type": "fixed",
118 "port-name": "XPDR1-NETWORK1",
119 "port-rack": "000000.00",
120 "port-shelf": "Chassis#1"
123 "lgx-device-name": "Some lgx-device-name",
124 "lgx-port-name": "Some lgx-port-name",
125 "lgx-port-rack": "000000.00",
126 "lgx-port-shelf": "00"
131 "due-date": "2018-06-15T00:00:01Z",
132 "operator-contact": "pw1234"
138 cls.processes = test_utils.start_tpce()
139 cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
140 ('roadma', cls.NODE_VERSION),
141 ('roadmc', cls.NODE_VERSION),
142 ('spdrc', cls.NODE_VERSION)])
def tearDownClass(cls):
    # Stop every process recorded in cls.processes, then report completion.
    # pylint: disable=not-an-iterable
    for proc in cls.processes:
        test_utils.shutdown_process(proc)
    print("all processes killed")
def test_01_connect_spdrA(self):
    # Mount SPDR-SA1 (sim key 'spdra') and expect the "created" status code.
    sim = ('spdra', self.NODE_VERSION)
    response = test_utils.mount_device("SPDR-SA1", sim)
    self.assertEqual(
        response.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_spdrC(self):
    # Mount SPDR-SC1 (sim key 'spdrc') and expect the "created" status code.
    sim = ('spdrc', self.NODE_VERSION)
    response = test_utils.mount_device("SPDR-SC1", sim)
    self.assertEqual(
        response.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    # Mount ROADM-A1 (sim key 'roadma') and expect the "created" status code.
    sim = ('roadma', self.NODE_VERSION)
    response = test_utils.mount_device("ROADM-A1", sim)
    self.assertEqual(
        response.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    # Mount ROADM-C1 (sim key 'roadmc') and expect the "created" status code.
    sim = ('roadmc', self.NODE_VERSION)
    response = test_utils.mount_device("ROADM-C1", sim)
    self.assertEqual(
        response.status_code,
        requests.codes.created,
        test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
    # Request the xponder-to-roadm link between SPDR-SA1 and ROADM-A1
    # (SRG1-PP1-TXRX) and verify the success message in the RPC output.
    response = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    result = response.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
    # Request the roadm-to-xponder link between ROADM-A1 (SRG1-PP1-TXRX)
    # and SPDR-SA1 and verify the success message in the RPC output.
    response = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SA1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    result = response.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
    # Request the xponder-to-roadm link between SPDR-SC1 and ROADM-C1
    # (SRG1-PP1-TXRX) and verify the success message in the RPC output.
    response = test_utils.connect_xpdr_to_rdm_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    result = response.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
    # Request the roadm-to-xponder link between ROADM-C1 (SRG1-PP1-TXRX)
    # and SPDR-SC1 and verify the success message in the RPC output.
    response = test_utils.connect_rdm_to_xpdr_request(
        "SPDR-SC1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(response.status_code, requests.codes.ok)
    result = response.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result)
210 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
211 # Config ROADMA-ROADMC oms-attributes
213 "auto-spanloss": "true",
214 "spanloss-base": 11.4,
215 "spanloss-current": 12,
216 "engineered-spanloss": 12.2,
217 "link-concatenation": [{
220 "SRLG-length": 100000,
222 response = test_utils.add_oms_attr_request(
223 "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
224 self.assertEqual(response.status_code, requests.codes.created)
226 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
227 # Config ROADMC-ROADMA oms-attributes
229 "auto-spanloss": "true",
230 "spanloss-base": 11.4,
231 "spanloss-current": 12,
232 "engineered-spanloss": 12.2,
233 "link-concatenation": [{
236 "SRLG-length": 100000,
238 response = test_utils.add_oms_attr_request(
239 "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
240 self.assertEqual(response.status_code, requests.codes.created)
242 # test service-create for OCH-OTU4 service from spdr to spdr
def test_11_check_otn_topology(self):
    # The OTN topology must expose 6 nodes and, at this stage, no link list.
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    network = response.json()['network'][0]
    node_count = len(network['node'])
    self.assertEqual(node_count, 6, 'There should be 6 nodes')
    self.assertNotIn('ietf-network-topology:link', network)
def test_12_create_OCH_OTU4_service(self):
    # Submit the service-create request built from the class-level sample
    # payload, check the acknowledgement text, then pause for WAITING seconds.
    response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(response.status_code, requests.codes.ok)
    common = response.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
259 def test_13_get_OCH_OTU4_service1(self):
260 response = test_utils.get_service_list_request(
261 "services/service1-OCH-OTU4")
262 self.assertEqual(response.status_code, requests.codes.ok)
263 res = response.json()
265 res['services'][0]['administrative-state'], 'inService')
267 res['services'][0]['service-name'], 'service1-OCH-OTU4')
269 res['services'][0]['connection-type'], 'infrastructure')
271 res['services'][0]['lifecycle-state'], 'planned')
274 # Check correct configuration of devices
275 def test_14_check_interface_och_spdra(self):
276 response = test_utils.check_netconf_node_request(
277 "SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
278 self.assertEqual(response.status_code, requests.codes.ok)
279 res = response.json()
280 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
281 'administrative-state': 'inService',
282 'supporting-circuit-pack-name': 'CP1-CFP0',
283 'type': 'org-openroadm-interfaces:opticalChannel',
284 'supporting-port': 'CP1-CFP0-P1'
285 }, **res['interface'][0]),
288 self.assertDictEqual(
289 {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
290 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
291 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
293 def test_15_check_interface_OTU4_spdra(self):
294 response = test_utils.check_netconf_node_request(
295 "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
296 self.assertEqual(response.status_code, requests.codes.ok)
297 res = response.json()
298 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
299 'administrative-state': 'inService',
300 'supporting-circuit-pack-name': 'CP1-CFP0',
301 'supporting-interface': 'XPDR1-NETWORK1-761:768',
302 'type': 'org-openroadm-interfaces:otnOtu',
303 'supporting-port': 'CP1-CFP0-P1'
305 input_dict_2 = {'tx-sapi': 'H/OelLynehI=',
306 'expected-dapi': 'H/OelLynehI=',
307 'tx-dapi': 'AMf1n5hK6Xkk',
308 'expected-sapi': 'AMf1n5hK6Xkk',
309 'rate': 'org-openroadm-otn-common-types:OTU4',
312 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
315 self.assertDictEqual(input_dict_2,
317 ['org-openroadm-otn-otu-interfaces:otu'])
319 def test_16_check_interface_och_spdrc(self):
320 response = test_utils.check_netconf_node_request(
321 "SPDR-SC1", "interface/XPDR1-NETWORK1-761:768")
322 self.assertEqual(response.status_code, requests.codes.ok)
323 res = response.json()
324 self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
325 'administrative-state': 'inService',
326 'supporting-circuit-pack-name': 'CP1-CFP0',
327 'type': 'org-openroadm-interfaces:opticalChannel',
328 'supporting-port': 'CP1-CFP0-P1'
329 }, **res['interface'][0]),
332 self.assertDictEqual(
333 {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
334 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
335 res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
337 def test_17_check_interface_OTU4_spdrc(self):
338 response = test_utils.check_netconf_node_request(
339 "SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
340 self.assertEqual(response.status_code, requests.codes.ok)
341 res = response.json()
342 input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
343 'administrative-state': 'inService',
344 'supporting-circuit-pack-name': 'CP1-CFP0',
345 'supporting-interface': 'XPDR1-NETWORK1-761:768',
346 'type': 'org-openroadm-interfaces:otnOtu',
347 'supporting-port': 'CP1-CFP0-P1'
349 input_dict_2 = {'tx-dapi': 'H/OelLynehI=',
350 'expected-sapi': 'H/OelLynehI=',
351 'tx-sapi': 'AMf1n5hK6Xkk',
352 'expected-dapi': 'AMf1n5hK6Xkk',
353 'rate': 'org-openroadm-otn-common-types:OTU4',
357 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
360 self.assertDictEqual(input_dict_2,
362 ['org-openroadm-otn-otu-interfaces:otu'])
364 def test_18_check_no_interface_ODU4_spdra(self):
365 response = test_utils.check_netconf_node_request(
366 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
367 self.assertEqual(response.status_code, requests.codes.conflict)
368 res = response.json()
370 {"error-type": "application", "error-tag": "data-missing",
371 "error-message": "Request could not be completed because the relevant data model content does not exist"},
372 res['errors']['error'])
374 def test_19_check_openroadm_topo_spdra(self):
375 response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
376 self.assertEqual(response.status_code, requests.codes.ok)
377 res = response.json()
378 ele = res['node'][0]['ietf-network-topology:termination-point'][0]
379 self.assertEqual('XPDR1-NETWORK1', ele['tp-id'])
380 self.assertEqual({'frequency': 196.1,
382 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
385 def test_20_check_openroadm_topo_ROADMA_SRG(self):
386 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
387 self.assertEqual(response.status_code, requests.codes.ok)
388 res = response.json()
389 freq_map = base64.b64decode(
390 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
391 freq_map_array = [int(x) for x in freq_map]
392 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
393 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
395 if ele['tp-id'] == 'SRG1-PP1-TXRX':
396 freq_map = base64.b64decode(
397 ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
398 freq_map_array = [int(x) for x in freq_map]
399 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
400 if ele['tp-id'] == 'SRG1-PP2-TXRX':
401 self.assertNotIn('avail-freq-maps', dict.keys(ele))
404 def test_21_check_openroadm_topo_ROADMA_DEG(self):
405 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
406 self.assertEqual(response.status_code, requests.codes.ok)
407 res = response.json()
408 freq_map = base64.b64decode(
409 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
410 freq_map_array = [int(x) for x in freq_map]
411 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
412 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
414 if ele['tp-id'] == 'DEG2-CTP-TXRX':
415 freq_map = base64.b64decode(
416 ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
417 freq_map_array = [int(x) for x in freq_map]
418 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
419 if ele['tp-id'] == 'DEG2-TTP-TXRX':
420 freq_map = base64.b64decode(
421 ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
422 freq_map_array = [int(x) for x in freq_map]
423 self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
426 def test_22_check_otn_topo_otu4_links(self):
427 response = test_utils.get_otn_topo_request()
428 self.assertEqual(response.status_code, requests.codes.ok)
429 res = response.json()
430 nb_links = len(res['network'][0]['ietf-network-topology:link'])
431 self.assertEqual(nb_links, 2)
432 listLinkId = ['OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
433 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1']
434 for link in res['network'][0]['ietf-network-topology:link']:
435 self.assertIn(link['link-id'], listLinkId)
437 link['transportpce-topology:otn-link-type'], 'OTU4')
439 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
441 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
443 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
445 link['org-openroadm-common-network:opposite-link'], listLinkId)
447 # test service-create for ODU4 service from spdr to spdr
def test_23_create_ODU4_service(self):
    # Rework the shared class-level request payload in place so that it
    # describes an ODU4 service: both ends switch format to ODU and swap
    # their "otu-service-rate" entry for an "odu-service-rate" one.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1-ODU4"
    for end_key in ("service-a-end", "service-z-end"):
        end_point = request_input[end_key]
        end_point["service-format"] = "ODU"
        del end_point["otu-service-rate"]
        end_point["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"

    response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(response.status_code, requests.codes.ok)
    common = response.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
464 def test_24_get_ODU4_service1(self):
465 response = test_utils.get_service_list_request(
466 "services/service1-ODU4")
467 self.assertEqual(response.status_code, requests.codes.ok)
468 res = response.json()
470 res['services'][0]['administrative-state'], 'inService')
472 res['services'][0]['service-name'], 'service1-ODU4')
474 res['services'][0]['connection-type'], 'infrastructure')
476 res['services'][0]['lifecycle-state'], 'planned')
479 def test_25_check_interface_ODU4_spdra(self):
480 response = test_utils.check_netconf_node_request(
481 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
482 self.assertEqual(response.status_code, requests.codes.ok)
483 res = response.json()
484 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
485 'administrative-state': 'inService',
486 'supporting-circuit-pack-name': 'CP1-CFP0',
487 'supporting-interface': 'XPDR1-NETWORK1-OTU',
488 'type': 'org-openroadm-interfaces:otnOdu',
489 'supporting-port': 'CP1-CFP0-P1',
491 'description': 'TBD'}
492 # SAPI/DAPI are added in the Otu4 renderer
493 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
494 'rate': 'org-openroadm-otn-common-types:ODU4',
495 'monitoring-mode': 'terminated',
496 'expected-dapi': 'H/OelLynehI=',
497 'expected-sapi': 'AMf1n5hK6Xkk',
498 'tx-dapi': 'AMf1n5hK6Xkk',
499 'tx-sapi': 'H/OelLynehI='}
501 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
503 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
505 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
507 self.assertDictEqual(
508 {'payload-type': '21', 'exp-payload-type': '21'},
509 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
511 def test_26_check_interface_ODU4_spdrc(self):
512 response = test_utils.check_netconf_node_request(
513 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
514 self.assertEqual(response.status_code, requests.codes.ok)
515 res = response.json()
516 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
517 'administrative-state': 'inService',
518 'supporting-circuit-pack-name': 'CP1-CFP0',
519 'supporting-interface': 'XPDR1-NETWORK1-OTU',
520 'type': 'org-openroadm-interfaces:otnOdu',
521 'supporting-port': 'CP1-CFP0-P1',
523 'description': 'TBD'}
524 # SAPI/DAPI are added in the Otu4 renderer
525 input_dict_2 = {'odu-function': 'org-openroadm-otn-common-types:ODU-TTP',
526 'rate': 'org-openroadm-otn-common-types:ODU4',
527 'monitoring-mode': 'terminated',
528 'tx-sapi': 'AMf1n5hK6Xkk',
529 'tx-dapi': 'H/OelLynehI=',
530 'expected-sapi': 'H/OelLynehI=',
531 'expected-dapi': 'AMf1n5hK6Xkk'
533 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
535 self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
537 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
539 self.assertDictEqual(
540 {'payload-type': '21', 'exp-payload-type': '21'},
541 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
543 def test_27_check_otn_topo_links(self):
544 response = test_utils.get_otn_topo_request()
545 self.assertEqual(response.status_code, requests.codes.ok)
546 res = response.json()
547 nb_links = len(res['network'][0]['ietf-network-topology:link'])
548 self.assertEqual(nb_links, 4)
549 for link in res['network'][0]['ietf-network-topology:link']:
550 linkId = link['link-id']
551 if (linkId in ('OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
552 'OTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
554 link['org-openroadm-otn-network-topology:available-bandwidth'], 0)
556 link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
557 elif (linkId in ('ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
558 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
560 link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
562 link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
564 link['transportpce-topology:otn-link-type'], 'ODTU4')
566 link['org-openroadm-common-network:link-type'], 'OTN-LINK')
567 self.assertIn(link['org-openroadm-common-network:opposite-link'],
568 ['ODTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
569 'ODTU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1'])
571 self.fail("this link should not exist")
573 def test_28_check_otn_topo_tp(self):
574 response = test_utils.get_otn_topo_request()
575 res = response.json()
576 for node in res['network'][0]['node']:
577 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
578 tpList = node['ietf-network-topology:termination-point']
580 if tp['tp-id'] == 'XPDR1-NETWORK1':
581 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
582 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
584 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
585 self.assertEqual(xpdrTpPortConAt['odtu-tpn-pool'][0]['odtu-type'],
586 'org-openroadm-otn-common-types:ODTU4.ts-Allocated')
588 # test service-create for 10GE service from spdr to spdr
def test_29_create_10GE_service(self):
    # Rework the shared class-level request payload in place so that it
    # describes a 10GE client service: both ends become rate "10" Ethernet,
    # lose their "odu-service-rate" entry and point at port XPDR1-CLIENT1
    # in both directions.
    request_input = self.cr_serv_sample_data["input"]
    request_input["service-name"] = "service1-10GE"
    request_input["connection-type"] = "service"
    for end_key in ("service-a-end", "service-z-end"):
        end_point = request_input[end_key]
        end_point["service-rate"] = "10"
        end_point["service-format"] = "Ethernet"
        del end_point["odu-service-rate"]
        for direction in ("tx-direction", "rx-direction"):
            end_point[direction]["port"]["port-name"] = "XPDR1-CLIENT1"
    response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(response.status_code, requests.codes.ok)
    common = response.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
609 def test_30_get_10GE_service1(self):
610 response = test_utils.get_service_list_request(
611 "services/service1-10GE")
612 self.assertEqual(response.status_code, requests.codes.ok)
613 res = response.json()
615 res['services'][0]['administrative-state'], 'inService')
617 res['services'][0]['service-name'], 'service1-10GE')
619 res['services'][0]['connection-type'], 'service')
621 res['services'][0]['lifecycle-state'], 'planned')
624 def test_31_check_interface_10GE_CLIENT_spdra(self):
625 response = test_utils.check_netconf_node_request(
626 "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
627 self.assertEqual(response.status_code, requests.codes.ok)
628 res = response.json()
629 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
630 'administrative-state': 'inService',
631 'supporting-circuit-pack-name': 'CP1-SFP4',
632 'type': 'org-openroadm-interfaces:ethernetCsmacd',
633 'supporting-port': 'CP1-SFP4-P1'
635 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
637 self.assertDictEqual(
639 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
641 def test_32_check_interface_ODU2E_CLIENT_spdra(self):
642 response = test_utils.check_netconf_node_request(
643 "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
644 self.assertEqual(response.status_code, requests.codes.ok)
645 res = response.json()
646 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
647 'administrative-state': 'inService',
648 'supporting-circuit-pack-name': 'CP1-SFP4',
649 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
650 'type': 'org-openroadm-interfaces:otnOdu',
651 'supporting-port': 'CP1-SFP4-P1'}
653 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
654 'rate': 'org-openroadm-otn-common-types:ODU2e',
655 'monitoring-mode': 'terminated'}
657 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
659 self.assertDictEqual(dict(input_dict_2,
660 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
661 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
662 self.assertDictEqual(
663 {'payload-type': '03', 'exp-payload-type': '03'},
664 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
666 def test_33_check_interface_ODU2E_NETWORK_spdra(self):
667 response = test_utils.check_netconf_node_request(
668 "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
669 self.assertEqual(response.status_code, requests.codes.ok)
670 res = response.json()
671 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
672 'administrative-state': 'inService',
673 'supporting-circuit-pack-name': 'CP1-CFP0',
674 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
675 'type': 'org-openroadm-interfaces:otnOdu',
676 'supporting-port': 'CP1-CFP0-P1'}
678 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
679 'rate': 'org-openroadm-otn-common-types:ODU2e',
680 'monitoring-mode': 'monitored'}
681 input_dict_3 = {'trib-port-number': 1}
683 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
685 self.assertDictEqual(dict(input_dict_2,
686 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
687 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
688 self.assertDictEqual(dict(input_dict_3,
689 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
690 'parent-odu-allocation']),
691 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
692 self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
695 def test_34_check_ODU2E_connection_spdra(self):
696 response = test_utils.check_netconf_node_request(
698 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
699 self.assertEqual(response.status_code, requests.codes.ok)
700 res = response.json()
703 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
704 'direction': 'bidirectional'
707 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
708 res['odu-connection'][0])
709 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'},
710 res['odu-connection'][0]['destination'])
711 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'},
712 res['odu-connection'][0]['source'])
714 def test_35_check_interface_10GE_CLIENT_spdrc(self):
715 response = test_utils.check_netconf_node_request(
716 "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
717 self.assertEqual(response.status_code, requests.codes.ok)
718 res = response.json()
719 input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
720 'administrative-state': 'inService',
721 'supporting-circuit-pack-name': 'CP1-SFP4',
722 'type': 'org-openroadm-interfaces:ethernetCsmacd',
723 'supporting-port': 'CP1-SFP4-P1'
725 self.assertDictEqual(dict(input_dict, **res['interface'][0]),
727 self.assertDictEqual(
729 res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
731 def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
732 response = test_utils.check_netconf_node_request(
733 "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
734 self.assertEqual(response.status_code, requests.codes.ok)
735 res = response.json()
736 input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e-service1-10GE',
737 'administrative-state': 'inService',
738 'supporting-circuit-pack-name': 'CP1-SFP4',
739 'supporting-interface': 'XPDR1-CLIENT1-ETHERNET10G',
740 'type': 'org-openroadm-interfaces:otnOdu',
741 'supporting-port': 'CP1-SFP4-P1'}
743 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
744 'rate': 'org-openroadm-otn-common-types:ODU2e',
745 'monitoring-mode': 'terminated'}
747 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
749 self.assertDictEqual(dict(input_dict_2,
750 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
751 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
752 self.assertDictEqual(
753 {'payload-type': '03', 'exp-payload-type': '03'},
754 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
756 def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
757 response = test_utils.check_netconf_node_request(
758 "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
759 self.assertEqual(response.status_code, requests.codes.ok)
760 res = response.json()
761 input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e-service1-10GE',
762 'administrative-state': 'inService',
763 'supporting-circuit-pack-name': 'CP1-CFP0',
764 'supporting-interface': 'XPDR1-NETWORK1-ODU4',
765 'type': 'org-openroadm-interfaces:otnOdu',
766 'supporting-port': 'CP1-CFP0-P1'}
768 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
769 'rate': 'org-openroadm-otn-common-types:ODU2e',
770 'monitoring-mode': 'monitored'}
772 input_dict_3 = {'trib-port-number': 1}
774 self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
776 self.assertDictEqual(dict(input_dict_2,
777 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
778 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
779 self.assertDictEqual(dict(input_dict_3,
780 **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
781 'parent-odu-allocation']),
782 res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
783 'parent-odu-allocation'])
786 'org-openroadm-otn-odu-interfaces:odu'][
787 'parent-odu-allocation']['trib-slots'])
789 def test_38_check_ODU2E_connection_spdrc(self):
790 response = test_utils.check_netconf_node_request(
792 "odu-connection/XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE")
793 self.assertEqual(response.status_code, requests.codes.ok)
794 res = response.json()
797 'XPDR1-CLIENT1-ODU2e-service1-10GE-x-XPDR1-NETWORK1-ODU2e-service1-10GE',
798 'direction': 'bidirectional'
801 self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
802 res['odu-connection'][0])
803 self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e-service1-10GE'},
804 res['odu-connection'][0]['destination'])
805 self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e-service1-10GE'},
806 res['odu-connection'][0]['source'])
808 def test_39_check_otn_topo_links(self):
809 response = test_utils.get_otn_topo_request()
810 self.assertEqual(response.status_code, requests.codes.ok)
811 res = response.json()
812 nb_links = len(res['network'][0]['ietf-network-topology:link'])
813 self.assertEqual(nb_links, 4)
814 for link in res['network'][0]['ietf-network-topology:link']:
815 linkId = link['link-id']
816 if (linkId in ('ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
817 'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')):
819 link['org-openroadm-otn-network-topology:available-bandwidth'], 90000)
821 link['org-openroadm-otn-network-topology:used-bandwidth'], 10000)
823 def test_40_check_otn_topo_tp(self):
824 response = test_utils.get_otn_topo_request()
825 res = response.json()
826 for node in res['network'][0]['node']:
827 if node['node-id'] in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
828 tpList = node['ietf-network-topology:termination-point']
830 if tp['tp-id'] == 'XPDR1-NETWORK1':
831 xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
832 self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 72)
833 tsPoolList = list(range(1, 9))
835 tsPoolList, xpdrTpPortConAt['ts-pool'])
837 len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
839 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
def test_41_delete_10GE_service(self):
    # Ask for deletion of service1-10GE, check the acknowledgement text,
    # then pause for WAITING seconds.
    response = test_utils.service_delete_request("service1-10GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    common = response.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    time.sleep(self.WAITING)
def test_42_check_service_list(self):
    """Check that the service list now holds exactly 2 services."""
    list_response = test_utils.get_service_list_request("")
    self.assertEqual(list_response.status_code, requests.codes.ok)
    services = list_response.json()['service-list']['services']
    self.assertEqual(len(services), 2)
def test_43_check_no_ODU2e_connection_spdra(self):
    """Check that SPDR-SA1 no longer exposes any 'odu-connection' entry."""
    response = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    device = response.json()['org-openroadm-device']
    # Plain key name; the former ['odu-connection'][0] evaluated to the same
    # string but read like an index into the response.
    self.assertNotIn('odu-connection', device)
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
    """Check that the network-side ODU2e interface is gone from SPDR-SA1.

    The interface is queried under the full name it was created with
    (suffix '-service1-10GE'); querying a truncated name would report
    "missing" whether or not the interface had been removed.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1-10GE")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
    """Check that the client-side ODU2e interface is gone from SPDR-SA1.

    The interface is queried under the full name it was created with
    (suffix '-service1-10GE'); querying a truncated name would report
    "missing" whether or not the interface had been removed.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1-10GE")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
    """Check that XPDR1-CLIENT1-ETHERNET10G is no longer readable on SPDR-SA1."""
    interface_path = "interface/XPDR1-CLIENT1-ETHERNET10G"
    lookup = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    # A conflict status is what the suite expects for absent data.
    self.assertEqual(lookup.status_code, requests.codes.conflict)
def test_47_check_otn_topo_links(self):
    """Check that both ODU4 links report their full bandwidth again.

    The topology must contain 4 links, and each direction of the ODU4 link
    between SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1 must report 100000 available
    and 0 used.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1',
        'ODU4-SPDR-SC1-XPDR1-XPDR1-NETWORK1toSPDR-SA1-XPDR1-XPDR1-NETWORK1')
    odu4_links = [link for link in links if link['link-id'] in odu4_link_ids]
    # Guard against a vacuous pass: without it, the bandwidth checks below
    # would simply be skipped if neither ODU4 link were present.
    self.assertEqual(len(odu4_links), 2)
    for link in odu4_links:
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_48_check_otn_topo_tp(self):
    """Check that XPDR1-NETWORK1 has full pools on both SPDR XPDR1 nodes.

    Both 'ts-pool' and the first 'tpn-pool' must hold 80 entries.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    for node in response.json()['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            attrs = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(attrs['ts-pool']), 80)
            self.assertEqual(
                len(attrs['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_49_delete_ODU4_service(self):
    """Request deletion of service1-ODU4, then pause for WAITING seconds."""
    delete_response = test_utils.service_delete_request("service1-ODU4")
    self.assertEqual(delete_response.status_code, requests.codes.ok)
    common = delete_response.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    # The RPC only reports "in progress"; pause before the follow-up checks.
    time.sleep(self.WAITING)
def test_50_check_service_list(self):
    """Check that the service list now holds exactly 1 service."""
    list_response = test_utils.get_service_list_request("")
    self.assertEqual(list_response.status_code, requests.codes.ok)
    services = list_response.json()['service-list']['services']
    self.assertEqual(len(services), 1)
def test_51_check_no_interface_ODU4_spdra(self):
    """Check that XPDR1-NETWORK1-ODU4 is no longer readable on SPDR-SA1."""
    interface_path = "interface/XPDR1-NETWORK1-ODU4"
    lookup = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    # A conflict status is what the suite expects for absent data.
    self.assertEqual(lookup.status_code, requests.codes.conflict)
def test_52_check_otn_topo_links(self):
    """Re-run the OTU4 link checks of test_22 at this point of the scenario."""
    # Delegates entirely to the earlier step; no additional assertions here.
    self.test_22_check_otn_topo_otu4_links()
def test_53_check_otn_topo_tp(self):
    """Check that XPDR1-NETWORK1 no longer exposes any tributary pool.

    On both SPDR-SA1-XPDR1 and SPDR-SC1-XPDR1, neither 'ts-pool' nor
    'odtu-tpn-pool' may appear in the port connection attributes.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    for node in response.json()['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR1', 'SPDR-SC1-XPDR1'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR1-NETWORK1':
                continue
            attrs = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            # Membership on a dict already tests its keys.
            self.assertNotIn('ts-pool', attrs)
            self.assertNotIn('odtu-tpn-pool', attrs)
def test_54_delete_OCH_OTU4_service(self):
    """Request deletion of service1-OCH-OTU4, then pause for WAITING seconds."""
    delete_response = test_utils.service_delete_request("service1-OCH-OTU4")
    self.assertEqual(delete_response.status_code, requests.codes.ok)
    common = delete_response.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    # The RPC only reports "in progress"; pause before the follow-up checks.
    time.sleep(self.WAITING)
def test_55_get_no_service(self):
    """Check that reading the service list now yields a data-missing error."""
    response = test_utils.get_service_list_request("")
    self.assertEqual(response.status_code, requests.codes.conflict)
    errors = response.json()['errors']['error']
    expected_error = {
        "error-type": "application", "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant data model content does not exist"}
    # The error container is searched for one entry equal to expected_error.
    self.assertIn(expected_error, errors)
def test_56_check_no_interface_OTU4_spdra(self):
    """Check that XPDR1-NETWORK1-OTU is no longer readable on SPDR-SA1."""
    interface_path = "interface/XPDR1-NETWORK1-OTU"
    lookup = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    # A conflict status is what the suite expects for absent data.
    self.assertEqual(lookup.status_code, requests.codes.conflict)
def test_57_check_no_interface_OCH_spdra(self):
    """Check that XPDR1-NETWORK1-1 is no longer readable on SPDR-SA1."""
    interface_path = "interface/XPDR1-NETWORK1-1"
    lookup = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    # A conflict status is what the suite expects for absent data.
    self.assertEqual(lookup.status_code, requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
    """Check that the OTN topology no longer carries any link list."""
    topo_response = test_utils.get_otn_topo_request()
    self.assertEqual(topo_response.status_code, requests.codes.ok)
    otn_network = topo_response.json()['network'][0]
    self.assertNotIn('ietf-network-topology:link', otn_network)
def test_59_check_openroadm_topo_spdra(self):
    """Check that XPDR1-NETWORK1 of SPDR-SA1-XPDR1 carries no wavelength.

    The first termination point of the node must be XPDR1-NETWORK1 and its
    xpdr-network-attributes must not contain a 'wavelength' entry.
    """
    response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
    self.assertEqual(response.status_code, requests.codes.ok)
    tp = response.json()['node'][0]['ietf-network-topology:termination-point'][0]
    self.assertEqual('XPDR1-NETWORK1', tp['tp-id'])
    # Membership on a dict already tests its keys.
    self.assertNotIn(
        'wavelength',
        tp['org-openroadm-network-topology:xpdr-network-attributes'])
def test_60_check_openroadm_topo_ROADMA_SRG(self):
    """Check that byte 95 of the ROADM-A1-SRG1 frequency maps reads 255.

    The check is made on the SRG-level map and on the map of the
    SRG1-PP1-TXRX termination point.
    """
    response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    # Indexing decoded bytes yields the integer value of that byte.
    srg_freq_map = base64.b64decode(
        node['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
    self.assertEqual(srg_freq_map[95], 255, "Lambda 1 should be available")
    for tp in node['ietf-network-topology:termination-point']:
        if tp['tp-id'] == 'SRG1-PP1-TXRX':
            pp_freq_map = base64.b64decode(
                tp['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
            self.assertEqual(pp_freq_map[95], 255, "Lambda 1 should be available")
def test_61_check_openroadm_topo_ROADMA_DEG(self):
    """Check that byte 95 of the ROADM-A1-DEG2 frequency maps reads 255.

    The check is made on the degree-level map and on the maps of the
    DEG2-CTP-TXRX and DEG2-TTP-TXRX termination points.
    """
    topo_response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
    self.assertEqual(topo_response.status_code, requests.codes.ok)
    degree_node = topo_response.json()['node'][0]
    # Indexing decoded bytes yields the integer value of that byte.
    degree_map = base64.b64decode(
        degree_node['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
    self.assertEqual(degree_map[95], 255, "Lambda 1 should be available")
    # Attribute container holding the frequency map for each checked tp.
    attribute_key_by_tp = {
        'DEG2-CTP-TXRX': 'org-openroadm-network-topology:ctp-attributes',
        'DEG2-TTP-TXRX': 'org-openroadm-network-topology:tx-ttp-attributes',
    }
    for tp in degree_node['ietf-network-topology:termination-point']:
        attribute_key = attribute_key_by_tp.get(tp['tp-id'])
        if attribute_key is None:
            continue
        tp_map = base64.b64decode(
            tp[attribute_key]['avail-freq-maps'][0]['freq-map'])
        self.assertEqual(tp_map[95], 255, "Lambda 1 should be available")
def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
    """Create the xponder-to-ROADM link from SPDR-SA1 to ROADM-A1 SRG1-PP2-TXRX."""
    link_arguments = ("SPDR-SA1", "3", "1", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    link_response = test_utils.connect_xpdr_to_rdm_request(*link_arguments)
    self.assertEqual(link_response.status_code, requests.codes.ok)
    result = link_response.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result)
def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
    """Create the ROADM-to-xponder link from ROADM-A1 SRG1-PP2-TXRX to SPDR-SA1."""
    link_arguments = ("SPDR-SA1", "3", "1", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    link_response = test_utils.connect_rdm_to_xpdr_request(*link_arguments)
    self.assertEqual(link_response.status_code, requests.codes.ok)
    result = link_response.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result)
def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
    """Create the xponder-to-ROADM link from SPDR-SC1 to ROADM-C1 SRG1-PP2-TXRX."""
    link_arguments = ("SPDR-SC1", "3", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    link_response = test_utils.connect_xpdr_to_rdm_request(*link_arguments)
    self.assertEqual(link_response.status_code, requests.codes.ok)
    result = link_response.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result)
def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
    """Create the ROADM-to-xponder link from ROADM-C1 SRG1-PP2-TXRX to SPDR-SC1."""
    link_arguments = ("SPDR-SC1", "3", "1", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    link_response = test_utils.connect_rdm_to_xpdr_request(*link_arguments)
    self.assertEqual(link_response.status_code, requests.codes.ok)
    result = link_response.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result)
def test_66_create_OCH_OTU4_service_2(self):
    """Create the OTU4 infrastructure service 'service2-OCH-OTU4' on XPDR3.

    The shared class-level request template is updated in place, so the
    values written here remain visible to the creation steps that follow.
    """
    # pylint: disable=line-too-long
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service2-OCH-OTU4"
    request["connection-type"] = "infrastructure"
    # A-end is fully configured before Z-end, each with the same field order.
    for end_key, device_name in (("service-a-end", "SPDR-SA1-XPDR3"),
                                 ("service-z-end", "SPDR-SC1-XPDR3")):
        end = request[end_key]
        end["service-rate"] = "100"
        end["service-format"] = "OTU"
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-NETWORK1"
        end["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
    create_response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_response.status_code, requests.codes.ok)
    common = create_response.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_67_get_OCH_OTU4_service2(self):
    """Check the stored attributes of service2-OCH-OTU4."""
    response = test_utils.get_service_list_request(
        "services/service2-OCH-OTU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    service = response.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2-OCH-OTU4')
    self.assertEqual(service['connection-type'], 'infrastructure')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_68_create_ODU4_service_2(self):
    """Create the ODU4 service 'service2-ODU4' on XPDR3.

    The shared class-level request template is updated in place: each end
    switches to the ODU format, gains 'odu-service-rate' and loses
    'otu-service-rate'. Fields not touched here keep their earlier values.
    """
    # pylint: disable=line-too-long
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service2-ODU4"
    # A-end is fully converted before Z-end is touched.
    for end_key, device_name in (("service-a-end", "SPDR-SA1-XPDR3"),
                                 ("service-z-end", "SPDR-SC1-XPDR3")):
        end = request[end_key]
        end["service-format"] = "ODU"
        end["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-NETWORK1"
        del end["otu-service-rate"]

    create_response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_response.status_code, requests.codes.ok)
    common = create_response.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_69_get_ODU4_service2(self):
    """Check the stored attributes of service2-ODU4."""
    response = test_utils.get_service_list_request(
        "services/service2-ODU4")
    self.assertEqual(response.status_code, requests.codes.ok)
    service = response.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2-ODU4')
    self.assertEqual(service['connection-type'], 'infrastructure')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_70_create_1GE_service(self):
    """Create the 1GE Ethernet service 'service1-1GE' between the XPDR3 clients.

    The shared class-level request template is updated in place: both ends
    switch to rate "1" / format "Ethernet", drop 'odu-service-rate' and
    point at XPDR3-CLIENT1.
    """
    # pylint: disable=line-too-long
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service1-1GE"
    request["connection-type"] = "service"
    service_ends = (("service-a-end", "SPDR-SA1-XPDR3"),
                    ("service-z-end", "SPDR-SC1-XPDR3"))
    # Rate and format are set on both ends before any key is removed.
    for end_key, _ in service_ends:
        request[end_key]["service-rate"] = "1"
        request[end_key]["service-format"] = "Ethernet"
    for end_key, device_name in service_ends:
        end = request[end_key]
        del end["odu-service-rate"]
        for direction in ("tx-direction", "rx-direction"):
            port = end[direction]["port"]
            port["port-device-name"] = device_name
            port["port-name"] = "XPDR3-CLIENT1"
    create_response = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(create_response.status_code, requests.codes.ok)
    common = create_response.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
def test_71_get_1GE_service1(self):
    """Check the stored attributes of service1-1GE."""
    response = test_utils.get_service_list_request("services/service1-1GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    service = response.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1-1GE')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_72_check_interface_1GE_CLIENT_spdra(self):
    """Check the XPDR3-CLIENT1-ETHERNET1G interface on SPDR-SA1."""
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
                  'administrative-state': 'inService',
                  'supporting-circuit-pack-name': 'CP1-SFP4',
                  'type': 'org-openroadm-interfaces:ethernetCsmacd',
                  'supporting-port': 'CP1-SFP4-P1'
                  }
    # NOTE(review): dict(expected, **actual) lets the response override every
    # expected value, so this only proves that the expected keys are present.
    self.assertDictEqual(dict(input_dict, **interface), interface)
    # NOTE(review): {'speed': 1000} is assumed for a 1GE client interface;
    # confirm against the device configuration.
    self.assertDictEqual(
        {'speed': 1000},
        interface['org-openroadm-ethernet-interfaces:ethernet'])
def test_73_check_interface_ODU0_CLIENT_spdra(self):
    """Check the client-side ODU0 interface of service1-1GE on SPDR-SA1."""
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-SFP1',
                    'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-SFP1-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'terminated'}

    # NOTE(review): dict(expected, **actual) lets the response override every
    # expected value, so these two checks only prove key presence.
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    self.assertDictEqual(
        {'payload-type': '07', 'exp-payload-type': '07'},
        odu['opu'])
def test_74_check_interface_ODU0_NETWORK_spdra(self):
    """Check the network-side ODU0 interface of service1-1GE on SPDR-SA1."""
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    allocation = odu['parent-odu-allocation']
    input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-CFP0',
                    'supporting-interface': 'XPDR3-NETWORK1-ODU4',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-CFP0-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'monitored'}
    input_dict_3 = {'trib-port-number': 1}

    # NOTE(review): dict(expected, **actual) lets the response override every
    # expected value, so these three checks only prove key presence.
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    self.assertDictEqual(dict(input_dict_3, **allocation), allocation)
    self.assertIn(1, allocation['trib-slots'])
def test_75_check_ODU0_connection_spdra(self):
    """Check the ODU0 cross-connection of service1-1GE on SPDR-SA1."""
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1",
        "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['odu-connection'][0]
    # NOTE(review): the 'connection-name' key is assumed to be the name
    # attribute of the connection entry; confirm against the device model.
    input_dict_1 = {
        'connection-name':
        'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
        'direction': 'bidirectional'
    }

    # NOTE(review): dict(expected, **actual) lets the response override every
    # expected value, so this check only proves key presence.
    self.assertDictEqual(dict(input_dict_1, **connection), connection)
    self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0-service1-1GE'},
                         connection['destination'])
    self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0-service1-1GE'},
                         connection['source'])
def test_76_check_interface_1GE_CLIENT_spdrc(self):
    """Check the XPDR3-CLIENT1-ETHERNET1G interface on SPDR-SC1."""
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR3-CLIENT1-ETHERNET1G")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
                  'administrative-state': 'inService',
                  'supporting-circuit-pack-name': 'CP3-SFP1',
                  'type': 'org-openroadm-interfaces:ethernetCsmacd',
                  'supporting-port': 'CP3-SFP1-P1'
                  }
    # NOTE(review): dict(expected, **actual) lets the response override every
    # expected value, so this only proves that the expected keys are present.
    self.assertDictEqual(dict(input_dict, **interface), interface)
    # NOTE(review): {'speed': 1000} is assumed for a 1GE client interface;
    # confirm against the device configuration.
    self.assertDictEqual(
        {'speed': 1000},
        interface['org-openroadm-ethernet-interfaces:ethernet'])
def test_77_check_interface_ODU0_CLIENT_spdrc(self):
    """Check the client-side ODU0 interface of service1-1GE on SPDR-SC1."""
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-SFP1',
                    'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-SFP1-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'terminated'}

    # NOTE(review): dict(expected, **actual) lets the response override every
    # expected value, so these two checks only prove key presence.
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    self.assertDictEqual(
        {'payload-type': '07', 'exp-payload-type': '07'},
        odu['opu'])
def test_78_check_interface_ODU0_NETWORK_spdrc(self):
    """Check the network-side ODU0 interface of service1-1GE on SPDR-SC1."""
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    interface = response.json()['interface'][0]
    odu = interface['org-openroadm-otn-odu-interfaces:odu']
    allocation = odu['parent-odu-allocation']
    input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
                    'administrative-state': 'inService',
                    'supporting-circuit-pack-name': 'CP3-CFP0',
                    'supporting-interface': 'XPDR3-NETWORK1-ODU4',
                    'type': 'org-openroadm-interfaces:otnOdu',
                    'supporting-port': 'CP3-CFP0-P1'}
    input_dict_2 = {
        'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
        'rate': 'org-openroadm-otn-common-types:ODU0',
        'monitoring-mode': 'monitored'}

    input_dict_3 = {'trib-port-number': 1}

    # NOTE(review): dict(expected, **actual) lets the response override every
    # expected value, so these three checks only prove key presence.
    self.assertDictEqual(dict(input_dict_1, **interface), interface)
    self.assertDictEqual(dict(input_dict_2, **odu), odu)
    self.assertDictEqual(dict(input_dict_3, **allocation), allocation)
    self.assertIn(1, allocation['trib-slots'])
def test_79_check_ODU0_connection_spdrc(self):
    """Check the ODU0 cross-connection of service1-1GE on SPDR-SC1."""
    response = test_utils.check_netconf_node_request(
        "SPDR-SC1",
        "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['odu-connection'][0]
    # NOTE(review): the 'connection-name' key is assumed to be the name
    # attribute of the connection entry; confirm against the device model.
    input_dict_1 = {
        'connection-name':
        'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
        'direction': 'bidirectional'
    }

    # NOTE(review): dict(expected, **actual) lets the response override every
    # expected value, so this check only proves key presence.
    self.assertDictEqual(dict(input_dict_1, **connection), connection)
    self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0-service1-1GE'},
                         connection['destination'])
    self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0-service1-1GE'},
                         connection['source'])
def test_80_check_otn_topo_links(self):
    """Check the bandwidth of both XPDR3 ODU4 links in the OTN topology.

    The topology must contain 4 links, and each direction of the ODU4 link
    between SPDR-SA1-XPDR3 and SPDR-SC1-XPDR3 must report 99000 available
    and 1000 used.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
        'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')
    odu4_links = [link for link in links if link['link-id'] in odu4_link_ids]
    # Guard against a vacuous pass: without it, the bandwidth checks below
    # would simply be skipped if neither ODU4 link were present.
    self.assertEqual(len(odu4_links), 2)
    for link in odu4_links:
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
def test_81_check_otn_topo_tp(self):
    """Check the tributary pools of XPDR3-NETWORK1 on both SPDR XPDR3 nodes.

    For each of SPDR-SA1-XPDR3 and SPDR-SC1-XPDR3, the XPDR3-NETWORK1
    termination point must expose 79 entries in 'ts-pool' (slot 1 absent)
    and 79 entries in the first 'tpn-pool' (entry 1 absent).
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    for node in response.json()['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR3-NETWORK1':
                continue
            attrs = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            ts_pool = attrs['ts-pool']
            self.assertEqual(len(ts_pool), 79)
            # Check each slot individually: testing the whole list as a
            # single member of the pool can never fail.
            for slot in range(1, 2):
                self.assertNotIn(slot, ts_pool)
            tpn_pool = attrs['odtu-tpn-pool'][0]['tpn-pool']
            self.assertEqual(len(tpn_pool), 79)
            self.assertNotIn(1, tpn_pool)
def test_82_delete_1GE_service(self):
    """Request deletion of service1-1GE, then pause for WAITING seconds."""
    delete_response = test_utils.service_delete_request("service1-1GE")
    self.assertEqual(delete_response.status_code, requests.codes.ok)
    common = delete_response.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    # The RPC only reports "in progress"; pause before the follow-up checks.
    time.sleep(self.WAITING)
def test_83_check_service_list(self):
    """Check that the service list now holds exactly 2 services."""
    list_response = test_utils.get_service_list_request("")
    self.assertEqual(list_response.status_code, requests.codes.ok)
    services = list_response.json()['service-list']['services']
    self.assertEqual(len(services), 2)
def test_84_check_no_ODU0_connection_spdra(self):
    """Check that SPDR-SA1 no longer exposes any 'odu-connection' entry."""
    response = test_utils.check_netconf_node_request("SPDR-SA1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    device = response.json()['org-openroadm-device']
    # Plain key name; the former ['odu-connection'][0] evaluated to the same
    # string but read like an index into the response.
    self.assertNotIn('odu-connection', device)
def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
    """Check that the network-side ODU0 interface is gone from SPDR-SA1.

    The interface is queried under the full name it was created with
    (suffix '-service1-1GE'); querying a truncated name would report
    "missing" whether or not the interface had been removed.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
    """Check that the client-side ODU0 interface is gone from SPDR-SA1.

    The interface is queried under the full name it was created with
    (suffix '-service1-1GE'); querying a truncated name would report
    "missing" whether or not the interface had been removed.
    """
    response = test_utils.check_netconf_node_request(
        "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
    self.assertEqual(response.status_code, requests.codes.conflict)
def test_87_check_no_interface_10GE_CLIENT_spdra(self):
    """Check that XPDR3-CLIENT1-ETHERNET1G is no longer readable on SPDR-SA1.

    Despite the "10GE" in the method name, the interface probed here is the
    1G Ethernet client interface.
    """
    interface_path = "interface/XPDR3-CLIENT1-ETHERNET1G"
    lookup = test_utils.check_netconf_node_request("SPDR-SA1", interface_path)
    # A conflict status is what the suite expects for absent data.
    self.assertEqual(lookup.status_code, requests.codes.conflict)
def test_88_check_otn_topo_links(self):
    """Check that both XPDR3 ODU4 links report their full bandwidth again.

    The topology must contain 4 links, and each direction of the ODU4 link
    between SPDR-SA1-XPDR3 and SPDR-SC1-XPDR3 must report 100000 available
    and 0 used.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(len(links), 4)
    odu4_link_ids = (
        'ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
        'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')
    odu4_links = [link for link in links if link['link-id'] in odu4_link_ids]
    # Guard against a vacuous pass: without it, the bandwidth checks below
    # would simply be skipped if neither ODU4 link were present.
    self.assertEqual(len(odu4_links), 2)
    for link in odu4_links:
        self.assertEqual(
            link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
        self.assertEqual(
            link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
def test_89_check_otn_topo_tp(self):
    """Check that XPDR3-NETWORK1 has full pools on both SPDR XPDR3 nodes.

    Both 'ts-pool' and the first 'tpn-pool' must hold 80 entries.
    """
    response = test_utils.get_otn_topo_request()
    self.assertEqual(response.status_code, requests.codes.ok)
    for node in response.json()['network'][0]['node']:
        if node['node-id'] not in ('SPDR-SA1-XPDR3', 'SPDR-SC1-XPDR3'):
            continue
        for tp in node['ietf-network-topology:termination-point']:
            if tp['tp-id'] != 'XPDR3-NETWORK1':
                continue
            attrs = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
            self.assertEqual(len(attrs['ts-pool']), 80)
            self.assertEqual(
                len(attrs['odtu-tpn-pool'][0]['tpn-pool']), 80)
def test_90_delete_ODU4_service(self):
    """Request deletion of service2-ODU4, then pause for WAITING seconds."""
    delete_response = test_utils.service_delete_request("service2-ODU4")
    self.assertEqual(delete_response.status_code, requests.codes.ok)
    common = delete_response.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    # The RPC only reports "in progress"; pause before the follow-up checks.
    time.sleep(self.WAITING)
def test_91_delete_OCH_OTU4_service(self):
    """Request deletion of service2-OCH-OTU4, then pause for WAITING seconds."""
    delete_response = test_utils.service_delete_request("service2-OCH-OTU4")
    self.assertEqual(delete_response.status_code, requests.codes.ok)
    common = delete_response.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
    # The RPC only reports "in progress"; pause before the follow-up checks.
    time.sleep(self.WAITING)
def test_92_disconnect_xponders_from_roadm(self):
    """Delete every XPONDER-OUTPUT / XPONDER-INPUT link of the topology.

    Each deletion must answer with an OK status.
    """
    # NOTE(review): the leading "{}" is presumably filled in by
    # test_utils.delete_request with the RESTCONF base URL; confirm there.
    url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
    response = test_utils.get_ordm_topo_request("")
    self.assertEqual(response.status_code, requests.codes.ok)
    links = response.json()['network'][0]['ietf-network-topology:link']
    for link in links:
        if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
            link_name = link["link-id"]
            response = test_utils.delete_request(url + link_name)
            self.assertEqual(response.status_code, requests.codes.ok)
def test_93_check_openroadm_topology(self):
    """Check that the openroadm topology is left with 18 links."""
    topo_response = test_utils.get_ordm_topo_request("")
    self.assertEqual(topo_response.status_code, requests.codes.ok)
    remaining_links = topo_response.json()['network'][0]['ietf-network-topology:link']
    self.assertEqual(18, len(remaining_links), 'Topology should contain 18 links')
def test_94_disconnect_spdrA(self):
    """Unmount SPDR-SA1 and expect an OK status."""
    unmount_response = test_utils.unmount_device("SPDR-SA1")
    expected_status = requests.codes.ok
    self.assertEqual(unmount_response.status_code, expected_status,
                     test_utils.CODE_SHOULD_BE_200)
def test_95_disconnect_spdrC(self):
    """Unmount SPDR-SC1 and expect an OK status."""
    unmount_response = test_utils.unmount_device("SPDR-SC1")
    expected_status = requests.codes.ok
    self.assertEqual(unmount_response.status_code, expected_status,
                     test_utils.CODE_SHOULD_BE_200)
def test_96_disconnect_roadmA(self):
    """Unmount ROADM-A1 and expect an OK status."""
    unmount_response = test_utils.unmount_device("ROADM-A1")
    expected_status = requests.codes.ok
    self.assertEqual(unmount_response.status_code, expected_status,
                     test_utils.CODE_SHOULD_BE_200)
def test_97_disconnect_roadmC(self):
    """Unmount ROADM-C1 and expect an OK status."""
    unmount_response = test_utils.unmount_device("ROADM-C1")
    expected_status = requests.codes.ok
    self.assertEqual(unmount_response.status_code, expected_status,
                     test_utils.CODE_SHOULD_BE_200)
if __name__ == "__main__":
    # Script entry point: run the suite with per-test verbose reporting.
    unittest.main(verbosity=2)